11 changes: 11 additions & 0 deletions mkdocs/docs/api.md
@@ -365,6 +365,17 @@ for buf in tbl.scan().to_arrow_batch_reader():
print(f"Buffer contains {len(buf)} rows")
```

### Streaming writes from a `RecordBatchReader`

`tbl.append()` and `tbl.overwrite()` also accept a `pyarrow.RecordBatchReader` directly, which lets you write datasets that don't fit in memory without materialising them as a `pa.Table` first. PyIceberg consumes the reader once, writing batches through a rolling Parquet writer that starts a new file each time the on-disk file size reaches `write.target-file-size-bytes` (default 512 MiB). Each input `RecordBatch` becomes a Parquet row group, capped at `write.parquet.row-group-limit` rows (default 1M): the caller's batch size sets the lower bound on row group size, while the property enforces the upper bound. All files are committed in a single snapshot.

```python
import pyarrow as pa

# schema: a pa.Schema compatible with the table; batch_iter: any iterator of pa.RecordBatch
reader = pa.RecordBatchReader.from_batches(schema, batch_iter)
tbl.append(reader)
```
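
One common source of batches is a `pyarrow.dataset` scanner, which exposes a `RecordBatchReader` directly. A minimal sketch (the source path and format are hypothetical placeholders):

```python
import pyarrow.dataset as ds

# Hypothetical on-disk source; Scanner.to_reader() returns a pa.RecordBatchReader.
source = ds.dataset("path/to/source", format="parquet")
tbl.append(source.scanner().to_reader())
```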

Streaming writes are currently only supported on **unpartitioned** tables. For a partitioned table, materialise the reader as a `pa.Table` first, as shown below, or follow [#2152](https://github.com/apache/iceberg-python/issues/2152), which tracks partitioned streaming support as a follow-up.
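
The materialise-first workaround is a one-liner (a sketch; it assumes the streamed data fits in memory):

```python
tbl.append(reader.read_all())  # RecordBatchReader.read_all() drains the stream into a pa.Table
```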

To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:

```python
df = df.cast(tbl.schema().as_arrow())
```
181 changes: 178 additions & 3 deletions pyiceberg/io/pyarrow.py
@@ -2665,7 +2665,159 @@ def write_parquet(task: WriteTask) -> DataFile:
return iter(data_files)


def _record_batches_to_data_files(
table_metadata: TableMetadata,
reader: pa.RecordBatchReader,
io: FileIO,
write_uuid: uuid.UUID | None = None,
counter: itertools.count[int] | None = None,
) -> Iterator[DataFile]:
"""Stream a ``pa.RecordBatchReader`` into Parquet data files via a rolling ``pq.ParquetWriter``.

Each input ``RecordBatch`` is written directly via
``writer.write_batch``. File rollover is driven by ``OutputStream.tell()``
(#2998): after each batch, if ``tell() >= write.target-file-size-bytes``
the current writer is closed (footer written) and a new file is opened.
The threshold is measured in compressed on-disk bytes — matching the
spec interpretation of ``write.target-file-size-bytes``.

Row groups are capped at ``write.parquet.row-group-limit`` rows (default
1M) via the ``row_group_size`` argument to ``write_batch``: a batch
larger than the cap is split into multiple row groups, each ≤ the cap;
a batch smaller than the cap becomes a single row group of its own
size. Callers control the lower bound of row group size by their
choice of input batch size; pyiceberg enforces the upper bound. This
matches the materialised ``pa.Table`` write path's treatment of the
same property.

Memory per writer is bounded by one input ``RecordBatch`` plus the
``ParquetWriter``'s internal page buffer (~1 MiB by default). The
materialised ``pa.Table`` write path (``write_file``) keeps its
existing ``executor.map``-based file-level parallelism; streaming
writes are sequential — one rolling file at a time, with concurrency
provided by the underlying multipart upload pool.

Streaming writes to partitioned tables are not yet supported — see
https://github.com/apache/iceberg-python/issues/2152.
"""
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties

if not table_metadata.spec().is_unpartitioned():
raise NotImplementedError(
"Writing a pa.RecordBatchReader to a partitioned table is not yet supported. "
"Materialise the reader as a pa.Table first, or follow "
"https://github.com/apache/iceberg-python/issues/2152 for partitioned streaming support."
)

counter = counter or itertools.count(0)
write_uuid = write_uuid or uuid.uuid4()
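# Rollover threshold, measured in compressed on-disk bytes (default 512 MiB).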
target_file_size: int = property_as_int( # type: ignore # The property is set with non-None value.
properties=table_metadata.properties,
property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
)
name_mapping = table_metadata.schema().name_mapping
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
task_schema = pyarrow_to_schema(
reader.schema,
name_mapping=name_mapping,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
format_version=table_metadata.format_version,
)
table_schema = table_metadata.schema()
if (sanitized_schema := sanitize_column_names(table_schema)) != table_schema:
file_schema = sanitized_schema
else:
file_schema = table_schema

parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
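# Upper bound on rows per Parquet row group (default 1M); write_batch splits
# larger input batches into multiple row groups of at most this size.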
row_group_size = property_as_int(
properties=table_metadata.properties,
property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT,
)
location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties)
stats_columns = compute_statistics_plan(file_schema, table_metadata.properties)
column_mapping = parquet_path_to_id_mapping(file_schema)

def _transform(batch: pa.RecordBatch) -> pa.RecordBatch:
return _to_requested_schema(
requested_schema=file_schema,
file_schema=task_schema,
batch=batch,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us,
include_field_ids=True,
)

def _new_data_file_path() -> str:
# Mirrors WriteTask.generate_data_file_filename to keep file names compatible
# with the materialised write path.
filename = f"00000-{next(counter)}-{write_uuid}.parquet"
return location_provider.new_data_location(data_file_name=filename)

def _build_data_file(file_path: str, output_file: OutputFile, parquet_metadata: Any) -> DataFile:
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=parquet_metadata,
stats_columns=stats_columns,
parquet_column_mapping=column_mapping,
)
return DataFile.from_args(
content=DataFileContent.DATA,
file_path=file_path,
file_format=FileFormat.PARQUET,
partition=Record(),
file_size_in_bytes=len(output_file),
sort_order_id=None,
spec_id=table_metadata.default_spec_id,
equality_ids=None,
key_metadata=None,
**statistics.to_serialized_dict(),
)

batches = iter(reader)
while True:
# Pull the next batch up front. If the reader is exhausted (either at the
# very start or between rolled files), we're done — yield nothing further.
try:
first_batch = next(batches)
except StopIteration:
return

transformed_first = _transform(first_batch)
file_path = _new_data_file_path()
output_file = io.new_output(file_path)
with output_file.create(overwrite=True) as fos:
with pq.ParquetWriter(
fos,
schema=transformed_first.schema,
store_decimal_as_integer=True,
**parquet_writer_kwargs,
) as writer:
writer.write_batch(transformed_first, row_group_size=row_group_size)
# Keep writing into this file until the on-disk byte threshold
# is crossed. ``tell()`` advances as ``write_batch`` flushes
# encoded pages to the stream — files end up close to but
# slightly above ``target_file_size`` (lag bounded by one
# Parquet data page, ~1 MiB by default).
while fos.tell() < target_file_size:
try:
batch = next(batches)
except StopIteration:
break
writer.write_batch(_transform(batch), row_group_size=row_group_size)
# writer is closed (footer written) and the OutputStream is flushed.
# writer.writer.metadata is still readable post-close — same access
# pattern used by write_file().
yield _build_data_file(file_path, output_file, writer.writer.metadata)


def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[list[pa.RecordBatch]]:
"""Bin-pack ``tbl`` into groups of RecordBatches, each ~``target_file_size`` uncompressed Arrow bytes.

Used by the materialised ``pa.Table`` write path (``_dataframe_to_data_files``
+ ``write_file``) to split a fully in-memory table into multiple Parquet
files written in parallel.
"""
from pyiceberg.utils.bin_packing import PackingIterator

avg_row_size_bytes = tbl.nbytes / tbl.num_rows
@@ -2800,15 +2952,24 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> dict[str, Any]:

def _dataframe_to_data_files(
table_metadata: TableMetadata,
df: pa.Table,
df: pa.Table | pa.RecordBatchReader,
io: FileIO,
write_uuid: uuid.UUID | None = None,
counter: itertools.count[int] | None = None,
) -> Iterable[DataFile]:
"""Convert a PyArrow table into a DataFile.
"""Convert a PyArrow Table or RecordBatchReader into DataFiles.

For a ``pa.Table`` the data is materialised in memory and bin-packed into
target-sized files (with partition splitting if the table is partitioned).

For a ``pa.RecordBatchReader`` batches are streamed into target-sized files
with bounded memory via a rolling ``pq.ParquetWriter`` (see
:func:`_record_batches_to_data_files`).
Streaming writes are currently only supported on unpartitioned tables;
partitioned support is tracked in
https://github.com/apache/iceberg-python/issues/2152.

Returns:
An iterable that supplies datafiles that represent the table.
An iterable that supplies datafiles that represent the input data.
"""
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties, WriteTask

@@ -2828,6 +2989,20 @@ def _dataframe_to_data_files(
format_version=table_metadata.format_version,
)

if isinstance(df, pa.RecordBatchReader):
# Streaming path: rolling ParquetWriter driven by OutputStream.tell()
# for constant-memory writes and on-disk-accurate file sizes.
# Partitioned-table support is the responsibility of
# _record_batches_to_data_files; the NotImplementedError is raised there.
yield from _record_batches_to_data_files(
table_metadata=table_metadata,
reader=df,
io=io,
write_uuid=write_uuid,
counter=counter,
)
return

if table_metadata.spec().is_unpartitioned():
yield from write_file(
io=io,