Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/databricks/databricks-sql-go/auth/tokenprovider"
"github.com/databricks/databricks-sql-go/driverctx"
dbsqlerr "github.com/databricks/databricks-sql-go/errors"
"github.com/databricks/databricks-sql-go/internal/agent"
"github.com/databricks/databricks-sql-go/internal/cli_service"
"github.com/databricks/databricks-sql-go/internal/client"
"github.com/databricks/databricks-sql-go/internal/config"
Expand Down Expand Up @@ -95,12 +96,11 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
conn.telemetry = telemetry.InitializeForConnection(ctx, telemetry.TelemetryInitOptions{
Host: c.cfg.Host,
DriverVersion: c.cfg.DriverVersion,
UserAgent: buildUserAgent(c.cfg),
HTTPClient: telemetryClient,
EnableTelemetry: c.cfg.EnableTelemetry,
BatchSize: c.cfg.TelemetryBatchSize,
FlushInterval: c.cfg.TelemetryFlushInterval,
RetryCount: c.cfg.TelemetryRetryCount,
RetryDelay: c.cfg.TelemetryRetryDelay,
})
if conn.telemetry != nil {
log.Debug().Msg("telemetry initialized for connection")
Expand All @@ -117,6 +117,21 @@ func (c *connector) Driver() driver.Driver {
return &databricksDriver{}
}

// buildUserAgent assembles the User-Agent string sent with driver requests.
// The format intentionally mirrors what internal/client/client.go sets on the
// Thrift HTTP client, so telemetry, feature-flag, and Thrift traffic all
// identify themselves the same way:
//
//	<DriverName>/<DriverVersion>[ (<UserAgentEntry>)][ agent/<product>]
func buildUserAgent(cfg *config.Config) string {
	ua := cfg.DriverName + "/" + cfg.DriverVersion
	if entry := cfg.UserAgentEntry; entry != "" {
		ua += " (" + entry + ")"
	}
	if product := agent.Detect(); product != "" {
		ua += " agent/" + product
	}
	return ua
}

var _ driver.Connector = (*connector)(nil)

type ConnOption func(*config.Config)
Expand Down
12 changes: 6 additions & 6 deletions telemetry/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestAggregatorClose_WaitsForInFlightWorkerExports(t *testing.T) {
cfg.BatchSize = 1 // one metric per batch → one worker export per metric
httpClient := &http.Client{Timeout: 5 * time.Second}

exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg)
agg := newMetricsAggregator(exporter, cfg)

ctx := context.Background()
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestAggregatorClose_DrainsPendingQueueJobsBeforeCancel(t *testing.T) {
cfg.BatchSize = 100 // large batch — won't auto-flush on size
httpClient := &http.Client{Timeout: 5 * time.Second}

exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg)

// Use a single-worker aggregator with a tiny queue to make the "pending in queue"
// scenario deterministic: we manually call flushUnlocked to enqueue a job.
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestAggregatorFlushUnlocked_InFlightAddBeforeSend(t *testing.T) {
cfg.BatchSize = 1
httpClient := &http.Client{Timeout: 5 * time.Second}

exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg)
agg := newMetricsAggregator(exporter, cfg)
ctx := context.Background()

Expand Down Expand Up @@ -183,7 +183,7 @@ func TestAggregatorClose_SafeToCallMultipleTimes(t *testing.T) {

cfg := DefaultConfig()
httpClient := &http.Client{Timeout: 5 * time.Second}
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg)
agg := newMetricsAggregator(exporter, cfg)
ctx := context.Background()

Expand Down Expand Up @@ -225,7 +225,7 @@ func TestAggregatorFlushUnlocked_DropWhenQueueFull(t *testing.T) {
httpClient := &http.Client{Timeout: 1 * time.Second}

// Use a no-op exporter — we never actually export in this test.
exporter := newTelemetryExporter("http://127.0.0.1:0", "test-version", httpClient, cfg)
exporter := newTelemetryExporter("http://127.0.0.1:0", "test-version", "test-ua", httpClient, cfg)
agg := newMetricsAggregator(exporter, cfg)

// Cancel the aggregator context immediately so workers stop consuming from the queue.
Expand Down Expand Up @@ -303,7 +303,7 @@ func TestAggregatorClose_RespectsContextTimeout(t *testing.T) {
cfg.BatchSize = 1
httpClient := &http.Client{Timeout: 10 * time.Second}

exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg)
agg := newMetricsAggregator(exporter, cfg)

// Record a metric that triggers an immediate flush (terminal op).
Expand Down
17 changes: 8 additions & 9 deletions telemetry/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func BenchmarkInterceptor_Overhead_Enabled(b *testing.B) {
cfg.BatchSize = 1000
cfg.FlushInterval = 10 * time.Minute // suppress periodic flush during bench

exporter := newTelemetryExporter("localhost", "test-version", &http.Client{}, cfg)
exporter := newTelemetryExporter("localhost", "test-version", "test-ua", &http.Client{}, cfg)
agg := newMetricsAggregator(exporter, cfg)
defer agg.close(context.Background()) //nolint:errcheck

Expand All @@ -44,7 +44,7 @@ func BenchmarkInterceptor_Overhead_Enabled(b *testing.B) {
// The delta between Enabled and Disabled is the pure telemetry cost.
func BenchmarkInterceptor_Overhead_Disabled(b *testing.B) {
cfg := DefaultConfig()
exporter := newTelemetryExporter("localhost", "test-version", &http.Client{}, cfg)
exporter := newTelemetryExporter("localhost", "test-version", "test-ua", &http.Client{}, cfg)
agg := newMetricsAggregator(exporter, cfg)
defer agg.close(context.Background()) //nolint:errcheck

Expand All @@ -66,7 +66,7 @@ func BenchmarkAggregator_RecordMetric(b *testing.B) {
cfg.BatchSize = 10000
cfg.FlushInterval = 10 * time.Minute

exporter := newTelemetryExporter("localhost", "test-version", &http.Client{}, cfg)
exporter := newTelemetryExporter("localhost", "test-version", "test-ua", &http.Client{}, cfg)
agg := newMetricsAggregator(exporter, cfg)
defer agg.close(context.Background()) //nolint:errcheck

Expand Down Expand Up @@ -95,9 +95,8 @@ func BenchmarkExporter_Export(b *testing.B) {
defer server.Close()

cfg := DefaultConfig()
cfg.MaxRetries = 0
httpClient := &http.Client{Timeout: 5 * time.Second}
exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg)

metrics := make([]*telemetryMetric, 10)
for i := range metrics {
Expand Down Expand Up @@ -136,7 +135,7 @@ func BenchmarkConcurrentConnections_PerHostSharing(b *testing.B) {
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
client := manager.getOrCreateClient(host, "test-version", httpClient, cfg)
client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg)
if client != nil {
_ = manager.releaseClient(host)
}
Expand Down Expand Up @@ -185,7 +184,7 @@ func TestLoadTesting_ConcurrentConnections(t *testing.T) {
go func() {
defer wg.Done()

client := manager.getOrCreateClient(host, "test-version", httpClient, cfg)
client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg)
if client == nil {
atomic.AddInt64(&errors, 1)
return
Expand Down Expand Up @@ -237,7 +236,7 @@ func TestGracefulShutdown_ReferenceCountingCleanup(t *testing.T) {
// Open 3 connections per host
for _, host := range hosts {
for i := 0; i < 3; i++ {
if client := manager.getOrCreateClient(host, "test-version", httpClient, cfg); client == nil {
if client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg); client == nil {
t.Fatalf("expected client for host %s", host)
}
}
Expand Down Expand Up @@ -288,7 +287,7 @@ func TestGracefulShutdown_FinalFlush(t *testing.T) {
cfg.FlushInterval = 10 * time.Minute // prevent auto-flush
httpClient := &http.Client{Timeout: 5 * time.Second}

exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg)
exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg)
agg := newMetricsAggregator(exporter, cfg)

ctx := context.Background()
Expand Down
19 changes: 13 additions & 6 deletions telemetry/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,21 @@ type circuitBreakerConfig struct {
permittedCallsInHalfOpen int // Number of test calls in half-open state
}

// defaultCircuitBreakerConfig returns default configuration matching JDBC.
// defaultCircuitBreakerConfig returns the default breaker settings.
//
// Each export call is a single logical request to /telemetry-ext (transient
// retries are handled inside the retryablehttp layer), so one breaker call
// maps to one observed outcome. minimumNumberOfCalls is kept small so that
// low-traffic clients can still open the breaker during a sustained outage,
// and waitDurationInOpenState is long enough to honor typical Retry-After
// windows returned by the server.
func defaultCircuitBreakerConfig() circuitBreakerConfig {
	var cbc circuitBreakerConfig
	cbc.failureRateThreshold = 50
	cbc.minimumNumberOfCalls = 10
	cbc.slidingWindowSize = 30
	cbc.waitDurationInOpenState = 60 * time.Second
	cbc.permittedCallsInHalfOpen = 3
	return cbc
}

Expand Down
4 changes: 2 additions & 2 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ type telemetryClient struct {
}

// newTelemetryClient creates a new telemetry client for the given host.
func newTelemetryClient(host string, driverVersion string, httpClient *http.Client, cfg *Config) *telemetryClient {
func newTelemetryClient(host string, driverVersion string, userAgent string, httpClient *http.Client, cfg *Config) *telemetryClient {
// Create exporter
exporter := newTelemetryExporter(host, driverVersion, httpClient, cfg)
exporter := newTelemetryExporter(host, driverVersion, userAgent, httpClient, cfg)

// Create aggregator with exporter
aggregator := newMetricsAggregator(exporter, cfg)
Expand Down
52 changes: 8 additions & 44 deletions telemetry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,6 @@ import (
"net/http"
"strconv"
"time"

"github.com/databricks/databricks-sql-go/logger"
)

const (
// maxTelemetryRetryCount caps DSN-provided retry count to prevent
// excessive retries from misconfiguration.
maxTelemetryRetryCount = 10

// maxTelemetryRetryDelay caps DSN-provided retry delay to prevent
// excessively long backoff from misconfiguration.
maxTelemetryRetryDelay = 30 * time.Second
)

// Config holds telemetry configuration.
Expand All @@ -36,12 +24,6 @@ type Config struct {
// FlushInterval is how often to flush metrics
FlushInterval time.Duration

// MaxRetries is the maximum number of retry attempts
MaxRetries int

// RetryDelay is the base delay between retries
RetryDelay time.Duration

// CircuitBreakerEnabled enables circuit breaker protection
CircuitBreakerEnabled bool

Expand All @@ -65,10 +47,8 @@ func DefaultConfig() *Config {
return &Config{
Enabled: false,
EnableTelemetry: nil, // unset — server feature flag decides
BatchSize: 100,
FlushInterval: 5 * time.Second,
MaxRetries: 3,
RetryDelay: 100 * time.Millisecond,
BatchSize: 200,
FlushInterval: 30 * time.Second,
CircuitBreakerEnabled: true,
CircuitBreakerThreshold: 5,
CircuitBreakerTimeout: 1 * time.Minute,
Expand Down Expand Up @@ -97,26 +77,10 @@ func ParseTelemetryConfig(params map[string]string) *Config {
}
}

if v, ok := params["telemetry_retry_count"]; ok {
if n, err := strconv.Atoi(v); err == nil && n >= 0 {
if n > maxTelemetryRetryCount {
logger.Debug().Msgf("telemetry: retry_count %d exceeds max %d, clamping", n, maxTelemetryRetryCount)
n = maxTelemetryRetryCount
}
cfg.MaxRetries = n
}
}

if v, ok := params["telemetry_retry_delay"]; ok {
if d, err := time.ParseDuration(v); err == nil && d > 0 {
if d > maxTelemetryRetryDelay {
logger.Debug().Msgf("telemetry: retry_delay %v exceeds max %v, clamping", d, maxTelemetryRetryDelay)
d = maxTelemetryRetryDelay
}
cfg.RetryDelay = d
}
}

// Note: telemetry_retry_count and telemetry_retry_delay DSN parameters
// are accepted for backwards compatibility but are no longer applied
// here. Retries are owned by the underlying retryablehttp-wrapped client
// (see internal/client.RetryableClient), which honors Retry-After.
return cfg
}

Expand All @@ -126,12 +90,12 @@ func ParseTelemetryConfig(params map[string]string) *Config {
// (databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForGoDriver).
//
// In all other cases — explicit opt-out or server flag absent/unreachable — returns false.
func isTelemetryEnabled(ctx context.Context, cfg *Config, host string, driverVersion string, httpClient *http.Client) bool {
func isTelemetryEnabled(ctx context.Context, cfg *Config, host string, driverVersion string, userAgent string, httpClient *http.Client) bool {
if cfg.EnableTelemetry != nil {
return *cfg.EnableTelemetry
}

serverEnabled, err := getFeatureFlagCache().isTelemetryEnabled(ctx, host, driverVersion, httpClient)
serverEnabled, err := getFeatureFlagCache().isTelemetryEnabled(ctx, host, driverVersion, userAgent, httpClient)
if err != nil {
return false
}
Expand Down
Loading
Loading