From 0fc567b63d3dafba67be5b40a61b931d507b878f Mon Sep 17 00:00:00 2001
From: Pat S
Date: Tue, 17 Mar 2026 22:16:42 +0100
Subject: [PATCH] fix: correct SDP streaming SQL TVF syntax for Kafka, Kinesis,
 and Event Hub
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Replace invalid read_stream() with read_kafka() and read_kinesis() TVFs
- Fix parameter names: bootstrapServers, initialPosition, streamName
- Fix secret syntax: {{secrets/...}} → secret('scope', 'key')
- Backtick-quote dot-notation Kafka options (kafka.security.protocol, etc.)
- Add serviceCredential auth option for managed Kafka
- Add Kinesis IAM role and env-var auth patterns
- Replace read_stream(format=>'eventhubs') with read_kafka() + SASL/SSL JAAS config
- Add pipelines.reset.allowed config key to 7-advanced-configuration.md

Co-Authored-By: Claude Sonnet 4.6
---
 .../1-ingestion-patterns.md                   | 113 +++++++++++++-----
 .../7-advanced-configuration.md               |   1 +
 2 files changed, 85 insertions(+), 29 deletions(-)

diff --git a/databricks-skills/databricks-spark-declarative-pipelines/1-ingestion-patterns.md b/databricks-skills/databricks-spark-declarative-pipelines/1-ingestion-patterns.md
index 2f60202f..f9cd73da 100644
--- a/databricks-skills/databricks-spark-declarative-pipelines/1-ingestion-patterns.md
+++ b/databricks-skills/databricks-spark-declarative-pipelines/1-ingestion-patterns.md
@@ -209,23 +209,40 @@ SELECT
   offset,
   timestamp AS kafka_timestamp,
   current_timestamp() AS _ingested_at
-FROM read_stream(
-  format => 'kafka',
-  kafka.bootstrap.servers => '${kafka_brokers}',
+FROM STREAM read_kafka(
+  bootstrapServers => '${kafka_brokers}',
   subscribe => 'events-topic',
-  startingOffsets => 'latest',  -- or 'earliest'
-  kafka.security.protocol => 'SASL_SSL',
-  kafka.sasl.mechanism => 'PLAIN',
-  kafka.sasl.jaas.config => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="${kafka_username}" password="${kafka_password}";'
+  startingOffsets => 'latest'  -- or 'earliest'
 );
 ```
 
+**With a Databricks service credential** (recommended for managed auth):
+```sql
+FROM STREAM read_kafka(
+  bootstrapServers => '${kafka_brokers}',
+  subscribe => 'events-topic',
+  serviceCredential => 'my-kafka-credential'
+)
+```
+
+**With explicit SASL/SSL** (external clusters):
+```sql
+FROM STREAM read_kafka(
+  bootstrapServers => '${kafka_brokers}',
+  subscribe => 'events-topic',
+  startingOffsets => 'latest',
+  `kafka.security.protocol` => 'SASL_SSL',
+  `kafka.sasl.mechanism` => 'PLAIN',
+  `kafka.sasl.username` => secret('kafka', 'username'),
+  `kafka.sasl.password` => secret('kafka', 'password')
+)
+```
+
 ### Kafka with Multiple Topics
 
 ```sql
-FROM read_stream(
-  format => 'kafka',
-  kafka.bootstrap.servers => '${kafka_brokers}',
+FROM STREAM read_kafka(
+  bootstrapServers => '${kafka_brokers}',
   subscribe => 'topic1,topic2,topic3',
   startingOffsets => 'latest'
 )
@@ -233,24 +250,33 @@ FROM read_stream(
 ### Azure Event Hub
 
+There is no `read_eventhub` TVF. Use `read_kafka` with the Kafka-compatible endpoint. The SAS connection string becomes the JAAS config password — pass it as a pipeline variable or via `secret()`.
+
 ```sql
 CREATE OR REPLACE STREAMING TABLE bronze_eventhub_events AS
 SELECT
-  CAST(body AS STRING) AS event_body,
-  enqueuedTime AS event_time,
+  CAST(value AS STRING) AS event_body,
+  timestamp AS event_time,
+  partition,
   offset,
-  sequenceNumber,
-  current_timestamp() AS _ingested_at
-FROM read_stream(
-  format => 'eventhubs',
-  eventhubs.connectionString => '${eventhub_connection_string}',
-  eventhubs.consumerGroup => '${consumer_group}',
-  startingPosition => 'latest'
+  current_timestamp() AS _ingested_at
+FROM STREAM read_kafka(
+  bootstrapServers => '${eventhub_namespace}.servicebus.windows.net:9093',
+  subscribe => '${eventhub_name}',
+  startingOffsets => 'latest',
+  `kafka.security.protocol` => 'SASL_SSL',
+  `kafka.sasl.mechanism` => 'PLAIN',
+  `kafka.sasl.jaas.config` => concat(
+    'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="',
+    '${eventhub_connection_string}',
+    '";'
+  )
 );
 ```
 
 ### AWS Kinesis
 
+**With explicit credentials** (using secrets):
 ```sql
 CREATE OR REPLACE STREAMING TABLE bronze_kinesis_events AS
 SELECT
@@ -259,14 +285,24 @@ SELECT
   sequenceNumber,
   approximateArrivalTimestamp AS arrival_time,
   current_timestamp() AS _ingested_at
-FROM read_stream(
-  format => 'kinesis',
-  kinesis.streamName => '${stream_name}',
-  kinesis.region => '${aws_region}',
-  kinesis.startingPosition => 'LATEST'
+FROM STREAM read_kinesis(
+  streamName => '${stream_name}',
+  awsAccessKey => secret('kinesis', 'awsAccessKey'),
+  awsSecretKey => secret('kinesis', 'awsSecretKey'),
+  initialPosition => 'latest'
 );
 ```
 
+**With IAM role**:
+```sql
+FROM STREAM read_kinesis(
+  streamName => '${stream_name}',
+  initialPosition => 'latest',
+  roleArn => 'arn:aws:iam::123456789012:role/MyRole',
+  roleSessionName => 'my-pipeline-session'
+)
+```
+
 ### Parse JSON from Streaming Sources
 
 ```sql
@@ -300,21 +336,40 @@ FROM STREAM silver_kafka_parsed;
 
 ### Using Databricks Secrets
 
-**Kafka**:
+Use the `secret('scope', 'key')` SQL function to inject secrets into TVF parameters. The `{{secrets/scope/key}}` template syntax is **not valid** in SDP SQL.
+
+**Kafka — service credential** (recommended):
 ```sql
-kafka.sasl.jaas.config => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{{secrets/kafka/username}}" password="{{secrets/kafka/password}}";'
+serviceCredential => 'my-kafka-credential'
 ```
 
-**Event Hub**:
+**Kafka — SASL/SSL** (external clusters):
 ```sql
-eventhubs.connectionString => '{{secrets/eventhub/connection-string}}'
+`kafka.security.protocol` => 'SASL_SSL',
+`kafka.sasl.mechanism` => 'PLAIN',
+`kafka.sasl.username` => secret('kafka', 'username'),
+`kafka.sasl.password` => secret('kafka', 'password')
 ```
 
+**Kinesis — explicit credentials**:
+```sql
+awsAccessKey => secret('kinesis', 'awsAccessKey'),
+awsSecretKey => secret('kinesis', 'awsSecretKey')
+```
+
+**Kinesis — IAM role**:
+```sql
+roleArn => 'arn:aws:iam::123456789012:role/MyRole',
+roleSessionName => 'my-pipeline-session'
+```
+
+**Kinesis — environment variables**: no auth params needed if `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` are set on the cluster.
+
 ### Using Pipeline Variables
 
 Reference variables in SQL:
 ```sql
-kafka.bootstrap.servers => '${kafka_brokers}'
+bootstrapServers => '${kafka_brokers}'
 ```
 
 Define in pipeline configuration:
diff --git a/databricks-skills/databricks-spark-declarative-pipelines/7-advanced-configuration.md b/databricks-skills/databricks-spark-declarative-pipelines/7-advanced-configuration.md
index a6c8ecf3..10204a41 100644
--- a/databricks-skills/databricks-spark-declarative-pipelines/7-advanced-configuration.md
+++ b/databricks-skills/databricks-spark-declarative-pipelines/7-advanced-configuration.md
@@ -98,6 +98,7 @@ Common configuration keys (all values must be strings):
 | `spark.sql.shuffle.partitions` | Number of shuffle partitions (`"auto"` recommended) |
 | `pipelines.numRetries` | Number of retries on transient failures |
 | `pipelines.trigger.interval` | Trigger interval for continuous pipelines, e.g., `"1 hour"` |
+| `pipelines.reset.allowed` | Set to `"false"` to preserve Delta table data on full refresh |
 | `spark.databricks.delta.preview.enabled` | Enable Delta preview features (`"true"`) |
 
 ### `run_as` Object - Pipeline Execution Identity