Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import software.amazon.smithy.java.core.serde.SpecificShapeDeserializer;
import software.amazon.smithy.java.core.serde.event.EventDecoder;
import software.amazon.smithy.java.core.serde.event.EventStream;
import software.amazon.smithy.java.core.serde.event.EventStreamingException;
import software.amazon.smithy.java.core.serde.event.EventStreamingProtocolException;

/**
* A decoder for AWS events
Expand Down Expand Up @@ -58,11 +60,24 @@ public SerializableStruct decode(AwsEventFrame frame) {

private E decodeEvent(AwsEventFrame frame) {
var message = frame.unwrap();
// TODO Add support for :message-type other than "event".
var eventType = getEventType(message);
var messageType = getMessageType(message);
return switch (messageType) {
case "error" -> throw decodeError(message);
case "event" -> decodePayload(getEventType(message), message);
case "exception" -> decodeModeledException(message);
default -> throw new IllegalStateException("Unknown message type: " + messageType);
};
}

private E decodeModeledException(Message message) {
var exceptionType = expectHeader(message, ":exception-type").getString();
return decodePayload(exceptionType, message);
}

private E decodePayload(String eventType, Message message) {
var memberSchema = eventSchema.member(eventType);
if (memberSchema == null) {
throw new IllegalArgumentException("Unsupported event type: " + eventType);
throw new EventStreamingProtocolException("Unsupported event type: " + eventType);
}
var codecDeserializer = codec.createDeserializer(message.getPayload());
var headers = message.getHeaders();
Expand All @@ -75,6 +90,12 @@ private E decodeEvent(AwsEventFrame frame) {
return builder.build();
}

private EventStreamingProtocolException decodeError(Message message) {
var errorCode = expectHeader(message, ":error-code").getString();
var errorMessage = expectHeader(message, ":error-message").getString();
return new EventStreamingException(errorCode, errorMessage);
}

@Override
public IR decodeInitialEvent(AwsEventFrame frame, EventStream<?> eventStream) {
var message = frame.unwrap();
Expand All @@ -84,8 +105,8 @@ public IR decodeInitialEvent(AwsEventFrame frame, EventStream<?> eventStream) {
var responseDeserializer = new InitialResponseDeserializer(publisherMember, eventStream);
builder.deserialize(responseDeserializer);
// Deserialize the rest of the members if any
var headers = message.getHeaders();
var codecDeserializer = codec.createDeserializer(message.getPayload());
var headers = message.getHeaders();
var deserializer = new EventStreamDeserializer(codecDeserializer, new HeadersDeserializer(headers));
builder.deserialize(deserializer);
return builder.build();
Expand All @@ -101,11 +122,19 @@ private Schema getEventStreamMember(Schema schema) {
}

private String getMessageType(Message message) {
return message.getHeaders().get(":message-type").getString();
return expectHeader(message, ":message-type").getString();
}

private String getEventType(Message message) {
return message.getHeaders().get(":event-type").getString();
return expectHeader(message, ":event-type").getString();
}

private HeaderValue expectHeader(Message message, String headerName) {
var header = message.getHeaders().get(headerName);
if (header == null) {
throw new EventStreamingProtocolException("expected headers to have '" + headerName + "' header");
}
return header;
}

static class InitialResponseDeserializer extends SpecificShapeDeserializer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,26 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;
import org.junit.jupiter.api.Test;
import software.amazon.eventstream.Message;
import software.amazon.smithy.java.aws.events.model.BodyAndHeaderEvent;
import software.amazon.smithy.java.aws.events.model.EventStreamWithError;
import software.amazon.smithy.java.aws.events.model.HeadersOnlyEvent;
import software.amazon.smithy.java.aws.events.model.MyError;
import software.amazon.smithy.java.aws.events.model.StringEvent;
import software.amazon.smithy.java.aws.events.model.StructureEvent;
import software.amazon.smithy.java.aws.events.model.TestEventStream;
import software.amazon.smithy.java.aws.events.model.TestOperation;
import software.amazon.smithy.java.aws.events.model.TestOperationOutput;
import software.amazon.smithy.java.aws.events.model.TestOperationWithException;
import software.amazon.smithy.java.core.schema.ApiOperation;
import software.amazon.smithy.java.core.schema.SerializableStruct;
import software.amazon.smithy.java.core.serde.Codec;
import software.amazon.smithy.java.core.serde.event.EventStreamingException;
import software.amazon.smithy.java.json.JsonCodec;

class AwsEventShapeDecoderTest {
Expand All @@ -36,7 +43,7 @@ public void testDecodeInitialResponse() {
var frame = new AwsEventFrame(message);

// Act
var struct = createDecoder().decodeInitialEvent(frame, null);
var struct = createDecoder(TestOperation.instance()).decodeInitialEvent(frame, null);

// Assert
assertInstanceOf(TestOperationOutput.class, struct);
Expand All @@ -59,12 +66,12 @@ public void testDecodeHeadersOnlyMember() {
var frame = new AwsEventFrame(message);

// Act
var struct = createDecoder().decode(frame);
var struct = createDecoder(TestOperation.instance()).decode(frame);

// Assert
assertInstanceOf(TestEventStream.class, struct);
var actual = (TestEventStream) struct;
assertEquals(TestEventStream.Type.headersOnlyMember, actual.type());
assertInstanceOf(TestEventStream.HeadersOnlyMemberMember.class, actual);
var expected = TestEventStream.builder()
.headersOnlyMember(HeadersOnlyEvent.builder().sequenceNum(123).build())
.build();
Expand All @@ -82,12 +89,12 @@ public void testDecodeStructureMember() {
var frame = new AwsEventFrame(message);

// Act
var struct = createDecoder().decode(frame);
var struct = createDecoder(TestOperation.instance()).decode(frame);

// Assert
assertInstanceOf(TestEventStream.class, struct);
var actual = (TestEventStream) struct;
assertEquals(TestEventStream.Type.structureMember, actual.type());
assertInstanceOf(TestEventStream.StructureMemberMember.class, actual);
var expected = TestEventStream.builder()
.structureMember(StructureEvent.builder().foo("memberFooValue").build())
.build();
Expand All @@ -106,12 +113,12 @@ public void testDecodeBodyAndHeaderMember() {
var frame = new AwsEventFrame(message);

// Act
var struct = createDecoder().decode(frame);
var struct = createDecoder(TestOperation.instance()).decode(frame);

// Assert
assertInstanceOf(TestEventStream.class, struct);
var actual = (TestEventStream) struct;
assertEquals(TestEventStream.Type.bodyAndHeaderMember, actual.type());
assertInstanceOf(TestEventStream.BodyAndHeaderMemberMember.class, actual);
var expected = TestEventStream.builder()
.bodyAndHeaderMember(BodyAndHeaderEvent.builder()
.intMember(123)
Expand All @@ -127,29 +134,82 @@ public void testDecodeStringMember() {
var headers = new AwsEventShapeEncoderTest.HeadersBuilder()
.contentType("text/json")
.eventType("stringMember")
.build();;
.build();
var message = new Message(headers, "\"hello world!\"".getBytes(StandardCharsets.UTF_8));
var frame = new AwsEventFrame(message);

// Act
var struct = createDecoder().decode(frame);
var struct = createDecoder(TestOperation.instance()).decode(frame);

// Assert
assertInstanceOf(TestEventStream.class, struct);
var actual = (TestEventStream) struct;
assertEquals(TestEventStream.Type.stringMember, actual.type());
assertInstanceOf(TestEventStream.StringMemberMember.class, actual);
var expected = TestEventStream.builder()
.stringMember(StringEvent.builder().payload("hello world!").build())
.build();
assertEquals(expected, actual);
}

@Test
public void testDecodeExceptionMember() {
// Arrange
var headers = new AwsEventShapeEncoderTest.HeadersBuilder()
.contentType("text/json")
.messageType("exception")
.exceptionType("modeledErrorMember")
.build();
var message = new Message(headers, "{\"message\":\"Client exception\"}".getBytes(StandardCharsets.UTF_8));
var frame = new AwsEventFrame(message);

// Act
var struct = createDecoder(TestOperationWithException.instance()).decode(frame);

// Assert
assertInstanceOf(EventStreamWithError.class, struct);
var actual = (EventStreamWithError) struct;
assertInstanceOf(EventStreamWithError.ModeledErrorMemberMember.class, actual);
var expected = EventStreamWithError.builder()
.modeledErrorMember(MyError.builder().message("Client exception").build())
.build();
assertEquals(((EventStreamWithError.ModeledErrorMemberMember) expected).getValue().getMessage(),
((EventStreamWithError.ModeledErrorMemberMember) actual).getValue().getMessage());
}

@Test
public void testDecodeError() {
// Arrange
var headers = new AwsEventShapeEncoderTest.HeadersBuilder()
.messageType("error")
.put(":error-code", "InternalFailure")
.put(":error-message", "An internal server error occurred")
.build();
var message = new Message(headers, new byte[0]);
var frame = new AwsEventFrame(message);

// Act
Exception except = null;
try {
createDecoder(TestOperationWithException.instance()).decode(frame);
} catch (Exception e) {
except = e;
}

// Assert
assertNotNull(except);
assertInstanceOf(EventStreamingException.class, except);
var eventStreamingException = (EventStreamingException) except;
assertEquals(eventStreamingException.getErrorCode(), "InternalFailure");
assertEquals(eventStreamingException.getMessage(), "An internal server error occurred");
}

@SuppressWarnings("unchecked")
static AwsEventShapeDecoder<?, ?> createDecoder() {
static <I extends SerializableStruct,
O extends SerializableStruct> AwsEventShapeDecoder<?, ?> createDecoder(ApiOperation<I, O> operation) {
return new AwsEventShapeDecoder<>(InitialEventType.INITIAL_RESPONSE,
() -> TestOperation.instance().outputBuilder(), // output builder
(Supplier) TestOperation.instance().outputEventBuilderSupplier(),
TestOperation.instance().outputStreamMember(),
() -> operation.outputBuilder(), // output builder
(Supplier) operation.outputEventBuilderSupplier(),
operation.outputStreamMember(),
createJsonCodec());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@
import software.amazon.eventstream.HeaderValue;
import software.amazon.smithy.java.aws.events.model.BodyAndHeaderEvent;
import software.amazon.smithy.java.aws.events.model.HeadersOnlyEvent;
import software.amazon.smithy.java.aws.events.model.MyError;
import software.amazon.smithy.java.aws.events.model.StringEvent;
import software.amazon.smithy.java.aws.events.model.StructureEvent;
import software.amazon.smithy.java.aws.events.model.TestEventStream;
import software.amazon.smithy.java.aws.events.model.TestOperation;
import software.amazon.smithy.java.aws.events.model.TestOperationInput;
import software.amazon.smithy.java.aws.events.model.TestOperationWithException;
import software.amazon.smithy.java.core.schema.ApiOperation;
import software.amazon.smithy.java.core.schema.SerializableStruct;
import software.amazon.smithy.java.core.serde.Codec;
import software.amazon.smithy.java.core.serde.event.EventStreamingException;
import software.amazon.smithy.java.json.JsonCodec;
Expand All @@ -27,7 +31,7 @@ class AwsEventShapeEncoderTest {
@Test
public void testEncodeInitialRequest() {
// Arrange
var encoder = createEncoder();
var encoder = createEncoder(TestOperation.instance());
var event = TestOperationInput.builder()
.headerString("headerValue")
.inputStringMember("inputStringValue")
Expand All @@ -48,7 +52,7 @@ public void testEncodeInitialRequest() {
@Test
public void testEncodeHeadersOnlyMember() {
// Arrange
var encoder = createEncoder();
var encoder = createEncoder(TestOperation.instance());
var event = TestEventStream.builder()
.headersOnlyMember(HeadersOnlyEvent.builder().sequenceNum(123).build())
.build();
Expand All @@ -69,7 +73,7 @@ public void testEncodeHeadersOnlyMember() {
@Test
public void testEncodeStructureMember() {
// Arrange
var encoder = createEncoder();
var encoder = createEncoder(TestOperation.instance());
var event = TestEventStream.builder()
.structureMember(StructureEvent.builder().foo("memberFooValue").build())
.build();
Expand All @@ -89,7 +93,7 @@ public void testEncodeStructureMember() {
@Test
public void testEncodeBodyAndHeaderMember() {
// Arrange
var encoder = createEncoder();
var encoder = createEncoder(TestOperation.instance());
var event = TestEventStream.builder()
.bodyAndHeaderMember(BodyAndHeaderEvent.builder()
.intMember(123)
Expand All @@ -113,7 +117,7 @@ public void testEncodeBodyAndHeaderMember() {
@Test
public void testEncodeStringMember() {
// Arrange
var encoder = createEncoder();
var encoder = createEncoder(TestOperation.instance());
var event = TestEventStream.builder()
.stringMember(StringEvent.builder().payload("hello world!").build())
.build();
Expand All @@ -130,9 +134,48 @@ public void testEncodeStringMember() {
assertEquals("\"hello world!\"", new String(result.unwrap().getPayload()));
}

static AwsEventShapeEncoder createEncoder() {
@Test
public void testEncodeException() {
// Arrange
var encoder = createEncoder(TestOperationWithException.instance());
var exception = MyError.builder().message("Event stream exception").build();

// Act
var result = encoder.encodeFailure(exception);

// Assert
var expectedHeaders = new HeadersBuilder()
.contentType("text/json")
.messageType("exception")
.exceptionType("modeledErrorMember")
.build();
assertEquals(expectedHeaders, result.unwrap().getHeaders());
assertEquals("{\"message\":\"Event stream exception\"}", new String(result.unwrap().getPayload()));
}

@Test
public void testEncodeError() {
// Arrange
var encoder = createEncoder(TestOperationWithException.instance());
var exception = new NullPointerException("something caused a null pointer exception");

// Act
var result = encoder.encodeFailure(exception);

// Assert
var expectedHeaders = new HeadersBuilder()
.messageType("error")
.put(":error-message", "Internal Server Error")
.put(":error-code", "InternalServerException")
.build();
assertEquals(expectedHeaders, result.unwrap().getHeaders());
assertEquals("", new String(result.unwrap().getPayload()));
}

static <I extends SerializableStruct,
O extends SerializableStruct> AwsEventShapeEncoder createEncoder(ApiOperation<I, O> operation) {
return new AwsEventShapeEncoder(InitialEventType.INITIAL_REQUEST,
TestOperation.instance().inputStreamMember(), // event schema
operation.outputStreamMember(), // event schema
createJsonCodec(), // codec
"text/json",
(e) -> new EventStreamingException("InternalServerException", "Internal Server Error"));
Expand Down Expand Up @@ -164,6 +207,11 @@ public HeadersBuilder eventType(String eventType) {
return this;
}

public HeadersBuilder exceptionType(String eventType) {
headers.put(":exception-type", HeaderValue.fromString(eventType));
return this;
}

public HeadersBuilder put(String name, String value) {
headers.put(name, HeaderValue.fromString(value));
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ private static final class $InnerDeserializer implements ShapeDeserializer.Struc
private static final $InnerDeserializer INSTANCE = new $InnerDeserializer();

@Override
@SuppressWarnings("unchecked")
public void accept(Builder builder, Schema member, ShapeDeserializer de) {
switch (member.memberIndex()) {
case 0 -> builder.payload(de.readBlob(member));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ private static final class $InnerDeserializer implements ShapeDeserializer.Struc
private static final $InnerDeserializer INSTANCE = new $InnerDeserializer();

@Override
@SuppressWarnings("unchecked")
public void accept(Builder builder, Schema member, ShapeDeserializer de) {
switch (member.memberIndex()) {
case 0 -> builder.intMember(de.readInteger(member));
Expand Down
Loading