-
Notifications
You must be signed in to change notification settings - Fork 0
Add data_base64 support and simplify CE JSON marshaling #39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -2,8 +2,14 @@ | |||||
| package cloudevent | ||||||
|
|
||||||
| import ( | ||||||
| "encoding/base64" | ||||||
| "encoding/json" | ||||||
| "fmt" | ||||||
| "mime" | ||||||
| "strings" | ||||||
| "time" | ||||||
|
|
||||||
| "github.com/tidwall/sjson" | ||||||
| ) | ||||||
|
|
||||||
| const ( | ||||||
|
|
@@ -98,7 +104,85 @@ type CloudEvent[A any] struct { | |||||
| } | ||||||
|
|
||||||
| // RawEvent is a cloudevent with a json.RawMessage data field. | ||||||
| type RawEvent = CloudEvent[json.RawMessage] | ||||||
| // It supports both "data" and "data_base64" (CloudEvents JSON spec). | ||||||
| type RawEvent struct { | ||||||
| CloudEventHeader | ||||||
| Data json.RawMessage `json:"data,omitempty"` | ||||||
|
|
||||||
| // DataBase64 is the raw "data_base64" string when the event was received with | ||||||
| // data_base64 (CloudEvents spec). When set, MarshalJSON emits data_base64 for | ||||||
| // round-trip; otherwise wire form is chosen from DataContentType and Data. | ||||||
| DataBase64 string `json:"data_base64,omitempty"` | ||||||
| } | ||||||
|
|
||||||
| // BytesForSignature returns the bytes that were signed (wire form of data or data_base64). | ||||||
| // Use for signature verification; not the same as Data when the CE used data_base64. | ||||||
| func (r RawEvent) BytesForSignature() []byte { | ||||||
| if r.DataBase64 != "" { | ||||||
| return []byte(r.DataBase64) | ||||||
| } | ||||||
| return r.Data | ||||||
| } | ||||||
|
|
||||||
| // UnmarshalJSON implements json.Unmarshaler so that both "data" and "data_base64" | ||||||
| // are supported; Data is always set to the resolved payload bytes. | ||||||
| func (r *RawEvent) UnmarshalJSON(data []byte) error { | ||||||
| var dataRaw json.RawMessage | ||||||
| var dataBase64 string | ||||||
| header, err := unmarshalCloudEventWithPayload(data, func(d json.RawMessage, b64 string) error { | ||||||
| dataRaw = d | ||||||
| dataBase64 = b64 | ||||||
| return nil | ||||||
| }) | ||||||
| if err != nil { | ||||||
| return err | ||||||
| } | ||||||
| r.CloudEventHeader = header | ||||||
| if dataRaw != nil && dataBase64 != "" { | ||||||
| return fmt.Errorf("cloudevent: both \"data\" and \"data_base64\" present; only one allowed") | ||||||
| } | ||||||
| if dataBase64 != "" { | ||||||
| decoded, err := base64.StdEncoding.DecodeString(dataBase64) | ||||||
| if err != nil { | ||||||
| return err | ||||||
| } | ||||||
| r.Data = decoded | ||||||
| r.DataBase64 = dataBase64 | ||||||
| } else { | ||||||
| r.Data = dataRaw | ||||||
| r.DataBase64 = "" | ||||||
| } | ||||||
| return nil | ||||||
| } | ||||||
|
|
||||||
| // IsJSONDataContentType returns true if the MIME type indicates a JSON payload. | ||||||
| // Matches "application/json" and any "+json" suffix type (e.g. "application/cloudevents+json"). | ||||||
| func IsJSONDataContentType(ct string) bool { | ||||||
| parsed, _, err := mime.ParseMediaType(strings.TrimSpace(ct)) | ||||||
| return err == nil && (parsed == "application/json" || strings.HasSuffix(parsed, "+json")) | ||||||
| } | ||||||
|
|
||||||
| // MarshalJSON implements json.Marshaler. Uses DataContentType to choose wire form: | ||||||
| // application/json -> "data"; otherwise -> "data_base64" (CloudEvents spec). | ||||||
| func (r RawEvent) MarshalJSON() ([]byte, error) { | ||||||
| data, err := json.Marshal(r.CloudEventHeader) | ||||||
| if err != nil { | ||||||
| return nil, err | ||||||
| } | ||||||
| if len(r.Data) > 0 || r.DataBase64 != "" { | ||||||
| if r.DataBase64 != "" { | ||||||
| data, err = sjson.SetBytes(data, "data_base64", r.DataBase64) | ||||||
| } else if IsJSONDataContentType(r.DataContentType) || (r.DataContentType == "" && json.Valid(r.Data)) { | ||||||
|
||||||
| } else if IsJSONDataContentType(r.DataContentType) || (r.DataContentType == "" && json.Valid(r.Data)) { | |
| } else if json.Valid(r.Data) && (IsJSONDataContentType(r.DataContentType) || r.DataContentType == "") { |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,7 +8,7 @@ import ( | |
| "github.com/tidwall/sjson" | ||
| ) | ||
|
|
||
| var definedCloudeEventHdrFields = getJSONFieldNames(reflect.TypeOf(CloudEventHeader{})) | ||
| var definedCloudeEventHdrFields = getJSONFieldNames(reflect.TypeFor[CloudEventHeader]()) | ||
|
|
||
| type cloudEventHeader CloudEventHeader | ||
|
|
||
|
|
@@ -19,9 +19,8 @@ func (c *CloudEvent[A]) UnmarshalJSON(data []byte) error { | |
| return err | ||
| } | ||
|
|
||
| // MarshalJSON implements custom JSON marshaling for CloudEventHeader. | ||
| // MarshalJSON implements custom JSON marshaling for CloudEvent[A]. | ||
| func (c CloudEvent[A]) MarshalJSON() ([]byte, error) { | ||
| // Marshal the base struct | ||
| data, err := json.Marshal(c.CloudEventHeader) | ||
| if err != nil { | ||
| return nil, err | ||
|
|
@@ -42,21 +41,18 @@ func (c *CloudEventHeader) UnmarshalJSON(data []byte) error { | |
|
|
||
| // MarshalJSON implements custom JSON marshaling for CloudEventHeader. | ||
| func (c CloudEventHeader) MarshalJSON() ([]byte, error) { | ||
| // Marshal the base struct | ||
| aux := (cloudEventHeader)(c) | ||
| aux.SpecVersion = SpecVersion | ||
| data, err := json.Marshal(aux) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| // Add all extras using sjson] | ||
| for k, v := range c.Extras { | ||
| data, err = sjson.SetBytes(data, k, v) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| } | ||
|
|
||
| return data, nil | ||
| } | ||
|
|
||
|
|
@@ -90,30 +86,100 @@ func getJSONFieldNames(t reflect.Type) map[string]struct{} { | |
|
|
||
| // unmarshalCloudEvent unmarshals the CloudEventHeader and data field. | ||
| func unmarshalCloudEvent(data []byte, dataFunc func(json.RawMessage) error) (CloudEventHeader, error) { | ||
| return unmarshalCloudEventWithPayload(data, func(dataRaw json.RawMessage, _ string) error { | ||
| return dataFunc(dataRaw) | ||
| }) | ||
| } | ||
|
|
||
| // unmarshalCloudEventWithPayload unmarshals the CloudEventHeader and returns both | ||
| // "data" and "data_base64" for RawEvent. | ||
| // Single-pass: decode only into map[string]json.RawMessage, then fill header from raw fields. | ||
| func unmarshalCloudEventWithPayload(data []byte, payloadFunc func(dataRaw json.RawMessage, dataBase64 string) error) (CloudEventHeader, error) { | ||
| c := CloudEventHeader{} | ||
| aux := cloudEventHeader{} | ||
| // Unmarshal known fields directly into the struct | ||
| if err := json.Unmarshal(data, &aux); err != nil { | ||
| return c, err | ||
| } | ||
| aux.SpecVersion = SpecVersion | ||
| c = (CloudEventHeader)(aux) | ||
| // Create a map to hold all JSON fields | ||
| rawFields := make(map[string]json.RawMessage) | ||
| if err := json.Unmarshal(data, &rawFields); err != nil { | ||
| return c, err | ||
| } | ||
|
|
||
| // Separate known and unknown fields | ||
| // Populate known header fields from raw values (one small unmarshal per field). | ||
| if raw, ok := rawFields["id"]; ok && len(raw) > 0 { | ||
| if err := json.Unmarshal(raw, &c.ID); err != nil { | ||
| return c, err | ||
| } | ||
| } | ||
| if raw, ok := rawFields["source"]; ok && len(raw) > 0 { | ||
| if err := json.Unmarshal(raw, &c.Source); err != nil { | ||
| return c, err | ||
| } | ||
| } | ||
| if raw, ok := rawFields["producer"]; ok && len(raw) > 0 { | ||
| if err := json.Unmarshal(raw, &c.Producer); err != nil { | ||
| return c, err | ||
| } | ||
| } | ||
| c.SpecVersion = SpecVersion | ||
| if raw, ok := rawFields["subject"]; ok && len(raw) > 0 { | ||
| if err := json.Unmarshal(raw, &c.Subject); err != nil { | ||
| return c, err | ||
| } | ||
| } | ||
| if raw, ok := rawFields["time"]; ok && len(raw) > 0 { | ||
| if err := json.Unmarshal(raw, &c.Time); err != nil { | ||
| return c, err | ||
| } | ||
| } | ||
| if raw, ok := rawFields["type"]; ok && len(raw) > 0 { | ||
| if err := json.Unmarshal(raw, &c.Type); err != nil { | ||
| return c, err | ||
| } | ||
| } | ||
| if raw, ok := rawFields["datacontenttype"]; ok && len(raw) > 0 { | ||
| if err := json.Unmarshal(raw, &c.DataContentType); err != nil { | ||
| return c, err | ||
| } | ||
| } | ||
| if raw, ok := rawFields["dataschema"]; ok && len(raw) > 0 { | ||
| if err := json.Unmarshal(raw, &c.DataSchema); err != nil { | ||
| return c, err | ||
| } | ||
| } | ||
| if raw, ok := rawFields["dataversion"]; ok && len(raw) > 0 { | ||
| if err := json.Unmarshal(raw, &c.DataVersion); err != nil { | ||
| return c, err | ||
| } | ||
| } | ||
| if raw, ok := rawFields["signature"]; ok && len(raw) > 0 { | ||
| if err := json.Unmarshal(raw, &c.Signature); err != nil { | ||
| return c, err | ||
| } | ||
| } | ||
| if raw, ok := rawFields["tags"]; ok && len(raw) > 0 { | ||
| if err := json.Unmarshal(raw, &c.Tags); err != nil { | ||
| return c, err | ||
| } | ||
| } | ||
|
|
||
| var dataRaw json.RawMessage | ||
| var dataBase64 string | ||
| if raw, ok := rawFields["data_base64"]; ok && len(raw) > 0 { | ||
| if err := json.Unmarshal(raw, &dataBase64); err != nil { | ||
| return c, err | ||
| } | ||
| } | ||
| if raw, ok := rawFields["data"]; ok { | ||
| dataRaw = raw | ||
| } | ||
| if dataRaw != nil || dataBase64 != "" { | ||
| if err := payloadFunc(dataRaw, dataBase64); err != nil { | ||
| return c, err | ||
| } | ||
| } | ||
|
Comment on lines
+162
to
+176
|
||
|
|
||
| for key, rawValue := range rawFields { | ||
| if _, ok := definedCloudeEventHdrFields[key]; ok { | ||
| // Skip defined fields | ||
| continue | ||
| } | ||
| if key == "data" { | ||
| if err := dataFunc(rawValue); err != nil { | ||
| return c, err | ||
| } | ||
| if key == "data" || key == "data_base64" { | ||
| continue | ||
| } | ||
| if c.Extras == nil { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -345,3 +345,38 @@ func TestCloudEventHeader_UnmarshalJSON(t *testing.T) { | |
| }) | ||
| } | ||
| } | ||
|
|
||
| func TestRawEvent_UnmarshalJSON_InvalidBase64(t *testing.T) { | ||
| t.Parallel() | ||
| jsonStr := `{"id":"1","source":"s","type":"t","data_base64":"$$not-base64$$"}` | ||
| var ev cloudevent.RawEvent | ||
| err := json.Unmarshal([]byte(jsonStr), &ev) | ||
| require.Error(t, err, "expected error for invalid base64 in data_base64") | ||
| } | ||
|
|
||
| func TestRawEvent_UnmarshalJSON_BothDataAndDataBase64(t *testing.T) { | ||
| t.Parallel() | ||
| jsonStr := `{"id":"1","source":"s","type":"t","data":{"x":1},"data_base64":"Zm9v"}` | ||
| var ev cloudevent.RawEvent | ||
| err := json.Unmarshal([]byte(jsonStr), &ev) | ||
| require.Error(t, err, "expected error when both data and data_base64 are present") | ||
| assert.Contains(t, err.Error(), "both") | ||
| } | ||
|
Comment on lines
+349
to
+364
|
||
|
|
||
| func TestCloudEvent_UnmarshalJSON_InvalidTime(t *testing.T) { | ||
| t.Parallel() | ||
| jsonStr := `{"id":"1","source":"s","type":"t","time":12345,"data":{"message":"hi","count":1}}` | ||
| var ev cloudevent.CloudEvent[TestData] | ||
| err := json.Unmarshal([]byte(jsonStr), &ev) | ||
| require.Error(t, err, "expected error for invalid time field type") | ||
| } | ||
|
|
||
| func TestCloudEvent_UnmarshalJSON_NoDataField(t *testing.T) { | ||
| t.Parallel() | ||
| jsonStr := `{"id":"1","source":"s","type":"t","subject":"sub","time":"2025-01-01T00:00:00Z"}` | ||
| var ev cloudevent.CloudEvent[TestData] | ||
| err := json.Unmarshal([]byte(jsonStr), &ev) | ||
| require.NoError(t, err, "CloudEvent without data field should succeed") | ||
| assert.Equal(t, "1", ev.ID) | ||
| assert.Equal(t, TestData{}, ev.Data) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RawEvent.UnmarshalJSONusesdataBase64 != ""to decide whetherdata_base64was provided. This loses information (and skips base64 decoding) when the input contains an explicit empty base64 payload ("data_base64":""), and also won’t error if bothdataand an emptydata_base64are present. Consider propagating a separate “field present” boolean fromunmarshalCloudEventWithPayloadand basing the branch + mutual-exclusion check on presence rather than non-empty string.