Skip to content

Commit f841971

Browse files
committed
fix: measurements not querying on device_id
(cherry picked from commit 538c75c)
1 parent 4759616 commit f841971

2 files changed

Lines changed: 42 additions & 13 deletions

File tree

services/core/measurements/application.go

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,17 @@ import (
2121
// Store stores measurement data
2222
type Store interface {
2323
Query(context.Context, Filter, pagination.Request) (*pagination.Page[Measurement], error)
24-
ListDatastreams(context.Context, DatastreamFilter, pagination.Request) (*pagination.Page[Datastream], error)
24+
ListDatastreams(
25+
context.Context,
26+
DatastreamFilter,
27+
pagination.Request,
28+
) (*pagination.Page[Datastream], error)
2529
GetDatastream(ctx context.Context, id uuid.UUID, filter DatastreamFilter) (*Datastream, error)
26-
FindOrCreateDatastream(ctx context.Context, tenantID, sensorID int64, observedProperty, UnitOfMeasurement string) (*Datastream, error)
30+
FindOrCreateDatastream(
31+
ctx context.Context,
32+
tenantID, sensorID int64,
33+
observedProperty, UnitOfMeasurement string,
34+
) (*Datastream, error)
2735
StoreMeasurements(context.Context, []Measurement) error
2836
}
2937

@@ -144,9 +152,18 @@ func (s *Service) ProcessPipelineMessage(pmsg pipeline.Message) error {
144152
m.ObservedProperty = m.SensorExternalID + "_" + m.ObservedProperty
145153
}
146154

147-
archiveTimeDays, _ := lo.Coalesce(sensor.ArchiveTime, &s.systemArchiveTime) // msg.Organisation.ArchiveTime)
155+
archiveTimeDays, _ := lo.Coalesce(
156+
sensor.ArchiveTime,
157+
&s.systemArchiveTime,
158+
) // msg.Organisation.ArchiveTime)
148159

149-
ds, err := s.store.FindOrCreateDatastream(ctx, msg.TenantID, sensor.ID, m.ObservedProperty, m.UnitOfMeasurement)
160+
ds, err := s.store.FindOrCreateDatastream(
161+
ctx,
162+
msg.TenantID,
163+
sensor.ID,
164+
m.ObservedProperty,
165+
m.UnitOfMeasurement,
166+
)
150167
if err != nil {
151168
return err
152169
}
@@ -167,7 +184,8 @@ func (s *Service) ProcessPipelineMessage(pmsg pipeline.Message) error {
167184
measurement.MeasurementTimestamp = time.UnixMilli(m.Timestamp)
168185
measurement.MeasurementValue = m.Value
169186
measurement.MeasurementProperties = m.Properties
170-
measurement.MeasurementExpiration = time.UnixMilli(msg.ReceivedAt).Add(time.Duration(*archiveTimeDays) * 24 * time.Hour)
187+
measurement.MeasurementExpiration = time.UnixMilli(msg.ReceivedAt).
188+
Add(time.Duration(*archiveTimeDays) * 24 * time.Hour)
171189

172190
// Measurement location is either explicitly set or falls back to device location
173191
if m.Latitude != nil && m.Longitude != nil {
@@ -184,15 +202,19 @@ func (s *Service) ProcessPipelineMessage(pmsg pipeline.Message) error {
184202

185203
// Filter contains query information for a list of measurements
186204
type Filter struct {
187-
Start time.Time `url:",required"`
188-
End time.Time `url:",required"`
189-
DeviceIDs []string
190-
SensorCodes []string
191-
Datastream []string
192-
TenantID []int64
205+
Start time.Time `url:"start,required"`
206+
End time.Time `url:"end,required"`
207+
DeviceIDs []string `url:"device_id"`
208+
SensorCodes []string `url:"sensor_codes"`
209+
Datastream []string `url:"datastream"`
210+
TenantID []int64 `url:"tenant_id"`
193211
}
194212

195-
func (s *Service) QueryMeasurements(ctx context.Context, f Filter, r pagination.Request) (*pagination.Page[Measurement], error) {
213+
func (s *Service) QueryMeasurements(
214+
ctx context.Context,
215+
f Filter,
216+
r pagination.Request,
217+
) (*pagination.Page[Measurement], error) {
196218
if err := auth.MustHavePermissions(ctx, auth.Permissions{auth.READ_MEASUREMENTS}); err != nil {
197219
return nil, err
198220
}
@@ -215,7 +237,11 @@ type DatastreamFilter struct {
215237
TenantID []int64
216238
}
217239

218-
func (s *Service) ListDatastreams(ctx context.Context, filter DatastreamFilter, r pagination.Request) (*pagination.Page[Datastream], error) {
240+
func (s *Service) ListDatastreams(
241+
ctx context.Context,
242+
filter DatastreamFilter,
243+
r pagination.Request,
244+
) (*pagination.Page[Datastream], error) {
219245
if err := auth.MustHavePermissions(ctx, auth.Permissions{auth.READ_MEASUREMENTS}); err != nil {
220246
return nil, err
221247
}

services/core/measurements/infra/store_psql.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,9 @@ func (s *MeasurementStorePSQL) Query(ctx context.Context, filter measurements.Fi
157157
if len(filter.SensorCodes) > 0 {
158158
q = q.Where(sq.Eq{"sensor_code": filter.SensorCodes})
159159
}
160+
if len(filter.DeviceIDs) > 0 {
161+
q = q.Where(sq.Eq{"device_id": filter.DeviceIDs})
162+
}
160163
if len(filter.Datastream) > 0 {
161164
q = q.Where(sq.Eq{"datastream_id": filter.Datastream})
162165
}

0 commit comments

Comments
 (0)