Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions java/versions.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ com.esotericsoftware:minlog:1.3.0 (2 constraints: 241b55b7)

com.fasterxml.jackson:jackson-bom:2.20.0 (3 constraints: e4396971)

com.fasterxml.jackson.core:jackson-annotations:2.20 (8 constraints: 3385e8c6)
com.fasterxml.jackson.core:jackson-annotations:2.20 (9 constraints: 4d956f83)

com.fasterxml.jackson.core:jackson-core:2.20.0 (12 constraints: d6b49f0f)

Expand Down Expand Up @@ -204,7 +204,7 @@ org.apache.avro:avro-mapred:1.12.1 (2 constraints: 361a5399)

org.apache.commons:commons-collections4:4.5.0 (2 constraints: de191f6b)

org.apache.commons:commons-compress:1.28.0 (4 constraints: 0b33e88a)
org.apache.commons:commons-compress:1.28.0 (6 constraints: 88536cb8)

org.apache.commons:commons-crypto:1.1.0 (3 constraints: fb2a0ea2)

Expand Down Expand Up @@ -346,7 +346,7 @@ org.json4s:json4s-jackson_2.13:4.0.7 (3 constraints: 0d28f849)

org.json4s:json4s-scalap_2.13:4.0.7 (2 constraints: d2172edd)

org.jspecify:jspecify:1.0.0 (1 constraints: 130ae0b4)
org.jspecify:jspecify:1.0.0 (2 constraints: ec1a087a)

org.locationtech.jts:jts-core:1.20.0 (3 constraints: 1e26b429)

Expand All @@ -372,7 +372,7 @@ org.slf4j:jcl-over-slf4j:2.0.17 (3 constraints: d72fb570)

org.slf4j:jul-to-slf4j:2.0.17 (3 constraints: d72fb570)

org.slf4j:slf4j-api:2.0.17 (32 constraints: c8b7441c)
org.slf4j:slf4j-api:2.0.17 (35 constraints: 57eda16b)

org.threeten:threeten-extra:1.8.0 (2 constraints: 0117967e)

Expand All @@ -390,6 +390,22 @@ ch.qos.logback:logback-classic:1.5.32 (1 constraints: 3d053f3b)

ch.qos.logback:logback-core:1.5.32 (1 constraints: 3c0d442a)

com.adobe.testing:s3mock-testcontainers:4.11.0 (1 constraints: 38053e3b)

com.github.docker-java:docker-java-api:3.4.2 (1 constraints: 3d0f6267)

com.github.docker-java:docker-java-transport:3.4.2 (1 constraints: c71515cb)

com.github.docker-java:docker-java-transport-zerodep:3.4.2 (1 constraints: 3d0f6267)

junit:junit:4.13.2 (1 constraints: 6e0fd276)

net.java.dev.jna:jna:5.13.0 (1 constraints: f71513e1)

org.hamcrest:hamcrest-core:1.3 (1 constraints: cc05fe3f)

org.jetbrains:annotations:17.0.0 (1 constraints: 590d4b31)

org.junit:junit-bom:6.0.3 (7 constraints: 76743380)

org.junit.jupiter:junit-jupiter:6.0.3 (2 constraints: 841045dd)
Expand All @@ -408,4 +424,10 @@ org.junit.platform:junit-platform-launcher:6.0.3 (1 constraints: 10097b95)

org.opentest4j:opentest4j:1.3.0 (2 constraints: cf209249)

org.rnorth.duct-tape:duct-tape:1.0.8 (1 constraints: 3d0f5267)

org.slf4j:slf4j-simple:2.0.17 (1 constraints: 3c05323b)

org.testcontainers:junit-jupiter:1.20.4 (1 constraints: 3905313b)

org.testcontainers:testcontainers:1.21.3 (2 constraints: de1fe2f9)
2 changes: 2 additions & 0 deletions java/versions.props
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ org.apache.arrow:* = 18.3.0

# Test dependencies
org.junit.jupiter:* = 6.0.3
com.adobe.testing:s3mock-testcontainers = 4.11.0
org.testcontainers:junit-jupiter = 1.20.4
25 changes: 16 additions & 9 deletions java/vortex-jni/src/main/java/dev/vortex/api/Files.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ private Files() {}
/**
 * Opens a Vortex file from the specified path string with empty format parameters.
 *
 * <p>Convenience overload equivalent to calling {@code open(path, Map.of())}. See the
 * two-argument overload for how the path string is interpreted (absolute file-system
 * path vs. URI string).
 *
 * @param path the path to the Vortex file, either as an absolute file system path
 *     (starting with "/") or as a URI string
 * @return a {@link File} instance representing the opened Vortex file
 * @see #open(String, Map)
 */
public static File open(String path) {
return open(path, Map.of());
}

/**
* Opens a Vortex file from the specified path string and format parameters.
*
* <p>This method provides a convenient way to open Vortex files using either
* absolute file system paths or URI strings. If the path starts with "/",
* it is treated as an absolute file system path and converted to a file URI.
Expand All @@ -42,16 +51,15 @@ private Files() {}
* @param path the path to the Vortex file, either as an absolute file system path
* (starting with "/") or as a URI string
* @return a {@link File} instance representing the opened Vortex file
* @throws RuntimeException if the file cannot be opened or the path is invalid
* @throws RuntimeException if the file cannot be opened or the path is invalid
* @throws NullPointerException if path is null
*
* @see #open(URI, Map)
*/
public static File open(String path) {
public static File open(String path, Map<String, String> properties) {
// An absolute file-system path (leading "/") is converted to a file:// URI;
// anything else is parsed as a URI string directly.
URI target = path.startsWith("/") ? Paths.get(path).toUri() : URI.create(path);
return open(target, properties);
}

/**
Expand All @@ -66,14 +74,13 @@ public static File open(String path) {
* underlying file system or storage layer. The specific properties supported
* depend on the URI scheme and storage backend being used.
*
* @param uri the URI pointing to the Vortex file to open
* @param uri the URI pointing to the Vortex file to open
* @param properties a map of configuration properties for opening the file;
* may be empty but must not be null
* @return a {@link File} instance representing the opened Vortex file
* @throws RuntimeException if the file cannot be opened, the URI is invalid,
* or the returned native pointer is invalid
* @throws RuntimeException if the file cannot be opened, the URI is invalid,
* or the returned native pointer is invalid
* @throws NullPointerException if uri or properties is null
*
* @see #open(String)
*/
public static File open(URI uri, Map<String, String> properties) {
Expand Down
3 changes: 3 additions & 0 deletions java/vortex-spark/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ testing {
implementation("org.apache.spark:spark-core_2.13")
implementation("org.apache.spark:spark-sql_2.13")
runtimeOnly("org.slf4j:slf4j-simple:2.0.17")
// S3Mock Testcontainers for testing S3 integration (avoids classpath conflicts)
implementation("com.adobe.testing:s3mock-testcontainers")
implementation("org.testcontainers:junit-jupiter")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,24 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import dev.vortex.api.File;
import dev.vortex.api.Files;
import dev.vortex.jni.NativeFileMethods;
import dev.vortex.spark.config.HadoopUtils;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.CatalogV2Util;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import scala.Option;

/**
* Spark V2 data source for reading and writing Vortex files.
Expand All @@ -33,13 +37,17 @@ public final class VortexDataSourceV2 implements TableProvider, DataSourceRegist
private static final String PATH_KEY = "path";
private static final String PATHS_KEY = "paths";

private final Option<SparkSession> sparkSession;

/**
* Creates a new instance of the Vortex data source.
* <p>
* This no-argument constructor is required for Spark to instantiate the data source
* through reflection.
*/
public VortexDataSourceV2() {}
public VortexDataSourceV2() {
this.sparkSession = SparkSession.getActiveSession();
}

/**
* Infers the schema of the Vortex files specified in the options.
Expand All @@ -64,12 +72,13 @@ public StructType inferSchema(CaseInsensitiveStringMap options) {
return new StructType();
}

var formatOptions = buildDataSourceOptions(options.asCaseSensitiveMap());

var pathToInfer = Objects.requireNonNull(Iterables.getLast(paths));
// If the path is a directory, scan the directory for a file and use that file
if (!pathToInfer.endsWith(".vortex")) {
Optional<String> firstFile =
NativeFileMethods.listVortexFiles(pathToInfer, options.asCaseSensitiveMap()).stream()
.findFirst();
Optional<String> firstFile = NativeFileMethods.listVortexFiles(pathToInfer, formatOptions).stream()
.findFirst();

if (firstFile.isEmpty()) {
// Return empty struct if no files found
Expand All @@ -80,7 +89,7 @@ public StructType inferSchema(CaseInsensitiveStringMap options) {
}
}

try (File file = Files.open(pathToInfer)) {
try (File file = Files.open(pathToInfer, formatOptions)) {
var columns = SparkTypes.toColumns(file.getDType());
return CatalogV2Util.v2ColumnsToStructType(columns);
}
Expand All @@ -101,9 +110,8 @@ public StructType inferSchema(CaseInsensitiveStringMap options) {
@Override
public Table getTable(StructType schema, Transform[] _partitioning, Map<String, String> properties) {
var uncased = new CaseInsensitiveStringMap(properties);

ImmutableList<String> paths = getPaths(uncased);
return new VortexTable(paths, schema, properties);
return new VortexTable(paths, schema, buildDataSourceOptions(properties));
}

/**
Expand Down Expand Up @@ -132,6 +140,20 @@ public String shortName() {
return "vortex";
}

/**
 * Merges the user-supplied data source options with storage-related (S3 and Azure)
 * properties extracted from the active Spark session's Hadoop configuration.
 *
 * @param properties the caller-supplied data source options
 * @return an immutable map containing {@code properties} plus any forwarded
 *     S3/Azure configuration derived from the Hadoop conf
 */
private Map<String, String> buildDataSourceOptions(Map<String, String> properties) {
// NOTE(review): sparkSession is captured at construction time; if no session was
// active then, Option.get() throws NoSuchElementException here — confirm whether
// a clearer failure (or a lazy SparkSession lookup) is wanted.
var hadoopConf = sparkSession.get().sessionState().newHadoopConf();

var options = ImmutableMap.<String, String>builder();
options.putAll(properties);

// Forward any S3-relevant properties from hadoopConf to the reader config.
options.putAll(HadoopUtils.s3PropertiesFromHadoopConf(hadoopConf));
// Forward any Azure-relevant properties from hadoopConf to the reader config.
options.putAll(HadoopUtils.azurePropertiesFromHadoopConf(hadoopConf));

// NOTE(review): ImmutableMap.Builder.build() throws IllegalArgumentException if the
// same key appears in both `properties` and the Hadoop-derived maps — confirm that
// duplicate keys are impossible, or switch to buildKeepingLast() to prefer one side.
return options.build();
}

private static ImmutableList<String> getPaths(CaseInsensitiveStringMap uncased) {
if (uncased.containsKey(PATH_KEY)) {
return ImmutableList.of(uncased.get(PATH_KEY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package dev.vortex.spark;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
import java.util.Map;
import org.apache.spark.sql.connector.catalog.Column;
import org.apache.spark.sql.connector.read.InputPartition;

Expand All @@ -18,16 +20,18 @@
public final class VortexFilePartition implements InputPartition, Serializable {
private final String path;
private final ImmutableList<Column> columns;
private final ImmutableMap<String, String> formatOptions;

/**
* Creates a new Vortex file partition.
*
* @param path the file system path to the Vortex file
* @param columns the list of columns to read from the file
*/
public VortexFilePartition(String path, ImmutableList<Column> columns) {
public VortexFilePartition(String path, ImmutableList<Column> columns, ImmutableMap<String, String> formatOptions) {
this.path = path;
this.columns = columns;
this.formatOptions = formatOptions;
}

/**
Expand All @@ -47,4 +51,8 @@ public String getPath() {
public ImmutableList<Column> getColumns() {
return columns;
}

public Map<String, String> getFormatOptions() {
return formatOptions;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

package dev.vortex.spark.config;

import java.util.Map;
import org.apache.hadoop.conf.Configuration;

/**
 * Helpers for translating Hadoop {@code Configuration} entries into Vortex reader
 * properties for S3 and Azure storage backends.
 *
 * <p>Only a small, fixed set of S3A and Azure keys is recognized; all other
 * configuration entries are ignored.
 */
public final class HadoopUtils {
    private HadoopUtils() {}

    static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key";
    static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key";
    static final String FS_S3A_SESSION_TOKEN = "fs.s3a.session.token";
    static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint";
    static final String FS_S3A_ENDPOINT_REGION = "fs.s3a.endpoint.region";

    /**
     * Extracts S3A credentials, endpoint, and region settings from a Hadoop configuration.
     *
     * @param hadoopConf the Hadoop configuration to scan
     * @return S3 reader properties derived from the recognized {@code fs.s3a.*} keys;
     *     unrecognized keys are ignored
     */
    public static Map<String, String> s3PropertiesFromHadoopConf(Configuration hadoopConf) {
        VortexS3Properties properties = new VortexS3Properties();

        for (Map.Entry<String, String> entry : hadoopConf) {
            switch (entry.getKey()) {
                case FS_S3A_ACCESS_KEY:
                    properties.setAccessKeyId(entry.getValue());
                    break;
                case FS_S3A_SECRET_KEY:
                    properties.setSecretAccessKey(entry.getValue());
                    break;
                case FS_S3A_SESSION_TOKEN:
                    properties.setSessionToken(entry.getValue());
                    break;
                case FS_S3A_ENDPOINT:
                    String qualified = entry.getValue();
                    // Hadoop allows scheme-less endpoints; default to HTTPS. Check for an
                    // explicit scheme rather than the bare "http" prefix so hosts such as
                    // "httpbin.example.com" still get a scheme prepended.
                    if (!qualified.startsWith("http://") && !qualified.startsWith("https://")) {
                        qualified = "https://" + qualified;
                    }
                    properties.setEndpoint(qualified);
                    break;
                case FS_S3A_ENDPOINT_REGION:
                    properties.setRegion(entry.getValue());
                    break;
                default:
                    // Not an S3A key we forward; skip.
                    break;
            }
        }

        return properties.asProperties();
    }

    // NOTE(review): ACCESS_KEY_PREFIX has no trailing '.' while FIXED_TOKEN_PREFIX does;
    // both are used as prefixes so matching behavior is the same for well-formed keys,
    // but confirm the asymmetry is intentional.
    static final String ACCESS_KEY_PREFIX = "fs.azure.account.key";
    static final String FIXED_TOKEN_PREFIX = "fs.azure.sas.fixed.token.";

    /**
     * Extracts Azure storage account keys and fixed SAS tokens from a Hadoop configuration.
     *
     * <p>If no access key is found, signature verification is skipped (anonymous access).
     *
     * @param hadoopConf the Hadoop configuration to scan
     * @return Azure reader properties derived from the recognized {@code fs.azure.*} keys
     */
    public static Map<String, String> azurePropertiesFromHadoopConf(Configuration hadoopConf) {
        VortexAzureProperties properties = new VortexAzureProperties();

        // TODO(aduffy): match on storage account name.
        for (Map.Entry<String, String> entry : hadoopConf) {
            String configKey = entry.getKey();
            if (configKey.startsWith(ACCESS_KEY_PREFIX)) {
                properties.setAccessKey(entry.getValue());
            } else if (configKey.startsWith(FIXED_TOKEN_PREFIX)) {
                properties.setSasKey(entry.getValue());
            }
        }

        // No credentials configured: fall back to unsigned (anonymous) requests.
        // NOTE(review): assumes accessKey() reports empty when unset — confirm.
        if (properties.accessKey().isEmpty()) {
            properties.setSkipSignature(true);
        }

        return properties.asProperties();
    }
}
Loading
Loading