From 8085982717b6de828d174c6772adef648b2de057 Mon Sep 17 00:00:00 2001 From: zer0stars <74260741+zer0stars@users.noreply.github.com> Date: Mon, 16 Feb 2026 15:38:23 -0500 Subject: [PATCH] Add data_base64 support and simplify CE JSON marshaling - RawEvent is now a standalone struct with custom MarshalJSON/UnmarshalJSON that handles both "data" and "data_base64" per CloudEvents JSON spec, including round-trip preservation and MIME-based wire form selection. - CloudEvent[A] stores data_base64 opaquely without decoding; consumers decide how to handle it. - Simplify unmarshalCloudEventWithPayload by using the cloudEventHeader type alias for known fields instead of per-field deserialization. - Replace headerToMap helper with sjson for consistent header serialization. - Add BytesForSignature and IsJSONDataContentType helpers. - Reject events with both "data" and "data_base64" present. - Add comprehensive positive and negative tests for all new behavior. BREAKING: RawEvent changed from type alias to standalone struct. Co-authored-by: Cursor --- cloudevent.go | 88 ++++++++++++- cloudevent_json.go | 92 ++++++++----- cloudevent_test.go | 319 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 469 insertions(+), 30 deletions(-) diff --git a/cloudevent.go b/cloudevent.go index 5c8c9ad..74af95c 100644 --- a/cloudevent.go +++ b/cloudevent.go @@ -2,8 +2,14 @@ package cloudevent import ( + "encoding/base64" "encoding/json" + "fmt" + "mime" + "strings" "time" + + "github.com/tidwall/sjson" ) const ( @@ -95,10 +101,90 @@ type CloudEvent[A any] struct { CloudEventHeader // Data contains domain-specific information about the event. Data A `json:"data"` + + DataBase64 string `json:"data_base64,omitempty"` } // RawEvent is a cloudevent with a json.RawMessage data field. -type RawEvent = CloudEvent[json.RawMessage] +// It supports both "data" and "data_base64" (CloudEvents JSON spec). 
+type RawEvent struct { + CloudEventHeader + Data json.RawMessage `json:"data,omitempty"` + + // DataBase64 is the raw "data_base64" string when the event was received with + // data_base64 (CloudEvents spec). When set, MarshalJSON emits data_base64 for + // round-trip; otherwise wire form is chosen from DataContentType and Data. + DataBase64 string `json:"data_base64,omitempty"` +} + +// BytesForSignature returns the bytes that were signed (wire form of data or data_base64). +// Use for signature verification; not the same as Data when the CE used data_base64. +func (r RawEvent) BytesForSignature() []byte { + if r.DataBase64 != "" { + return []byte(r.DataBase64) + } + return r.Data +} + +// UnmarshalJSON implements json.Unmarshaler, accepting either "data" or "data_base64". Data holds +// the raw "data" JSON or the base64-decoded "data_base64" bytes; an event with both is rejected. +func (r *RawEvent) UnmarshalJSON(data []byte) error { + var dataRaw json.RawMessage + var dataBase64 string + header, err := unmarshalCloudEventWithPayload(data, func(d json.RawMessage, b64 string) error { + dataRaw = d + dataBase64 = b64 + return nil + }) + if err != nil { + return err + } + r.CloudEventHeader = header + if dataRaw != nil && dataBase64 != "" { + return fmt.Errorf("cloudevent: both \"data\" and \"data_base64\" present; only one allowed") + } + if dataBase64 != "" { + decoded, err := base64.StdEncoding.DecodeString(dataBase64) + if err != nil { + return fmt.Errorf("cloudevent: decoding \"data_base64\": %w", err) + } + r.Data = decoded + r.DataBase64 = dataBase64 + } else { + r.Data = dataRaw + r.DataBase64 = "" + } + return nil +} + +// IsJSONDataContentType returns true if the MIME type indicates a JSON payload. +// Matches "application/json" and any "+json" suffix type (e.g. "application/cloudevents+json"). +func IsJSONDataContentType(ct string) bool { + parsed, _, err := mime.ParseMediaType(strings.TrimSpace(ct)) + return err == nil && (parsed == "application/json" || strings.HasSuffix(parsed, "+json")) +} + +// MarshalJSON implements json.Marshaler. 
Emits DataBase64 as-is when set; otherwise picks the wire form: +// JSON content type (or none with valid JSON Data) -> "data"; else base64-encoded -> "data_base64". +func (r RawEvent) MarshalJSON() ([]byte, error) { + data, err := json.Marshal(r.CloudEventHeader) + if err != nil { + return nil, err + } + if len(r.Data) > 0 || r.DataBase64 != "" { + if r.DataBase64 != "" { + data, err = sjson.SetBytes(data, "data_base64", r.DataBase64) + } else if IsJSONDataContentType(r.DataContentType) || (r.DataContentType == "" && json.Valid(r.Data)) { + data, err = sjson.SetRawBytes(data, "data", r.Data) + } else { + data, err = sjson.SetBytes(data, "data_base64", base64.StdEncoding.EncodeToString(r.Data)) + } + if err != nil { + return nil, err + } + } + return data, nil +} // Equals returns true if the two CloudEventHeaders share the same IndexKey. func (c *CloudEventHeader) Equals(other CloudEventHeader) bool { diff --git a/cloudevent_json.go b/cloudevent_json.go index 84dec80..2644cac 100644 --- a/cloudevent_json.go +++ b/cloudevent_json.go @@ -2,31 +2,56 @@ package cloudevent import ( "encoding/json" + "fmt" "reflect" "strings" "github.com/tidwall/sjson" ) -var definedCloudeEventHdrFields = getJSONFieldNames(reflect.TypeOf(CloudEventHeader{})) +var definedCloudeEventHdrFields = getJSONFieldNames(reflect.TypeFor[CloudEventHeader]()) type cloudEventHeader CloudEventHeader // UnmarshalJSON implements custom JSON unmarshaling for CloudEvent. +// It accepts either "data" or "data_base64"; data_base64 is stored undecoded in DataBase64. 
func (c *CloudEvent[A]) UnmarshalJSON(data []byte) error { - var err error - c.CloudEventHeader, err = unmarshalCloudEvent(data, c.setDataField) - return err + var dataRaw json.RawMessage + var dataBase64 string + header, err := unmarshalCloudEventWithPayload(data, func(d json.RawMessage, b64 string) error { + dataRaw = d + dataBase64 = b64 + return nil + }) + if err != nil { + return err + } + c.CloudEventHeader = header + if dataRaw != nil && dataBase64 != "" { + return fmt.Errorf("cloudevent: both \"data\" and \"data_base64\" present; only one allowed") + } + // Assign unconditionally so a reused receiver never keeps a stale DataBase64. + c.DataBase64 = dataBase64 + if dataBase64 == "" && dataRaw != nil { + if err := json.Unmarshal(dataRaw, &c.Data); err != nil { + return err + } + } + return nil } -// MarshalJSON implements custom JSON marshaling for CloudEventHeader. +// MarshalJSON implements custom JSON marshaling for CloudEvent[A]. +// When DataBase64 is set, emits "data_base64"; otherwise emits "data". func (c CloudEvent[A]) MarshalJSON() ([]byte, error) { - // Marshal the base struct data, err := json.Marshal(c.CloudEventHeader) if err != nil { return nil, err } - data, err = sjson.SetBytes(data, "data", c.Data) + if c.DataBase64 != "" { + data, err = sjson.SetBytes(data, "data_base64", c.DataBase64) + } else { + data, err = sjson.SetBytes(data, "data", c.Data) + } if err != nil { return nil, err } @@ -42,21 +67,18 @@ func (c *CloudEventHeader) UnmarshalJSON(data []byte) error { // MarshalJSON implements custom JSON marshaling for CloudEventHeader. 
func (c CloudEventHeader) MarshalJSON() ([]byte, error) { - // Marshal the base struct aux := (cloudEventHeader)(c) aux.SpecVersion = SpecVersion data, err := json.Marshal(aux) if err != nil { return nil, err } - // Add all extras using sjson] for k, v := range c.Extras { data, err = sjson.SetBytes(data, k, v) if err != nil { return nil, err } } - return data, nil } @@ -90,30 +112,48 @@ func getJSONFieldNames(t reflect.Type) map[string]struct{} { // unmarshalCloudEvent unmarshals the CloudEventHeader and data field. func unmarshalCloudEvent(data []byte, dataFunc func(json.RawMessage) error) (CloudEventHeader, error) { - c := CloudEventHeader{} - aux := cloudEventHeader{} - // Unmarshal known fields directly into the struct - if err := json.Unmarshal(data, &aux); err != nil { + return unmarshalCloudEventWithPayload(data, func(dataRaw json.RawMessage, _ string) error { + return dataFunc(dataRaw) + }) +} + +// unmarshalCloudEventWithPayload unmarshals the CloudEventHeader and passes the raw "data" value +// and the "data_base64" string to payloadFunc when "data" is present or "data_base64" is non-empty. +func unmarshalCloudEventWithPayload(data []byte, payloadFunc func(dataRaw json.RawMessage, dataBase64 string) error) (CloudEventHeader, error) { + // Unmarshal known header fields via the defined type cloudEventHeader (no custom UnmarshalJSON). + var c CloudEventHeader + if err := json.Unmarshal(data, (*cloudEventHeader)(&c)); err != nil { return c, err } - aux.SpecVersion = SpecVersion - c = (CloudEventHeader)(aux) - // Create a map to hold all JSON fields + c.SpecVersion = SpecVersion + + // Second pass into raw map to extract data, data_base64, and extras. 
rawFields := make(map[string]json.RawMessage) if err := json.Unmarshal(data, &rawFields); err != nil { return c, err } - // Separate known and unknown fields + var dataRaw json.RawMessage + var dataBase64 string + if raw, ok := rawFields["data_base64"]; ok && len(raw) > 0 { + if err := json.Unmarshal(raw, &dataBase64); err != nil { + return c, err + } + } + if raw, ok := rawFields["data"]; ok { + dataRaw = raw + } + if dataRaw != nil || dataBase64 != "" { + if err := payloadFunc(dataRaw, dataBase64); err != nil { + return c, err + } + } + for key, rawValue := range rawFields { if _, ok := definedCloudeEventHdrFields[key]; ok { - // Skip defined fields continue } - if key == "data" { - if err := dataFunc(rawValue); err != nil { - return c, err - } + if key == "data" || key == "data_base64" { continue } if c.Extras == nil { @@ -131,9 +171,3 @@ func unmarshalCloudEvent(data []byte, dataFunc func(json.RawMessage) error) (Clo // ignoreDataField is a function that ignores the data field. // It is used when unmarshalling the CloudEventHeader so that the data field is not added to the Extras map. func ignoreDataField(json.RawMessage) error { return nil } - -// setDataField is a function that sets the data field. -// It is used to unmarshal the data field into the CloudEvent[A].Data field. 
-func (c *CloudEvent[A]) setDataField(data json.RawMessage) error { - return json.Unmarshal(data, &c.Data) -} diff --git a/cloudevent_test.go b/cloudevent_test.go index e73bd34..4f0cc42 100644 --- a/cloudevent_test.go +++ b/cloudevent_test.go @@ -345,3 +345,322 @@ func TestCloudEventHeader_UnmarshalJSON(t *testing.T) { }) } } + +func TestCloudEvent_UnmarshalJSON_DataBase64(t *testing.T) { + t.Parallel() + now := time.Now().UTC().Truncate(time.Millisecond) + + // base64 of `{"message":"hello","count":42}` + jsonStr := `{ + "id": "b64-1", + "source": "test-source", + "producer": "test-producer", + "subject": "test-subject", + "time": "` + now.Format(time.RFC3339Nano) + `", + "type": "dimo.status", + "data_base64": "eyJtZXNzYWdlIjoiaGVsbG8iLCJjb3VudCI6NDJ9" + }` + var ev cloudevent.CloudEvent[TestData] + err := json.Unmarshal([]byte(jsonStr), &ev) + require.NoError(t, err) + assert.Equal(t, "b64-1", ev.ID) + assert.Equal(t, TestData{}, ev.Data, "Data should not be populated from data_base64") + assert.Equal(t, "eyJtZXNzYWdlIjoiaGVsbG8iLCJjb3VudCI6NDJ9", ev.DataBase64) +} + +func TestCloudEvent_MarshalJSON_DataBase64(t *testing.T) { + t.Parallel() + now := time.Now().UTC().Truncate(time.Millisecond) + ev := cloudevent.CloudEvent[TestData]{ + CloudEventHeader: cloudevent.CloudEventHeader{ + ID: "b64-m", + Source: "test-source", + Producer: "test-producer", + Subject: "test-subject", + Time: now, + Type: cloudevent.TypeStatus, + }, + Data: TestData{Message: "hello", Count: 42}, + DataBase64: "eyJtZXNzYWdlIjoiaGVsbG8iLCJjb3VudCI6NDJ9", + } + out, err := json.Marshal(ev) + require.NoError(t, err) + + var m map[string]any + require.NoError(t, json.Unmarshal(out, &m)) + assert.Equal(t, "eyJtZXNzYWdlIjoiaGVsbG8iLCJjb3VudCI6NDJ9", m["data_base64"]) + assert.Nil(t, m["data"], "data field should not be present when data_base64 is set") +} + +func TestCloudEvent_UnmarshalJSON_BothDataAndDataBase64(t *testing.T) { + t.Parallel() + jsonStr := 
`{"id":"1","source":"s","type":"t","data":{"message":"hi","count":1},"data_base64":"Zm9v"}` + var ev cloudevent.CloudEvent[TestData] + err := json.Unmarshal([]byte(jsonStr), &ev) + require.Error(t, err, "expected error when both data and data_base64 are present") + assert.Contains(t, err.Error(), "both") +} + +func TestCloudEvent_UnmarshalJSON_InvalidBase64(t *testing.T) { + t.Parallel() + jsonStr := `{"id":"1","source":"s","type":"t","data_base64":"$$not-base64$$"}` + var ev cloudevent.CloudEvent[TestData] + err := json.Unmarshal([]byte(jsonStr), &ev) + require.NoError(t, err, "CloudEvent[A] should not validate base64 encoding") + assert.Equal(t, "$$not-base64$$", ev.DataBase64) + assert.Equal(t, TestData{}, ev.Data) +} + +func TestCloudEvent_DataBase64_RoundTrip(t *testing.T) { + t.Parallel() + input := `{ + "id": "rt-1", + "source": "s", + "producer": "p", + "subject": "sub", + "time": "2025-01-01T00:00:00Z", + "type": "dimo.status", + "data_base64": "eyJtZXNzYWdlIjoicnQiLCJjb3VudCI6N30=" + }` + var ev cloudevent.CloudEvent[TestData] + require.NoError(t, json.Unmarshal([]byte(input), &ev)) + assert.Equal(t, TestData{}, ev.Data, "Data should not be populated from data_base64") + assert.Equal(t, "eyJtZXNzYWdlIjoicnQiLCJjb3VudCI6N30=", ev.DataBase64) + + out, err := json.Marshal(ev) + require.NoError(t, err) + + var m map[string]any + require.NoError(t, json.Unmarshal(out, &m)) + assert.Equal(t, "eyJtZXNzYWdlIjoicnQiLCJjb3VudCI6N30=", m["data_base64"]) + assert.Nil(t, m["data"]) +} + +// --- RawEvent positive tests --- + +func TestRawEvent_UnmarshalJSON_DataField(t *testing.T) { + t.Parallel() + input := `{ + "id":"r1","source":"s","producer":"p","subject":"sub", + "time":"2025-06-01T00:00:00Z","type":"dimo.status", + "data":{"temp":72} + }` + var ev cloudevent.RawEvent + require.NoError(t, json.Unmarshal([]byte(input), &ev)) + + assert.Equal(t, "r1", ev.ID) + assert.JSONEq(t, `{"temp":72}`, string(ev.Data)) + assert.Empty(t, ev.DataBase64, "DataBase64 should be 
empty when data field is used") +} + +func TestRawEvent_UnmarshalJSON_DataBase64Field(t *testing.T) { + t.Parallel() + // base64("hello world") = "aGVsbG8gd29ybGQ=" + input := `{ + "id":"r2","source":"s","producer":"p","subject":"sub", + "time":"2025-06-01T00:00:00Z","type":"dimo.status", + "data_base64":"aGVsbG8gd29ybGQ=" + }` + var ev cloudevent.RawEvent + require.NoError(t, json.Unmarshal([]byte(input), &ev)) + + assert.Equal(t, "r2", ev.ID) + assert.Equal(t, []byte("hello world"), []byte(ev.Data)) + assert.Equal(t, "aGVsbG8gd29ybGQ=", ev.DataBase64) +} + +func TestRawEvent_UnmarshalJSON_DataBase64_WithExtras(t *testing.T) { + t.Parallel() + input := `{ + "id":"r3","source":"s","producer":"p","subject":"sub", + "time":"2025-06-01T00:00:00Z","type":"dimo.status", + "data_base64":"Zm9v", + "customfield":"bar" + }` + var ev cloudevent.RawEvent + require.NoError(t, json.Unmarshal([]byte(input), &ev)) + + assert.Equal(t, "Zm9v", ev.DataBase64) + assert.Equal(t, []byte("foo"), []byte(ev.Data)) + require.Contains(t, ev.Extras, "customfield") + assert.Equal(t, "bar", ev.Extras["customfield"]) +} + +func TestRawEvent_RoundTrip_DataBase64(t *testing.T) { + t.Parallel() + input := `{ + "id":"rt","source":"s","producer":"p","subject":"sub", + "time":"2025-06-01T00:00:00Z","type":"dimo.status", + "data_base64":"aGVsbG8gd29ybGQ=" + }` + var ev cloudevent.RawEvent + require.NoError(t, json.Unmarshal([]byte(input), &ev)) + + out, err := json.Marshal(ev) + require.NoError(t, err) + + var m map[string]any + require.NoError(t, json.Unmarshal(out, &m)) + assert.Equal(t, "aGVsbG8gd29ybGQ=", m["data_base64"]) + assert.Nil(t, m["data"], "data should not be present when round-tripping data_base64") +} + +func TestRawEvent_RoundTrip_DataJSON(t *testing.T) { + t.Parallel() + input := `{ + "id":"rt2","source":"s","producer":"p","subject":"sub", + "time":"2025-06-01T00:00:00Z","type":"dimo.status", + "data":{"key":"value"} + }` + var ev cloudevent.RawEvent + require.NoError(t, 
json.Unmarshal([]byte(input), &ev)) + + out, err := json.Marshal(ev) + require.NoError(t, err) + + var m map[string]any + require.NoError(t, json.Unmarshal(out, &m)) + assert.Nil(t, m["data_base64"], "data_base64 should not be present when data is JSON") + assert.Equal(t, map[string]any{"key": "value"}, m["data"]) +} + +func TestRawEvent_MarshalJSON_NonJSONData_EmitsBase64(t *testing.T) { + t.Parallel() + ev := cloudevent.RawEvent{ + CloudEventHeader: cloudevent.CloudEventHeader{ + ID: "nj1", + Source: "s", + Producer: "p", + Subject: "sub", + Time: time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC), + Type: cloudevent.TypeStatus, + DataContentType: "application/octet-stream", + }, + Data: []byte("binary\x00data"), + } + out, err := json.Marshal(ev) + require.NoError(t, err) + + var m map[string]any + require.NoError(t, json.Unmarshal(out, &m)) + assert.Nil(t, m["data"], "non-JSON content type should not emit data field") + assert.NotEmpty(t, m["data_base64"]) +} + +func TestRawEvent_MarshalJSON_ExplicitJSONContentType(t *testing.T) { + t.Parallel() + ev := cloudevent.RawEvent{ + CloudEventHeader: cloudevent.CloudEventHeader{ + ID: "jct", + Source: "s", + Producer: "p", + Subject: "sub", + Time: time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC), + Type: cloudevent.TypeStatus, + DataContentType: "application/json", + }, + Data: json.RawMessage(`{"ok":true}`), + } + out, err := json.Marshal(ev) + require.NoError(t, err) + + var m map[string]any + require.NoError(t, json.Unmarshal(out, &m)) + assert.Equal(t, map[string]any{"ok": true}, m["data"]) + assert.Nil(t, m["data_base64"]) +} + +func TestRawEvent_MarshalJSON_DataBase64TakesPrecedence(t *testing.T) { + t.Parallel() + ev := cloudevent.RawEvent{ + CloudEventHeader: cloudevent.CloudEventHeader{ + ID: "bp", + Source: "s", + Subject: "sub", + Time: time.Date(2025, 6, 1, 0, 0, 0, 0, time.UTC), + Type: cloudevent.TypeStatus, + }, + Data: json.RawMessage(`{"ignored":true}`), + DataBase64: "cHJlc2V0", + } + out, err := 
json.Marshal(ev) + require.NoError(t, err) + + var m map[string]any + require.NoError(t, json.Unmarshal(out, &m)) + assert.Equal(t, "cHJlc2V0", m["data_base64"]) + assert.Nil(t, m["data"], "data_base64 should take precedence over data") +} + +func TestRawEvent_BytesForSignature_DataBase64(t *testing.T) { + t.Parallel() + ev := cloudevent.RawEvent{ + Data: []byte("decoded payload"), + DataBase64: "b3JpZ2luYWw=", + } + assert.Equal(t, []byte("b3JpZ2luYWw="), ev.BytesForSignature()) +} + +func TestRawEvent_BytesForSignature_DataOnly(t *testing.T) { + t.Parallel() + ev := cloudevent.RawEvent{ + Data: json.RawMessage(`{"sig":"data"}`), + } + assert.Equal(t, json.RawMessage(`{"sig":"data"}`), json.RawMessage(ev.BytesForSignature())) +} + +// --- IsJSONDataContentType tests --- + +func TestIsJSONDataContentType(t *testing.T) { + t.Parallel() + tests := []struct { + ct string + expected bool + }{ + {"application/json", true}, + {"application/json; charset=utf-8", true}, + {"application/cloudevents+json", true}, + {"application/vnd.custom+json", true}, + {"application/octet-stream", false}, + {"text/plain", false}, + {"", false}, + } + for _, tt := range tests { + assert.Equal(t, tt.expected, cloudevent.IsJSONDataContentType(tt.ct), "content type: %q", tt.ct) + } +} + +func TestRawEvent_UnmarshalJSON_InvalidBase64(t *testing.T) { + t.Parallel() + jsonStr := `{"id":"1","source":"s","type":"t","data_base64":"$$not-base64$$"}` + var ev cloudevent.RawEvent + err := json.Unmarshal([]byte(jsonStr), &ev) + require.Error(t, err, "expected error for invalid base64 in data_base64") +} + +func TestRawEvent_UnmarshalJSON_BothDataAndDataBase64(t *testing.T) { + t.Parallel() + jsonStr := `{"id":"1","source":"s","type":"t","data":{"x":1},"data_base64":"Zm9v"}` + var ev cloudevent.RawEvent + err := json.Unmarshal([]byte(jsonStr), &ev) + require.Error(t, err, "expected error when both data and data_base64 are present") + assert.Contains(t, err.Error(), "both") +} + +func 
TestCloudEvent_UnmarshalJSON_InvalidTime(t *testing.T) { + t.Parallel() + jsonStr := `{"id":"1","source":"s","type":"t","time":12345,"data":{"message":"hi","count":1}}` + var ev cloudevent.CloudEvent[TestData] + err := json.Unmarshal([]byte(jsonStr), &ev) + require.Error(t, err, "expected error for invalid time field type") +} + +func TestCloudEvent_UnmarshalJSON_NoDataField(t *testing.T) { + t.Parallel() + jsonStr := `{"id":"1","source":"s","type":"t","subject":"sub","time":"2025-01-01T00:00:00Z"}` + var ev cloudevent.CloudEvent[TestData] + err := json.Unmarshal([]byte(jsonStr), &ev) + require.NoError(t, err, "CloudEvent without data field should succeed") + assert.Equal(t, "1", ev.ID) + assert.Equal(t, TestData{}, ev.Data) +}