-
Notifications
You must be signed in to change notification settings - Fork 486
[lake/iceberg] Support MAP type in Iceberg tables #2367
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[lake/iceberg] Support MAP type in Iceberg tables #2367
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This pull request adds support for MAP data types in Iceberg tables for Fluss, addressing issue #2258. The implementation enables conversion of Fluss MAP types to Iceberg MapType and provides runtime support for reading and writing map data.
Changes:
- Implemented MAP type conversion from Fluss to Iceberg format in
FlussDataTypeToIcebergDataType - Added
FlussMapAsIcebergMapadapter class for converting Fluss InternalMap to Java Map for Iceberg - Enhanced
FlussArrayAsIcebergListandFlussRowAsIcebergRecordto support nested MAP types - Added FLOAT and DOUBLE support to
IcebergBinaryRowWriter.createFieldWriter - Added explicit error handling for MAP and ARRAY types in bucket key writer
- Comprehensive test coverage for MAP type conversions, including nested structures
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
FlussDataTypeToIcebergDataType.java |
Replaced UnsupportedOperationException with actual MAP type conversion logic, handling field ID allocation |
FlussMapAsIcebergMap.java |
New adapter class for converting Fluss InternalMap to Java Map, with support for all scalar types and nested collections |
FlussRowAsIcebergRecord.java |
Added MAP type handling in field converter creation |
FlussArrayAsIcebergList.java |
Added support for MAP elements within arrays |
IcebergBinaryRowWriter.java |
Added FLOAT/DOUBLE type support and explicit error messages for non-scalar bucket key types |
FlussDataTypeToIcebergDataTypeMapTest.java |
New comprehensive test suite covering MAP type conversions with various key-value type combinations |
FlussRowAsIcebergRecordTest.java |
Added integration tests for MAP data conversion including nested structures |
IcebergBinaryRowWriterTest.java |
Added tests to verify MAP and ARRAY types are properly rejected as bucket keys |
Comments suppressed due to low confidence (1)
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussArrayAsIcebergList.java:118
- The get method is missing support for RowType, which is a supported nested type in Arrays. When an Array contains Row values (e.g., Array<Row<id: Int, name: String>>), this method will throw an UnsupportedOperationException. Add a case to handle RowType by retrieving the InternalRow from the array and converting it appropriately, similar to how it's done in FlussRowAsIcebergRecord.createTypeConverter.
public Object get(int index) {
if (flussArray.isNullAt(index)) {
return null;
}
if (elementType instanceof BooleanType) {
return flussArray.getBoolean(index);
} else if (elementType instanceof TinyIntType) {
return (int) flussArray.getByte(index);
} else if (elementType instanceof SmallIntType) {
return (int) flussArray.getShort(index);
} else if (elementType instanceof IntType) {
return flussArray.getInt(index);
} else if (elementType instanceof BigIntType) {
return flussArray.getLong(index);
} else if (elementType instanceof FloatType) {
return flussArray.getFloat(index);
} else if (elementType instanceof DoubleType) {
return flussArray.getDouble(index);
} else if (elementType instanceof StringType) {
return flussArray.getString(index).toString();
} else if (elementType instanceof CharType) {
CharType charType = (CharType) elementType;
return flussArray.getChar(index, charType.getLength()).toString();
} else if (elementType instanceof BytesType || elementType instanceof BinaryType) {
return ByteBuffer.wrap(flussArray.getBytes(index));
} else if (elementType instanceof DecimalType) {
DecimalType decimalType = (DecimalType) elementType;
return flussArray
.getDecimal(index, decimalType.getPrecision(), decimalType.getScale())
.toBigDecimal();
} else if (elementType instanceof LocalZonedTimestampType) {
LocalZonedTimestampType ltzType = (LocalZonedTimestampType) elementType;
return toIcebergTimestampLtz(
flussArray.getTimestampLtz(index, ltzType.getPrecision()).toInstant());
} else if (elementType instanceof TimestampType) {
TimestampType tsType = (TimestampType) elementType;
return flussArray.getTimestampNtz(index, tsType.getPrecision()).toLocalDateTime();
} else if (elementType instanceof DateType) {
return DateTimeUtils.toLocalDate(flussArray.getInt(index));
} else if (elementType instanceof TimeType) {
return DateTimeUtils.toLocalTime(flussArray.getInt(index));
} else if (elementType instanceof ArrayType) {
InternalArray innerArray = flussArray.getArray(index);
return innerArray == null
? null
: new FlussArrayAsIcebergList(
innerArray, ((ArrayType) elementType).getElementType());
} else if (elementType instanceof MapType) {
MapType mapType = (MapType) elementType;
InternalMap internalMap = flussArray.getMap(index);
return new FlussMapAsIcebergMap(
internalMap, mapType.getKeyType(), mapType.getValueType());
} else {
throw new UnsupportedOperationException(
"Unsupported array element type conversion for Fluss type: "
+ elementType.getClass().getSimpleName());
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...ss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/FlussMapAsIcebergMap.java
Show resolved
Hide resolved
...-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/FlussRowAsIcebergRecordTest.java
Show resolved
Hide resolved
...lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
Outdated
Show resolved
Hide resolved
7985d6f to
ea54108
Compare
ea54108 to
c5156b9
Compare
|
All issues raised by Copilot have been resolved. @luoyuxia |
Purpose
Linked issue: #2258
Brief change log
Tests
API and Format
Documentation