Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions packages/kafka-client/src/events/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,23 @@ import { isNullish, isString } from 'remeda';
import { createLogger, Logger } from '@restorecommerce/logger';
import * as kafka from './provider/kafka/index.js';
import * as local from './provider/local/index.js';
import { Topic } from './provider/kafka/index.js';
import { EventProvider, Topic } from './interface.js';

type EventProvicerClass = new (config: any, logger: Logger, ...args: any[]) => EventProvider;

/**
* A key, value map containing event providers.
* Event providers are registered with the register function.
*/
const eventProviders: any = {};
const eventProviders = {} as Record<string, EventProvicerClass>;

/**
* Register a event provider.
*
* @param {string} name Event provider identifier
* @param {constructor} provider Event provider constructor function
*/
export const registerEventProvider = (name: string, provider: any): void => {
export const registerEventProvider = (name: string, provider: EventProvicerClass): void => {
eventProviders[name] = provider;
};

Expand All @@ -28,8 +30,8 @@ registerEventProvider(local.Name, local.Local);
*/
export class Events {
config: any;
logger: Logger;
provider: any;
logger?: Logger;
provider: EventProvider;
/**
* @param [Object] config Event configuration.
* @param [Logger] logger
Expand All @@ -41,7 +43,7 @@ export class Events {
}
this.config = config;

const loggerCfg = this.config.logger;
const loggerCfg = this.config?.logger;
if (loggerCfg) {
loggerCfg.esTransformer = (msg: any) => {
msg.fields = JSON.stringify(msg.fields);
Expand All @@ -63,7 +65,7 @@ export class Events {
// provider
const providerName = this.config.provider;
if (isNullish(providerName)) {
this.logger.error('config does not have event provider name', this.config);
this.logger?.error('config does not have event provider name', this.config);
throw new Error('config does not have event provider name');
}
const Provider = eventProviders[providerName];
Expand All @@ -87,10 +89,18 @@ export class Events {
* No events will be received or can be send after this call.
* Suspends the function until the provider is stopped.
*/
async stop(): Promise<any> {
async stop(): Promise<void> {
return await this.provider.stop();
}

async delete(topics: string[]): Promise<void> {
return await this.provider.delete(topics);
}

async deleteAll(): Promise<void> {
return await this.provider.deleteAll();
}

/**
* Returns a topic from the provider.
*
Expand All @@ -102,10 +112,8 @@ export class Events {
throw new Error('missing argument name');
}
if (!isString(name)) {
throw new Error('argument name is not of type string');
throw new Error('argument name is not type of string');
}
// topic() api called inside Local / Kafka class - which then
// invokes the actual topic constructor
return this.provider.topic(name, this.config, manualOffsetCommit || false);
}
}
26 changes: 26 additions & 0 deletions packages/kafka-client/src/events/interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@

export type Listener = (message?: any, context?: any, config?: any, eventName?: any) => void | Promise<void>;

export interface Topic {
commitCurrentOffsets(): Promise<void>;
$wait(arg0: bigint): Promise<void>;
$offset(arg0: bigint): Promise<bigint>;
$reset(eventName: string, offset: bigint): Promise<void>;
$resetConsumer(eventNames: string[], offset: bigint): Promise<void>;
emit(eventName: string, msg: any): Promise<void>;
on(eventName: string, listener: Listener, config?: any): Promise<void>;
listenerCount(eventName: string): Promise<number>;
removeListener(eventName: string, listener: Listener): Promise<void>;
removeAllListeners(eventName: string): Promise<void>;
stop(): Promise<void>;
hasListeners(eventName?: string): Promise<boolean>;
get subscribed(): string[];
}

export interface EventProvider {
topic(name: string, config: any, manualOffset?: boolean): Promise<Topic>;
stop(): Promise<void>;
start(): Promise<void>;
delete(topics: string[]): Promise<void>;
deleteAll(): Promise<void>;
}
Loading
Loading