diff --git a/be/src/format/table/paimon_cpp_reader.cpp b/be/src/format/table/paimon_cpp_reader.cpp index 9bc84f3bbb32d3..e4b182c41edfc7 100644 --- a/be/src/format/table/paimon_cpp_reader.cpp +++ b/be/src/format/table/paimon_cpp_reader.cpp @@ -269,8 +269,12 @@ std::vector PaimonCppReader::_build_read_columns() const { std::map PaimonCppReader::_build_options() const { std::map options; - if (_range.__isset.table_format_params && _range.table_format_params.__isset.paimon_params && - _range.table_format_params.paimon_params.__isset.paimon_options) { + if (_range_params && _range_params->__isset.paimon_options && + !_range_params->paimon_options.empty()) { + options.insert(_range_params->paimon_options.begin(), _range_params->paimon_options.end()); + } else if (_range.__isset.table_format_params && + _range.table_format_params.__isset.paimon_params && + _range.table_format_params.paimon_params.__isset.paimon_options) { options.insert(_range.table_format_params.paimon_params.paimon_options.begin(), _range.table_format_params.paimon_params.paimon_options.end()); } @@ -310,7 +314,6 @@ std::map PaimonCppReader::_build_options() const { copy_if_missing("fs.s3a.region", "AWS_REGION"); copy_if_missing("fs.s3a.path.style.access", "use_path_style"); - // FE currently does not pass paimon_options in scan ranges. // Backfill file.format/manifest.format from split file_format to avoid // paimon-cpp falling back to default manifest.format=avro. if (_range.__isset.table_format_params && _range.table_format_params.__isset.paimon_params && diff --git a/be/src/format/table/paimon_jni_reader.cpp b/be/src/format/table/paimon_jni_reader.cpp index 4fc9c76e4c47f5..12e6171b3a5305 100644 --- a/be/src/format/table/paimon_jni_reader.cpp +++ b/be/src/format/table/paimon_jni_reader.cpp @@ -65,8 +65,15 @@ PaimonJniReader::PaimonJniReader(const std::vector& file_slot_d if (range_params->__isset.serialized_table) { params["serialized_table"] = range_params->serialized_table; } - for (const auto& kv : paimon_params.paimon_options) { - params[PAIMON_OPTION_PREFIX + kv.first] = kv.second; + if (range_params->__isset.paimon_options && + !range_params->paimon_options.empty()) { + for (const auto& kv : range_params->paimon_options) { + params[PAIMON_OPTION_PREFIX + kv.first] = kv.second; + } + } else if (paimon_params.__isset.paimon_options) { + for (const auto& kv : paimon_params.paimon_options) { + params[PAIMON_OPTION_PREFIX + kv.first] = kv.second; + } } if (range_params->__isset.properties && !range_params->properties.empty()) { for (const auto& kv : range_params->properties) { diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/JniScannerClassLoader.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/JniScannerClassLoader.java index 1e6be07e097d0a..a90366d7a782ba 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/JniScannerClassLoader.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/JniScannerClassLoader.java @@ -30,6 +30,15 @@ public JniScannerClassLoader(String scannerName, List urls, ClassLoader par this.scannerName = scannerName; } + public synchronized void addURLIfAbsent(URL url) { + for (URL existingUrl : getURLs()) { + if (existingUrl.equals(url)) { + return; + } + } + super.addURL(url); + } + @Override public String toString() { return "JniScannerClassLoader{" diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jdbc/JdbcDriverUtils.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jdbc/JdbcDriverUtils.java new file mode 100644 index 00000000000000..67430d361083ab --- /dev/null +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jdbc/JdbcDriverUtils.java @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.jdbc; + +import org.apache.doris.common.classloader.JniScannerClassLoader; + +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.DriverPropertyInfo; +import java.sql.SQLFeatureNotSupportedException; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public final class JdbcDriverUtils { + private static final ConcurrentHashMap DRIVER_CLASS_LOADER_CACHE = new ConcurrentHashMap<>(); + private static final Set REGISTERED_DRIVER_KEYS = ConcurrentHashMap.newKeySet(); + + private JdbcDriverUtils() { + } + + public static void registerDriver(String driverUrl, String driverClassName, ClassLoader classLoader) { + try { + URL url = new URL(driverUrl); + String driverKey = driverUrl + "#" + driverClassName; + if (!REGISTERED_DRIVER_KEYS.add(driverKey)) { + return; + } + try { + ClassLoader driverClassLoader = prepareDriverClassLoader(url, classLoader); + Class loadedDriverClass = Class.forName(driverClassName, true, driverClassLoader); + Driver driver = (Driver) loadedDriverClass.getDeclaredConstructor().newInstance(); + DriverManager.registerDriver(new DriverShim(driver)); + } catch (Exception e) { + REGISTERED_DRIVER_KEYS.remove(driverKey); + throw new RuntimeException("Failed to register JDBC driver: " + driverClassName, e); + } + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Invalid JDBC driver URL: " + driverUrl, e); + } + } + + private static ClassLoader prepareDriverClassLoader(URL driverUrl, ClassLoader classLoader) { + if (classLoader instanceof JniScannerClassLoader) { + JniScannerClassLoader scannerClassLoader = (JniScannerClassLoader) classLoader; + scannerClassLoader.addURLIfAbsent(driverUrl); + return scannerClassLoader; + } + return DRIVER_CLASS_LOADER_CACHE.computeIfAbsent(driverUrl, + url -> URLClassLoader.newInstance(new URL[] {url}, classLoader)); + } + + private static final class DriverShim implements Driver { + private final Driver delegate; + + private DriverShim(Driver delegate) { + this.delegate = delegate; + } + + @Override + public Connection connect(String url, java.util.Properties info) throws java.sql.SQLException { + return delegate.connect(url, info); + } + + @Override + public boolean acceptsURL(String url) throws java.sql.SQLException { + return delegate.acceptsURL(url); + } + + @Override + public DriverPropertyInfo[] getPropertyInfo(String url, java.util.Properties info) + throws java.sql.SQLException { + return delegate.getPropertyInfo(url, info); + } + + @Override + public int getMajorVersion() { + return delegate.getMajorVersion(); + } + + @Override + public int getMinorVersion() { + return delegate.getMinorVersion(); + } + + @Override + public boolean jdbcCompliant() { + return delegate.jdbcCompliant(); + } + + @Override + public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException { + return delegate.getParentLogger(); + } + } +} diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJdbcDriverUtils.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJdbcDriverUtils.java new file mode 100644 index 00000000000000..09d7818dc71587 --- /dev/null +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJdbcDriverUtils.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.paimon; + +import org.apache.doris.common.jdbc.JdbcDriverUtils; + +import java.util.Map; + +final class PaimonJdbcDriverUtils { + static final String PAIMON_JDBC_DRIVER_URL = "paimon.jdbc.driver_url"; + static final String PAIMON_JDBC_DRIVER_CLASS = "paimon.jdbc.driver_class"; + static final String JDBC_DRIVER_URL = "jdbc.driver_url"; + static final String JDBC_DRIVER_CLASS = "jdbc.driver_class"; + + private PaimonJdbcDriverUtils() { + } + + static void registerDriverIfNeeded(Map params, ClassLoader parentClassLoader) { + String driverUrl = firstNonBlank(params.get(PAIMON_JDBC_DRIVER_URL), params.get(JDBC_DRIVER_URL)); + if (driverUrl == null) { + return; + } + String driverClassName = firstNonBlank(params.get(PAIMON_JDBC_DRIVER_CLASS), params.get(JDBC_DRIVER_CLASS)); + if (driverClassName == null) { + throw new IllegalArgumentException("paimon.jdbc.driver_class or jdbc.driver_class is required when " + + "paimon.jdbc.driver_url or jdbc.driver_url is specified"); + } + registerDriver(driverUrl, driverClassName, parentClassLoader); + } + + static void registerDriver(String driverUrl, String driverClassName, ClassLoader parentClassLoader) { + JdbcDriverUtils.registerDriver(driverUrl, driverClassName, parentClassLoader); + } + + private static String firstNonBlank(String first, String second) { + if (first != null && !first.trim().isEmpty()) { + return first; + } + if (second != null && !second.trim().isEmpty()) { + return second; + } + return null; + } +} diff --git a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java index 5690b7f65054c3..8f64a51dc9b15b 100644 --- a/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java +++ b/fe/be-java-extensions/paimon-scanner/src/main/java/org/apache/doris/paimon/PaimonJniScanner.java @@ -92,6 +92,7 @@ public void open() throws IOException { // so we need to provide a classloader, otherwise it will cause NPE. Thread.currentThread().setContextClassLoader(classLoader); preExecutionAuthenticator.execute(() -> { + PaimonJdbcDriverUtils.registerDriverIfNeeded(params, classLoader); initTable(); initReader(); return null; @@ -227,4 +228,3 @@ private void initTable() { } } - diff --git a/fe/be-java-extensions/paimon-scanner/src/test/java/org/apache/doris/paimon/PaimonJdbcDriverUtilsTest.java b/fe/be-java-extensions/paimon-scanner/src/test/java/org/apache/doris/paimon/PaimonJdbcDriverUtilsTest.java new file mode 100644 index 00000000000000..1f0df2371dd71f --- /dev/null +++ b/fe/be-java-extensions/paimon-scanner/src/test/java/org/apache/doris/paimon/PaimonJdbcDriverUtilsTest.java @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.paimon; + +import org.apache.doris.common.classloader.JniScannerClassLoader; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.DriverPropertyInfo; +import java.sql.SQLFeatureNotSupportedException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.jar.JarEntry; +import java.util.jar.JarOutputStream; +import java.util.logging.Logger; + +public class PaimonJdbcDriverUtilsTest { + private final List registeredDrivers = new ArrayList<>(); + private final List tempJars = new ArrayList<>(); + + @After + public void tearDown() throws Exception { + for (Driver driver : registeredDrivers) { + DriverManager.deregisterDriver(driver); + } + registeredDrivers.clear(); + for (Path tempJar : tempJars) { + Files.deleteIfExists(tempJar); + } + tempJars.clear(); + } + + @Test + public void testRegisterDriverIfNeeded() throws Exception { + Path driverJar = createDriverJar(); + Map params = new HashMap<>(); + params.put(PaimonJdbcDriverUtils.PAIMON_JDBC_DRIVER_URL, driverJar.toUri().toURL().toString()); + params.put(PaimonJdbcDriverUtils.PAIMON_JDBC_DRIVER_CLASS, DummyJdbcDriver.class.getName()); + + JniScannerClassLoader scannerClassLoader = + new JniScannerClassLoader("paimon-test", List.of(), ClassLoader.getPlatformClassLoader()); + PaimonJdbcDriverUtils.registerDriverIfNeeded(params, scannerClassLoader); + + Driver driver = DriverManager.getDriver("jdbc:dummy:test"); + registeredDrivers.add(driver); + Assert.assertTrue(driver.acceptsURL("jdbc:dummy:test")); + } + + @Test + public void testRegisterDriverIfNeededRequiresDriverClass() { + Map params = new HashMap<>(); + params.put(PaimonJdbcDriverUtils.PAIMON_JDBC_DRIVER_URL, "file:///tmp/postgresql-42.5.0.jar"); + + IllegalArgumentException exception = Assert.assertThrows(IllegalArgumentException.class, + () -> PaimonJdbcDriverUtils.registerDriverIfNeeded(params, getClass().getClassLoader())); + Assert.assertTrue(exception.getMessage().contains("driver_class")); + } + + private Path createDriverJar() throws IOException { + Path jarPath = Files.createTempFile("paimon-jdbc-driver", ".jar"); + tempJars.add(jarPath); + String resourceName = DummyJdbcDriver.class.getName().replace('.', '/') + ".class"; + try (JarOutputStream jarOutputStream = new JarOutputStream(Files.newOutputStream(jarPath)); + InputStream inputStream = DummyJdbcDriver.class.getClassLoader().getResourceAsStream(resourceName)) { + Assert.assertNotNull(inputStream); + jarOutputStream.putNextEntry(new JarEntry(resourceName)); + byte[] buffer = new byte[4096]; + int bytesRead; + while ((bytesRead = inputStream.read(buffer)) >= 0) { + jarOutputStream.write(buffer, 0, bytesRead); + } + jarOutputStream.closeEntry(); + } + return jarPath; + } + + public static class DummyJdbcDriver implements Driver { + @Override + public java.sql.Connection connect(String url, Properties info) { + return null; + } + + @Override + public boolean acceptsURL(String url) { + return url != null && url.startsWith("jdbc:dummy:"); + } + + @Override + public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) { + return new DriverPropertyInfo[0]; + } + + @Override + public int getMajorVersion() { + return 1; + } + + @Override + public int getMinorVersion() { + return 0; + } + + @Override + public boolean jdbcCompliant() { + return false; + } + + @Override + public Logger getParentLogger() throws SQLFeatureNotSupportedException { + throw new SQLFeatureNotSupportedException("not supported"); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 9cf75eae4acdbb..a06a5fcd0e5bdf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -36,6 +36,7 @@ import org.apache.doris.datasource.paimon.PaimonUtils; import org.apache.doris.datasource.paimon.profile.PaimonMetricRegistry; import org.apache.doris.datasource.paimon.profile.PaimonScanMetricsReporter; +import org.apache.doris.datasource.property.metastore.PaimonJdbcMetaStoreProperties; import org.apache.doris.datasource.property.storage.StorageProperties; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanContext; @@ -144,6 +145,7 @@ public String toString() { // get them in doInitialize() to ensure internal consistency of ScanNode private Map storagePropertiesMap; private Map backendStorageProperties; + private Map backendPaimonOptions = Collections.emptyMap(); // The schema information involved in the current query process (including historical schema). protected ConcurrentHashMap currentQuerySchema = new ConcurrentHashMap<>(); @@ -170,6 +172,7 @@ protected void doInitialize() throws UserException { source.getPaimonTable() ); backendStorageProperties = CredentialUtils.getBackendPropertiesFromStorageMap(storagePropertiesMap); + backendPaimonOptions = getBackendPaimonOptions(); } @VisibleForTesting @@ -202,6 +205,13 @@ public void createScanRangeLocations() throws UserException { // Set paimon_predicate at ScanNode level to avoid redundant serialization in each split String serializedPredicate = PaimonUtil.encodeObjectToString(predicates); params.setPaimonPredicate(serializedPredicate); + setScanLevelPaimonOptions(); + } + + private void setScanLevelPaimonOptions() { + if (!backendPaimonOptions.isEmpty()) { + params.setPaimonOptions(backendPaimonOptions); + } } private void putHistorySchemaInfo(Long schemaId) { @@ -464,6 +474,23 @@ public List getSplits(int numBackends) throws UserException { return splits; } + @VisibleForTesting + Map getBackendPaimonOptions() { + if (source == null) { + return Collections.emptyMap(); + } + if (!(source.getCatalog() instanceof PaimonExternalCatalog)) { + return Collections.emptyMap(); + } + PaimonExternalCatalog catalog = (PaimonExternalCatalog) source.getCatalog(); + if (!(catalog.getCatalogProperty().getMetastoreProperties() instanceof PaimonJdbcMetaStoreProperties)) { + return Collections.emptyMap(); + } + PaimonJdbcMetaStoreProperties jdbcMetaStoreProperties = + (PaimonJdbcMetaStoreProperties) catalog.getCatalogProperty().getMetastoreProperties(); + return jdbcMetaStoreProperties.getBackendPaimonOptions(); + } + @VisibleForTesting boolean shouldForceJniForSystemTable() { if (source == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java index 92bcf023ff2704..7568d59c5fed33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStoreProperties.java @@ -37,6 +37,8 @@ import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -45,6 +47,8 @@ public class PaimonJdbcMetaStoreProperties extends AbstractPaimonProperties { private static final Logger LOG = LogManager.getLogger(PaimonJdbcMetaStoreProperties.class); private static final String JDBC_PREFIX = "jdbc."; + private static final String JDBC_DRIVER_URL = JDBC_PREFIX + JdbcResource.DRIVER_URL; + private static final String JDBC_DRIVER_CLASS = JDBC_PREFIX + JdbcResource.DRIVER_CLASS; private static final Map DRIVER_CLASS_LOADER_CACHE = new ConcurrentHashMap<>(); private static final Set REGISTERED_DRIVER_KEYS = ConcurrentHashMap.newKeySet(); @@ -157,6 +161,20 @@ private void appendRawJdbcCatalogOptions() { }); } + public Map getBackendPaimonOptions() { + if (StringUtils.isBlank(driverUrl)) { + return Collections.emptyMap(); + } + if (StringUtils.isBlank(driverClass)) { + throw new IllegalArgumentException("jdbc.driver_class or paimon.jdbc.driver_class is required when " + + "jdbc.driver_url or paimon.jdbc.driver_url is specified"); + } + Map backendPaimonOptions = new HashMap<>(); + backendPaimonOptions.put(JDBC_DRIVER_URL, JdbcResource.getFullDriverUrl(driverUrl)); + backendPaimonOptions.put(JDBC_DRIVER_CLASS, driverClass); + return backendPaimonOptions; + } + /** * Register JDBC driver with DriverManager. * This is necessary because DriverManager.getConnection() doesn't use Thread.contextClassLoader. diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java index 03828278c947b7..f0e8a91d360ed5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java @@ -21,13 +21,19 @@ import org.apache.doris.analysis.TupleId; import org.apache.doris.common.ExceptionChecker; import org.apache.doris.common.UserException; +import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.FileSplitter; +import org.apache.doris.datasource.paimon.PaimonExternalCatalog; import org.apache.doris.datasource.paimon.PaimonFileExternalCatalog; import org.apache.doris.datasource.paimon.PaimonSysExternalTable; +import org.apache.doris.datasource.property.metastore.MetastoreProperties; +import org.apache.doris.datasource.property.metastore.PaimonJdbcMetaStoreProperties; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.ScanContext; import org.apache.doris.qe.SessionVariable; +import org.apache.doris.thrift.TFileRangeDesc; +import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.DataFileMeta; @@ -475,6 +481,64 @@ public void testDetermineTargetFileSplitSizeHonorsMaxFileSplitNum() throws Excep Assert.assertEquals(100L * 1024L * 1024L, target); } + @Test + public void testGetBackendPaimonOptionsForJdbcCatalog() throws Exception { + String driverUrl = "file:///tmp/postgresql-42.5.0.jar"; + Map props = new HashMap<>(); + props.put("type", "paimon"); + props.put("paimon.catalog.type", "jdbc"); + props.put("uri", "jdbc:postgresql://127.0.0.1:5442/postgres"); + props.put("warehouse", "s3://warehouse/path"); + props.put("paimon.jdbc.driver_url", driverUrl); + props.put("paimon.jdbc.driver_class", "org.postgresql.Driver"); + PaimonJdbcMetaStoreProperties jdbcMetaStoreProperties = + (PaimonJdbcMetaStoreProperties) MetastoreProperties.create(props); + + CatalogProperty catalogProperty = Mockito.mock(CatalogProperty.class); + Mockito.when(catalogProperty.getMetastoreProperties()).thenReturn(jdbcMetaStoreProperties); + + PaimonExternalCatalog catalog = Mockito.mock(PaimonExternalCatalog.class); + Mockito.when(catalog.getCatalogProperty()).thenReturn(catalogProperty); + + PaimonSource source = Mockito.mock(PaimonSource.class); + Mockito.when(source.getCatalog()).thenReturn(catalog); + + PaimonScanNode node = new PaimonScanNode(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), + false, sv, ScanContext.EMPTY); + node.setSource(source); + + Map backendOptions = node.getBackendPaimonOptions(); + Assert.assertEquals("org.postgresql.Driver", backendOptions.get("jdbc.driver_class")); + Assert.assertEquals(driverUrl, backendOptions.get("jdbc.driver_url")); + Assert.assertEquals(2, backendOptions.size()); + } + + @Test + public void testApplyBackendPaimonOptionsAtScanNodeLevel() throws Exception { + PaimonScanNode node = new PaimonScanNode(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), + false, sv, ScanContext.EMPTY); + PaimonSource source = Mockito.mock(PaimonSource.class); + Mockito.when(source.getTableLocation()).thenReturn("file:///warehouse"); + node.setSource(source); + + Map backendOptions = new HashMap<>(); + backendOptions.put("jdbc.driver_url", "file:///tmp/postgresql-42.5.0.jar"); + backendOptions.put("jdbc.driver_class", "org.postgresql.Driver"); + setField(FileQueryScanNode.class, node, "params", new TFileScanRangeParams()); + setField(PaimonScanNode.class, node, "backendPaimonOptions", backendOptions); + setField(PaimonScanNode.class, node, "storagePropertiesMap", Collections.emptyMap()); + + invokePrivateMethod(node, "setScanLevelPaimonOptions"); + + Assert.assertEquals(backendOptions, node.getFileScanRangeParams().getPaimonOptions()); + + TFileRangeDesc rangeDesc = new TFileRangeDesc(); + invokePrivateMethod(node, "setPaimonParams", + new Class[] {TFileRangeDesc.class, PaimonSplit.class}, + rangeDesc, new PaimonSplit(createDataSplit("scan_level.parquet"))); + Assert.assertFalse(rangeDesc.getTableFormatParams().getPaimonParams().isSetPaimonOptions()); + } + private void mockJniReader(PaimonScanNode spyNode) { Mockito.doReturn(false).when(spyNode).supportNativeReader(ArgumentMatchers.any(Optional.class)); } @@ -482,4 +546,34 @@ private void mockJniReader(PaimonScanNode spyNode) { private void mockNativeReader(PaimonScanNode spyNode) { Mockito.doReturn(true).when(spyNode).supportNativeReader(ArgumentMatchers.any(Optional.class)); } + + private void setField(Class clazz, Object target, String fieldName, Object value) throws Exception { + java.lang.reflect.Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } + + private Object invokePrivateMethod(Object target, String methodName, Class[] parameterTypes, Object... args) + throws Exception { + Method method = target.getClass().getDeclaredMethod(methodName, parameterTypes); + method.setAccessible(true); + return method.invoke(target, args); + } + + private Object invokePrivateMethod(Object target, String methodName) throws Exception { + return invokePrivateMethod(target, methodName, new Class[0]); + } + + private DataSplit createDataSplit(String fileName) { + DataFileMeta dataFileMeta = DataFileMeta.forAppend(fileName, 64L * 1024 * 1024, 1L, SimpleStats.EMPTY_STATS, + 1L, 1L, 1L, Collections.emptyList(), null, FileSource.APPEND, + Collections.emptyList(), null, null, Collections.emptyList()); + return DataSplit.builder() + .rawConvertible(true) + .withPartition(BinaryRow.singleColumn(1)) + .withBucket(1) + .withBucketPath("file://b1") + .withDataFiles(Collections.singletonList(dataFileMeta)) + .build(); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java index 471d2f05a9502a..cd430d8a631f13 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonJdbcMetaStorePropertiesTest.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.property.metastore; +import org.apache.doris.catalog.JdbcResource; import org.apache.doris.datasource.paimon.PaimonExternalCatalog; import org.apache.paimon.options.CatalogOptions; @@ -152,4 +153,40 @@ public void testRawDriverClassRequiredWhenDriverUrlIsSet() throws Exception { Assertions.assertThrows(IllegalArgumentException.class, () -> jdbcProps.initializeCatalog("paimon_catalog", Collections.emptyList())); } + + @Test + public void testGetBackendPaimonOptions() throws Exception { + String driverUrl = "file:///tmp/postgresql-42.5.0.jar"; + Map props = new HashMap<>(); + props.put("type", "paimon"); + props.put("paimon.catalog.type", "jdbc"); + props.put("uri", "jdbc:postgresql://127.0.0.1:5442/postgres"); + props.put("warehouse", "s3://warehouse/path"); + props.put("paimon.jdbc.driver_url", driverUrl); + props.put("paimon.jdbc.driver_class", "org.postgresql.Driver"); + + PaimonJdbcMetaStoreProperties jdbcProps = (PaimonJdbcMetaStoreProperties) MetastoreProperties.create(props); + Map backendOptions = jdbcProps.getBackendPaimonOptions(); + + Assertions.assertEquals( + JdbcResource.getFullDriverUrl(driverUrl), + backendOptions.get("jdbc.driver_url")); + Assertions.assertEquals("org.postgresql.Driver", backendOptions.get("jdbc.driver_class")); + Assertions.assertEquals(2, backendOptions.size()); + } + + @Test + public void testGetBackendPaimonOptionsRequiresDriverClass() throws Exception { + Map props = new HashMap<>(); + props.put("type", "paimon"); + props.put("paimon.catalog.type", "jdbc"); + props.put("uri", "jdbc:postgresql://127.0.0.1:5442/postgres"); + props.put("warehouse", "s3://warehouse/path"); + props.put("paimon.jdbc.driver_url", "file:///tmp/postgresql-42.5.0.jar"); + + PaimonJdbcMetaStoreProperties jdbcProps = (PaimonJdbcMetaStoreProperties) MetastoreProperties.create(props); + IllegalArgumentException exception = Assertions.assertThrows(IllegalArgumentException.class, + jdbcProps::getBackendPaimonOptions); + Assertions.assertTrue(exception.getMessage().contains("driver_class")); + } } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 64ad52267b3418..bf276edecde30b 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -491,6 +491,9 @@ struct TFileScanRangeParams { // enable mapping varbinary type for Doris external table and TVF 28: optional bool enable_mapping_varbinary = false; 29: optional bool enable_mapping_timestamp_tz = false; + // Paimon options from FE, used for jni/native scanner + // Set at ScanNode level to avoid redundant serialization in each split + 30: optional map paimon_options } struct TFileRangeDesc { diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy index 0f653063f149d6..82d8d5b0dfae10 100644 --- a/regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_jdbc_catalog.groovy @@ -124,6 +124,22 @@ suite("test_paimon_jdbc_catalog", "p0,external") { executeCommand(command, true) } + def assertSystemTableReadable = { String tableExpr, List expectedColumns = [], Integer minCount = null -> + def descRows = sql """DESC ${tableExpr}""" + assertTrue(descRows.size() > 0) + expectedColumns.each { col -> + assertTrue(descRows.toString().contains(col)) + } + + def countRows = sql """SELECT COUNT(*) FROM ${tableExpr}""" + assertEquals(1, countRows.size()) + int countValue = countRows[0][0].toString().toInteger() + if (minCount != null) { + assertTrue(countValue >= minCount) + } + return countValue + } + try { sql """switch internal""" sql """DROP CATALOG IF EXISTS ${catalogName}""" @@ -189,15 +205,55 @@ suite("test_paimon_jdbc_catalog", "p0,external") { assertEquals(1, rowCount.size()) assertEquals("2", rowCount[0][0].toString()) - def schemaDesc = sql """DESC paimon_jdbc_tbl\$schemas""" - assertTrue(schemaDesc.toString().contains("schema_id")) + assertSystemTableReadable("paimon_jdbc_tbl\$schemas", ["schema_id"], 1) + assertSystemTableReadable("paimon_jdbc_tbl\$snapshots", ["snapshot_id"], 1) + [ + "paimon_jdbc_tbl\$options", + "paimon_jdbc_tbl\$audit_log", + "paimon_jdbc_tbl\$files", + "paimon_jdbc_tbl\$tags", + "paimon_jdbc_tbl\$branches", + "paimon_jdbc_tbl\$consumers", + "paimon_jdbc_tbl\$ro", + "paimon_jdbc_tbl\$aggregation_fields", + "paimon_jdbc_tbl\$binlog", + "paimon_jdbc_tbl\$manifests", + "paimon_jdbc_tbl\$partitions", + "paimon_jdbc_tbl\$buckets", + "paimon_jdbc_tbl\$statistics", + "paimon_jdbc_tbl\$table_indexes" + ].each { tableExpr -> + assertSystemTableReadable(tableExpr) + } + + sql """DROP TABLE IF EXISTS paimon_jdbc_row_tracking_tbl""" + sql """ + CREATE TABLE ${dbName}.paimon_jdbc_row_tracking_tbl ( + id INT, + name STRING, + dt DATE + ) ENGINE=paimon + PROPERTIES ( + 'bucket' = '-1', + 'row-tracking.enabled' = 'true' + ) + """ + + sparkPaimonJdbc """ + INSERT INTO ${sparkSeedCatalogName}.${dbName}.paimon_jdbc_row_tracking_tbl VALUES + (3, 'carol', DATE '2025-01-03'), + (4, 'dave', DATE '2025-01-04') + """ - def schemaCount = sql """SELECT COUNT(*) FROM paimon_jdbc_tbl\$schemas""" - assertEquals(1, schemaCount.size()) - assertTrue(schemaCount[0][0].toString().toInteger() >= 1) + assertSystemTableReadable( + "paimon_jdbc_row_tracking_tbl\$row_tracking", + ["_row_id", "_sequence_number"], + 1 + ) } finally { try { sql """SWITCH ${catalogName}""" + sql """DROP TABLE IF EXISTS ${dbName}.paimon_jdbc_row_tracking_tbl""" sql """DROP TABLE IF EXISTS ${dbName}.paimon_jdbc_tbl""" sql """DROP DATABASE IF EXISTS ${dbName} FORCE""" } catch (Exception e) {