Skip to content

Commit a0ed403

Browse files
committed
[fix](fe) Address Paimon JDBC review feedback
### What problem does this PR solve?

Issue Number: None

Related PR: #61513

Problem Summary: Address review feedback for Paimon JDBC JNI scans by moving backend Paimon options to scan-level params with BE fallback, aligning `driver_class` validation exceptions, cleaning up JDBC driver test state, and removing the unrelated build change from the PR branch history.

### Release note

None

### Check List (For Author)

- Test: Unit Test / Static check
    - `git diff --check` on the staged files
    - `./run-fe-ut.sh --run org.apache.doris.datasource.property.metastore.PaimonJdbcMetaStorePropertiesTest,org.apache.doris.datasource.paimon.source.PaimonScanNodeTest,org.apache.doris.paimon.PaimonJdbcDriverUtilsTest` (still running in the PR worktree at commit time)
- Behavior changed: Yes (Paimon backend options now propagate at scan level with BE fallback; validation exception type is aligned)
- Does this need documentation: No
1 parent ae13439 commit a0ed403

8 files changed

Lines changed: 118 additions & 10 deletions

File tree

be/src/format/table/paimon_cpp_reader.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,8 +269,10 @@ std::vector<std::string> PaimonCppReader::_build_read_columns() const {
269269

270270
std::map<std::string, std::string> PaimonCppReader::_build_options() const {
271271
std::map<std::string, std::string> options;
272-
if (_range.__isset.table_format_params && _range.table_format_params.__isset.paimon_params &&
273-
_range.table_format_params.paimon_params.__isset.paimon_options) {
272+
if (_range_params && _range_params->__isset.paimon_options && !_range_params->paimon_options.empty()) {
273+
options.insert(_range_params->paimon_options.begin(), _range_params->paimon_options.end());
274+
} else if (_range.__isset.table_format_params && _range.table_format_params.__isset.paimon_params &&
275+
_range.table_format_params.paimon_params.__isset.paimon_options) {
274276
options.insert(_range.table_format_params.paimon_params.paimon_options.begin(),
275277
_range.table_format_params.paimon_params.paimon_options.end());
276278
}
@@ -310,7 +312,6 @@ std::map<std::string, std::string> PaimonCppReader::_build_options() const {
310312
copy_if_missing("fs.s3a.region", "AWS_REGION");
311313
copy_if_missing("fs.s3a.path.style.access", "use_path_style");
312314

313-
// FE currently does not pass paimon_options in scan ranges.
314315
// Backfill file.format/manifest.format from split file_format to avoid
315316
// paimon-cpp falling back to default manifest.format=avro.
316317
if (_range.__isset.table_format_params && _range.table_format_params.__isset.paimon_params &&

be/src/format/table/paimon_jni_reader.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,15 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
6565
if (range_params->__isset.serialized_table) {
6666
params["serialized_table"] = range_params->serialized_table;
6767
}
68-
for (const auto& kv : paimon_params.paimon_options) {
69-
params[PAIMON_OPTION_PREFIX + kv.first] = kv.second;
68+
if (range_params->__isset.paimon_options &&
69+
!range_params->paimon_options.empty()) {
70+
for (const auto& kv : range_params->paimon_options) {
71+
params[PAIMON_OPTION_PREFIX + kv.first] = kv.second;
72+
}
73+
} else if (paimon_params.__isset.paimon_options) {
74+
for (const auto& kv : paimon_params.paimon_options) {
75+
params[PAIMON_OPTION_PREFIX + kv.first] = kv.second;
76+
}
7077
}
7178
if (range_params->__isset.properties && !range_params->properties.empty()) {
7279
for (const auto& kv : range_params->properties) {

fe/be-java-extensions/paimon-scanner/src/test/java/org/apache/doris/paimon/PaimonJdbcDriverUtilsTest.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.doris.common.classloader.JniScannerClassLoader;
2121

22+
import org.junit.After;
2223
import org.junit.Assert;
2324
import org.junit.Test;
2425

@@ -30,6 +31,7 @@
3031
import java.sql.DriverManager;
3132
import java.sql.DriverPropertyInfo;
3233
import java.sql.SQLFeatureNotSupportedException;
34+
import java.util.ArrayList;
3335
import java.util.HashMap;
3436
import java.util.List;
3537
import java.util.Map;
@@ -39,17 +41,34 @@
3941
import java.util.logging.Logger;
4042

4143
public class PaimonJdbcDriverUtilsTest {
44+
private final List<Driver> registeredDrivers = new ArrayList<>();
45+
private final List<Path> tempJars = new ArrayList<>();
46+
47+
@After
48+
public void tearDown() throws Exception {
49+
for (Driver driver : registeredDrivers) {
50+
DriverManager.deregisterDriver(driver);
51+
}
52+
registeredDrivers.clear();
53+
for (Path tempJar : tempJars) {
54+
Files.deleteIfExists(tempJar);
55+
}
56+
tempJars.clear();
57+
}
58+
4259
@Test
4360
public void testRegisterDriverIfNeeded() throws Exception {
4461
Path driverJar = createDriverJar();
4562
Map<String, String> params = new HashMap<>();
4663
params.put(PaimonJdbcDriverUtils.PAIMON_JDBC_DRIVER_URL, driverJar.toUri().toURL().toString());
4764
params.put(PaimonJdbcDriverUtils.PAIMON_JDBC_DRIVER_CLASS, DummyJdbcDriver.class.getName());
4865

49-
JniScannerClassLoader scannerClassLoader = new JniScannerClassLoader("paimon-test", List.of(), null);
66+
JniScannerClassLoader scannerClassLoader =
67+
new JniScannerClassLoader("paimon-test", List.of(), ClassLoader.getPlatformClassLoader());
5068
PaimonJdbcDriverUtils.registerDriverIfNeeded(params, scannerClassLoader);
5169

5270
Driver driver = DriverManager.getDriver("jdbc:dummy:test");
71+
registeredDrivers.add(driver);
5372
Assert.assertTrue(driver.acceptsURL("jdbc:dummy:test"));
5473
}
5574

@@ -65,6 +84,7 @@ public void testRegisterDriverIfNeededRequiresDriverClass() {
6584

6685
private Path createDriverJar() throws IOException {
6786
Path jarPath = Files.createTempFile("paimon-jdbc-driver", ".jar");
87+
tempJars.add(jarPath);
6888
String resourceName = DummyJdbcDriver.class.getName().replace('.', '/') + ".class";
6989
try (JarOutputStream jarOutputStream = new JarOutputStream(Files.newOutputStream(jarPath));
7090
InputStream inputStream = DummyJdbcDriver.class.getClassLoader().getResourceAsStream(resourceName)) {

fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,13 @@ public void createScanRangeLocations() throws UserException {
205205
// Set paimon_predicate at ScanNode level to avoid redundant serialization in each split
206206
String serializedPredicate = PaimonUtil.encodeObjectToString(predicates);
207207
params.setPaimonPredicate(serializedPredicate);
208+
setScanLevelPaimonOptions();
209+
}
210+
211+
private void setScanLevelPaimonOptions() {
212+
if (!backendPaimonOptions.isEmpty()) {
213+
params.setPaimonOptions(backendPaimonOptions);
214+
}
208215
}
209216

210217
private void putHistorySchemaInfo(Long schemaId) {
@@ -261,9 +268,6 @@ private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit)
261268
fileDesc.setSchemaId(paimonSplit.getSchemaId());
262269
}
263270
fileDesc.setFileFormat(fileFormat);
264-
if (!backendPaimonOptions.isEmpty()) {
265-
fileDesc.setPaimonOptions(backendPaimonOptions);
266-
}
267271
// Hadoop conf is set at ScanNode level via params.properties in createScanRangeLocations(),
268272
// no need to set it for each split to avoid redundant configuration
269273
Optional<DeletionFile> optDeletionFile = paimonSplit.getDeletionFile();

fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public Map<String, String> getBackendPaimonOptions() {
166166
return Collections.emptyMap();
167167
}
168168
if (StringUtils.isBlank(driverClass)) {
169-
throw new IllegalStateException("jdbc.driver_class or paimon.jdbc.driver_class is required when "
169+
throw new IllegalArgumentException("jdbc.driver_class or paimon.jdbc.driver_class is required when "
170170
+ "jdbc.driver_url or paimon.jdbc.driver_url is specified");
171171
}
172172
Map<String, String> backendPaimonOptions = new HashMap<>();

fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.apache.doris.planner.PlanNodeId;
3333
import org.apache.doris.planner.ScanContext;
3434
import org.apache.doris.qe.SessionVariable;
35+
import org.apache.doris.thrift.TFileRangeDesc;
36+
import org.apache.doris.thrift.TFileScanRangeParams;
3537

3638
import org.apache.paimon.data.BinaryRow;
3739
import org.apache.paimon.io.DataFileMeta;
@@ -511,11 +513,67 @@ public void testGetBackendPaimonOptionsForJdbcCatalog() throws Exception {
511513
Assert.assertEquals(2, backendOptions.size());
512514
}
513515

516+
@Test
517+
public void testApplyBackendPaimonOptionsAtScanNodeLevel() throws Exception {
518+
PaimonScanNode node = new PaimonScanNode(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)),
519+
false, sv, ScanContext.EMPTY);
520+
PaimonSource source = Mockito.mock(PaimonSource.class);
521+
Mockito.when(source.getTableLocation()).thenReturn("file:///warehouse");
522+
node.setSource(source);
523+
524+
Map<String, String> backendOptions = new HashMap<>();
525+
backendOptions.put("jdbc.driver_url", "file:///tmp/postgresql-42.5.0.jar");
526+
backendOptions.put("jdbc.driver_class", "org.postgresql.Driver");
527+
setField(FileQueryScanNode.class, node, "params", new TFileScanRangeParams());
528+
setField(PaimonScanNode.class, node, "backendPaimonOptions", backendOptions);
529+
setField(PaimonScanNode.class, node, "storagePropertiesMap", Collections.emptyMap());
530+
531+
invokePrivateMethod(node, "setScanLevelPaimonOptions");
532+
533+
Assert.assertEquals(backendOptions, node.getFileScanRangeParams().getPaimonOptions());
534+
535+
TFileRangeDesc rangeDesc = new TFileRangeDesc();
536+
invokePrivateMethod(node, "setPaimonParams",
537+
new Class<?>[] {TFileRangeDesc.class, PaimonSplit.class},
538+
rangeDesc, new PaimonSplit(createDataSplit("scan_level.parquet")));
539+
Assert.assertFalse(rangeDesc.getTableFormatParams().getPaimonParams().isSetPaimonOptions());
540+
}
541+
514542
private void mockJniReader(PaimonScanNode spyNode) {
515543
Mockito.doReturn(false).when(spyNode).supportNativeReader(ArgumentMatchers.any(Optional.class));
516544
}
517545

518546
private void mockNativeReader(PaimonScanNode spyNode) {
519547
Mockito.doReturn(true).when(spyNode).supportNativeReader(ArgumentMatchers.any(Optional.class));
520548
}
549+
550+
private void setField(Class<?> clazz, Object target, String fieldName, Object value) throws Exception {
551+
java.lang.reflect.Field field = clazz.getDeclaredField(fieldName);
552+
field.setAccessible(true);
553+
field.set(target, value);
554+
}
555+
556+
private Object invokePrivateMethod(Object target, String methodName, Class<?>[] parameterTypes, Object... args)
557+
throws Exception {
558+
Method method = target.getClass().getDeclaredMethod(methodName, parameterTypes);
559+
method.setAccessible(true);
560+
return method.invoke(target, args);
561+
}
562+
563+
private Object invokePrivateMethod(Object target, String methodName) throws Exception {
564+
return invokePrivateMethod(target, methodName, new Class<?>[0]);
565+
}
566+
567+
private DataSplit createDataSplit(String fileName) {
568+
DataFileMeta dataFileMeta = DataFileMeta.forAppend(fileName, 64L * 1024 * 1024, 1L, SimpleStats.EMPTY_STATS,
569+
1L, 1L, 1L, Collections.<String>emptyList(), null, FileSource.APPEND,
570+
Collections.<String>emptyList(), null, null, Collections.<String>emptyList());
571+
return DataSplit.builder()
572+
.rawConvertible(true)
573+
.withPartition(BinaryRow.singleColumn(1))
574+
.withBucket(1)
575+
.withBucketPath("file://b1")
576+
.withDataFiles(Collections.singletonList(dataFileMeta))
577+
.build();
578+
}
521579
}

fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,4 +174,19 @@ public void testGetBackendPaimonOptions() throws Exception {
174174
Assertions.assertEquals("org.postgresql.Driver", backendOptions.get("jdbc.driver_class"));
175175
Assertions.assertEquals(2, backendOptions.size());
176176
}
177+
178+
@Test
179+
public void testGetBackendPaimonOptionsRequiresDriverClass() throws Exception {
180+
Map<String, String> props = new HashMap<>();
181+
props.put("type", "paimon");
182+
props.put("paimon.catalog.type", "jdbc");
183+
props.put("uri", "jdbc:postgresql://127.0.0.1:5442/postgres");
184+
props.put("warehouse", "s3://warehouse/path");
185+
props.put("paimon.jdbc.driver_url", "file:///tmp/postgresql-42.5.0.jar");
186+
187+
PaimonJdbcMetaStoreProperties jdbcProps = (PaimonJdbcMetaStoreProperties) MetastoreProperties.create(props);
188+
IllegalArgumentException exception = Assertions.assertThrows(IllegalArgumentException.class,
189+
jdbcProps::getBackendPaimonOptions);
190+
Assertions.assertTrue(exception.getMessage().contains("driver_class"));
191+
}
177192
}

gensrc/thrift/PlanNodes.thrift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,9 @@ struct TFileScanRangeParams {
491491
// enable mapping varbinary type for Doris external table and TVF
492492
28: optional bool enable_mapping_varbinary = false;
493493
29: optional bool enable_mapping_timestamp_tz = false;
494+
// Paimon options from FE, used for jni/native scanner
495+
// Set at ScanNode level to avoid redundant serialization in each split
496+
30: optional map<string, string> paimon_options
494497
}
495498

496499
struct TFileRangeDesc {

0 commit comments

Comments
 (0)