Properly handle event payload members of blob and string type #1064

Open
sugmanue wants to merge 1 commit into smithy-lang:main from sugmanue:sugmanue/fix-event-stream-encoding

sugmanue commented Mar 12, 2026

Issue #, if available:

Description of changes:

Properly handle event payload members of blob and string type. These members are serialized directly into the event payload, and the event's :content-type header is set according to the member type.

Serializes a member with the trait @eventPayload directly into the payload of the event. The following member types have special handling:

  • blob: For blob members, the raw (non-encoded) bytes are the payload of the event, and the event's :content-type header is set to "application/octet-stream".
  • string: For string members, the UTF-8 encoded bytes are the payload of the event, and the event's :content-type header is set to "text/plain".
  • If the payload is empty, no :content-type header is added to the message.
  • Any other is encoded with the underlying protocol and uses the content type defined for it, unless
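
To make the rules above concrete, here is a minimal sketch of the header selection. It is not the code from this PR: `EventPayloadSketch`, `EncodedEvent`, and the plain string header map are hypothetical stand-ins for the real `Message` and `HeaderValue` types, and only the blob, string, and empty-payload cases are modeled.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: picks the :content-type header for an @eventPayload member.
final class EventPayloadSketch {

    // Simplified stand-in for an event message (headers plus raw payload bytes).
    record EncodedEvent(Map<String, String> headers, byte[] payload) {}

    // Blob members: the raw bytes become the payload.
    static EncodedEvent encodeBlob(String eventType, byte[] blob) {
        return build(eventType, "application/octet-stream", blob);
    }

    // String members: the UTF-8 bytes become the payload.
    static EncodedEvent encodeString(String eventType, String value) {
        return build(eventType, "text/plain", value.getBytes(StandardCharsets.UTF_8));
    }

    private static EncodedEvent build(String eventType, String contentType, byte[] payload) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put(":message-type", "event");
        headers.put(":event-type", eventType);
        // An empty payload carries no :content-type header.
        if (payload.length > 0) {
            headers.put(":content-type", contentType);
        }
        return new EncodedEvent(headers, payload);
    }
}
```

Members of any other type would go through the protocol codec and keep the protocol's own media type, which this sketch leaves out.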

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

sugmanue enabled auto-merge (squash) March 12, 2026 01:09
Comment on lines 64 to +65
var typeHolder = new AtomicReference<String>();
var headers = new HashMap<String, HeaderValue>();
var payload = encodeInput(item, typeHolder, headers);
headers.put(":message-type", HeaderValue.fromString("event"));
headers.put(":event-type", HeaderValue.fromString(typeHolder.get()));
headers.put(":content-type", HeaderValue.fromString(payloadMediaType));
return new AwsEventFrame(new Message(headers, payload));
var contentTypeHolder = new AtomicReference<>(payloadMediaType);

why are we using AtomicReference here? this should be synchronous – if this is to emulate out parameters, use a one-element array or something similar to give us a reference we can mutate from within the encodeInput routine. alternatively, have that method return a record class that contains the payload, type, and content-type.
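
A rough sketch of the record-based alternative, to show the shape I have in mind. The names `EncodeResultSketch`, `EncodedInput`, and `toHeaders` are made up for illustration, the `encodeInput` signature here is a simplified stand-in for the real one, and headers are plain strings rather than `HeaderValue`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: return everything encodeInput computes as one value
// instead of writing into AtomicReference "out parameters".
final class EncodeResultSketch {

    // Carries the payload together with the event type and content type.
    record EncodedInput(byte[] payload, String eventType, String contentType) {}

    // Stand-in for the real encodeInput; the actual serialization is omitted.
    static EncodedInput encodeInput(String eventType, byte[] payload, String contentType) {
        return new EncodedInput(payload, eventType, contentType);
    }

    // The caller reads the fields it needs; no shared mutable holder is involved.
    static Map<String, String> toHeaders(EncodedInput encoded) {
        Map<String, String> headers = new HashMap<>();
        headers.put(":message-type", "event");
        headers.put(":event-type", encoded.eventType());
        headers.put(":content-type", encoded.contentType());
        return headers;
    }
}
```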

public void writeStruct(Schema schema, SerializableStruct struct) {
var memberName = schema.memberName();
if (possibleTypes.contains(memberName) &&
typeHolder.compareAndSet(null, memberName)) {

is there anything we can do to stop iterating over the struct fields once we set typeHolder? maybe throw an exception to short-circuit the serializer?
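
For illustration, one way the short-circuit could look, assuming the member walk is driven by a callback we cannot otherwise interrupt. Everything here (`ShortCircuitSketch`, `StopVisiting`, `forEachMember`, `findEventType`) is hypothetical and works on plain member names instead of schemas.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch: abort a callback-driven member walk once a match is found.
final class ShortCircuitSketch {

    // Preallocated control-flow exception: no stack trace, no suppression.
    static final class StopVisiting extends RuntimeException {
        static final StopVisiting INSTANCE = new StopVisiting();

        private StopVisiting() {
            super(null, null, false, false);
        }
    }

    record Result(String eventType, int visited) {}

    // Stand-in for a struct serializing its members through a visitor.
    static void forEachMember(List<String> members, Consumer<String> visitor) {
        for (String member : members) {
            visitor.accept(member);
        }
    }

    static Result findEventType(List<String> members, Set<String> possibleTypes) {
        String[] found = {null};
        int[] visited = {0};
        try {
            forEachMember(members, name -> {
                visited[0]++;
                if (possibleTypes.contains(name)) {
                    found[0] = name;
                    throw StopVisiting.INSTANCE; // skip the remaining members
                }
            });
        } catch (StopVisiting expected) {
            // Reached only when a match ended the walk early.
        }
        return new Result(found[0], visited[0]);
    }
}
```

Whether this is worth it depends on how many members a typical event struct has; for small structs the extra control flow may cost more than it saves.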

for (var memberSchema : eventSchema.members()) {
if (memberSchema.hasTrait(TraitKey.ERROR_TRAIT)) {
if (result.put(memberSchema.memberTarget().id(), memberSchema) != null) {
throw new IllegalStateException("Duplicate key");

can you include the duplicate in the exception message?
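
Something along these lines would do. This is a sketch with hypothetical names (`DuplicateKeySketch`, `Member`, `indexByTarget`); the real code works on `Schema` members and shape IDs.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: report which key collided and which members share it.
final class DuplicateKeySketch {

    // Simplified stand-in for a member schema and the ID of its target shape.
    record Member(String targetId, String memberName) {}

    static Map<String, Member> indexByTarget(List<Member> members) {
        Map<String, Member> result = new HashMap<>();
        for (Member member : members) {
            Member previous = result.put(member.targetId(), member);
            if (previous != null) {
                throw new IllegalStateException("Duplicate key " + member.targetId()
                        + " for members " + previous.memberName()
                        + " and " + member.memberName());
            }
        }
        return result;
    }
}
```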

* Returns true if the schema has any members that ought to be serialized in
* the event payload.
*/
static boolean hasPayloadMembers(Schema struct) {

this isn't something to fix in this PR, but this feels like the kind of thing we'd want to attach as a trait on the struct

private final Codec codec;
private final EventHeaderSerializer headerSerializer;
private final ShapeSerializer baseSerializer;
private final AtomicReference<String> contentTypeHolder;

like above, AtomicReference is the wrong thing to use if we don't need atomicity. it's not appropriate to use as a generic mutable reference, unless you commit to only using getPlain and setPlain to interact with it (but even that would be an abuse of its APIs)

@@ -0,0 +1,18 @@
## Example: Transcribe Streaming Client

nice

static class EventPayloadSerializer extends InterceptingSerializer {
private final OutputStream out;
private final Codec codec;
private final AtomicReference<String> contentEncodingHolder;

this should be contentTypeHolder, not contentEncodingHolder

public void writeBlob(Schema schema, ByteBuffer value) {
contentEncodingHolder.set("application/octet-stream");
try {
out.write(ByteBufferUtils.getBytes(value));

ByteBufferUtils.getBytes will trigger a copy if the payload does not exactly fill the underlying array, but we can avoid that with out.write(value.array(), value.arrayOffset() + value.position(), value.remaining())
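
One caveat: `value.array()` throws for direct and read-only buffers, so the direct write probably needs a `hasArray()` guard. A minimal sketch, with `BlobWriteSketch` as a hypothetical name and the fallback copy written inline rather than through `ByteBufferUtils`:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical sketch: write a ByteBuffer to a stream, copying only when needed.
final class BlobWriteSketch {

    static void writeBlob(OutputStream out, ByteBuffer value) {
        try {
            if (value.hasArray()) {
                // Heap buffer: write the readable window of the backing array directly.
                out.write(value.array(), value.arrayOffset() + value.position(), value.remaining());
            } else {
                // Direct or read-only buffer: copy through a duplicate so the
                // caller's position and limit stay untouched.
                ByteBuffer view = value.duplicate();
                byte[] copy = new byte[view.remaining()];
                view.get(copy);
                out.write(copy);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Neither branch moves the position of the caller's buffer.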

2 participants