Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
323 changes: 297 additions & 26 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 8 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ futures = "0.3.32"
futures-core = "0.3.31"
futures-task = "0.3.31"
futures-util = "0.3.31"
gcp_auth = "0.12.6"
glob = "0.3.3"
globset = "0.4.18"
governor = "0.10.1"
Expand All @@ -366,8 +367,9 @@ hyper-0-14 = { package = "hyper", version = "0.14", features = ["client", "tcp"]
hyper-openssl = "0.10.2"
hyper-util = "0.1.20"
tower-service = "0.3.3"
iceberg = "0.7.0"
iceberg-catalog-rest = "0.7.0"
iceberg = "0.9.0"
iceberg-catalog-rest = "0.9.0"
iceberg-storage-opendal = { version = "0.9.0", default-features = false, features = ["opendal-s3", "opendal-gcs"] }
imbl = { version = "7.0.0", features = ["serde"] }
include_dir = "0.7.4"
indexmap = { version = "2.10.0", default-features = false, features = ["std"] }
Expand Down Expand Up @@ -621,9 +623,10 @@ tiberius = { git = "https://github.com/MaterializeInc/tiberius", rev="64ca594cc2
async-compression = { git = "https://github.com/MaterializeInc/async-compression.git", rev = "fe7411eb6104a02a89e2c3a76ab326dd6594214d" }

# Custom iceberg features for mz
# All changes should go to the `mz_changes` branch.
iceberg = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "c31a98afe789" }
iceberg-catalog-rest = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "c31a98afe789" }
# All changes should go to the `kynan-mz_changes_v0.9.0` branch.
iceberg = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "cd7349fb1fa9d640eaeb9f39d85d7cebc089fa92" }
iceberg-catalog-rest = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "cd7349fb1fa9d640eaeb9f39d85d7cebc089fa92" }
iceberg-storage-opendal = { git = "https://github.com/MaterializeInc/iceberg-rust.git", rev = "cd7349fb1fa9d640eaeb9f39d85d7cebc089fa92" }

# Custom duckdb crate to support mz needs
# All changes should go to the `mz_changes` branch.
Expand Down
20 changes: 19 additions & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,21 @@ skip = [
{ name = "fallible-iterator", version = "0.3.0" },
# arrow
{ name = "hashbrown", version = "0.16.1" },
# Used by dynfmt; iceberg/typetag pulls in v0.4.
{ name = "erased-serde", version = "0.3.26" },
# gcp_auth → hyper-rustls → rustls-native-certs pulls newer versions
# while native-tls still pulls older versions.
{ name = "core-foundation", version = "0.10.1" },
{ name = "security-framework", version = "3.7.0" },
{ name = "openssl-probe", version = "0.2.1" },
# reqsign (via iceberg-storage-opendal / opendal) pins older deps
# than the workspace.
{ name = "jsonwebtoken", version = "9.3.1" },
{ name = "quick-xml", version = "0.37.5" },
# aws-lc-rs (via jsonwebtoken 10) and ring pull different `untrusted`.
{ name = "untrusted", version = "0.7.1" },
# Held back by lazy_static 1.4.0 (used by num-bigint-dig).
{ name = "spin", version = "0.5.2" },
]

[[bans.deny]]
Expand Down Expand Up @@ -204,9 +219,11 @@ wrappers = [
]

# We prefer the system's native TLS or OpenSSL to Rustls, since they are more
# mature and more widely used.
# mature and more widely used. `gcp_auth` only ships with rustls-based TLS,
# so allow it through.
[[bans.deny]]
name = "rustls"
wrappers = ["hyper-rustls", "tokio-rustls"]

# once_cell is going to be added to std, and doesn't use macros
# Unfortunately, its heavily used, so we have lots of exceptions.
Expand All @@ -217,6 +234,7 @@ wrappers = [
"findshlibs",
"launchdarkly-server-sdk",
"launchdarkly-server-sdk-evaluation",
"num-bigint-dig",
"prometheus",
"rayon-core",
"sharded-slab",
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,7 @@ impl CatalogState {
updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
}
ConnectionDetails::Csr(_)
| ConnectionDetails::Gcp(_)
| ConnectionDetails::Postgres(_)
| ConnectionDetails::MySql(_)
| ConnectionDetails::SqlServer(_)
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2777,6 +2777,7 @@ impl ConnectionResolver for CatalogState {
Ssh(conn) => Ssh(conn),
Aws(conn) => Aws(conn),
AwsPrivatelink(conn) => AwsPrivatelink(conn),
Gcp(conn) => Gcp(conn),
MySql(conn) => MySql(conn.into_inline_connection(self)),
SqlServer(conn) => SqlServer(conn.into_inline_connection(self)),
IcebergCatalog(conn) => IcebergCatalog(conn.into_inline_connection(self)),
Expand Down
2 changes: 2 additions & 0 deletions src/adapter/src/coord/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,7 @@ impl Coordinator {
ConnectionDetails::Csr(_)
| ConnectionDetails::Ssh { .. }
| ConnectionDetails::Aws(_)
| ConnectionDetails::Gcp(_)
| ConnectionDetails::IcebergCatalog(_) => {}
},
CatalogItem::Table(_) => {
Expand Down Expand Up @@ -1311,6 +1312,7 @@ impl Coordinator {
ConnectionDetails::Csr(_)
| ConnectionDetails::Ssh { .. }
| ConnectionDetails::Aws(_)
| ConnectionDetails::Gcp(_)
| ConnectionDetails::IcebergCatalog(_) => {}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/sql-lexer/src/keywords.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

Abort
Access
Account
Action
Add
Added
Expand Down Expand Up @@ -200,6 +201,7 @@ Full
Fullname
Function
Fusion
Gcp
Generator
Grant
Greatest
Expand Down
28 changes: 22 additions & 6 deletions src/sql-parser/src/ast/defs/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,7 @@ pub enum ConnectionOptionName {
Credential,
Database,
Endpoint,
GcpConnection,
Host,
Password,
Port,
Expand All @@ -791,6 +792,7 @@ pub enum ConnectionOptionName {
Scope,
SecretAccessKey,
SecurityProtocol,
ServiceAccountKey,
ServiceName,
SshTunnel,
SslCertificate,
Expand All @@ -816,6 +818,7 @@ impl AstDisplay for ConnectionOptionName {
ConnectionOptionName::Credential => "CREDENTIAL",
ConnectionOptionName::Database => "DATABASE",
ConnectionOptionName::Endpoint => "ENDPOINT",
ConnectionOptionName::GcpConnection => "GCP CONNECTION",
ConnectionOptionName::Host => "HOST",
ConnectionOptionName::Password => "PASSWORD",
ConnectionOptionName::Port => "PORT",
Expand All @@ -834,6 +837,7 @@ impl AstDisplay for ConnectionOptionName {
ConnectionOptionName::Scope => "SCOPE",
ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
ConnectionOptionName::ServiceAccountKey => "SERVICE ACCOUNT KEY",
ConnectionOptionName::ServiceName => "SERVICE NAME",
ConnectionOptionName::SshTunnel => "SSH TUNNEL",
ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
Expand Down Expand Up @@ -867,6 +871,7 @@ impl WithOptionName for ConnectionOptionName {
| ConnectionOptionName::Credential
| ConnectionOptionName::Database
| ConnectionOptionName::Endpoint
| ConnectionOptionName::GcpConnection
| ConnectionOptionName::Host
| ConnectionOptionName::Password
| ConnectionOptionName::Port
Expand All @@ -883,6 +888,7 @@ impl WithOptionName for ConnectionOptionName {
| ConnectionOptionName::Scope
| ConnectionOptionName::SecurityProtocol
| ConnectionOptionName::SecretAccessKey
| ConnectionOptionName::ServiceAccountKey
| ConnectionOptionName::ServiceName
| ConnectionOptionName::SshTunnel
| ConnectionOptionName::SslCertificate
Expand Down Expand Up @@ -911,6 +917,7 @@ impl_display_t!(ConnectionOption);
pub enum CreateConnectionType {
Aws,
AwsPrivatelink,
Gcp,
Kafka,
Csr,
Postgres,
Expand All @@ -928,6 +935,7 @@ impl CreateConnectionType {
Self::Postgres => "postgres",
Self::Aws => "aws",
Self::AwsPrivatelink => "aws-privatelink",
Self::Gcp => "gcp",
Self::Ssh => "ssh-tunnel",
Self::MySql => "mysql",
Self::SqlServer => "sql-server",
Expand All @@ -954,6 +962,9 @@ impl AstDisplay for CreateConnectionType {
Self::AwsPrivatelink => {
f.write_str("AWS PRIVATELINK");
}
Self::Gcp => {
f.write_str("GCP");
}
Self::Ssh => {
f.write_str("SSH TUNNEL");
}
Expand Down Expand Up @@ -1542,8 +1553,11 @@ pub enum CreateSinkConnection<T: AstInfo> {
headers: Option<Ident>,
},
Iceberg {
connection: T::ItemName,
aws_connection: T::ItemName,
catalog_connection: T::ItemName,

/// AWS creds for the storage layer.
aws_connection: Option<T::ItemName>,

key: Option<SinkKey>,
options: Vec<IcebergSinkConfigOption<T>>,
},
Expand Down Expand Up @@ -1575,20 +1589,22 @@ impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
}
}
CreateSinkConnection::Iceberg {
connection,
catalog_connection,
aws_connection,
key,
options,
} => {
f.write_str("ICEBERG CATALOG CONNECTION ");
f.write_node(connection);
f.write_node(catalog_connection);
if !options.is_empty() {
f.write_str(" (");
f.write_node(&display::comma_separated(options));
f.write_str(")");
}
f.write_str(" USING AWS CONNECTION ");
f.write_node(aws_connection);
if let Some(aws_connection) = aws_connection {
f.write_str(" USING AWS CONNECTION ");
f.write_node(aws_connection);
}
if let Some(key) = key.as_ref() {
f.write_str(" ");
f.write_node(key);
Expand Down
36 changes: 25 additions & 11 deletions src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2456,16 +2456,17 @@ impl<'a> Parser<'a> {
TO => true,
_ => unreachable!(),
};
let connection_type = match self
.expect_one_of_keywords(&[AWS, KAFKA, CONFLUENT, POSTGRES, SSH, SQL, MYSQL, ICEBERG])?
{
let connection_type = match self.expect_one_of_keywords(&[
AWS, GCP, KAFKA, CONFLUENT, POSTGRES, SSH, SQL, MYSQL, ICEBERG,
])? {
AWS => {
if self.parse_keyword(PRIVATELINK) {
CreateConnectionType::AwsPrivatelink
} else {
CreateConnectionType::Aws
}
}
GCP => CreateConnectionType::Gcp,
KAFKA => CreateConnectionType::Kafka,
CONFLUENT => {
self.expect_keywords(&[SCHEMA, REGISTRY])?;
Expand Down Expand Up @@ -2786,6 +2787,7 @@ impl<'a> Parser<'a> {
CREDENTIAL,
DATABASE,
ENDPOINT,
GCP,
HOST,
PASSWORD,
PORT,
Expand Down Expand Up @@ -2839,6 +2841,10 @@ impl<'a> Parser<'a> {
CREDENTIAL => ConnectionOptionName::Credential,
DATABASE => ConnectionOptionName::Database,
ENDPOINT => ConnectionOptionName::Endpoint,
GCP => {
self.expect_keyword(CONNECTION)?;
ConnectionOptionName::GcpConnection
}
HOST => ConnectionOptionName::Host,
PASSWORD => ConnectionOptionName::Password,
PORT => ConnectionOptionName::Port,
Expand Down Expand Up @@ -2873,10 +2879,14 @@ impl<'a> Parser<'a> {
self.expect_keywords(&[ACCESS, KEY])?;
ConnectionOptionName::SecretAccessKey
}
SERVICE => {
self.expect_keyword(NAME)?;
ConnectionOptionName::ServiceName
}
SERVICE => match self.expect_one_of_keywords(&[ACCOUNT, NAME])? {
ACCOUNT => {
self.expect_keyword(KEY)?;
ConnectionOptionName::ServiceAccountKey
}
NAME => ConnectionOptionName::ServiceName,
_ => unreachable!(),
},
SESSION => {
self.expect_keyword(TOKEN)?;
ConnectionOptionName::SessionToken
Expand Down Expand Up @@ -2915,6 +2925,7 @@ impl<'a> Parser<'a> {
ConnectionOptionName::Brokers => Some(WithOptionValue::Sequence(
self.parse_list_value(Parser::parse_kafka_broker_or_matching_rule)?,
)),
ConnectionOptionName::GcpConnection => Some(self.parse_object_option_value()?),
ConnectionOptionName::SshTunnel => Some(self.parse_object_option_value()?),
_ => self.parse_optional_option_value()?,
};
Expand Down Expand Up @@ -3856,7 +3867,7 @@ impl<'a> Parser<'a> {
&mut self,
) -> Result<CreateSinkConnection<Raw>, ParserError> {
self.expect_keyword(CONNECTION)?;
let connection = self.parse_raw_name()?;
let catalog_connection = self.parse_raw_name()?;

let options = if self.consume_token(&Token::LParen) {
let options = self.parse_comma_separated(Parser::parse_iceberg_sink_config_option)?;
Expand All @@ -3866,8 +3877,11 @@ impl<'a> Parser<'a> {
vec![]
};

self.expect_keywords(&[USING, AWS, CONNECTION])?;
let aws_connection = self.parse_raw_name()?;
let aws_connection = if self.parse_keywords(&[USING, AWS, CONNECTION]) {
Some(self.parse_raw_name()?)
} else {
None
};

let key = if self.parse_keyword(KEY) {
let key_columns = self.parse_parenthesized_column_list(Mandatory)?;
Expand All @@ -3882,7 +3896,7 @@ impl<'a> Parser<'a> {
};

Ok(CreateSinkConnection::Iceberg {
connection,
catalog_connection,
aws_connection,
key,
options,
Expand Down
Loading
Loading