Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ export const IntegratedServices = Object.freeze({
QCG: 'qcg',
QC: 'qc',
CCDB: 'ccdb',
KAFKA: 'kafka',
});
21 changes: 11 additions & 10 deletions QualityControl/lib/QCModel.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ export const setupQcModel = async (eventEmitter) => {
logger.warnMessage('No database configuration found, skipping database initialization');
}

const layoutRepository = new LayoutRepository(jsonFileService);
const userRepository = new UserRepository(jsonFileService);
const chartRepository = new ChartRepository(jsonFileService);

const userController = new UserController(userRepository);
const layoutController = new LayoutController(layoutRepository);

const statusService = new StatusService({ version: packageJSON?.version ?? '-' }, { qc: config.qc ?? {} });
const statusController = new StatusController(statusService);

if (config?.kafka?.enabled) {
try {
const validConfig = await KafkaConfigDto.validateAsync(config.kafka);
Expand All @@ -89,22 +99,13 @@ export const setupQcModel = async (eventEmitter) => {
logLevel: logLevel.NOTHING,
});
const aliEcsSynchronizer = new AliEcsSynchronizer(kafkaClient, consumerGroups, eventEmitter);
statusService.aliEcsSynchronizer = aliEcsSynchronizer;
aliEcsSynchronizer.start();
} catch (error) {
logger.errorMessage(`Kafka initialization/connection failed: ${error.message}`);
}
}

const layoutRepository = new LayoutRepository(jsonFileService);
const userRepository = new UserRepository(jsonFileService);
const chartRepository = new ChartRepository(jsonFileService);

const userController = new UserController(userRepository);
const layoutController = new LayoutController(layoutRepository);

const statusService = new StatusService({ version: packageJSON?.version ?? '-' }, { qc: config.qc ?? {} });
const statusController = new StatusController(statusService);

const qcdbDownloadService = new QcdbDownloadService(config.ccdb);

const ccdbService = CcdbService.setup(config.ccdb);
Expand Down
35 changes: 35 additions & 0 deletions QualityControl/lib/services/Status.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { exec } from 'node:child_process';

import { LogManager } from '@aliceo2/web-ui';
import { IntegratedServices } from './../../common/library/enums/Status/integratedServices.enum.js';
import { ServiceStatus } from '../../common/library/enums/Status/serviceStatus.enum.js';

const QC_VERSION_EXEC_COMMAND = 'yum info o2-QualityControl | awk \'/Version/ {print $3}\'';
const execPromise = promisify(exec);
Expand All @@ -43,6 +44,11 @@ export class StatusService {
*/
this._ws = undefined;

/**
* @type {?AliEcsSynchronizer}
*/
this._aliEcsSynchronizer = undefined;

this._packageInfo = packageInfo;
this._config = config;
}
Expand All @@ -64,6 +70,9 @@ export class StatusService {
case IntegratedServices.CCDB:
result = await this.retrieveDataServiceStatus();
break;
case IntegratedServices.KAFKA:
result = this.retrieveKafkaServiceStatus();
break;
}
return result;
}
Expand Down Expand Up @@ -120,6 +129,23 @@ export class StatusService {
return { name: 'CCDB', status, version, extras: {} };
}

/**
* Retrieve the kafka service status response
* @returns {object} - status of the kafka service
*/
retrieveKafkaServiceStatus() {
const status = this._aliEcsSynchronizer?.status;
return {
name: IntegratedServices.KAFKA,
status: {
ok: status === ServiceStatus.SUCCESS,
},
extras: {
state: status ?? 'NOT_CONFIGURED',
},
};
}

/*
* Getters & Setters
*/
Expand All @@ -141,4 +167,13 @@ export class StatusService {
set ws(ws) {
this._ws = ws;
}

/**
* Set instance of `AliEcsSynchronizer`
* @param {AliEcsSynchronizer} aliEcsSynchronizer - instance of the `AliEcsSynchronizer`
* @returns {void}
*/
set aliEcsSynchronizer(aliEcsSynchronizer) {
this._aliEcsSynchronizer = aliEcsSynchronizer;
}
}
27 changes: 21 additions & 6 deletions QualityControl/lib/services/external/AliEcsSynchronizer.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import { AliEcsEventMessagesConsumer, LogManager } from '@aliceo2/web-ui';
import { EmitterKeys } from './../../../common/library/enums/emitterKeys.enum.js';
import { ServiceStatus } from '../../../common/library/enums/Status/serviceStatus.enum.js';

const LOG_FACILITY = `${process.env.npm_config_log_label ?? 'qcg'}/ecs-synchronizer`;
const RUN_TOPICS = ['aliecs.run'];
Expand All @@ -26,7 +27,7 @@
* @param {import('kafkajs').Kafka} kafkaClient - configured kafka client
* @param {KafkaConfiguration.consumerGroups} consumerGroups - consumer groups to be used for various topics
* @param {EventEmitter} eventEmitter - event emitter to be used to emit events when new data is available
* @param {class} ConsumerClass - class to be used for creating the consumer, defaults to AliEcsEventMessagesConsumer

Check warning on line 30 in QualityControl/lib/services/external/AliEcsSynchronizer.js

View workflow job for this annotation

GitHub Actions / Check eslint rules on ubuntu-latest

Syntax error in type: class
*/
constructor(kafkaClient, consumerGroups, eventEmitter, ConsumerClass = AliEcsEventMessagesConsumer) {
this._logger = LogManager.getLogger(LOG_FACILITY);
Expand All @@ -38,18 +39,24 @@
RUN_TOPICS,
);
this._ecsRunConsumer.onMessageReceived(this._onRunMessage.bind(this));

this._status = ServiceStatus.NOT_ASKED;
}

/**
* Start the synchronization process and listen to events from various topics via their consumers
* @returns {void}
* @returns {Promise<void>}
*/
start() {
async start() {
this._logger.infoMessage('Starting to consume AliECS messages for topics:');
this._ecsRunConsumer
.start()
.catch((error) =>
this._logger.errorMessage(`Error when starting ECS run consumer: ${error.message}\n${error.stack}`));
this._status = ServiceStatus.LOADING;
try {
await this._ecsRunConsumer.start();
this._status = ServiceStatus.SUCCESS;
} catch (error) {
this._logger.errorMessage(`Error when starting ECS run consumer: ${error.message}\n${error.stack}`);
this._status = ServiceStatus.ERROR;
}
}

/**
Expand All @@ -75,4 +82,12 @@
});
}
}

/**
* Returns the current kafka service status
* @returns {ServiceStatus} - The kafka service status
*/
get status() {
return this._status;
}
}
7 changes: 7 additions & 0 deletions QualityControl/public/Model.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import LayoutListModel from './pages/layoutListView/model/LayoutListModel.js';
import { RequestFields } from './common/RequestFields.enum.js';
import FilterModel from './common/filters/model/FilterModel.js';
import { IntegratedServices } from '../library/enums/Status/integratedServices.enum.js';

/**
* Represents the application's state and actions as a class
Expand Down Expand Up @@ -115,6 +116,12 @@
height: 10,
};

// For active run monitoring, the kafka service must be available.
// If we do not yet know the kafka service status, we should request it from the backend
if (!this.aboutViewModel.findService(IntegratedServices.KAFKA)) {
this.aboutViewModel.retrieveIndividualServiceStatus(IntegratedServices.KAFKA);
}

/*
* Init first page
*/
Expand Down Expand Up @@ -276,7 +283,7 @@

/**
* Clear URL parameters and redirect to a certain page
* @param {*} pageName - name of the page to be redirected to

Check warning on line 286 in QualityControl/public/Model.js

View workflow job for this annotation

GitHub Actions / Check eslint rules on ubuntu-latest

Prefer a more specific type to `*`
* @returns {undefined}
*/
clearURL(pageName) {
Expand Down
43 changes: 37 additions & 6 deletions QualityControl/public/common/filters/filterViews.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
import { filterInput, dynamicSelector, ongoingRunsSelector } from './filter.js';
import { FilterType } from './filterTypes.js';
import { filtersConfig, runModeFilterConfig } from './filtersConfig.js';
import { runModeCheckbox } from './runMode/runModeCheckbox.js';
import { lastUpdatePanel, runStatusPanel } from './runMode/runStatusPanel.js';
import { h, iconChevronBottom, iconChevronTop } from '/js/src/index.js';
import { h, iconChevronBottom, iconChevronTop, iconWarning } from '/js/src/index.js';
import { IntegratedServices } from '../../../library/enums/Status/integratedServices.enum.js';
import { spinner } from '../spinner.js';
import { ServiceStatus } from '../../../library/enums/Status/serviceStatus.enum.js';
import { runModeCheckbox } from './runMode/runModeCheckbox.js';

/**
* Creates an input element for a specific metadata field;
Expand Down Expand Up @@ -76,15 +79,16 @@ export function filtersPanel(filterModel, viewModel) {
lastRefresh,
ONGOING_RUN_INTERVAL_MS: refreshRate,
} = filterModel;
if (!isVisible) {
return null;
}
const { fetchOngoingRuns } = filterService;
const onInputCallback = setFilterValue.bind(filterModel);
const onChangeCallback = setFilterValue.bind(filterModel);
const onFocusCallback = fetchOngoingRuns.bind(filterService);
const onEnterCallback = () => filterModel.triggerFilter(viewModel);
const clearFilterCallback = clearFiltersAndTrigger.bind(filterModel, viewModel);
if (!isVisible) {
return null;
}
const kafkaService = filterModel.model.aboutViewModel.findService(IntegratedServices.KAFKA);
const filtersList = isRunModeActivated
? runModeFilterConfig(filterService)
: filtersConfig(filterService);
Expand All @@ -93,7 +97,34 @@ export function filtersPanel(filterModel, viewModel) {
'.w-100.flex-column.p2.g2.justify-center#filterElement',
[
h('.flex-row.g2.justify-center.items-center', [
runModeCheckbox(filterModel, viewModel),
kafkaService?.match({
Loading: () => spinner(2, 'Checking if RunMode is configured'),
Failure: (payload) => h('.error-box.danger.flex-column.justify-center.f6.text-center', {
id: 'run-mode-failure',
}, [
h('span.error-icon', { title: 'RunMode is unavailable. Please contact administrator.' }, iconWarning()),
h('span', payload.status.message),
]),
Success: (payload) => {
switch (payload.extras.state) {
case ServiceStatus.SUCCESS:
return runModeCheckbox(filterModel, viewModel);
case 'NOT_CONFIGURED':
return null;
default:
return h('.error-box.danger.flex-column.justify-center.f6.text-center', {
id: 'run-mode-failure',
}, [
h('span.error-icon', {
title: 'RunMode is unavailable. Please contact administrator.',
}, iconWarning()),
h('span', 'Contact an administrator and include this information:'),
h('span', `Kafka service returned status code '${payload.extras.state ?? '?'}'`),
]);
}
},
Other: () => {},
}),
!isRunModeActivated &&
[triggerFiltersButton(onEnterCallback, filterModel), clearFiltersButton(clearFilterCallback)],
...filtersList.map((filter) =>
Expand Down
16 changes: 16 additions & 0 deletions QualityControl/public/pages/aboutView/AboutViewModel.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,20 @@ export default class AboutViewModel extends BaseViewModel {
this.model.notification.show(`Error fetching data for ${service}: ${error.message}`, 'danger', 2000);
}
}

/**
* Iterates through all known {@link ServiceStatus} values and returns the
* first matching service found. This assumes that a given service can exist
* in at most one {@link ServiceStatus} at a time.
* @param {string} service - The service identifier to look up
* @returns {RemoteData|undefined} - The service instance under any `ServiceStatus`, or `undefined` if not found.
*/
findService(service) {
for (const status of Object.values(ServiceStatus)) {
if (this.services[status][service]) {
return this.services[status][service];
}
}
return undefined;
}
}
65 changes: 65 additions & 0 deletions QualityControl/test/lib/services/StatusService.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { deepStrictEqual } from 'node:assert';
import { suite, test, before } from 'node:test';

import { StatusService } from './../../../lib/services/Status.service.js';
import { ServiceStatus } from '../../../common/library/enums/Status/serviceStatus.enum.js';

export const statusServiceTestSuite = async () => {
suite('`retrieveDataServiceStatus()` tests', () => {
Expand Down Expand Up @@ -47,17 +48,20 @@ export const statusServiceTestSuite = async () => {
test('should successfully build an object with framework information from all used sources', async () => {
const statusService = new StatusService();
statusService.dataService = { getVersion: stub().resolves({ version: '0.0.1-beta' }) };
statusService.aliEcsSynchronizer = { status: ServiceStatus.SUCCESS };

const statusInfo = await Promise.all([
statusService.retrieveServiceStatus('qcg'),
statusService.retrieveServiceStatus('qc'),
statusService.retrieveServiceStatus('ccdb'),
statusService.retrieveServiceStatus('kafka'),
]);

const expectedResults = [
{ name: 'QCG', version: '', status: { ok: true }, extras: { clients: -1 } },
{ name: 'QC', status: { ok: true }, version: 'Not part of an FLP deployment', extras: {} },
{ name: 'CCDB', status: { ok: true }, version: '0.0.1-beta', extras: {} },
{ name: 'kafka', status: { ok: true }, extras: { state: ServiceStatus.SUCCESS } },
];

deepStrictEqual(statusInfo, expectedResults);
Expand Down Expand Up @@ -100,4 +104,65 @@ export const statusServiceTestSuite = async () => {
});
});
});

suite('`retrieveKafkaServiceStatus()` tests', () => {
test('marks Kafka service as healthy when synchronizer reports SUCCESS', async () => {
const statusService = new StatusService();
statusService.aliEcsSynchronizer = { status: ServiceStatus.SUCCESS };
const result = statusService.retrieveKafkaServiceStatus();

deepStrictEqual(result, {
name: 'kafka',
status: { ok: true },
extras: { state: ServiceStatus.SUCCESS },
});
});

test('marks Kafka service as idle when synchronizer has not been queried', async () => {
const statusService = new StatusService();
statusService.aliEcsSynchronizer = { status: ServiceStatus.NOT_ASKED };
const result = statusService.retrieveKafkaServiceStatus();

deepStrictEqual(result, {
name: 'kafka',
status: { ok: false },
extras: { state: ServiceStatus.NOT_ASKED },
});
});

test('marks Kafka service as unhealthy when synchronizer reports an error', async () => {
const statusService = new StatusService();
statusService.aliEcsSynchronizer = { status: ServiceStatus.ERROR };
const result = statusService.retrieveKafkaServiceStatus();

deepStrictEqual(result, {
name: 'kafka',
status: { ok: false },
extras: { state: ServiceStatus.ERROR },
});
});

test('marks Kafka service as initializing while synchronizer is loading', async () => {
const statusService = new StatusService();
statusService.aliEcsSynchronizer = { status: ServiceStatus.LOADING };
const result = statusService.retrieveKafkaServiceStatus();

deepStrictEqual(result, {
name: 'kafka',
status: { ok: false },
extras: { state: ServiceStatus.LOADING },
});
});

test('marks Kafka service as not configured when no synchronizer is present', async () => {
const statusService = new StatusService();
const result = statusService.retrieveKafkaServiceStatus();

deepStrictEqual(result, {
name: 'kafka',
status: { ok: false },
extras: { state: 'NOT_CONFIGURED' },
});
});
});
};
Loading
Loading