Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docker/.env
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ CPX_DB_CONNECTION_STRING=postgres://${CPX_DB_USERNAME}:${CPX_DB_PASSWORD}@capxml
CPX_REDIS_HOST=cap-xml-redis
CPX_REDIS_PORT=6379
CPX_REDIS_TLS=false
CPX_METEOALARM_API_URL=http://mock-api:8080 # wiremock url
CPX_METEOALARM_API_USERNAME=username
CPX_METEOALARM_API_PASSWORD=password
CPX_METEOALARM_DISABLE=false
NODE_TLS_REJECT_UNAUTHORIZED=0
PGADMIN_DEFAULT_PASSWORD=pgadmin
POSTGRES_PASSWORD=postgres
LIQUIBASE_COMMAND_CHANGELOG_FILE=./changelog/db.changelog-master.xml
Expand Down
9 changes: 9 additions & 0 deletions docker/dev-tools.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ services:
networks:
ls:
command: /bin/sh -c "lpm add postgresql && liquibase update"
mock-api:
image: wiremock/wiremock:latest
ports:
- "8081:8080"
volumes:
- ./wiremock/mappings:/home/wiremock/mappings
command: ["--global-response-templating", "--verbose"]
networks:
ls:
volumes:
capxmlpgadmin:
external: true
Expand Down
7 changes: 6 additions & 1 deletion docker/scripts/register-lambda-functions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ cpx_agw_url=$(echo CPX_AGW_URL=$deployed_cpx_agw_url)
cpx_redis_host=$(echo CPX_REDIS_HOST=$CPX_REDIS_HOST)
cpx_redis_port=$(echo CPX_REDIS_PORT=$CPX_REDIS_PORT)
cpx_redis_tls=$(echo CPX_REDIS_TLS=$CPX_REDIS_TLS)
set -- $cpx_db_username $cpx_db_password $cpx_db_name $cpx_db_host $cpx_agw_url $cpx_redis_host $cpx_redis_port $cpx_redis_tls
cpx_meteoalarm_api_url=$(echo CPX_METEOALARM_API_URL=$CPX_METEOALARM_API_URL)
cpx_meteoalarm_api_username=$(echo CPX_METEOALARM_API_USERNAME=$CPX_METEOALARM_API_USERNAME)
cpx_meteoalarm_api_password=$(echo CPX_METEOALARM_API_PASSWORD=$CPX_METEOALARM_API_PASSWORD)
cpx_meteoalarm_disable=$(echo CPX_METEOALARM_DISABLE=$CPX_METEOALARM_DISABLE)
node_tls_reject_unauthorized=$(echo NODE_TLS_REJECT_UNAUTHORIZED=$NODE_TLS_REJECT_UNAUTHORIZED)
set -- $cpx_db_username $cpx_db_password $cpx_db_name $cpx_db_host $cpx_agw_url $cpx_redis_host $cpx_redis_port $cpx_redis_tls $cpx_meteoalarm_api_url $cpx_meteoalarm_api_username $cpx_meteoalarm_api_password $node_tls_reject_unauthorized $cpx_meteoalarm_disable
custom_environment_variables=$(printf '%s,' "$@" | sed 's/,*$//g')

# Iterate over each file in lambda_functions_dir
Expand Down
24 changes: 24 additions & 0 deletions docker/wiremock/mappings/third-party-api.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"mappings": [
{
"request": {
"method": "POST",
"urlPath": "/warnings"
},
"response": {
"status": 201,
"body": "{\"warning\": {\"uuid\": \"{{randomValue type='UUID'}}\" } }"
}
},
{
"request": {
"method": "POST",
"urlPath": "/tokens"
},
"response": {
"status": 200,
"body": "{\"tokenType\": \"Bearer\", \"token\": \"{{randomValue length=64 type='ALPHANUMERIC'}}\", \"expiresIn\": \"300\" }"
}
}
]
}
3 changes: 2 additions & 1 deletion lib/functions/archiveMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const service = require('../helpers/service')

module.exports.archiveMessages = async () => {
console.log('archiving messages')
console.log('[archiveMessages] Starting to archive messages')
await service.archiveMessages()
console.log('[archiveMessages] Finished archiving messages')
}
43 changes: 37 additions & 6 deletions lib/functions/processMessage.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,45 +11,58 @@ const path = require('node:path')
const xsdSchema = fs.readFileSync(path.join(__dirname, '..', 'schemas', 'CAP-v1.2.xsd'), 'utf8')
const additionalCapMessageSchema = require('../schemas/additionalCapMessageSchema')
const Message = require('../models/message')
const EA_WHO = '2.49.0.0.826.1'
const EA_WHO = '2.49.0.1.826.1'
const CODE = 'MCP:v2.0'
const severityV2Mapping = require('../models/v2MessageMapping')
const redis = require('../helpers/redis')
const meteoalarm = require('../helpers/meteoalarm')

module.exports.processMessage = async (event) => {
console.log('[processMessage] Event received:', event)
try {
// validate the event
await eventSchema.validateAsync(event)

// parse the xml
const message = new Message(event.bodyXml)
console.log(`Processing CAP message: ${message.identifier} for ${message.fwisCode}`)
console.log(`[processMessage] Processing CAP message: ${message.identifier} for ${message.fwisCode}`)

// get Last message
const dbResult = await service.getLastMessage(message.fwisCode)
const lastMessage = (!!dbResult && dbResult.rows.length > 0) ? dbResult.rows[0] : undefined

if (lastMessage) {
console.log(`[processMessage] Found last message for ${message.fwisCode}: identifier=${lastMessage.identifier}, expires=${lastMessage.expires}, status=${lastMessage.status}`)
} else {
console.log(`[processMessage] No previous message found for ${message.fwisCode}`)
}

// If not production set status to test
if (process.env.stage !== 'prd') {
message.status = 'Test'
}
console.log(`[processMessage] Running in stage: ${process.env.stage}, message status set to: ${message.status}`)

// Add in the references field and update msgtype to Update if references exist and is Alert (does this in message model)
const references = buildReference(lastMessage, message.sender, 'identifier', 'references')
if (references) {
message.references = references
console.log(`[processMessage] Built references: ${references}, msgType updated to: ${message.msgType}`)
}

// Generate message V2 for meteoalarm spec
const messageV2 = processMessageV2(message, lastMessage)

// do validation against OASIS CAP xml schema and extended JOI schema
console.log(`[processMessage] Starting validation for ${message.identifier}`)
const validationStart = Date.now()
const results = await Promise.allSettled([
validateAgainstXsdSchema(message),
validateAgainstJoiSchema(message),
validateAgainstXsdSchema(messageV2),
validateAgainstJoiSchema(messageV2)
])
console.log(`[processMessage] Validation completed in ${Date.now() - validationStart}ms for ${message.identifier}`)

// Check for validation failures and throw
const errors = results.filter(r => r.status === 'rejected').flatMap(r => r.reason)
Expand All @@ -58,9 +71,22 @@ module.exports.processMessage = async (event) => {
}

const { message: redisMessage, query: dbQuery } = message.putQuery(message, messageV2)
// store the message in database and redis/elasticache
await Promise.all([service.putMessage(dbQuery), redis.set(redisMessage.identifier, redisMessage)])
console.log(`Finished processing CAP message: ${message.identifier} for ${message.fwisCode}`)
// store the message in database, redis/elasticache, and post to Meteoalarm
const storageStart = Date.now()
const promises = [
service.putMessage(dbQuery),
redis.set(redisMessage.identifier, redisMessage)
]
if (process.env.CPX_METEOALARM_DISABLE === 'true') {
console.log('[processMessage] Meteoalarm integration is disabled')
} else {
promises.push(meteoalarm.postWarning(messageV2.toString(), message.identifier))
}
await Promise.all(promises)
console.log(`[processMessage] Storage operations completed in ${Date.now() - storageStart}ms`)
console.log(`[processMessage] DB write successful for ${message.identifier}`)
console.log(`[processMessage] Redis cache set for ${message.identifier}`)
console.log(`[processMessage] Finished processing CAP message: ${message.identifier} for ${message.fwisCode}`)

return {
statusCode: 200,
Expand All @@ -76,7 +102,9 @@ module.exports.processMessage = async (event) => {
} catch (err) {
// Actual error will be handled by lambda process
// So just log the message body to console for investigation
console.log(event.bodyXml)
console.error(`[processMessage] Error during processing: ${err.message}`)
console.error(`[processMessage] Error stack: ${err.stack}`)
console.log('[processMessage] Failed message body:', event.bodyXml)
return processFailedMessage(err, event.bodyXml)
}
}
Expand Down Expand Up @@ -167,6 +195,7 @@ const processMessageV2 = (message, lastMessage) => {
messageV2.references = referencesV2
}
messageV2.event = `${severityV2Mapping[message.severity]?.description}: ${messageV2.areaDesc}`
messageV2.responseType = 'Monitor'
messageV2.severity = severityV2Mapping[message.severity]?.severity || ''
messageV2.onset = message.sent
messageV2.headline = `${severityV2Mapping[message.severity]?.headline}: ${messageV2.areaDesc}`
Expand All @@ -188,5 +217,7 @@ const processMessageV2 = (message, lastMessage) => {
messageV2.addParameter('use_polygon_over_geocode', 'true')
messageV2.addParameter('uk_ea_ta_code', message.fwisCode)

messageV2.removeNode('geocode')

return messageV2
}
51 changes: 33 additions & 18 deletions lib/helpers/message.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,45 @@ const path = require('node:path')
const xsdSchema = fs.readFileSync(path.join(__dirname, '..', 'schemas', 'CAP-v1.2.xsd'), 'utf8')
const redis = require('../helpers/redis')

const fetchMessageBody = async (key, v2) => {
const fetchStart = Date.now()
const cachedMessage = await redis.get(key)
let body = {}

if (cachedMessage) {
console.log(`[getMessage] Cache HIT for ${key}`)
body = v2 ? cachedMessage.alert_v2 : cachedMessage.alert
console.log(`[getMessage] Message retrieved in ${Date.now() - fetchStart}ms for ${key}`)
return body
}

console.log(`[getMessage] Cache MISS for ${key}, fetching from database`)
const ret = await service.getMessage(key)
if (!ret?.rows || !Array.isArray(ret.rows) || ret.rows.length < 1 || !ret.rows[0].getmessage) {
console.log('[getMessage] No message found for ' + key)
throw new Error('No message found')
}
const message = ret.rows[0].getmessage
body = v2 ? message.alert_v2 : message.alert
// Cache the message in redis
await redis.set(key, message)
console.log(`[getMessage] Retrieved from database and cached: ${key}`)
console.log(`[getMessage] Message retrieved in ${Date.now() - fetchStart}ms for ${key}`)
return body
}

module.exports.getMessage = async (event, v2) => {
console.log('[getMessage] Event received:', event)
const { error } = eventSchema.validate(event)

if (error) {
throw error
}

// Fetch message from redis, else get from postgres
let body
const key = event.pathParameters.id
const cachedMessage = await redis.get(key)
console.log(`[getMessage] Fetching message with id: ${key}, version: ${v2 ? 'v2' : 'v1'}`)

if (cachedMessage) {
body = v2 ? cachedMessage.alert_v2 : cachedMessage.alert
} else {
const ret = await service.getMessage(key)
if (!ret?.rows || !Array.isArray(ret.rows) || ret.rows.length < 1 || !ret.rows[0].getmessage) {
console.log('No message found for ' + key)
throw new Error('No message found')
}
const message = ret.rows[0].getmessage
body = v2 ? message.alert_v2 : message.alert
// Cache the message in redis
await redis.set(key, message)
}
const body = await fetchMessageBody(key, v2)

const validationResult = await validateXML({
xml: [{
Expand All @@ -44,10 +58,11 @@ module.exports.getMessage = async (event, v2) => {

// NI-95 log validation errors and continue processing
if (validationResult.errors?.length > 0) {
console.log('CAP get message failed validation')
console.log(JSON.stringify(validationResult.errors))
console.log('[getMessage] CAP get message failed validation')
console.log('[getMessage] Validation errors:', JSON.stringify(validationResult.errors))
}

console.log(`[getMessage] Returning message ${key}, size: ${body.length} bytes`)
return {
statusCode: 200,
headers: {
Expand Down
13 changes: 11 additions & 2 deletions lib/helpers/messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,14 @@ const path = require('node:path')
const xsdSchema = fs.readFileSync(path.join(__dirname, '..', 'schemas', 'atom.xsd'), 'utf8')

module.exports.messages = async (v2 = false) => {
console.log(`[getMessagesAtom] Generating atom feed, version: ${v2 ? 'v2' : 'v1'}`)
const feedStart = Date.now()
const { Feed } = await import('feed')
const dbStart = Date.now()
const ret = await service.getAllMessages()
console.log(`[getMessagesAtom] Database query completed in ${Date.now() - dbStart}ms`)
const messageCount = ret?.rows?.length || 0
console.log(`[getMessagesAtom] Feed contains ${messageCount} messages`)
const uriPrefix = v2 ? '/v2' : ''

const feed = new Feed({
Expand Down Expand Up @@ -41,7 +47,9 @@ module.exports.messages = async (v2 = false) => {
}
}

const feedGenStart = Date.now()
const xmlFeed = feed.atom1()
console.log(`[getMessagesAtom] Feed generated in ${Date.now() - feedGenStart}ms, size: ${xmlFeed.length} bytes`)

const validationResult = await validateXML({
xml: [{
Expand All @@ -52,10 +60,11 @@ module.exports.messages = async (v2 = false) => {
})
// NI-95 log validation errors and continue processing
if (validationResult.errors?.length > 0) {
console.log('ATOM feed failed validation')
console.log(JSON.stringify(validationResult.errors))
console.log('[getMessagesAtom] ATOM feed failed validation')
console.log('[getMessagesAtom] Validation errors:', JSON.stringify(validationResult.errors))
}

console.log(`[getMessagesAtom] Total feed generation time: ${Date.now() - feedStart}ms`)
return {
statusCode: 200,
headers: {
Expand Down
Loading