Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 46 additions & 17 deletions go/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"strings"
"time"

"github.com/smartcontractkit/data-streams-sdk/go/feed"
"github.com/smartcontractkit/data-streams-sdk/go/v2/feed"
)

// Client is the data streams client interface.
Expand Down Expand Up @@ -151,25 +151,41 @@ func (c *client) GetLatestReport(ctx context.Context, id feed.ID) (r *ReportResp
// ReportResponse implements the report envelope that contains the full report payload,
// its FeedID and timestamps. For decoding the Report Payload use report.Decode().
type ReportResponse struct {
FeedID feed.ID `json:"feedID"`
FullReport []byte `json:"fullReport"`
ValidFromTimestamp uint64 `json:"validFromTimestamp"`
ObservationsTimestamp uint64 `json:"observationsTimestamp"`
FeedID feed.ID
FullReport []byte
ValidFromTimestamp time.Time
ObservationsTimestamp time.Time
}

func (r *ReportResponse) UnmarshalJSON(b []byte) (err error) {
type Alias ReportResponse
aux := &struct {
FullReport string `json:"fullReport"`
*Alias
}{
Alias: (*Alias)(r),
}
FeedID feed.ID `json:"feedID"`
FullReport string `json:"fullReport"`
ValidFromTimestamp uint64 `json:"validFromTimestamp"`
ObservationsTimestamp uint64 `json:"observationsTimestamp"`
ValidFromTimestampMs uint64 `json:"validFromTimestampMs"`
ObservationsTimestampMs uint64 `json:"observationsTimestampMs"`
}{}

if err := json.Unmarshal(b, aux); err != nil {
return err
}

r.FeedID = aux.FeedID

// V2 payloads use milliseconds, V1 payloads use seconds
if aux.ValidFromTimestampMs > 0 {
r.ValidFromTimestamp = time.UnixMilli(int64(aux.ValidFromTimestampMs))
} else if aux.ValidFromTimestamp > 0 {
r.ValidFromTimestamp = time.Unix(int64(aux.ValidFromTimestamp), 0)
}

if aux.ObservationsTimestampMs > 0 {
r.ObservationsTimestamp = time.UnixMilli(int64(aux.ObservationsTimestampMs))
} else if aux.ObservationsTimestamp > 0 {
r.ObservationsTimestamp = time.Unix(int64(aux.ObservationsTimestamp), 0)
}

if len(aux.FullReport) < 3 {
return nil
}
Expand All @@ -182,13 +198,26 @@ func (r *ReportResponse) UnmarshalJSON(b []byte) (err error) {
}

func (r *ReportResponse) MarshalJSON() ([]byte, error) {
type Alias ReportResponse
var validFrom, observationsTS uint64

// Wrapper timestamps are always in milliseconds
if !r.ValidFromTimestamp.IsZero() {
validFrom = uint64(r.ValidFromTimestamp.UnixMilli())
}
if !r.ObservationsTimestamp.IsZero() {
observationsTS = uint64(r.ObservationsTimestamp.UnixMilli())
}

return json.Marshal(&struct {
FullReport string `json:"fullReport"`
*Alias
FeedID feed.ID `json:"feedID"`
FullReport string `json:"fullReport"`
ValidFromTimestampMs uint64 `json:"validFromTimestampMs"`
ObservationsTimestampMs uint64 `json:"observationsTimestampMs"`
}{
FullReport: "0x" + hex.EncodeToString(r.FullReport),
Alias: (*Alias)(r),
FeedID: r.FeedID,
FullReport: "0x" + hex.EncodeToString(r.FullReport),
ValidFromTimestampMs: validFrom,
ObservationsTimestampMs: observationsTS,
})
}

Expand Down Expand Up @@ -243,7 +272,7 @@ func (c *client) GetReportPage(ctx context.Context, id feed.ID, pageTS uint64) (
}
r.NextPageTS = 0
if len(r.Reports) > 0 {
r.NextPageTS = r.Reports[len(r.Reports)-1].ObservationsTimestamp + 1
r.NextPageTS = uint64(r.Reports[len(r.Reports)-1].ObservationsTimestamp.Unix()) + 1
}
return r, err
}
Expand Down
69 changes: 56 additions & 13 deletions go/client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package streams

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand All @@ -10,11 +11,53 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/smartcontractkit/data-streams-sdk/go/feed"
"github.com/smartcontractkit/data-streams-sdk/go/v2/feed"
)

// reportResponseEqual compares two ReportResponse structs, handling time.Time comparison properly
func reportResponseEqual(a, b *ReportResponse) bool {
if a.FeedID != b.FeedID {
return false
}
if !bytes.Equal(a.FullReport, b.FullReport) {
return false
}
if !a.ObservationsTimestamp.Equal(b.ObservationsTimestamp) {
return false
}
if !a.ValidFromTimestamp.Equal(b.ValidFromTimestamp) {
return false
}
return true
}

// reportResponsesEqual compares slices of ReportResponse
func reportResponsesEqual(a, b []*ReportResponse) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if !reportResponseEqual(a[i], b[i]) {
return false
}
}
return true
}

// reportPageEqual compares two ReportPage structs
func reportPageEqual(a, b *ReportPage) bool {
if !reportResponsesEqual(a.Reports, b.Reports) {
return false
}
if a.NextPageTS != b.NextPageTS {
return false
}
return true
}

func mustFeedIDfromString(s string) (f feed.ID) {
err := f.FromString(s)
if err != nil {
Expand Down Expand Up @@ -68,8 +111,8 @@ func TestClient_GetFeeds(t *testing.T) {

func TestClient_GetReports(t *testing.T) {
expectedReports := []*ReportResponse{
{FeedID: feed1, ObservationsTimestamp: 12344},
{FeedID: feed2, ObservationsTimestamp: 12344},
{FeedID: feed1, ObservationsTimestamp: time.Unix(12344, 0)},
{FeedID: feed2, ObservationsTimestamp: time.Unix(12344, 0)},
}
expectedFeedIdListStr := fmt.Sprintf("%s,%s", feed1.String(), feed2.String())

Expand Down Expand Up @@ -111,7 +154,7 @@ func TestClient_GetReports(t *testing.T) {

fmt.Println(expectedReports[0], reports[0])

if !reflect.DeepEqual(reports, expectedReports) {
if !reportResponsesEqual(reports, expectedReports) {
t.Errorf("GetFeeds() = %v, want %v", reports, expectedReports)
}
}
Expand Down Expand Up @@ -155,7 +198,7 @@ func TestClient_GetLatestReport(t *testing.T) {
t.Fatalf("GetLatestReport() error = %v", err)
}

if !reflect.DeepEqual(report, expectedReport) {
if !reportResponseEqual(report, expectedReport) {
t.Errorf("GetLatestReport() = %v, want %v", report, expectedReport)
}
}
Expand All @@ -165,18 +208,18 @@ func TestClient_GetReportPage(t *testing.T) {

expectedReportPage1 := &ReportPage{
Reports: []*ReportResponse{
{FeedID: feed1, ObservationsTimestamp: 1234567890, FullReport: hexutil.Bytes(`report1 payload`)},
{FeedID: feed1, ObservationsTimestamp: 1234567891, FullReport: hexutil.Bytes(`report2 payload`)},
{FeedID: feed1, FullReport: hexutil.Bytes(`report1 payload`), ObservationsTimestamp: time.Unix(1234567897, 0)},
{FeedID: feed1, FullReport: hexutil.Bytes(`report2 payload`), ObservationsTimestamp: time.Unix(1234567898, 0)},
},
NextPageTS: 1234567892,
NextPageTS: 1234567899, // Last ObservationsTimestamp (1234567898) + 1
}

expectedReportPage2 := &ReportPage{
Reports: []*ReportResponse{
{FeedID: feed1, ObservationsTimestamp: 1234567892, FullReport: hexutil.Bytes(`report3 payload`)},
{FeedID: feed1, ObservationsTimestamp: 1234567893, FullReport: hexutil.Bytes(`report4 payload`)},
{FeedID: feed1, FullReport: hexutil.Bytes(`report3 payload`), ObservationsTimestamp: time.Unix(1234567997, 0)},
{FeedID: feed1, FullReport: hexutil.Bytes(`report4 payload`), ObservationsTimestamp: time.Unix(1234567998, 0)},
},
NextPageTS: 1234567894,
NextPageTS: 1234567999, // Last ObservationsTimestamp (1234567998) + 1
}

ms := newMockServer(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -230,7 +273,7 @@ func TestClient_GetReportPage(t *testing.T) {
t.Fatalf("GetReportPage() error = %v", err)
}

if !reflect.DeepEqual(reportPage, expectedReportPage1) {
if !reportPageEqual(reportPage, expectedReportPage1) {
t.Errorf("GetReportPage() = %v, want %v", reportPage, expectedReportPage1)
}

Expand All @@ -239,7 +282,7 @@ func TestClient_GetReportPage(t *testing.T) {
t.Fatalf("GetReportPage() error = %v", err)
}

if !reflect.DeepEqual(reportPage, expectedReportPage2) {
if !reportPageEqual(reportPage, expectedReportPage2) {
t.Errorf("GetReportPage() = %v, want %v", reportPage, expectedReportPage2)
}
}
Expand Down
2 changes: 1 addition & 1 deletion go/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package streams
import "net/textproto"

const (
apiV1WS = "/api/v1/ws"
apiV2WS = "/api/v2/ws"
apiV1Feeds = "/api/v1/feeds"
apiV1Reports = "/api/v1/reports"
apiV1ReportsBulk = "/api/v1/reports/bulk"
Expand Down
8 changes: 4 additions & 4 deletions go/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"os"
"time"

streams "github.com/smartcontractkit/data-streams-sdk/go"
"github.com/smartcontractkit/data-streams-sdk/go/feed"
streamsReport "github.com/smartcontractkit/data-streams-sdk/go/report"
v3 "github.com/smartcontractkit/data-streams-sdk/go/report/v3"
streams "github.com/smartcontractkit/data-streams-sdk/go/v2"
"github.com/smartcontractkit/data-streams-sdk/go/v2/feed"
streamsReport "github.com/smartcontractkit/data-streams-sdk/go/v2/report"
v3 "github.com/smartcontractkit/data-streams-sdk/go/v2/report/v3"
)

func ExampleClient() {
Expand Down
38 changes: 37 additions & 1 deletion go/feed/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/binary"
"encoding/hex"
"fmt"
"time"
)

// FeedVersion represents the feed report schema version
Expand All @@ -27,6 +28,27 @@ const (
_
)

// Resolution represents the timestamp resolution for a feed
type Resolution uint8

const (
// ResolutionSeconds indicates timestamps are in seconds
ResolutionSeconds Resolution = 0
// ResolutionMilliseconds indicates timestamps are in milliseconds
ResolutionMilliseconds Resolution = 1
)

func (r Resolution) String() string {
switch r {
case ResolutionSeconds:
return "seconds"
case ResolutionMilliseconds:
return "milliseconds"
default:
return "undefined"
}
}

// ID type
type ID [32]byte

Expand Down Expand Up @@ -76,6 +98,20 @@ type Feed struct {
FeedID ID `json:"feedID"`
}

// Version returns the feed schema version (masked to ignore resolution nibble)
func (f *ID) Version() FeedVersion {
return FeedVersion(binary.BigEndian.Uint16(f[:2]))
return FeedVersion(binary.BigEndian.Uint16(f[:2]) & 0x0FFF)
}

// Resolution returns the timestamp resolution for this feed
func (f *ID) Resolution() Resolution {
return Resolution(f[0] >> 4)
}

// ParseTimestamp converts a raw uint64 timestamp to time.Time based on resolution.
func ParseTimestamp(ts uint64, res Resolution) time.Time {
if res == ResolutionMilliseconds {
return time.UnixMilli(int64(ts))
}
return time.Unix(int64(ts), 0)
}
Loading