From d47bf99f0c561638c25d39609cb7b33eb346abb4 Mon Sep 17 00:00:00 2001 From: samikshya-chand_data Date: Tue, 5 May 2026 15:37:35 +0530 Subject: [PATCH] Telemetry: stop double-retrying, identify traffic, tune breaker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After v1.11.0 enabled telemetry by default via the server feature flag, high-QPS workloads produced excessive 429s on /telemetry-ext. Three issues compounded: 1. Double-retry. The exporter ran its own retry loop on top of the retryablehttp-wrapped HTTP client (internal/client.RetryableClient), which already retries 429/5xx with Retry-After. Result: up to (RetryMax+1) * (MaxRetries+1) HTTP attempts per export, all collapsed into one circuit-breaker outcome — so the breaker barely opened. 2. Untraceable in access logs. Telemetry POSTs and feature-flag GETs sent no User-Agent, so 429s were tagged Go-http-client/1.1 and could not be attributed to godatabrickssqlconnector by version. 3. High request volume. FlushInterval=5s, BatchSize=100. Changes: - telemetry/exporter.go: drop the retry loop entirely. doExport now makes a single HTTP request; transient retries (429/5xx, Retry-After) are owned by the underlying retryablehttp client. Each export call → exactly one breaker outcome. - telemetry/exporter.go, telemetry/featureflag.go: set User-Agent header on telemetry POST and feature-flag GET. Built once at the connector site (buildUserAgent in connector.go), mirroring internal/client/client.go format (DriverName/DriverVersion + UserAgentEntry + agent product), and plumbed via TelemetryInitOptions.UserAgent. - telemetry/config.go: FlushInterval 5s → 30s, BatchSize 100 → 200. Remove MaxRetries/RetryDelay from telemetry.Config and TelemetryInitOptions; telemetry_retry_count/_delay DSN params still parse for backwards compat but are no-ops. 
- telemetry/circuitbreaker.go: lower minimumNumberOfCalls 20 → 10 (so low-traffic clients can still trip the breaker on a sustained outage now that each export is one signal), and raise waitDurationInOpenState 30s → 60s (respect typical Retry-After). - Tests: removed obsolete retry/backoff tests; added single-attempt assertion across 4xx/429/5xx; added User-Agent assertions on both endpoints. Co-authored-by: Isaac --- connector.go | 19 ++- telemetry/aggregator_test.go | 12 +- telemetry/benchmark_test.go | 17 ++- telemetry/circuitbreaker.go | 19 ++- telemetry/client.go | 4 +- telemetry/config.go | 52 ++----- telemetry/config_test.go | 127 ++++------------- telemetry/driver_integration.go | 34 ++--- telemetry/exporter.go | 87 ++++-------- telemetry/exporter_test.go | 235 +++++++------------------------- telemetry/featureflag.go | 9 +- telemetry/featureflag_test.go | 44 ++++-- telemetry/integration_test.go | 13 +- telemetry/manager.go | 4 +- telemetry/manager_test.go | 36 ++--- 15 files changed, 234 insertions(+), 478 deletions(-) diff --git a/connector.go b/connector.go index 4d019651..d86d5679 100644 --- a/connector.go +++ b/connector.go @@ -16,6 +16,7 @@ import ( "github.com/databricks/databricks-sql-go/auth/tokenprovider" "github.com/databricks/databricks-sql-go/driverctx" dbsqlerr "github.com/databricks/databricks-sql-go/errors" + "github.com/databricks/databricks-sql-go/internal/agent" "github.com/databricks/databricks-sql-go/internal/cli_service" "github.com/databricks/databricks-sql-go/internal/client" "github.com/databricks/databricks-sql-go/internal/config" @@ -95,12 +96,11 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) { conn.telemetry = telemetry.InitializeForConnection(ctx, telemetry.TelemetryInitOptions{ Host: c.cfg.Host, DriverVersion: c.cfg.DriverVersion, + UserAgent: buildUserAgent(c.cfg), HTTPClient: telemetryClient, EnableTelemetry: c.cfg.EnableTelemetry, BatchSize: c.cfg.TelemetryBatchSize, FlushInterval: 
c.cfg.TelemetryFlushInterval, - RetryCount: c.cfg.TelemetryRetryCount, - RetryDelay: c.cfg.TelemetryRetryDelay, }) if conn.telemetry != nil { log.Debug().Msg("telemetry initialized for connection") @@ -117,6 +117,21 @@ func (c *connector) Driver() driver.Driver { return &databricksDriver{} } +// buildUserAgent constructs the User-Agent header value used by the driver. +// Mirrors the format set on the Thrift HTTP client in +// internal/client/client.go so telemetry, feature-flag, and Thrift requests +// all carry the same identifier. +func buildUserAgent(cfg *config.Config) string { + userAgent := fmt.Sprintf("%s/%s", cfg.DriverName, cfg.DriverVersion) + if cfg.UserAgentEntry != "" { + userAgent = fmt.Sprintf("%s/%s (%s)", cfg.DriverName, cfg.DriverVersion, cfg.UserAgentEntry) + } + if agentProduct := agent.Detect(); agentProduct != "" { + userAgent = fmt.Sprintf("%s agent/%s", userAgent, agentProduct) + } + return userAgent +} + var _ driver.Connector = (*connector)(nil) type ConnOption func(*config.Config) diff --git a/telemetry/aggregator_test.go b/telemetry/aggregator_test.go index f98c44c5..0bc2a566 100644 --- a/telemetry/aggregator_test.go +++ b/telemetry/aggregator_test.go @@ -40,7 +40,7 @@ func TestAggregatorClose_WaitsForInFlightWorkerExports(t *testing.T) { cfg.BatchSize = 1 // one metric per batch → one worker export per metric httpClient := &http.Client{Timeout: 5 * time.Second} - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) agg := newMetricsAggregator(exporter, cfg) ctx := context.Background() @@ -98,7 +98,7 @@ func TestAggregatorClose_DrainsPendingQueueJobsBeforeCancel(t *testing.T) { cfg.BatchSize = 100 // large batch — won't auto-flush on size httpClient := &http.Client{Timeout: 5 * time.Second} - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, 
"test-version", "test-ua", httpClient, cfg) // Use a single-worker aggregator with a tiny queue to make the "pending in queue" // scenario deterministic: we manually call flushUnlocked to enqueue a job. @@ -150,7 +150,7 @@ func TestAggregatorFlushUnlocked_InFlightAddBeforeSend(t *testing.T) { cfg.BatchSize = 1 httpClient := &http.Client{Timeout: 5 * time.Second} - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) agg := newMetricsAggregator(exporter, cfg) ctx := context.Background() @@ -183,7 +183,7 @@ func TestAggregatorClose_SafeToCallMultipleTimes(t *testing.T) { cfg := DefaultConfig() httpClient := &http.Client{Timeout: 5 * time.Second} - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) agg := newMetricsAggregator(exporter, cfg) ctx := context.Background() @@ -225,7 +225,7 @@ func TestAggregatorFlushUnlocked_DropWhenQueueFull(t *testing.T) { httpClient := &http.Client{Timeout: 1 * time.Second} // Use a no-op exporter — we never actually export in this test. - exporter := newTelemetryExporter("http://127.0.0.1:0", "test-version", httpClient, cfg) + exporter := newTelemetryExporter("http://127.0.0.1:0", "test-version", "test-ua", httpClient, cfg) agg := newMetricsAggregator(exporter, cfg) // Cancel the aggregator context immediately so workers stop consuming from the queue. @@ -303,7 +303,7 @@ func TestAggregatorClose_RespectsContextTimeout(t *testing.T) { cfg.BatchSize = 1 httpClient := &http.Client{Timeout: 10 * time.Second} - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) agg := newMetricsAggregator(exporter, cfg) // Record a metric that triggers an immediate flush (terminal op). 
diff --git a/telemetry/benchmark_test.go b/telemetry/benchmark_test.go index ea20e1fe..f42011f9 100644 --- a/telemetry/benchmark_test.go +++ b/telemetry/benchmark_test.go @@ -25,7 +25,7 @@ func BenchmarkInterceptor_Overhead_Enabled(b *testing.B) { cfg.BatchSize = 1000 cfg.FlushInterval = 10 * time.Minute // suppress periodic flush during bench - exporter := newTelemetryExporter("localhost", "test-version", &http.Client{}, cfg) + exporter := newTelemetryExporter("localhost", "test-version", "test-ua", &http.Client{}, cfg) agg := newMetricsAggregator(exporter, cfg) defer agg.close(context.Background()) //nolint:errcheck @@ -44,7 +44,7 @@ func BenchmarkInterceptor_Overhead_Enabled(b *testing.B) { // The delta between Enabled and Disabled is the pure telemetry cost. func BenchmarkInterceptor_Overhead_Disabled(b *testing.B) { cfg := DefaultConfig() - exporter := newTelemetryExporter("localhost", "test-version", &http.Client{}, cfg) + exporter := newTelemetryExporter("localhost", "test-version", "test-ua", &http.Client{}, cfg) agg := newMetricsAggregator(exporter, cfg) defer agg.close(context.Background()) //nolint:errcheck @@ -66,7 +66,7 @@ func BenchmarkAggregator_RecordMetric(b *testing.B) { cfg.BatchSize = 10000 cfg.FlushInterval = 10 * time.Minute - exporter := newTelemetryExporter("localhost", "test-version", &http.Client{}, cfg) + exporter := newTelemetryExporter("localhost", "test-version", "test-ua", &http.Client{}, cfg) agg := newMetricsAggregator(exporter, cfg) defer agg.close(context.Background()) //nolint:errcheck @@ -95,9 +95,8 @@ func BenchmarkExporter_Export(b *testing.B) { defer server.Close() cfg := DefaultConfig() - cfg.MaxRetries = 0 httpClient := &http.Client{Timeout: 5 * time.Second} - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) metrics := make([]*telemetryMetric, 10) for i := range metrics { @@ -136,7 +135,7 @@ func 
BenchmarkConcurrentConnections_PerHostSharing(b *testing.B) { b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - client := manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) if client != nil { _ = manager.releaseClient(host) } @@ -185,7 +184,7 @@ func TestLoadTesting_ConcurrentConnections(t *testing.T) { go func() { defer wg.Done() - client := manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) if client == nil { atomic.AddInt64(&errors, 1) return @@ -237,7 +236,7 @@ func TestGracefulShutdown_ReferenceCountingCleanup(t *testing.T) { // Open 3 connections per host for _, host := range hosts { for i := 0; i < 3; i++ { - if client := manager.getOrCreateClient(host, "test-version", httpClient, cfg); client == nil { + if client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg); client == nil { t.Fatalf("expected client for host %s", host) } } @@ -288,7 +287,7 @@ func TestGracefulShutdown_FinalFlush(t *testing.T) { cfg.FlushInterval = 10 * time.Minute // prevent auto-flush httpClient := &http.Client{Timeout: 5 * time.Second} - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) agg := newMetricsAggregator(exporter, cfg) ctx := context.Background() diff --git a/telemetry/circuitbreaker.go b/telemetry/circuitbreaker.go index e399f03f..7088afa6 100644 --- a/telemetry/circuitbreaker.go +++ b/telemetry/circuitbreaker.go @@ -62,14 +62,21 @@ type circuitBreakerConfig struct { permittedCallsInHalfOpen int // Number of test calls in half-open state } -// defaultCircuitBreakerConfig returns default configuration matching JDBC. +// defaultCircuitBreakerConfig returns default configuration. 
+// +// Each export call is now a single logical request to /telemetry-ext (the +// retryablehttp layer handles transient retries internally), so each breaker +// call corresponds to one observed outcome. minimumNumberOfCalls is set low +// enough that low-traffic clients can still trip the breaker on a sustained +// outage; waitDurationInOpenState is long enough to respect typical +// Retry-After windows from the server. func defaultCircuitBreakerConfig() circuitBreakerConfig { return circuitBreakerConfig{ - failureRateThreshold: 50, // 50% failure rate - minimumNumberOfCalls: 20, // Minimum sample size - slidingWindowSize: 30, // Keep recent 30 calls - waitDurationInOpenState: 30 * time.Second, - permittedCallsInHalfOpen: 3, // Test with 3 calls + failureRateThreshold: 50, + minimumNumberOfCalls: 10, + slidingWindowSize: 30, + waitDurationInOpenState: 60 * time.Second, + permittedCallsInHalfOpen: 3, } } diff --git a/telemetry/client.go b/telemetry/client.go index 25c68e88..0432a086 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -31,9 +31,9 @@ type telemetryClient struct { } // newTelemetryClient creates a new telemetry client for the given host. 
-func newTelemetryClient(host string, driverVersion string, httpClient *http.Client, cfg *Config) *telemetryClient { +func newTelemetryClient(host string, driverVersion string, userAgent string, httpClient *http.Client, cfg *Config) *telemetryClient { // Create exporter - exporter := newTelemetryExporter(host, driverVersion, httpClient, cfg) + exporter := newTelemetryExporter(host, driverVersion, userAgent, httpClient, cfg) // Create aggregator with exporter aggregator := newMetricsAggregator(exporter, cfg) diff --git a/telemetry/config.go b/telemetry/config.go index 9054cb36..ceb0ac21 100644 --- a/telemetry/config.go +++ b/telemetry/config.go @@ -5,18 +5,6 @@ import ( "net/http" "strconv" "time" - - "github.com/databricks/databricks-sql-go/logger" -) - -const ( - // maxTelemetryRetryCount caps DSN-provided retry count to prevent - // excessive retries from misconfiguration. - maxTelemetryRetryCount = 10 - - // maxTelemetryRetryDelay caps DSN-provided retry delay to prevent - // excessively long backoff from misconfiguration. - maxTelemetryRetryDelay = 30 * time.Second ) // Config holds telemetry configuration. 
@@ -36,12 +24,6 @@ type Config struct { // FlushInterval is how often to flush metrics FlushInterval time.Duration - // MaxRetries is the maximum number of retry attempts - MaxRetries int - - // RetryDelay is the base delay between retries - RetryDelay time.Duration - // CircuitBreakerEnabled enables circuit breaker protection CircuitBreakerEnabled bool @@ -65,10 +47,8 @@ func DefaultConfig() *Config { return &Config{ Enabled: false, EnableTelemetry: nil, // unset — server feature flag decides - BatchSize: 100, - FlushInterval: 5 * time.Second, - MaxRetries: 3, - RetryDelay: 100 * time.Millisecond, + BatchSize: 200, + FlushInterval: 30 * time.Second, CircuitBreakerEnabled: true, CircuitBreakerThreshold: 5, CircuitBreakerTimeout: 1 * time.Minute, @@ -97,26 +77,10 @@ func ParseTelemetryConfig(params map[string]string) *Config { } } - if v, ok := params["telemetry_retry_count"]; ok { - if n, err := strconv.Atoi(v); err == nil && n >= 0 { - if n > maxTelemetryRetryCount { - logger.Debug().Msgf("telemetry: retry_count %d exceeds max %d, clamping", n, maxTelemetryRetryCount) - n = maxTelemetryRetryCount - } - cfg.MaxRetries = n - } - } - - if v, ok := params["telemetry_retry_delay"]; ok { - if d, err := time.ParseDuration(v); err == nil && d > 0 { - if d > maxTelemetryRetryDelay { - logger.Debug().Msgf("telemetry: retry_delay %v exceeds max %v, clamping", d, maxTelemetryRetryDelay) - d = maxTelemetryRetryDelay - } - cfg.RetryDelay = d - } - } - + // Note: telemetry_retry_count and telemetry_retry_delay DSN parameters + // are accepted for backwards compatibility but are no longer applied + // here. Retries are owned by the underlying retryablehttp-wrapped client + // (see internal/client.RetryableClient), which honors Retry-After. return cfg } @@ -126,12 +90,12 @@ func ParseTelemetryConfig(params map[string]string) *Config { // (databricks.partnerplatform.clientConfigsFeatureFlags.enableTelemetryForGoDriver). 
// // In all other cases — explicit opt-out or server flag absent/unreachable — returns false. -func isTelemetryEnabled(ctx context.Context, cfg *Config, host string, driverVersion string, httpClient *http.Client) bool { +func isTelemetryEnabled(ctx context.Context, cfg *Config, host string, driverVersion string, userAgent string, httpClient *http.Client) bool { if cfg.EnableTelemetry != nil { return *cfg.EnableTelemetry } - serverEnabled, err := getFeatureFlagCache().isTelemetryEnabled(ctx, host, driverVersion, httpClient) + serverEnabled, err := getFeatureFlagCache().isTelemetryEnabled(ctx, host, driverVersion, userAgent, httpClient) if err != nil { return false } diff --git a/telemetry/config_test.go b/telemetry/config_test.go index 6806b184..099d90a6 100644 --- a/telemetry/config_test.go +++ b/telemetry/config_test.go @@ -17,17 +17,11 @@ func TestDefaultConfig(t *testing.T) { if cfg.EnableTelemetry != nil { t.Error("Expected EnableTelemetry to be nil (unset) by default") } - if cfg.BatchSize != 100 { - t.Errorf("Expected BatchSize 100, got %d", cfg.BatchSize) - } - if cfg.FlushInterval != 5*time.Second { - t.Errorf("Expected FlushInterval 5s, got %v", cfg.FlushInterval) - } - if cfg.MaxRetries != 3 { - t.Errorf("Expected MaxRetries 3, got %d", cfg.MaxRetries) + if cfg.BatchSize != 200 { + t.Errorf("Expected BatchSize 200, got %d", cfg.BatchSize) } - if cfg.RetryDelay != 100*time.Millisecond { - t.Errorf("Expected RetryDelay 100ms, got %v", cfg.RetryDelay) + if cfg.FlushInterval != 30*time.Second { + t.Errorf("Expected FlushInterval 30s, got %v", cfg.FlushInterval) } if !cfg.CircuitBreakerEnabled { t.Error("Expected CircuitBreakerEnabled true, got false") @@ -75,24 +69,24 @@ func TestParseTelemetryConfig_BatchSize(t *testing.T) { func TestParseTelemetryConfig_BatchSizeInvalid(t *testing.T) { cfg := ParseTelemetryConfig(map[string]string{"telemetry_batch_size": "invalid"}) - if cfg.BatchSize != 100 { - t.Errorf("Expected BatchSize to fallback to 100, got %d", 
cfg.BatchSize) + if cfg.BatchSize != 200 { + t.Errorf("Expected BatchSize to fallback to 200, got %d", cfg.BatchSize) } } func TestParseTelemetryConfig_BatchSizeZero(t *testing.T) { cfg := ParseTelemetryConfig(map[string]string{"telemetry_batch_size": "0"}) - if cfg.BatchSize != 100 { - t.Errorf("Expected BatchSize to fallback to 100 when zero, got %d", cfg.BatchSize) + if cfg.BatchSize != 200 { + t.Errorf("Expected BatchSize to fallback to 200 when zero, got %d", cfg.BatchSize) } } func TestParseTelemetryConfig_BatchSizeNegative(t *testing.T) { cfg := ParseTelemetryConfig(map[string]string{"telemetry_batch_size": "-10"}) - if cfg.BatchSize != 100 { - t.Errorf("Expected BatchSize to fallback to 100 when negative, got %d", cfg.BatchSize) + if cfg.BatchSize != 200 { + t.Errorf("Expected BatchSize to fallback to 200 when negative, got %d", cfg.BatchSize) } } @@ -107,77 +101,22 @@ func TestParseTelemetryConfig_FlushInterval(t *testing.T) { func TestParseTelemetryConfig_FlushIntervalInvalid(t *testing.T) { cfg := ParseTelemetryConfig(map[string]string{"telemetry_flush_interval": "invalid"}) - if cfg.FlushInterval != 5*time.Second { - t.Errorf("Expected FlushInterval to fallback to 5s, got %v", cfg.FlushInterval) - } -} - -func TestParseTelemetryConfig_RetryCount(t *testing.T) { - cfg := ParseTelemetryConfig(map[string]string{"telemetry_retry_count": "5"}) - - if cfg.MaxRetries != 5 { - t.Errorf("Expected MaxRetries 5, got %d", cfg.MaxRetries) - } -} - -func TestParseTelemetryConfig_RetryCountZero(t *testing.T) { - // Zero is valid — it disables retries entirely (unlike batch_size where zero is rejected) - cfg := ParseTelemetryConfig(map[string]string{"telemetry_retry_count": "0"}) - - if cfg.MaxRetries != 0 { - t.Errorf("Expected MaxRetries 0 (disable retries), got %d", cfg.MaxRetries) - } -} - -func TestParseTelemetryConfig_RetryCountInvalid(t *testing.T) { - cfg := ParseTelemetryConfig(map[string]string{"telemetry_retry_count": "invalid"}) - - if cfg.MaxRetries != 3 { 
- t.Errorf("Expected MaxRetries to fallback to 3, got %d", cfg.MaxRetries) - } -} - -func TestParseTelemetryConfig_RetryDelay(t *testing.T) { - cfg := ParseTelemetryConfig(map[string]string{"telemetry_retry_delay": "500ms"}) - - if cfg.RetryDelay != 500*time.Millisecond { - t.Errorf("Expected RetryDelay 500ms, got %v", cfg.RetryDelay) - } -} - -func TestParseTelemetryConfig_RetryDelayInvalid(t *testing.T) { - cfg := ParseTelemetryConfig(map[string]string{"telemetry_retry_delay": "invalid"}) - - if cfg.RetryDelay != 100*time.Millisecond { - t.Errorf("Expected RetryDelay to fallback to 100ms, got %v", cfg.RetryDelay) - } -} - -func TestParseTelemetryConfig_RetryCountExceedsCap(t *testing.T) { - cfg := ParseTelemetryConfig(map[string]string{"telemetry_retry_count": "15"}) - if cfg.MaxRetries != maxTelemetryRetryCount { - t.Errorf("Expected MaxRetries clamped to %d, got %d", maxTelemetryRetryCount, cfg.MaxRetries) - } -} - -func TestParseTelemetryConfig_RetryCountAtCap(t *testing.T) { - cfg := ParseTelemetryConfig(map[string]string{"telemetry_retry_count": "10"}) - if cfg.MaxRetries != 10 { - t.Errorf("Expected MaxRetries 10, got %d", cfg.MaxRetries) - } -} - -func TestParseTelemetryConfig_RetryDelayExceedsCap(t *testing.T) { - cfg := ParseTelemetryConfig(map[string]string{"telemetry_retry_delay": "60s"}) - if cfg.RetryDelay != maxTelemetryRetryDelay { - t.Errorf("Expected RetryDelay clamped to %v, got %v", maxTelemetryRetryDelay, cfg.RetryDelay) + if cfg.FlushInterval != 30*time.Second { + t.Errorf("Expected FlushInterval to fallback to 30s, got %v", cfg.FlushInterval) } } -func TestParseTelemetryConfig_RetryDelayAtCap(t *testing.T) { - cfg := ParseTelemetryConfig(map[string]string{"telemetry_retry_delay": "30s"}) - if cfg.RetryDelay != 30*time.Second { - t.Errorf("Expected RetryDelay 30s, got %v", cfg.RetryDelay) +// telemetry_retry_count and telemetry_retry_delay DSN parameters are accepted +// for backwards compatibility but are no longer applied — retries are 
owned by +// the underlying retryablehttp-wrapped HTTP client. +func TestParseTelemetryConfig_RetryParamsAccepted(t *testing.T) { + cfg := ParseTelemetryConfig(map[string]string{ + "telemetry_retry_count": "5", + "telemetry_retry_delay": "500ms", + }) + // Should parse without error and produce a usable config. + if cfg == nil { + t.Fatal("expected non-nil config") } } @@ -186,8 +125,6 @@ func TestParseTelemetryConfig_AllParams(t *testing.T) { "enableTelemetry": "true", "telemetry_batch_size": "200", "telemetry_flush_interval": "30s", - "telemetry_retry_count": "5", - "telemetry_retry_delay": "250ms", }) if cfg.EnableTelemetry == nil || !*cfg.EnableTelemetry { @@ -199,12 +136,6 @@ func TestParseTelemetryConfig_AllParams(t *testing.T) { if cfg.FlushInterval != 30*time.Second { t.Errorf("Expected FlushInterval 30s, got %v", cfg.FlushInterval) } - if cfg.MaxRetries != 5 { - t.Errorf("Expected MaxRetries 5, got %d", cfg.MaxRetries) - } - if cfg.RetryDelay != 250*time.Millisecond { - t.Errorf("Expected RetryDelay 250ms, got %v", cfg.RetryDelay) - } } // TestIsTelemetryEnabled_ExplicitOptOut: client sets enableTelemetry=false → @@ -216,7 +147,7 @@ func TestIsTelemetryEnabled_ExplicitOptOut(t *testing.T) { })) defer server.Close() - result := isTelemetryEnabled(context.Background(), &Config{EnableTelemetry: boolPtr(false)}, server.URL, "test-version", &http.Client{Timeout: 5 * time.Second}) + result := isTelemetryEnabled(context.Background(), &Config{EnableTelemetry: boolPtr(false)}, server.URL, "test-version", "test-ua", &http.Client{Timeout: 5 * time.Second}) if result { t.Error("Expected telemetry to be disabled when client sets enableTelemetry=false, got enabled") @@ -226,7 +157,7 @@ func TestIsTelemetryEnabled_ExplicitOptOut(t *testing.T) { // TestIsTelemetryEnabled_ExplicitOptIn: client sets enableTelemetry=true → // enabled without any server call (unreachable host proves no network call is made). 
func TestIsTelemetryEnabled_ExplicitOptIn(t *testing.T) { - result := isTelemetryEnabled(context.Background(), &Config{EnableTelemetry: boolPtr(true)}, "http://unreachable-host", "test-version", &http.Client{Timeout: 5 * time.Second}) + result := isTelemetryEnabled(context.Background(), &Config{EnableTelemetry: boolPtr(true)}, "http://unreachable-host", "test-version", "test-ua", &http.Client{Timeout: 5 * time.Second}) if !result { t.Error("Expected telemetry to be enabled when client sets enableTelemetry=true, got disabled") @@ -245,7 +176,7 @@ func TestIsTelemetryEnabled_ServerEnabled(t *testing.T) { flagCache.getOrCreateContext(server.URL) defer flagCache.releaseContext(server.URL) - result := isTelemetryEnabled(context.Background(), &Config{}, server.URL, "test-version", &http.Client{Timeout: 5 * time.Second}) + result := isTelemetryEnabled(context.Background(), &Config{}, server.URL, "test-version", "test-ua", &http.Client{Timeout: 5 * time.Second}) if !result { t.Error("Expected telemetry to be enabled when server flag is true and EnableTelemetry is nil, got disabled") @@ -264,7 +195,7 @@ func TestIsTelemetryEnabled_ServerDisabled(t *testing.T) { flagCache.getOrCreateContext(server.URL) defer flagCache.releaseContext(server.URL) - result := isTelemetryEnabled(context.Background(), &Config{}, server.URL, "test-version", &http.Client{Timeout: 5 * time.Second}) + result := isTelemetryEnabled(context.Background(), &Config{}, server.URL, "test-version", "test-ua", &http.Client{Timeout: 5 * time.Second}) if result { t.Error("Expected telemetry to be disabled when server flag is false and EnableTelemetry is nil, got enabled") @@ -282,7 +213,7 @@ func TestIsTelemetryEnabled_ServerError(t *testing.T) { flagCache.getOrCreateContext(server.URL) defer flagCache.releaseContext(server.URL) - result := isTelemetryEnabled(context.Background(), &Config{}, server.URL, "test-version", &http.Client{Timeout: 5 * time.Second}) + result := isTelemetryEnabled(context.Background(), 
&Config{}, server.URL, "test-version", "test-ua", &http.Client{Timeout: 5 * time.Second}) if result { t.Error("Expected telemetry to be disabled when server errors and EnableTelemetry is nil, got enabled") @@ -295,7 +226,7 @@ func TestIsTelemetryEnabled_ServerUnreachable(t *testing.T) { flagCache.getOrCreateContext("http://localhost:9999") defer flagCache.releaseContext("http://localhost:9999") - result := isTelemetryEnabled(context.Background(), &Config{}, "http://localhost:9999", "test-version", &http.Client{Timeout: 1 * time.Second}) + result := isTelemetryEnabled(context.Background(), &Config{}, "http://localhost:9999", "test-version", "test-ua", &http.Client{Timeout: 1 * time.Second}) if result { t.Error("Expected telemetry to be disabled when server is unreachable and EnableTelemetry is nil, got enabled") diff --git a/telemetry/driver_integration.go b/telemetry/driver_integration.go index f8b32ffc..e33c7537 100644 --- a/telemetry/driver_integration.go +++ b/telemetry/driver_integration.go @@ -16,6 +16,11 @@ type TelemetryInitOptions struct { // DriverVersion is the driver version string. DriverVersion string + // UserAgent is the User-Agent header sent on telemetry export and + // feature-flag requests. Should match the value the Thrift client uses so + // telemetry traffic is attributable to the driver in access logs. + UserAgent string + // HTTPClient is the HTTP client used for both feature-flag checks and // telemetry export. The /telemetry-ext endpoint requires authentication, // so this should be the authenticated driver client. @@ -27,20 +32,11 @@ type TelemetryInitOptions struct { // false — client explicitly opted out EnableTelemetry config.ConfigValue[bool] - // BatchSize is the number of metrics per batch (0 = use default 100). + // BatchSize is the number of metrics per batch (0 = use default). BatchSize int - // FlushInterval is the flush interval (0 = use default 5s). + // FlushInterval is the flush interval (0 = use default). 
FlushInterval time.Duration - - // RetryCount is max retry attempts (-1 = use default 3; 0 = disable retries). - // IMPORTANT: Go's zero-value for int is 0, which disables retries. Callers - // constructing TelemetryInitOptions must set RetryCount = -1 explicitly to - // get the default retry behavior. - RetryCount int - - // RetryDelay is the base delay between retries (0 = use default 100ms). - RetryDelay time.Duration } // InitializeForConnection initializes telemetry for a database connection. @@ -60,25 +56,13 @@ func InitializeForConnection(ctx context.Context, opts TelemetryInitOptions) *In if opts.FlushInterval > 0 { cfg.FlushInterval = opts.FlushInterval } - if opts.RetryCount >= 0 { - cfg.MaxRetries = opts.RetryCount - if cfg.MaxRetries > maxTelemetryRetryCount { - cfg.MaxRetries = maxTelemetryRetryCount - } - } - if opts.RetryDelay > 0 { - cfg.RetryDelay = opts.RetryDelay - if cfg.RetryDelay > maxTelemetryRetryDelay { - cfg.RetryDelay = maxTelemetryRetryDelay - } - } // Get feature flag cache context FIRST (for reference counting) flagCache := getFeatureFlagCache() flagCache.getOrCreateContext(opts.Host) // Check if telemetry should be enabled - enabled := isTelemetryEnabled(ctx, cfg, opts.Host, opts.DriverVersion, opts.HTTPClient) + enabled := isTelemetryEnabled(ctx, cfg, opts.Host, opts.DriverVersion, opts.UserAgent, opts.HTTPClient) if !enabled { flagCache.releaseContext(opts.Host) return nil @@ -86,7 +70,7 @@ func InitializeForConnection(ctx context.Context, opts TelemetryInitOptions) *In // Get or create telemetry client for this host clientMgr := getClientManager() - telemetryClient := clientMgr.getOrCreateClient(opts.Host, opts.DriverVersion, opts.HTTPClient, cfg) + telemetryClient := clientMgr.getOrCreateClient(opts.Host, opts.DriverVersion, opts.UserAgent, opts.HTTPClient, cfg) if telemetryClient == nil { // Client failed to start; release the flag cache ref we incremented above flagCache.releaseContext(opts.Host) diff --git 
a/telemetry/exporter.go b/telemetry/exporter.go index 3ecbf81f..c87933de 100644 --- a/telemetry/exporter.go +++ b/telemetry/exporter.go @@ -24,6 +24,7 @@ const ( type telemetryExporter struct { host string driverVersion string + userAgent string httpClient *http.Client circuitBreaker *circuitBreaker cfg *Config @@ -50,10 +51,11 @@ func ensureHTTPScheme(host string) string { } // newTelemetryExporter creates a new exporter. -func newTelemetryExporter(host string, driverVersion string, httpClient *http.Client, cfg *Config) *telemetryExporter { +func newTelemetryExporter(host string, driverVersion string, userAgent string, httpClient *http.Client, cfg *Config) *telemetryExporter { return &telemetryExporter{ host: host, driverVersion: driverVersion, + userAgent: userAgent, httpClient: httpClient, circuitBreaker: getCircuitBreakerManager().getCircuitBreaker(host), cfg: cfg, @@ -85,78 +87,43 @@ func (e *telemetryExporter) export(ctx context.Context, metrics []*telemetryMetr } } -// doExport performs the actual export with retries and exponential backoff. +// doExport sends one telemetry request. It does NOT retry — retries are +// handled by the underlying retryablehttp-wrapped HTTP client (see +// internal/client.RetryableClient), which already retries 429/5xx with the +// server-provided Retry-After header. Any non-2xx outcome here is therefore +// the *post-retry* result, and is returned to the caller so the circuit +// breaker counts it as one failure per export. 
func (e *telemetryExporter) doExport(ctx context.Context, metrics []*telemetryMetric) error { - // Create telemetry request with base64-encoded logs request, err := createTelemetryRequest(metrics, e.driverVersion) if err != nil { return fmt.Errorf("failed to create telemetry request: %w", err) } - // Serialize request data, err := json.Marshal(request) if err != nil { return fmt.Errorf("failed to marshal request: %w", err) } - // Determine endpoint - hostURL := ensureHTTPScheme(e.host) - endpoint := hostURL + telemetryEndpointPath - - // Retry logic with exponential backoff - maxRetries := e.cfg.MaxRetries - for attempt := 0; attempt <= maxRetries; attempt++ { - // Exponential backoff (except for first attempt) - if attempt > 0 { - backoff := time.Duration(1<= 200 && resp.StatusCode < 300 { - return nil // Success - } + endpoint := ensureHTTPScheme(e.host) + telemetryEndpointPath - // Check if retryable - if !isRetryableStatus(resp.StatusCode) { - return fmt.Errorf("non-retryable status: %d", resp.StatusCode) - } - - if attempt == maxRetries { - return fmt.Errorf("failed after %d retries: status %d", maxRetries, resp.StatusCode) - } + req, err := http.NewRequestWithContext(ctx, "POST", endpoint, bytes.NewReader(data)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + if e.userAgent != "" { + req.Header.Set("User-Agent", e.userAgent) } - return nil -} + resp, err := e.httpClient.Do(req) + if err != nil { + return fmt.Errorf("telemetry export failed: %w", err) + } + _, _ = io.ReadAll(resp.Body) + resp.Body.Close() //nolint:errcheck,gosec // G104: close after response is read -// isRetryableStatus returns true if HTTP status is retryable. 
-// Retryable statuses: 429 (Too Many Requests), 503 (Service Unavailable), 5xx (Server Errors) -func isRetryableStatus(status int) bool { - return status == 429 || status == 503 || status >= 500 + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return nil + } + return fmt.Errorf("telemetry export failed: status %d", resp.StatusCode) } diff --git a/telemetry/exporter_test.go b/telemetry/exporter_test.go index 10156f82..008dd377 100644 --- a/telemetry/exporter_test.go +++ b/telemetry/exporter_test.go @@ -16,7 +16,7 @@ func TestNewTelemetryExporter(t *testing.T) { httpClient := &http.Client{Timeout: 5 * time.Second} host := "test-host" - exporter := newTelemetryExporter(host, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(host, "test-version", "test-ua", httpClient, cfg) if exporter.host != host { t.Errorf("Expected host %s, got %s", host, exporter.host) @@ -73,7 +73,7 @@ func TestExport_Success(t *testing.T) { httpClient := &http.Client{Timeout: 5 * time.Second} // Use full server URL for testing - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) metrics := []*telemetryMetric{ { @@ -93,109 +93,64 @@ func TestExport_Success(t *testing.T) { } } -func TestExport_RetryOn5xx(t *testing.T) { - attemptCount := int32(0) - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - count := atomic.AddInt32(&attemptCount, 1) - if count < 3 { - // Fail first 2 attempts - w.WriteHeader(http.StatusInternalServerError) - } else { - // Succeed on 3rd attempt - w.WriteHeader(http.StatusOK) - } - })) - defer server.Close() - - cfg := DefaultConfig() - cfg.MaxRetries = 3 - cfg.RetryDelay = 10 * time.Millisecond - httpClient := &http.Client{Timeout: 5 * time.Second} - - // Use full server URL for testing - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) - - metrics := 
[]*telemetryMetric{ - { - metricType: "connection", - timestamp: time.Now(), - }, - } - - ctx := context.Background() - exporter.export(ctx, metrics) - - // Should have retried and succeeded - if atomic.LoadInt32(&attemptCount) != 3 { - t.Errorf("Expected 3 attempts, got %d", attemptCount) - } -} - -func TestExport_NonRetryable4xx(t *testing.T) { - attemptCount := int32(0) +// TestExport_SetsUserAgent verifies the configured User-Agent is sent on the +// telemetry POST so traffic is attributable in access logs. +func TestExport_SetsUserAgent(t *testing.T) { + const wantUA = "godatabrickssqlconnector/9.9.9" + gotUA := "" server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - atomic.AddInt32(&attemptCount, 1) - w.WriteHeader(http.StatusBadRequest) // 400 is not retryable + gotUA = r.Header.Get("User-Agent") + w.WriteHeader(http.StatusOK) })) defer server.Close() cfg := DefaultConfig() - cfg.MaxRetries = 3 - cfg.RetryDelay = 10 * time.Millisecond httpClient := &http.Client{Timeout: 5 * time.Second} + exporter := newTelemetryExporter(server.URL, "9.9.9", wantUA, httpClient, cfg) - // Use full server URL for testing - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) - - metrics := []*telemetryMetric{ - { - metricType: "connection", - timestamp: time.Now(), - }, - } - - ctx := context.Background() - exporter.export(ctx, metrics) + exporter.export(context.Background(), []*telemetryMetric{{ + metricType: "connection", timestamp: time.Now(), + }}) - // Should only try once (no retries for 4xx) - if atomic.LoadInt32(&attemptCount) != 1 { - t.Errorf("Expected 1 attempt, got %d", attemptCount) + if gotUA != wantUA { + t.Errorf("User-Agent: got %q, want %q", gotUA, wantUA) } } -func TestExport_Retry429(t *testing.T) { - attemptCount := int32(0) - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - count := atomic.AddInt32(&attemptCount, 1) - if count < 2 { - 
w.WriteHeader(http.StatusTooManyRequests) // 429 is retryable - } else { - w.WriteHeader(http.StatusOK) - } - })) - defer server.Close() - - cfg := DefaultConfig() - cfg.MaxRetries = 3 - cfg.RetryDelay = 10 * time.Millisecond - httpClient := &http.Client{Timeout: 5 * time.Second} - - // Use full server URL for testing - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) - - metrics := []*telemetryMetric{ - { - metricType: "connection", - timestamp: time.Now(), - }, +// TestExport_SingleAttemptPerExport asserts that doExport itself never +// retries — a single export is exactly one HTTP transaction. Retries are +// owned by the underlying retryablehttp-wrapped client (not exercised here +// because the test uses a plain *http.Client). Each export → one breaker +// outcome. +func TestExport_SingleAttemptPerExport(t *testing.T) { + cases := []struct { + name string + status int + }{ + {"400", http.StatusBadRequest}, + {"429", http.StatusTooManyRequests}, + {"500", http.StatusInternalServerError}, + {"503", http.StatusServiceUnavailable}, } - ctx := context.Background() - exporter.export(ctx, metrics) - - // Should have retried and succeeded - if atomic.LoadInt32(&attemptCount) != 2 { - t.Errorf("Expected 2 attempts, got %d", attemptCount) + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + attemptCount := int32(0) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&attemptCount, 1) + w.WriteHeader(tc.status) + })) + defer server.Close() + + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", &http.Client{Timeout: 5 * time.Second}, DefaultConfig()) + exporter.export(context.Background(), []*telemetryMetric{{ + metricType: "connection", timestamp: time.Now(), + }}) + + if got := atomic.LoadInt32(&attemptCount); got != 1 { + t.Errorf("status %d: expected 1 attempt at exporter layer, got %d", tc.status, got) + } + }) } } @@ -211,7 +166,7 @@ func 
TestExport_CircuitBreakerOpen(t *testing.T) { httpClient := &http.Client{Timeout: 5 * time.Second} // Use full server URL for testing - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) // Open the circuit breaker by recording failures cb := exporter.circuitBreaker @@ -243,33 +198,6 @@ func TestExport_CircuitBreakerOpen(t *testing.T) { } } -func TestIsRetryableStatus(t *testing.T) { - tests := []struct { - status int - retryable bool - description string - }{ - {200, false, "200 OK is not retryable"}, - {201, false, "201 Created is not retryable"}, - {400, false, "400 Bad Request is not retryable"}, - {401, false, "401 Unauthorized is not retryable"}, - {403, false, "403 Forbidden is not retryable"}, - {404, false, "404 Not Found is not retryable"}, - {429, true, "429 Too Many Requests is retryable"}, - {500, true, "500 Internal Server Error is retryable"}, - {502, true, "502 Bad Gateway is retryable"}, - {503, true, "503 Service Unavailable is retryable"}, - {504, true, "504 Gateway Timeout is retryable"}, - } - - for _, tt := range tests { - result := isRetryableStatus(tt.status) - if result != tt.retryable { - t.Errorf("%s: expected %v, got %v", tt.description, tt.retryable, result) - } - } -} - func TestExport_ErrorSwallowing(t *testing.T) { // Server that always fails server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -278,12 +206,10 @@ func TestExport_ErrorSwallowing(t *testing.T) { defer server.Close() cfg := DefaultConfig() - cfg.MaxRetries = 1 - cfg.RetryDelay = 10 * time.Millisecond httpClient := &http.Client{Timeout: 5 * time.Second} // Use full server URL for testing - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) metrics := []*telemetryMetric{ { @@ -314,12 +240,10 @@ 
func TestExport_ContextCancellation(t *testing.T) { defer server.Close() cfg := DefaultConfig() - cfg.MaxRetries = 3 - cfg.RetryDelay = 50 * time.Millisecond httpClient := &http.Client{Timeout: 5 * time.Second} // Use full server URL for testing - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) metrics := []*telemetryMetric{ { @@ -336,62 +260,3 @@ func TestExport_ContextCancellation(t *testing.T) { exporter.export(ctx, metrics) // If we get here, context cancellation is handled properly } - -func TestExport_ExponentialBackoff(t *testing.T) { - attemptTimes := make([]time.Time, 0) - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - attemptTimes = append(attemptTimes, time.Now()) - // Always fail to test all retries - w.WriteHeader(http.StatusInternalServerError) - })) - defer server.Close() - - cfg := DefaultConfig() - cfg.MaxRetries = 3 - cfg.RetryDelay = 50 * time.Millisecond - httpClient := &http.Client{Timeout: 5 * time.Second} - - // Use full server URL for testing - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) - - metrics := []*telemetryMetric{ - { - metricType: "connection", - timestamp: time.Now(), - }, - } - - ctx := context.Background() - exporter.export(ctx, metrics) - - // Should have 4 attempts (1 initial + 3 retries) - if len(attemptTimes) != 4 { - t.Errorf("Expected 4 attempts, got %d", len(attemptTimes)) - return - } - - // Verify exponential backoff delays - // Attempt 0: immediate - // Attempt 1: +50ms (2^0 * 50ms) - // Attempt 2: +100ms (2^1 * 50ms) - // Attempt 3: +200ms (2^2 * 50ms) - - delay1 := attemptTimes[1].Sub(attemptTimes[0]) - delay2 := attemptTimes[2].Sub(attemptTimes[1]) - delay3 := attemptTimes[3].Sub(attemptTimes[2]) - - // Allow 30ms tolerance for timing variations - tolerance := 30 * time.Millisecond - - if delay1 < 
(50*time.Millisecond-tolerance) || delay1 > (50*time.Millisecond+tolerance) { - t.Errorf("Expected delay1 ~50ms, got %v", delay1) - } - - if delay2 < (100*time.Millisecond-tolerance) || delay2 > (100*time.Millisecond+tolerance) { - t.Errorf("Expected delay2 ~100ms, got %v", delay2) - } - - if delay3 < (200*time.Millisecond-tolerance) || delay3 > (200*time.Millisecond+tolerance) { - t.Errorf("Expected delay3 ~200ms, got %v", delay3) - } -} diff --git a/telemetry/featureflag.go b/telemetry/featureflag.go index 81696baa..d52e1e40 100644 --- a/telemetry/featureflag.go +++ b/telemetry/featureflag.go @@ -86,7 +86,7 @@ func (c *featureFlagCache) releaseContext(host string) { // isTelemetryEnabled checks if telemetry is enabled for the host. // Uses cached value if available and not expired. -func (c *featureFlagCache) isTelemetryEnabled(ctx context.Context, host string, driverVersion string, httpClient *http.Client) (bool, error) { +func (c *featureFlagCache) isTelemetryEnabled(ctx context.Context, host string, driverVersion string, userAgent string, httpClient *http.Client) (bool, error) { c.mu.RLock() flagCtx, exists := c.contexts[host] c.mu.RUnlock() @@ -135,7 +135,7 @@ func (c *featureFlagCache) isTelemetryEnabled(ctx context.Context, host string, flagCtx.mu.Unlock() // Fetch fresh value (outside lock so other readers are not blocked). - enabled, err := fetchFeatureFlag(ctx, host, driverVersion, httpClient) + enabled, err := fetchFeatureFlag(ctx, host, driverVersion, userAgent, httpClient) // Update cache. flagCtx.mu.Lock() @@ -166,7 +166,7 @@ func (c *featureFlagContext) isExpired() bool { } // fetchFeatureFlag fetches the feature flag value from Databricks. 
-func fetchFeatureFlag(ctx context.Context, host string, driverVersion string, httpClient *http.Client) (bool, error) { +func fetchFeatureFlag(ctx context.Context, host string, driverVersion string, userAgent string, httpClient *http.Client) (bool, error) { // Add timeout to context if it doesn't have a deadline if _, hasDeadline := ctx.Deadline(); !hasDeadline { var cancel context.CancelFunc @@ -182,6 +182,9 @@ func fetchFeatureFlag(ctx context.Context, host string, driverVersion string, ht if err != nil { return false, fmt.Errorf("failed to create feature flag request: %w", err) } + if userAgent != "" { + req.Header.Set("User-Agent", userAgent) + } resp, err := httpClient.Do(req) if err != nil { diff --git a/telemetry/featureflag_test.go b/telemetry/featureflag_test.go index 4ffbf07b..6c789410 100644 --- a/telemetry/featureflag_test.go +++ b/telemetry/featureflag_test.go @@ -99,7 +99,7 @@ func TestFeatureFlagCache_IsTelemetryEnabled_Cached(t *testing.T) { ctx.lastFetched = time.Now() // Should return cached value without HTTP call - result, err := cache.isTelemetryEnabled(context.Background(), host, "test-version", nil) + result, err := cache.isTelemetryEnabled(context.Background(), host, "test-version", "test-ua", nil) if err != nil { t.Errorf("Expected no error, got %v", err) } @@ -133,7 +133,7 @@ func TestFeatureFlagCache_IsTelemetryEnabled_Expired(t *testing.T) { // Should fetch fresh value httpClient := &http.Client{} - result, err := cache.isTelemetryEnabled(context.Background(), host, "test-version", httpClient) + result, err := cache.isTelemetryEnabled(context.Background(), host, "test-version", "test-ua", httpClient) if err != nil { t.Errorf("Expected no error, got %v", err) } @@ -158,7 +158,7 @@ func TestFeatureFlagCache_IsTelemetryEnabled_NoContext(t *testing.T) { host := "non-existent-host.databricks.com" // Should return false for non-existent context - result, err := cache.isTelemetryEnabled(context.Background(), host, "test-version", nil) + result, 
err := cache.isTelemetryEnabled(context.Background(), host, "test-version", "test-ua", nil) if err != nil { t.Errorf("Expected no error, got %v", err) } @@ -188,7 +188,7 @@ func TestFeatureFlagCache_IsTelemetryEnabled_ErrorFallback(t *testing.T) { // Should return cached value on error httpClient := &http.Client{} - result, err := cache.isTelemetryEnabled(context.Background(), host, "test-version", httpClient) + result, err := cache.isTelemetryEnabled(context.Background(), host, "test-version", "test-ua", httpClient) if err != nil { t.Errorf("Expected no error (fallback to cache), got %v", err) } @@ -213,7 +213,7 @@ func TestFeatureFlagCache_IsTelemetryEnabled_ErrorNoCache(t *testing.T) { // No cached value, should return error httpClient := &http.Client{} - result, err := cache.isTelemetryEnabled(context.Background(), host, "test-version", httpClient) + result, err := cache.isTelemetryEnabled(context.Background(), host, "test-version", "test-ua", httpClient) if err == nil { t.Error("Expected error when no cache available and fetch fails") } @@ -330,7 +330,7 @@ func TestFetchFeatureFlag_Success(t *testing.T) { host := server.URL // Use full URL for testing httpClient := &http.Client{} - enabled, err := fetchFeatureFlag(context.Background(), host, "test-version", httpClient) + enabled, err := fetchFeatureFlag(context.Background(), host, "test-version", "test-ua", httpClient) if err != nil { t.Errorf("Expected no error, got %v", err) } @@ -339,6 +339,28 @@ func TestFetchFeatureFlag_Success(t *testing.T) { } } +// TestFetchFeatureFlag_SetsUserAgent verifies the configured User-Agent is +// sent on feature-flag GETs so traffic is attributable in access logs. 
+func TestFetchFeatureFlag_SetsUserAgent(t *testing.T) { + const wantUA = "godatabrickssqlconnector/9.9.9" + gotUA := "" + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + gotUA = r.Header.Get("User-Agent") + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"flags": [], "ttl_seconds": 300}`)) + })) + defer server.Close() + + _, err := fetchFeatureFlag(context.Background(), server.URL, "9.9.9", wantUA, &http.Client{}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if gotUA != wantUA { + t.Errorf("User-Agent: got %q, want %q", gotUA, wantUA) + } +} + func TestFetchFeatureFlag_Disabled(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") @@ -350,7 +372,7 @@ func TestFetchFeatureFlag_Disabled(t *testing.T) { host := server.URL // Use full URL for testing httpClient := &http.Client{} - enabled, err := fetchFeatureFlag(context.Background(), host, "test-version", httpClient) + enabled, err := fetchFeatureFlag(context.Background(), host, "test-version", "test-ua", httpClient) if err != nil { t.Errorf("Expected no error, got %v", err) } @@ -370,7 +392,7 @@ func TestFetchFeatureFlag_FlagNotPresent(t *testing.T) { host := server.URL // Use full URL for testing httpClient := &http.Client{} - enabled, err := fetchFeatureFlag(context.Background(), host, "test-version", httpClient) + enabled, err := fetchFeatureFlag(context.Background(), host, "test-version", "test-ua", httpClient) if err != nil { t.Errorf("Expected no error, got %v", err) } @@ -388,7 +410,7 @@ func TestFetchFeatureFlag_HTTPError(t *testing.T) { host := server.URL // Use full URL for testing httpClient := &http.Client{} - _, err := fetchFeatureFlag(context.Background(), host, "test-version", httpClient) + _, err := fetchFeatureFlag(context.Background(), host, "test-version", "test-ua", 
httpClient) if err == nil { t.Error("Expected error for HTTP 500") } @@ -405,7 +427,7 @@ func TestFetchFeatureFlag_InvalidJSON(t *testing.T) { host := server.URL // Use full URL for testing httpClient := &http.Client{} - _, err := fetchFeatureFlag(context.Background(), host, "test-version", httpClient) + _, err := fetchFeatureFlag(context.Background(), host, "test-version", "test-ua", httpClient) if err == nil { t.Error("Expected error for invalid JSON") } @@ -424,7 +446,7 @@ func TestFetchFeatureFlag_ContextCancellation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() // Cancel immediately - _, err := fetchFeatureFlag(ctx, host, "test-version", httpClient) + _, err := fetchFeatureFlag(ctx, host, "test-version", "test-ua", httpClient) if err == nil { t.Error("Expected error for cancelled context") } diff --git a/telemetry/integration_test.go b/telemetry/integration_test.go index 20bd2fc0..b5ea8091 100644 --- a/telemetry/integration_test.go +++ b/telemetry/integration_test.go @@ -48,7 +48,7 @@ func TestIntegration_EndToEnd_WithCircuitBreaker(t *testing.T) { defer server.Close() // Create telemetry client - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) aggregator := newMetricsAggregator(exporter, cfg) defer aggregator.close(context.Background()) //nolint:errcheck @@ -84,7 +84,6 @@ func TestIntegration_CircuitBreakerOpening(t *testing.T) { cfg := DefaultConfig() cfg.FlushInterval = 50 * time.Millisecond - cfg.MaxRetries = 0 // No retries for faster test httpClient := &http.Client{Timeout: 5 * time.Second} requestCount := int32(0) @@ -95,7 +94,7 @@ func TestIntegration_CircuitBreakerOpening(t *testing.T) { })) defer server.Close() - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) aggregator := 
newMetricsAggregator(exporter, cfg) defer aggregator.close(context.Background()) //nolint:errcheck @@ -166,7 +165,7 @@ func TestIntegration_PrivacyCompliance_NoQueryText(t *testing.T) { })) defer server.Close() - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) aggregator := newMetricsAggregator(exporter, cfg) defer aggregator.close(context.Background()) //nolint:errcheck @@ -237,7 +236,7 @@ func TestIntegration_TelemetryEventCorrectnessAllFields(t *testing.T) { })) defer server.Close() - exporter := newTelemetryExporter(server.URL, testDriverVersion, httpClient, cfg) + exporter := newTelemetryExporter(server.URL, testDriverVersion, "test-ua", httpClient, cfg) metric := &telemetryMetric{ metricType: "operation", @@ -404,7 +403,7 @@ func TestIntegration_OperationLatencyMs_ZeroNotOmitted(t *testing.T) { })) defer server.Close() - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) // latencyMs=0 — simulates a CloseOperation that completed in <1ms. metric := &telemetryMetric{ @@ -476,7 +475,7 @@ func TestIntegration_ChunkTotalPresent_DerivedFromChunkCount(t *testing.T) { totalChunksIterated = 32 ) - exporter := newTelemetryExporter(server.URL, "test-version", httpClient, cfg) + exporter := newTelemetryExporter(server.URL, "test-version", "test-ua", httpClient, cfg) metric := &telemetryMetric{ metricType: "operation", timestamp: time.Now(), diff --git a/telemetry/manager.go b/telemetry/manager.go index 8977e924..375f3d60 100644 --- a/telemetry/manager.go +++ b/telemetry/manager.go @@ -45,13 +45,13 @@ func getClientManager() *clientManager { // getOrCreateClient gets or creates a telemetry client for the host. // Increments reference count. 
-func (m *clientManager) getOrCreateClient(host string, driverVersion string, httpClient *http.Client, cfg *Config) *telemetryClient { +func (m *clientManager) getOrCreateClient(host string, driverVersion string, userAgent string, httpClient *http.Client, cfg *Config) *telemetryClient { m.mu.Lock() defer m.mu.Unlock() holder, exists := m.clients[host] if !exists { - client := newTelemetryClient(host, driverVersion, httpClient, cfg) + client := newTelemetryClient(host, driverVersion, userAgent, httpClient, cfg) if err := client.start(); err != nil { // Failed to start client, don't add to map logger.Logger.Debug().Str("host", host).Err(err).Msg("failed to start telemetry client") diff --git a/telemetry/manager_test.go b/telemetry/manager_test.go index 51461452..3567d3b9 100644 --- a/telemetry/manager_test.go +++ b/telemetry/manager_test.go @@ -29,7 +29,7 @@ func TestClientManager_GetOrCreateClient(t *testing.T) { cfg := DefaultConfig() // First call should create client and increment refCount to 1 - client1 := manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client1 := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) if client1 == nil { t.Fatal("Expected client to be created") } @@ -46,7 +46,7 @@ func TestClientManager_GetOrCreateClient(t *testing.T) { } // Second call should reuse client and increment refCount to 2 - client2 := manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client2 := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) if client2 != client1 { t.Error("Expected to get the same client instance") } @@ -65,8 +65,8 @@ func TestClientManager_GetOrCreateClient_DifferentHosts(t *testing.T) { httpClient := &http.Client{} cfg := DefaultConfig() - client1 := manager.getOrCreateClient(host1, "test-version", httpClient, cfg) - client2 := manager.getOrCreateClient(host2, "test-version", httpClient, cfg) + client1 := manager.getOrCreateClient(host1, "test-version", "test-ua", 
httpClient, cfg) + client2 := manager.getOrCreateClient(host2, "test-version", "test-ua", httpClient, cfg) if client1 == client2 { t.Error("Expected different clients for different hosts") @@ -87,8 +87,8 @@ func TestClientManager_ReleaseClient(t *testing.T) { cfg := DefaultConfig() // Create client with refCount = 2 - manager.getOrCreateClient(host, "test-version", httpClient, cfg) - manager.getOrCreateClient(host, "test-version", httpClient, cfg) + manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) + manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) // First release should decrement to 1 err := manager.releaseClient(host) @@ -151,7 +151,7 @@ func TestClientManager_ConcurrentAccess(t *testing.T) { for i := 0; i < numGoroutines; i++ { go func() { defer wg.Done() - client := manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) if client == nil { t.Error("Expected client to be created") } @@ -207,7 +207,7 @@ func TestClientManager_ConcurrentAccessMultipleHosts(t *testing.T) { wg.Add(1) go func(h string) { defer wg.Done() - _ = manager.getOrCreateClient(h, "test-version", httpClient, cfg) + _ = manager.getOrCreateClient(h, "test-version", "test-ua", httpClient, cfg) }(host) } } @@ -241,7 +241,7 @@ func TestClientManager_ReleaseClientPartial(t *testing.T) { // Create 5 references for i := 0; i < 5; i++ { - manager.getOrCreateClient(host, "test-version", httpClient, cfg) + manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) } // Release 3 references @@ -271,7 +271,7 @@ func TestClientManager_ClientStartCalled(t *testing.T) { httpClient := &http.Client{} cfg := DefaultConfig() - client := manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) if !client.started { t.Error("Expected start() to be called on 
new client") @@ -287,7 +287,7 @@ func TestClientManager_ClientCloseCalled(t *testing.T) { httpClient := &http.Client{} cfg := DefaultConfig() - client := manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) _ = manager.releaseClient(host) if !client.closed { @@ -305,9 +305,9 @@ func TestClientManager_MultipleGetOrCreateSameClient(t *testing.T) { cfg := DefaultConfig() // Get same client multiple times - client1 := manager.getOrCreateClient(host, "test-version", httpClient, cfg) - client2 := manager.getOrCreateClient(host, "test-version", httpClient, cfg) - client3 := manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client1 := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) + client2 := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) + client3 := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) // All should be same instance if client1 != client2 || client2 != client3 { @@ -337,7 +337,7 @@ func TestClientManager_Shutdown(t *testing.T) { // Create clients for multiple hosts clients := make([]*telemetryClient, 0, len(hosts)) for _, host := range hosts { - client := manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) clients = append(clients, client) } @@ -375,9 +375,9 @@ func TestClientManager_ShutdownWithActiveRefs(t *testing.T) { cfg := DefaultConfig() // Create client with multiple references - client := manager.getOrCreateClient(host, "test-version", httpClient, cfg) - manager.getOrCreateClient(host, "test-version", httpClient, cfg) - manager.getOrCreateClient(host, "test-version", httpClient, cfg) + client := manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) + manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) + 
manager.getOrCreateClient(host, "test-version", "test-ua", httpClient, cfg) holder := manager.clients[host] if holder.refCount != 3 {