Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/src/mapHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ const handleSourceFieldsToDestArray = (sourceFieldArray, source, itemsType) => {
}
});


// print Array String as output
logger.debug({ finalArray })
return finalArray
Expand Down
14 changes: 14 additions & 0 deletions backend/src/utils/decoders/Datapoint.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
const mongoose = require('mongoose');

// Mongoose model for a single survey datapoint.
// Fields below describe the expected shape; `strict: false` lets stored
// documents carry additional ad-hoc properties beyond the declared ones.
const datapointFields = {
  source: String,      // data provider / agency identifier
  survey: String,      // survey identifier
  surveyName: String,  // human-readable survey name
  region: String,      // region classification (e.g. a NUTS level label)
  fromUrl: String,     // origin URL of the data
  timestamp: String,   // last-update timestamp of the source dataset
  dimensions: Object,  // free-form map of dimension name -> label
  value: Number,       // the observed numeric value
};

const datapointSchema = new mongoose.Schema(datapointFields, { strict: false });

module.exports = mongoose.model('Datapoint', datapointSchema);
226 changes: 122 additions & 104 deletions backend/src/utils/decoders/json-stat.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,126 +74,144 @@ module.exports = async function decode(source) {
const indices = new Array(ids.length).fill(0);
const timestamp = js.updated;

if (config.debug?.recursiveJsonStat) { //TODO once tested remove this option
function walk(dimIndex) {
if (dimIndex === ids.length) {
let flat = 0;
let regionLevel = "unknown";
const humanDims = {};

for (let i = 0; i < ids.length; i++) {
flat += indices[i] * strides[i];
let dim = ids[i];
const code = indexToCode[dim][indices[i]];
let label = indexToLabel[dim][indices[i]];
console.log(89)
if (dim == "time")
dim = "year"
humanDims[dim] = label

if (dim === geoDimName) {
const isRegional = !NON_REGIONAL.has(code);

if (nutsMap[code]?.level != null) {
regionLevel = "NUTS" + nutsMap[code].level;
} else if (isRegional) {
if (code.length === 3) regionLevel = "NUTS1";
else if (code.length === 4) regionLevel = "NUTS2";
else if (code.length === 5) regionLevel = "NUTS3";
else regionLevel = "NON_NUTS";
} else {
regionLevel = "NON_NUTS";
}

regionName = label;
}
}

const val = values[flat];
if (val == null) return;

output.push({
source: source.extension.agencyId || source.extension.datastructure.agencyId,
survey: source.extension.id || source.extension.datastructure.id,
region: regionLevel,
dimensions: humanDims,
value: val,
timestamp
});
return;
}
if (sizes.length !== ids.length) {
throw new Error("sizes e ids non allineati");
}

for (let i = 0; i < sizes[dimIndex]; i++) {
indices[dimIndex] = i;
walk(dimIndex + 1);
}
sizes.forEach((s, i) => {
if (!Number.isInteger(s) || s <= 0) {
throw new Error(`Size non valida alla dimensione ${ids[i]}: ${s}`);
}
walk(0);
}
});

else
while (true) {
let flat = 0;
let regionLevel = "unknown";
const humanDims = {};

for (let i = 0; i < ids.length; i++) {
flat += indices[i] * strides[i];

let dim = ids[i];
const code = indexToCode[dim][indices[i]];
const label = indexToLabel[dim][indices[i]];
if(dim == "time")
dim = "year"
humanDims[dim] = label;

if (dim === geoDimName) {
const isRegional = !NON_REGIONAL.has(code);

if (nutsMap[code]?.level != null) {
regionLevel = "NUTS" + nutsMap[code].level;
} else if (isRegional) {
if (code.length === 3) regionLevel = "NUTS1";
else if (code.length === 4) regionLevel = "NUTS2";
else if (code.length === 5) regionLevel = "NUTS3";
else regionLevel = "NON_NUTS";
} else {
regionLevel = "NON_NUTS";
}
}
}
const fs = require("fs");

const val = values[flat];
if (val != null) {
output.push({
source: source.extension.agencyId || source.extension.datastructure.agencyId,
survey: source.extension.id || source.extension.datastructure.id,
region: regionLevel,
dimensions: humanDims,
value: val,
timestamp
});
}
let nameStream = ".out" + Date.now() + ".json";

const stream = fs.createWriteStream(nameStream, {
highWaterMark: 1024 * 1024 // 1MB buffer, opzionale
});
stream.write("[\n");

let firstRecord = true;

while (true) {
let flat = 0;
let regionLevel = "unknown";
const humanDims = {};

for (let i = 0; i < ids.length; i++) {
flat += indices[i] * strides[i];

let dim = ids[i];
const code = indexToCode[dim][indices[i]];
const label = indexToLabel[dim][indices[i]];

if (dim === "time") dim = "year";
humanDims[dim] = label;

let carry = true;
for (let i = indices.length - 1; i >= 0 && carry; i--) {
indices[i]++;
if (indices[i] < sizes[i]) {
carry = false;
if (dim === geoDimName) {
const isRegional = !NON_REGIONAL.has(code);

if (nutsMap[code]?.level != null) {
regionLevel = "NUTS" + nutsMap[code].level;
} else if (isRegional) {
if (code.length === 3) regionLevel = "NUTS1";
else if (code.length === 4) regionLevel = "NUTS2";
else if (code.length === 5) regionLevel = "NUTS3";
else regionLevel = "NON_NUTS";
} else {
indices[i] = 0;
regionLevel = "NON_NUTS";
}
}
}

const val = values[flat];
if (val != null) {
const record = JSON.stringify({
source: source.extension.agencyId || source.extension.datastructure.agencyId,
survey: source.extension.id || source.extension.datastructure.id,
region: regionLevel,
dimensions: humanDims,
value: val,
timestamp
});

if (!firstRecord) stream.write(",\n");
else firstRecord = false;

// Scrive su SSD direttamente, senza accumulare in RAM
stream.write(record);
}

if (carry) break;
// incrementa gli indici
let carry = true;
for (let i = indices.length - 1; i >= 0 && carry; i--) {
indices[i]++;
if (indices[i] < sizes[i]) carry = false;
else indices[i] = 0;
}

if (carry) break; // terminazione del ciclo
}

stream.write("\n]");
stream.end();



if (config.debug?.jsonStat) {
logger.debug("Salvataggio file di output...");
fs.writeFileSync("out_human_nuts.json", JSON.stringify(output, null, 2));
logger.debug("File salvato: out_human_nuts.json");
}

return output;
const Datapoints = require('./Datapoint');
const stream2 = fs.createReadStream(nameStream, { encoding: "utf-8" });
let buffer = "";
let depth = 0; // conta le parentesi graffe
let inObject = false;

logger.debug("Inizio inserimento datapoints nel database...");
let tempArray = [];
for await (const chunk of stream2) {
//logger.debug("Lettura chunk di dati...");
for (const char of chunk) {
//logger.debug(`Elaborazione carattere: ${char}`);
if (char === "{") {
//logger.debug("Inizio di un nuovo oggetto JSON rilevato.");
if (!inObject) inObject = true;
depth++;
}

//logger.debug(`Profondità attuale delle parentesi graffe: ${depth}`);
if (inObject) buffer += char;

//logger.debug(`Buffer attuale: ${buffer}`);
if (char === "}") {
//logger.debug("Fine di un oggetto JSON rilevata.");
depth--;
if (depth === 0 && inObject) {
//logger.debug("Oggetto JSON completo rilevato, procedo con l'inserimento nel database.");
// oggetto completo
const obj = JSON.parse(buffer);
//logger.debug(`Oggetto JSON da inserire: ${JSON.stringify(obj)}`);
//const DatapointModel = await Datapoints.getDatapointModel();
//const datapoint = new DatapointModel(obj);
tempArray.push(obj);
if (tempArray.length >= config.batch) {
await Datapoints.insertMany(tempArray);
tempArray = [];
logger.debug(config.batch +" Datapoints salvati nel database.");
}
//await Datapoints.insertMany([obj]);
//logger.debug("Datapoint salvato nel database.");
buffer = "";
inObject = false;
}
}
}
}
return [];

};
Loading