From f46916fc4b0ebd34effbe38d26857da93f341f0e Mon Sep 17 00:00:00 2001
From: ywatanabe
Date: Sun, 14 Dec 2025 16:08:09 +0900
Subject: [PATCH] feat: Apache Beam example for Amazon Managed Service for
 Apache Flink

---
 java/Beam/README.md                                |  40 +++++
 java/Beam/pom.xml                                  | 159 ++++++++++++++++++
 .../msf/beam/BasicBeamStreamingJob.java            | 125 ++++++++++++++
 .../services/msf/beam/ByteSerializer.java          |  19 +++
 .../amazonaws/services/msf/beam/Document.java      |  22 +++
 .../services/msf/beam/HashKeyPartitioner.java      |  32 ++++
 .../services/msf/beam/PingPongFn.java              |  29 ++++
 .../flink-application-properties-dev.json          |  16 ++
 java/Beam/src/main/resources/log4j2.properties     |   7 +
 java/pom.xml                                       |  93 +++++-----
 10 files changed, 493 insertions(+), 49 deletions(-)
 create mode 100644 java/Beam/README.md
 create mode 100644 java/Beam/pom.xml
 create mode 100644 java/Beam/src/main/java/com/amazonaws/services/msf/beam/BasicBeamStreamingJob.java
 create mode 100644 java/Beam/src/main/java/com/amazonaws/services/msf/beam/ByteSerializer.java
 create mode 100644 java/Beam/src/main/java/com/amazonaws/services/msf/beam/Document.java
 create mode 100644 java/Beam/src/main/java/com/amazonaws/services/msf/beam/HashKeyPartitioner.java
 create mode 100644 java/Beam/src/main/java/com/amazonaws/services/msf/beam/PingPongFn.java
 create mode 100644 java/Beam/src/main/resources/flink-application-properties-dev.json
 create mode 100644 java/Beam/src/main/resources/log4j2.properties

diff --git a/java/Beam/README.md b/java/Beam/README.md
new file mode 100644
index 00000000..962f00d3
--- /dev/null
+++ b/java/Beam/README.md
@@ -0,0 +1,40 @@
+## Getting Started Flink Java project - Apache Beam
+
+Skeleton project for a basic Apache Beam application to run on Amazon Managed Service for Apache Flink.
+
+* Flink version: 1.19
+* Language: Java (11)
+* Apache Beam IO: KinesisIO
+
+The project can run both on Amazon Managed Service for Apache Flink and locally, for development.
+
+The application shows how to read the runtime configuration, consume records from a Kinesis Data Stream, apply a simple Beam transform, and write the results to another Kinesis Data Stream.
+
+### Runtime configuration
+
+When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from *Runtime Properties*.
+
+When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file located in the resources folder.
+
+Runtime parameters:
+
+| Group ID        | Key           | Description               |
+|-----------------|---------------|---------------------------|
+| `InputStream0`  | `stream.name` | Name of the input stream  |
+| `InputStream0`  | `aws.region`  | (optional) Region of the input stream. If not specified, it uses the application region or, when running locally, the default region of the AWS profile. |
+| `OutputStream0` | `stream.name` | Name of the output stream |
+| `OutputStream0` | `aws.region`  | (optional) Region of the output stream. If not specified, it uses the application region or, when running locally, the default region of the AWS profile. |
+
+All parameters are case-sensitive.
+
+### Running in IntelliJ
+
+You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.
+
+See [Running examples locally](../running-examples-locally.md) for details.
+
+### Generating data
+
+You can use [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator),
+also available in a [hosted version](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html),
+to send random data to the input Kinesis Data Stream and test the application.
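+
+### Local development configuration
+
+For reference, the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file bundled with this example defines both property groups:
+
+```json
+[
+  {
+    "PropertyGroupId": "InputStream0",
+    "PropertyMap": {
+      "aws.region": "us-east-1",
+      "stream.name": "ExampleInputStream"
+    }
+  },
+  {
+    "PropertyGroupId": "OutputStream0",
+    "PropertyMap": {
+      "aws.region": "us-east-1",
+      "stream.name": "ExampleOutputStream"
+    }
+  }
+]
+```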
diff --git a/java/Beam/pom.xml b/java/Beam/pom.xml
new file mode 100644
index 00000000..f44ba8d7
--- /dev/null
+++ b/java/Beam/pom.xml
@@ -0,0 +1,159 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.amazonaws</groupId>
+    <artifactId>basic-beam-app</artifactId>
+    <version>1.0</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <buildDirectory>${project.basedir}/target</buildDirectory>
+        <jar.finalName>${project.name}-${project.version}</jar.finalName>
+        <target.java.version>11</target.java.version>
+        <flink.version>1.19.0</flink.version>
+        <log4j.version>2.25.2</log4j.version>
+        <beam.version>2.69.0</beam.version>
+        <kda.runtime.version>1.2.0</kda.runtime.version>
+        <aws.sdk.version>1.12.794</aws.sdk.version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <!-- BOM for the AWS SDK for Java -->
+            <dependency>
+                <groupId>com.amazonaws</groupId>
+                <artifactId>aws-java-sdk-bom</artifactId>
+                <version>${aws.sdk.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+            <!-- BOM aligning the versions of all Apache Beam modules -->
+            <dependency>
+                <groupId>org.apache.beam</groupId>
+                <artifactId>beam-sdks-java-bom</artifactId>
+                <version>${beam.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <!-- Provided by the Managed Service for Apache Flink runtime -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-kinesisanalytics-runtime</artifactId>
+            <version>${kda.runtime.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Apache Beam SDK, Flink runner, and Kinesis IO; versions come from the Beam BOM -->
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-runners-flink-1.19</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-io-amazon-web-services2</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-extensions-avro</artifactId>
+        </dependency>
+
+        <!-- Logging -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <directory>${buildDirectory}</directory>
+        <finalName>${jar.finalName}</finalName>
+        <plugins>
+            <!-- Java Compiler -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.1</version>
+                <configuration>
+                    <source>${target.java.version}</source>
+                    <target>${target.java.version}</target>
+                </configuration>
+            </plugin>
+
+            <!-- Shade plugin to build the uber-jar including all required dependencies -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.1</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.apache.flink:force-shading</exclude>
+                                    <exclude>com.google.code.findbugs:jsr305</exclude>
+                                    <exclude>org.slf4j:*</exclude>
+                                    <exclude>log4j:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <!-- Do not copy the signatures in the META-INF folder.
+                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>com.amazonaws.services.msf.beam.BasicBeamStreamingJob</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/java/Beam/src/main/java/com/amazonaws/services/msf/beam/BasicBeamStreamingJob.java b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/BasicBeamStreamingJob.java
new file mode 100644
index 00000000..74e650d9
--- /dev/null
+++ b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/BasicBeamStreamingJob.java
@@ -0,0 +1,125 @@
+package com.amazonaws.services.msf.beam;
+
+import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.runners.flink.FlinkRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
+import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import software.amazon.kinesis.common.InitialPositionInStream;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Basic Apache Beam pipeline that runs on Amazon Managed Service for Apache Flink
+ */
+public class BasicBeamStreamingJob {
+    private static final Logger LOGGER = LogManager.getLogger(BasicBeamStreamingJob.class);
+
+    // Name of the local JSON resource with the application properties, in the same format as they are received from the Amazon Managed Service for Apache Flink runtime
+    private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";
+
+    // Property group name for the input stream
+    private static final String INPUT_STREAM_PROPERTY_NAME = "InputStream0";
+
+    // Property group name for the output stream
+    private static final String OUTPUT_STREAM_PROPERTY_NAME = "OutputStream0";
+
+    /**
+     * Load application properties from the Amazon Managed Service for Apache
+     * Flink runtime, or from a local resource when the environment is local.
+     *
+     * @param env the Flink execution environment the job runs on
+     * @return properties read either from the local file or from the service
+     * @throws IOException if the local properties file doesn't exist
+     */
+    private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
+        if (env instanceof LocalStreamEnvironment) {
+            LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
+            return KinesisAnalyticsRuntime.getApplicationProperties(
+                    BasicBeamStreamingJob.class.getClassLoader()
+                            .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)
+                            .getPath());
+        } else {
+            LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
+            return KinesisAnalyticsRuntime.getApplicationProperties();
+        }
+    }
+
+    /**
+     * Constructs an Apache Beam pipeline that can be executed by Apache Flink.
+     *
+     * @param inputStream  name of the input stream the pipeline reads from
+     * @param outputStream name of the output stream the pipeline writes to
+     * @return the constructed Apache Beam pipeline
+     */
+    public static Pipeline createFlinkPipeline(String inputStream, String outputStream) {
+
+        // Apache Beam requires the runner to be set to FlinkRunner explicitly
+        FlinkPipelineOptions options = PipelineOptionsFactory.create().as(FlinkPipelineOptions.class);
+        options.setRunner(FlinkRunner.class);
+
+        Pipeline p = Pipeline.create(options);
+        PCollection<Document> documents = p.apply(
+                KinesisIO.read()
+                        .withStreamName(inputStream)
+                        .withInitialPositionInStream(InitialPositionInStream.LATEST)
+        ).apply(
+                MapElements
+                        .into(TypeDescriptor.of(Document.class))
+                        .via((KinesisRecord r) -> new Document(new String(r.getDataAsBytes(), StandardCharsets.UTF_8)))
+        ).apply(
+                ParDo.of(new PingPongFn())
+        );
+
+        documents.apply(
+                KinesisIO.<Document>write()
+                        .withStreamName(outputStream)
+                        .withSerializer(new ByteSerializer())
+                        .withPartitioner(new HashKeyPartitioner()));
+
+        return p;
+    }
+
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        final Map<String, Properties> props = loadApplicationProperties(env);
+        final String inputStream = props.get(INPUT_STREAM_PROPERTY_NAME).getProperty("stream.name");
+        final String outputStream = props.get(OUTPUT_STREAM_PROPERTY_NAME).getProperty("stream.name");
+
+        Pipeline p = createFlinkPipeline(inputStream, outputStream);
+
+        p.run().waitUntilFinish();
+    }
+}
diff --git a/java/Beam/src/main/java/com/amazonaws/services/msf/beam/ByteSerializer.java b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/ByteSerializer.java
new file mode 100644
index 00000000..d9c224c4
--- /dev/null
+++ b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/ByteSerializer.java
@@ -0,0 +1,19 @@
+package com.amazonaws.services.msf.beam;
+
+import org.apache.beam.sdk.transforms.SimpleFunction;
+
+import java.nio.charset.StandardCharsets;
+
+public class ByteSerializer extends SimpleFunction<Document, byte[]> {
+    /**
+     * Takes a Document object and serializes its text.
+     *
+     * @param doc the Document object
+     * @return the serialized text, as bytes that can be written to the Amazon Kinesis sink
+     */
+    @Override
+    public byte[] apply(Document doc) {
+        return doc.getText().getBytes(StandardCharsets.UTF_8);
+    }
+}
diff --git a/java/Beam/src/main/java/com/amazonaws/services/msf/beam/Document.java b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/Document.java
new file mode 100644
index 00000000..1a7280de
--- /dev/null
+++ b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/Document.java
@@ -0,0 +1,22 @@
+package com.amazonaws.services.msf.beam;
+
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
+
+@DefaultCoder(AvroCoder.class)
+public class Document {
+    private final String text;
+
+    // Default constructor, required by AvroCoder to instantiate the object on decode
+    public Document() {
+        this.text = "";
+    }
+
+    public Document(String text) {
+        this.text = text;
+    }
+
+    public String getText() {
+        return text;
+    }
+}
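A note on the coder: `Document` is annotated with `@DefaultCoder(AvroCoder.class)`, which is why it keeps a no-arg constructor that Avro can call on decode. The following standalone sketch (not part of the patch; the class name is made up for illustration) shows the encode/decode round trip Beam performs whenever it moves `Document` elements between pipeline stages:

```java
package com.amazonaws.services.msf.beam;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

// Hypothetical demo class, not part of this patch
public class DocumentCoderDemo {
    public static void main(String[] args) throws IOException {
        // Same coder that @DefaultCoder(AvroCoder.class) registers for Document
        AvroCoder<Document> coder = AvroCoder.of(Document.class);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        coder.encode(new Document("ping"), out);

        // Decoding goes through the no-arg constructor and then populates the field
        Document decoded = coder.decode(new ByteArrayInputStream(out.toByteArray()));
        System.out.println(decoded.getText()); // prints "ping"
    }
}
```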
diff --git a/java/Beam/src/main/java/com/amazonaws/services/msf/beam/HashKeyPartitioner.java b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/HashKeyPartitioner.java
new file mode 100644
index 00000000..8ef8b882
--- /dev/null
+++ b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/HashKeyPartitioner.java
@@ -0,0 +1,32 @@
+package com.amazonaws.services.msf.beam;
+
+import org.apache.beam.sdk.io.aws2.kinesis.KinesisPartitioner;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+public class HashKeyPartitioner implements KinesisPartitioner<Document> {
+    // Static: the partitioner is serialized and shipped to the workers, and loggers are not serializable
+    private static final Logger LOGGER = LogManager.getLogger(HashKeyPartitioner.class);
+
+    /**
+     * Creates a partition key based on the text of the Document object.
+     *
+     * @param doc a Document object
+     * @return the partition key, a hash code of the document text
+     */
+    @Override
+    public String getPartitionKey(Document doc) {
+        String partitionKey = String.valueOf(
+                Arrays.hashCode(doc.getText().getBytes(StandardCharsets.UTF_8))
+        );
+
+        LOGGER.info("Partition key is {}", partitionKey);
+        return partitionKey;
+    }
+}
diff --git a/java/Beam/src/main/java/com/amazonaws/services/msf/beam/PingPongFn.java b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/PingPongFn.java
new file mode 100644
index 00000000..9e32b93b
--- /dev/null
+++ b/java/Beam/src/main/java/com/amazonaws/services/msf/beam/PingPongFn.java
@@ -0,0 +1,29 @@
+package com.amazonaws.services.msf.beam;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class PingPongFn extends DoFn<Document, Document> {
+    // Static: DoFn instances are serialized and shipped to the workers
+    private static final Logger LOGGER = LogManager.getLogger(PingPongFn.class);
+
+    /**
+     * Example of a DoFn. Replaces the term "ping" with "pong".
+     *
+     * @param c the process context carrying the input element
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+        // PCollection elements are immutable: emit a new Document rather than modifying the input
+        Document document = c.element();
+
+        if (document.getText().trim().equalsIgnoreCase("ping")) {
+            LOGGER.info("Ponged!");
+            c.output(new Document("pong"));
+        } else {
+            LOGGER.info("No action for: {}", document.getText());
+            c.output(document);
+        }
+    }
+}
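Since `PingPongFn` is plain Java, it can be exercised without Kinesis or a Flink cluster. Below is a hypothetical JUnit 4 test sketch using Beam's `TestPipeline` and `PAssert`; it assumes JUnit and `beam-runners-direct-java` are available as test dependencies, which this patch does not add:

```java
package com.amazonaws.services.msf.beam;

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;

// Hypothetical test class, not part of this patch.
// Requires JUnit 4 and beam-runners-direct-java on the test classpath.
public class PingPongFnTest {

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    public void pingBecomesPongAndOtherTextPassesThrough() {
        PCollection<String> results = pipeline
                // Create.of picks up the AvroCoder registered via @DefaultCoder
                .apply(Create.of(new Document("ping"), new Document("hello")))
                .apply(ParDo.of(new PingPongFn()))
                // Compare plain strings: Document does not override equals()
                .apply(MapElements.into(TypeDescriptors.strings()).via(Document::getText));

        PAssert.that(results).containsInAnyOrder("pong", "hello");
        pipeline.run().waitUntilFinish();
    }
}
```

The mapping to plain strings before the assertion is deliberate: `Document` does not override `equals()`, so asserting on `Document` instances directly would compare object identity and fail.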
diff --git a/java/Beam/src/main/resources/flink-application-properties-dev.json b/java/Beam/src/main/resources/flink-application-properties-dev.json
new file mode 100644
index 00000000..40f8cdce
--- /dev/null
+++ b/java/Beam/src/main/resources/flink-application-properties-dev.json
@@ -0,0 +1,16 @@
+[
+  {
+    "PropertyGroupId": "InputStream0",
+    "PropertyMap": {
+      "aws.region": "us-east-1",
+      "stream.name": "ExampleInputStream"
+    }
+  },
+  {
+    "PropertyGroupId": "OutputStream0",
+    "PropertyMap": {
+      "aws.region": "us-east-1",
+      "stream.name": "ExampleOutputStream"
+    }
+  }
+]
diff --git a/java/Beam/src/main/resources/log4j2.properties b/java/Beam/src/main/resources/log4j2.properties
new file mode 100644
index 00000000..35466433
--- /dev/null
+++ b/java/Beam/src/main/resources/log4j2.properties
@@ -0,0 +1,7 @@
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/java/pom.xml b/java/pom.xml
index 89e16e04..02942d69 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -1,51 +1,46 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <groupId>com.amazonaws</groupId>
-    <artifactId>amazon-msf-examples</artifactId>
-    <version>1.0</version>
-    <packaging>pom</packaging>
-
-    <name>Amazon Managed Service for Apache Flink - Java examples</name>
-
-    <modules>
-        <module>AsyncIO</module>
-        <module>AvroGlueSchemaRegistryKafka</module>
-        <module>AvroGlueSchemaRegistryKinesis</module>
-        <module>CustomMetrics</module>
-        <module>GettingStarted</module>
-        <module>GettingStartedTable</module>
-        <module>Iceberg/IcebergDataStreamSink</module>
-        <module>Iceberg/IcebergDataStreamSource</module>
-        <module>Iceberg/S3TableSink</module>
-        <module>Iceberg/IcebergSQLSink</module>
-        <module>KafkaConfigProviders/Kafka-mTLS-Keystore-ConfigProviders</module>
-        <module>KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders</module>
-        <module>KafkaConfigProviders/Kafka-mTLS-Keystore-Sql-ConfigProviders</module>
-        <module>KafkaConnectors</module>
-        <module>KinesisConnectors</module>
-        <module>KinesisSourceDeaggregation</module>
-        <module>DynamoDBStreamSource</module>
-        <module>KinesisFirehoseSink</module>
-        <module>S3ParquetSink</module>
-        <module>S3ParquetSource</module>
-        <module>S3Sink</module>
-        <module>Windowing</module>
-        <module>Serialization/CustomTypeInfo</module>
-        <module>SideOutputs</module>
-        <module>PrometheusSink</module>
-        <module>SQSSink</module>
-        <module>S3AvroSink</module>
-        <module>S3AvroSource</module>
-        <module>FlinkCDC/FlinkCDCSQLServerSource</module>
-        <module>FlinkDataGenerator</module>
-        <module>JdbcSink</module>
-        <module>FetchSecrets</module>
-    </modules>
-</project>
\ No newline at end of file
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.amazonaws</groupId>
+    <artifactId>amazon-msf-examples</artifactId>
+    <version>1.0</version>
+    <packaging>pom</packaging>
+
+    <name>Amazon Managed Service for Apache Flink - Java examples</name>
+
+    <modules>
+        <module>AsyncIO</module>
+        <module>AvroGlueSchemaRegistryKafka</module>
+        <module>AvroGlueSchemaRegistryKinesis</module>
+        <module>CustomMetrics</module>
+        <module>GettingStarted</module>
+        <module>GettingStartedTable</module>
+        <module>Iceberg/IcebergDataStreamSink</module>
+        <module>Iceberg/IcebergDataStreamSource</module>
+        <module>Iceberg/S3TableSink</module>
+        <module>Iceberg/IcebergSQLSink</module>
+        <module>KafkaConfigProviders/Kafka-mTLS-Keystore-ConfigProviders</module>
+        <module>KafkaConfigProviders/Kafka-SASL_SSL-ConfigProviders</module>
+        <module>KafkaConfigProviders/Kafka-mTLS-Keystore-Sql-ConfigProviders</module>
+        <module>KafkaConnectors</module>
+        <module>KinesisConnectors</module>
+        <module>KinesisSourceDeaggregation</module>
+        <module>DynamoDBStreamSource</module>
+        <module>KinesisFirehoseSink</module>
+        <module>S3ParquetSink</module>
+        <module>S3ParquetSource</module>
+        <module>S3Sink</module>
+        <module>Windowing</module>
+        <module>Serialization/CustomTypeInfo</module>
+        <module>SideOutputs</module>
+        <module>PrometheusSink</module>
+        <module>SQSSink</module>
+        <module>S3AvroSink</module>
+        <module>S3AvroSource</module>
+        <module>FlinkCDC/FlinkCDCSQLServerSource</module>
+        <module>FlinkDataGenerator</module>
+        <module>JdbcSink</module>
+        <module>FetchSecrets</module>
+        <module>Beam</module>
+    </modules>
+</project>
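One loose end worth noting: the README documents an optional `aws.region` property, but `BasicBeamStreamingJob` only reads `stream.name`. The aws2 `KinesisIO` then falls back to the default AWS region provider chain, which on Amazon Managed Service for Apache Flink resolves to the application region, matching the documented behavior. If the property should be honored explicitly, a sketch along the following lines could work; this is an assumption, not part of the patch: `RegionAwareSource` is a hypothetical helper, and `ClientConfiguration` and its builder should be verified against the Beam version in use:

```java
package com.amazonaws.services.msf.beam;

import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import software.amazon.awssdk.regions.Region;

// Hypothetical helper, not part of this patch: pins the Kinesis client to the
// region passed in (e.g. the aws.region runtime property) instead of relying
// on the default region provider chain.
public class RegionAwareSource {

    static KinesisIO.Read kinesisSource(String streamName, String region) {
        return KinesisIO.read()
                .withStreamName(streamName)
                .withClientConfiguration(
                        ClientConfiguration.builder()
                                .region(Region.of(region))
                                .build());
    }
}
```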