[lake/lance] Refactor LanceArrowWriter #2345
Just to double-check, doesn’t this still retain the The original goal of the issue was to reuse the |
wuchong
left a comment
Thanks @XuQianJin-Stars, I left some comments.
| <groupId>org.apache.fluss</groupId> | ||
| <artifactId>fluss-common</artifactId> | ||
| <version>${project.version}</version> | ||
| <scope>provided</scope> |
Why do we need to include fluss-common in the shaded jar? If it is only needed for testing, we can use the test scope instead.
@luoyuxia could you also help check this?
Yes, we don't need to include fluss-common
| private static List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf> | ||
| getFieldBuffers( | ||
| org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector vector) { | ||
| try { | ||
| Method method = vector.getClass().getMethod("getFieldBuffers"); | ||
| return (List<org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf>) | ||
| method.invoke(vector); | ||
| } catch (Exception e) { | ||
| throw new RuntimeException("Failed to get field buffers from shaded vector", e); | ||
| } | ||
| } | ||
|
|
||
| private static int getValueCount( | ||
| org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector vector) { | ||
| try { | ||
| Method method = vector.getClass().getMethod("getValueCount"); | ||
| return (int) method.invoke(vector); | ||
| } catch (Exception e) { | ||
| throw new RuntimeException("Failed to get value count from shaded vector", e); | ||
| } | ||
| } | ||
|
|
||
| private static ByteBuffer getByteBuffer( | ||
| org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf buf) { | ||
| try { | ||
| Method method = buf.getClass().getMethod("nioBuffer", long.class, int.class); | ||
| return (ByteBuffer) method.invoke(buf, 0L, (int) buf.capacity()); | ||
| } catch (Exception e) { | ||
| try { | ||
| Field field = buf.getClass().getDeclaredField("memoryAddress"); | ||
| field.setAccessible(true); | ||
| long address = (long) field.get(buf); | ||
| return null; | ||
| } catch (Exception ex) { | ||
| throw new RuntimeException("Failed to get ByteBuffer from ArrowBuf", ex); |
Why introduce this reflection? The shaded classes appear to provide these methods, so they can be invoked directly.
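To make the reviewer's point concrete, here is a minimal sketch contrasting a reflective lookup with a direct call. StandInVector and FixedCountVector are hypothetical stand-ins so that the example runs without Arrow on the classpath; they are not the shaded Arrow types, and the real fix would simply call the method on the shaded vector.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for the shaded FieldVector type; not the real Arrow interface.
interface StandInVector {
    int getValueCount();
}

// Hypothetical implementation used only to exercise both helpers.
final class FixedCountVector implements StandInVector {
    private final int valueCount;

    FixedCountVector(int valueCount) {
        this.valueCount = valueCount;
    }

    @Override
    public int getValueCount() {
        return valueCount;
    }
}

final class DirectCallSketch {
    // Reflection-based lookup, shaped like the helper in the diff above.
    static int valueCountViaReflection(StandInVector vector) {
        try {
            Method method = vector.getClass().getMethod("getValueCount");
            return (int) method.invoke(vector);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Failed to get value count", e);
        }
    }

    // Direct call: checked by the compiler, no lookup and no exception wrapping.
    static int valueCountDirect(StandInVector vector) {
        return vector.getValueCount();
    }

    public static void main(String[] args) {
        StandInVector vector = new FixedCountVector(3);
        // Both paths return the same value; only the direct one is type-checked.
        System.out.println(valueCountViaReflection(vector) == valueCountDirect(vector));
    }
}
```

When the method is part of the static type, the direct form is shorter and a renamed or removed method becomes a compile error instead of a runtime failure.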
| FieldVector fieldVector = shadedRoot.getVector(i); | ||
| fieldWriters[i] = ArrowUtils.createArrowFieldWriter(fieldVector, rowType.getTypeAt(i)); |
Can we directly use ArrowWriter instead of ArrowFieldWriter? It seems the call to initFieldVector, which ArrowWriter already makes, is missing here.
| shadedRoot.setRowCount(recordsCount); | ||
| } | ||
|
|
||
| public void reset() { |
This is never called. Should we call it in LanceLakeWriter#complete?
| @Override | ||
| public void write(LogRecord record) throws IOException { | ||
| arrowWriter.write(record); | ||
| buffer.add(record.getRow()); |
Why buffer it first instead of writing to Arrow directly? This doubles the memory overhead.
| List<FragmentMetadata> fragments = | ||
| Fragment.create(datasetUri, nonShadedAllocator, nonShadedRoot, writeParams); | ||
|
|
||
| allFragments.addAll(fragments); |
There is no need for the shared member variable allFragments. Could it be a local variable that is returned from the flush() method? I also can't find where allFragments is reset.
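One way to read this suggestion is sketched below, under assumptions: FlushReturnSketch is a hypothetical class, a plain row counter stands in for rows written directly into the Arrow batch, and strings stand in for Lance's FragmentMetadata. The point is only that flush() can build and return its own list and reset the batch state, so no shared field has to be cleared elsewhere.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical writer; not the LanceLakeWriter from this PR.
final class FlushReturnSketch {
    // Stand-in for the number of rows already written into the Arrow batch.
    private int rowsInBatch;

    // Stand-in for writing a row directly into the batch (no intermediate buffer).
    void write(String row) {
        Objects.requireNonNull(row, "row");
        rowsInBatch++;
    }

    // Returns the fragments produced by this flush as a local list.
    List<String> flush() {
        List<String> fragments = new ArrayList<>();
        if (rowsInBatch > 0) {
            // Stand-in for the fragment metadata a real flush would produce.
            fragments.add("fragment-with-" + rowsInBatch + "-rows");
            // Reset the batch state so the next flush starts empty.
            rowsInBatch = 0;
        }
        return fragments;
    }

    public static void main(String[] args) {
        FlushReturnSketch writer = new FlushReturnSketch();
        writer.write("a");
        writer.write("b");
        System.out.println(writer.flush()); // one fragment covering two rows
        System.out.println(writer.flush()); // nothing pending, so an empty list
    }
}
```

The caller that needs all fragments can append each returned list to a collection it owns, which keeps the lifetime of that collection in one place.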
Purpose
Linked issue: close #1569
Refactor fluss-lake-lance module to eliminate code duplication by reusing fluss-common's ArrowFieldWriter implementations.
Issue: The fluss-lake-lance module previously maintained a separate LanceArrowWriter with duplicate FieldWriter implementations, because fluss-common uses the shaded Arrow API while the Lance library requires the non-shaded Arrow API.
Solution: Implement ArrowDataConverter to bridge shaded and non-shaded Arrow via zero-copy off-heap memory sharing. This allows the Lance module to reuse fluss-common's ArrowFieldWriter implementations, eliminating the need for duplicate writer code.
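The idea of zero-copy sharing of off-heap memory can be illustrated with plain java.nio, independent of Arrow. This is only a sketch of the general technique, not the ArrowDataConverter implementation: ZeroCopyViewSketch and shareView are hypothetical names, and a direct ByteBuffer stands in for an Arrow buffer.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of two views over one off-heap allocation.
final class ZeroCopyViewSketch {
    // duplicate() creates a second buffer object over the same memory;
    // no bytes are copied, and only position/limit/mark are independent.
    static ByteBuffer shareView(ByteBuffer source) {
        return source.duplicate();
    }

    public static void main(String[] args) {
        // Off-heap allocation owned by the "producer" side.
        ByteBuffer producer = ByteBuffer.allocateDirect(Integer.BYTES * 2);
        producer.putInt(0, 42);

        // The "consumer" side receives a view, not a copy.
        ByteBuffer consumer = shareView(producer);

        // A write made after sharing is still visible through the view.
        producer.putInt(Integer.BYTES, 7);
        System.out.println(consumer.getInt(0) + " " + consumer.getInt(Integer.BYTES)); // prints "42 7"
    }
}
```

Because both views point at the same memory, the side that owns the allocation must keep it alive until the other side has finished reading.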
Brief change log
1. Eliminated Code Duplication (~400 lines removed)
LanceArrowWriter class and all its inner FieldWriter classes from fluss-lake-lance
ArrowFieldWriter
2. Created ArrowDataConverter (Zero-Copy Bridge)
3. Created ShadedArrowBatchWriter (Reuse Adapter)
ArrowUtils.createArrowFieldWriter()
writeRow(), finish(), reset()
4. Refactored LanceLakeWriter (Unified Writer Path)
InternalRow → ShadedArrowBatchWriter → ArrowDataConverter → Lance Fragment.create()
5. Architecture Benefits
Tests
Unit Tests
LanceTieringTest.testTieringWriteTable (with/without partitions)
LakeEnabledTableCreateITCase (table creation with various data types)
Verification
API and Format
API Changes: None. This is an internal refactoring.
LakeWriter, LanceLakeTieringFactory) remain unchanged
Internal Changes:
Documentation
Documentation Updates: Not required
Code Quality Improvements: