From 347b5500b0a820cc47b19c8c0a46535680906e4b Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Fri, 27 Mar 2026 15:26:09 +0530 Subject: [PATCH 1/3] [FLINK-39118] Add documentation for Native s3 FileSystem --- .../content/docs/deployment/filesystems/s3.md | 277 ++++++++++++++---- 1 file changed, 219 insertions(+), 58 deletions(-) diff --git a/docs/content/docs/deployment/filesystems/s3.md b/docs/content/docs/deployment/filesystems/s3.md index 965cc3c0130f9..a233affce15a5 100644 --- a/docs/content/docs/deployment/filesystems/s3.md +++ b/docs/content/docs/deployment/filesystems/s3.md @@ -64,94 +64,249 @@ env.configure(config); Note that these examples are *not* exhaustive and you can use S3 in other places as well, including your [high availability setup]({{< ref "docs/deployment/ha/overview" >}}) or the [EmbeddedRocksDBStateBackend]({{< ref "docs/ops/state/state_backends" >}}#the-rocksdbstatebackend); everywhere that Flink expects a FileSystem URI (unless otherwise stated). -For most use cases, you may use one of our `flink-s3-fs-hadoop` and `flink-s3-fs-presto` S3 filesystem plugins which are self-contained and easy to set up. -For some cases, however, e.g., for using S3 as YARN's resource storage dir, it may be necessary to set up a specific Hadoop S3 filesystem implementation. +## S3 FileSystem Implementations -### Hadoop/Presto S3 File Systems plugins +Flink provides three independent S3 filesystem implementations, each with different trade-offs: -{{< hint info >}} -You don't have to configure this manually if you are running [Flink on EMR](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html). -{{< /hint >}} - -Flink provides two file systems to talk to Amazon S3, `flink-s3-fs-presto` and `flink-s3-fs-hadoop`. -Both implementations are self-contained with no dependency footprint, so there is no need to add Hadoop to the classpath to use them. - - - `flink-s3-fs-presto`, registered under the scheme *s3://* and *s3p://*, is based on code from the [Presto project](https://prestodb.io/). - You can configure it using [the same configuration keys as the Presto file system](https://prestodb.io/docs/0.272/connector/hive.html#amazon-s3-configuration), by adding the configurations to your [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}). The Presto S3 implementation is the recommended file system for checkpointing to S3. - - - `flink-s3-fs-hadoop`, registered under *s3://* and *s3a://*, based on code from the [Hadoop Project](https://hadoop.apache.org/). - The file system can be [configured using Hadoop's s3a configuration keys](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A) by adding the configurations to your [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}). - - For example, Hadoop has a `fs.s3a.connection.maximum` configuration key. If you want to change it, you need to put `s3.connection.maximum: xyz` to the [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}). Flink will internally translate this back to `fs.s3a.connection.maximum`. There is no need to pass configuration parameters using Hadoop's XML configuration files. - - It is the only S3 file system with support for the [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}). - - -Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` register default FileSystem -wrappers for URIs with the *s3://* scheme, `flink-s3-fs-hadoop` also registers -for *s3a://* and `flink-s3-fs-presto` also registers for *s3p://*, so you can -use this to use both at the same time. -For example, the job uses the [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}) which only supports Hadoop, but uses Presto for checkpointing. -In this case, you should explicitly use *s3a://* as a scheme for the sink (Hadoop) and *s3p://* for checkpointing (Presto). +- **Native S3 FileSystem** (`flink-s3-fs-native`): A drop-in replacement built on AWS SDK v2 with minimal dependencies. **Experimental** in Flink 2.3. +- **Presto S3 FileSystem** (`flink-s3-fs-presto`): Based on Presto project code, recommended for checkpointing. +- **Hadoop S3 FileSystem** (`flink-s3-fs-hadoop`): Based on Hadoop project code, has FileSystem sink support. -To use `flink-s3-fs-hadoop` or `flink-s3-fs-presto`, copy the respective JAR file from the `opt` directory to the `plugins` directory of your Flink distribution before starting Flink, e.g. +All three are self-contained with no dependency footprint, so there is no need to add Hadoop to the classpath to use them. -```bash -mkdir ./plugins/s3-fs-presto -cp ./opt/flink-s3-fs-presto-{{< version >}}.jar ./plugins/s3-fs-presto/ -``` +## Common Configuration -#### Configure Access Credentials +### Configure Access Credentials -After setting up the S3 FileSystem wrapper, you need to make sure that Flink is allowed to access your S3 buckets. +After setting up the S3 FileSystem implementation, you need to make sure that Flink is allowed to access your S3 buckets. -##### Identity and Access Management (IAM) (Recommended) +#### Identity and Access Management (IAM) (Recommended) The recommended way of setting up credentials on AWS is via [Identity and Access Management (IAM)](http://docs.aws.amazon.com/IAM/latest/UserGuide/introduction.html). You can use IAM features to securely give Flink instances the credentials that they need to access S3 buckets. Details about how to do this are beyond the scope of this documentation. Please refer to the AWS user guide. What you are looking for are [IAM Roles](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html). If you set this up correctly, you can manage access to S3 within AWS and don't need to distribute any access keys to Flink. -##### Access Keys (Discouraged) +#### Access Keys (Discouraged) Access to S3 can be granted via your **access and secret key pair**. Please note that this is discouraged since the [introduction of IAM roles](https://blogs.aws.amazon.com/security/post/Tx1XG3FX6VMU6O5/A-safer-way-to-distribute-AWS-credentials-to-EC2). -You need to configure both `s3.access-key` and `s3.secret-key` in Flink's [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}): +You need to configure both `s3.access-key` and `s3.secret-key` in Flink's [configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}): ```yaml s3.access-key: your-access-key s3.secret-key: your-secret-key ``` -You can limit this configuration to JobManagers by using [Flink configuration file]({{< ref "docs/deployment/security/security-delegation-token" >}}). +You can limit this configuration to JobManagers by using [delegation tokens]({{< ref "docs/deployment/security/security-delegation-token" >}}): ```yaml -# flink-s3-fs-hadoop -fs.s3a.aws.credentials.provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider -# flink-s3-fs-presto +# For Native S3 or Hadoop implementations +fs.s3.aws.credentials.provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider +# For Presto implementation presto.s3.credentials-provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider ``` -## Configure Non-S3 Endpoint +### Configure Non-S3 Endpoint + +The S3 filesystems also support using S3 compliant object stores such as [IBM's Cloud Object Storage](https://www.ibm.com/cloud/object-storage) and [MinIO](https://min.io/). +To do so, configure your endpoint in [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}): + +```yaml +s3.endpoint: your-endpoint-hostname +``` + +### Configure Path Style Access + +Some S3 compliant object stores might not have virtual host style addressing enabled by default, for example when using Standalone MinIO for testing purpose. In such cases, you will have to provide the property to enable path style access in [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}): + +```yaml +s3.path-style-access: true +``` + +## S3 FileSystem Implementations + +### Native S3 FileSystem + +{{< hint warning >}} +**Experimental**: The Native S3 FileSystem implementation is experimental in Flink 2.3. While functionally complete, it should not yet be used in production environments. Please use Presto or Hadoop implementations for production deployments. +{{< /hint >}} + +The Native S3 FileSystem is a pure-Java implementation built on the AWS SDK v2. It requires no additional dependencies and provides a drop-in replacement for the Presto and Hadoop implementations. + +#### Setup + +To use the Native S3 FileSystem, copy the JAR file from the `opt` directory to the `plugins` directory: + +```bash +mkdir -p ./plugins/s3-fs-native +cp ./opt/flink-s3-fs-native-{{< version >}}.jar ./plugins/s3-fs-native/ +``` + +#### Features + +- **No external dependencies**: Built on AWS SDK v2 with minimal footprint +- **Drop-in replacement**: Compatible with the same S3 URI schemes (`s3://`) +- **Encryption support**: Server-side encryption (SSE) and KMS encryption +- **Assume role**: Cross-account access via IAM role assumption +- **Entropy injection**: Optimize S3 scalability through random key prefixes +- **Bulk copy**: Efficient multi-part copy operations via S3TransferManager + +#### Configuration + +The Native S3 FileSystem uses the following configuration options: + +```yaml +# AWS credentials (if using static credentials) +s3.access-key: your-access-key +s3.secret-key: your-secret-key + +# AWS region (optional; auto-detected if not specified) +s3.region: us-east-1 + +# Custom S3 endpoint for S3-compatible storage +s3.endpoint: your-endpoint-hostname + +# Path style access for S3-compatible storage +s3.path-style-access: true + +# Server-side encryption +s3.sse.type: sse-s3 # or sse-kms, aws:kms, AES256, none (default) +s3.sse.kms.key-id: arn:aws:kms:region:account:key/id # For SSE-KMS + +# IAM role assumption for cross-account access +s3.assume-role.arn: arn:aws:iam::account:role/RoleName +s3.assume-role.external-id: external-id-if-required +s3.assume-role.session-name: flink-s3-session +s3.assume-role.session-duration: 3600 + +# Performance tuning +s3.upload.min.part.size: 5242880 # 5MB default +s3.upload.max.concurrent.uploads: 4 # Based on CPU cores +s3.read.buffer.size: 262144 # 256KB default +s3.async.enabled: true # Enable async operations +s3.bulk-copy.enabled: true # Enable bulk copy +s3.bulk-copy.max-concurrent: 16 # Max concurrent copy ops + +# Entropy injection for scalability +s3.entropy.key: _entropy_ +s3.entropy.length: 4 + +# Retry configuration +s3.retry.max-num-retries: 3 + +# Credentials provider +fs.s3.aws.credentials.provider: software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider +``` + +See the [AWS SDK v2 documentation](https://docs.aws.amazon.com/sdk-for-java/) for additional configuration details. + +--- + +### Presto S3 FileSystem + +{{< hint info >}} +You don't have to configure this manually if you are running [Flink on EMR](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html). +{{< /hint >}} + +The Presto S3 FileSystem is based on code from the [Presto project](https://prestodb.io/). It is registered under the schemes *s3://* and *s3p://*. + +#### Features + +- **Recommended for checkpointing**: The Presto implementation is the recommended file system for checkpointing to S3 +- **Self-contained**: No Hadoop dependency required +- **Production-ready**: Stable and widely used + +#### Setup + +To use the Presto S3 FileSystem, copy the JAR file from the `opt` directory to the `plugins` directory: + +```bash +mkdir -p ./plugins/s3-fs-presto +cp ./opt/flink-s3-fs-presto-{{< version >}}.jar ./plugins/s3-fs-presto/ +``` + +#### Configuration -The S3 Filesystems also support using S3 compliant object stores such as [IBM's Cloud Object Storage](https://www.ibm.com/cloud/object-storage) and [MinIO](https://min.io/). -To do so, configure your endpoint in [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}). +Configure it using [the same configuration keys as the Presto file system](https://prestodb.io/docs/0.272/connector/hive.html#amazon-s3-configuration), by adding the configurations to your [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}): ```yaml +# AWS credentials +s3.access-key: your-access-key +s3.secret-key: your-secret-key + +# Custom endpoint s3.endpoint: your-endpoint-hostname + +# Path style access +s3.path-style-access: true + +# Credentials provider +presto.s3.credentials-provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider +``` + +Refer to the [Presto documentation](https://prestodb.io/docs/0.272/connector/hive.html#amazon-s3-configuration) for all available configuration options. + +--- + +### Hadoop S3 FileSystem + +The Hadoop S3 FileSystem is based on code from the [Hadoop Project](https://hadoop.apache.org/). It is registered under the schemes *s3://* and *s3a://*. + +#### Features + +- **FileSystem sink support**: The only S3 implementation with support for the [FileSystem sink]({{< ref "docs/connectors/datastream/filesystem" >}}) +- **Self-contained**: No additional Hadoop installation required +- **Mature implementation**: Long-established code from the Hadoop ecosystem + +#### Setup + +To use the Hadoop S3 FileSystem, copy the JAR file from the `opt` directory to the `plugins` directory: + +```bash +mkdir -p ./plugins/s3-fs-hadoop +cp ./opt/flink-s3-fs-hadoop-{{< version >}}.jar ./plugins/s3-fs-hadoop/ ``` -## Configure Path Style Access +#### Configuration -Some S3 compliant object stores might not have virtual host style addressing enabled by default, for example when using Standalone MinIO for testing purpose. In such cases, you will have to provide the property to enable path style access in [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}). +Configure it using [Hadoop's s3a configuration keys](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A) by adding the configurations to your [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}): ```yaml -s3.path.style.access: true +# AWS credentials +s3.access-key: your-access-key +s3.secret-key: your-secret-key + +# Custom endpoint +s3.endpoint: your-endpoint-hostname + +# Path style access +s3.path-style-access: true + +# Connection settings +s3.connection.maximum: 10 + +# Credentials provider +fs.s3a.aws.credentials.provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider ``` -## Entropy injection for S3 file systems +Hadoop configuration keys are automatically translated. For example, `fs.s3a.connection.maximum` becomes `s3.connection.maximum`. Refer to the [Hadoop S3A documentation](https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#S3A) for all available options. -The bundled S3 file systems (`flink-s3-fs-presto` and `flink-s3-fs-hadoop`) support entropy injection. Entropy injection is -a technique to improve the scalability of AWS S3 buckets through adding some random characters near the beginning of the key. +--- + +## Using Multiple S3 Implementations + +You can use multiple S3 implementations simultaneously by leveraging their different URI schemes. For example, if a job uses the [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}) sink (Hadoop-only) but Presto for checkpointing: + +- Use *s3a://* scheme for the sink (Hadoop) +- Use *s3p://* scheme for checkpointing (Presto) + +--- + +## Advanced Features + +### Entropy Injection + +All S3 file systems support entropy injection, a technique to improve the scalability of AWS S3 buckets through adding random characters near the beginning of the key. If entropy injection is activated, a configured substring in the path is replaced with random characters. For example, path `s3://my-bucket/_entropy_/checkpoints/dashboard-job/` would be replaced by something like `s3://my-bucket/gf36ikvg/checkpoints/dashboard-job/`. @@ -175,7 +330,9 @@ The `s3.entropy.key` defines the string in paths that is replaced by the random If a file system operation does not pass the *"inject entropy"* write option, the entropy key substring is simply removed. The `s3.entropy.length` defines the number of random alphanumeric characters used for entropy. -## s5cmd +### s5cmd + +**Supported by**: Presto S3 FileSystem, Hadoop S3 FileSystem Both `flink-s3-fs-hadoop` and `flink-s3-fs-presto` can be configured to use the [s5cmd tool](https://github.com/peak/s5cmd) for faster file upload and download. [Benchmark results](https://cwiki.apache.org/confluence/display/FLINK/FLIP-444%3A+Native+file+copy+support) are showing that `s5cmd` can be over 2 times more CPU efficient. @@ -187,7 +344,10 @@ Secondly, the path to the `s5cmd` has to be configured via: s3.s5cmd.path: /path/to/the/s5cmd ``` +#### Configuration + The remaining configuration options (with their default value listed below) are: + ```yaml # Extra arguments that will be passed directly to the s5cmd call. Please refer to the s5cmd's official documentation. s3.s5cmd.args: -r 0 @@ -196,17 +356,18 @@ s3.s5cmd.batch.max-size: 1024mb # Maximum number of files that will be uploaded via a single s5cmd call. s3.s5cmd.batch.max-files: 100 ``` -Both `s3.s5cmd.batch.max-size` and `s3.s5cmd.batch.max-files` are used to control resource usage of the `s5cmd` binary, to prevent it from overloading the task manager. -It is recommended to first configure and making sure Flink works without using `s5cmd` and only then enabling this feature. +Both `s3.s5cmd.batch.max-size` and `s3.s5cmd.batch.max-files` control resource usage of the `s5cmd` binary to prevent it from overloading the task manager. + +It is recommended to first configure and verify that Flink works without using `s5cmd`, then enable this feature. -### Credentials +#### Credentials -If you are using [access keys](#access-keys-discouraged), they will be passed to the `s5cmd`. -Apart from that `s5cmd` has its own independent (but similar) of Flink way of [using credentials](https://github.com/peak/s5cmd?tab=readme-ov-file#specifying-credentials). +If you are using [access keys](#access-keys-discouraged), they will be passed to `s5cmd`. +Apart from that, `s5cmd` has its own independent way of [using credentials](https://github.com/peak/s5cmd?tab=readme-ov-file#specifying-credentials). -### Limitations +#### Limitations -Currently, Flink will use `s5cmd` only during recovery, when downloading state files from S3 and using RocksDB. +Currently, Flink uses `s5cmd` only during recovery, when downloading state files from S3 and using RocksDB. {{< top >}} From 72d0f31d277c2ec51bb4cf83db96d943504d2e34 Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Tue, 31 Mar 2026 23:21:12 +0530 Subject: [PATCH 2/3] Address to review comments. Specify the scheme supported by native-s3-fs. --- .../content/docs/deployment/filesystems/s3.md | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/docs/content/docs/deployment/filesystems/s3.md b/docs/content/docs/deployment/filesystems/s3.md index a233affce15a5..da1745c05b3d0 100644 --- a/docs/content/docs/deployment/filesystems/s3.md +++ b/docs/content/docs/deployment/filesystems/s3.md @@ -68,7 +68,7 @@ Note that these examples are *not* exhaustive and you can use S3 in other places Flink provides three independent S3 filesystem implementations, each with different trade-offs: -- **Native S3 FileSystem** (`flink-s3-fs-native`): A drop-in replacement built on AWS SDK v2 with minimal dependencies. **Experimental** in Flink 2.3. +- **Native S3 FileSystem** (`flink-s3-fs-native`): Built directly on AWS SDK v2 with async I/O and parallel transfers, this implementation supports both checkpointing and the FileSystem sink. [Benchmarks](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620396) show ~2x higher checkpoint throughput (~200 MB/s vs ~90 MB/s) compared to the Presto implementation at state sizes up to 15 GB. **Experimental** in Flink 2.3; the API and behavior may change in future releases. - **Presto S3 FileSystem** (`flink-s3-fs-presto`): Based on Presto project code, recommended for checkpointing. - **Hadoop S3 FileSystem** (`flink-s3-fs-hadoop`): Based on Hadoop project code, has FileSystem sink support. @@ -108,7 +108,7 @@ presto.s3.credentials-provider: org.apache.flink.fs.s3.common.token.DynamicTempo ### Configure Non-S3 Endpoint -The S3 filesystems also support using S3 compliant object stores such as [IBM's Cloud Object Storage](https://www.ibm.com/cloud/object-storage) and [MinIO](https://min.io/). +The S3 filesystems also support using S3 compliant object stores such as [IBM's Cloud Object Storage](https://www.ibm.com/cloud/object-storage) and [Cloudflare R2](https://developers.cloudflare.com/r2/). To do so, configure your endpoint in [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}): ```yaml @@ -117,7 +117,7 @@ s3.endpoint: your-endpoint-hostname ### Configure Path Style Access -Some S3 compliant object stores might not have virtual host style addressing enabled by default, for example when using Standalone MinIO for testing purpose. In such cases, you will have to provide the property to enable path style access in [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}): +Some S3 compliant object stores might not have virtual host style addressing enabled by default. In such cases, you will have to provide the property to enable path style access in [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}): ```yaml s3.path-style-access: true @@ -131,7 +131,7 @@ s3.path-style-access: true **Experimental**: The Native S3 FileSystem implementation is experimental in Flink 2.3. While functionally complete, it should not yet be used in production environments. Please use Presto or Hadoop implementations for production deployments. {{< /hint >}} -The Native S3 FileSystem is a pure-Java implementation built on the AWS SDK v2. It requires no additional dependencies and provides a drop-in replacement for the Presto and Hadoop implementations. +The Native S3 FileSystem is a pure-Java implementation built on the AWS SDK v2. It is registered under the schemes *s3://* and *s3a://*. It requires no additional dependencies and provides a drop-in replacement for the Presto and Hadoop implementations. #### Setup @@ -295,11 +295,25 @@ Hadoop configuration keys are automatically translated. For example, `fs.s3a.con ## Using Multiple S3 Implementations +All three S3 implementations register as handlers for the *s3://* scheme. Additionally, each implementation supports alternative schemes: + +| Implementation | Schemes | +|---------------|---------| +| Native S3 | *s3://*, *s3a://* | +| Presto | *s3://*, *s3p://* | +| Hadoop | *s3://*, *s3a://* | + +Only one implementation can handle a given scheme at a time. The Native S3 implementation has the lowest priority, so when another implementation is present, it will take precedence for the *s3://* scheme. + You can use multiple S3 implementations simultaneously by leveraging their different URI schemes. For example, if a job uses the [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}) sink (Hadoop-only) but Presto for checkpointing: - Use *s3a://* scheme for the sink (Hadoop) - Use *s3p://* scheme for checkpointing (Presto) +{{< hint info >}} +The Native S3 implementation does not introduce a new URI scheme. It reuses the existing *s3://* and *s3a://* schemes. To use it alongside the Hadoop implementation, ensure only the Native S3 plugin JAR is in the `plugins` directory (i.e., do not have both `flink-s3-fs-native` and `flink-s3-fs-hadoop` plugins loaded simultaneously for the same scheme). +{{< /hint >}} + --- ## Advanced Features From a0429edc1f22883c10a1c394ccbd2cd7b2cffa34 Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Wed, 1 Apr 2026 10:33:20 +0530 Subject: [PATCH 3/3] Address to review comments --- .../content/docs/deployment/filesystems/s3.md | 41 ++++++++++++------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/docs/content/docs/deployment/filesystems/s3.md b/docs/content/docs/deployment/filesystems/s3.md index da1745c05b3d0..b5c6a23ee1804 100644 --- a/docs/content/docs/deployment/filesystems/s3.md +++ b/docs/content/docs/deployment/filesystems/s3.md @@ -68,7 +68,7 @@ Note that these examples are *not* exhaustive and you can use S3 in other places Flink provides three independent S3 filesystem implementations, each with different trade-offs: -- **Native S3 FileSystem** (`flink-s3-fs-native`): Built directly on AWS SDK v2 with async I/O and parallel transfers, this implementation supports both checkpointing and the FileSystem sink. [Benchmarks](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620396) show ~2x higher checkpoint throughput (~200 MB/s vs ~90 MB/s) compared to the Presto implementation at state sizes up to 15 GB. **Experimental** in Flink 2.3; the API and behavior may change in future releases. +- **Native S3 FileSystem** (`flink-s3-fs-native`): Built directly on AWS SDK v2 with async I/O and parallel transfers removing the dependency from hadoop entirely. This implementation supports both checkpointing and the FileSystem sink. The Native S3 FileSystem aims to provide integrated support for checkpointing as well as FileSystem sink, removing the need to use Presto S3 FileSystem for checkpointing and Hadoop S3 FileSystem for the FileSystem sink. [Benchmarks](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406620396) show ~2x higher checkpoint throughput (~200 MB/s vs ~90 MB/s) compared to the Presto implementation at state sizes up to 15 GB. **Experimental** in Flink 2.3. - **Presto S3 FileSystem** (`flink-s3-fs-presto`): Based on Presto project code, recommended for checkpointing. - **Hadoop S3 FileSystem** (`flink-s3-fs-hadoop`): Based on Hadoop project code, has FileSystem sink support. @@ -100,15 +100,17 @@ s3.secret-key: your-secret-key You can limit this configuration to JobManagers by using [delegation tokens]({{< ref "docs/deployment/security/security-delegation-token" >}}): ```yaml -# For Native S3 or Hadoop implementations -fs.s3.aws.credentials.provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider +# For Native S3 implementation +fs.s3.aws.credentials.provider: org.apache.flink.fs.s3native.token.DynamicTemporaryAWSCredentialsProvider +# For Hadoop implementation +fs.s3a.aws.credentials.provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider # For Presto implementation presto.s3.credentials-provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider ``` ### Configure Non-S3 Endpoint -The S3 filesystems also support using S3 compliant object stores such as [IBM's Cloud Object Storage](https://www.ibm.com/cloud/object-storage) and [Cloudflare R2](https://developers.cloudflare.com/r2/). +The S3 filesystems also support using S3 compliant object stores. To do so, configure your endpoint in [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}): ```yaml @@ -123,15 +125,19 @@ Some S3 compliant object stores might not have virtual host style addressing ena s3.path-style-access: true ``` -## S3 FileSystem Implementations +{{< hint info >}} +The legacy configuration key `s3.path.style.access` is still supported as a fallback for backward compatibility. +{{< /hint >}} -### Native S3 FileSystem +## Implementation Details + +### Native S3 FileSystem (Experimental) {{< hint warning >}} -**Experimental**: The Native S3 FileSystem implementation is experimental in Flink 2.3. While functionally complete, it should not yet be used in production environments. Please use Presto or Hadoop implementations for production deployments. +**Experimental**: The Native S3 FileSystem is experimental in Flink 2.3. It is functionally complete and has demonstrated strong performance in benchmarks. {{< /hint >}} -The Native S3 FileSystem is a pure-Java implementation built on the AWS SDK v2. It is registered under the schemes *s3://* and *s3a://*. It requires no additional dependencies and provides a drop-in replacement for the Presto and Hadoop implementations. +The Native S3 FileSystem is a pure-Java implementation built on the AWS SDK v2 completely removing the dependencies from hadoop. It is registered under the schemes *s3://* and *s3a://*. It requires no additional dependencies and provides a drop-in replacement for the Presto and Hadoop implementations. #### Setup @@ -145,8 +151,9 @@ cp ./opt/flink-s3-fs-native-{{< version >}}.jar ./plugins/s3-fs-native/ #### Features - **No external dependencies**: Built on AWS SDK v2 with minimal footprint -- **Drop-in replacement**: Compatible with the same S3 URI schemes (`s3://`) -- **Encryption support**: Server-side encryption (SSE) and KMS encryption +- **Drop-in replacement**: Compatible with the same S3 URI schemes (`s3://`, `s3a://`) +- **FileSystem sink support**: Supports the [FileSystem sink]({{< ref "docs/connectors/datastream/filesystem" >}}) via `RecoverableWriter` +- **Encryption support**: Server-side encryption (SSE-S3, SSE-KMS) - **Assume role**: Cross-account access via IAM role assumption - **Entropy injection**: Optimize S3 scalability through random key prefixes - **Bulk copy**: Efficient multi-part copy operations via S3TransferManager @@ -194,10 +201,12 @@ s3.entropy.length: 4 # Retry configuration s3.retry.max-num-retries: 3 -# Credentials provider -fs.s3.aws.credentials.provider: software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider +# Credentials provider (optional; see note below) +# fs.s3.aws.credentials.provider: software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider ``` +When `fs.s3.aws.credentials.provider` is not set, the Native S3 FileSystem automatically builds a credentials chain in the following order: delegation tokens, static credentials (if `s3.access-key` and `s3.secret-key` are configured), and the AWS SDK v2 `DefaultCredentialsProvider` (environment variables, instance profiles, etc.). You only need to set this option if you require a custom provider chain. + See the [AWS SDK v2 documentation](https://docs.aws.amazon.com/sdk-for-java/) for additional configuration details. --- @@ -254,7 +263,7 @@ The Hadoop S3 FileSystem is based on code from the [Hadoop Project](https://hado #### Features -- **FileSystem sink support**: The only S3 implementation with support for the [FileSystem sink]({{< ref "docs/connectors/datastream/filesystem" >}}) +- **FileSystem sink support**: Supports the [FileSystem sink]({{< ref "docs/connectors/datastream/filesystem" >}}) - **Self-contained**: No additional Hadoop installation required - **Mature implementation**: Long-established code from the Hadoop ecosystem @@ -303,7 +312,7 @@ All three S3 implementations register as handlers for the *s3://* scheme. Additi | Presto | *s3://*, *s3p://* | | Hadoop | *s3://*, *s3a://* | -Only one implementation can handle a given scheme at a time. The Native S3 implementation has the lowest priority, so when another implementation is present, it will take precedence for the *s3://* scheme. +It is safe to load multiple S3 plugin JARs simultaneously — the priority mechanism ensures only one factory handles each scheme. The Native S3 implementation has the lowest priority (`-1` vs the default `0`), so when another implementation is present, it will take precedence for all overlapping schemes (e.g., *s3://* and *s3a://*). You can override factory priorities via the `fs..priority.` configuration option. You can use multiple S3 implementations simultaneously by leveraging their different URI schemes. For example, if a job uses the [FileSystem]({{< ref "docs/connectors/datastream/filesystem" >}}) sink (Hadoop-only) but Presto for checkpointing: @@ -311,7 +320,9 @@ You can use multiple S3 implementations simultaneously by leveraging their diffe - Use *s3p://* scheme for checkpointing (Presto) {{< hint info >}} -The Native S3 implementation does not introduce a new URI scheme. It reuses the existing *s3://* and *s3a://* schemes. To use it alongside the Hadoop implementation, ensure only the Native S3 plugin JAR is in the `plugins` directory (i.e., do not have both `flink-s3-fs-native` and `flink-s3-fs-hadoop` plugins loaded simultaneously for the same scheme). +The Native S3 implementation does not introduce a new URI scheme. It supports the existing *s3://* and *s3a://* schemes. Since both the Native S3 and Hadoop implementations register for the same schemes, Flink uses a priority-based mechanism to select which factory handles each scheme. By default, Native S3 has the lowest priority and will **not** be selected when another implementation is present for the same scheme. + +To use the Native S3 implementation, either place only the `flink-s3-fs-native` plugin JAR in the `plugins` directory, or use the `fs..priority.` configuration to raise its priority while keeping other implementations loaded. {{< /hint >}} ---