diff --git a/backend/src/mapHandler.js b/backend/src/mapHandler.js index 64740c8..ddd625b 100644 --- a/backend/src/mapHandler.js +++ b/backend/src/mapHandler.js @@ -681,6 +681,7 @@ const handleSourceFieldsToDestArray = (sourceFieldArray, source, itemsType) => { } }); + // print Array String as output logger.debug({ finalArray }) return finalArray diff --git a/backend/src/utils/decoders/Datapoint.js b/backend/src/utils/decoders/Datapoint.js new file mode 100644 index 0000000..919d6c3 --- /dev/null +++ b/backend/src/utils/decoders/Datapoint.js @@ -0,0 +1,14 @@ +const mongoose = require('mongoose'); + +const datapointSchema = new mongoose.Schema({ + source: String, + survey: String, + surveyName: String, + region: String, + fromUrl: String, + timestamp: String, + dimensions: Object, + value: Number +}, {strict: false}); + +module.exports = mongoose.model('Datapoint', datapointSchema); \ No newline at end of file diff --git a/backend/src/utils/decoders/json-stat.js b/backend/src/utils/decoders/json-stat.js index 03a0b2d..5b65469 100644 --- a/backend/src/utils/decoders/json-stat.js +++ b/backend/src/utils/decoders/json-stat.js @@ -74,120 +74,91 @@ module.exports = async function decode(source) { const indices = new Array(ids.length).fill(0); const timestamp = js.updated; - if (config.debug?.recursiveJsonStat) { //TODO once tested remove this option - function walk(dimIndex) { - if (dimIndex === ids.length) { - let flat = 0; - let regionLevel = "unknown"; - const humanDims = {}; - - for (let i = 0; i < ids.length; i++) { - flat += indices[i] * strides[i]; - let dim = ids[i]; - const code = indexToCode[dim][indices[i]]; - let label = indexToLabel[dim][indices[i]]; - console.log(89) - if (dim == "time") - dim = "year" - humanDims[dim] = label - - if (dim === geoDimName) { - const isRegional = !NON_REGIONAL.has(code); - - if (nutsMap[code]?.level != null) { - regionLevel = "NUTS" + nutsMap[code].level; - } else if (isRegional) { - if (code.length === 3) regionLevel = "NUTS1"; - else if (code.length === 4) regionLevel = "NUTS2"; - else if (code.length === 5) regionLevel = "NUTS3"; - else regionLevel = "NON_NUTS"; - } else { - regionLevel = "NON_NUTS"; - } - - regionName = label; - } - } - - const val = values[flat]; - if (val == null) return; - - output.push({ - source: source.extension.agencyId || source.extension.datastructure.agencyId, - survey: source.extension.id || source.extension.datastructure.id, - region: regionLevel, - dimensions: humanDims, - value: val, - timestamp - }); - return; - } + if (sizes.length !== ids.length) { + throw new Error("sizes e ids non allineati"); + } - for (let i = 0; i < sizes[dimIndex]; i++) { - indices[dimIndex] = i; - walk(dimIndex + 1); - } + sizes.forEach((s, i) => { + if (!Number.isInteger(s) || s <= 0) { + throw new Error(`Size non valida alla dimensione ${ids[i]}: ${s}`); } - walk(0); - } + }); - else - while (true) { - let flat = 0; - let regionLevel = "unknown"; - const humanDims = {}; - - for (let i = 0; i < ids.length; i++) { - flat += indices[i] * strides[i]; - - let dim = ids[i]; - const code = indexToCode[dim][indices[i]]; - const label = indexToLabel[dim][indices[i]]; - if(dim == "time") - dim = "year" - humanDims[dim] = label; - - if (dim === geoDimName) { - const isRegional = !NON_REGIONAL.has(code); - - if (nutsMap[code]?.level != null) { - regionLevel = "NUTS" + nutsMap[code].level; - } else if (isRegional) { - if (code.length === 3) regionLevel = "NUTS1"; - else if (code.length === 4) regionLevel = "NUTS2"; - else if (code.length === 5) regionLevel = "NUTS3"; - else regionLevel = "NON_NUTS"; - } else { - regionLevel = "NON_NUTS"; - } - } - } + const fs = require("fs"); - const val = values[flat]; - if (val != null) { - output.push({ - source: source.extension.agencyId || source.extension.datastructure.agencyId, - survey: source.extension.id || source.extension.datastructure.id, - region: regionLevel, - dimensions: humanDims, - value: val, - timestamp - }); - } + let nameStream = ".out" + Date.now() + ".json"; + + const stream = fs.createWriteStream(nameStream, { + highWaterMark: 1024 * 1024 // 1MB buffer, opzionale + }); + stream.write("[\n"); + + let firstRecord = true; + + while (true) { + let flat = 0; + let regionLevel = "unknown"; + const humanDims = {}; + + for (let i = 0; i < ids.length; i++) { + flat += indices[i] * strides[i]; + + let dim = ids[i]; + const code = indexToCode[dim][indices[i]]; + const label = indexToLabel[dim][indices[i]]; + + if (dim === "time") dim = "year"; + humanDims[dim] = label; - let carry = true; - for (let i = indices.length - 1; i >= 0 && carry; i--) { - indices[i]++; - if (indices[i] < sizes[i]) { - carry = false; + if (dim === geoDimName) { + const isRegional = !NON_REGIONAL.has(code); + + if (nutsMap[code]?.level != null) { + regionLevel = "NUTS" + nutsMap[code].level; + } else if (isRegional) { + if (code.length === 3) regionLevel = "NUTS1"; + else if (code.length === 4) regionLevel = "NUTS2"; + else if (code.length === 5) regionLevel = "NUTS3"; + else regionLevel = "NON_NUTS"; } else { - indices[i] = 0; + regionLevel = "NON_NUTS"; } } + } + + const val = values[flat]; + if (val != null) { + const record = JSON.stringify({ + source: source.extension.agencyId || source.extension.datastructure.agencyId, + survey: source.extension.id || source.extension.datastructure.id, + region: regionLevel, + dimensions: humanDims, + value: val, + timestamp + }); + + if (!firstRecord) stream.write(",\n"); + else firstRecord = false; + + // Scrive su SSD direttamente, senza accumulare in RAM + stream.write(record); + } - if (carry) break; + // incrementa gli indici + let carry = true; + for (let i = indices.length - 1; i >= 0 && carry; i--) { + indices[i]++; + if (indices[i] < sizes[i]) carry = false; + else indices[i] = 0; } + if (carry) break; // terminazione del ciclo + } + + stream.write("\n]"); + stream.end(); + + if (config.debug?.jsonStat) { logger.debug("Salvataggio file di output..."); @@ -195,5 +166,52 @@ module.exports = async function decode(source) { logger.debug("File salvato: out_human_nuts.json"); } - return output; + const Datapoints = require('./Datapoint'); + const stream2 = fs.createReadStream(nameStream, { encoding: "utf-8" }); + let buffer = ""; + let depth = 0; // conta le parentesi graffe + let inObject = false; + + logger.debug("Inizio inserimento datapoints nel database..."); + let tempArray = []; + for await (const chunk of stream2) { + //logger.debug("Lettura chunk di dati..."); + for (const char of chunk) { + //logger.debug(`Elaborazione carattere: ${char}`); + if (char === "{") { + //logger.debug("Inizio di un nuovo oggetto JSON rilevato."); + if (!inObject) inObject = true; + depth++; + } + + //logger.debug(`Profondità attuale delle parentesi graffe: ${depth}`); + if (inObject) buffer += char; + + //logger.debug(`Buffer attuale: ${buffer}`); + if (char === "}") { + //logger.debug("Fine di un oggetto JSON rilevata."); + depth--; + if (depth === 0 && inObject) { + //logger.debug("Oggetto JSON completo rilevato, procedo con l'inserimento nel database."); + // oggetto completo + const obj = JSON.parse(buffer); + //logger.debug(`Oggetto JSON da inserire: ${JSON.stringify(obj)}`); + //const DatapointModel = await Datapoints.getDatapointModel(); + //const datapoint = new DatapointModel(obj); + tempArray.push(obj); + if (tempArray.length >= config.batch) { + await Datapoints.insertMany(tempArray); + tempArray = []; + logger.debug(config.batch +" Datapoints salvati nel database."); + } + //await Datapoints.insertMany([obj]); + //logger.debug("Datapoint salvato nel database."); + buffer = ""; + inObject = false; + } + } + } + } + return []; + }; diff --git a/backend/src/writers/orionWriter.js b/backend/src/writers/orionWriter.js index ca3ac58..ad9bb2e 100644 --- a/backend/src/writers/orionWriter.js +++ b/backend/src/writers/orionWriter.js @@ -193,7 +193,7 @@ const writeObject = async (objNumber, obj, modelSchema, config) => { logger.debug('Sending to Orion CB object number: ' + objNumber + ' , id: ' + obj.id); logger.debug({ ToOrionObject: !config.orionWriter.keyValues || obj }) - var orionedObj = !config.orionWriter.keyValues && toOrionObject(obj, modelSchema) || obj; + var orionedObj = !config.orionWriter.keyValues && toOrionObject(obj, modelSchema, config) || obj; var options = { method: 'POST', @@ -553,79 +553,88 @@ const writeObject = async (objNumber, obj, modelSchema, config) => { } } -function toOrionObject(obj, schema) { +function toOrionObject(obj, schema, config) { logger.debug("Transforming Mapped object to an Orion Entity (explicit types in attributes)"); for (key in obj) { if (key != 'id' && key != 'type') { - var modelField = schema.allOf[0].properties[key]; - var modelFieldType = modelField.type; - var modelFieldFormat = modelField.format; - var objField = obj[key]; - logger.debug({ key: obj[key], modelField }); - - if (key == 'location') { - - var newValue = {}; - newValue = { - type: modelFieldType,//"geo:json", - value: objField - }; - obj['location'] = newValue; - - } else if (modelFieldType === 'object') { - - //var nestedValue = {}; - //for (fieldKey in objField) { - - // var modelSubField = modelField.properties[fieldKey]; - // var modelSubFieldType = modelSubField.type; - // var modelSubFieldFormat = modelSubField.format; - - // if (modelSubFieldFormat) - // nestedValue[fieldKey] = { - // value: objField[fieldkey], - // type: modelSubFieldType, - // format: modelSubFieldFormat - // } - // else - // nestedValue[fieldKey] = { - // value: objField[fieldKey], - // type: modelSubFieldType - // } - - // delete objField[fieldKey]; - //} - - var nestedObject = objField; - delete objField;//TODO check + if (config.orionWriter.protocol != "v2") obj[key] = { - type: modelFieldType, - value: nestedObject + type: config.orionWriter.protocol == "v2" ? modelFieldType : "Property", + value: obj[key] } + else { - } else { - if (modelFieldFormat) { - if (modelFieldFormat === 'date-time') - obj[key] = { - type: 'DateTime', - value: objField - }; + var modelField = schema.allOf[0].properties[key]; + var modelFieldType = modelField.type; + var modelFieldFormat = modelField.format; + var objField = obj[key]; + logger.debug({ key: obj[key], modelField }); + + if (key == 'location') { + + var newValue = {}; + newValue = { + type: modelFieldType,//"geo:json", + value: objField + }; + obj['location'] = newValue; + + } else if (modelFieldType === 'object') { + + //var nestedValue = {}; + //for (fieldKey in objField) { + + // var modelSubField = modelField.properties[fieldKey]; + // var modelSubFieldType = modelSubField.type; + // var modelSubFieldFormat = modelSubField.format; + + // if (modelSubFieldFormat) + // nestedValue[fieldKey] = { + // value: objField[fieldkey], + // type: modelSubFieldType, + // format: modelSubFieldFormat + // } + // else + // nestedValue[fieldKey] = { + // value: objField[fieldKey], + // type: modelSubFieldType + // } + + // delete objField[fieldKey]; + //} + + var nestedObject = objField; + delete objField;//TODO check + obj[key] = { + type: config.orionWriter.protocol == "v2" ? modelFieldType : "Property", + value: nestedObject + } + + } else { + + if (modelFieldFormat) { + if (modelFieldFormat === 'date-time') + obj[key] = { + type: 'DateTime', + value: objField + }; + else + obj[key] = { + type: modelFieldType, + value: objField + // format: modelFieldFormat + }; + } else obj[key] = { type: modelFieldType, value: objField - // format: modelFieldFormat - }; + } } - else - obj[key] = { - type: modelFieldType, - value: objField - } } } } diff --git a/server b/server index 18c6c48..4611d9c 100644 --- a/server +++ b/server @@ -3,7 +3,7 @@ WORKDIR /app COPY ./backend/dataModels ./dataModels COPY ./backend/docs ./docs COPY ./backend/src ./src -COPY ./backend/output ./output +RUN mkdir ./output #COPY ./backend/config*.js ./ COPY ./backend/mapper.js ./ COPY ./backend/LICENSE ./