diff --git a/backend/compact-connect/lambdas/nodejs/ingest-event-reporter/lambda.ts b/backend/compact-connect/lambdas/nodejs/ingest-event-reporter/lambda.ts index 29270d146..d73300a5d 100644 --- a/backend/compact-connect/lambdas/nodejs/ingest-event-reporter/lambda.ts +++ b/backend/compact-connect/lambdas/nodejs/ingest-event-reporter/lambda.ts @@ -11,6 +11,7 @@ import { JurisdictionClient } from '../lib/jurisdiction-client'; import { IEventBridgeEvent } from '../lib/models/event-bridge-event-detail'; import { IngestEventEmailService } from '../lib/email'; import { EventClient } from '../lib/event-client'; +import { Compact, IJurisdiction } from 'lib/models'; const environmentVariables = new EnvironmentVariablesService(); const logger = new Logger({ logLevel: environmentVariables.getLogLevel() }); @@ -59,8 +60,6 @@ export class Lambda implements LambdaInterface { logger.info('Processing event', { event: event }); logger.debug('Context wait for event loop', { wait_for_empty_event_loop: context.callbackWaitsForEmptyEventLoop }); - const [ startTimeStamp, endTimeStamp ] = this.eventClient.getYesterdayTimestamps(); - // Loop over each compact the system knows about for (const compact of environmentVariables.getCompacts()) { let compactConfig; @@ -78,90 +77,107 @@ export class Lambda implements LambdaInterface { // Loop over each jurisdiction that we have contacts configured for for (const jurisdictionConfig of jurisdictionConfigs) { - const ingestEvents = await this.eventClient.getEvents( - compact, jurisdictionConfig.postalAbbreviation, startTimeStamp, endTimeStamp - ); - - // If there were any issues, send a report email summarizing them - if (ingestEvents.ingestFailures.length || ingestEvents.validationErrors.length) { - const messageId = await this.emailService.sendReportEmail( - ingestEvents, - compactConfig.compactName, - jurisdictionConfig.jurisdictionName, - jurisdictionConfig.jurisdictionOperationsTeamEmails - ); - - logger.info( - 'Sent event summary email', - { - compact: compact, - jurisdiction: jurisdictionConfig.postalAbbreviation, - message_id: messageId - } - ); - } else { - logger.info( - 'No events in 24 hours', - { - compact: compact, - jurisdiction: jurisdictionConfig.postalAbbreviation - } - ); - const eventType = event.eventType; - - // If this is a weekly run and there have been no issues all week, we send an "All's Well" report - if (eventType === 'weekly') { - const [ weekStartStamp, weekEndStamp ] = this.eventClient.getLastWeekTimestamps(); - const weeklyIngestEvents = await this.eventClient.getEvents( - compact, - jurisdictionConfig.postalAbbreviation, - weekStartStamp, - weekEndStamp - ); - - // verify that the jurisdiction uploaded licenses within the last week without any errors - if (!weeklyIngestEvents.ingestFailures.length - && !weeklyIngestEvents.validationErrors.length - && weeklyIngestEvents.ingestSuccesses.length - ) { - const messageId = await this.emailService.sendAllsWellEmail( - compactConfig.compactName, - jurisdictionConfig.jurisdictionName, - jurisdictionConfig.jurisdictionOperationsTeamEmails - ); - - logger.info( - 'Sent alls well email', - { - compact: compactConfig.compactName, - jurisdiction: jurisdictionConfig.postalAbbreviation, - message_id: messageId - } - ); - } - else if(!weeklyIngestEvents.ingestSuccesses.length) { - const messageId = await this.emailService.sendNoLicenseUpdatesEmail( - compactConfig.compactName, - jurisdictionConfig.jurisdictionName, - [ - ...jurisdictionConfig.jurisdictionOperationsTeamEmails, - ...compactConfig.compactOperationsTeamEmails - ] - ); - - logger.warn( - 'No licenses uploaded withinin the last week', - { - compact: compactConfig.compactName, - jurisdiction: jurisdictionConfig.postalAbbreviation, - message_id: messageId - } - ); - } - } - } + switch (event.eventType) { + case 'weekly': + await this.runWeeklyReports(compactConfig, jurisdictionConfig); + break; + default: + // frequent case (every 15 minutes) + await this.runFrequentReports(compactConfig, jurisdictionConfig); + break; + }; + } } logger.info('Completing handler'); } + + public async runFrequentReports(compactConfig: Compact, jurisdictionConfig: IJurisdiction) { + const [ startTimeStamp, endTimeStamp ] = this.eventClient.getLast15MinuteTimestamps(); + + const ingestEvents = await this.eventClient.getEvents( + compactConfig.compactAbbr, jurisdictionConfig.postalAbbreviation, startTimeStamp, endTimeStamp + ); + + // If there were any issues, send a report email summarizing them + if (ingestEvents.ingestFailures.length || ingestEvents.validationErrors.length) { + const messageId = await this.emailService.sendReportEmail( + ingestEvents, + compactConfig.compactName, + jurisdictionConfig.jurisdictionName, + jurisdictionConfig.jurisdictionOperationsTeamEmails + ); + + logger.info( + 'Sent event summary email', + { + compact: compactConfig.compactAbbr, + jurisdiction: jurisdictionConfig.postalAbbreviation, + startTimeStamp, + endTimeStamp, + message_id: messageId + } + ); + } else { + logger.info( + 'No events in window', + { + compact: compactConfig.compactAbbr, + jurisdiction: jurisdictionConfig.postalAbbreviation, + startTimeStamp, + endTimeStamp + } + ); + } + } + + public async runWeeklyReports(compactConfig: Compact, jurisdictionConfig: IJurisdiction) { + const [ weekStartStamp, weekEndStamp ] = this.eventClient.getLastWeekTimestamps(); + const weeklyIngestEvents = await this.eventClient.getEvents( + compactConfig.compactAbbr, + jurisdictionConfig.postalAbbreviation, + weekStartStamp, + weekEndStamp + ); + + // verify that the jurisdiction uploaded licenses within the last week without any errors + if (!weeklyIngestEvents.ingestFailures.length + && !weeklyIngestEvents.validationErrors.length + && weeklyIngestEvents.ingestSuccesses.length + ) { + const messageId = await this.emailService.sendAllsWellEmail( + compactConfig.compactName, + jurisdictionConfig.jurisdictionName, + jurisdictionConfig.jurisdictionOperationsTeamEmails + ); + + logger.info( + 'Sent alls well email', + { + compact: compactConfig.compactName, + jurisdiction: jurisdictionConfig.postalAbbreviation, + message_id: messageId + } + ); + } + else if(!weeklyIngestEvents.ingestSuccesses.length) { + const messageId = await this.emailService.sendNoLicenseUpdatesEmail( + compactConfig.compactName, + jurisdictionConfig.jurisdictionName, + [ + ...jurisdictionConfig.jurisdictionOperationsTeamEmails, + ...compactConfig.compactOperationsTeamEmails + ] + ); + + logger.warn( + 'No licenses uploaded within the last week', + { + compact: compactConfig.compactName, + jurisdiction: jurisdictionConfig.postalAbbreviation, + message_id: messageId + } + ); + } + } } diff --git a/backend/compact-connect/lambdas/nodejs/lib/event-client.ts b/backend/compact-connect/lambdas/nodejs/lib/event-client.ts index e898e7af1..c27299211 100644 --- a/backend/compact-connect/lambdas/nodejs/lib/event-client.ts +++ b/backend/compact-connect/lambdas/nodejs/lib/event-client.ts @@ -26,6 +26,33 @@ export class EventClient { this.dynamoDBClient = props.dynamoDBClient; } + /* + * Returns timestamps for the last complete 15-minute block + * i.e. if now is 13:05, returns 12:45-13:00 + * if now is 13:15, returns 13:00-13:15 + */ + public getLast15MinuteTimestamps(): [number, number] { + const now: Date = new Date(); + const last15MinuteBlockStart: Date = new Date(); + const last15MinuteBlockEnd: Date = new Date(); + + // Calculate the start of the current 15-minute block + const currentBlockStartMinutes = now.getUTCMinutes() - (now.getUTCMinutes() % 15); + + last15MinuteBlockStart.setUTCMinutes(currentBlockStartMinutes, 0, 0); + + // The end of the previous complete block is the start of the current block + last15MinuteBlockEnd.setTime(last15MinuteBlockStart.getTime()); + + // The start of the previous complete block is 15 minutes before the end + last15MinuteBlockStart.setUTCMinutes(currentBlockStartMinutes - 15, 0, 0); + + return [ + Math.floor((last15MinuteBlockStart.valueOf()/1000)), + Math.floor((last15MinuteBlockEnd.valueOf()/1000)), + ]; + } + /* * Returns timestamps for the beginning and end of the previous UTC day */ @@ -40,12 +67,8 @@ export class EventClient { // Uncomment to manually force today's events into the time window (for development/testing) // today.setUTCDate(today.getUTCDate() + 1); return [ - Number.parseInt( - (yesterday.valueOf()/1000).toString() - ), - Number.parseInt( - (today.valueOf()/1000).toString() - ) + Math.floor((yesterday.valueOf()/1000)), + Math.floor((today.valueOf()/1000)), ]; } @@ -63,12 +86,8 @@ export class EventClient { // Uncomment to manually force today's events into the time window (for development/testing) // today.setUTCDate(today.getUTCDate() + 1); return [ - Number.parseInt( - (lastWeek.valueOf()/1000).toString() - ), - Number.parseInt( - (today.valueOf()/1000).toString() - ) + Math.floor((lastWeek.valueOf()/1000)), + Math.floor((today.valueOf()/1000)), ]; } diff --git a/backend/compact-connect/lambdas/nodejs/lib/models/index.ts b/backend/compact-connect/lambdas/nodejs/lib/models/index.ts index 5ed805706..061854fae 100644 --- a/backend/compact-connect/lambdas/nodejs/lib/models/index.ts +++ b/backend/compact-connect/lambdas/nodejs/lib/models/index.ts @@ -1,2 +1,3 @@ export * from './event-records'; export * from './jurisdiction'; +export * from './compact'; diff --git a/backend/compact-connect/lambdas/nodejs/tests/ingest-event-reporter.test.ts b/backend/compact-connect/lambdas/nodejs/tests/ingest-event-reporter.test.ts index 10358b69f..45b1c33f9 100644 --- a/backend/compact-connect/lambdas/nodejs/tests/ingest-event-reporter.test.ts +++ b/backend/compact-connect/lambdas/nodejs/tests/ingest-event-reporter.test.ts @@ -73,7 +73,7 @@ const mockSendNoLicenseUpdatesEmail = jest.fn().mockImplementation( (recipients: string[]) => Promise.resolve('message-id-no-license-updates') ); -describe('Nightly runs', () => { +describe('Frequent runs', () => { let mockSESClient: ReturnType; let mockS3Client: ReturnType; let lambda: Lambda; @@ -459,7 +459,7 @@ describe('Weekly runs', () => { ); }); - it('should send a report email and not an alls well, when there were errors', async () => { + it('should send nothing, when there were errors', async () => { const mockDynamoDBClient = mockClient(DynamoDBClient); const mockS3Client = mockClient(S3Client); @@ -523,8 +523,9 @@ describe('Weekly runs', () => { } ); - // Verify an event report was sent - expect(mockSendReportEmail).toHaveBeenCalled(); + // Verify an event report was not sent + expect(mockSendReportEmail).not.toHaveBeenCalled(); expect(mockSendAllsWellEmail).not.toHaveBeenCalled(); + expect(mockSendNoLicenseUpdatesEmail).not.toHaveBeenCalled(); }); }); diff --git a/backend/compact-connect/lambdas/nodejs/tests/lib/event-client.test.ts b/backend/compact-connect/lambdas/nodejs/tests/lib/event-client.test.ts index 08078cf60..19c924b8e 100644 --- a/backend/compact-connect/lambdas/nodejs/tests/lib/event-client.test.ts +++ b/backend/compact-connect/lambdas/nodejs/tests/lib/event-client.test.ts @@ -77,6 +77,61 @@ describe('EventClient', () => { jest.clearAllMocks(); }); + it('should produce 15-minute timestamps 900 seconds (15 minutes) apart', async () => { + const eventClient = new EventClient({ + logger: new Logger(), + dynamoDBClient: asDynamoDBClient(mockDynamoDBClient) + }); + + const [ startStamp, endStamp ] = eventClient.getLast15MinuteTimestamps(); + + expect(endStamp - startStamp).toEqual(900); + }); + + it('should produce 15-minute blocks', async () => { + const eventClient = new EventClient({ + logger: new Logger(), + dynamoDBClient: asDynamoDBClient(mockDynamoDBClient) + }); + + // Test case 1: if 'now' is at 11:01, it should return timestamps at 10:45-11:00 + jest.useFakeTimers(); + jest.setSystemTime(new Date('2025-01-01T11:01:00.000Z')); + + const [ startStamp1, endStamp1 ] = eventClient.getLast15MinuteTimestamps(); + const expectedStart1 = Math.floor(new Date('2025-01-01T10:45:00.000Z').getTime() / 1000); + const expectedEnd1 = Math.floor(new Date('2025-01-01T11:00:00.000Z').getTime() / 1000); + + expect(startStamp1).toEqual(expectedStart1); + expect(endStamp1).toEqual(expectedEnd1); + expect(endStamp1 - startStamp1).toEqual(900); // 15 minutes (10:45 to 11:00) + + // Test case 2: if 'now' is at 2025-01-01T00:00:00.001Z, it should return timestamps for 2024-12-31T23:45:00.000Z-2025-01-01T00:00:00.000Z + jest.setSystemTime(new Date('2025-01-01T00:00:00.001Z')); + + const [ startStamp2, endStamp2 ] = eventClient.getLast15MinuteTimestamps(); + const expectedStart2 = Math.floor(new Date('2024-12-31T23:45:00.000Z').getTime() / 1000); + const expectedEnd2 = Math.floor(new Date('2025-01-01T00:00:00.000Z').getTime() / 1000); + + expect(startStamp2).toEqual(expectedStart2); + expect(endStamp2).toEqual(expectedEnd2); + expect(endStamp2 - startStamp2).toEqual(900); // 15 minutes (23:45 to 00:00) + + // Test case 3: if 'now' is at 12:35, it should return timestamps at 12:15-12:30 + jest.setSystemTime(new Date('2025-01-01T12:35:00.000Z')); + + const [ startStamp3, endStamp3 ] = eventClient.getLast15MinuteTimestamps(); + const expectedStart3 = Math.floor(new Date('2025-01-01T12:15:00.000Z').getTime() / 1000); + const expectedEnd3 = Math.floor(new Date('2025-01-01T12:30:00.000Z').getTime() / 1000); + + expect(startStamp3).toEqual(expectedStart3); + expect(endStamp3).toEqual(expectedEnd3); + expect(endStamp3 - startStamp3).toEqual(900); // 15 minutes (12:15 to 12:30) + + // Restore real timers + jest.useRealTimers(); + }); + it('should produce nightly timestamps 86400 seconds (24 hours) apart', async () => { const eventClient = new EventClient({ logger: new Logger(), diff --git a/backend/compact-connect/lambdas/python/common/cc_common/data_model/schema/license/api.py b/backend/compact-connect/lambdas/python/common/cc_common/data_model/schema/license/api.py index fbcf1c0d2..52c7764ff 100644 --- a/backend/compact-connect/lambdas/python/common/cc_common/data_model/schema/license/api.py +++ b/backend/compact-connect/lambdas/python/common/cc_common/data_model/schema/license/api.py @@ -80,6 +80,35 @@ def validate_compact_eligibility(self, data, **_kwargs): ) +class LicenseReportResponseSchema(ForgivingSchema): + """ + License object fields, as included in ingest error reports to state operational staff. + + Serialization direction: + Python -> load() -> API + """ + + providerId = Raw(required=True, allow_none=False) + type = String(required=True, allow_none=False) + compact = Compact(required=True, allow_none=False) + jurisdiction = Jurisdiction(required=True, allow_none=False) + licenseType = String(required=True, allow_none=False) + licenseStatusName = String(required=False, allow_none=False, validate=Length(1, 100)) + licenseStatus = ActiveInactive(required=True, allow_none=False) + jurisdictionUploadedLicenseStatus = ActiveInactive(required=True, allow_none=False) + compactEligibility = CompactEligibility(required=True, allow_none=False) + jurisdictionUploadedCompactEligibility = CompactEligibility(required=True, allow_none=False) + npi = NationalProviderIdentifier(required=False, allow_none=False) + licenseNumber = String(required=False, allow_none=False, validate=Length(1, 100)) + givenName = String(required=True, allow_none=False, validate=Length(1, 100)) + middleName = String(required=False, allow_none=False, validate=Length(1, 100)) + familyName = String(required=True, allow_none=False, validate=Length(1, 100)) + suffix = String(required=False, allow_none=False, validate=Length(1, 100)) + dateOfIssuance = Raw(required=True, allow_none=False) + dateOfRenewal = Raw(required=False, allow_none=False) + dateOfExpiration = Raw(required=True, allow_none=False) + + class LicenseGeneralResponseSchema(ForgivingSchema): """ License object fields, as seen by staff users with only the 'readGeneral' permission. diff --git a/backend/compact-connect/lambdas/python/provider-data-v1/handlers/bulk_upload.py b/backend/compact-connect/lambdas/python/provider-data-v1/handlers/bulk_upload.py index 01c2a97c8..1b9118d25 100644 --- a/backend/compact-connect/lambdas/python/provider-data-v1/handlers/bulk_upload.py +++ b/backend/compact-connect/lambdas/python/provider-data-v1/handlers/bulk_upload.py @@ -7,7 +7,10 @@ from botocore.exceptions import ClientError from botocore.response import StreamingBody from cc_common.config import config, logger -from cc_common.data_model.schema.license.api import LicenseGeneralResponseSchema, LicensePostRequestSchema +from cc_common.data_model.schema.license.api import ( + LicensePostRequestSchema, + LicenseReportResponseSchema, +) from cc_common.event_batch_writer import EventBatchWriter from cc_common.exceptions import CCInternalException from cc_common.utils import ( @@ -124,7 +127,7 @@ def process_bulk_upload_file( Stream each line of the new CSV file, validating it then publishing an ingest event for each line. Process licenses in batches to avoid loading the entire file into memory. """ - general_schema = LicenseGeneralResponseSchema() + report_schema = LicenseReportResponseSchema() schema = LicensePostRequestSchema() reader = LicenseCSVReader() @@ -163,14 +166,14 @@ def process_bulk_upload_file( # and publish it as a failure event. Because this data may eventually be sent back over # an email, we will only include the generally available values that we can still validate. try: - general_license_data = general_schema.load(raw_license) + report_license_data = report_schema.load(raw_license) except ValidationError as exc_second_try: - general_license_data = exc_second_try.valid_data + report_license_data = exc_second_try.valid_data logger.info( 'Invalid license in line %s uploaded: %s', i + 1, str(e), - valid_data=general_license_data, + valid_data=report_license_data, exc_info=e, ) event_writer.put_event( @@ -183,7 +186,7 @@ def process_bulk_upload_file( 'compact': compact, 'jurisdiction': jurisdiction, 'recordNumber': i + 1, - 'validData': general_license_data, + 'validData': report_license_data, 'errors': e.messages, }, cls=ResponseEncoder, diff --git a/backend/compact-connect/lambdas/python/provider-data-v1/tests/function/test_handlers/test_bulk_upload.py b/backend/compact-connect/lambdas/python/provider-data-v1/tests/function/test_handlers/test_bulk_upload.py index 2074c11c1..d5a90aa09 100644 --- a/backend/compact-connect/lambdas/python/provider-data-v1/tests/function/test_handlers/test_bulk_upload.py +++ b/backend/compact-connect/lambdas/python/provider-data-v1/tests/function/test_handlers/test_bulk_upload.py @@ -258,13 +258,6 @@ def test_bulk_upload_prevents_compact_jurisdiction_overwrites(self): 'dateOfIssuance': '2020-01-01', 'dateOfRenewal': '2021-01-01', 'dateOfExpiration': '2023-01-01', - 'homeAddressStreet1': '123 Main St', - 'homeAddressStreet2': 'Apt 1', - 'homeAddressCity': 'Columbus', - 'homeAddressState': 'OH', - 'homeAddressPostalCode': '43215', - 'emailAddress': 'test@example.com', - 'phoneNumber': '+15551234567', }, 'errors': ['License contains unsupported fields'], } diff --git a/backend/compact-connect/stacks/reporting_stack.py b/backend/compact-connect/stacks/reporting_stack.py index 3eb42228e..843cf78e9 100644 --- a/backend/compact-connect/stacks/reporting_stack.py +++ b/backend/compact-connect/stacks/reporting_stack.py @@ -86,15 +86,17 @@ def _add_ingest_event_reporting_chain(self, persistent_stack: ps.PersistentStack treat_missing_data=TreatMissingData.NOT_BREACHING, ).add_alarm_action(SnsAction(persistent_stack.alarm_topic)) + # This will report any ingest errors to the configured operational contact, every 15 minutes Rule( self, - 'NightlyRule', - schedule=Schedule.cron(week_day='1-6', hour='1', minute='0', month='*', year='*'), + 'FrequentlyRule', + schedule=Schedule.cron(week_day='*', hour='*', minute='*/15', month='*', year='*'), targets=[ - LambdaFunction(handler=event_collector, event=RuleTargetInput.from_object({'eventType': 'nightly'})) + LambdaFunction(handler=event_collector, event=RuleTargetInput.from_object({'eventType': 'frequent'})) ], ) + # This will send an "alls well" , a "you haven't uploaded anything" email or nothing Rule( self, 'WeeklyRule',