diff --git a/java/Beam/README.md b/java/Beam/README.md
new file mode 100644
index 00000000..962f00d3
--- /dev/null
+++ b/java/Beam/README.md
@@ -0,0 +1,40 @@
+## Getting Started Flink Java project - Apache Beam
+
+Skeleton project for a basic Apache Beam application to run on Amazon Managed Service for Apache Flink.
+
+* Flink version: 1.19
+* Language: Java (11)
+* Apache Beam IO: KinesisIO
+
+The project can run both on Amazon Managed Service for Apache Flink and locally, for development.
+
+The application demonstrates how to read the runtime configuration, and implements a minimal Beam pipeline that copies records from an input Kinesis stream to an output Kinesis stream, replacing any record whose text is `ping` with `pong` (see `PingPongFn`).
+
+### Runtime configuration
+
+When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from *Runtime Properties*.
+
+When running locally, the configuration is read from the [`flink-application-properties-dev.json`](src/main/resources/flink-application-properties-dev.json) file located in the resources folder.
+
+Runtime parameters:
+
+| Group ID | Key | Description |
+|-----------------|---------------|---------------------------|
+| `InputStream0` | `stream.name` | Name of the input stream |
+| `InputStream0`  | `aws.region`  | (optional) Region of the input stream. Defaults to the application Region or, when running locally, to the default Region of the AWS profile. |
+| `OutputStream0` | `stream.name` | Name of the output stream |
+| `OutputStream0` | `aws.region`  | (optional) Region of the output stream. Defaults to the application Region or, when running locally, to the default Region of the AWS profile. |
+
+All parameters are case-sensitive.
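+
+For example, the [`flink-application-properties-dev.json`](src/main/resources/flink-application-properties-dev.json) file included in this project defines both streams in `us-east-1`:
+
+```json
+[
+  {
+    "PropertyGroupId": "InputStream0",
+    "PropertyMap": {
+      "aws.region": "us-east-1",
+      "stream.name": "ExampleInputStream"
+    }
+  },
+  {
+    "PropertyGroupId": "OutputStream0",
+    "PropertyMap": {
+      "aws.region": "us-east-1",
+      "stream.name": "ExampleOutputStream"
+    }
+  }
+]
+```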
+
+### Running in IntelliJ
+
+You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.
+
+See [Running examples locally](../running-examples-locally.md) for details.
+
+### Generating data
+
+You can use the [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator),
+also available in a [hosted version](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html),
+to send random data to the input Kinesis Data Stream and test the application.
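+
+The `PingPongFn` transform emits a `pong` record for every input record whose text is `ping`, and forwards all other records unchanged. For example, a record template like the following (a sketch, assuming the faker.js template syntax used by the Kinesis Data Generator) sends a mix of `ping` and other records:
+
+```
+{{random.arrayElement(["ping","hello","world"])}}
+```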
diff --git a/java/Beam/pom.xml b/java/Beam/pom.xml
new file mode 100644
index 00000000..f44ba8d7
--- /dev/null
+++ b/java/Beam/pom.xml
@@ -0,0 +1,159 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.amazonaws</groupId>
+    <artifactId>basic-beam-app</artifactId>
+    <version>1.0</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <buildDirectory>${project.basedir}/target</buildDirectory>
+        <jar.finalName>${project.name}-${project.version}</jar.finalName>
+        <target.java.version>11</target.java.version>
+        <flink.version>1.19.0</flink.version>
+        <log4j.version>2.25.2</log4j.version>
+        <beam.version>2.69.0</beam.version>
+        <kda.runtime.version>1.2.0</kda.runtime.version>
+        <aws.sdk.version>1.12.794</aws.sdk.version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>com.amazonaws</groupId>
+                <artifactId>aws-java-sdk-bom</artifactId>
+                <version>${aws.sdk.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.beam</groupId>
+                <artifactId>beam-sdks-java-bom</artifactId>
+                <version>${beam.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <!-- Provided by the Managed Service for Apache Flink runtime -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-kinesisanalytics-runtime</artifactId>
+            <version>${kda.runtime.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Apache Beam: versions are managed by the Beam BOM -->
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-runners-flink-1.19</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-io-amazon-web-services2</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-extensions-avro</artifactId>
+        </dependency>
+
+        <!-- Logging -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <directory>${buildDirectory}</directory>
+        <finalName>${jar.finalName}</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.1</version>
+                <configuration>
+                    <source>${target.java.version}</source>
+                    <target>${target.java.version}</target>
+                </configuration>
+            </plugin>
+
+            <!-- Shade plugin to build the uber-jar including all dependencies -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.1</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.apache.flink:force-shading</exclude>
+                                    <exclude>com.google.code.findbugs:jsr305</exclude>
+                                    <exclude>org.slf4j:*</exclude>
+                                    <exclude>log4j:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>com.amazonaws.services.msf.beam.BasicBeamStreamingJob</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/java/Beam/src/main/java/com/amazonaws/services/msf/beam/BasicBeamStreamingJob.java b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/BasicBeamStreamingJob.java
new file mode 100644
index 00000000..74e650d9
--- /dev/null
+++ b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/BasicBeamStreamingJob.java
@@ -0,0 +1,125 @@
+package com.amazonaws.services.msf.beam;
+
+import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.FlinkRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
+import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import software.amazon.kinesis.common.InitialPositionInStream;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Basic Apache Beam pipeline that runs on Managed Apache Flink
+ */
+public class BasicBeamStreamingJob {
+ private static final Logger LOGGER = LogManager
+ .getLogger(BasicBeamStreamingJob.class);
+
+ // Name of the local JSON resource with the application properties in the same format as they are received from the Amazon Managed Service for Apache Flink runtime
+ private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE
+ = "flink-application-properties-dev.json";
+
+ // Property name for the Input Stream
+ private static final String INPUT_STREAM_PROPERTY_NAME =
+ "InputStream0";
+
+ // Property name for the Output Stream
+ private static final String OUTPUT_STREAM_PROPERTY_NAME =
+ "OutputStream0";
+
+    /**
+     * Load application properties from the Amazon Managed Service for Apache
+     * Flink runtime, or from a local resource when the environment is local.
+     *
+     * @param env the Flink execution environment
+     * @return properties read either from the local resource or from the runtime
+     * @throws IOException if the local properties file cannot be read
+     */
+    private static Map<String, Properties> loadApplicationProperties(
+ StreamExecutionEnvironment env) throws IOException {
+ if (env instanceof LocalStreamEnvironment) {
+ LOGGER.info(
+ "Loading application properties from '{}'",
+ LOCAL_APPLICATION_PROPERTIES_RESOURCE);
+ return KinesisAnalyticsRuntime.getApplicationProperties(
+ BasicBeamStreamingJob.class.getClassLoader()
+ .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)
+ .getPath());
+ } else {
+ LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
+ return KinesisAnalyticsRuntime.getApplicationProperties();
+ }
+ }
+
+    /**
+     * Constructs an Apache Beam pipeline that can be executed by Apache Flink
+     *
+     * @param inputStream  name of the Kinesis stream the pipeline reads from
+     * @param outputStream name of the Kinesis stream the pipeline writes to
+     * @return the constructed Apache Beam pipeline
+     */
+ public static Pipeline createFlinkPipeline(String inputStream,
+ String outputStream) {
+
+        // Apache Beam requires the runner to be set explicitly to FlinkRunner.
+ FlinkPipelineOptions options =
+ PipelineOptionsFactory
+ .create()
+ .as(FlinkPipelineOptions.class);
+ options.setRunner(FlinkRunner.class);
+
+ Pipeline p = Pipeline.create(options);
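+        // Read from the input Kinesis stream, decode each record into a Document,
+        // and apply the ping/pong transformation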
+        PCollection<Document> record = p.apply(
+ KinesisIO
+ .read()
+ .withStreamName(inputStream)
+ .withInitialPositionInStream(
+ InitialPositionInStream.LATEST)
+ ).apply(
+ MapElements
+ .into(TypeDescriptor.of(Document.class))
+                        .via((KinesisRecord r) -> new Document(
+                                new String(r.getDataAsBytes(), StandardCharsets.UTF_8)))
+ ).apply(
+ ParDo.of(new PingPongFn())
+ );
+
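+        // Write the transformed Documents to the output Kinesis stream, serializing
+        // each Document to UTF-8 bytes and partitioning by a hash of the text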
+ record.apply(
+                KinesisIO.<Document>write()
+ .withStreamName(outputStream)
+ .withSerializer(new ByteSerializer())
+ .withPartitioner(new HashKeyPartitioner()));
+
+ return p;
+ }
+
+ public static void main(String[] args) throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .getExecutionEnvironment();
+
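+        // Resolve the input and output stream names from the application properties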
+        final Map<String, Properties> props = loadApplicationProperties(env);
+ final String inputStream = props
+ .get(INPUT_STREAM_PROPERTY_NAME)
+ .getProperty("stream.name");
+ final String outputStream = props
+ .get(OUTPUT_STREAM_PROPERTY_NAME)
+ .getProperty("stream.name");
+
+ Pipeline p = createFlinkPipeline(inputStream, outputStream);
+
+ p.run().waitUntilFinish();
+ }
+}
diff --git a/java/Beam/src/main/java/com/amazonaws/services/msf/beam/ByteSerializer.java b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/ByteSerializer.java
new file mode 100644
index 00000000..d9c224c4
--- /dev/null
+++ b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/ByteSerializer.java
@@ -0,0 +1,19 @@
+package com.amazonaws.services.msf.beam;
+
+import org.apache.beam.sdk.transforms.SimpleFunction;
+
+import java.nio.charset.StandardCharsets;
+
+public class ByteSerializer extends SimpleFunction<Document, byte[]> {
+    /**
+     * Serializes the text of a Document into UTF-8 bytes
+     *
+     * @param doc the Document to serialize
+     * @return the serialized text, to be written to the Kinesis sink
+     */
+ @Override
+ public byte[] apply(Document doc) {
+
+ return doc.getText().getBytes(StandardCharsets.UTF_8);
+ }
+}
diff --git a/java/Beam/src/main/java/com/amazonaws/services/msf/beam/Document.java b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/Document.java
new file mode 100644
index 00000000..1a7280de
--- /dev/null
+++ b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/Document.java
@@ -0,0 +1,22 @@
+package com.amazonaws.services.msf.beam;
+
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+
+@DefaultCoder(AvroCoder.class)
+public class Document {
+ private final String text;
+
+    // No-arg constructor, required by AvroCoder to instantiate the class when decoding
+ public Document() {
+ this.text = "";
+ }
+
+ public Document(String text) {
+ this.text = text;
+ }
+
+ public String getText() {
+ return text;
+ }
+}
diff --git a/java/Beam/src/main/java/com/amazonaws/services/msf/beam/HashKeyPartitioner.java b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/HashKeyPartitioner.java
new file mode 100644
index 00000000..8ef8b882
--- /dev/null
+++ b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/HashKeyPartitioner.java
@@ -0,0 +1,32 @@
+package com.amazonaws.services.msf.beam;
+
+import org.apache.beam.sdk.io.aws2.kinesis.KinesisPartitioner;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+public class HashKeyPartitioner implements KinesisPartitioner<Document> {
+    private static final Logger LOGGER = LogManager.getLogger(HashKeyPartitioner.class);
+
+    /**
+     * Derives the partition key from a hash of the Document text
+     *
+     * @param doc the Document to partition
+     * @return the partition key, as the stringified hash of the text bytes
+     */
+    @Override
+    public @UnknownKeyFor @NonNull @Initialized String getPartitionKey(Document doc) {
+        String partitionKey = String.valueOf(
+                Arrays.hashCode(doc.getText().getBytes(StandardCharsets.UTF_8))
+        );
+
+ LOGGER.info("Partition key is {}", partitionKey);
+ return partitionKey;
+ }
+}
diff --git a/java/Beam/src/main/java/com/amazonaws/services/msf/beam/PingPongFn.java b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/PingPongFn.java
new file mode 100644
index 00000000..9e32b93b
--- /dev/null
+++ b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/PingPongFn.java
@@ -0,0 +1,29 @@
+package com.amazonaws.services.msf.beam;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class PingPongFn extends DoFn<Document, Document> {
+    private static final Logger LOGGER = LogManager.getLogger(PingPongFn.class);
+
+    /**
+     * Example of a DoFn: replaces any "ping" record with a "pong" record.
+     *
+     * @param c the process context providing the input element and the output receiver
+     */
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+        // Input elements are immutable: emit a new Document rather than modifying the input
+ Document document = c.element();
+
+ if (document.getText().trim().equalsIgnoreCase("ping")) {
+ LOGGER.info("Ponged!");
+ c.output(new Document("pong"));
+ } else {
+ LOGGER.info("No action for: {}", document.getText());
+ c.output(document);
+ }
+ }
+}
diff --git a/java/Beam/src/main/resources/flink-application-properties-dev.json b/java/Beam/src/main/resources/flink-application-properties-dev.json
new file mode 100644
index 00000000..40f8cdce
--- /dev/null
+++ b/java/Beam/src/main/resources/flink-application-properties-dev.json
@@ -0,0 +1,16 @@
+[
+ {
+ "PropertyGroupId": "InputStream0",
+ "PropertyMap": {
+ "aws.region": "us-east-1",
+ "stream.name": "ExampleInputStream"
+ }
+ },
+ {
+ "PropertyGroupId": "OutputStream0",
+ "PropertyMap": {
+ "aws.region": "us-east-1",
+ "stream.name": "ExampleOutputStream"
+ }
+ }
+]
diff --git a/java/Beam/src/main/resources/log4j2.properties b/java/Beam/src/main/resources/log4j2.properties
new file mode 100644
index 00000000..35466433
--- /dev/null
+++ b/java/Beam/src/main/resources/log4j2.properties
@@ -0,0 +1,7 @@
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/java/pom.xml b/java/pom.xml
index 89e16e04..02942d69 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -1,51 +1,46 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <groupId>com.amazonaws</groupId>
-    <artifactId>amazon-msf-examples</artifactId>
-    <version>1.0</version>
-    <packaging>pom</packaging>
-
-    <name>Amazon Managed Service for Apache Flink - Java examples</name>
-
-    <modules>
-        <module>AsyncIO</module>
-        <module>AvroGlueSchemaRegistryKafka</module>
-        <module>AvroGlueSchemaRegistryKinesis</module>
-        <module>CustomMetrics</module>
-        <module>GettingStarted</module>
-        <module>GettingStartedTable</module>
-        <module>Iceberg/IcebergDataStreamSink</module>
-        <module>Iceberg/IcebergDataStreamSource</module>
-        <module>Iceberg/S3TableSink</module>
-        <module>Iceberg/IcebergSQLSink</module>
-        <module>KafkaConfigProviders/Kafka-mTLS-Keystore-ConfigProviders</module>
-        <module>KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders</module>
-        <module>KafkaConfigProviders/Kafka-mTLS-Keystore-Sql-ConfigProviders</module>
-        <module>KafkaConnectors</module>
-        <module>KinesisConnectors</module>
-        <module>KinesisSourceDeaggregation</module>
-        <module>DynamoDBStreamSource</module>
-        <module>KinesisFirehoseSink</module>
-        <module>S3ParquetSink</module>
-        <module>S3ParquetSource</module>
-        <module>S3Sink</module>
-        <module>Windowing</module>
-        <module>Serialization/CustomTypeInfo</module>
-        <module>SideOutputs</module>
-        <module>PrometheusSink</module>
-        <module>SQSSink</module>
-        <module>S3AvroSink</module>
-        <module>S3AvroSource</module>
-        <module>FlinkCDC/FlinkCDCSQLServerSource</module>
-        <module>FlinkDataGenerator</module>
-        <module>JdbcSink</module>
-        <module>FetchSecrets</module>
-    </modules>
-</project>
\ No newline at end of file
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.amazonaws</groupId>
+    <artifactId>amazon-msf-examples</artifactId>
+    <version>1.0</version>
+    <packaging>pom</packaging>
+
+    <name>Amazon Managed Service for Apache Flink - Java examples</name>
+
+    <modules>
+        <module>AsyncIO</module>
+        <module>AvroGlueSchemaRegistryKafka</module>
+        <module>AvroGlueSchemaRegistryKinesis</module>
+        <module>CustomMetrics</module>
+        <module>GettingStarted</module>
+        <module>GettingStartedTable</module>
+        <module>Iceberg/IcebergDataStreamSink</module>
+        <module>Iceberg/IcebergDataStreamSource</module>
+        <module>Iceberg/S3TableSink</module>
+        <module>Iceberg/IcebergSQLSink</module>
+        <module>KafkaConfigProviders/Kafka-mTLS-Keystore-ConfigProviders</module>
+        <module>KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders</module>
+        <module>KafkaConfigProviders/Kafka-mTLS-Keystore-Sql-ConfigProviders</module>
+        <module>KafkaConnectors</module>
+        <module>KinesisConnectors</module>
+        <module>KinesisSourceDeaggregation</module>
+        <module>DynamoDBStreamSource</module>
+        <module>KinesisFirehoseSink</module>
+        <module>S3ParquetSink</module>
+        <module>S3ParquetSource</module>
+        <module>S3Sink</module>
+        <module>Windowing</module>
+        <module>Serialization/CustomTypeInfo</module>
+        <module>SideOutputs</module>
+        <module>PrometheusSink</module>
+        <module>SQSSink</module>
+        <module>S3AvroSink</module>
+        <module>S3AvroSource</module>
+        <module>FlinkCDC/FlinkCDCSQLServerSource</module>
+        <module>FlinkDataGenerator</module>
+        <module>JdbcSink</module>
+        <module>FetchSecrets</module>
+        <module>Beam</module>
+    </modules>
+</project>