40 changes: 40 additions & 0 deletions java/Beam/README.md
@@ -0,0 +1,40 @@
## Getting Started Flink Java project - Apache Beam

Skeleton project for a basic Apache Beam application to run on Amazon Managed Service for Apache Flink.

* Flink version: 1.19
* Language: Java (11)
* Apache Beam IO: KinesisIO

The project can run both on Amazon Managed Service for Apache Flink and locally, for development.

The application demonstrates how to read the runtime configuration.

### Runtime configuration

When running on Amazon Managed Service for Apache Flink, the runtime configuration is read from *Runtime Properties*.

When running locally, the configuration is read from the [`resources/flink-application-properties-dev.json`](resources/flink-application-properties-dev.json) file.

Runtime parameters:

| Group ID | Key | Description |
|-----------------|---------------|---------------------------|
| `InputStream0` | `stream.name` | Name of the input stream |
| `InputStream0`  | `aws.region`  | (optional) Region of the input stream. Defaults to the application Region or, when running locally, to the default Region of the AWS profile. |
| `OutputStream0` | `stream.name` | Name of the output stream |
| `OutputStream0` | `aws.region`  | (optional) Region of the output stream. Defaults to the application Region or, when running locally, to the default Region of the AWS profile. |

All parameters are case-sensitive.
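
For example, a local `flink-application-properties-dev.json` may look like the following (stream names and Region below are placeholders):

```json
[
  {
    "PropertyGroupId": "InputStream0",
    "PropertyMap": {
      "stream.name": "ExampleInputStream",
      "aws.region": "us-east-1"
    }
  },
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "stream.name": "ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  }
]
```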

### Running in IntelliJ

You can run this example directly in IntelliJ, without any local Flink cluster or local Flink installation.

See [Running examples locally](../running-examples-locally.md) for details.

### Generating data

You can use [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator),
also available in a [hosted version](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html),
to send random data to the input Kinesis Data Stream and test the application.
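
For instance, since the application treats each record payload as plain text, a minimal record template such as the following (the field name is arbitrary) is enough to exercise the pipeline:

```json
{"message": "ping-{{random.number(100)}}"}
```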
159 changes: 159 additions & 0 deletions java/Beam/pom.xml
@@ -0,0 +1,159 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.amazonaws</groupId>
<artifactId>basic-beam-app</artifactId>
<version>1.0</version>
<packaging>jar</packaging>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<buildDirectory>${project.basedir}/target</buildDirectory>
<jar.finalName>${project.name}-${project.version}</jar.finalName>
<target.java.version>11</target.java.version>
<flink.version>1.19.0</flink.version>
<log4j.version>2.25.2</log4j.version>
<beam.version>2.69.0</beam.version>
<kda.runtime.version>1.2.0</kda.runtime.version>
<aws.sdk.version>1.12.794</aws.sdk.version>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<!-- Get the latest SDK version from https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bom -->
<version>${aws.sdk.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<!-- BOM provided by Apache Beam community https://beam.apache.org/documentation/sdks/java-dependencies/#manage-dependencies -->
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-bom</artifactId>
<version>${beam.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- Dependencies to retrieve runtime application properties in Managed Service for Apache Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-kinesisanalytics-runtime</artifactId>
<version>${kda.runtime.version}</version>
<scope>provided</scope>
</dependency>

<!-- Apache Beam dependencies -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink-1.19</artifactId>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-amazon-web-services2</artifactId>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-avro</artifactId>
</dependency>

<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- Note: the shade exclusions below drop only SLF4J and legacy Log4j 1.x; the Log4j 2 artifacts are bundled in the application JAR. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>

<build>
<directory>${buildDirectory}</directory>
<finalName>${jar.finalName}</finalName>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.amazonaws.services.msf.beam.BasicBeamStreamingJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
125 changes: 125 additions & 0 deletions java/Beam/src/main/java/com/amazonaws/services/msf/beam/BasicBeamStreamingJob.java
@@ -0,0 +1,125 @@
package com.amazonaws.services.msf.beam;

import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.kinesis.common.InitialPositionInStream;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;

/**
* Basic Apache Beam pipeline that runs on Amazon Managed Service for Apache Flink.
*/
public class BasicBeamStreamingJob {
private static final Logger LOGGER = LogManager.getLogger(BasicBeamStreamingJob.class);

// Name of the local JSON resource with the application properties, in the same format as they are received from the Amazon Managed Service for Apache Flink runtime
private static final String LOCAL_APPLICATION_PROPERTIES_RESOURCE = "flink-application-properties-dev.json";

// Property Group ID for the input stream
private static final String INPUT_STREAM_PROPERTY_NAME = "InputStream0";

// Property Group ID for the output stream
private static final String OUTPUT_STREAM_PROPERTY_NAME = "OutputStream0";

/**
* Load application properties from the Amazon Managed Service for Apache
* Flink runtime, or from a local resource when the environment is local.
*
* @param env the Flink execution environment
* @return the application properties, read either from the local resource or from the runtime
* @throws IOException if the local properties file cannot be read
*/
private static Map<String, Properties> loadApplicationProperties(
StreamExecutionEnvironment env) throws IOException {
if (env instanceof LocalStreamEnvironment) {
LOGGER.info(
"Loading application properties from '{}'",
LOCAL_APPLICATION_PROPERTIES_RESOURCE);
return KinesisAnalyticsRuntime.getApplicationProperties(
BasicBeamStreamingJob.class.getClassLoader()
.getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE)
.getPath());
} else {
LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
return KinesisAnalyticsRuntime.getApplicationProperties();
}
}

/**
* Constructs an Apache Beam pipeline that can be executed by Apache Flink.
*
* @param inputStream name of the input stream the pipeline reads from
* @param outputStream name of the output stream the pipeline writes to
* @return the constructed Apache Beam pipeline
*/
public static Pipeline createFlinkPipeline(String inputStream,
String outputStream) {

// Apache Beam requires the runner to be explicitly set to FlinkRunner.
FlinkPipelineOptions options =
PipelineOptionsFactory
.create()
.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);

Pipeline p = Pipeline.create(options);
PCollection<Document> documents = p.apply(
KinesisIO
.read()
.withStreamName(inputStream)
.withInitialPositionInStream(
InitialPositionInStream.LATEST)
).apply(
MapElements
.into(TypeDescriptor.of(Document.class))
.via((KinesisRecord r) -> new Document(
new String(r.getDataAsBytes(), StandardCharsets.UTF_8)))
).apply(
ParDo.of(new PingPongFn())
);

documents.apply(
KinesisIO.<Document>write()
.withStreamName(outputStream)
.withSerializer(new ByteSerializer())
.withPartitioner(new HashKeyPartitioner()));

return p;
}

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();

final Map<String, Properties> props = loadApplicationProperties(env);
final String inputStream = props
.get(INPUT_STREAM_PROPERTY_NAME)
.getProperty("stream.name");
final String outputStream = props
.get(OUTPUT_STREAM_PROPERTY_NAME)
.getProperty("stream.name");

Pipeline p = createFlinkPipeline(inputStream, outputStream);

p.run().waitUntilFinish();
}
}
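
Note: the `PingPongFn` transform applied in the pipeline above is referenced but not included in this diff. A minimal sketch of what such a `DoFn` could look like, assuming it simply answers "ping" payloads with "pong" and forwards everything else unchanged:

package com.amazonaws.services.msf.beam;

import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical sketch: the actual PingPongFn is not part of this diff.
public class PingPongFn extends DoFn<Document, Document> {
    @ProcessElement
    public void processElement(ProcessContext context) {
        Document doc = context.element();
        if ("ping".equalsIgnoreCase(doc.getText().trim())) {
            // Assumed behavior: reply to "ping" with "pong"
            context.output(new Document("pong"));
        } else {
            // Pass all other records through unchanged
            context.output(doc);
        }
    }
}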
19 changes: 19 additions & 0 deletions java/Beam/src/main/java/com/amazonaws/services/msf/beam/ByteSerializer.java
@@ -0,0 +1,19 @@
package com.amazonaws.services.msf.beam;

import org.apache.beam.sdk.transforms.SimpleFunction;

import java.nio.charset.StandardCharsets;

public class ByteSerializer extends SimpleFunction<Document, byte[]> {
/**
* Serializes the text of a Document object to UTF-8 bytes.
*
* @param doc the Document object
* @return serialized text that can be written to the Kinesis sink
*/
@Override
public byte[] apply(Document doc) {

return doc.getText().getBytes(StandardCharsets.UTF_8);
}
}
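
Note: the `HashKeyPartitioner` used by the Kinesis sink is likewise not included in this diff. A minimal sketch, assuming it derives the partition key from a hash of the record payload:

package com.amazonaws.services.msf.beam;

import org.apache.beam.sdk.io.aws2.kinesis.KinesisPartitioner;

// Hypothetical sketch: the actual HashKeyPartitioner is not part of this diff.
public class HashKeyPartitioner implements KinesisPartitioner<Document> {
    @Override
    public String getPartitionKey(Document record) {
        // Spread records across shards by hashing the payload text
        return String.valueOf(record.getText().hashCode());
    }
}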
22 changes: 22 additions & 0 deletions java/Beam/src/main/java/com/amazonaws/services/msf/beam/Document.java
@@ -0,0 +1,22 @@
package com.amazonaws.services.msf.beam;

import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

@DefaultCoder(AvroCoder.class)
public class Document {
private final String text;

// No-arg constructor, required by AvroCoder's reflection-based instantiation
public Document() {
this.text = "";
}

public Document(String text) {
this.text = text;
}

public String getText() {
return text;
}
}