diff --git a/cloudevent.go b/cloudevent.go index 5c8c9ad..c6a6847 100644 --- a/cloudevent.go +++ b/cloudevent.go @@ -2,8 +2,14 @@ package cloudevent import ( + "encoding/base64" "encoding/json" + "fmt" + "mime" + "strings" "time" + + "github.com/tidwall/sjson" ) const ( @@ -98,7 +104,85 @@ type CloudEvent[A any] struct { } // RawEvent is a cloudevent with a json.RawMessage data field. -type RawEvent = CloudEvent[json.RawMessage] +// It supports both "data" and "data_base64" (CloudEvents JSON spec). +type RawEvent struct { + CloudEventHeader + Data json.RawMessage `json:"data,omitempty"` + + // DataBase64 is the raw "data_base64" string when the event was received with + // data_base64 (CloudEvents spec). When set, MarshalJSON emits data_base64 for + // round-trip; otherwise wire form is chosen from DataContentType and Data. + DataBase64 string `json:"data_base64,omitempty"` +} + +// BytesForSignature returns the bytes that were signed (wire form of data or data_base64). +// Use for signature verification; not the same as Data when the CE used data_base64. +func (r RawEvent) BytesForSignature() []byte { + if r.DataBase64 != "" { + return []byte(r.DataBase64) + } + return r.Data +} + +// UnmarshalJSON implements json.Unmarshaler so that both "data" and "data_base64" +// are supported; Data is always set to the resolved payload bytes. +func (r *RawEvent) UnmarshalJSON(data []byte) error { + var dataRaw json.RawMessage + var dataBase64 string + header, err := unmarshalCloudEventWithPayload(data, func(d json.RawMessage, b64 string) error { + dataRaw = d + dataBase64 = b64 + return nil + }) + if err != nil { + return err + } + r.CloudEventHeader = header + if dataRaw != nil && dataBase64 != "" { + return fmt.Errorf("cloudevent: both \"data\" and \"data_base64\" present; only one allowed") + } + if dataBase64 != "" { + decoded, err := base64.StdEncoding.DecodeString(dataBase64) + if err != nil { + return err + } + r.Data = decoded + r.DataBase64 = dataBase64 + } else { + r.Data = dataRaw + r.DataBase64 = "" + } + return nil +} + +// IsJSONDataContentType returns true if the MIME type indicates a JSON payload. +// Matches "application/json" and any "+json" suffix type (e.g. "application/cloudevents+json"). +func IsJSONDataContentType(ct string) bool { + parsed, _, err := mime.ParseMediaType(strings.TrimSpace(ct)) + return err == nil && (parsed == "application/json" || strings.HasSuffix(parsed, "+json")) +} + +// MarshalJSON implements json.Marshaler. Uses DataContentType to choose wire form: +// application/json -> "data"; otherwise -> "data_base64" (CloudEvents spec). +func (r RawEvent) MarshalJSON() ([]byte, error) { + data, err := json.Marshal(r.CloudEventHeader) + if err != nil { + return nil, err + } + if len(r.Data) > 0 || r.DataBase64 != "" { + if r.DataBase64 != "" { + data, err = sjson.SetBytes(data, "data_base64", r.DataBase64) + } else if IsJSONDataContentType(r.DataContentType) || (r.DataContentType == "" && json.Valid(r.Data)) { + data, err = sjson.SetRawBytes(data, "data", r.Data) + } else { + data, err = sjson.SetBytes(data, "data_base64", base64.StdEncoding.EncodeToString(r.Data)) + } + if err != nil { + return nil, err + } + } + return data, nil +} // Equals returns true if the two CloudEventHeaders share the same IndexKey. func (c *CloudEventHeader) Equals(other CloudEventHeader) bool { diff --git a/cloudevent_json.go b/cloudevent_json.go index 84dec80..2c82da1 100644 --- a/cloudevent_json.go +++ b/cloudevent_json.go @@ -8,7 +8,7 @@ import ( "github.com/tidwall/sjson" ) -var definedCloudeEventHdrFields = getJSONFieldNames(reflect.TypeOf(CloudEventHeader{})) +var definedCloudeEventHdrFields = getJSONFieldNames(reflect.TypeFor[CloudEventHeader]()) type cloudEventHeader CloudEventHeader @@ -19,9 +19,8 @@ func (c *CloudEvent[A]) UnmarshalJSON(data []byte) error { return err } -// MarshalJSON implements custom JSON marshaling for CloudEventHeader. +// MarshalJSON implements custom JSON marshaling for CloudEvent[A]. func (c CloudEvent[A]) MarshalJSON() ([]byte, error) { - // Marshal the base struct data, err := json.Marshal(c.CloudEventHeader) if err != nil { return nil, err @@ -42,21 +41,18 @@ func (c *CloudEventHeader) UnmarshalJSON(data []byte) error { // MarshalJSON implements custom JSON marshaling for CloudEventHeader. func (c CloudEventHeader) MarshalJSON() ([]byte, error) { - // Marshal the base struct aux := (cloudEventHeader)(c) aux.SpecVersion = SpecVersion data, err := json.Marshal(aux) if err != nil { return nil, err } - // Add all extras using sjson] for k, v := range c.Extras { data, err = sjson.SetBytes(data, k, v) if err != nil { return nil, err } } - return data, nil } @@ -90,30 +86,100 @@ func getJSONFieldNames(t reflect.Type) map[string]struct{} { // unmarshalCloudEvent unmarshals the CloudEventHeader and data field. func unmarshalCloudEvent(data []byte, dataFunc func(json.RawMessage) error) (CloudEventHeader, error) { + return unmarshalCloudEventWithPayload(data, func(dataRaw json.RawMessage, _ string) error { + return dataFunc(dataRaw) + }) +} + +// unmarshalCloudEventWithPayload unmarshals the CloudEventHeader and returns both +// "data" and "data_base64" for RawEvent. +// Single-pass: decode only into map[string]json.RawMessage, then fill header from raw fields. +func unmarshalCloudEventWithPayload(data []byte, payloadFunc func(dataRaw json.RawMessage, dataBase64 string) error) (CloudEventHeader, error) { c := CloudEventHeader{} - aux := cloudEventHeader{} - // Unmarshal known fields directly into the struct - if err := json.Unmarshal(data, &aux); err != nil { - return c, err - } - aux.SpecVersion = SpecVersion - c = (CloudEventHeader)(aux) - // Create a map to hold all JSON fields rawFields := make(map[string]json.RawMessage) if err := json.Unmarshal(data, &rawFields); err != nil { return c, err } - // Separate known and unknown fields + // Populate known header fields from raw values (one small unmarshal per field). + if raw, ok := rawFields["id"]; ok && len(raw) > 0 { + if err := json.Unmarshal(raw, &c.ID); err != nil { + return c, err + } + } + if raw, ok := rawFields["source"]; ok && len(raw) > 0 { + if err := json.Unmarshal(raw, &c.Source); err != nil { + return c, err + } + } + if raw, ok := rawFields["producer"]; ok && len(raw) > 0 { + if err := json.Unmarshal(raw, &c.Producer); err != nil { + return c, err + } + } + c.SpecVersion = SpecVersion + if raw, ok := rawFields["subject"]; ok && len(raw) > 0 { + if err := json.Unmarshal(raw, &c.Subject); err != nil { + return c, err + } + } + if raw, ok := rawFields["time"]; ok && len(raw) > 0 { + if err := json.Unmarshal(raw, &c.Time); err != nil { + return c, err + } + } + if raw, ok := rawFields["type"]; ok && len(raw) > 0 { + if err := json.Unmarshal(raw, &c.Type); err != nil { + return c, err + } + } + if raw, ok := rawFields["datacontenttype"]; ok && len(raw) > 0 { + if err := json.Unmarshal(raw, &c.DataContentType); err != nil { + return c, err + } + } + if raw, ok := rawFields["dataschema"]; ok && len(raw) > 0 { + if err := json.Unmarshal(raw, &c.DataSchema); err != nil { + return c, err + } + } + if raw, ok := rawFields["dataversion"]; ok && len(raw) > 0 { + if err := json.Unmarshal(raw, &c.DataVersion); err != nil { + return c, err + } + } + if raw, ok := rawFields["signature"]; ok && len(raw) > 0 { + if err := json.Unmarshal(raw, &c.Signature); err != nil { + return c, err + } + } + if raw, ok := rawFields["tags"]; ok && len(raw) > 0 { + if err := json.Unmarshal(raw, &c.Tags); err != nil { + return c, err + } + } + + var dataRaw json.RawMessage + var dataBase64 string + if raw, ok := rawFields["data_base64"]; ok && len(raw) > 0 { + if err := json.Unmarshal(raw, &dataBase64); err != nil { + return c, err + } + } + if raw, ok := rawFields["data"]; ok { + dataRaw = raw + } + if dataRaw != nil || dataBase64 != "" { + if err := payloadFunc(dataRaw, dataBase64); err != nil { + return c, err + } + } + for key, rawValue := range rawFields { if _, ok := definedCloudeEventHdrFields[key]; ok { - // Skip defined fields continue } - if key == "data" { - if err := dataFunc(rawValue); err != nil { - return c, err - } + if key == "data" || key == "data_base64" { continue } if c.Extras == nil { diff --git a/cloudevent_test.go b/cloudevent_test.go index e73bd34..2b9f64f 100644 --- a/cloudevent_test.go +++ b/cloudevent_test.go @@ -345,3 +345,38 @@ func TestCloudEventHeader_UnmarshalJSON(t *testing.T) { }) } } + +func TestRawEvent_UnmarshalJSON_InvalidBase64(t *testing.T) { + t.Parallel() + jsonStr := `{"id":"1","source":"s","type":"t","data_base64":"$$not-base64$$"}` + var ev cloudevent.RawEvent + err := json.Unmarshal([]byte(jsonStr), &ev) + require.Error(t, err, "expected error for invalid base64 in data_base64") +} + +func TestRawEvent_UnmarshalJSON_BothDataAndDataBase64(t *testing.T) { + t.Parallel() + jsonStr := `{"id":"1","source":"s","type":"t","data":{"x":1},"data_base64":"Zm9v"}` + var ev cloudevent.RawEvent + err := json.Unmarshal([]byte(jsonStr), &ev) + require.Error(t, err, "expected error when both data and data_base64 are present") + assert.Contains(t, err.Error(), "both") +} + +func TestCloudEvent_UnmarshalJSON_InvalidTime(t *testing.T) { + t.Parallel() + jsonStr := `{"id":"1","source":"s","type":"t","time":12345,"data":{"message":"hi","count":1}}` + var ev cloudevent.CloudEvent[TestData] + err := json.Unmarshal([]byte(jsonStr), &ev) + require.Error(t, err, "expected error for invalid time field type") +} + +func TestCloudEvent_UnmarshalJSON_NoDataField(t *testing.T) { + t.Parallel() + jsonStr := `{"id":"1","source":"s","type":"t","subject":"sub","time":"2025-01-01T00:00:00Z"}` + var ev cloudevent.CloudEvent[TestData] + err := json.Unmarshal([]byte(jsonStr), &ev) + require.NoError(t, err, "CloudEvent without data field should succeed") + assert.Equal(t, "1", ev.ID) + assert.Equal(t, TestData{}, ev.Data) +}