
pw.io.s3 needs a write counterpart #216

@zxqfd555

Description


Is your feature request related to a problem? Please describe.

pw.io.s3 and pw.io.minio currently expose only a read method. There is no native way to write Pathway pipeline output back to S3 or MinIO, forcing users to implement custom Python output connectors, which bypass engine-level batching and reliability guarantees.

Unlike a local filesystem, S3 and MinIO do not support appending to existing objects — every write must be a complete new object. This rules out the row-by-row write strategy used by file system connectors and requires a dedicated batching approach.

Describe the solution you'd like

Add pw.io.s3.write and pw.io.minio.write, implemented in Rust, following the same connector patterns as the existing output connectors.

API:

pw.io.s3.write(
    table,
    path,                  # S3 prefix / directory to write into
    format,                # "csv" | "json" | ...
    *,
    write_interval_ms,     # how often to flush a new object, in milliseconds
    name=None,
)

pw.io.minio.write(
    table,
    path,
    format,
    *,
    write_interval_ms,
    name=None,
)

Implementation:

The connector is split into a formatter and a writer, consistent with the existing connector architecture:

  • The formatter serializes incoming rows into the chosen format (CSV, JSON, etc.), producing a byte string per row.
  • The writer appends each formatted row to an in-memory buffer. The buffer is never written to S3/MinIO directly during normal row processing.
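A minimal Python sketch of the formatter/buffer split described above. It assumes CSV rows arrive as sequences and JSON rows as mappings serialized as JSON Lines. The names format_csv_row, format_json_row and RowBuffer are hypothetical and not part of Pathway's API, and the real connector would be written in Rust. Header handling (for example, whether each CSV object should begin with its own header line to stay self-contained) is not shown.

```python
import csv
import io
import json
from typing import Any, Mapping, Sequence


def format_csv_row(row: Sequence[Any]) -> bytes:
    """Serialize one row as a UTF-8 CSV line with a trailing newline."""
    out = io.StringIO()
    # lineterminator="\n" replaces the csv module's default "\r\n".
    csv.writer(out, lineterminator="\n").writerow(row)
    return out.getvalue().encode("utf-8")


def format_json_row(row: Mapping[str, Any]) -> bytes:
    """Serialize one row as a single JSON Lines record."""
    return (json.dumps(row, separators=(",", ":")) + "\n").encode("utf-8")


class RowBuffer:
    """In-memory buffer the writer appends formatted rows to; nothing is uploaded here."""

    def __init__(self) -> None:
        self._chunks: list[bytes] = []

    def append(self, formatted_row: bytes) -> None:
        self._chunks.append(formatted_row)

    def contents(self) -> bytes:
        # The bytes that would become the body of the next object.
        return b"".join(self._chunks)

    def clear(self) -> None:
        self._chunks.clear()
```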

Flushing works as follows. When the engine calls flush on the write connector, the writer checks whether now - last_write_timestamp >= write_interval_ms. If so, it uploads the buffer as a new S3/MinIO object (with a timestamped or sequenced key under the configured prefix) and, on success, updates last_write_timestamp and clears the buffer. Otherwise, the flush is a no-op: the buffer is retained as-is and last_write_timestamp is not updated, so the data will be included in the next successful flush. This ensures that no data is lost between flush cycles and that each uploaded object is a complete, self-contained chunk.
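The flush rule can be sketched in Python to illustrate the logic; this is not the proposed Rust implementation. IntervalFlushWriter, its upload callable (a stand-in for an S3/MinIO PutObject request that raises on failure) and the injectable clock are all hypothetical. Two points the issue leaves open are assumptions here: the first interval starts when the writer is created, and a flush that is due uploads whatever is buffered, even if that is nothing.

```python
import time
from typing import Callable, Optional


class IntervalFlushWriter:
    """Buffers formatted rows; uploads them as one new object per successful flush."""

    def __init__(
        self,
        upload: Callable[[str, bytes], None],  # hypothetical PutObject stand-in
        prefix: str,
        write_interval_ms: int,
        clock_ms: Optional[Callable[[], int]] = None,
    ) -> None:
        self._upload = upload
        self._prefix = prefix.rstrip("/")
        self._interval_ms = write_interval_ms
        # Default clock: wall time in milliseconds; tests inject a fake clock.
        self._clock_ms = clock_ms or (lambda: time.time_ns() // 1_000_000)
        self._buffer: list[bytes] = []
        self._seq = 0
        # Assumption: the first interval starts when the writer is created.
        self._last_write_ms = self._clock_ms()

    def write_row(self, formatted_row: bytes) -> None:
        # Normal row processing only appends to the in-memory buffer.
        self._buffer.append(formatted_row)

    def flush(self) -> bool:
        """Return True if an object was uploaded, False if the flush was a no-op."""
        now = self._clock_ms()
        if now - self._last_write_ms < self._interval_ms:
            # Too early: keep the buffer and leave last_write_ms unchanged.
            return False
        key = f"{self._prefix}/{self._seq:020d}"
        # If upload raises, nothing below runs, so the buffer is kept for retry.
        self._upload(key, b"".join(self._buffer))
        self._last_write_ms = now
        self._seq += 1
        self._buffer.clear()
        return True
```

Because upload is called before any state changes, a failed upload propagates out of flush with the buffer, sequence number and last-write time untouched, so the same rows are retried on the next flush.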

Object naming within the prefix should be deterministic and monotonically increasing (e.g. using the flush timestamp or a sequence number) so that pw.io.s3.read / pw.io.minio.read can consume the output directory correctly.
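One way to meet the naming requirement, sketched under the assumption that keys combine a zero-padded flush timestamp (in milliseconds) with a zero-padded sequence number. object_key is a hypothetical helper. The padding matters because S3 list operations return keys in lexicographic (UTF-8 binary) order, so unpadded numbers would sort incorrectly (for example, "10" before "9").

```python
def object_key(prefix: str, flush_ts_ms: int, seq: int, ext: str) -> str:
    """Build a deterministic key whose lexicographic order matches (timestamp, seq) order."""
    if flush_ts_ms < 0 or seq < 0:
        raise ValueError("timestamp and sequence must be non-negative")
    # Fixed-width fields keep string order identical to numeric order.
    return f"{prefix.rstrip('/')}/{flush_ts_ms:015d}-{seq:010d}.{ext}"
```

The sequence number breaks ties if two flushes ever land in the same millisecond.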

Describe alternatives you've considered

Uploading one object per row would generate excessive S3 API call costs and small-object overhead. Streaming via multipart upload could allow appending within a single object, but adds significant complexity and still requires a final CompleteMultipartUpload call, making it unsuitable for a continuously running pipeline where the "end" of a file is never reached.

Additional context

Testing: integration tests should follow the existing pattern for S3/MinIO connectors and use a MinIO container (which is S3-compatible) in the Docker Compose test suite. Coverage should include:

  • Basic write and read-back: write rows via pw.io.s3.write, read back via pw.io.s3.read, assert equality.
  • Flush timing: verify that no object is written before write_interval_ms elapses and that one is written shortly afterwards.
  • Buffer retention on early flush: verify that rows are not lost when a flush occurs before write_interval_ms has elapsed, and that they are included in the next successful flush.
  • Multiple flush cycles: verify that successive objects under the prefix are all readable by pw.io.s3.read and that the full dataset is correct.
  • Format coverage: at minimum CSV and JSON.
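For the read-back and multiple-flush-cycle checks, the assertion logic might look like the sketch below. It assumes JSON Lines output and uses a plain dict mapping key to object bytes as a stand-in for the bucket; in the real integration test the objects would come from the MinIO container, or the rows from pw.io.s3.read. read_back_json_lines and same_rows are hypothetical helpers. The comparison counts duplicates but ignores order, since the test should check the dataset rather than how rows were split across objects.

```python
import json
from collections import Counter
from typing import Any, Mapping


def read_back_json_lines(objects: Mapping[str, bytes], prefix: str) -> list[dict[str, Any]]:
    """Concatenate every object under `prefix`, in key order, and parse JSON Lines."""
    rows: list[dict[str, Any]] = []
    for key in sorted(k for k in objects if k.startswith(prefix)):
        for line in objects[key].decode("utf-8").splitlines():
            if line:  # skip blank lines, e.g. from an empty object
                rows.append(json.loads(line))
    return rows


def same_rows(actual: list[dict[str, Any]], expected: list[dict[str, Any]]) -> bool:
    """Order-insensitive comparison that still counts duplicate rows."""
    def canon(rows: list[dict[str, Any]]) -> Counter:
        return Counter(json.dumps(r, sort_keys=True) for r in rows)
    return canon(actual) == canon(expected)
```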

Metadata


Assignees: No one assigned
Labels: enhancement (New feature or request)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
