From 37bd66a96e6e0ef890db69f304eaa2acd7c07bf5 Mon Sep 17 00:00:00 2001 From: Michael Dowling Date: Wed, 4 Mar 2026 13:49:16 -0600 Subject: [PATCH] Add support for streaming documents Removes (Input|Output)EventStreamingOperation and instead places its responsibility of returning event stream suppliers on the singular ApiOperation. This simplifies DynamicClient quite a bit, and is basically an even trade with code generated types/serialization. Now a document can carry a DataStream or EventStream, and properly read and write them when using compatible protocols. --- .../aws/events/AwsEventDecoderFactory.java | 13 +- .../aws/events/AwsEventEncoderFactory.java | 7 +- .../aws/events/AwsEventShapeDecoderTest.java | 4 +- .../java/aws/events/model/TestOperation.java | 11 +- .../restjson/RestJsonClientProtocol.java | 18 +- .../client/restxml/RestXmlClientProtocol.java | 14 +- .../binding/HttpBindingClientProtocol.java | 14 +- .../java/client/rpcv2/RpcV2CborProtocol.java | 23 +- .../java/dynamicclient/DynamicOperation.java | 51 +++-- .../dynamicclient/DynamicOperationTest.java | 199 ++++++++++++++++-- .../generators/OperationGenerator.java | 34 +-- .../smithy/java/core/schema/ApiOperation.java | 19 ++ .../InputEventStreamingApiOperation.java | 26 --- .../OutputEventStreamingApiOperation.java | 26 --- .../serde/document/DataStreamDocument.java | 42 ++++ .../java/core/serde/document/Document.java | 49 +++++ .../serde/document/EventStreamDocument.java | 43 ++++ .../core/serde/document/DocumentTest.java | 43 ++++ .../SchemaGuidedDocumentBuilder.java | 44 ++-- .../java/dynamicschemas/StructDocument.java | 32 ++- .../SchemaGuidedDocumentBuilderTest.java | 103 +++++++++ .../dynamicschemas/StructDocumentTest.java | 97 +++++++++ .../java/http/binding/RequestSerializer.java | 3 +- .../java/http/binding/ResponseSerializer.java | 3 +- 24 files changed, 714 insertions(+), 204 deletions(-) delete mode 100644 core/src/main/java/software/amazon/smithy/java/core/schema/InputEventStreamingApiOperation.java delete mode 100644 core/src/main/java/software/amazon/smithy/java/core/schema/OutputEventStreamingApiOperation.java create mode 100644 core/src/main/java/software/amazon/smithy/java/core/serde/document/DataStreamDocument.java create mode 100644 core/src/main/java/software/amazon/smithy/java/core/serde/document/EventStreamDocument.java diff --git a/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventDecoderFactory.java b/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventDecoderFactory.java index 1a491b9c0..df4726b32 100644 --- a/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventDecoderFactory.java +++ b/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventDecoderFactory.java @@ -7,8 +7,7 @@ import java.util.Objects; import java.util.function.Supplier; -import software.amazon.smithy.java.core.schema.InputEventStreamingApiOperation; -import software.amazon.smithy.java.core.schema.OutputEventStreamingApiOperation; +import software.amazon.smithy.java.core.schema.ApiOperation; import software.amazon.smithy.java.core.schema.Schema; import software.amazon.smithy.java.core.schema.SerializableStruct; import software.amazon.smithy.java.core.schema.ShapeBuilder; @@ -60,8 +59,9 @@ private AwsEventDecoderFactory( * @param The output event type * @return A new event decoder factory */ + @SuppressWarnings("unchecked") public static AwsEventDecoderFactory forInputStream( - InputEventStreamingApiOperation operation, + ApiOperation operation, Codec codec, FrameTransformer transformer ) { @@ -70,7 +70,7 @@ private AwsEventDecoderFactory( operation::inputBuilder, operation.inputStreamMember(), codec, - operation.inputEventBuilderSupplier(), + (Supplier>) (Supplier) operation.inputEventBuilderSupplier(), transformer); } @@ -83,8 +83,9 @@ private AwsEventDecoderFactory( * @param The output event type * @return A new event decoder factory */ + @SuppressWarnings("unchecked") public static AwsEventDecoderFactory forOutputStream( - OutputEventStreamingApiOperation operation, + ApiOperation operation, Codec codec, FrameTransformer transformer ) { @@ -93,7 +94,7 @@ private AwsEventDecoderFactory( operation::outputBuilder, operation.outputStreamMember(), codec, - operation.outputEventBuilderSupplier(), + (Supplier>) (Supplier) operation.outputEventBuilderSupplier(), transformer); } diff --git a/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventEncoderFactory.java b/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventEncoderFactory.java index de9ee4393..72b8d456b 100644 --- a/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventEncoderFactory.java +++ b/aws/aws-event-streams/src/main/java/software/amazon/smithy/java/aws/events/AwsEventEncoderFactory.java @@ -7,8 +7,7 @@ import java.util.Objects; import java.util.function.Function; -import software.amazon.smithy.java.core.schema.InputEventStreamingApiOperation; -import software.amazon.smithy.java.core.schema.OutputEventStreamingApiOperation; +import software.amazon.smithy.java.core.schema.ApiOperation; import software.amazon.smithy.java.core.schema.Schema; import software.amazon.smithy.java.core.serde.Codec; import software.amazon.smithy.java.core.serde.event.EventEncoder; @@ -54,7 +53,7 @@ private AwsEventEncoderFactory( * @return A new event encoder factory */ public static AwsEventEncoderFactory forInputStream( - InputEventStreamingApiOperation operation, + ApiOperation operation, Codec codec, String payloadMediaType, FrameTransformer transformer, @@ -78,7 +77,7 @@ public static AwsEventEncoderFactory forInputStream( * @return A new event encoder factory */ public static AwsEventEncoderFactory forOutputStream( - OutputEventStreamingApiOperation operation, + ApiOperation operation, Codec codec, String payloadMediaType, FrameTransformer transformer, diff --git a/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/AwsEventShapeDecoderTest.java b/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/AwsEventShapeDecoderTest.java index 6a47fff86..66ce7d80c 100644 --- a/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/AwsEventShapeDecoderTest.java +++ b/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/AwsEventShapeDecoderTest.java @@ -9,6 +9,7 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import java.nio.charset.StandardCharsets; +import java.util.function.Supplier; import org.junit.jupiter.api.Test; import software.amazon.eventstream.Message; import software.amazon.smithy.java.aws.events.model.BodyAndHeaderEvent; @@ -143,10 +144,11 @@ public void testDecodeStringMember() { assertEquals(expected, actual); } + @SuppressWarnings("unchecked") static AwsEventShapeDecoder createDecoder() { return new AwsEventShapeDecoder<>(InitialEventType.INITIAL_RESPONSE, () -> TestOperation.instance().outputBuilder(), // output builder - TestOperation.instance().outputEventBuilderSupplier(), + (Supplier) TestOperation.instance().outputEventBuilderSupplier(), TestOperation.instance().outputStreamMember(), createJsonCodec()); } diff --git a/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/model/TestOperation.java b/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/model/TestOperation.java index c9c2a8fdd..e128f3791 100644 --- a/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/model/TestOperation.java +++ b/aws/aws-event-streams/src/test/java/software/amazon/smithy/java/aws/events/model/TestOperation.java @@ -7,10 +7,10 @@ import java.util.List; import java.util.function.Supplier; +import software.amazon.smithy.java.core.schema.ApiOperation; import software.amazon.smithy.java.core.schema.ApiService; -import software.amazon.smithy.java.core.schema.InputEventStreamingApiOperation; -import software.amazon.smithy.java.core.schema.OutputEventStreamingApiOperation; import software.amazon.smithy.java.core.schema.Schema; +import software.amazon.smithy.java.core.schema.SerializableStruct; import software.amazon.smithy.java.core.schema.ShapeBuilder; import software.amazon.smithy.java.core.serde.TypeRegistry; import software.amazon.smithy.model.shapes.ShapeId; @@ -18,8 +18,7 @@ @SmithyGenerated public final class TestOperation - implements InputEventStreamingApiOperation, - OutputEventStreamingApiOperation { + implements ApiOperation { private static final TestOperation $INSTANCE = new TestOperation(); @@ -51,7 +50,7 @@ public ShapeBuilder inputBuilder() { } @Override - public Supplier> inputEventBuilderSupplier() { + public Supplier> inputEventBuilderSupplier() { return () -> TestEventStream.builder(); } @@ -61,7 +60,7 @@ public ShapeBuilder outputBuilder() { } @Override - public Supplier> outputEventBuilderSupplier() { + public Supplier> outputEventBuilderSupplier() { return () -> TestEventStream.builder(); } diff --git a/aws/client/aws-client-restjson/src/main/java/software/amazon/smithy/java/aws/client/restjson/RestJsonClientProtocol.java b/aws/client/aws-client-restjson/src/main/java/software/amazon/smithy/java/aws/client/restjson/RestJsonClientProtocol.java index 18484625d..f5bce1285 100644 --- a/aws/client/aws-client-restjson/src/main/java/software/amazon/smithy/java/aws/client/restjson/RestJsonClientProtocol.java +++ b/aws/client/aws-client-restjson/src/main/java/software/amazon/smithy/java/aws/client/restjson/RestJsonClientProtocol.java @@ -19,8 +19,6 @@ import software.amazon.smithy.java.client.http.binding.HttpBindingErrorFactory; import software.amazon.smithy.java.context.Context; import software.amazon.smithy.java.core.schema.ApiOperation; -import software.amazon.smithy.java.core.schema.InputEventStreamingApiOperation; -import software.amazon.smithy.java.core.schema.OutputEventStreamingApiOperation; import software.amazon.smithy.java.core.schema.SerializableStruct; import software.amazon.smithy.java.core.schema.TraitKey; import software.amazon.smithy.java.core.serde.Codec; @@ -79,8 +77,8 @@ public HttpRequest .omitEmptyPayload(omitEmptyPayload()) .allowEmptyStructPayload(hasStructPayload(input)); - if (operation instanceof InputEventStreamingApiOperation i) { - serializer.eventEncoderFactory(getEventEncoderFactory(i)); + if (operation.inputEventBuilderSupplier() != null) { + serializer.eventEncoderFactory(getEventEncoderFactory(operation)); } return serializer.serializeRequest(); @@ -107,12 +105,10 @@ protected boolean omitEmptyPayload() { } @Override - protected EventEncoderFactory getEventEncoderFactory( - InputEventStreamingApiOperation inputOperation - ) { + protected EventEncoderFactory getEventEncoderFactory(ApiOperation operation) { // TODO: this is where you'd plumb through Sigv4 support, another frame transformer? return AwsEventEncoderFactory.forInputStream( - inputOperation, + operation, payloadCodec(), payloadMediaType(), FrameTransformer.identity(), @@ -120,10 +116,8 @@ protected EventEncoderFactory getEventEncoderFactory( } @Override - protected EventDecoderFactory getEventDecoderFactory( - OutputEventStreamingApiOperation outputOperation - ) { - return AwsEventDecoderFactory.forOutputStream(outputOperation, payloadCodec(), f -> f); + protected EventDecoderFactory getEventDecoderFactory(ApiOperation operation) { + return AwsEventDecoderFactory.forOutputStream(operation, payloadCodec(), f -> f); } private boolean hasStructPayload(I input) { diff --git a/aws/client/aws-client-restxml/src/main/java/software/amazon/smithy/java/aws/client/restxml/RestXmlClientProtocol.java b/aws/client/aws-client-restxml/src/main/java/software/amazon/smithy/java/aws/client/restxml/RestXmlClientProtocol.java index f38e4d38e..4d0b74b74 100644 --- a/aws/client/aws-client-restxml/src/main/java/software/amazon/smithy/java/aws/client/restxml/RestXmlClientProtocol.java +++ b/aws/client/aws-client-restxml/src/main/java/software/amazon/smithy/java/aws/client/restxml/RestXmlClientProtocol.java @@ -21,8 +21,6 @@ import software.amazon.smithy.java.core.error.CallException; import software.amazon.smithy.java.core.error.ModeledException; import software.amazon.smithy.java.core.schema.ApiOperation; -import software.amazon.smithy.java.core.schema.InputEventStreamingApiOperation; -import software.amazon.smithy.java.core.schema.OutputEventStreamingApiOperation; import software.amazon.smithy.java.core.serde.Codec; import software.amazon.smithy.java.core.serde.TypeRegistry; import software.amazon.smithy.java.core.serde.document.Document; @@ -81,12 +79,10 @@ protected HttpErrorDeserializer getErrorDeserializer(Context context) { } @Override - protected EventEncoderFactory getEventEncoderFactory( - InputEventStreamingApiOperation inputOperation - ) { + protected EventEncoderFactory getEventEncoderFactory(ApiOperation operation) { // TODO: this is where you'd plumb through Sigv4 support, another frame transformer? return AwsEventEncoderFactory.forInputStream( - inputOperation, + operation, payloadCodec(), payloadMediaType(), FrameTransformer.identity(), @@ -94,10 +90,8 @@ protected EventEncoderFactory getEventEncoderFactory( } @Override - protected EventDecoderFactory getEventDecoderFactory( - OutputEventStreamingApiOperation outputOperation - ) { - return AwsEventDecoderFactory.forOutputStream(outputOperation, payloadCodec(), f -> f); + protected EventDecoderFactory getEventDecoderFactory(ApiOperation operation) { + return AwsEventDecoderFactory.forOutputStream(operation, payloadCodec(), f -> f); } private static final HttpErrorDeserializer.ErrorPayloadParser XML_ERROR_PAYLOAD_PARSER = diff --git a/client/client-http-binding/src/main/java/software/amazon/smithy/java/client/http/binding/HttpBindingClientProtocol.java b/client/client-http-binding/src/main/java/software/amazon/smithy/java/client/http/binding/HttpBindingClientProtocol.java index b8bec0b69..03c4cb569 100644 --- a/client/client-http-binding/src/main/java/software/amazon/smithy/java/client/http/binding/HttpBindingClientProtocol.java +++ b/client/client-http-binding/src/main/java/software/amazon/smithy/java/client/http/binding/HttpBindingClientProtocol.java @@ -11,8 +11,6 @@ import software.amazon.smithy.java.context.Context; import software.amazon.smithy.java.core.error.CallException; import software.amazon.smithy.java.core.schema.ApiOperation; -import software.amazon.smithy.java.core.schema.InputEventStreamingApiOperation; -import software.amazon.smithy.java.core.schema.OutputEventStreamingApiOperation; import software.amazon.smithy.java.core.schema.SerializableStruct; import software.amazon.smithy.java.core.serde.Codec; import software.amazon.smithy.java.core.serde.TypeRegistry; @@ -55,11 +53,11 @@ protected final HttpBinding httpBinding() { return httpBinding; } - protected EventEncoderFactory getEventEncoderFactory(InputEventStreamingApiOperation inputOperation) { + protected EventEncoderFactory getEventEncoderFactory(ApiOperation operation) { throw new UnsupportedOperationException("This protocol does not support event streaming"); } - protected EventDecoderFactory getEventDecoderFactory(OutputEventStreamingApiOperation outputOperation) { + protected EventDecoderFactory getEventDecoderFactory(ApiOperation operation) { throw new UnsupportedOperationException("This protocol does not support event streaming"); } @@ -78,8 +76,8 @@ public HttpRequest .endpoint(endpoint) .omitEmptyPayload(omitEmptyPayload()); - if (operation instanceof InputEventStreamingApiOperation i) { - serializer.eventEncoderFactory(getEventEncoderFactory(i)); + if (operation.inputEventBuilderSupplier() != null) { + serializer.eventEncoderFactory(getEventEncoderFactory(operation)); } return serializer.serializeRequest(); @@ -106,8 +104,8 @@ public O deserializ .outputShapeBuilder(outputBuilder) .response(response); - if (operation instanceof OutputEventStreamingApiOperation o) { - deser.eventDecoderFactory(getEventDecoderFactory(o)); + if (operation.outputEventBuilderSupplier() != null) { + deser.eventDecoderFactory(getEventDecoderFactory(operation)); } deser.deserialize(); diff --git a/client/client-rpcv2-cbor/src/main/java/software/amazon/smithy/java/client/rpcv2/RpcV2CborProtocol.java b/client/client-rpcv2-cbor/src/main/java/software/amazon/smithy/java/client/rpcv2/RpcV2CborProtocol.java index 5d19bfe21..d6c355396 100644 --- a/client/client-rpcv2-cbor/src/main/java/software/amazon/smithy/java/client/rpcv2/RpcV2CborProtocol.java +++ b/client/client-rpcv2-cbor/src/main/java/software/amazon/smithy/java/client/rpcv2/RpcV2CborProtocol.java @@ -22,8 +22,6 @@ import software.amazon.smithy.java.client.http.HttpErrorDeserializer; import software.amazon.smithy.java.context.Context; import software.amazon.smithy.java.core.schema.ApiOperation; -import software.amazon.smithy.java.core.schema.InputEventStreamingApiOperation; -import software.amazon.smithy.java.core.schema.OutputEventStreamingApiOperation; import software.amazon.smithy.java.core.schema.SerializableStruct; import software.amazon.smithy.java.core.schema.Unit; import software.amazon.smithy.java.core.serde.Codec; @@ -82,9 +80,9 @@ public HttpRequest // Top-level Unit types do not get serialized builder.headers(HttpHeaders.of(headersForEmptyBody())) .body(DataStream.ofEmpty()); - } else if (operation instanceof InputEventStreamingApiOperation i) { + } else if (operation.inputEventBuilderSupplier() != null) { // Event streaming - var encoderFactory = getEventEncoderFactory(i); + var encoderFactory = getEventEncoderFactory(operation); var body = RpcEventStreamsUtil.bodyForEventStreaming(encoderFactory, input); builder.headers(HttpHeaders.of(headersForEventStreaming())) .body(body); @@ -108,8 +106,8 @@ public O deserializ throw errorDeserializer.createError(context, operation, typeRegistry, response); } - if (operation instanceof OutputEventStreamingApiOperation o) { - var eventDecoderFactory = getEventDecoderFactory(o); + if (operation.outputEventBuilderSupplier() != null) { + var eventDecoderFactory = getEventDecoderFactory(operation); return RpcEventStreamsUtil.deserializeResponse(eventDecoderFactory, bodyDataStream(response)); } @@ -154,22 +152,17 @@ private Map> headersForEventStreaming() { CONTENT_TYPE); } - private EventEncoderFactory getEventEncoderFactory( - InputEventStreamingApiOperation inputOperation - ) { - + private EventEncoderFactory getEventEncoderFactory(ApiOperation operation) { // TODO: this is where you'd plumb through Sigv4 support, another frame transformer? - return AwsEventEncoderFactory.forInputStream(inputOperation, + return AwsEventEncoderFactory.forInputStream(operation, payloadCodec(), PAYLOAD_MEDIA_TYPE, FrameTransformer.identity(), (e) -> new EventStreamingException("InternalServerException", "Internal Server Error")); } - private EventDecoderFactory getEventDecoderFactory( - OutputEventStreamingApiOperation outputOperation - ) { - return AwsEventDecoderFactory.forOutputStream(outputOperation, payloadCodec(), f -> f); + private EventDecoderFactory getEventDecoderFactory(ApiOperation operation) { + return AwsEventDecoderFactory.forOutputStream(operation, payloadCodec(), f -> f); } private static ShapeId extractErrorType(Document document, String namespace) { diff --git a/client/dynamic-client/src/main/java/software/amazon/smithy/java/dynamicclient/DynamicOperation.java b/client/dynamic-client/src/main/java/software/amazon/smithy/java/dynamicclient/DynamicOperation.java index 3bd57d068..ad5ed95ca 100644 --- a/client/dynamic-client/src/main/java/software/amazon/smithy/java/dynamicclient/DynamicOperation.java +++ b/client/dynamic-client/src/main/java/software/amazon/smithy/java/dynamicclient/DynamicOperation.java @@ -11,9 +11,11 @@ import java.util.List; import java.util.Set; import java.util.function.BiConsumer; +import java.util.function.Supplier; import software.amazon.smithy.java.core.schema.ApiOperation; import software.amazon.smithy.java.core.schema.ApiService; import software.amazon.smithy.java.core.schema.Schema; +import software.amazon.smithy.java.core.schema.SerializableStruct; import software.amazon.smithy.java.core.schema.ShapeBuilder; import software.amazon.smithy.java.core.schema.TraitKey; import software.amazon.smithy.java.core.serde.TypeRegistry; @@ -24,6 +26,7 @@ import software.amazon.smithy.model.shapes.OperationShape; import software.amazon.smithy.model.shapes.ServiceShape; import software.amazon.smithy.model.shapes.ShapeId; +import software.amazon.smithy.model.shapes.ShapeType; public final class DynamicOperation implements ApiOperation { @@ -34,6 +37,8 @@ public final class DynamicOperation implements ApiOperation errorSchemas; private final TypeRegistry typeRegistry; private final List effectiveAuthSchemes; + private final Supplier> inputEventBuilderSupplier; + private final Supplier> outputEventBuilderSupplier; DynamicOperation( ApiService service, @@ -42,7 +47,9 @@ public final class DynamicOperation implements ApiOperation errorSchemas, TypeRegistry typeRegistry, - List effectiveAuthSchemes + List effectiveAuthSchemes, + Supplier> inputEventBuilderSupplier, + Supplier> outputEventBuilderSupplier ) { this.service = service; this.effectiveAuthSchemes = effectiveAuthSchemes; @@ -51,16 +58,8 @@ public final class DynamicOperation implements ApiOperation errorSchemas() { return errorSchemas; } + @Override + public Supplier> inputEventBuilderSupplier() { + return inputEventBuilderSupplier; + } + + @Override + public Supplier> outputEventBuilderSupplier() { + return outputEventBuilderSupplier; + } + public static DynamicOperation create( OperationShape shape, SchemaConverter schemaConverter, @@ -155,6 +164,22 @@ public static DynamicOperation create( registry = TypeRegistry.compose(registryBuilder.build(), serviceErrorRegistry); } + // Detect event stream members for builder suppliers. + Supplier> inputEventSupplier = null; + Supplier> outputEventSupplier = null; + for (var member : inputSchema.members()) { + if (member.hasTrait(TraitKey.STREAMING_TRAIT) && member.type() == ShapeType.UNION) { + inputEventSupplier = () -> SchemaConverter.createDocumentBuilder(member, service.getId()); + break; + } + } + for (var member : outputSchema.members()) { + if (member.hasTrait(TraitKey.STREAMING_TRAIT) && member.type() == ShapeType.UNION) { + outputEventSupplier = () -> SchemaConverter.createDocumentBuilder(member, service.getId()); + break; + } + } + return new DynamicOperation( apiService, operationSchema, @@ -162,6 +187,8 @@ public static DynamicOperation create( outputSchema, Collections.unmodifiableSet(errorSchemas), registry, - authSchemes); + authSchemes, + inputEventSupplier, + outputEventSupplier); } } diff --git a/client/dynamic-client/src/test/java/software/amazon/smithy/java/dynamicclient/DynamicOperationTest.java b/client/dynamic-client/src/test/java/software/amazon/smithy/java/dynamicclient/DynamicOperationTest.java index 4e49e5dd4..7742795f0 100644 --- a/client/dynamic-client/src/test/java/software/amazon/smithy/java/dynamicclient/DynamicOperationTest.java +++ b/client/dynamic-client/src/test/java/software/amazon/smithy/java/dynamicclient/DynamicOperationTest.java @@ -6,14 +6,14 @@ package software.amazon.smithy.java.dynamicclient; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import java.util.List; import java.util.Set; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import software.amazon.smithy.java.core.schema.ApiService; import software.amazon.smithy.java.core.schema.Schema; @@ -22,17 +22,22 @@ import software.amazon.smithy.java.dynamicschemas.SchemaConverter; import software.amazon.smithy.model.Model; import software.amazon.smithy.model.shapes.ShapeId; +import software.amazon.smithy.model.shapes.ShapeType; import software.amazon.smithy.model.traits.DeprecatedTrait; public class DynamicOperationTest { @Test - public void doesNotCurrentlySupportStreaming() { + public void supportsDataStreamOperations() { Model model = Model.assembler() .addUnparsedModel("test.smithy", """ $version: "2" namespace smithy.example + service S { + operations: [PutFoo] + } + operation PutFoo { input := { @required @@ -47,26 +52,174 @@ public void doesNotCurrentlySupportStreaming() { .assemble() .unwrap(); var converter = new SchemaConverter(model); - var registry = TypeRegistry.empty(); - var operationSchema = converter.getSchema(model.expectShape(ShapeId.from("smithy.example#PutFoo"))); - var input = converter.getSchema(model.expectShape(ShapeId.from("smithy.example#PutFooInput"))); - var output = converter.getSchema(model.expectShape(ShapeId.from("smithy.example#PutFooOutput"))); + var service = model.expectShape(ShapeId.from("smithy.example#S")).asServiceShape().get(); + var operation = model.expectShape(ShapeId.from("smithy.example#PutFoo")).asOperationShape().get(); - var serviceSchema = Schema.createService(ShapeId.from("smithy.example#S")); - ApiService apiService = () -> serviceSchema; + var op = DynamicOperation.create( + operation, + converter, + model, + service, + TypeRegistry.empty(), + (id, b) -> {}); + + // Data streams don't need event builder suppliers + assertThat(op.inputEventBuilderSupplier(), is(nullValue())); + assertThat(op.outputEventBuilderSupplier(), is(nullValue())); + assertThat(op.inputStreamMember(), is(op.inputSchema().member("someStream"))); + } + + @Test + public void createsInputEventStreamingOperation() { + Model model = Model.assembler() + .addUnparsedModel("test.smithy", """ + $version: "2" + + namespace smithy.example + + service S { + operations: [PutFoo] + } + + operation PutFoo { + input := { + stream: Events + } + output := {} + } + + @streaming + union Events { + data: Data + } + + structure Data { + value: String + } + """) + .assemble() + .unwrap(); + var converter = new SchemaConverter(model); + var service = model.expectShape(ShapeId.from("smithy.example#S")).asServiceShape().get(); + var operation = model.expectShape(ShapeId.from("smithy.example#PutFoo")).asOperationShape().get(); + + var op = DynamicOperation.create( + operation, + converter, + model, + service, + TypeRegistry.empty(), + (id, b) -> {}); + + assertThat(op.inputEventBuilderSupplier(), is(notNullValue())); + assertThat(op.outputEventBuilderSupplier(), is(nullValue())); + assertThat(op.inputEventBuilderSupplier().get().schema().type(), + equalTo(ShapeType.UNION)); + } + + @Test + public void createsOutputEventStreamingOperation() { + Model model = Model.assembler() + .addUnparsedModel("test.smithy", """ + $version: "2" + + namespace smithy.example + + service S { + operations: [GetFoo] + } + + operation GetFoo { + input := {} + output := { + stream: Events + } + } + + @streaming + union Events { + data: Data + } + + structure Data { + value: String + } + """) + .assemble() + .unwrap(); + var converter = new SchemaConverter(model); + var service = model.expectShape(ShapeId.from("smithy.example#S")).asServiceShape().get(); + var operation = model.expectShape(ShapeId.from("smithy.example#GetFoo")).asOperationShape().get(); + + var op = DynamicOperation.create( + operation, + converter, + model, + service, + TypeRegistry.empty(), + (id, b) -> {}); + + assertThat(op.inputEventBuilderSupplier(), is(nullValue())); + assertThat(op.outputEventBuilderSupplier(), is(notNullValue())); + assertThat(op.outputEventBuilderSupplier().get().schema().type(), + equalTo(ShapeType.UNION)); + } + + @Test + public void createsBidirectionalEventStreamingOperation() { + Model model = Model.assembler() + .addUnparsedModel("test.smithy", """ + $version: "2" + + namespace smithy.example + + service S { + operations: [Chat] + } + + operation Chat { + input := { + inputStream: InputEvents + } + output := { + outputStream: OutputEvents + } + } + + @streaming + union InputEvents { + message: Message + } + + @streaming + union OutputEvents { + reply: Reply + } + + structure Message { + text: String + } + + structure Reply { + text: String + } + """) + .assemble() + .unwrap(); + var converter = new SchemaConverter(model); + var service = model.expectShape(ShapeId.from("smithy.example#S")).asServiceShape().get(); + var operation = model.expectShape(ShapeId.from("smithy.example#Chat")).asOperationShape().get(); + + var op = DynamicOperation.create( + operation, + converter, + model, + service, + TypeRegistry.empty(), + (id, b) -> {}); - var e = Assertions.assertThrows(UnsupportedOperationException.class, () -> { - DynamicOperation o = new DynamicOperation( - apiService, - operationSchema, - input, - output, - Set.of(), - registry, - List.of()); - }); - - assertThat(e.getMessage(), containsString("does not support streaming")); + assertThat(op.inputEventBuilderSupplier(), is(notNullValue())); + assertThat(op.outputEventBuilderSupplier(), is(notNullValue())); } @Test @@ -101,7 +254,9 @@ public void convertsSchemas() { output, Set.of(), registry, - List.of()); + List.of(), + null, + null); assertThat(o.schema().id(), equalTo(ShapeId.from("smithy.example#PutFoo"))); assertThat(o.schema().hasTrait(TraitKey.get(DeprecatedTrait.class)), is(true)); diff --git a/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/generators/OperationGenerator.java b/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/generators/OperationGenerator.java index 98221c73b..022a14e9e 100644 --- a/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/generators/OperationGenerator.java +++ b/codegen/codegen-core/src/main/java/software/amazon/smithy/java/codegen/generators/OperationGenerator.java @@ -23,9 +23,8 @@ import software.amazon.smithy.java.core.schema.ApiOperation; import software.amazon.smithy.java.core.schema.ApiResource; import software.amazon.smithy.java.core.schema.ApiService; -import software.amazon.smithy.java.core.schema.InputEventStreamingApiOperation; -import software.amazon.smithy.java.core.schema.OutputEventStreamingApiOperation; import software.amazon.smithy.java.core.schema.Schema; +import software.amazon.smithy.java.core.schema.SerializableStruct; import software.amazon.smithy.java.core.schema.ShapeBuilder; import software.amazon.smithy.java.core.serde.TypeRegistry; import software.amazon.smithy.model.Model; @@ -92,7 +91,7 @@ public final class ${shape:T} implements ${operationType:C} { ${?hasInputEventStream} @Override - public ${supplier:T}<${sdkShapeBuilder:T}<${inputEventType:T}>> inputEventBuilderSupplier() { + public ${supplier:T}<${sdkShapeBuilder:T}> inputEventBuilderSupplier() { return () -> ${inputEventType:T}.builder(); } ${/hasInputEventStream} @@ -104,7 +103,7 @@ public final class ${shape:T} implements ${operationType:C} { ${?hasOutputEventStream} @Override - public ${supplier:T}<${sdkShapeBuilder:T}<${outputEventType:T}>> outputEventBuilderSupplier() { + public ${supplier:T}<${sdkShapeBuilder:T}> outputEventBuilderSupplier() { return () -> ${outputEventType:T}.builder(); } ${/hasOutputEventStream} @@ -210,6 +209,7 @@ public final class ${shape:T} implements ${operationType:C} { var inputShape = directive.model().expectShape(shape.getInputShape()); eventStreamIndex.getInputInfo(shape).ifPresentOrElse(info -> { writer.putContext("supplier", Supplier.class); + writer.putContext("serializableStruct", SerializableStruct.class); writer.putContext("hasInputEventStream", true); writer.putContext("inputStreamMember", info.getEventStreamMember().getMemberName()); writer.putContext( @@ -226,6 +226,7 @@ public final class ${shape:T} implements ${operationType:C} { eventStreamIndex.getOutputInfo(shape).ifPresentOrElse(info -> { writer.putContext("supplier", Supplier.class); + writer.putContext("serializableStruct", SerializableStruct.class); writer.putContext("hasOutputEventStream", true); writer.putContext("outputStreamMember", info.getEventStreamMember().getMemberName()); writer.putContext( @@ -295,30 +296,7 @@ public void run() { var outputShape = model.expectShape(shape.getOutputShape()); var output = symbolProvider.toSymbol(outputShape); - var inputEventStreamInfo = index.getInputInfo(shape); - var outputEventStreamInfo = index.getOutputInfo(shape); - inputEventStreamInfo.ifPresent( - info -> writer.writeInline( - "$1T<$2T, $3T, $4T>", - InputEventStreamingApiOperation.class, - input, - output, - symbolProvider.toSymbol(info.getEventStreamTarget()))); - outputEventStreamInfo.ifPresent(info -> { - if (inputEventStreamInfo.isPresent()) { - writer.writeInline(", "); - } - writer.writeInline( - "$1T<$2T, $3T, $4T>", - OutputEventStreamingApiOperation.class, - input, - output, - symbolProvider.toSymbol(info.getEventStreamTarget())); - }); - - if (inputEventStreamInfo.isEmpty() && outputEventStreamInfo.isEmpty()) { - writer.writeInline("$1T<$2T, $3T>", ApiOperation.class, input, output); - } + writer.writeInline("$1T<$2T, $3T>", ApiOperation.class, input, output); } } } diff --git a/core/src/main/java/software/amazon/smithy/java/core/schema/ApiOperation.java b/core/src/main/java/software/amazon/smithy/java/core/schema/ApiOperation.java index 983a58a69..b19335468 100644 --- a/core/src/main/java/software/amazon/smithy/java/core/schema/ApiOperation.java +++ b/core/src/main/java/software/amazon/smithy/java/core/schema/ApiOperation.java @@ -6,6 +6,7 @@ package software.amazon.smithy.java.core.schema; import java.util.List; +import java.util.function.Supplier; import software.amazon.smithy.java.core.serde.TypeRegistry; import software.amazon.smithy.model.shapes.ShapeId; @@ -107,6 +108,24 @@ default Schema outputStreamMember() { return null; } + /** + * Get a supplier of builders for input event stream shapes. + * + * @return a supplier of input event shape builders, or null if the operation has no input event stream. + */ + default Supplier> inputEventBuilderSupplier() { + return null; + } + + /** + * Get a supplier of builders for output event stream shapes. + * + * @return a supplier of output event shape builders, or null if the operation has no output event stream. + */ + default Supplier> outputEventBuilderSupplier() { + return null; + } + /** * Get the schemas of all errors that can be thrown by the operation. * diff --git a/core/src/main/java/software/amazon/smithy/java/core/schema/InputEventStreamingApiOperation.java b/core/src/main/java/software/amazon/smithy/java/core/schema/InputEventStreamingApiOperation.java deleted file mode 100644 index e2409f9e8..000000000 --- a/core/src/main/java/software/amazon/smithy/java/core/schema/InputEventStreamingApiOperation.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -package software.amazon.smithy.java.core.schema; - -import java.util.function.Supplier; - -/** - * Represents a modeled Smithy operation. - * - * @param Operation input shape type. - * @param Operation output shape type. - * @param Operation input event shape type. - */ -public interface InputEventStreamingApiOperation - extends ApiOperation { - /** - * Retrieves a supplier of builders for input events. - * - * @return Returns a supplier of input event shape builders. - */ - Supplier> inputEventBuilderSupplier(); -} diff --git a/core/src/main/java/software/amazon/smithy/java/core/schema/OutputEventStreamingApiOperation.java b/core/src/main/java/software/amazon/smithy/java/core/schema/OutputEventStreamingApiOperation.java deleted file mode 100644 index c63cc8931..000000000 --- a/core/src/main/java/software/amazon/smithy/java/core/schema/OutputEventStreamingApiOperation.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -package software.amazon.smithy.java.core.schema; - -import java.util.function.Supplier; - -/** - * Represents a modeled Smithy operation. - * - * @param Operation input shape type. - * @param Operation output shape type. - * @param Operation output event shape type. - */ -public interface OutputEventStreamingApiOperation - extends ApiOperation { - /** - * Retrieves a supplier of builders for output events. - * - * @return Returns a supplier of output event shape builders. - */ - Supplier> outputEventBuilderSupplier(); -} diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/document/DataStreamDocument.java b/core/src/main/java/software/amazon/smithy/java/core/serde/document/DataStreamDocument.java new file mode 100644 index 000000000..59c726677 --- /dev/null +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/document/DataStreamDocument.java @@ -0,0 +1,42 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.core.serde.document; + +import software.amazon.smithy.java.core.schema.Schema; +import software.amazon.smithy.java.core.serde.ShapeSerializer; +import software.amazon.smithy.java.io.datastream.DataStream; +import software.amazon.smithy.model.shapes.ShapeType; + +/** + * A document wrapper around a {@link DataStream} for streaming blob members. + */ +record DataStreamDocument(Schema schema, DataStream dataStream) implements Document { + + @Override + public ShapeType type() { + return ShapeType.BLOB; + } + + @Override + public DataStream asDataStream() { + return dataStream; + } + + @Override + public Object asObject() { + return dataStream; + } + + @Override + public void serialize(ShapeSerializer serializer) { + serializer.writeDataStream(schema, dataStream); + } + + @Override + public void serializeContents(ShapeSerializer serializer) { + serializer.writeDataStream(schema, dataStream); + } +} diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/document/Document.java b/core/src/main/java/software/amazon/smithy/java/core/serde/document/Document.java index 69aaa7ad2..04b7eccd8 100644 --- a/core/src/main/java/software/amazon/smithy/java/core/serde/document/Document.java +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/document/Document.java @@ -18,10 +18,13 @@ import software.amazon.smithy.java.core.schema.PreludeSchemas; import software.amazon.smithy.java.core.schema.Schema; import software.amazon.smithy.java.core.schema.SerializableShape; +import software.amazon.smithy.java.core.schema.SerializableStruct; import software.amazon.smithy.java.core.schema.ShapeBuilder; import software.amazon.smithy.java.core.serde.SerializationException; import software.amazon.smithy.java.core.serde.ShapeDeserializer; import software.amazon.smithy.java.core.serde.ShapeSerializer; +import software.amazon.smithy.java.core.serde.event.EventStream; +import software.amazon.smithy.java.io.datastream.DataStream; import software.amazon.smithy.model.shapes.ShapeId; import software.amazon.smithy.model.shapes.ShapeType; @@ -323,6 +326,48 @@ default Instant asTimestamp() { throw new SerializationException("Expected a timestamp document, but found " + type()); } + /** + * Get the Document as a {@link DataStream} if it wraps a streaming blob. + * + * @return the data stream. + * @throws SerializationException if the Document does not wrap a data stream. + */ + default DataStream asDataStream() { + throw new SerializationException("Expected a data stream document, but found " + type()); + } + + /** + * Get the Document as an {@link EventStream} if it wraps a streaming union. + * + * @return the event stream. + * @throws SerializationException if the Document does not wrap an event stream. + */ + default EventStream asEventStream() { + throw new SerializationException("Expected an event stream document, but found " + type()); + } + + /** + * Create a Document wrapping a {@link DataStream} with a specific schema. + * + * @param schema Schema to associate with the stream. + * @param dataStream The data stream to wrap. + * @return the Document type. + */ + static Document of(Schema schema, DataStream dataStream) { + return new DataStreamDocument(schema, dataStream); + } + + /** + * Create a Document wrapping an {@link EventStream} with a specific schema. + * + * @param schema Schema to associate with the stream. + * @param eventStream The event stream to wrap. + * @return the Document type. + */ + static Document of(Schema schema, EventStream eventStream) { + return new EventStreamDocument(schema, eventStream); + } + /** * Get the number of elements in an array document, or the number of key value pairs in a map document. * @@ -626,6 +671,8 @@ static Document of(Map members) { *
    *
  • {@link Document}
  • *
  • {@link SerializableShape}
  • + *
  • {@link DataStream} to streaming blob
  • + *
  • {@link EventStream} to streaming union
  • *
  • {@link String}
  • *
  • {@code byte[]} to blob
  • *
  • {@link Instant} to timestamp
  • @@ -650,6 +697,8 @@ static Document ofObject(Object o) { return switch (o) { case Document d -> d; case SerializableShape s -> of(s); + case DataStream ds -> new DataStreamDocument(PreludeSchemas.BLOB, ds); + case EventStream es -> new EventStreamDocument(PreludeSchemas.DOCUMENT, es); case String s -> of(s); case Boolean b -> of(b); case Number n -> ofNumber(n); diff --git a/core/src/main/java/software/amazon/smithy/java/core/serde/document/EventStreamDocument.java b/core/src/main/java/software/amazon/smithy/java/core/serde/document/EventStreamDocument.java new file mode 100644 index 000000000..82fe0c0cb --- /dev/null +++ b/core/src/main/java/software/amazon/smithy/java/core/serde/document/EventStreamDocument.java @@ -0,0 +1,43 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.java.core.serde.document; + +import software.amazon.smithy.java.core.schema.Schema; +import software.amazon.smithy.java.core.schema.SerializableStruct; +import software.amazon.smithy.java.core.serde.ShapeSerializer; +import software.amazon.smithy.java.core.serde.event.EventStream; +import software.amazon.smithy.model.shapes.ShapeType; + +/** + * A document wrapper around an {@link EventStream} for streaming union members. + */ +record EventStreamDocument(Schema schema, EventStream eventStream) implements Document { + + @Override + public ShapeType type() { + return ShapeType.UNION; + } + + @Override + public EventStream asEventStream() { + return eventStream; + } + + @Override + public Object asObject() { + return eventStream; + } + + @Override + public void serialize(ShapeSerializer serializer) { + serializer.writeEventStream(schema, eventStream); + } + + @Override + public void serializeContents(ShapeSerializer serializer) { + serializer.writeEventStream(schema, eventStream); + } +} diff --git a/core/src/test/java/software/amazon/smithy/java/core/serde/document/DocumentTest.java b/core/src/test/java/software/amazon/smithy/java/core/serde/document/DocumentTest.java index 1753ef2db..3562cf065 100644 --- a/core/src/test/java/software/amazon/smithy/java/core/serde/document/DocumentTest.java +++ b/core/src/test/java/software/amazon/smithy/java/core/serde/document/DocumentTest.java @@ -35,8 +35,10 @@ import software.amazon.smithy.java.core.serde.ShapeDeserializer; import software.amazon.smithy.java.core.serde.ShapeSerializer; import software.amazon.smithy.java.core.serde.ToStringSerializer; +import software.amazon.smithy.java.core.serde.event.EventStream; import software.amazon.smithy.java.core.testmodels.Bird; import software.amazon.smithy.java.core.testmodels.Person; +import software.amazon.smithy.java.io.datastream.DataStream; import software.amazon.smithy.model.shapes.ShapeType; public class DocumentTest { @@ -543,4 +545,45 @@ public void documentsDefaultToSizeNegativeOne() { assertThat(document.size(), is(-1)); } + + @Test + public void ofObjectCreatesDataStreamDocument() { + var ds = DataStream.ofString("hello"); + var document = Document.ofObject(ds); + + assertThat(document.type(), is(ShapeType.BLOB)); + assertThat(document.asDataStream(), is(ds)); + assertThat(document.asObject(), is(ds)); + } + + @Test + public void ofObjectCreatesEventStreamDocument() { + var es = EventStream.newWriter(); + var document = Document.ofObject(es); + + assertThat(document.type(), is(ShapeType.UNION)); + assertThat(document.asEventStream(), is(es)); + assertThat(document.asObject(), is(es)); + } + + @Test + public void ofObjectHandlesDataStreamInMap() { + var ds = DataStream.ofString("hello"); + var document = Document.ofObject(Map.of("body", ds)); + + assertThat(document.type(), is(ShapeType.MAP)); + assertThat(document.getMember("body").asDataStream(), is(ds)); + } + + @Test + public void asDataStreamThrowsForNonStreamDocument() { + var document = Document.of("hello"); + Assertions.assertThrows(SerializationException.class, document::asDataStream); + } + + @Test + public void asEventStreamThrowsForNonStreamDocument() { + var document = Document.of("hello"); + Assertions.assertThrows(SerializationException.class, document::asEventStream); + } } diff --git a/dynamic-schemas/src/main/java/software/amazon/smithy/java/dynamicschemas/SchemaGuidedDocumentBuilder.java b/dynamic-schemas/src/main/java/software/amazon/smithy/java/dynamicschemas/SchemaGuidedDocumentBuilder.java index 48b580d76..ad6d90c4e 100644 --- a/dynamic-schemas/src/main/java/software/amazon/smithy/java/dynamicschemas/SchemaGuidedDocumentBuilder.java +++ b/dynamic-schemas/src/main/java/software/amazon/smithy/java/dynamicschemas/SchemaGuidedDocumentBuilder.java @@ -12,8 +12,11 @@ import software.amazon.smithy.java.core.schema.Schema; import software.amazon.smithy.java.core.schema.SchemaUtils; import software.amazon.smithy.java.core.schema.ShapeBuilder; +import software.amazon.smithy.java.core.schema.TraitKey; import software.amazon.smithy.java.core.serde.ShapeDeserializer; import software.amazon.smithy.java.core.serde.document.Document; +import software.amazon.smithy.java.core.serde.event.EventStream; +import software.amazon.smithy.java.io.datastream.DataStream; import software.amazon.smithy.model.shapes.ShapeId; import software.amazon.smithy.model.shapes.ShapeType; @@ -54,16 +57,15 @@ public StructDocument build() { @Override public void setMemberValue(Schema member, Object value) { SchemaUtils.validateMemberInSchema(target, member, value); - Document convertedValue; - if (value instanceof Document d) { + Document convertedValue = switch (value) { // Convert the given document so it matches the required schema. - convertedValue = StructDocument.convertDocument(member, d, service); - } else { + case Document d -> StructDocument.convertDocument(member, d, service); + case DataStream ds -> Document.of(member, ds); + case EventStream es -> Document.of(member, es); // Convert the object to a document and then wrap it with the correct schema. - convertedValue = Document.ofObject(value); - convertedValue = StructDocument.convertDocument(member, convertedValue, service); - } + case null, default -> StructDocument.convertDocument(member, Document.ofObject(value), service); + }; map.put(member.memberName(), convertedValue); } @@ -82,7 +84,12 @@ public ShapeBuilder deserializeMember(ShapeDeserializer decoder, private Document deserialize(ShapeDeserializer decoder, Schema schema) { return switch (schema.type()) { - case BLOB -> new ContentDocument(Document.of(decoder.readBlob(schema)), schema); + case BLOB -> { + if (schema.hasTrait(TraitKey.STREAMING_TRAIT)) { + yield Document.of(schema, decoder.readDataStream(schema)); + } + yield new ContentDocument(Document.of(decoder.readBlob(schema)), schema); + } case BOOLEAN -> new ContentDocument(Document.of(decoder.readBoolean(schema)), schema); case STRING, ENUM -> new ContentDocument(Document.of(decoder.readString(schema)), schema); case TIMESTAMP -> new ContentDocument(Document.of(decoder.readTimestamp(schema)), schema); @@ -109,18 +116,25 @@ private Document deserialize(ShapeDeserializer decoder, Schema schema) { }); yield new ContentDocument(Document.of(map), schema); } - case STRUCTURE, UNION -> { - var map = new HashMap(); - decoder.readStruct(schema, map, (state, memberSchema, memberDeserializer) -> { - state.put(memberSchema.memberName(), deserialize(memberDeserializer, memberSchema)); - }); - // Use "new" here since we know all the nested shapes have the correct schemas. - yield new StructDocument(schema, map, service); + case STRUCTURE -> createStructDocument(decoder, schema); + case UNION -> { + if (schema.hasTrait(TraitKey.STREAMING_TRAIT)) { + yield Document.of(schema, decoder.readEventStream(schema)); + } + yield createStructDocument(decoder, schema); } default -> throw new UnsupportedOperationException("Unsupported target type: " + schema.type()); }; } + private StructDocument createStructDocument(ShapeDeserializer decoder, Schema schema) { + var map = new LinkedHashMap(); + decoder.readStruct(schema, map, (state, memberSchema, memberDeserializer) -> { + state.put(memberSchema.memberName(), deserialize(memberDeserializer, memberSchema)); + }); + return new StructDocument(schema, map, service); + } + @Override public ShapeBuilder errorCorrection() { // TODO: fill in defaults. diff --git a/dynamic-schemas/src/main/java/software/amazon/smithy/java/dynamicschemas/StructDocument.java b/dynamic-schemas/src/main/java/software/amazon/smithy/java/dynamicschemas/StructDocument.java index 90be2fb7d..fb595ed40 100644 --- a/dynamic-schemas/src/main/java/software/amazon/smithy/java/dynamicschemas/StructDocument.java +++ b/dynamic-schemas/src/main/java/software/amazon/smithy/java/dynamicschemas/StructDocument.java @@ -12,6 +12,7 @@ import java.util.Set; import software.amazon.smithy.java.core.schema.Schema; import software.amazon.smithy.java.core.schema.SerializableStruct; +import software.amazon.smithy.java.core.schema.TraitKey; import software.amazon.smithy.java.core.serde.ShapeSerializer; import software.amazon.smithy.java.core.serde.document.Document; import software.amazon.smithy.java.core.serde.document.DocumentUtils; @@ -74,17 +75,25 @@ public static StructDocument of(Schema schema, Document delegate, ShapeId servic throw new IllegalArgumentException("Document must be a map, structure, or union, but got " + delegate.type()); } + private static Document convertStructureDocument(Schema schema, Document delegate, ShapeId service) { + Map result = new LinkedHashMap<>(); + for (var member : schema.members()) { + var value = delegate.getMember(member.memberName()); + if (value != null) { + result.put(member.memberName(), convertDocument(member, value, service)); + } + } + return new StructDocument(schema, result, service); + } + static Document convertDocument(Schema schema, Document delegate, ShapeId service) { return switch (schema.type()) { - case STRUCTURE, UNION -> { - Map result = new LinkedHashMap<>(); - for (var member : schema.members()) { - var value = delegate.getMember(member.memberName()); - if (value != null) { - result.put(member.memberName(), convertDocument(member, value, service)); - } + case STRUCTURE -> convertStructureDocument(schema, delegate, service); + case UNION -> { + if (schema.hasTrait(TraitKey.STREAMING_TRAIT)) { + yield Document.of(schema, delegate.asEventStream()); } - yield new StructDocument(schema, result, service); + yield convertStructureDocument(schema, delegate, service); } case MAP -> { Map result = new LinkedHashMap<>(); @@ -117,7 +126,12 @@ static Document convertDocument(Schema schema, Document delegate, ShapeId servic LONG, FLOAT, DOUBLE, BIG_INTEGER, BIG_DECIMAL -> new ContentDocument(Document.ofNumber(delegate.asNumber()), schema); case DOCUMENT -> new ContentDocument(delegate, schema); - case BLOB -> new ContentDocument(Document.of(delegate.asBlob()), schema); + case BLOB -> { + if (schema.hasTrait(TraitKey.STREAMING_TRAIT)) { + yield Document.of(schema, delegate.asDataStream()); + } + yield new ContentDocument(Document.of(delegate.asBlob()), schema); + } default -> throw new IllegalArgumentException("Unsupported schema type: " + schema); }; } diff --git a/dynamic-schemas/src/test/java/software/amazon/smithy/java/dynamicschemas/SchemaGuidedDocumentBuilderTest.java b/dynamic-schemas/src/test/java/software/amazon/smithy/java/dynamicschemas/SchemaGuidedDocumentBuilderTest.java index 2d1c07aa6..883f728f4 100644 --- a/dynamic-schemas/src/test/java/software/amazon/smithy/java/dynamicschemas/SchemaGuidedDocumentBuilderTest.java +++ b/dynamic-schemas/src/test/java/software/amazon/smithy/java/dynamicschemas/SchemaGuidedDocumentBuilderTest.java @@ -27,6 +27,8 @@ import software.amazon.smithy.java.core.serde.SpecificShapeSerializer; import software.amazon.smithy.java.core.serde.document.Document; import software.amazon.smithy.java.core.serde.document.DocumentDeserializer; +import software.amazon.smithy.java.core.serde.event.EventStream; +import software.amazon.smithy.java.io.datastream.DataStream; import software.amazon.smithy.java.json.JsonCodec; import software.amazon.smithy.model.Model; import software.amazon.smithy.model.shapes.ShapeId; @@ -429,4 +431,105 @@ public void usesStructDocumentForSetMember() { assertThat(StructDocumentTest.getDocumentSchema(result.getMember("foo")), equalTo(schema.member("foo"))); assertThat(StructDocumentTest.getDocumentSchema(result.getMember("baz")), equalTo(schema.member("baz"))); } + + @Test + public void setMemberValueHandlesDataStream() { + var testModel = Model.assembler() + .addUnparsedModel("test.smithy", """ + $version: "2" + namespace smithy.example + + structure TestShape { + @required + body: StreamBlob + } + + @streaming + blob StreamBlob + """) + .assemble() + .unwrap(); + var converter = new SchemaConverter(testModel); + var schema = converter.getSchema(testModel.expectShape(ShapeId.from("smithy.example#TestShape"))); + + var ds = DataStream.ofString("hello"); + var builder = SchemaConverter.createDocumentBuilder(schema); + builder.setMemberValue(schema.member("body"), ds); + + var result = builder.build(); + assertThat(result.getMember("body").asDataStream(), equalTo(ds)); + } + + @Test + public void setMemberValueHandlesEventStream() { + var testModel = Model.assembler() + .addUnparsedModel("test.smithy", """ + $version: "2" + namespace smithy.example + + structure TestShape { + events: Events + } + + @streaming + union Events { + data: Data + } + + structure Data { + value: String + } + """) + .assemble() + .unwrap(); + var converter = new SchemaConverter(testModel); + var schema = converter.getSchema(testModel.expectShape(ShapeId.from("smithy.example#TestShape"))); + + var es = EventStream.newWriter(); + var builder = SchemaConverter.createDocumentBuilder(schema); + builder.setMemberValue(schema.member("events"), es); + + var result = builder.build(); + assertThat(result.getMember("events").asEventStream(), equalTo(es)); + } + + @Test + public void deserializesDataStreamMember() { + var testModel = Model.assembler() + .addUnparsedModel("test.smithy", """ + $version: "2" + namespace smithy.example + + structure TestShape { + @required + body: StreamBlob + } + + @streaming + blob StreamBlob + """) + .assemble() + .unwrap(); + var converter = new SchemaConverter(testModel); + var schema = converter.getSchema(testModel.expectShape(ShapeId.from("smithy.example#TestShape"))); + + var ds = DataStream.ofString("hello"); + + // Simulate what the protocol layer does: provide a SpecificShapeDeserializer for the streaming member + var builder = SchemaConverter.createDocumentBuilder(schema); + builder.deserialize(new SpecificShapeDeserializer() { + @Override + public void readStruct(Schema s, T state, StructMemberConsumer consumer) { + consumer.accept(state, schema.member("body"), new SpecificShapeDeserializer() { + @Override + public DataStream readDataStream(Schema s2) { + return ds; + } + }); + } + }); + + var result = builder.build(); + assertThat(result.getMember("body").asDataStream(), equalTo(ds)); + } } diff --git a/dynamic-schemas/src/test/java/software/amazon/smithy/java/dynamicschemas/StructDocumentTest.java b/dynamic-schemas/src/test/java/software/amazon/smithy/java/dynamicschemas/StructDocumentTest.java index 98ad71a03..54dd0ca3d 100644 --- a/dynamic-schemas/src/test/java/software/amazon/smithy/java/dynamicschemas/StructDocumentTest.java +++ b/dynamic-schemas/src/test/java/software/amazon/smithy/java/dynamicschemas/StructDocumentTest.java @@ -33,6 +33,9 @@ import software.amazon.smithy.java.core.serde.SpecificShapeSerializer; import software.amazon.smithy.java.core.serde.document.DiscriminatorException; import software.amazon.smithy.java.core.serde.document.Document; +import software.amazon.smithy.java.core.serde.event.EventStream; +import software.amazon.smithy.java.io.datastream.DataStream; +import software.amazon.smithy.model.Model; import software.amazon.smithy.model.shapes.ShapeId; import software.amazon.smithy.model.shapes.ShapeType; @@ -369,4 +372,98 @@ public void writeString(Schema schema, String value) { assertThat(set[0], equalTo(schema)); assertThat(set[1], equalTo(schema.member("a"))); } + + @Test + public void convertDocumentPassesThroughDataStream() { + var model = Model.assembler() + .addUnparsedModel("test.smithy", """ + $version: "2" + namespace smithy.example + + structure Foo { + @required + body: StreamBlob + } + + @streaming + blob StreamBlob + """) + .assemble() + .unwrap(); + var converter = new SchemaConverter(model); + var schema = converter.getSchema(model.expectShape(ShapeId.from("smithy.example#Foo"))); + + var ds = DataStream.ofString("hello"); + var input = Document.ofObject(Map.of("body", ds)); + var struct = StructDocument.of(schema, input); + + assertThat(struct.getMember("body").asDataStream(), is(ds)); + } + + @Test + public void convertDocumentPassesThroughEventStream() { + var model = Model.assembler() + .addUnparsedModel("test.smithy", """ + $version: "2" + namespace smithy.example + + structure Foo { + events: Events + } + + @streaming + union Events { + data: Data + } + + structure Data { + value: String + } + """) + .assemble() + .unwrap(); + var converter = new SchemaConverter(model); + var schema = converter.getSchema(model.expectShape(ShapeId.from("smithy.example#Foo"))); + + var es = EventStream.newWriter(); + var input = Document.ofObject(Map.of("events", es)); + var struct = StructDocument.of(schema, input); + + assertThat(struct.getMember("events").asEventStream(), is(es)); + } + + @Test + public void dataStreamSerializesMemberCorrectly() { + var model = Model.assembler() + .addUnparsedModel("test.smithy", """ + $version: "2" + namespace smithy.example + + structure Foo { + @required + body: StreamBlob + } + + @streaming + blob StreamBlob + """) + .assemble() + .unwrap(); + var converter = new SchemaConverter(model); + var schema = converter.getSchema(model.expectShape(ShapeId.from("smithy.example#Foo"))); + + var ds = DataStream.ofString("hello"); + var input = Document.ofObject(Map.of("body", ds)); + var struct = StructDocument.of(schema, input); + + DataStream[] captured = new DataStream[1]; + struct.serializeMembers(new SpecificShapeSerializer() { + @Override + public void writeDataStream(Schema s, DataStream value) { + captured[0] = value; + } + }); + + assertThat(captured[0], is(ds)); + } } diff --git a/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/RequestSerializer.java b/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/RequestSerializer.java index 6a5dd8f26..0b9b3da98 100644 --- a/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/RequestSerializer.java +++ b/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/RequestSerializer.java @@ -9,7 +9,6 @@ import java.util.Objects; import java.util.concurrent.ConcurrentMap; import software.amazon.smithy.java.core.schema.ApiOperation; -import software.amazon.smithy.java.core.schema.InputEventStreamingApiOperation; import software.amazon.smithy.java.core.schema.Schema; import software.amazon.smithy.java.core.schema.SerializableShape; import software.amazon.smithy.java.core.schema.SerializableStruct; @@ -166,7 +165,7 @@ public HttpRequest serializeRequest() { .uri(targetEndpoint); var eventStream = serializer.getEventStream(); - if (eventStream != null && operation instanceof InputEventStreamingApiOperation) { + if (eventStream != null && operation.inputEventBuilderSupplier() != null) { ProtocolEventStreamWriter> writer = ProtocolEventStreamWriter.of(eventStream); writer.bootstrap((EventEncoderFactory) eventStreamEncodingFactory, null); diff --git a/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/ResponseSerializer.java b/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/ResponseSerializer.java index dda59ee1c..bd296b0f1 100644 --- a/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/ResponseSerializer.java +++ b/http/http-binding/src/main/java/software/amazon/smithy/java/http/binding/ResponseSerializer.java @@ -8,7 +8,6 @@ import java.util.Objects; import java.util.concurrent.ConcurrentMap; import software.amazon.smithy.java.core.schema.ApiOperation; -import software.amazon.smithy.java.core.schema.OutputEventStreamingApiOperation; import software.amazon.smithy.java.core.schema.Schema; import software.amazon.smithy.java.core.schema.SerializableShape; import software.amazon.smithy.java.core.schema.SerializableStruct; @@ -152,7 +151,7 @@ public HttpResponse serializeResponse() { .statusCode(serializer.getResponseStatus()); var eventStream = serializer.getEventStream(); - if (eventStream != null && operation instanceof OutputEventStreamingApiOperation) { + if (eventStream != null && operation.outputEventBuilderSupplier() != null) { ProtocolEventStreamWriter> writer = ProtocolEventStreamWriter.of(eventStream); writer.bootstrap(eventEncoderFactory, null);