diff --git a/lib/telemetry/TelemetryClient.ts b/lib/telemetry/TelemetryClient.ts new file mode 100644 index 00000000..82243d3a --- /dev/null +++ b/lib/telemetry/TelemetryClient.ts @@ -0,0 +1,76 @@ +/** + * Copyright (c) 2025 Databricks Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import IClientContext from '../contracts/IClientContext'; +import { LogLevel } from '../contracts/IDBSQLLogger'; + +/** + * Telemetry client for a specific host. + * Managed by TelemetryClientProvider with reference counting. + * One client instance is shared across all connections to the same host. + */ +class TelemetryClient { + private closed: boolean = false; + + constructor( + private context: IClientContext, + private host: string + ) { + const logger = context.getLogger(); + logger.log(LogLevel.debug, `Created TelemetryClient for host: ${host}`); + } + + /** + * Gets the host associated with this client. + */ + getHost(): string { + return this.host; + } + + /** + * Checks if the client has been closed. + */ + isClosed(): boolean { + return this.closed; + } + + /** + * Closes the telemetry client and releases resources. + * Should only be called by TelemetryClientProvider when reference count reaches zero. + */ + async close(): Promise { + if (this.closed) { + return; + } + + try { + const logger = this.context.getLogger(); + logger.log(LogLevel.debug, `Closing TelemetryClient for host: ${this.host}`); + this.closed = true; + } catch (error: any) { + // Swallow all exceptions per requirement + this.closed = true; + try { + const logger = this.context.getLogger(); + logger.log(LogLevel.debug, `Error closing TelemetryClient: ${error.message}`); + } catch (logError: any) { + // If even logging fails, silently swallow + } + } + } +} + +export default TelemetryClient; diff --git a/lib/telemetry/TelemetryClientProvider.ts b/lib/telemetry/TelemetryClientProvider.ts new file mode 100644 index 00000000..46a8b09e --- /dev/null +++ b/lib/telemetry/TelemetryClientProvider.ts @@ -0,0 +1,139 @@ +/** + * Copyright (c) 2025 Databricks Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import IClientContext from '../contracts/IClientContext'; +import { LogLevel } from '../contracts/IDBSQLLogger'; +import TelemetryClient from './TelemetryClient'; + +/** + * Holds a telemetry client and its reference count. + * The reference count tracks how many connections are using this client. + */ +interface TelemetryClientHolder { + client: TelemetryClient; + refCount: number; +} + +/** + * Manages one telemetry client per host. + * Prevents rate limiting by sharing clients across connections to the same host. + * Instance-based (not singleton), stored in DBSQLClient. + * + * Pattern from JDBC TelemetryClientFactory.java:27 with + * ConcurrentHashMap. + */ +class TelemetryClientProvider { + private clients: Map; + + constructor(private context: IClientContext) { + this.clients = new Map(); + const logger = context.getLogger(); + logger.log(LogLevel.debug, 'Created TelemetryClientProvider'); + } + + /** + * Gets or creates a telemetry client for the specified host. + * Increments the reference count for the client. + * + * @param host The host identifier (e.g., "workspace.cloud.databricks.com") + * @returns The telemetry client for the host + */ + getOrCreateClient(host: string): TelemetryClient { + const logger = this.context.getLogger(); + let holder = this.clients.get(host); + + if (!holder) { + // Create new client for this host + const client = new TelemetryClient(this.context, host); + holder = { + client, + refCount: 0, + }; + this.clients.set(host, holder); + logger.log(LogLevel.debug, `Created new TelemetryClient for host: ${host}`); + } + + // Increment reference count + holder.refCount += 1; + logger.log( + LogLevel.debug, + `TelemetryClient reference count for ${host}: ${holder.refCount}` + ); + + return holder.client; + } + + /** + * Releases a telemetry client for the specified host. + * Decrements the reference count and closes the client when it reaches zero. + * + * @param host The host identifier + */ + async releaseClient(host: string): Promise { + const logger = this.context.getLogger(); + const holder = this.clients.get(host); + + if (!holder) { + logger.log(LogLevel.debug, `No TelemetryClient found for host: ${host}`); + return; + } + + // Decrement reference count + holder.refCount -= 1; + logger.log( + LogLevel.debug, + `TelemetryClient reference count for ${host}: ${holder.refCount}` + ); + + // Close and remove client when reference count reaches zero + if (holder.refCount <= 0) { + try { + await holder.client.close(); + this.clients.delete(host); + logger.log(LogLevel.debug, `Closed and removed TelemetryClient for host: ${host}`); + } catch (error: any) { + // Swallow all exceptions per requirement + logger.log(LogLevel.debug, `Error releasing TelemetryClient: ${error.message}`); + } + } + } + + /** + * Gets the current reference count for a host's client. + * Useful for testing and diagnostics. + * + * @param host The host identifier + * @returns The reference count, or 0 if no client exists + */ + getRefCount(host: string): number { + const holder = this.clients.get(host); + return holder ? holder.refCount : 0; + } + + /** + * Gets all active clients. + * Useful for testing and diagnostics. + */ + getActiveClients(): Map { + const result = new Map(); + for (const [host, holder] of this.clients.entries()) { + result.set(host, holder.client); + } + return result; + } +} + +export default TelemetryClientProvider; diff --git a/tests/unit/telemetry/TelemetryClient.test.ts b/tests/unit/telemetry/TelemetryClient.test.ts new file mode 100644 index 00000000..21e917d8 --- /dev/null +++ b/tests/unit/telemetry/TelemetryClient.test.ts @@ -0,0 +1,163 @@ +/** + * Copyright (c) 2025 Databricks Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { expect } from 'chai'; +import sinon from 'sinon'; +import TelemetryClient from '../../../lib/telemetry/TelemetryClient'; +import ClientContextStub from '../.stubs/ClientContextStub'; +import { LogLevel } from '../../../lib/contracts/IDBSQLLogger'; + +describe('TelemetryClient', () => { + const HOST = 'workspace.cloud.databricks.com'; + + describe('Constructor', () => { + it('should create client with host', () => { + const context = new ClientContextStub(); + const client = new TelemetryClient(context, HOST); + + expect(client.getHost()).to.equal(HOST); + expect(client.isClosed()).to.be.false; + }); + + it('should log creation at debug level', () => { + const context = new ClientContextStub(); + const logSpy = sinon.spy(context.logger, 'log'); + + new TelemetryClient(context, HOST); + + expect(logSpy.calledWith(LogLevel.debug, `Created TelemetryClient for host: ${HOST}`)).to.be + .true; + }); + }); + + describe('getHost', () => { + it('should return the host identifier', () => { + const context = new ClientContextStub(); + const client = new TelemetryClient(context, HOST); + + expect(client.getHost()).to.equal(HOST); + }); + }); + + describe('isClosed', () => { + it('should return false initially', () => { + const context = new ClientContextStub(); + const client = new TelemetryClient(context, HOST); + + expect(client.isClosed()).to.be.false; + }); + + it('should return true after close', async () => { + const context = new ClientContextStub(); + const client = new TelemetryClient(context, HOST); + + await client.close(); + + expect(client.isClosed()).to.be.true; + }); + }); + + describe('close', () => { + it('should set closed flag', async () => { + const context = new ClientContextStub(); + const client = new TelemetryClient(context, HOST); + + await client.close(); + + expect(client.isClosed()).to.be.true; + }); + + it('should log closure at debug level', async () => { + const context = new ClientContextStub(); + const logSpy = sinon.spy(context.logger, 'log'); + const client = new TelemetryClient(context, HOST); + + await client.close(); + + expect(logSpy.calledWith(LogLevel.debug, `Closing TelemetryClient for host: ${HOST}`)).to.be + .true; + }); + + it('should be idempotent', async () => { + const context = new ClientContextStub(); + const logSpy = sinon.spy(context.logger, 'log'); + const client = new TelemetryClient(context, HOST); + + await client.close(); + const firstCallCount = logSpy.callCount; + + await client.close(); + + // Should not log again on second close + expect(logSpy.callCount).to.equal(firstCallCount); + expect(client.isClosed()).to.be.true; + }); + + it('should swallow all exceptions', async () => { + const context = new ClientContextStub(); + const client = new TelemetryClient(context, HOST); + + // Force an error by stubbing the logger + const error = new Error('Logger error'); + sinon.stub(context.logger, 'log').throws(error); + + // Should not throw + await client.close(); + // If we get here without throwing, the test passes + expect(true).to.be.true; + }); + + it('should log errors at debug level only', async () => { + const context = new ClientContextStub(); + const client = new TelemetryClient(context, HOST); + const error = new Error('Test error'); + + // Stub logger to throw on first call, succeed on second + const logStub = sinon.stub(context.logger, 'log'); + logStub.onFirstCall().throws(error); + logStub.onSecondCall().returns(); + + await client.close(); + + // Second call should log the error at debug level + expect(logStub.secondCall.args[0]).to.equal(LogLevel.debug); + expect(logStub.secondCall.args[1]).to.include('Error closing TelemetryClient'); + }); + }); + + describe('Context usage', () => { + it('should use logger from context', () => { + const context = new ClientContextStub(); + const logSpy = sinon.spy(context.logger, 'log'); + + new TelemetryClient(context, HOST); + + expect(logSpy.called).to.be.true; + }); + + it('should log all messages at debug level only', async () => { + const context = new ClientContextStub(); + const logSpy = sinon.spy(context.logger, 'log'); + const client = new TelemetryClient(context, HOST); + + await client.close(); + + logSpy.getCalls().forEach((call) => { + expect(call.args[0]).to.equal(LogLevel.debug); + }); + }); + }); +}); diff --git a/tests/unit/telemetry/TelemetryClientProvider.test.ts b/tests/unit/telemetry/TelemetryClientProvider.test.ts new file mode 100644 index 00000000..c4063011 --- /dev/null +++ b/tests/unit/telemetry/TelemetryClientProvider.test.ts @@ -0,0 +1,400 @@ +/** + * Copyright (c) 2025 Databricks Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { expect } from 'chai'; +import sinon from 'sinon'; +import TelemetryClientProvider from '../../../lib/telemetry/TelemetryClientProvider'; +import TelemetryClient from '../../../lib/telemetry/TelemetryClient'; +import ClientContextStub from '../.stubs/ClientContextStub'; +import { LogLevel } from '../../../lib/contracts/IDBSQLLogger'; + +describe('TelemetryClientProvider', () => { + const HOST1 = 'workspace1.cloud.databricks.com'; + const HOST2 = 'workspace2.cloud.databricks.com'; + + describe('Constructor', () => { + it('should create provider with empty client map', () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + expect(provider.getActiveClients().size).to.equal(0); + }); + + it('should log creation at debug level', () => { + const context = new ClientContextStub(); + const logSpy = sinon.spy(context.logger, 'log'); + + new TelemetryClientProvider(context); + + expect(logSpy.calledWith(LogLevel.debug, 'Created TelemetryClientProvider')).to.be.true; + }); + }); + + describe('getOrCreateClient', () => { + it('should create one client per host', () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + const client1 = provider.getOrCreateClient(HOST1); + const client2 = provider.getOrCreateClient(HOST2); + + expect(client1).to.be.instanceOf(TelemetryClient); + expect(client2).to.be.instanceOf(TelemetryClient); + expect(client1).to.not.equal(client2); + expect(provider.getActiveClients().size).to.equal(2); + }); + + it('should share client across multiple connections to same host', () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + const client1 = provider.getOrCreateClient(HOST1); + const client2 = provider.getOrCreateClient(HOST1); + const client3 = provider.getOrCreateClient(HOST1); + + expect(client1).to.equal(client2); + expect(client2).to.equal(client3); + expect(provider.getActiveClients().size).to.equal(1); + }); + + it('should increment reference count on each call', () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + provider.getOrCreateClient(HOST1); + expect(provider.getRefCount(HOST1)).to.equal(1); + + provider.getOrCreateClient(HOST1); + expect(provider.getRefCount(HOST1)).to.equal(2); + + provider.getOrCreateClient(HOST1); + expect(provider.getRefCount(HOST1)).to.equal(3); + }); + + it('should log client creation at debug level', () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + const logSpy = sinon.spy(context.logger, 'log'); + + provider.getOrCreateClient(HOST1); + + expect( + logSpy.calledWith(LogLevel.debug, `Created new TelemetryClient for host: ${HOST1}`) + ).to.be.true; + }); + + it('should log reference count at debug level', () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + const logSpy = sinon.spy(context.logger, 'log'); + + provider.getOrCreateClient(HOST1); + + expect( + logSpy.calledWith(LogLevel.debug, `TelemetryClient reference count for ${HOST1}: 1`) + ).to.be.true; + }); + + it('should pass context to TelemetryClient', () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + const client = provider.getOrCreateClient(HOST1); + + expect(client.getHost()).to.equal(HOST1); + }); + }); + + describe('releaseClient', () => { + it('should decrement reference count on release', async () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + provider.getOrCreateClient(HOST1); + provider.getOrCreateClient(HOST1); + provider.getOrCreateClient(HOST1); + expect(provider.getRefCount(HOST1)).to.equal(3); + + await provider.releaseClient(HOST1); + expect(provider.getRefCount(HOST1)).to.equal(2); + + await provider.releaseClient(HOST1); + expect(provider.getRefCount(HOST1)).to.equal(1); + }); + + it('should close client when reference count reaches zero', async () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + const client = provider.getOrCreateClient(HOST1); + const closeSpy = sinon.spy(client, 'close'); + + await provider.releaseClient(HOST1); + + expect(closeSpy.calledOnce).to.be.true; + expect(client.isClosed()).to.be.true; + }); + + it('should remove client from map when reference count reaches zero', async () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + provider.getOrCreateClient(HOST1); + expect(provider.getActiveClients().size).to.equal(1); + + await provider.releaseClient(HOST1); + + expect(provider.getActiveClients().size).to.equal(0); + expect(provider.getRefCount(HOST1)).to.equal(0); + }); + + it('should NOT close client while other connections exist', async () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + const client = provider.getOrCreateClient(HOST1); + provider.getOrCreateClient(HOST1); + provider.getOrCreateClient(HOST1); + const closeSpy = sinon.spy(client, 'close'); + + await provider.releaseClient(HOST1); + + expect(closeSpy.called).to.be.false; + expect(client.isClosed()).to.be.false; + expect(provider.getActiveClients().size).to.equal(1); + }); + + it('should handle releasing non-existent client gracefully', async () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + const logSpy = sinon.spy(context.logger, 'log'); + + await provider.releaseClient(HOST1); + + expect(logSpy.calledWith(LogLevel.debug, `No TelemetryClient found for host: ${HOST1}`)).to + .be.true; + }); + + it('should log reference count decrease at debug level', async () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + const logSpy = sinon.spy(context.logger, 'log'); + + provider.getOrCreateClient(HOST1); + provider.getOrCreateClient(HOST1); + + await provider.releaseClient(HOST1); + + expect( + logSpy.calledWith(LogLevel.debug, `TelemetryClient reference count for ${HOST1}: 1`) + ).to.be.true; + }); + + it('should log client closure at debug level', async () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + const logSpy = sinon.spy(context.logger, 'log'); + + provider.getOrCreateClient(HOST1); + await provider.releaseClient(HOST1); + + expect( + logSpy.calledWith(LogLevel.debug, `Closed and removed TelemetryClient for host: ${HOST1}`) + ).to.be.true; + }); + + it('should swallow errors during client closure', async () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + const client = provider.getOrCreateClient(HOST1); + const error = new Error('Close error'); + sinon.stub(client, 'close').rejects(error); + const logSpy = sinon.spy(context.logger, 'log'); + + await provider.releaseClient(HOST1); + + expect( + logSpy.calledWith(LogLevel.debug, `Error releasing TelemetryClient: ${error.message}`) + ).to.be.true; + }); + }); + + describe('Reference counting', () => { + it('should track reference counts independently per host', async () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + provider.getOrCreateClient(HOST1); + provider.getOrCreateClient(HOST1); + provider.getOrCreateClient(HOST2); + provider.getOrCreateClient(HOST2); + provider.getOrCreateClient(HOST2); + + expect(provider.getRefCount(HOST1)).to.equal(2); + expect(provider.getRefCount(HOST2)).to.equal(3); + + await provider.releaseClient(HOST1); + + expect(provider.getRefCount(HOST1)).to.equal(1); + expect(provider.getRefCount(HOST2)).to.equal(3); + }); + + it('should close only last connection for each host', async () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + const client1 = provider.getOrCreateClient(HOST1); + provider.getOrCreateClient(HOST1); + const client2 = provider.getOrCreateClient(HOST2); + + await provider.releaseClient(HOST1); + expect(client1.isClosed()).to.be.false; + expect(provider.getActiveClients().size).to.equal(2); + + await provider.releaseClient(HOST1); + expect(client1.isClosed()).to.be.true; + expect(provider.getActiveClients().size).to.equal(1); + + await provider.releaseClient(HOST2); + expect(client2.isClosed()).to.be.true; + expect(provider.getActiveClients().size).to.equal(0); + }); + }); + + describe('Per-host isolation', () => { + it('should isolate clients by host', () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + const client1 = provider.getOrCreateClient(HOST1); + const client2 = provider.getOrCreateClient(HOST2); + + expect(client1.getHost()).to.equal(HOST1); + expect(client2.getHost()).to.equal(HOST2); + expect(client1).to.not.equal(client2); + }); + + it('should allow closing one host without affecting others', async () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + const client1 = provider.getOrCreateClient(HOST1); + const client2 = provider.getOrCreateClient(HOST2); + + await provider.releaseClient(HOST1); + + expect(client1.isClosed()).to.be.true; + expect(client2.isClosed()).to.be.false; + expect(provider.getActiveClients().size).to.equal(1); + }); + }); + + describe('getRefCount', () => { + it('should return 0 for non-existent host', () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + expect(provider.getRefCount(HOST1)).to.equal(0); + }); + + it('should return current reference count for existing host', () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + provider.getOrCreateClient(HOST1); + expect(provider.getRefCount(HOST1)).to.equal(1); + + provider.getOrCreateClient(HOST1); + expect(provider.getRefCount(HOST1)).to.equal(2); + }); + }); + + describe('getActiveClients', () => { + it('should return empty map initially', () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + const clients = provider.getActiveClients(); + + expect(clients.size).to.equal(0); + }); + + it('should return all active clients', () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + const client1 = provider.getOrCreateClient(HOST1); + const client2 = provider.getOrCreateClient(HOST2); + + const clients = provider.getActiveClients(); + + expect(clients.size).to.equal(2); + expect(clients.get(HOST1)).to.equal(client1); + expect(clients.get(HOST2)).to.equal(client2); + }); + + it('should not include closed clients', async () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + + provider.getOrCreateClient(HOST1); + provider.getOrCreateClient(HOST2); + + await provider.releaseClient(HOST1); + + const clients = provider.getActiveClients(); + + expect(clients.size).to.equal(1); + expect(clients.has(HOST1)).to.be.false; + expect(clients.has(HOST2)).to.be.true; + }); + }); + + describe('Context usage', () => { + it('should use logger from context for all logging', () => { + const context = new ClientContextStub(); + const logSpy = sinon.spy(context.logger, 'log'); + const provider = new TelemetryClientProvider(context); + + provider.getOrCreateClient(HOST1); + + expect(logSpy.called).to.be.true; + logSpy.getCalls().forEach((call) => { + expect(call.args[0]).to.equal(LogLevel.debug); + }); + }); + + it('should log all errors at debug level only', async () => { + const context = new ClientContextStub(); + const provider = new TelemetryClientProvider(context); + const logSpy = sinon.spy(context.logger, 'log'); + + const client = provider.getOrCreateClient(HOST1); + sinon.stub(client, 'close').rejects(new Error('Test error')); + + await provider.releaseClient(HOST1); + + const errorLogs = logSpy + .getCalls() + .filter((call) => call.args[1].includes('Error releasing')); + expect(errorLogs.length).to.be.greaterThan(0); + errorLogs.forEach((call) => { + expect(call.args[0]).to.equal(LogLevel.debug); + }); + }); + }); +});