From ee546e1cca2807ef424efde2aee847f48d4a6ee9 Mon Sep 17 00:00:00 2001 From: Gabriele Percoco Date: Thu, 8 Jan 2026 11:42:20 +0100 Subject: [PATCH 1/9] no message --- backend/src/mapHandler.js | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/src/mapHandler.js b/backend/src/mapHandler.js index 64740c8..ddd625b 100644 --- a/backend/src/mapHandler.js +++ b/backend/src/mapHandler.js @@ -681,6 +681,7 @@ const handleSourceFieldsToDestArray = (sourceFieldArray, source, itemsType) => { } }); + // print Array String as output logger.debug({ finalArray }) return finalArray From 9f64cb3ed28e1aa142427a692753696fae0c7299 Mon Sep 17 00:00:00 2001 From: Gabriele Percoco Date: Thu, 8 Jan 2026 11:44:16 +0100 Subject: [PATCH 2/9] no message --- server | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server b/server index 18c6c48..4611d9c 100644 --- a/server +++ b/server @@ -3,7 +3,7 @@ WORKDIR /app COPY ./backend/dataModels ./dataModels COPY ./backend/docs ./docs COPY ./backend/src ./src -COPY ./backend/output ./output +RUN mkdir ./output #COPY ./backend/config*.js ./ COPY ./backend/mapper.js ./ COPY ./backend/LICENSE ./ From 74e0314c8bd739ee5601178b7d6d492c65f19769 Mon Sep 17 00:00:00 2001 From: Gabriele Percoco Date: Fri, 16 Jan 2026 10:21:10 +0100 Subject: [PATCH 3/9] temp fix --- backend/src/utils/decoders/Datapoint.js | 42 +++++ backend/src/utils/decoders/json-stat.js | 217 ++++++++++++------------ 2 files changed, 155 insertions(+), 104 deletions(-) create mode 100644 backend/src/utils/decoders/Datapoint.js diff --git a/backend/src/utils/decoders/Datapoint.js b/backend/src/utils/decoders/Datapoint.js new file mode 100644 index 0000000..cfea462 --- /dev/null +++ b/backend/src/utils/decoders/Datapoint.js @@ -0,0 +1,42 @@ +const mongoose = require('mongoose'); + +const MONGO_URI = 'mongodb://localhost:22000/Minio-Mongo'; // cambia "mydb" con il tuo database + +// Oggetto che conterrà il modello una volta creato +let Datapoint; + +mongoose.connect(MONGO_URI, { + useNewUrlParser: true, + useUnifiedTopology: true +}) +.then(() => { + console.log("Connesso a MongoDB!"); + + // definizione schema + const datapointSchema = new mongoose.Schema({ + source: String, + survey: String, + surveyName: String, + region: String, + fromUrl: String, + timestamp: String, + dimensions: Object, + value: Number + }, { strict: false }); + + // creazione modello + Datapoint = mongoose.model('Datapoint', datapointSchema); + +}) +.catch(err => { + console.error("Errore di connessione a MongoDB:", err); +}); + +// esportiamo una funzione che ritorna il modello solo quando è pronto +module.exports.getDatapointModel = async function() { + // aspetta che la connessione sia pronta + if (!Datapoint) { + await mongoose.connection.asPromise(); // Node 18+ / mongoose 7+ + } + return Datapoint; +}; diff --git a/backend/src/utils/decoders/json-stat.js b/backend/src/utils/decoders/json-stat.js index 03a0b2d..e44e72d 100644 --- a/backend/src/utils/decoders/json-stat.js +++ b/backend/src/utils/decoders/json-stat.js @@ -74,120 +74,89 @@ module.exports = async function decode(source) { const indices = new Array(ids.length).fill(0); const timestamp = js.updated; - if (config.debug?.recursiveJsonStat) { //TODO once tested remove this option - function walk(dimIndex) { - if (dimIndex === ids.length) { - let flat = 0; - let regionLevel = "unknown"; - const humanDims = {}; - - for (let i = 0; i < ids.length; i++) { - flat += indices[i] * strides[i]; - let dim = ids[i]; - const code = indexToCode[dim][indices[i]]; - let label = indexToLabel[dim][indices[i]]; - console.log(89) - if (dim == "time") - dim = "year" - humanDims[dim] = label - - if (dim === geoDimName) { - const isRegional = !NON_REGIONAL.has(code); - - if (nutsMap[code]?.level != null) { - regionLevel = "NUTS" + nutsMap[code].level; - } else if (isRegional) { - if (code.length === 3) regionLevel = "NUTS1"; - else if (code.length === 4) regionLevel = "NUTS2"; - else if (code.length === 5) regionLevel = "NUTS3"; - else regionLevel = "NON_NUTS"; - } else { - regionLevel = "NON_NUTS"; - } - - regionName = label; - } - } - - const val = values[flat]; - if (val == null) return; - - output.push({ - source: source.extension.agencyId || source.extension.datastructure.agencyId, - survey: source.extension.id || source.extension.datastructure.id, - region: regionLevel, - dimensions: humanDims, - value: val, - timestamp - }); - return; - } + if (sizes.length !== ids.length) { + throw new Error("sizes e ids non allineati"); + } - for (let i = 0; i < sizes[dimIndex]; i++) { - indices[dimIndex] = i; - walk(dimIndex + 1); - } + sizes.forEach((s, i) => { + if (!Number.isInteger(s) || s <= 0) { + throw new Error(`Size non valida alla dimensione ${ids[i]}: ${s}`); } - walk(0); - } + }); - else - while (true) { - let flat = 0; - let regionLevel = "unknown"; - const humanDims = {}; - - for (let i = 0; i < ids.length; i++) { - flat += indices[i] * strides[i]; - - let dim = ids[i]; - const code = indexToCode[dim][indices[i]]; - const label = indexToLabel[dim][indices[i]]; - if(dim == "time") - dim = "year" - humanDims[dim] = label; - - if (dim === geoDimName) { - const isRegional = !NON_REGIONAL.has(code); - - if (nutsMap[code]?.level != null) { - regionLevel = "NUTS" + nutsMap[code].level; - } else if (isRegional) { - if (code.length === 3) regionLevel = "NUTS1"; - else if (code.length === 4) regionLevel = "NUTS2"; - else if (code.length === 5) regionLevel = "NUTS3"; - else regionLevel = "NON_NUTS"; - } else { - regionLevel = "NON_NUTS"; - } - } - } + const fs = require("fs"); - const val = values[flat]; - if (val != null) { - output.push({ - source: source.extension.agencyId || source.extension.datastructure.agencyId, - survey: source.extension.id || source.extension.datastructure.id, - region: regionLevel, - dimensions: humanDims, - value: val, - timestamp - }); - } + const stream = fs.createWriteStream("out_human_nuts.json", { + highWaterMark: 1024 * 1024 // 1MB buffer, opzionale + }); + stream.write("[\n"); + + let firstRecord = true; + + while (true) { + let flat = 0; + let regionLevel = "unknown"; + const humanDims = {}; + + for (let i = 0; i < ids.length; i++) { + flat += indices[i] * strides[i]; + + let dim = ids[i]; + const code = indexToCode[dim][indices[i]]; + const label = indexToLabel[dim][indices[i]]; + + if (dim === "time") dim = "year"; + humanDims[dim] = label; + + if (dim === geoDimName) { + const isRegional = !NON_REGIONAL.has(code); - let carry = true; - for (let i = indices.length - 1; i >= 0 && carry; i--) { - indices[i]++; - if (indices[i] < sizes[i]) { - carry = false; + if (nutsMap[code]?.level != null) { + regionLevel = "NUTS" + nutsMap[code].level; + } else if (isRegional) { + if (code.length === 3) regionLevel = "NUTS1"; + else if (code.length === 4) regionLevel = "NUTS2"; + else if (code.length === 5) regionLevel = "NUTS3"; + else regionLevel = "NON_NUTS"; } else { - indices[i] = 0; + regionLevel = "NON_NUTS"; } } + } + + const val = values[flat]; + if (val != null) { + const record = JSON.stringify({ + source: source.extension.agencyId || source.extension.datastructure.agencyId, + survey: source.extension.id || source.extension.datastructure.id, + region: regionLevel, + dimensions: humanDims, + value: val, + timestamp + }); + + if (!firstRecord) stream.write(",\n"); + else firstRecord = false; + + // Scrive su SSD direttamente, senza accumulare in RAM + stream.write(record); + } - if (carry) break; + // incrementa gli indici + let carry = true; + for (let i = indices.length - 1; i >= 0 && carry; i--) { + indices[i]++; + if (indices[i] < sizes[i]) carry = false; + else indices[i] = 0; } + if (carry) break; // terminazione del ciclo + } + + stream.write("\n]"); + stream.end(); + + if (config.debug?.jsonStat) { logger.debug("Salvataggio file di output..."); @@ -195,5 +164,45 @@ module.exports = async function decode(source) { logger.debug("File salvato: out_human_nuts.json"); } - return output; + const Datapoints = require('./Datapoint'); + const stream2 = fs.createReadStream("out_human_nuts.json", { encoding: "utf-8" }); + let buffer = ""; + let depth = 0; // conta le parentesi graffe + let inObject = false; + + logger.debug("Inizio inserimento datapoints nel database..."); + for await (const chunk of stream2) { + logger.debug("Lettura chunk di dati..."); + for (const char of chunk) { + logger.debug(`Elaborazione carattere: ${char}`); + if (char === "{") { + logger.debug("Inizio di un nuovo oggetto JSON rilevato."); + if (!inObject) inObject = true; + depth++; + } + + logger.debug(`Profondità attuale delle parentesi graffe: ${depth}`); + if (inObject) buffer += char; + + logger.debug(`Buffer attuale: ${buffer}`); + if (char === "}") { + logger.debug("Fine di un oggetto JSON rilevata."); + depth--; + if (depth === 0 && inObject) { + logger.debug("Oggetto JSON completo rilevato, procedo con l'inserimento nel database."); + // oggetto completo + const obj = JSON.parse(buffer); + logger.debug(`Oggetto JSON da inserire: ${JSON.stringify(obj)}`); + const DatapointModel = await Datapoints.getDatapointModel(); + const datapoint = new DatapointModel(obj); + await datapoint.save(); + logger.debug("Datapoint salvato nel database."); + buffer = ""; + inObject = false; + } + } + } + } + return []; + }; From 6aa71e0e630f905fa7cdc69dae6dcca507ca799b Mon Sep 17 00:00:00 2001 From: Gabriele Percoco Date: Fri, 16 Jan 2026 10:46:08 +0100 Subject: [PATCH 4/9] fe --- backend/src/utils/decoders/Datapoint.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/backend/src/utils/decoders/Datapoint.js b/backend/src/utils/decoders/Datapoint.js index cfea462..97be209 100644 --- a/backend/src/utils/decoders/Datapoint.js +++ b/backend/src/utils/decoders/Datapoint.js @@ -1,6 +1,7 @@ const mongoose = require('mongoose'); +const config = require('../../../config'); -const MONGO_URI = 'mongodb://localhost:22000/Minio-Mongo'; // cambia "mydb" con il tuo database +const MONGO_URI = config.mongoSourceConnector; // cambia "mydb" con il tuo database // Oggetto che conterrà il modello una volta creato let Datapoint; From 962bc0cb722165ded63ea3906b989e5ce9a74bb1 Mon Sep 17 00:00:00 2001 From: Gabriele Percoco Date: Fri, 16 Jan 2026 10:52:55 +0100 Subject: [PATCH 5/9] no message --- backend/src/utils/decoders/Datapoint.js | 53 ++++++------------------- 1 file changed, 12 insertions(+), 41 deletions(-) diff --git a/backend/src/utils/decoders/Datapoint.js b/backend/src/utils/decoders/Datapoint.js index 97be209..919d6c3 100644 --- a/backend/src/utils/decoders/Datapoint.js +++ b/backend/src/utils/decoders/Datapoint.js @@ -1,43 +1,14 @@ const mongoose = require('mongoose'); -const config = require('../../../config'); -const MONGO_URI = config.mongoSourceConnector; // cambia "mydb" con il tuo database - -// Oggetto che conterrà il modello una volta creato -let Datapoint; - -mongoose.connect(MONGO_URI, { - useNewUrlParser: true, - useUnifiedTopology: true -}) -.then(() => { - console.log("Connesso a MongoDB!"); - - // definizione schema - const datapointSchema = new mongoose.Schema({ - source: String, - survey: String, - surveyName: String, - region: String, - fromUrl: String, - timestamp: String, - dimensions: Object, - value: Number - }, { strict: false }); - - // creazione modello - Datapoint = mongoose.model('Datapoint', datapointSchema); - -}) -.catch(err => { - console.error("Errore di connessione a MongoDB:", err); -}); - -// esportiamo una funzione che ritorna il modello solo quando è pronto -module.exports.getDatapointModel = async function() { - // aspetta che la connessione sia pronta - if (!Datapoint) { - await mongoose.connection.asPromise(); // Node 18+ / mongoose 7+ - } - return Datapoint; -}; +const datapointSchema = new mongoose.Schema({ + source: String, + survey: String, + surveyName: String, + region: String, + fromUrl: String, + timestamp: String, + dimensions: Object, + value: Number +}, {strict: false}); + +module.exports = mongoose.model('Datapoint', datapointSchema); \ No newline at end of file From a32a0d18accb99523fafd15154e53f2eb09963aa Mon Sep 17 00:00:00 2001 From: Gabriele Percoco Date: Fri, 16 Jan 2026 10:57:37 +0100 Subject: [PATCH 6/9] no message --- backend/src/utils/decoders/json-stat.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/src/utils/decoders/json-stat.js b/backend/src/utils/decoders/json-stat.js index e44e72d..3eb4475 100644 --- a/backend/src/utils/decoders/json-stat.js +++ b/backend/src/utils/decoders/json-stat.js @@ -193,9 +193,9 @@ module.exports = async function decode(source) { // oggetto completo const obj = JSON.parse(buffer); logger.debug(`Oggetto JSON da inserire: ${JSON.stringify(obj)}`); - const DatapointModel = await Datapoints.getDatapointModel(); - const datapoint = new DatapointModel(obj); - await datapoint.save(); + //const DatapointModel = await Datapoints.getDatapointModel(); + //const datapoint = new DatapointModel(obj); + await Datapoints.insertMany([obj]); logger.debug("Datapoint salvato nel database."); buffer = ""; inObject = false; From 131ff22de54940c6e8e1e84437a223b638f426e1 Mon Sep 17 00:00:00 2001 From: Gabriele Percoco Date: Fri, 16 Jan 2026 12:16:36 +0100 Subject: [PATCH 7/9] fe --- backend/src/utils/decoders/json-stat.js | 31 +++++++++++++++---------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/backend/src/utils/decoders/json-stat.js b/backend/src/utils/decoders/json-stat.js index 3eb4475..88aa258 100644 --- a/backend/src/utils/decoders/json-stat.js +++ b/backend/src/utils/decoders/json-stat.js @@ -86,7 +86,7 @@ module.exports = async function decode(source) { const fs = require("fs"); - const stream = fs.createWriteStream("out_human_nuts.json", { + const stream = fs.createWriteStream("./out_human_nuts.json", { highWaterMark: 1024 * 1024 // 1MB buffer, opzionale }); stream.write("[\n"); @@ -165,38 +165,45 @@ module.exports = async function decode(source) { } const Datapoints = require('./Datapoint'); - const stream2 = fs.createReadStream("out_human_nuts.json", { encoding: "utf-8" }); + const stream2 = fs.createReadStream("./out_human_nuts.json", { encoding: "utf-8" }); let buffer = ""; let depth = 0; // conta le parentesi graffe let inObject = false; logger.debug("Inizio inserimento datapoints nel database..."); + let tempArray = []; for await (const chunk of stream2) { - logger.debug("Lettura chunk di dati..."); + //logger.debug("Lettura chunk di dati..."); for (const char of chunk) { - logger.debug(`Elaborazione carattere: ${char}`); + //logger.debug(`Elaborazione carattere: ${char}`); if (char === "{") { - logger.debug("Inizio di un nuovo oggetto JSON rilevato."); + //logger.debug("Inizio di un nuovo oggetto JSON rilevato."); if (!inObject) inObject = true; depth++; } - logger.debug(`Profondità attuale delle parentesi graffe: ${depth}`); + //logger.debug(`Profondità attuale delle parentesi graffe: ${depth}`); if (inObject) buffer += char; - logger.debug(`Buffer attuale: ${buffer}`); + //logger.debug(`Buffer attuale: ${buffer}`); if (char === "}") { - logger.debug("Fine di un oggetto JSON rilevata."); + //logger.debug("Fine di un oggetto JSON rilevata."); depth--; if (depth === 0 && inObject) { - logger.debug("Oggetto JSON completo rilevato, procedo con l'inserimento nel database."); + //logger.debug("Oggetto JSON completo rilevato, procedo con l'inserimento nel database."); // oggetto completo const obj = JSON.parse(buffer); - logger.debug(`Oggetto JSON da inserire: ${JSON.stringify(obj)}`); + //logger.debug(`Oggetto JSON da inserire: ${JSON.stringify(obj)}`); //const DatapointModel = await Datapoints.getDatapointModel(); //const datapoint = new DatapointModel(obj); - await Datapoints.insertMany([obj]); - logger.debug("Datapoint salvato nel database."); + tempArray.push(obj); + if (tempArray.length >= 1000) { + await Datapoints.insertMany(tempArray); + tempArray = []; + logger.debug("1000 Datapoints salvati nel database."); + } + //await Datapoints.insertMany([obj]); + //logger.debug("Datapoint salvato nel database."); buffer = ""; inObject = false; } From 5d689cc9ba71ca2a421648c5fa87468338960b66 Mon Sep 17 00:00:00 2001 From: Gabriele Percoco Date: Fri, 16 Jan 2026 12:44:59 +0100 Subject: [PATCH 8/9] batch added --- backend/src/utils/decoders/json-stat.js | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/backend/src/utils/decoders/json-stat.js b/backend/src/utils/decoders/json-stat.js index 88aa258..5b65469 100644 --- a/backend/src/utils/decoders/json-stat.js +++ b/backend/src/utils/decoders/json-stat.js @@ -86,7 +86,9 @@ module.exports = async function decode(source) { const fs = require("fs"); - const stream = fs.createWriteStream("./out_human_nuts.json", { + let nameStream = ".out" + Date.now() + ".json"; + + const stream = fs.createWriteStream(nameStream, { highWaterMark: 1024 * 1024 // 1MB buffer, opzionale }); stream.write("[\n"); @@ -165,7 +167,7 @@ module.exports = async function decode(source) { } const Datapoints = require('./Datapoint'); - const stream2 = fs.createReadStream("./out_human_nuts.json", { encoding: "utf-8" }); + const stream2 = fs.createReadStream(nameStream, { encoding: "utf-8" }); let buffer = ""; let depth = 0; // conta le parentesi graffe let inObject = false; @@ -197,10 +199,10 @@ module.exports = async function decode(source) { //const DatapointModel = await Datapoints.getDatapointModel(); //const datapoint = new DatapointModel(obj); tempArray.push(obj); - if (tempArray.length >= 1000) { + if (tempArray.length >= config.batch) { await Datapoints.insertMany(tempArray); tempArray = []; - logger.debug("1000 Datapoints salvati nel database."); + logger.debug(config.batch +" Datapoints salvati nel database."); } //await Datapoints.insertMany([obj]); //logger.debug("Datapoint salvato nel database."); From c26c669c3b78151bc20d107a23ccdd7f4543b311 Mon Sep 17 00:00:00 2001 From: Gabriele Percoco Date: Fri, 16 Jan 2026 15:06:26 +0100 Subject: [PATCH 9/9] fix orion ngsi-ld inserting --- backend/src/writers/orionWriter.js | 127 +++++++++++++++-------------- 1 file changed, 68 insertions(+), 59 deletions(-) diff --git a/backend/src/writers/orionWriter.js b/backend/src/writers/orionWriter.js index ca3ac58..ad9bb2e 100644 --- a/backend/src/writers/orionWriter.js +++ b/backend/src/writers/orionWriter.js @@ -193,7 +193,7 @@ const writeObject = async (objNumber, obj, modelSchema, config) => { logger.debug('Sending to Orion CB object number: ' + objNumber + ' , id: ' + obj.id); logger.debug({ ToOrionObject: !config.orionWriter.keyValues || obj }) - var orionedObj = !config.orionWriter.keyValues && toOrionObject(obj, modelSchema) || obj; + var orionedObj = !config.orionWriter.keyValues && toOrionObject(obj, modelSchema, config) || obj; var options = { method: 'POST', @@ -553,79 +553,88 @@ const writeObject = async (objNumber, obj, modelSchema, config) => { } } -function toOrionObject(obj, schema) { +function toOrionObject(obj, schema, config) { logger.debug("Transforming Mapped object to an Orion Entity (explicit types in attributes)"); for (key in obj) { if (key != 'id' && key != 'type') { - var modelField = schema.allOf[0].properties[key]; - var modelFieldType = modelField.type; - var modelFieldFormat = modelField.format; - var objField = obj[key]; - logger.debug({ key: obj[key], modelField }); - - if (key == 'location') { - - var newValue = {}; - newValue = { - type: modelFieldType,//"geo:json", - value: objField - }; - obj['location'] = newValue; - - } else if (modelFieldType === 'object') { - - //var nestedValue = {}; - //for (fieldKey in objField) { - - // var modelSubField = modelField.properties[fieldKey]; - // var modelSubFieldType = modelSubField.type; - // var modelSubFieldFormat = modelSubField.format; - - // if (modelSubFieldFormat) - // nestedValue[fieldKey] = { - // value: objField[fieldkey], - // type: modelSubFieldType, - // format: modelSubFieldFormat - // } - // else - // nestedValue[fieldKey] = { - // value: objField[fieldKey], - // type: modelSubFieldType - // } - - // delete objField[fieldKey]; - //} - - var nestedObject = objField; - delete objField;//TODO check + if (config.orionWriter.protocol != "v2") obj[key] = { - type: modelFieldType, - value: nestedObject + type: config.orionWriter.protocol == "v2" ? modelFieldType : "Property", + value: obj[key] } + else { - } else { - if (modelFieldFormat) { - if (modelFieldFormat === 'date-time') - obj[key] = { - type: 'DateTime', - value: objField - }; + var modelField = schema.allOf[0].properties[key]; + var modelFieldType = modelField.type; + var modelFieldFormat = modelField.format; + var objField = obj[key]; + logger.debug({ key: obj[key], modelField }); + + if (key == 'location') { + + var newValue = {}; + newValue = { + type: modelFieldType,//"geo:json", + value: objField + }; + obj['location'] = newValue; + + } else if (modelFieldType === 'object') { + + //var nestedValue = {}; + //for (fieldKey in objField) { + + // var modelSubField = modelField.properties[fieldKey]; + // var modelSubFieldType = modelSubField.type; + // var modelSubFieldFormat = modelSubField.format; + + // if (modelSubFieldFormat) + // nestedValue[fieldKey] = { + // value: objField[fieldkey], + // type: modelSubFieldType, + // format: modelSubFieldFormat + // } + // else + // nestedValue[fieldKey] = { + // value: objField[fieldKey], + // type: modelSubFieldType + // } + + // delete objField[fieldKey]; + //} + + var nestedObject = objField; + delete objField;//TODO check + obj[key] = { + type: config.orionWriter.protocol == "v2" ? modelFieldType : "Property", + value: nestedObject + } + + } else { + + if (modelFieldFormat) { + if (modelFieldFormat === 'date-time') + obj[key] = { + type: 'DateTime', + value: objField + }; + else + obj[key] = { + type: modelFieldType, + value: objField + // format: modelFieldFormat + }; + } else obj[key] = { type: modelFieldType, value: objField - // format: modelFieldFormat - }; + } } - else - obj[key] = { - type: modelFieldType, - value: objField - } } } }