diff --git a/java/versions.lock b/java/versions.lock index 6db967d84c0..1697a5c3e39 100644 --- a/java/versions.lock +++ b/java/versions.lock @@ -8,7 +8,7 @@ com.esotericsoftware:minlog:1.3.0 (2 constraints: 241b55b7) com.fasterxml.jackson:jackson-bom:2.20.0 (3 constraints: e4396971) -com.fasterxml.jackson.core:jackson-annotations:2.20 (8 constraints: 3385e8c6) +com.fasterxml.jackson.core:jackson-annotations:2.20 (9 constraints: 4d956f83) com.fasterxml.jackson.core:jackson-core:2.20.0 (12 constraints: d6b49f0f) @@ -204,7 +204,7 @@ org.apache.avro:avro-mapred:1.12.1 (2 constraints: 361a5399) org.apache.commons:commons-collections4:4.5.0 (2 constraints: de191f6b) -org.apache.commons:commons-compress:1.28.0 (4 constraints: 0b33e88a) +org.apache.commons:commons-compress:1.28.0 (6 constraints: 88536cb8) org.apache.commons:commons-crypto:1.1.0 (3 constraints: fb2a0ea2) @@ -346,7 +346,7 @@ org.json4s:json4s-jackson_2.13:4.0.7 (3 constraints: 0d28f849) org.json4s:json4s-scalap_2.13:4.0.7 (2 constraints: d2172edd) -org.jspecify:jspecify:1.0.0 (1 constraints: 130ae0b4) +org.jspecify:jspecify:1.0.0 (2 constraints: ec1a087a) org.locationtech.jts:jts-core:1.20.0 (3 constraints: 1e26b429) @@ -372,7 +372,7 @@ org.slf4j:jcl-over-slf4j:2.0.17 (3 constraints: d72fb570) org.slf4j:jul-to-slf4j:2.0.17 (3 constraints: d72fb570) -org.slf4j:slf4j-api:2.0.17 (32 constraints: c8b7441c) +org.slf4j:slf4j-api:2.0.17 (35 constraints: 57eda16b) org.threeten:threeten-extra:1.8.0 (2 constraints: 0117967e) @@ -390,6 +390,22 @@ ch.qos.logback:logback-classic:1.5.32 (1 constraints: 3d053f3b) ch.qos.logback:logback-core:1.5.32 (1 constraints: 3c0d442a) +com.adobe.testing:s3mock-testcontainers:4.11.0 (1 constraints: 38053e3b) + +com.github.docker-java:docker-java-api:3.4.2 (1 constraints: 3d0f6267) + +com.github.docker-java:docker-java-transport:3.4.2 (1 constraints: c71515cb) + +com.github.docker-java:docker-java-transport-zerodep:3.4.2 (1 constraints: 3d0f6267) + +junit:junit:4.13.2 (1 constraints: 6e0fd276) + +net.java.dev.jna:jna:5.13.0 (1 constraints: f71513e1) + +org.hamcrest:hamcrest-core:1.3 (1 constraints: cc05fe3f) + +org.jetbrains:annotations:17.0.0 (1 constraints: 590d4b31) + org.junit:junit-bom:6.0.3 (7 constraints: 76743380) org.junit.jupiter:junit-jupiter:6.0.3 (2 constraints: 841045dd) @@ -408,4 +424,10 @@ org.junit.platform:junit-platform-launcher:6.0.3 (1 constraints: 10097b95) org.opentest4j:opentest4j:1.3.0 (2 constraints: cf209249) +org.rnorth.duct-tape:duct-tape:1.0.8 (1 constraints: 3d0f5267) + org.slf4j:slf4j-simple:2.0.17 (1 constraints: 3c05323b) + +org.testcontainers:junit-jupiter:1.20.4 (1 constraints: 3905313b) + +org.testcontainers:testcontainers:1.21.3 (2 constraints: de1fe2f9) diff --git a/java/versions.props b/java/versions.props index ab9bae09684..18e9f02ca84 100644 --- a/java/versions.props +++ b/java/versions.props @@ -13,3 +13,5 @@ org.apache.arrow:* = 18.3.0 # Test dependencies org.junit.jupiter:* = 6.0.3 +com.adobe.testing:s3mock-testcontainers = 4.11.0 +org.testcontainers:junit-jupiter = 1.20.4 \ No newline at end of file diff --git a/java/vortex-jni/src/main/java/dev/vortex/api/Files.java b/java/vortex-jni/src/main/java/dev/vortex/api/Files.java index d4752e27401..8483c26be56 100644 --- a/java/vortex-jni/src/main/java/dev/vortex/api/Files.java +++ b/java/vortex-jni/src/main/java/dev/vortex/api/Files.java @@ -31,6 +31,15 @@ private Files() {} /** * Opens a Vortex file from the specified path string. * + * @see #open(String, Map) + */ + public static File open(String path) { + return open(path, Map.of()); + } + + /** + * Opens a Vortex file from the specified path string and format parameters. + * *

This method provides a convenient way to open Vortex files using either * absolute file system paths or URI strings. If the path starts with "/", * it is treated as an absolute file system path and converted to a file URI. @@ -42,16 +51,15 @@ private Files() {} * @param path the path to the Vortex file, either as an absolute file system path * (starting with "/") or as a URI string * @return a {@link File} instance representing the opened Vortex file - * @throws RuntimeException if the file cannot be opened or the path is invalid + * @throws RuntimeException if the file cannot be opened or the path is invalid * @throws NullPointerException if path is null - * * @see #open(URI, Map) */ - public static File open(String path) { + public static File open(String path, Map properties) { if (path.startsWith("/")) { - return open(Paths.get(path).toUri(), Map.of()); + return open(Paths.get(path).toUri(), properties); } - return open(URI.create(path), Map.of()); + return open(URI.create(path), properties); } /** @@ -66,14 +74,13 @@ public static File open(String path) { * underlying file system or storage layer. The specific properties supported * depend on the URI scheme and storage backend being used. * - * @param uri the URI pointing to the Vortex file to open + * @param uri the URI pointing to the Vortex file to open * @param properties a map of configuration properties for opening the file; * may be empty but must not be null * @return a {@link File} instance representing the opened Vortex file - * @throws RuntimeException if the file cannot be opened, the URI is invalid, - * or the returned native pointer is invalid + * @throws RuntimeException if the file cannot be opened, the URI is invalid, + * or the returned native pointer is invalid * @throws NullPointerException if uri or properties is null - * * @see #open(String) */ public static File open(URI uri, Map properties) { diff --git a/java/vortex-spark/build.gradle.kts b/java/vortex-spark/build.gradle.kts index b6ce5259eef..4ba686a620d 100644 --- a/java/vortex-spark/build.gradle.kts +++ b/java/vortex-spark/build.gradle.kts @@ -33,6 +33,9 @@ testing { implementation("org.apache.spark:spark-core_2.13") implementation("org.apache.spark:spark-sql_2.13") runtimeOnly("org.slf4j:slf4j-simple:2.0.17") + // S3Mock Testcontainers for testing S3 integration (avoids classpath conflicts) + implementation("com.adobe.testing:s3mock-testcontainers") + implementation("org.testcontainers:junit-jupiter") } } } diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/VortexDataSourceV2.java b/java/vortex-spark/src/main/java/dev/vortex/spark/VortexDataSourceV2.java index 4ea28289e43..4acb98831e9 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/VortexDataSourceV2.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/VortexDataSourceV2.java @@ -5,13 +5,16 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import dev.vortex.api.File; import dev.vortex.api.Files; import dev.vortex.jni.NativeFileMethods; +import dev.vortex.spark.config.HadoopUtils; import java.util.Map; import java.util.Objects; import java.util.Optional; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.connector.catalog.CatalogV2Util; import org.apache.spark.sql.connector.catalog.Table; import org.apache.spark.sql.connector.catalog.TableProvider; @@ -19,6 +22,7 @@ import org.apache.spark.sql.sources.DataSourceRegister; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import scala.Option; /** * Spark V2 data source for reading and writing Vortex files. @@ -33,13 +37,17 @@ public final class VortexDataSourceV2 implements TableProvider, DataSourceRegist private static final String PATH_KEY = "path"; private static final String PATHS_KEY = "paths"; + private final Option sparkSession; + /** * Creates a new instance of the Vortex data source. *

* This no-argument constructor is required for Spark to instantiate the data source * through reflection. */ - public VortexDataSourceV2() {} + public VortexDataSourceV2() { + this.sparkSession = SparkSession.getActiveSession(); + } /** * Infers the schema of the Vortex files specified in the options. @@ -64,12 +72,13 @@ public StructType inferSchema(CaseInsensitiveStringMap options) { return new StructType(); } + var formatOptions = buildDataSourceOptions(options.asCaseSensitiveMap()); + var pathToInfer = Objects.requireNonNull(Iterables.getLast(paths)); // If the path is a directory, scan the directory for a file and use that file if (!pathToInfer.endsWith(".vortex")) { - Optional firstFile = - NativeFileMethods.listVortexFiles(pathToInfer, options.asCaseSensitiveMap()).stream() - .findFirst(); + Optional firstFile = NativeFileMethods.listVortexFiles(pathToInfer, formatOptions).stream() + .findFirst(); if (firstFile.isEmpty()) { // Return empty struct if no files found @@ -80,7 +89,7 @@ public StructType inferSchema(CaseInsensitiveStringMap options) { } } - try (File file = Files.open(pathToInfer)) { + try (File file = Files.open(pathToInfer, formatOptions)) { var columns = SparkTypes.toColumns(file.getDType()); return CatalogV2Util.v2ColumnsToStructType(columns); } @@ -101,9 +110,8 @@ public StructType inferSchema(CaseInsensitiveStringMap options) { @Override public Table getTable(StructType schema, Transform[] _partitioning, Map properties) { var uncased = new CaseInsensitiveStringMap(properties); - ImmutableList paths = getPaths(uncased); - return new VortexTable(paths, schema, properties); + return new VortexTable(paths, schema, buildDataSourceOptions(properties)); } /** @@ -132,6 +140,20 @@ public String shortName() { return "vortex"; } + private Map buildDataSourceOptions(Map properties) { + var hadoopConf = sparkSession.get().sessionState().newHadoopConf(); + + var options = ImmutableMap.builder(); + options.putAll(properties); + + // Forward any S3-relevant properties from hadoopConf to the reader config. + options.putAll(HadoopUtils.s3PropertiesFromHadoopConf(hadoopConf)); + // Forward any Azure-relevant properties from hadoopConf to the reader config. + options.putAll(HadoopUtils.azurePropertiesFromHadoopConf(hadoopConf)); + + return options.build(); + } + private static ImmutableList getPaths(CaseInsensitiveStringMap uncased) { if (uncased.containsKey(PATH_KEY)) { return ImmutableList.of(uncased.get(PATH_KEY)); diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/VortexFilePartition.java b/java/vortex-spark/src/main/java/dev/vortex/spark/VortexFilePartition.java index eb3712c57b9..5cb64327331 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/VortexFilePartition.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/VortexFilePartition.java @@ -4,7 +4,9 @@ package dev.vortex.spark; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import java.io.Serializable; +import java.util.Map; import org.apache.spark.sql.connector.catalog.Column; import org.apache.spark.sql.connector.read.InputPartition; @@ -18,6 +20,7 @@ public final class VortexFilePartition implements InputPartition, Serializable { private final String path; private final ImmutableList columns; + private final ImmutableMap formatOptions; /** * Creates a new Vortex file partition. @@ -25,9 +28,10 @@ public final class VortexFilePartition implements InputPartition, Serializable { * @param path the file system path to the Vortex file * @param columns the list of columns to read from the file */ - public VortexFilePartition(String path, ImmutableList columns) { + public VortexFilePartition(String path, ImmutableList columns, ImmutableMap formatOptions) { this.path = path; this.columns = columns; + this.formatOptions = formatOptions; } /** @@ -47,4 +51,8 @@ public String getPath() { public ImmutableList getColumns() { return columns; } + + public Map getFormatOptions() { + return formatOptions; + } } diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/config/HadoopUtils.java b/java/vortex-spark/src/main/java/dev/vortex/spark/config/HadoopUtils.java new file mode 100644 index 00000000000..d091269df0a --- /dev/null +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/config/HadoopUtils.java @@ -0,0 +1,72 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +package dev.vortex.spark.config; + +import java.util.Map; +import org.apache.hadoop.conf.Configuration; + +public final class HadoopUtils { + private HadoopUtils() {} + + static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key"; + static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key"; + static final String FS_S3A_SESSION_TOKEN = "fs.s3a.session.token"; + static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint"; + static final String FS_S3A_ENDPOINT_REGION = "fs.s3a.endpoint.region"; + + public static Map s3PropertiesFromHadoopConf(Configuration hadoopConf) { + VortexS3Properties properties = new VortexS3Properties(); + + for (Map.Entry entry : hadoopConf) { + switch (entry.getKey()) { + case FS_S3A_ACCESS_KEY: + properties.setAccessKeyId(entry.getValue()); + break; + case FS_S3A_SECRET_KEY: + properties.setSecretAccessKey(entry.getValue()); + break; + case FS_S3A_SESSION_TOKEN: + properties.setSessionToken(entry.getValue()); + break; + case FS_S3A_ENDPOINT: + String qualified = entry.getValue(); + if (!qualified.startsWith("http")) { + qualified = "https://" + qualified; + } + properties.setEndpoint(qualified); + break; + case FS_S3A_ENDPOINT_REGION: + properties.setRegion(entry.getValue()); + break; + default: + break; + } + } + + return properties.asProperties(); + } + + static final String ACCESS_KEY_PREFIX = "fs.azure.account.key"; + static final String FIXED_TOKEN_PREFIX = "fs.azure.sas.fixed.token."; + + public static Map azurePropertiesFromHadoopConf(Configuration hadoopConf) { + VortexAzureProperties properties = new VortexAzureProperties(); + + // TODO(aduffy): match on storage account name. + for (Map.Entry entry : hadoopConf) { + String configKey = entry.getKey(); + if (configKey.startsWith(ACCESS_KEY_PREFIX)) { + properties.setAccessKey(entry.getValue()); + } else if (configKey.startsWith(FIXED_TOKEN_PREFIX)) { + properties.setSasKey(entry.getValue()); + } + } + + if (properties.accessKey().isEmpty()) { + properties.setSkipSignature(true); + } + + return properties.asProperties(); + } +} diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/config/VortexAzureProperties.java b/java/vortex-spark/src/main/java/dev/vortex/spark/config/VortexAzureProperties.java new file mode 100644 index 00000000000..b064575c311 --- /dev/null +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/config/VortexAzureProperties.java @@ -0,0 +1,48 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +package dev.vortex.spark.config; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import java.util.Map; +import java.util.Optional; + +public final class VortexAzureProperties { + private static final String ACCOUNT_KEY = "azure_storage_account_key"; + private static final String SAS_KEY = "azure_storage_sas_key"; + private static final String SKIP_SIGNATURE = "azure_skip_signature"; + + private final Map properties = Maps.newHashMap(); + + public Optional accessKey() { + return Optional.ofNullable(properties.get(ACCOUNT_KEY)); + } + + public Optional sasKey() { + return Optional.ofNullable(properties.get(SAS_KEY)); + } + + public boolean skipSignature() { + return Boolean.parseBoolean(properties.getOrDefault(SKIP_SIGNATURE, "false")); + } + + public VortexAzureProperties setAccessKey(String accountKey) { + properties.put(ACCOUNT_KEY, accountKey); + return this; + } + + public VortexAzureProperties setSasKey(String sasKey) { + properties.put(SAS_KEY, sasKey); + return this; + } + + public VortexAzureProperties setSkipSignature(boolean skipSignature) { + properties.put(SKIP_SIGNATURE, String.valueOf(skipSignature)); + return this; + } + + public Map asProperties() { + return ImmutableMap.copyOf(properties); + } +} diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/config/VortexS3Properties.java b/java/vortex-spark/src/main/java/dev/vortex/spark/config/VortexS3Properties.java new file mode 100644 index 00000000000..f8298891f18 --- /dev/null +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/config/VortexS3Properties.java @@ -0,0 +1,72 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +package dev.vortex.spark.config; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import java.util.Map; +import java.util.Optional; + +public final class VortexS3Properties { + private static final String ACCESS_KEY = "aws_access_key_id"; + private static final String SECRET_KEY = "aws_secret_access_key"; + private static final String SESSION_TOKEN = "aws_session_token"; + private static final String REGION = "aws_region"; + private static final String ENDPOINT = "aws_endpoint"; + private static final String SKIP_SIGNATURE = "aws_skip_signature"; + + private final Map properties = Maps.newHashMap(); + + public Optional accessKeyId() { + return Optional.ofNullable(properties.get(ACCESS_KEY)); + } + + public Optional secretAccessKey() { + return Optional.ofNullable(properties.get(SECRET_KEY)); + } + + public Optional sessionToken() { + return Optional.ofNullable(properties.get(SESSION_TOKEN)); + } + + public Optional region() { + return Optional.ofNullable(properties.get(REGION)); + } + + public Optional endpoint() { + return Optional.ofNullable(properties.get(ENDPOINT)); + } + + public boolean skipSignature() { + return Boolean.parseBoolean(properties.getOrDefault(SKIP_SIGNATURE, "false")); + } + + public void setAccessKeyId(String accessKeyId) { + properties.put(ACCESS_KEY, accessKeyId); + } + + public void setSecretAccessKey(String secretAccessKey) { + properties.put(SECRET_KEY, secretAccessKey); + } + + public void setSessionToken(String sessionToken) { + properties.put(SESSION_TOKEN, sessionToken); + } + + public void setRegion(String region) { + properties.put(REGION, region); + } + + public void setEndpoint(String endpoint) { + properties.put(ENDPOINT, endpoint); + } + + public void setSkipSignature(boolean skipSignature) { + properties.put(SKIP_SIGNATURE, Boolean.toString(skipSignature)); + } + + public Map asProperties() { + return ImmutableMap.copyOf(properties); + } +} diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexBatchExec.java b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexBatchExec.java index 7c9ccd5ea5e..00e4fe49f60 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexBatchExec.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexBatchExec.java @@ -4,9 +4,9 @@ package dev.vortex.spark.read; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import dev.vortex.jni.NativeFileMethods; import dev.vortex.spark.VortexFilePartition; -import java.util.Map; import java.util.stream.Stream; import org.apache.spark.sql.connector.catalog.Column; import org.apache.spark.sql.connector.read.Batch; @@ -19,7 +19,7 @@ public final class VortexBatchExec implements Batch { private final ImmutableList paths; private final ImmutableList columns; - private final Map formatOptions; + private final ImmutableMap formatOptions; /** * Creates a new VortexBatchExec for scanning the specified Vortex files. @@ -28,7 +28,7 @@ public final class VortexBatchExec implements Batch { * @param columns the list of columns to read from the files */ public VortexBatchExec( - ImmutableList paths, ImmutableList columns, Map formatOptions) { + ImmutableList paths, ImmutableList columns, ImmutableMap formatOptions) { this.paths = paths; this.columns = columns; this.formatOptions = formatOptions; @@ -54,7 +54,7 @@ public InputPartition[] planInputPartitions() { return NativeFileMethods.listVortexFiles(path, formatOptions).stream(); } }) - .map(path -> new VortexFilePartition(path, columns)) + .map(path -> new VortexFilePartition(path, columns, formatOptions)) .toArray(InputPartition[]::new); } diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexPartitionReader.java b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexPartitionReader.java index a1680e0801e..59c58c3236d 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexPartitionReader.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexPartitionReader.java @@ -47,7 +47,7 @@ public ColumnarBatch get() { * Initialize the Vortex File and ArrayStream resources. */ void initNativeResources() { - file = Files.open(partition.getPath()); + file = Files.open(partition.getPath(), partition.getFormatOptions()); List pushdownColumns = partition.getColumns().stream().map(Column::name).collect(Collectors.toList()); batches = new VortexColumnarBatchIterator( diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java index d7542962c8d..1ec1a0aeb7d 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java @@ -4,7 +4,7 @@ package dev.vortex.spark.read; import com.google.common.collect.ImmutableList; -import java.util.Map; +import com.google.common.collect.ImmutableMap; import org.apache.spark.sql.connector.catalog.CatalogV2Util; import org.apache.spark.sql.connector.catalog.Column; import org.apache.spark.sql.connector.read.Batch; @@ -18,7 +18,7 @@ public final class VortexScan implements Scan { private final ImmutableList paths; private final ImmutableList readColumns; - private final Map formatOptions; + private final ImmutableMap formatOptions; /** * Creates a new VortexScan for the specified file paths and columns. @@ -27,7 +27,9 @@ public final class VortexScan implements Scan { * @param readColumns the list of columns to read from the files */ public VortexScan( - ImmutableList paths, ImmutableList readColumns, Map formatOptions) { + ImmutableList paths, + ImmutableList readColumns, + ImmutableMap formatOptions) { this.paths = paths; this.readColumns = readColumns; this.formatOptions = formatOptions; diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java index e4e17dc2500..0ea690b6406 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScanBuilder.java @@ -6,6 +6,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -89,6 +90,7 @@ public VortexScanBuilder addAllColumns(Iterable columns) { public Scan build() { var paths = this.paths.build(); var columns = ImmutableList.copyOf(this.columns); + var formatOptions = ImmutableMap.copyOf(this.formatOptions); checkState(!paths.isEmpty(), "paths cannot be empty"); // Allow empty columns for operations like count() that don't need actual column data diff --git a/java/vortex-spark/src/main/java/dev/vortex/spark/write/VortexDataWriter.java b/java/vortex-spark/src/main/java/dev/vortex/spark/write/VortexDataWriter.java index 08ff564f8fb..fcf0ff209bc 100644 --- a/java/vortex-spark/src/main/java/dev/vortex/spark/write/VortexDataWriter.java +++ b/java/vortex-spark/src/main/java/dev/vortex/spark/write/VortexDataWriter.java @@ -17,9 +17,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.SpecializedGetters; import org.apache.spark.sql.catalyst.util.ArrayData; @@ -96,11 +94,8 @@ public VortexDataWriter(String filePath, StructType schema, CaseInsensitiveStrin var writeSchema = SparkTypes.toDType(schema); var arrowSchema = SparkToArrowSchema.convert(schema); - // Convert the writer to a new schema type instead. - // Create Vortex writer - Map writerOptions = new HashMap<>(); - this.vortexWriter = VortexWriter.create(filePath, writeSchema, writerOptions); + this.vortexWriter = VortexWriter.create(filePath, writeSchema, options.asCaseSensitiveMap()); // Create VectorSchemaRoot for batching rows this.vectorSchemaRoot = VectorSchemaRoot.create(arrowSchema, allocator); diff --git a/java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceS3MockTest.java b/java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceS3MockTest.java new file mode 100644 index 00000000000..04c94546013 --- /dev/null +++ b/java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceS3MockTest.java @@ -0,0 +1,182 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +package dev.vortex.spark; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.adobe.testing.s3mock.testcontainers.S3MockContainer; +import java.util.List; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.*; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +/** + * Integration test for Vortex DataSource with mocked S3 using Adobe S3Mock. + * + *

This test verifies that Vortex can correctly read and write files from S3-compatible + * storage by using S3Mock running as a Testcontainer. + */ +@Testcontainers +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public final class VortexDataSourceS3MockTest { + + private static final String TEST_BUCKET = "vortex-test-bucket"; + + @Container + private static final S3MockContainer S3_MOCK = new S3MockContainer("4.11.0").withInitialBuckets(TEST_BUCKET); + + private SparkSession spark; + + @BeforeAll + public void setUp() { + // Get the S3Mock endpoint + String s3Endpoint = S3_MOCK.getHttpEndpoint(); + + // Create a local Spark session configured to use S3Mock + spark = SparkSession.builder() + .appName("VortexS3MockTest") + .master("local[2]") + .config("spark.sql.shuffle.partitions", "2") + .config("spark.sql.adaptive.enabled", "false") + .config("spark.ui.enabled", "false") + // S3A configuration for S3Mock. + // This should be propagated into our reader + .config("spark.hadoop.fs.s3a.endpoint", s3Endpoint) + .config("spark.hadoop.fs.s3a.access.key", "foo") + .config("spark.hadoop.fs.s3a.secret.key", "bar") + .config("spark.hadoop.fs.s3a.path.style.access", "true") + // Disable features that S3Mock may not support + .config("spark.hadoop.fs.s3a.change.detection.version.required", "false") + .config("spark.hadoop.fs.s3a.change.detection.mode", "none") + .config("spark.hadoop.fs.s3a.multiobjectdelete.enable", "false") + .getOrCreate(); + } + + @AfterAll + public void tearDown() { + if (spark != null) { + spark.stop(); + } + } + + @Test + @DisplayName("Write and read Vortex files from mocked S3") + public void testWriteAndReadVortexFilesFromS3Mock() { + // Given: Create a DataFrame with test data + int numRows = 100; + Dataset originalDf = createTestDataFrame(numRows); + + // When: Write to S3Mock + String s3Path = "s3a://" + TEST_BUCKET + "/vortex-test"; + originalDf + .repartition(2) + .write() + .format("vortex") + .option("path", s3Path) + .mode(SaveMode.Overwrite) + .save(); + + // Then: Read back from S3Mock + Dataset readDf = spark.read().format("vortex").load(s3Path); + + // Verify schema is preserved + assertSchemaEquals(originalDf.schema(), readDf.schema()); + + // Verify row count + assertEquals(numRows, readDf.count(), "Read DataFrame should have same number of rows as original"); + + // Verify data content + verifyDataContent(originalDf, readDf); + } + + @Test + @DisplayName("Write and read Vortex files with format options from S3Mock") + public void testWriteAndReadWithFormatOptionsFromS3Mock() { + // Given: Create a DataFrame with test data + int numRows = 50; + Dataset originalDf = createTestDataFrame(numRows); + + // When: Write to S3Mock with format options + String s3Path = "s3a://" + TEST_BUCKET + "/vortex-options-test"; + originalDf + .write() + .format("vortex") + .option("path", s3Path) + .mode(SaveMode.Overwrite) + .save(); + + // Then: Read back from S3Mock with format options + Dataset readDf = + spark.read().format("vortex").option("path", s3Path).load(); + + // Verify row count + assertEquals(numRows, readDf.count(), "Read DataFrame should have same number of rows as original"); + + // Verify data content + verifyDataContent(originalDf, readDf); + } + + /** + * Creates a test DataFrame with monotonically increasing integers + * and their string representations. + */ + private Dataset createTestDataFrame(int numRows) { + return spark.range(0, numRows) + .selectExpr( + "cast(id as int) as id", + "concat('value_', cast(id as string)) as value", + "array('Alpha', 'Bravo', 'Charlie') AS elements"); + } + + /** + * Verifies that two schemas are equal. + */ + private void assertSchemaEquals(StructType expected, StructType actual) { + assertEquals(expected.fields().length, actual.fields().length, "Schemas should have same number of fields"); + + for (int i = 0; i < expected.fields().length; i++) { + StructField expectedField = expected.fields()[i]; + StructField actualField = actual.fields()[i]; + + assertEquals(expectedField.name(), actualField.name(), "Field names should match at position " + i); + assertEquals( + expectedField.dataType(), + actualField.dataType(), + "Field types should match for field: " + expectedField.name()); + assertEquals( + expectedField.nullable(), + actualField.nullable(), + "Field nullability should match for field: " + expectedField.name()); + } + } + + /** + * Verifies that the data content of two DataFrames is identical. + */ + private void verifyDataContent(Dataset expected, Dataset actual) { + // Sort both DataFrames by id to ensure consistent ordering + Dataset expectedSorted = expected.orderBy("id"); + Dataset actualSorted = actual.orderBy("id"); + + // Collect and compare + List expectedRows = expectedSorted.collectAsList(); + List actualRows = actualSorted.collectAsList(); + + assertEquals(expectedRows.size(), actualRows.size(), "Should have same number of rows"); + + for (int i = 0; i < expectedRows.size(); i++) { + Row expectedRow = expectedRows.get(i); + Row actualRow = actualRows.get(i); + + assertEquals(expectedRow.getInt(0), actualRow.getInt(0), "ID should match at row " + i); + assertEquals(expectedRow.getString(1), actualRow.getString(1), "Value should match at row " + i); + } + } +} diff --git a/java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceWriteTest.java b/java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceWriteTest.java index df22407162b..46538c59287 100644 --- a/java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceWriteTest.java +++ b/java/vortex-spark/src/test/java/dev/vortex/spark/VortexDataSourceWriteTest.java @@ -177,56 +177,6 @@ public void testOverwriteMode() throws IOException { assertEquals(75, readDf.count(), "Should have data from second write after overwrite"); } - @Test - @DisplayName("Write and read Vortex files from S3") - public void testWriteAndReadFromS3() throws IOException { - // Skip test if AWS credentials or S3 base URI are not available - String awsAccessKey = System.getenv("AWS_ACCESS_KEY_ID"); - String awsSecretKey = System.getenv("AWS_SECRET_ACCESS_KEY"); - String s3BaseUri = System.getenv("VORTEX_TEST_S3_BASE_URI"); - - Assumptions.assumeTrue( - awsAccessKey != null && awsSecretKey != null, "Skipping S3 test - AWS credentials not configured"); - - Assumptions.assumeTrue( - s3BaseUri != null, - "Skipping S3 test - VORTEX_TEST_S3_BASE_URI not configured (e.g., s3://bucket/path)"); - - // Given: Create a test DataFrame - int numRows = 100; - Dataset originalDf = createTestDataFrame(numRows); - - // When: Write to S3 (relying on environment credentials) - String s3Path = s3BaseUri + "/spark-test-" + System.currentTimeMillis(); - originalDf - .repartition(2) // Force 2 partitions - .write() - .format("vortex") - .option("path", s3Path) - .mode(SaveMode.Overwrite) - .save(); - - // Then: Read back from S3 - Dataset readDf = - spark.read().format("vortex").option("path", s3Path).load(); - - // Verify schema is preserved - assertSchemaEquals(originalDf.schema(), readDf.schema()); - - // Verify row count - assertEquals(numRows, readDf.count(), "Read DataFrame should have same number of rows as original"); - - // Verify data content - verifyDataContent(originalDf, readDf); - - // Log the S3 path for debugging - System.out.println("Successfully wrote and read Vortex files from: " + s3Path); - - // Cleanup: Delete the test files from S3 - // Note: In production, you might want to use AWS SDK for cleanup - // For now, we'll rely on a periodic cleanup job for the test folder - } - @Test @DisplayName("Handle special characters and nulls") public void testSpecialCharactersAndNulls() throws IOException { diff --git a/vortex-jni/src/object_store.rs b/vortex-jni/src/object_store.rs index 53da2322df0..141e00c211f 100644 --- a/vortex-jni/src/object_store.rs +++ b/vortex-jni/src/object_store.rs @@ -36,7 +36,7 @@ pub(crate) fn make_object_store( let (scheme, _) = ObjectStoreScheme::parse(url) .map_err(|error| VortexError::from(object_store::Error::from(error)))?; - let cache_key = url_cache_key(url); + let cache_key = url_cache_key(url, properties); { if let Some(cached) = OBJECT_STORES.lock().get(&cache_key) { @@ -57,7 +57,10 @@ pub(crate) fn make_object_store( .with_url(url.to_string()) // Use generic S3 endpoint to avoid DNS resolution issues with region-specific endpoints .with_endpoint("https://s3.amazonaws.com") - .with_virtual_hosted_style_request(false); // Use path-style URLs + // Use path-style URLs + .with_virtual_hosted_style_request(false) + // Allow user to override endpoint to HTTP endpoints, e.g. LocalStack, Minio + .with_allow_http(true); // Try to load credentials from environment if not provided in properties if !properties.contains_key("access_key_id") @@ -136,10 +139,19 @@ pub(crate) fn make_object_store( Ok((store, scheme)) } -fn url_cache_key(url: &Url) -> String { +fn url_cache_key(url: &Url, properties: &HashMap) -> String { + let mut sorted_props: Vec<_> = properties.iter().collect(); + sorted_props.sort_by_key(|(k, _)| *k); + + let props_str: String = sorted_props + .iter() + .map(|(k, v)| format!("{k}={v}")) + .collect::>() + .join(","); format!( - "{}://{}", + "{}://{};{}", url.scheme(), &url[url::Position::BeforeHost..url::Position::AfterPort], + props_str, ) }