diff --git a/packages/kafka-client/src/events/index.ts b/packages/kafka-client/src/events/index.ts index a2b432daf..2ed089fd2 100644 --- a/packages/kafka-client/src/events/index.ts +++ b/packages/kafka-client/src/events/index.ts @@ -2,13 +2,15 @@ import { isNullish, isString } from 'remeda'; import { createLogger, Logger } from '@restorecommerce/logger'; import * as kafka from './provider/kafka/index.js'; import * as local from './provider/local/index.js'; -import { Topic } from './provider/kafka/index.js'; +import { EventProvider, Topic } from './interface.js'; + +type EventProvicerClass = new (config: any, logger: Logger, ...args: any[]) => EventProvider; /** * A key, value map containing event providers. * Event providers are registered with the register function. */ -const eventProviders: any = {}; +const eventProviders = {} as Record; /** * Register a event provider. @@ -16,7 +18,7 @@ const eventProviders: any = {}; * @param {string} name Event provider identifier * @param {constructor} provider Event provider constructor function */ -export const registerEventProvider = (name: string, provider: any): void => { +export const registerEventProvider = (name: string, provider: EventProvicerClass): void => { eventProviders[name] = provider; }; @@ -28,8 +30,8 @@ registerEventProvider(local.Name, local.Local); */ export class Events { config: any; - logger: Logger; - provider: any; + logger?: Logger; + provider: EventProvider; /** * @param [Object] config Event configuration. * @param [Logger] logger @@ -41,7 +43,7 @@ export class Events { } this.config = config; - const loggerCfg = this.config.logger; + const loggerCfg = this.config?.logger; if (loggerCfg) { loggerCfg.esTransformer = (msg: any) => { msg.fields = JSON.stringify(msg.fields); @@ -63,7 +65,7 @@ export class Events { // provider const providerName = this.config.provider; if (isNullish(providerName)) { - this.logger.error('config does not have event provider name', this.config); + this.logger?.error('config does not have event provider name', this.config); throw new Error('config does not have event provider name'); } const Provider = eventProviders[providerName]; @@ -87,10 +89,18 @@ export class Events { * No events will be received or can be send after this call. * Suspends the function until the provider is stopped. */ - async stop(): Promise { + async stop(): Promise { return await this.provider.stop(); } + async delete(topics: string[]): Promise { + return await this.provider.delete(topics); + } + + async deleteAll(): Promise { + return await this.provider.deleteAll(); + } + /** * Returns a topic from the provider. * @@ -102,10 +112,8 @@ export class Events { throw new Error('missing argument name'); } if (!isString(name)) { - throw new Error('argument name is not of type string'); + throw new Error('argument name is not type of string'); } - // topic() api called inside Local / Kafka class - which then - // invokes the actual topic constructor return this.provider.topic(name, this.config, manualOffsetCommit || false); } } diff --git a/packages/kafka-client/src/events/interface.ts b/packages/kafka-client/src/events/interface.ts new file mode 100644 index 000000000..082a397df --- /dev/null +++ b/packages/kafka-client/src/events/interface.ts @@ -0,0 +1,26 @@ + +export type Listener = (message?: any, context?: any, config?: any, eventName?: any) => void | Promise; + +export interface Topic { + commitCurrentOffsets(): Promise; + $wait(arg0: bigint): Promise; + $offset(arg0: bigint): Promise; + $reset(eventName: string, offset: bigint): Promise; + $resetConsumer(eventNames: string[], offset: bigint): Promise; + emit(eventName: string, msg: any): Promise; + on(eventName: string, listener: Listener, config?: any): Promise; + listenerCount(eventName: string): Promise; + removeListener(eventName: string, listener: Listener): Promise; + removeAllListeners(eventName: string): Promise; + stop(): Promise; + hasListeners(eventName?: string): Promise; + get subscribed(): string[]; +} + +export interface EventProvider { + topic(name: string, config: any, manualOffset?: boolean): Promise; + stop(): Promise; + start(): Promise; + delete(topics: string[]): Promise; + deleteAll(): Promise; +} \ No newline at end of file diff --git a/packages/kafka-client/src/events/provider/kafka/index.ts b/packages/kafka-client/src/events/provider/kafka/index.ts index abe1ed1fd..d25e11e5f 100644 --- a/packages/kafka-client/src/events/provider/kafka/index.ts +++ b/packages/kafka-client/src/events/provider/kafka/index.ts @@ -1,5 +1,5 @@ import * as retry from 'retry'; -import { isNullish, clone, isIncludedIn, filter, pick, keys } from 'remeda'; +import { isNullish, clone, isIncludedIn, filter, pick } from 'remeda'; import { EventEmitter } from 'events'; import * as async from 'async'; import { Logger } from '@restorecommerce/logger'; @@ -16,11 +16,13 @@ import { AdminOptions } from '@platformatic/kafka'; import { decodeMessage, encodeMessage } from '../../../protos.js'; +import { EventProvider, Listener, Topic } from '../../interface.js' +import { decomposeError } from '../../../utils.js'; /** * A Kafka topic. */ -export class Topic { +export class KafkaTopic implements Topic { name: string; emitter: EventEmitter; provider: Kafka; @@ -35,7 +37,7 @@ export class Topic { // default process one message at a time asyncLimit = 1; manualOffsetCommit: boolean; - subscribedToTopic: boolean; + private subscribedToTopic: boolean; /** * Kafka topic. @@ -60,30 +62,33 @@ export class Topic { } async createIfNotExists(): Promise { - return new Promise((resolve, reject) => { - this.provider.admin.listTopics().then(topics => { - if (topics.indexOf(this.name) < 0) { - const operation = retry.operation(); - operation.attempt(async () => { - this.provider.admin.createTopics({ + const topics = await this.provider.admin.listTopics(); + if (!topics.includes(this.name)) { + return await new Promise((resolve, reject) => { + const operation = retry.operation(); + operation.attempt(async (attemptNo) => { + try { + await this.provider.admin.createTopics({ topics: [this.name] - }).then(() => { - this.provider.logger.info(`Topic ${this.name} created successfully`); - resolve(); - }).catch((err: any) => { - const { code, message, details, stack } = err; - this.provider.logger.error(`Cannot create topic ${this.name}:`, { code, message, details, stack }); - operation.retry(err); - const attemptNo = (operation.attempts as () => number)(); - this.provider.logger.info(`Retry creating the Topic, attempt no: ${attemptNo}`); }); - }); - } else { - this.provider.logger.warn(`Topic ${this.name} already exists`); - resolve(); - } + this.provider.logger.info(`Topic ${this.name} created successfully, attempt no: ${attemptNo}`); + resolve(); + } + catch (err: any) { + this.provider.logger.error(`Cannot create topic ${this.name}:`, decomposeError(err)); + if (operation.retry(err)) { + this.provider.logger.info(`Retry creating Topic ${this.name}, attempt no: ${attemptNo}`); + } + else { + reject(err); + } + } + }); }); - }); + } + else { + this.provider.logger.debug(`Topic ${this.name} exists`); + } } /** @@ -92,7 +97,7 @@ export class Topic { * @param {string} eventName Name of the event * @return {number} Number of listeners */ - listenerCount(eventName: string): number { + async listenerCount(eventName: string): Promise { if (isNullish(eventName)) { throw new Error('missing argument eventName'); } @@ -106,7 +111,7 @@ export class Topic { * @param {string} eventName [description] * @return {Boolean} True when listeners exist, false if not. */ - hasListeners(eventName: string): boolean { + async hasListeners(eventName: string): Promise { if (isNullish(eventName)) { throw new Error('missing argument eventName'); } @@ -122,10 +127,11 @@ export class Topic { * @param {string} eventName Name of the event * @param {function|generator} listener Event listener */ - async removeListener(eventName: string, listener: any): Promise { + async removeListener(eventName: string, listener: Listener): Promise { this.provider.logger.verbose(`Removing listener from event ${eventName}`); this.emitter.removeListener(eventName, listener); - if (this.listenerCount(eventName) === 0) { + const count = await this.listenerCount(eventName); + if (count === 0) { await this.$unsubscribe(eventName); } } @@ -139,7 +145,8 @@ export class Topic { async removeAllListeners(eventName: string): Promise { this.provider.logger.verbose(`Removing all listeners from event ${eventName}`); this.emitter.removeAllListeners(eventName); - if (this.listenerCount(eventName) === 0) { + const count = await this.listenerCount(eventName); + if (count === 0) { await this.$unsubscribe(eventName); } } @@ -151,32 +158,19 @@ export class Topic { * @return {bigint} offset number */ async $offset(time: bigint = -1n): Promise { - await this.initConsumerIfNotExists(); - - return new Promise((resolve, reject) => { - if (time < 0n) { - this.consumer.listOffsets({ - topics: [this.name], - }).then((r) => { - resolve(r.get(this.name)[0]); - return r.get(this.name)[0]; - }).catch((err: any) => { - this.provider.logger.error('Error occurred retrieving topic offset:', { code: err.code, message: err.message, stack: err.stack }); - reject(err); - }); - } - - return this.consumer.listOffsets({ + try { + await this.initConsumerIfNotExists(); + return await this.consumer.listOffsets({ topics: [this.name], - timestamp: time, - }).then(r => { - resolve(r.get(this.name)[0]); + timestamp: time < 0n ? time : undefined, + }).then((r) => { return r.get(this.name)[0]; - }).catch((err: any) => { - this.provider.logger.error('Error occurred retrieving topic offset:', { code: err.code, message: err.message, stack: err.stack }); - reject(err); - }); - }); + }) + } + catch (err: any) { + this.provider.logger.error('Error occurred retrieving topic offset:', decomposeError(err)); + throw err; + } } /** @@ -230,7 +224,7 @@ export class Topic { * @param {bigint} offset The offset at which to restart from. */ async $reset(eventName: string, offset: bigint): Promise { - if (this.subscribed.indexOf(eventName) > -1) { + if (this.subscribed.includes(eventName)) { await this.$unsubscribe(eventName); } await this.$subscribe(eventName, offset); @@ -286,13 +280,16 @@ export class Topic { private async initConsumerIfNotExists(queue?: boolean): Promise { if (!this.consumer) { - this.consumer = await this.provider.newConsumer(this.provider.config.groupId + '_' + this.name); + this.consumer = await this.provider.newConsumer(`${this.provider.config.groupId}_${this.name}`); - await this.consumer.connectToBrokers().then(() => { + try { + await this.consumer.connectToBrokers(); this.provider.logger.info(`Consumer for topic '${this.name}' connected`); - }).catch((err: any) => { - this.provider.logger.error(`Consumer for topic '${this.name}' connection error`, { code: err.code, message: err.message, stack: err.stack }); - }); + } + catch (err: any) { + this.provider.logger.error(`Consumer for topic '${this.name}' connection error`, decomposeError(err)); + throw err; + } // On receiving the message on Kafka consumer put the message to async Queue. if (queue) { @@ -312,9 +309,8 @@ export class Topic { private async $subscribe(eventName: string, offsetValue: bigint, queue?: boolean): Promise { if (!this.subscribedToTopic) { this.subscribedToTopic = true; - await this.initConsumerIfNotExists(queue); - await this.consumer.consume({ + this.consumer.consume({ // await consume takes ~3 seconds! sessionTimeout: 10000, heartbeatInterval: 500, topics: [this.name], @@ -324,9 +320,9 @@ export class Topic { partition: 0, offset: BigInt(offsetValue) }], - }).then(stream => { + }).then( + stream => { this.provider.logger.info(`Consumer for topic '${this.name}' subscribed`); - stream.on('data', (message) => { if (queue) { this.onMessageForQueue(message); @@ -334,8 +330,10 @@ export class Topic { this.makeDataHandler(message); } }); - }).catch((err: any) => { - this.provider.logger.error(`Consumer for topic '${this.name}' failed to run`, { code: err.code, message: err.message, stack: err.stack }); + }, + (err: any) => { + this.subscribedToTopic = false; + this.provider.logger.error(`Consumer for topic '${this.name}' failed to run`, decomposeError(err)); throw err; }); } @@ -370,7 +368,6 @@ export class Topic { } }); this.filterWaitQueue(message.offset); - done(); }); } else { this.provider.logger.error('No draining event was provided; discarding messages'); @@ -387,39 +384,43 @@ export class Topic { this.provider.logger.info('Async queue draining started.'); } - private commit(): any { + private async commit(): Promise { // Check if manual offset commit is enabled if (!this.manualOffsetCommit) { - this.commitCurrentOffsets().then(() => { - this.provider.logger.verbose('Offsets committed successfully'); - }).catch(error => { - this.provider.logger.warn('Failed to commit offsets, resuming anyway after:', error); - // Fix for kafkaJS onCrash issue for KafkaJSNonRetriableError, to reset the consumers - this.provider.logger.warn('Commit error name', { name: error.name }); - this.provider.logger.warn('Commit error message', { message: error.message }); - if ((error.name === 'KafkaJSNonRetriableError' || error.name === 'KafkaJSError') && error.message === 'The coordinator is not aware of this member') { - this.provider.logger.info('Reset Consumer connection due to KafkaJSNonRetriableError'); - this.$resetConsumer(this.subscribed, this.currentOffset); - this.provider.logger.info('Consumer connection reset successfully'); + return await this.commitCurrentOffsets().then( + () => { + this.provider.logger.verbose('Offsets committed successfully'); + }, + async (error: any) => { + this.provider.logger.warn('Failed to commit offsets, resuming anyway after:', decomposeError(error)); + // Fix for kafkaJS onCrash issue for KafkaJSNonRetriableError, to reset the consumers + this.provider.logger.warn('Commit error name', { name: error.name }); + this.provider.logger.warn('Commit error message', { message: error.message }); + if ((error.name === 'KafkaJSNonRetriableError' || error.name === 'KafkaJSError') && error.message === 'The coordinator is not aware of this member') { + this.provider.logger.info('Reset Consumer connection due to KafkaJSNonRetriableError'); + await this.$resetConsumer(this.subscribed, this.currentOffset); + this.provider.logger.info('Consumer connection reset successfully'); + } } - }); + ); } } async commitCurrentOffsets(): Promise { - return new Promise((resolve, reject) => { - this.consumer.commit({ + try { + await this.consumer.commit({ offsets: [{ leaderEpoch: 0, // ? topic: this.name, offset: this.currentOffset, partition: 0 // ? }] - }).then(resolve).catch((err: any) => { - this.provider.logger.error('Error committing offset', { code: err.code, message: err.message, stack: err.stack }); - reject(err); }); - }); + } + catch (err: any) { + this.provider.logger.error('Error committing offset', decomposeError(err)); + throw err; + } } /** @@ -452,7 +453,7 @@ export class Topic { message ); if (decodedMsg) { - decodedMsg = pick(decodedMsg, keys(decodedMsg)); // hack around messy protobuf.js object + decodedMsg = pick(decodedMsg, Object.keys(decodedMsg)); // hack around messy protobuf.js object this.provider.logger.debug(`kafka received event with topic ${context.topic} and event name ${eventName} at offset ${context.offset.toString(10)}`, { decodedMsg }); this.emitter.emit( eventName, @@ -464,7 +465,7 @@ export class Topic { } } } - + /** * Listen to events. * When the topic is not subscribed to a Kafka topic, a connection to Kafka is @@ -477,7 +478,7 @@ export class Topic { * @param {function|generator} listener Listener * @param opts */ - async on(eventName: string, listener: any, opts: SubscriptionOptions = {}): Promise { + async on(eventName: string, listener: Listener, opts: SubscriptionOptions = {}): Promise { let { startingOffset } = opts; const { queue, forceOffset } = opts; if (!(this.subscribed.includes(eventName))) { @@ -500,6 +501,10 @@ export class Topic { async emit(eventName: string, message: object): Promise { await this.provider.$send(this.name, eventName, message); } + + stop(): Promise { + throw new Error('Method not implemented.'); + } } export interface KafkaProviderConfig { @@ -513,14 +518,12 @@ export interface KafkaProviderConfig { /** * Events provider. */ -export class Kafka { - - config: KafkaProviderConfig; - topics: { [key: string]: Topic }; - logger: Logger; - producer: Producer; +export class Kafka implements EventProvider { + readonly config: KafkaProviderConfig; + readonly topics: Record = {}; + private producer: Producer; + admin: Admin; - producerConnected: boolean; adminConnected: boolean; commonOptions: KafkaProviderConfig['kafka']; @@ -533,10 +536,11 @@ export class Kafka { * @param {object} config * @param {object} logger */ - constructor(config: any, logger: Logger) { + constructor( + config: any, + readonly logger: Logger + ) { this.config = clone(config); - this.topics = {}; - this.logger = logger; } /** @@ -575,7 +579,7 @@ export class Kafka { this.commonOptions['bootstrapBrokers'] = this.commonOptions['brokers'] as string[]; } - this.logger.info(`[kafka-client] Connecting - attempt No: ${operation.attempts()}`); + this.logger?.info(`[kafka-client] Connecting - attempt No: ${operation.attempts()}`); this.producer = new Producer(this.commonOptions); this.admin = new Admin(this.commonOptions); @@ -584,17 +588,17 @@ export class Kafka { await new Promise((resolveProducer, rejectProducer) => { this.producer.on('client:broker:connect', () => { this.producerConnected = true; - this.logger.info('The Producer is ready.'); + this.logger?.info('The Producer is ready.'); }); this.producer.on('client:broker:disconnect', (err: any) => { this.producerConnected = false; - this.logger.warn('The Producer has disconnected:', err); + this.logger?.warn('The Producer has disconnected:', err); rejectProducer(err); }); this.producer.on('client:broker:failed', (err: any) => { - this.logger.warn('The Producer connection failed:', err); + this.logger?.warn('The Producer connection failed:', err); rejectProducer(err); }); @@ -602,21 +606,21 @@ export class Kafka { resolveProducer(true); }).catch((err: any) => { rejectProducer(err); - this.logger.warn('Producer connection error:', err); + this.logger?.warn('Producer connection error:', err); }); }).then(() => { this.admin.on('client:broker:connect', () => { - this.logger.info('The Admin is ready.'); + this.logger?.info('The Admin is ready.'); this.adminConnected = true; }); this.admin.on('client:broker:disconnect', (err: any) => { - this.logger.warn('The Admin connection failed:', err); + this.logger?.warn('The Admin connection failed:', err); this.adminConnected = false }); this.admin.on('client:broker:failed', (err: any) => { - this.logger.warn('The Admin connection failed:', err); + this.logger?.warn('The Admin connection failed:', err); throw err; }); @@ -624,7 +628,7 @@ export class Kafka { resolveRetry(); }).catch( (err: any) => { - this.logger.warn('Admin connection error:', err); + this.logger?.warn('Admin connection error:', err); throw err; } ); @@ -634,7 +638,7 @@ export class Kafka { operation.retry(err); const attemptNo = operation.attempts(); this.producer?.close(); - this.logger.info(`Retry initialize the Producer, attempt No: ${attemptNo}`); + this.logger?.info(`Retry initialize the Producer, attempt No: ${attemptNo}`); } }); }); @@ -661,7 +665,7 @@ export class Kafka { try { return decodeMessage(msg, config[eventName].messageObject); } catch (error: any) { - this.logger.error( + this.logger?.error( `error on decoding message with event ${eventName}`, { error } ); @@ -695,26 +699,26 @@ export class Kafka { headers: {} } as Message; } - ) - for (const msg of messages) { + ); + for (const msg of messages ?? []) { if (config[eventName]?.omittedFields) { const keys = config[eventName].omittedFields; this.omitFields(keys, msg, config[eventName].enableLogging); } } - this.logger.debug(`Sending event ${eventName} to topic ${topicName}`, { messages }); + this.logger?.debug(`Sending event ${eventName} to topic ${topicName}`, { messages }); return await this.producer.send({ messages: values }).then((data) => { for (let i = 0; i < messages.length; i++) { const msg = messages[i]; - this.logger.debug(`Sent event ${eventName} to topic ${topicName} at offset ${data.offsets[i].offset.toString(10)}`, msg); + this.logger?.debug(`Sent event ${eventName} to topic ${topicName} at offset ${data.offsets[i].offset.toString(10)}`, msg); } return data; }); } catch (error: any) { const { message, details, stack } = error; - this.logger.error(`Error on sending event ${eventName} to topic ${topicName}`, { message, details, stack }); + this.logger?.error(`Error on sending event ${eventName} to topic ${topicName}`, { message, details, stack }); throw error; } } @@ -749,10 +753,7 @@ export class Kafka { * @return {Topic} Kafka topic */ async topic(topicName: string, config: any): Promise { - if (this.topics[topicName]) { - return this.topics[topicName]; - } - this.topics[topicName] = new Topic(topicName, this, config); + this.topics[topicName] ??= new KafkaTopic(topicName, this, config); await this.topics[topicName].createIfNotExists(); return this.topics[topicName]; } @@ -763,41 +764,61 @@ export class Kafka { * all consumers from topics are disconnected. */ async stop(): Promise { - this.logger.warn('Stopping Kafka. Ignore any following connection errors'); + this.logger?.warn('Stopping Kafka. Ignore any following connection errors'); const errors: any[] = []; if (this.producerConnected) { await this.producer.close().catch((err: any) => { - this.logger.warn('Error occurred stopping Kafka producer:', err); + this.logger?.warn('Error occurred stopping Kafka producer:', err); errors.push(err); }); } if (this.adminConnected) { await this.admin.close().catch((err: any) => { - this.logger.warn('Error occurred stopping Kafka admin:', err); + this.logger?.warn('Error occurred stopping Kafka admin:', err); errors.push(err); }); } - const topicNames = keys(this.topics); - for (let i = 0; i < topicNames.length; i += 1) { - const topic: Topic = this.topics[topicNames[i]]; - const eventNames = topic.subscribed; - for (let j = eventNames.length - 1; j >= 0; j -= 1) { - const eventName = eventNames[j]; - // This closes both producer and consumer objects - // via unsubscribe() + for (const topic of Object.values(this.topics)) { + for (const eventName of topic.subscribed?.reverse() ?? []) { await topic.removeAllListeners(eventName); } } if (errors.length > 0) { - this.logger.error('Errors when stopping Kafka client:', errors); + this.logger?.error('Errors when stopping Kafka client:', errors); throw errors; } } + async delete(topics: string[]) { + try { + return await this.admin.deleteTopics({ + topics, + }); + } + finally { + for (const topic of topics) { + delete this.topics[topic]; + } + } + } + + async deleteAll() { + try { + return await this.admin.deleteTopics({ + topics: Object.keys(this.topics), + }); + } + finally { + for (const topic of Object.keys(this.topics)) { + delete this.topics[topic]; + } + } + } + async newConsumer(groupId: string): Promise { const consumer = new Consumer({ groupId: groupId, @@ -805,51 +826,51 @@ export class Kafka { }); consumer.on('client:broker:connect', (msg: any) => { - this.logger.info('Consumer is ready.', msg); + this.logger?.info('Consumer is ready.', msg); }); consumer.on('client:broker:disconnect', (err: any) => { - this.logger.warn('Consumer connection failed:', err); + this.logger?.warn('Consumer connection failed:', err); }); consumer.on('client:broker:failed', (err: any) => { - this.logger.warn('Consumer connection failed:', err); + this.logger?.warn('Consumer connection failed:', err); }); consumer.on('client:broker:drain', (msg: any) => { - this.logger.info('Consumer broker ready for requests:', msg); + this.logger?.info('Consumer broker ready for requests:', msg); }); consumer.on('client:metadata', (msg: any) => { - this.logger.silly('Consumer broker metadata:', msg); + this.logger?.silly('Consumer broker metadata:', msg); }); consumer.on('client:close', (msg: any) => { - this.logger.warn('Consumer client closed:', msg); + this.logger?.warn('Consumer client closed:', msg); }); consumer.on('consumer:group:join', (msg: any) => { - this.logger.info('Consumer joining group:', msg); + this.logger?.info('Consumer joining group:', msg); }); consumer.on('consumer:group:leave', (msg: any) => { - this.logger.info('Consumer leaving group:', msg); + this.logger?.info('Consumer leaving group:', msg); }); consumer.on('consumer:group:rejoin', (msg: any) => { - this.logger.warn('Consumer re-joining group:', msg); + this.logger?.warn('Consumer re-joining group:', msg); }); consumer.on('consumer:group:rebalance', (msg: any) => { - this.logger.warn('Consumer group rebalancing:', msg); + this.logger?.warn('Consumer group rebalancing:', msg); }); consumer.on('consumer:heartbeat:cancel', (err: any) => { - this.logger.warn('Consumer heartbeat cancelled:', err); + this.logger?.warn('Consumer heartbeat cancelled:', err); }); consumer.on('consumer:heartbeat:error', (err: any) => { - this.logger.error('Consumer heartbeat error:', err); + this.logger?.error('Consumer heartbeat error:', err); }); return consumer; diff --git a/packages/kafka-client/src/events/provider/local/index.ts b/packages/kafka-client/src/events/provider/local/index.ts index 37535d4d9..5e0c5ff52 100644 --- a/packages/kafka-client/src/events/provider/local/index.ts +++ b/packages/kafka-client/src/events/provider/local/index.ts @@ -1,20 +1,21 @@ import { isNullish, isArray } from 'remeda'; import { Logger } from '@restorecommerce/logger'; +import { EventProvider, Topic } from '../../interface.js'; interface EventData { - listeners: ((bufferObj: any, context: any, config: any, eventName: string) => void)[]; + listeners: ((bufferObj: any, context: any, config: any, eventName: string) => Promise)[]; messages: any[]; } /** * Topic handles listening and sending events to a specific topic. */ -export class Topic { - +export class LocalTopic implements Topic { event: Record; name: string; logger: Logger; config: any; + subscribed: any; /** * @param {string} topicName @@ -26,6 +27,21 @@ export class Topic { this.logger = logger; this.config = config; } + $reset(eventName: string, offset: bigint): Promise { + throw new Error('Method not implemented.'); + } + $resetConsumer(eventNames: string[], offset: bigint): Promise { + throw new Error('Method not implemented.'); + } + commitCurrentOffsets(): Promise { + throw new Error('Method not implemented.'); + } + $wait(arg0: bigint): Promise { + throw new Error('Method not implemented.'); + } + $offset(arg0: bigint): Promise { + throw new Error('Method not implemented.'); + } /** * Listen to eventName events with listener. @@ -60,26 +76,19 @@ export class Topic { * @param {string} eventName Identification name of the event. * @param {object} message Event message which is send to all listeners. */ - async emit(eventName: string, message: any | any[]): Promise { - let e = this.event[eventName]; + async emit(eventName: string, message: any | any[]): Promise { + const e = this.event[eventName]; if (isNullish(e)) { - e = this.event[eventName] = { - listeners: [], - messages: [], - }; + return; } + e.messages ??= []; const currentOffset = e.messages.length; - let messages = message; - let bufferObj; - if (!isArray(message)) { - messages = [message]; - } - e.messages = [...e.messages, ...messages]; - const listeners = e.listeners; + const messages = isArray(message) ? message : [message]; + e.messages.push(...messages); + const listeners = e.listeners ?? []; const logger = this.logger; const messageObject = this.config[eventName].messageObject; - for (let i = 0; i < listeners.length; i += 1) { - const listener = listeners[i]; + for (const listener of listeners) { for (let j = 0; j < messages.length; j += 1) { const context = { offset: currentOffset + j, @@ -88,7 +97,7 @@ export class Topic { }; const msg = messages[j]; - bufferObj = await this.encodeObject(msg, messageObject); + const bufferObj = await this.encodeObject(msg, messageObject); await listener(bufferObj, context, this.config, eventName); } @@ -100,7 +109,7 @@ export class Topic { * @param {string} eventName Identification name of the event. * @return {number} Number of listeners. */ - listenerCount(eventName: string): number { + async listenerCount(eventName: string): Promise { const e = this.event[eventName]; if (isNullish(e)) { return 0; @@ -113,7 +122,7 @@ export class Topic { * @param {string} eventName Identification name of the event. * @return {boolean} True if any listener is listening, otherwise false. */ - hasListeners(eventName: string): boolean { + async hasListeners(eventName: string): Promise { const e = this.event[eventName]; if (isNullish(e)) { return false; @@ -141,14 +150,14 @@ export class Topic { * Remove all listener listening to eventName event. * @param {string} eventName Identification name of the event. */ - removeAllListeners(eventName: string) { + async removeAllListeners(eventName: string) { delete this.event[eventName]; } /** * Stop everything */ - stop() { + async stop() { this.event = {}; } } @@ -158,8 +167,7 @@ export class Topic { * It uses in-process communication * and does not support sending events to other processes or hosts. */ -export class Local { - +export class Local implements EventProvider { config: any; logger: Logger; topics: Record; @@ -179,7 +187,7 @@ export class Local { if (this.topics[topicName]) { return this.topics[topicName]; } - this.topics[topicName] = new Topic(topicName, this.logger, config); + this.topics[topicName] = new LocalTopic(topicName, this.logger, config); return this.topics[topicName]; } @@ -195,10 +203,22 @@ export class Local { /** * Stop the event provider and all event communication. */ - async stop(): Promise { - Object.values(this.topics).forEach((topic) => { - topic.stop(); - }) + async stop(): Promise { + await Promise.allSettled( + Object.values(this.topics).map((topic) => { + topic.stop(); + }) + ) + }; + + async delete(topics: string[]): Promise { + for (const topic of topics) { + delete this.topics[topic]; + } + } + + async deleteAll(): Promise { + this.topics = {}; } } diff --git a/packages/kafka-client/src/index.ts b/packages/kafka-client/src/index.ts index 669e3467f..937e755b3 100644 --- a/packages/kafka-client/src/index.ts +++ b/packages/kafka-client/src/index.ts @@ -1,5 +1,3 @@ -import { Events } from './events/index.js'; -export { Events }; -export * from './events/provider/kafka/index.js'; -export { Topic as localTopic, Local as local } from './events/provider/local/index.js'; +export * from './events/interface.js'; +export { Events, registerEventProvider } from './events/index.js'; export { registerProtoMeta, encodeMessage, decodeMessage } from './protos.js'; diff --git a/packages/kafka-client/src/utils.ts b/packages/kafka-client/src/utils.ts new file mode 100644 index 000000000..7440d79c7 --- /dev/null +++ b/packages/kafka-client/src/utils.ts @@ -0,0 +1,5 @@ + +export const decomposeError = (error: any) => { + const { code, message, details, stack } = error; + return { code, message, details, stack }; +} \ No newline at end of file diff --git a/packages/kafka-client/test/1-events_subscribe.spec.ts b/packages/kafka-client/test/1-event-provider.spec.ts similarity index 55% rename from packages/kafka-client/test/1-events_subscribe.spec.ts rename to packages/kafka-client/test/1-event-provider.spec.ts index 2370a0162..5bbe63b3a 100644 --- a/packages/kafka-client/test/1-events_subscribe.spec.ts +++ b/packages/kafka-client/test/1-event-provider.spec.ts @@ -6,7 +6,7 @@ import { protoMetadata } from '@restorecommerce/rc-grpc-clients/dist/generated/t registerProtoMeta(protoMetadata); -const kafkaConfig = { +const providerConfig = { events: { kafkaTest: { provider: 'kafka', @@ -44,24 +44,23 @@ const logger = createLogger(loggerConfig.logger); /* global describe it before after */ -describe('events', () => { - describe('without a provider', () => { +describe('EventProviders:', () => { + describe('without provider config:', () => { const topicName = 'test'; - describe('awaiting subscribe', () => { - it('should throw an error', async () => { - try { - const events: Events = new Events(); - await events.topic(topicName); - } catch (err) { - expect(err).not.toBe(undefined); - expect(err.message).to.equal('missing argument config'); - } - }); + it('should throw an error', async () => { + try { + const events: Events = new Events(); + await events.topic(topicName); + } catch (err) { + expect(err).not.toBe(undefined); + expect(err.message).to.equal('missing argument config'); + } }); }); - const providers = ['kafkaTest', 'localTest']; + + const providers = ['localTest', 'kafkaTest']; forEach(providers, (providerName: string) => { - describe(`testing config ${providerName}`, () => { + describe(`with provider config ${providerName}:`, () => { let events: Events; const topicName = 'com.example.test'; let topic: Topic; @@ -69,17 +68,15 @@ describe('events', () => { const testMessage = {value: 'testValue', count: 1}; beforeAll(async function () { - events = new Events(kafkaConfig.events[providerName], logger); + events = new Events(providerConfig.events[providerName], logger); await events.start(); - - await new Promise(resolve => setTimeout(resolve, 5000)); }, 10000); afterAll(async function () { await events.stop(); }, 10000); - describe('creating a topic', () => { + describe('creating topics', () => { it('should return a topic', async () => { topic = await (events.topic(topicName)); expect(topic).not.toBe(undefined); @@ -90,15 +87,22 @@ describe('events', () => { expect(topic.removeListener).not.toBe(undefined); expect(topic.removeAllListeners).not.toBe(undefined); }); + + const burst = 20; + it(`should manage a burst of ${burst} subscribed topics at a time`, async () => { + for (let i = 0; i < burst; ++i) { + await events.topic(topicName+i).then( + topic => topic.on(eventName, () => {}) + ); + } + }); }); describe('subscribing', function startKafka(): void { it('should allow listening to events', async () => { - topic = await (events.topic(topicName)); + topic = await events.topic(topicName); - const listener = () => { - // void listener - }; + const listener = async (...args: any[]) => {}; const count: number = await topic.listenerCount(eventName); expect(count).not.toBe(undefined); await topic.on(eventName, listener); @@ -106,23 +110,51 @@ describe('events', () => { const countAfter = await topic.listenerCount(eventName); expect(countAfter).to.equal(count + 1); }); + it('should allow emitting and receiving a message', async function () { topic = await (events.topic(topicName)); - let returnedMessage; - const listener = (message, context, config, eventName) => returnedMessage = message; + let returnedMessage: any; + const listener = async (message: any, ...args: any[]) => returnedMessage = message; await topic.on(eventName, listener); await topic.emit(eventName, testMessage); - while (returnedMessage == undefined) { + while (returnedMessage === undefined) { await new Promise((r) => setTimeout(r, 10)); } expect(returnedMessage.value).to.equal(testMessage.value); expect(returnedMessage.count).to.equal(testMessage.count); }, 20000); - }, 5000); + }, 20000); + + describe('removing listener', () => { + it('should allow removing all the subscribed listeners from topic', async function() { + await topic.removeAllListeners(eventName); + const count: number = await topic.listenerCount(eventName); + logger.info('Count of listeners after removing :', count); + expect(count).to.equal(0); + }, 20000); + + it( + 'should allow removing the subscribed listener from topic', + async () => { + const listener = async () => {}; + await topic.on(eventName, listener); + await topic.on(eventName, async () => {}); + await topic.removeListener(eventName, listener); + const count: number = await topic.listenerCount(eventName); + logger.info('Count of listeners after removing :', count); + expect(count).to.equal(1); + await topic.removeAllListeners(eventName); + } + ); + + it('should delete all topics', async function() { + await events.deleteAll(); + }); + }); }); }); }); diff --git a/packages/kafka-client/test/2-events_remove.spec.ts b/packages/kafka-client/test/2-events_remove.spec.ts deleted file mode 100644 index bf6e4049b..000000000 --- a/packages/kafka-client/test/2-events_remove.spec.ts +++ /dev/null @@ -1,82 +0,0 @@ -import { forEach } from 'remeda'; -import { Events, registerProtoMeta, Topic } from '../src'; -import { createLogger } from '@restorecommerce/logger'; -import { expect, it, describe, beforeAll, afterAll } from 'vitest'; -import { protoMetadata } from '@restorecommerce/rc-grpc-clients/dist/generated/test/test'; - -registerProtoMeta(protoMetadata); - -const kafkaConfig = { - events: { - kafkaTest: { - provider: 'kafka', - groupId: 'restore-chassis-test-server', - kafka: { - clientId: 'restore-chassis-test-server', - brokers: [ - 'localhost:29092' - ], - }, - testEvent: { - messageObject: 'test.TestEvent' - } - } - }, -}; - -const loggerConfig: any = { - logger: { - console: { - handleExceptions: false, - level: 'debug', - colorize: true, - prettyPrint: true - } - } -}; -const logger = createLogger(loggerConfig.logger); - -/* global describe it before after */ - -describe('events', () => { - const providers = ['kafkaTest']; - forEach(providers, (providerName: string) => { - describe(`testing config ${providerName}`, () => { - let events: Events; - const topicName = 'com.example.test'; - let topic: Topic; - const eventName = 'testEvent'; - - beforeAll(async function () { - const kafkaConfigEvents = kafkaConfig.events[providerName]; - events = new Events(kafkaConfig.events[providerName], logger); - await events.start(); - // Subscribe to topic - topic = await (events.topic(topicName)); - }, 10000); - afterAll(async function () { - await events.stop(); - }, 10000); - describe('removing listener', () => { - it('should allow removing the subscribed listener from topic', - async () => { - const listener = () => { - }; - await topic.on(eventName, listener); - await topic.on(eventName, () => { - }); - await topic.removeListener(eventName, listener); - const count: number = await topic.listenerCount(eventName); - logger.info('Count of listeners after removing :', count); - expect(count).to.equal(1); - }); - it('should allow removing all the subscribed listeners from topic', async function() { - await topic.removeAllListeners(eventName); - const count: number = await topic.listenerCount(eventName); - logger.info('Count of listeners after removing :', count); - expect(count).to.equal(0); - }, 20000); - }); - }); - }); -}); diff --git a/packages/kafka-client/test/0-kafka.spec.ts b/packages/kafka-client/test/2-kafka-offset.spec.ts similarity index 90% rename from packages/kafka-client/test/0-kafka.spec.ts rename to packages/kafka-client/test/2-kafka-offset.spec.ts index 49cd0bdeb..62d95e922 100644 --- a/packages/kafka-client/test/0-kafka.spec.ts +++ b/packages/kafka-client/test/2-kafka-offset.spec.ts @@ -43,16 +43,20 @@ describe('Kafka provider test', () => { const topicName = 'com.example.test'; const eventName = 'exampleEvent'; let initialOffset: bigint; + beforeAll(async () => { // start the client await client.start(); const topic = await client.topic(topicName); initialOffset = await topic.$offset(BigInt(-1)); }); + afterAll(async function() { // stop the client + await client.deleteAll(); await client.stop(); }, 10000); + describe('topic.wait', function testWait(): void { it('should wait until the event message is processed', async () => { @@ -63,12 +67,12 @@ describe('Kafka provider test', () => { await new Promise(async (resolve) => { // subscribe to topic for example-event with listener as call back. - await topic.on(eventName, (message, context) => { + await topic.on(eventName, async (message) => { expect(message).not.toBe(undefined); expect(testMessage.value).toBe(message.value); expect(testMessage.count).toBe(message.count); logger.info('Received message :', message); - topic.removeAllListeners(eventName).then(resolve); + await topic.removeAllListeners(eventName).then(resolve); }); // emit the message to Kafka (message is encoded and sent to Kafka) await topic.emit(eventName, testMessage) @@ -80,15 +84,19 @@ describe('Kafka provider test', () => { async () => { // create topic object const topic: Topic = await client.topic(topicName); - // order of count should be preserved - const expectedCountArr = [1, 2, 3, 4, 5]; const countArr = new Array; // subscribe to topic for example-event with listener as call back. - await topic.on(eventName, (message: any, context, config, eventName) => { - console.log('received message:', message) - expect(message).not.toBe(undefined); - countArr.push(message.count); - }, { queue: true }); + await topic.on( + eventName, + (message: any, context, config, eventName) => { + console.log('received message:', message) + expect(message).not.toBe(undefined); + countArr.push(message.count); + }, + { + queue: true + }, + ); // get the current offset const offset = await topic.$offset(BigInt(-1));