diff --git a/src/tasks/clean.py b/src/tasks/clean.py index b494ac5..e52c1ff 100644 --- a/src/tasks/clean.py +++ b/src/tasks/clean.py @@ -112,6 +112,9 @@ def clean_decp(lf: pl.LazyFrame, decp_format: DecpFormat) -> pl.LazyFrame: # Codes CPV, suppression du caractères de contrôle ("-[0-9]$") lf = lf.with_columns(pl.col("codeCPV").str.split("-").list[0].alias("codeCPV")) + # Nettoyage des codes département (ex: "006" -> "06") + lf = clean_lieu_execution_code(lf) + # Champs liste lf = process_string_lists(lf) @@ -172,6 +175,34 @@ def clean_invalid_characters(chunk: bytes): return bytes(chunk, "utf-8") +def clean_lieu_execution_code(lf: pl.LazyFrame) -> pl.LazyFrame: + """ + Corrige les codes département mal formatés. + + Certains codes département sont sur 3 chiffres avec un zéro devant (ex: "006" au lieu de "06"). + Cette fonction supprime le zéro initial pour les codes métropolitains, tout en préservant + les codes DOM-TOM légitimes sur 3 chiffres (971, 972, 973, 974, 976). + """ + columns = lf.collect_schema().names() + + if "lieuExecution_code" not in columns or "lieuExecution_typeCode" not in columns: + return lf + + lf = lf.with_columns( + pl.when( + (pl.col("lieuExecution_typeCode") == "Code département") + & (pl.col("lieuExecution_code").str.len_chars() == 3) + & (pl.col("lieuExecution_code").str.starts_with("0")) + & (pl.col("lieuExecution_code").str.contains(r"^\d{3}$")) + ) + .then(pl.col("lieuExecution_code").str.slice(1)) # Supprimer le premier caractère (le "0") + .otherwise(pl.col("lieuExecution_code")) + .alias("lieuExecution_code") + ) + + return lf + + def clean_null_equivalent(lf: pl.LazyFrame) -> pl.LazyFrame: """Supprime les strings équivalente à null""" mapping_null = { @@ -198,61 +229,73 @@ def clean_null_equivalent(lf: pl.LazyFrame) -> pl.LazyFrame: return lf -def clean_titulaires(lf: pl.LazyFrame, decp_format: DecpFormat, column) -> pl.LazyFrame: +def clean_titulaires(lf: pl.LazyFrame, decp_format: DecpFormat, column: str) -> pl.LazyFrame: """ Normalise les listes de titulaires en utilisant des expressions Polars natives. `column` peut être titulaires ou modification_titulaires Codée avec l'assistance du LLM Gemini 3 Pro et révisée par le développeur. + + Note: Les données scrappées de marches-publics.info (AWS) ont un format hybride : + - Enveloppe JSON format 2022 (marches.marche) + - Titulaires format 2019 (pas de clé "titulaire" imbriquée) + On détecte donc le format des titulaires dynamiquement. """ - # Définition des expressions de nettoyage selon le format - if decp_format.label == "DECP 2022": + # Skip processing if column dtype is Null (all values are null) + if lf.collect_schema()[column] == pl.Null: + return lf + + # Détection du format des titulaires en regardant la structure du schéma + titulaires_schema = lf.collect_schema()[column] + + # Vérifier si le schéma a une clé "titulaire" imbriquée + # Format 2022 : List(Struct({titulaire: Struct({id, typeIdentifiant})})) + # Format 2019 ou AWS hybride : List(Struct({id, typeIdentifiant, ...})) + has_nested_titulaire = False + if hasattr(titulaires_schema, "inner"): + inner_struct = titulaires_schema.inner + if hasattr(inner_struct, "fields"): + field_names = [f.name for f in inner_struct.fields] + has_nested_titulaire = "titulaire" in field_names + + if has_nested_titulaire: # Format 2022 : [{"titulaire": {"id": ..., "typeIdentifiant": ...}}] - # On veut extraire les champs de la struct imbriquée expr_titulaire = pl.struct( titulaire_id=pl.element().struct.field("titulaire").struct.field("id"), titulaire_typeIdentifiant=pl.element() .struct.field("titulaire") .struct.field("typeIdentifiant"), ) - else: - # Format 2019 : [{"id": ..., "typeIdentifiant": ...}] - # On renomme juste les champs + # Format 2019 ou AWS hybride : [{"id": ..., "typeIdentifiant": ...}] expr_titulaire = pl.struct( titulaire_id=pl.element().struct.field("id"), titulaire_typeIdentifiant=pl.element().struct.field("typeIdentifiant"), ) - # Skip processing if column dtype is Null (all values are null) - # This happens when modification_titulaires has no actual data - if lf.collect_schema()[column] != pl.Null: - lf = lf.with_columns( - pl.col(column) - .list.eval(expr_titulaire) - .list.eval( - # Filtrer les éléments où id ET typeIdentifiant sont null - pl.element().filter( - pl.element().struct.field("titulaire_id").is_not_null() - | pl.element() - .struct.field("titulaire_typeIdentifiant") - .is_not_null() - ) + lf = lf.with_columns( + pl.col(column) + .list.eval(expr_titulaire) + .list.eval( + # Filtrer les éléments où id ET typeIdentifiant sont null + pl.element().filter( + pl.element().struct.field("titulaire_id").is_not_null() + | pl.element() + .struct.field("titulaire_typeIdentifiant") + .is_not_null() ) - .alias(column) ) + .alias(column) + ) # Remplacer les listes de titulaires vides par null - # Only process columns that have List dtype - - if lf.collect_schema()[column] != pl.Null: - lf = lf.with_columns( - [ - pl.when(pl.col(column).list.len() == 0) - .then(None) - .otherwise(pl.col(column)) - .alias(column) - ] - ) + lf = lf.with_columns( + [ + pl.when(pl.col(column).list.len() == 0) + .then(None) + .otherwise(pl.col(column)) + .alias(column) + ] + ) return lf diff --git a/src/tasks/get.py b/src/tasks/get.py index b42591b..8147783 100644 --- a/src/tasks/get.py +++ b/src/tasks/get.py @@ -413,13 +413,13 @@ def get_process_file(_href: str): content = response.content lff = pl.scan_csv(content, schema_overrides=schema) lff = lff.select(columns) - logger.info(_href.split("/")[-1], "OK") + logger.info(_href.split("/")[-1] + " OK") return lff # Traitement en parrallèle avec 8 threads lfs = [] - with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: + with concurrent.futures.ThreadPoolExecutor(max_workers=15) as executor: futures = [executor.submit(get_process_file, href) for href in hrefs] for future in concurrent.futures.as_completed(futures): try: