11 changes: 11 additions & 0 deletions mkdocs/docs/api.md
@@ -365,6 +365,17 @@ for buf in tbl.scan().to_arrow_batch_reader():
print(f"Buffer contains {len(buf)} rows")
```

### Streaming writes from a `RecordBatchReader`

`tbl.append()` and `tbl.overwrite()` also accept a `pyarrow.RecordBatchReader` directly, which lets you write datasets that don't fit in memory without materialising them as a `pa.Table` first. PyIceberg consumes the reader once and microbatches it into Parquet files of approximately `write.target-file-size-bytes` (default 512 MiB), keeping memory usage bounded by the target size. All files are committed in a single snapshot.

```python
reader = pa.RecordBatchReader.from_batches(schema, batch_iter)
tbl.append(reader)
```

Streaming writes are currently only supported on **unpartitioned** tables. For a partitioned table, materialise the reader as a `pa.Table` first, or follow [#2152](https://github.com/apache/iceberg-python/issues/2152), which tracks partitioned streaming support as a follow-up.
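
For example, the reader can wrap a plain Python generator so that the source data never has to sit in memory all at once. In the sketch below, `read_source_in_chunks()` stands in for whatever actually produces your records:

```python
from typing import Iterator

import pyarrow as pa

arrow_schema = tbl.schema().as_arrow()


def generate_batches() -> Iterator[pa.RecordBatch]:
    # Hypothetical source loop: files, an API, a database cursor, ...
    for chunk in read_source_in_chunks():
        yield pa.RecordBatch.from_pylist(chunk, schema=arrow_schema)


reader = pa.RecordBatchReader.from_batches(arrow_schema, generate_batches())
tbl.append(reader)
```

(For a partitioned table, `reader.read_all()` is the simplest way to materialise the stream as a `pa.Table` first.)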

To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:

```python
79 changes: 76 additions & 3 deletions pyiceberg/io/pyarrow.py
@@ -2666,6 +2666,18 @@ def write_parquet(task: WriteTask) -> DataFile:


def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[pa.RecordBatch]]:
"""Bin-pack ``tbl`` into groups of RecordBatches, each ~``target_file_size``.

Note:
``target_file_size`` is measured in **uncompressed in-memory** Arrow bytes
(``Table.nbytes`` / ``RecordBatch.nbytes``), not compressed on-disk Parquet
bytes. The resulting Parquet file after compression (zstd by default,
plus dictionary/RLE encoding) is typically 3-10× smaller than
``target_file_size``. This is a coarse proxy for the spec-defined
``write.target-file-size-bytes`` and will be tightened to true on-disk
bytes once the writer is switched to a rolling-``ParquetWriter`` with
``OutputStream.tell()`` (#2998).
"""
from pyiceberg.utils.bin_packing import PackingIterator

avg_row_size_bytes = tbl.nbytes / tbl.num_rows
@@ -2681,6 +2693,41 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[
return bin_packed_record_batches


def bin_pack_record_batches(batches: Iterable[pa.RecordBatch], target_file_size: int) -> Iterator[list[pa.RecordBatch]]:
"""Microbatch a single-pass stream of RecordBatches into target-sized groups.

Unlike :func:`bin_pack_arrow_table`, this consumes ``batches`` lazily and
holds at most one in-flight buffer in memory, bounded by ``target_file_size``.
Suitable for streaming inputs (``pa.RecordBatchReader``,
``Iterator[pa.RecordBatch]``) where the total size is unknown up front and
the caller cannot afford to materialise the full dataset.

Each yielded list of batches is intended to be written as a single Parquet
data file. Because this is single-pass FIFO accumulation (no lookback), the
last bin may be smaller than ``target_file_size``.

Note:
``target_file_size`` is measured in **uncompressed in-memory** Arrow
bytes (``RecordBatch.nbytes``), not compressed on-disk Parquet bytes.
The resulting Parquet file after compression is typically 3-10×
smaller than ``target_file_size``. Matches the existing
:func:`bin_pack_arrow_table` semantics; both will be tightened to true
on-disk bytes once the writer is switched to a rolling-
``ParquetWriter`` with ``OutputStream.tell()`` (#2998).
"""
buffer: list[pa.RecordBatch] = []
buffer_bytes = 0
for batch in batches:
buffer.append(batch)
buffer_bytes += batch.nbytes
if buffer_bytes >= target_file_size:
yield buffer
buffer = []
buffer_bytes = 0
if buffer:
yield buffer


def _check_pyarrow_schema_compatible(
requested_schema: Schema,
provided_schema: pa.Schema,
@@ -2800,15 +2847,24 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> dict[str, Any]:

def _dataframe_to_data_files(
table_metadata: TableMetadata,
df: pa.Table,
df: pa.Table | pa.RecordBatchReader,
io: FileIO,
write_uuid: uuid.UUID | None = None,
counter: itertools.count[int] | None = None,
) -> Iterable[DataFile]:
"""Convert a PyArrow table into a DataFile.
"""Convert a PyArrow Table or RecordBatchReader into DataFiles.

For a ``pa.Table`` the data is materialised in memory and bin-packed into
target-sized files (with partition splitting if the table is partitioned).

For a ``pa.RecordBatchReader`` batches are streamed and microbatched into
target-sized files using bounded memory (see :func:`bin_pack_record_batches`).
Streaming writes are currently only supported on unpartitioned tables;
partitioned support is tracked in
https://github.com/apache/iceberg-python/issues/2152.

Returns:
An iterable that supplies datafiles that represent the table.
An iterable that supplies datafiles that represent the input data.
"""
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties, WriteTask

@@ -2828,6 +2884,23 @@ def _dataframe_to_data_files(
format_version=table_metadata.format_version,
)

if isinstance(df, pa.RecordBatchReader):
if not table_metadata.spec().is_unpartitioned():
raise NotImplementedError(
"Writing a pa.RecordBatchReader to a partitioned table is not yet supported. "
"Materialise the reader as a pa.Table first, or follow "
"https://github.com/apache/iceberg-python/issues/2152 for partitioned streaming support."
)
yield from write_file(
io=io,
table_metadata=table_metadata,
tasks=(
WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema)
for batches in bin_pack_record_batches(df, target_file_size)
),
)
return

if table_metadata.spec().is_unpartitioned():
yield from write_file(
io=io,
140 changes: 116 additions & 24 deletions pyiceberg/table/__init__.py
@@ -450,12 +450,53 @@ def update_statistics(self) -> UpdateStatistics:
"""
return UpdateStatistics(transaction=self)

def append(self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT, branch: str | None = MAIN_BRANCH) -> None:
def append(
self,
df: pa.Table | pa.RecordBatchReader,
snapshot_properties: dict[str, str] = EMPTY_DICT,
branch: str | None = MAIN_BRANCH,
) -> None:
"""
Shorthand API for appending a PyArrow table to a table transaction.
Shorthand API for appending PyArrow data to a table transaction.

Accepts either a fully materialised ``pa.Table`` or a streaming
``pa.RecordBatchReader``. Streaming is microbatched by
``write.target-file-size-bytes`` so memory stays bounded; the reader is
consumed once and cannot be reused.

Streaming writes are currently only supported on unpartitioned tables;
passing a ``pa.RecordBatchReader`` for a partitioned table raises
``NotImplementedError``. See
https://github.com/apache/iceberg-python/issues/2152.

Note:
When ``df`` is a ``pa.RecordBatchReader`` the reader is consumed
once and cannot be replayed. If the catalog commit fails (e.g.
``CommitFailedException`` from a concurrent writer) the reader is
already drained and a naive retry will append zero rows. Callers
that need at-least-once semantics should either:

- reconstruct the reader on each attempt via a factory callable (see the
example below), or
- use a two-stage pattern — write Parquet files explicitly and
then call :meth:`add_files` (whose input is a replayable list of
paths) within a retry loop.

Failures during the write stage (mid-stream reader exception, S3
errors) do not commit a snapshot, but may leave orphan data files
in storage that are not referenced by any snapshot. Clean these
up with expire/orphan-file maintenance jobs.

``write.target-file-size-bytes`` is currently interpreted as
uncompressed in-memory Arrow bytes (the bin-packing weight) rather
than compressed on-disk Parquet bytes. The resulting files are
typically 3-10× smaller than the property suggests after
compression. This matches the existing ``pa.Table`` write path and
will be tightened once the writer is switched to a
rolling-``ParquetWriter`` with ``OutputStream.tell()`` (#2998).
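
Example:
    A minimal retry sketch built around a factory. ``make_reader`` and
    ``generate_batches`` are hypothetical user-side helpers that rebuild
    the stream from its source on every attempt; ``arrow_schema`` is the
    table's Arrow schema::

        from pyiceberg.exceptions import CommitFailedException

        def make_reader() -> pa.RecordBatchReader:
            # A drained reader cannot be reused, so build a fresh one each time.
            return pa.RecordBatchReader.from_batches(arrow_schema, generate_batches())

        for attempt in range(3):
            try:
                tbl.append(make_reader())
                break
            except CommitFailedException:
                continue  # the next attempt gets a fresh reader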

Args:
df: The Arrow dataframe that will be appended to overwrite the table
df: An Arrow Table or a RecordBatchReader of records to append.
snapshot_properties: Custom properties to be added to the snapshot summary
branch: Branch Reference to run the append operation
"""
@@ -466,8 +507,8 @@ def append(self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT,

from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

if not isinstance(df, pa.Table):
raise ValueError(f"Expected PyArrow table, got: {df}")
if not isinstance(df, (pa.Table, pa.RecordBatchReader)):
raise ValueError(f"Expected pa.Table or pa.RecordBatchReader, got: {df}")

downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
_check_pyarrow_schema_compatible(
@@ -478,12 +519,14 @@ def append(self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT,
)

with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
data_files = list(
_dataframe_to_data_files(
table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
)
# For pa.Table we can short-circuit empty inputs cheaply. For a
# RecordBatchReader the stream is consumed lazily by
# _dataframe_to_data_files and an empty reader simply yields zero
# data files (the snapshot is still committed for symmetry with the
# pa.Table case where empty inputs also produce a snapshot).
if isinstance(df, pa.RecordBatchReader) or df.shape[0] > 0:
data_files = _dataframe_to_data_files(
table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
)
for data_file in data_files:
append_files.append_data_file(data_file)
@@ -555,14 +598,50 @@ def dynamic_partition_overwrite(

def overwrite(
self,
df: pa.Table,
df: pa.Table | pa.RecordBatchReader,
overwrite_filter: BooleanExpression | str = ALWAYS_TRUE,
snapshot_properties: dict[str, str] = EMPTY_DICT,
case_sensitive: bool = True,
branch: str | None = MAIN_BRANCH,
) -> None:
"""
Shorthand for adding a table overwrite with a PyArrow table to the transaction.
Shorthand for adding a table overwrite with a PyArrow table or RecordBatchReader to the transaction.

Accepts either a fully materialised ``pa.Table`` or a streaming
``pa.RecordBatchReader``. Streaming is microbatched by
``write.target-file-size-bytes`` so memory stays bounded; the reader is
consumed once and cannot be reused.

Streaming writes are currently only supported on unpartitioned tables;
passing a ``pa.RecordBatchReader`` for a partitioned table raises
``NotImplementedError``. See
https://github.com/apache/iceberg-python/issues/2152.

Note:
When ``df`` is a ``pa.RecordBatchReader`` the reader is consumed
once and cannot be replayed. If the catalog commit fails (e.g.
``CommitFailedException`` from a concurrent writer) the reader is
already drained and a naive retry will write zero rows. Callers
that need at-least-once semantics should either:

- reconstruct the reader on each attempt via a factory callable,
or
- use a two-stage pattern — write Parquet files explicitly and
then call :meth:`add_files` (whose input is a replayable list
of paths) within a retry loop.

Failures during the write stage (mid-stream reader exception, S3
errors) do not commit a snapshot, but may leave orphan data files
in storage that are not referenced by any snapshot. Clean these
up with expire/orphan-file maintenance jobs.

``write.target-file-size-bytes`` is currently interpreted as
uncompressed in-memory Arrow bytes (the bin-packing weight) rather
than compressed on-disk Parquet bytes. The resulting files are
typically 3-10× smaller than the property suggests after
compression. This matches the existing ``pa.Table`` write path and
will be tightened once the writer is switched to a
rolling-``ParquetWriter`` with ``OutputStream.tell()`` (#2998).

An overwrite may produce zero or more snapshots based on the operation:

@@ -571,7 +650,7 @@ def overwrite(
- APPEND: In case new data is being inserted into the table.

Args:
df: The Arrow dataframe that will be used to overwrite the table
df: An Arrow Table or a RecordBatchReader of records to write.
overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
or a boolean expression in case of a partial overwrite
snapshot_properties: Custom properties to be added to the snapshot summary
@@ -585,8 +664,8 @@ def overwrite(

from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

if not isinstance(df, pa.Table):
raise ValueError(f"Expected PyArrow table, got: {df}")
if not isinstance(df, (pa.Table, pa.RecordBatchReader)):
raise ValueError(f"Expected pa.Table or pa.RecordBatchReader, got: {df}")

downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
_check_pyarrow_schema_compatible(
@@ -606,8 +685,8 @@ def overwrite(
)

with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
# skip writing data files if the dataframe is empty
if df.shape[0] > 0:
# See append() for the empty-input handling rationale.
if isinstance(df, pa.RecordBatchReader) or df.shape[0] > 0:
data_files = _dataframe_to_data_files(
table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
)
@@ -1373,12 +1452,21 @@ def upsert(
snapshot_properties=snapshot_properties,
)

def append(self, df: pa.Table, snapshot_properties: dict[str, str] = EMPTY_DICT, branch: str | None = MAIN_BRANCH) -> None:
def append(
self,
df: pa.Table | pa.RecordBatchReader,
snapshot_properties: dict[str, str] = EMPTY_DICT,
branch: str | None = MAIN_BRANCH,
) -> None:
"""
Shorthand API for appending a PyArrow table to the table.
Shorthand API for appending PyArrow data to the table.

Accepts either a ``pa.Table`` or a streaming ``pa.RecordBatchReader``.
See :meth:`Transaction.append` for streaming semantics and partition
limitations.

Args:
df: The Arrow dataframe that will be appended to overwrite the table
df: An Arrow Table or a RecordBatchReader of records to append.
snapshot_properties: Custom properties to be added to the snapshot summary
branch: Branch Reference to run the append operation
"""
@@ -1401,14 +1489,18 @@ def dynamic_partition_overwrite(

def overwrite(
self,
df: pa.Table,
df: pa.Table | pa.RecordBatchReader,
overwrite_filter: BooleanExpression | str = ALWAYS_TRUE,
snapshot_properties: dict[str, str] = EMPTY_DICT,
case_sensitive: bool = True,
branch: str | None = MAIN_BRANCH,
) -> None:
"""
Shorthand for overwriting the table with a PyArrow table.
Shorthand for overwriting the table with a PyArrow Table or RecordBatchReader.

Accepts either a ``pa.Table`` or a streaming ``pa.RecordBatchReader``.
See :meth:`Transaction.overwrite` for streaming semantics and partition
limitations.

An overwrite may produce zero or more snapshots based on the operation:

@@ -1417,7 +1509,7 @@ def overwrite(
- APPEND: In case new data is being inserted into the table.

Args:
df: The Arrow dataframe that will be used to overwrite the table
df: An Arrow Table or a RecordBatchReader of records to write.
overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
or a boolean expression in case of a partial overwrite
snapshot_properties: Custom properties to be added to the snapshot summary