Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 87 additions & 1 deletion cloudevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@
package cloudevent

import (
"encoding/base64"
"encoding/json"
"fmt"
"mime"
"strings"
"time"

"github.com/tidwall/sjson"
)

const (
Expand Down Expand Up @@ -95,10 +101,90 @@ type CloudEvent[A any] struct {
CloudEventHeader
// Data contains domain-specific information about the event.
Data A `json:"data"`

DataBase64 string `json:"data_base64,omitempty"`
}

// RawEvent is a cloudevent with a json.RawMessage data field.
type RawEvent = CloudEvent[json.RawMessage]
// It supports both "data" and "data_base64" (CloudEvents JSON spec).
type RawEvent struct {
CloudEventHeader
Data json.RawMessage `json:"data,omitempty"`

// DataBase64 is the raw "data_base64" string when the event was received with
// data_base64 (CloudEvents spec). When set, MarshalJSON emits data_base64 for
// round-trip; otherwise wire form is chosen from DataContentType and Data.
DataBase64 string `json:"data_base64,omitempty"`
}

// BytesForSignature returns the bytes that were signed (wire form of data or data_base64).
// Use for signature verification; not the same as Data when the CE used data_base64.
func (r RawEvent) BytesForSignature() []byte {
if r.DataBase64 != "" {
return []byte(r.DataBase64)
}
return r.Data
}

// UnmarshalJSON implements json.Unmarshaler so that both "data" and "data_base64"
// are supported; Data is always set to the resolved payload bytes.
func (r *RawEvent) UnmarshalJSON(data []byte) error {
var dataRaw json.RawMessage
var dataBase64 string
header, err := unmarshalCloudEventWithPayload(data, func(d json.RawMessage, b64 string) error {
dataRaw = d
dataBase64 = b64
return nil
})
if err != nil {
return err
}
r.CloudEventHeader = header
if dataRaw != nil && dataBase64 != "" {
return fmt.Errorf("cloudevent: both \"data\" and \"data_base64\" present; only one allowed")
}
if dataBase64 != "" {
decoded, err := base64.StdEncoding.DecodeString(dataBase64)
if err != nil {
return err
}
r.Data = decoded
r.DataBase64 = dataBase64
} else {
r.Data = dataRaw
r.DataBase64 = ""
}
return nil
}

// IsJSONDataContentType returns true if the MIME type indicates a JSON payload.
// Matches "application/json" and any "+json" suffix type (e.g. "application/cloudevents+json").
func IsJSONDataContentType(ct string) bool {
parsed, _, err := mime.ParseMediaType(strings.TrimSpace(ct))
return err == nil && (parsed == "application/json" || strings.HasSuffix(parsed, "+json"))
}

// MarshalJSON implements json.Marshaler. Uses DataContentType to choose wire form:
// application/json -> "data"; otherwise -> "data_base64" (CloudEvents spec).
func (r RawEvent) MarshalJSON() ([]byte, error) {
data, err := json.Marshal(r.CloudEventHeader)
if err != nil {
return nil, err
}
if len(r.Data) > 0 || r.DataBase64 != "" {
if r.DataBase64 != "" {
data, err = sjson.SetBytes(data, "data_base64", r.DataBase64)
} else if IsJSONDataContentType(r.DataContentType) || (r.DataContentType == "" && json.Valid(r.Data)) {
data, err = sjson.SetRawBytes(data, "data", r.Data)
} else {
data, err = sjson.SetBytes(data, "data_base64", base64.StdEncoding.EncodeToString(r.Data))
}
if err != nil {
return nil, err
}
}
return data, nil
}

// Equals returns true if the two CloudEventHeaders share the same IndexKey.
func (c *CloudEventHeader) Equals(other CloudEventHeader) bool {
Expand Down
92 changes: 63 additions & 29 deletions cloudevent_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,56 @@ package cloudevent

import (
"encoding/json"
"fmt"
"reflect"
"strings"

"github.com/tidwall/sjson"
)

var definedCloudeEventHdrFields = getJSONFieldNames(reflect.TypeOf(CloudEventHeader{}))
var definedCloudeEventHdrFields = getJSONFieldNames(reflect.TypeFor[CloudEventHeader]())

type cloudEventHeader CloudEventHeader

// UnmarshalJSON implements custom JSON unmarshaling for CloudEvent.
// It transparently handles both "data" and "data_base64" wire formats.
func (c *CloudEvent[A]) UnmarshalJSON(data []byte) error {
var err error
c.CloudEventHeader, err = unmarshalCloudEvent(data, c.setDataField)
return err
var dataRaw json.RawMessage
var dataBase64 string
header, err := unmarshalCloudEventWithPayload(data, func(d json.RawMessage, b64 string) error {
dataRaw = d
dataBase64 = b64
return nil
})
if err != nil {
return err
}
c.CloudEventHeader = header
if dataRaw != nil && dataBase64 != "" {
return fmt.Errorf("cloudevent: both \"data\" and \"data_base64\" present; only one allowed")
}
if dataBase64 != "" {
c.DataBase64 = dataBase64
} else if dataRaw != nil {
if err := json.Unmarshal(dataRaw, &c.Data); err != nil {
return err
}
}
return nil
}

// MarshalJSON implements custom JSON marshaling for CloudEventHeader.
// MarshalJSON implements custom JSON marshaling for CloudEvent[A].
// When DataBase64 is set, emits "data_base64"; otherwise emits "data".
func (c CloudEvent[A]) MarshalJSON() ([]byte, error) {
// Marshal the base struct
data, err := json.Marshal(c.CloudEventHeader)
if err != nil {
return nil, err
}
data, err = sjson.SetBytes(data, "data", c.Data)
if c.DataBase64 != "" {
data, err = sjson.SetBytes(data, "data_base64", c.DataBase64)
} else {
data, err = sjson.SetBytes(data, "data", c.Data)
}
if err != nil {
return nil, err
}
Expand All @@ -42,21 +67,18 @@ func (c *CloudEventHeader) UnmarshalJSON(data []byte) error {

// MarshalJSON implements custom JSON marshaling for CloudEventHeader.
func (c CloudEventHeader) MarshalJSON() ([]byte, error) {
// Marshal the base struct
aux := (cloudEventHeader)(c)
aux.SpecVersion = SpecVersion
data, err := json.Marshal(aux)
if err != nil {
return nil, err
}
// Add all extras using sjson]
for k, v := range c.Extras {
data, err = sjson.SetBytes(data, k, v)
if err != nil {
return nil, err
}
}

return data, nil
}

Expand Down Expand Up @@ -90,30 +112,48 @@ func getJSONFieldNames(t reflect.Type) map[string]struct{} {

// unmarshalCloudEvent unmarshals the CloudEventHeader and data field.
func unmarshalCloudEvent(data []byte, dataFunc func(json.RawMessage) error) (CloudEventHeader, error) {
c := CloudEventHeader{}
aux := cloudEventHeader{}
// Unmarshal known fields directly into the struct
if err := json.Unmarshal(data, &aux); err != nil {
return unmarshalCloudEventWithPayload(data, func(dataRaw json.RawMessage, _ string) error {
return dataFunc(dataRaw)
})
}

// unmarshalCloudEventWithPayload unmarshals the CloudEventHeader and returns both
// "data" and "data_base64" for RawEvent.
func unmarshalCloudEventWithPayload(data []byte, payloadFunc func(dataRaw json.RawMessage, dataBase64 string) error) (CloudEventHeader, error) {
// Unmarshal known header fields via the type alias (no custom UnmarshalJSON).
var c CloudEventHeader
if err := json.Unmarshal(data, (*cloudEventHeader)(&c)); err != nil {
return c, err
}
aux.SpecVersion = SpecVersion
c = (CloudEventHeader)(aux)
// Create a map to hold all JSON fields
c.SpecVersion = SpecVersion

// Second pass into raw map to extract data, data_base64, and extras.
rawFields := make(map[string]json.RawMessage)
if err := json.Unmarshal(data, &rawFields); err != nil {
return c, err
}

// Separate known and unknown fields
var dataRaw json.RawMessage
var dataBase64 string
if raw, ok := rawFields["data_base64"]; ok && len(raw) > 0 {
if err := json.Unmarshal(raw, &dataBase64); err != nil {
return c, err
}
}
if raw, ok := rawFields["data"]; ok {
dataRaw = raw
}
if dataRaw != nil || dataBase64 != "" {
if err := payloadFunc(dataRaw, dataBase64); err != nil {
return c, err
}
}

for key, rawValue := range rawFields {
if _, ok := definedCloudeEventHdrFields[key]; ok {
// Skip defined fields
continue
}
if key == "data" {
if err := dataFunc(rawValue); err != nil {
return c, err
}
if key == "data" || key == "data_base64" {
continue
}
if c.Extras == nil {
Expand All @@ -131,9 +171,3 @@ func unmarshalCloudEvent(data []byte, dataFunc func(json.RawMessage) error) (Clo
// ignoreDataField is a function that ignores the data field.
// It is used when unmarshalling the CloudEventHeader so that the data field is not added to the Extras map.
func ignoreDataField(json.RawMessage) error { return nil }

// setDataField is a function that sets the data field.
// It is used to unmarshal the data field into the CloudEvent[A].Data field.
func (c *CloudEvent[A]) setDataField(data json.RawMessage) error {
return json.Unmarshal(data, &c.Data)
}
Loading