From 971a0691dd2da618899214a96f5ec3ba01ad0b0e Mon Sep 17 00:00:00 2001 From: Timm Ruland Date: Fri, 25 Jul 2025 10:38:30 +0200 Subject: [PATCH 1/9] feat(filtering): Added a datatrove based pipeline for filtering tokenized data using scores. - Included an example configuration file. - Added datatrove and pydantic-settings to requirements. - Note that modalities is also required for the pipeline to work, but it is not included in the requirements file. --- .../example_filter_pipeline_config.yaml | 22 ++ pyproject.toml | 2 + .../score_based_filtering/__init__.py | 0 .../score_based_filtering/filter_pipeline.py | 245 ++++++++++++++++++ .../step_data_filtering.py | 98 +++++++ .../step_score_parsing.py | 127 +++++++++ 6 files changed, 494 insertions(+) create mode 100644 configs/data_processing/example_filter_pipeline_config.yaml create mode 100644 src/ml_filter/data_processing/score_based_filtering/__init__.py create mode 100644 src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py create mode 100644 src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py create mode 100644 src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py diff --git a/configs/data_processing/example_filter_pipeline_config.yaml b/configs/data_processing/example_filter_pipeline_config.yaml new file mode 100644 index 00000000..73156384 --- /dev/null +++ b/configs/data_processing/example_filter_pipeline_config.yaml @@ -0,0 +1,22 @@ +params: + score_path: /path/to/annotations + tokenized_data_path: /path/to/tokenized + output_folder: /path/to/filtered + + thresholds: + score_Gemma_Snowflake: 3.0 + score_Llama_Snowflake: 2.0 + + hash_to_base_file_mapping_csv: path/to/hashes.csv + base_file_prefix: /path/to/raw + tokenized_data_extension: .pbin + +running_on_slurm: false + +local_settings: + tasks: 1 + local_tasks: 1 + local_rank_offset: 0 + logging_dir: null + +slurm_settings: null diff --git a/pyproject.toml b/pyproject.toml index 17f3eeea..508ace55 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ dependencies = [ "tqdm", "datasets", "pydantic", + "pydantic-settings", "transformers @ git+https://github.com/huggingface/transformers.git@v4.49.0-Gemma-3", "click", "click_pathlib", @@ -28,6 +29,7 @@ dependencies = [ "python-dotenv", "jq", "tabulate", + "datatrove", ] [project.optional-dependencies] diff --git a/src/ml_filter/data_processing/score_based_filtering/__init__.py b/src/ml_filter/data_processing/score_based_filtering/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py b/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py new file mode 100644 index 00000000..977ab070 --- /dev/null +++ b/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py @@ -0,0 +1,245 @@ +from __future__ import annotations + +import csv +import os +import sys +from pathlib import Path + +from datatrove.executor import LocalPipelineExecutor, SlurmPipelineExecutor +from datatrove.pipeline.base import PipelineStep +from pydantic import BaseModel, Field, model_validator +from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, SettingsConfigDict, YamlConfigSettingsSource + +from ml_filter.data_processing.score_based_filtering.step_data_filtering import DataFiltering +from ml_filter.data_processing.score_based_filtering.step_score_parsing import ScoresParser + + +class FilterPipelineBuilder(BaseSettings): + """Configuration parameters and building 
for the score-based filtering pipeline. + This class defines the settings for running a data filtering pipeline that processes datasets based on scores. + It includes parameters for both local and Slurm execution environments. + The pipeline consists of steps for parsing scores and filtering datasets based on those scores. + + Besides initializing this class directly, it can also be configured using a YAML file or environment variables. + The YAML file can be specified using the `FILTER_PIPELINE_YAML_FILE` environment variable. + If no YAML file is provided, the class will use default settings and environment variables. + """ + + model_config = SettingsConfigDict(env_prefix="filter_pipeline_", env_nested_delimiter="__") + + # Pipeline configuration parameters + params: FilterPipelineParameters + + # Execution parameters + running_on_slurm: bool = False + local_settings: LocalExecutionSettings | None = None + slurm_settings: SlurmExecutionSettings | None = None + + @model_validator(mode="after") + def slurm_vs_local(self): + if self.running_on_slurm and self.local_settings is not None: + raise ValueError("Running on Slurm requires slurm execution settings, not local settings.") + if self.running_on_slurm and self.slurm_settings is None: + self.slurm_settings = SlurmExecutionSettings() + elif not self.running_on_slurm and self.slurm_settings is not None: + raise ValueError("Running locally requires local execution settings, not Slurm settings.") + if not self.running_on_slurm and self.local_settings is None: + self.local_settings = LocalExecutionSettings() + return self + + @model_validator(mode="after") + def set_logging_dir(self): + if self.local_settings is not None and self.local_settings.logging_dir is None: + self.local_settings.logging_dir = str(self.params.output_folder / "logs") + if self.slurm_settings is not None and self.slurm_settings.logging_dir is None: + self.slurm_settings.logging_dir = str(self.params.output_folder / "logs") + return self + + def build_pipeline_executor(self) -> LocalPipelineExecutor | SlurmPipelineExecutor: + """Builds the appropriate pipeline executor based on the execution settings.""" + pipeline = self._build_pipeline() + if self.running_on_slurm: + return SlurmPipelineExecutor(pipeline=pipeline, **self.slurm_settings.model_dump()) + else: + return LocalPipelineExecutor(pipeline=pipeline, **self.local_settings.model_dump()) + + def _build_pipeline(self) -> list[PipelineStep]: + """Builds the pipeline based on the provided configuration.""" + return build_pipeline( + score_path=self.params.score_path, + tokenized_data_path=self.params.tokenized_data_path, + output_folder=self.params.output_folder, + thresholds=self.params.thresholds, + hash_to_base_file_mapping_csv=self.params.hash_to_base_file_mapping_csv, + base_file_prefix=self.params.base_file_prefix, + tokenized_data_extension=self.params.tokenized_data_extension, + ) + + @classmethod + def settings_customise_sources( + cls, + settings_cls: type[BaseSettings], + init_settings: PydanticBaseSettingsSource, + env_settings: PydanticBaseSettingsSource, + dotenv_settings: PydanticBaseSettingsSource, + file_secret_settings: PydanticBaseSettingsSource, + ) -> tuple[PydanticBaseSettingsSource, ...]: + return ( + init_settings, + env_settings, + YamlConfigSettingsSource(settings_cls, yaml_file=os.getenv("FILTER_PIPELINE_YAML_FILE")), + dotenv_settings, + file_secret_settings, + ) + + +class FilterPipelineParameters(BaseModel): + """Parameters for the score-based filtering pipeline.""" + + score_path: Path = 
Field(..., description="The path to the directory containing JSONL files with scores.") + tokenized_data_path: Path = Field(..., description="The path for the tokenized data files.") + output_folder: Path = Field(..., description="The folder where the filtered datasets will be saved.") + thresholds: dict[str, float] = Field( + ..., description="Dictionary where keys are score names and values are thresholds to filter samples." + ) + hash_to_base_file_mapping_csv: Path = Field( + ..., description="CSV file mapping base file hashes to their corresponding paths." + ) + base_file_prefix: Path = Field( + default=Path(""), + description="The prefix path for the raw/base files. This prefix will be removed " + "when mapping from the raw files to the corresponding tokenized files", + ) + tokenized_data_extension: str = Field( + default=".pbin", description="The file extension for the tokenized data files." + ) + + +class LocalExecutionSettings(BaseModel): + """Settings for running the pipeline locally.""" + + tasks: int = 1 + local_tasks: int = 1 + local_rank_offset: int = 0 + logging_dir: str | None = None + + +class SlurmExecutionSettings(BaseModel): + """Settings for running the pipeline on a Slurm cluster.""" + + tasks: int = 1 + time: str = "00:15:00" + partition: str = "default" + account: str | None = None # FIXME is this supported? + cpus_per_task: int = 1 + mem_per_cpu_gb: int = 2 + workers: int = -1 + job_name: str = "data_processing" + qos: str = "normal" + env_command: str | None = None + condaenv: str | None = None + venv_path: str | None = None + sbatch_args: dict[str, str] | None = None + max_array_size: int = 1001 + depends_job_id: str | None = None + job_id_position: int = -1 + # job_id_retriever: Callable | None = None + logging_dir: str | None = None + skip_completed: bool = True + slurm_logs_folder: str | None = None + max_array_launch_parallel: bool = False + stagger_max_array_jobs: int = 0 + run_on_dependency_fail: bool = False + randomize_start_duration: int = 0 + requeue_signals: tuple[str] | None = ("SIGUSR1",) + mail_type: str = "ALL" + mail_user: str | None = None + requeue: bool = True + srun_args: dict[str, str] | None = None + tasks_per_job: int = 1 + + +def run_pipeline(args: FilterPipelineBuilder) -> None: + """Runs a datatrove pipeline to filter datasets based on scores. + Args: + args (PipelineArgs): The configuration parameters for the pipeline. + """ + executor = args.build_pipeline_executor() + executor.run() + + +def build_pipeline( + score_path: Path, + tokenized_data_path: Path, + output_folder: Path, + thresholds: dict[str, float], + hash_to_base_file_mapping_csv: Path, + base_file_prefix: Path = Path(""), + tokenized_data_extension: str = ".pbin", +) -> list[PipelineStep]: + """ + Builds a datatrove pipeline for filtering datasets based on scores. + Args: + score_path (Path): The path to the JSONL file containing scores. + tokenized_data_path (Path): The path for the tokenized data files. + output_folder (Path): The folder where the filtered datasets will be saved. + thresholds (dict[str, float]): A dictionary where keys are score names and values are the + thresholds to filter samples. + hash_to_base_file (dict[str, Path]): A mapping from base file hashes to their corresponding paths. + base_file_prefix (Path): The prefix path for the base files. + tokenized_data_extension (str): The file extension for the tokenized data files. + Returns: + list[PipelineStep]: A list containing the pipeline steps for filtering datasets. 
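+
+    Example:
+        A minimal usage sketch; the paths and the threshold name below are placeholders
+        taken from the example config, not real data:
+
+            steps = build_pipeline(
+                score_path=Path("/path/to/annotations"),
+                tokenized_data_path=Path("/path/to/tokenized"),
+                output_folder=Path("/path/to/filtered"),
+                thresholds={"score_Gemma_Snowflake": 3.0},
+                hash_to_base_file_mapping_csv=Path("/path/to/hashes.csv"),
+            )
+            LocalPipelineExecutor(pipeline=steps, tasks=1).run()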
+ """ + assert score_path.is_dir(), f"Score path {score_path} must be a directory." + assert output_folder.is_dir(), f"Output folder {output_folder} must be a directory." + assert len(thresholds) > 0, "At least one threshold must be provided." + assert ( + hash_to_base_file_mapping_csv.is_file() + ), f"Hash to base file mapping {hash_to_base_file_mapping_csv} must be a file." + hash_to_base_file = read_hash_to_base_file_mapping(hash_to_base_file_mapping_csv) + pipeline: list[PipelineStep] = [ + ScoresParser( + data_folder=str(score_path), + score_keys=list(thresholds.keys()), + hash_to_base_file=hash_to_base_file, + tokenized_data_path=tokenized_data_path, + base_file_prefix=base_file_prefix, + tokenized_data_extension=tokenized_data_extension, + ), + DataFiltering( + output_folder=output_folder, + thresholds=thresholds, + tokenized_data_path=tokenized_data_path, + ), + ] + return pipeline + + +def read_hash_to_base_file_mapping(csv_file: Path) -> dict[str, Path]: + """ + Reads a CSV file containing a mapping from base file hashes to their corresponding paths. + Args: + csv_file (Path): The path to the CSV file. + Returns: + dict[str, Path]: A dictionary mapping base file hashes to their corresponding paths. + """ + hash_to_base_file: dict[str, Path] = {} + with open(csv_file, "r") as f: + reader = csv.DictReader(f) + for row in reader: + hash_to_base_file[row["md5"]] = Path(row["file_path"]) + return hash_to_base_file + + +if __name__ == "__main__": + if len(sys.argv) > 1 or not (yaml_file := os.getenv("FILTER_PIPELINE_YAML_FILE")) or not os.path.isfile(yaml_file): + print( + "This script is intended to be used with a YAML configuration " + "file set via the environment variable `FILTER_PIPELINE_YAML_FILE`.\n" + "If you want to run it without a YAML file, please import from it " + "and use the FilterPipelineBuilder class directly." + ) + exit(1) + args = FilterPipelineBuilder() + run_pipeline(args) diff --git a/src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py b/src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py new file mode 100644 index 00000000..9f445316 --- /dev/null +++ b/src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py @@ -0,0 +1,98 @@ +import dataclasses +import logging +from pathlib import Path +from typing import Callable + +import numpy as np +from datatrove.data import Document, DocumentsPipeline +from datatrove.pipeline.base import PipelineStep +from numpy.typing import NDArray + +from ml_filter.data_processing.score_based_filtering.step_score_parsing import ScoresParser + +try: + from modalities.dataloader.filter_packed_data import filter_dataset +except ImportError: + logging.error("The filtering pipeline requires the 'modalities' package to be installed.") + exit(1) + + +class DataFiltering(PipelineStep): + """ + A class to filter datasets based on scores and specified thresholds. + This class is designed to be used within a datatrove pipeline. + For a given list of score dictionaries, it filters the corresponding tokenized dataset files + based on the provided thresholds for each score. + The resulting filtered datasets are saved in the specified output folder. + Args: + output_folder (Path): The folder where the filtered datasets will be saved. + thresholds (dict[str, float]): A dictionary where keys are score names and values are the + thresholds to filter samples. + tokenized_data_path (Path): The path for the tokenized data files. 
+ Raises: + AssertionError: If the output folder is not a directory or if no thresholds are provided. + """ + + name = "DataFiltering" + type = "Filter" + _requires_dependencies = [] + + def __init__(self, output_folder: Path, thresholds: dict[str, float], tokenized_data_path: Path = Path("")): + super().__init__() + self._output_folder = output_folder + assert self._output_folder.is_dir(), f"Output folder {self._output_folder} must be a directory." + self._thresholds = thresholds + assert len(self._thresholds) > 0, "At least one threshold must be provided." + self._tokenized_data_path = tokenized_data_path + + def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline: + for document in data: + with self.track_time(): + self._filter_document(document) + yield document + + def _filter_document(self, document: Document): + """ + Filters a single, tokenized dataset based on the scores contained in the document. + Args: + document (Document): The document containing scores and the path to the tokenized data file. + Raises: + ValueError: If the document does not contain the required keys or if the tokenized file path is invalid. + """ + document: dict[str, list[dict[str, float]] | str] = dataclasses.asdict(document) + scores: list[dict[str, float]] = document["metadata"][ScoresParser.SCORE_ENTRIES_KEY] + tokenized_file = Path(document["metadata"][ScoresParser.TOKENIZED_FILE_KEY]) + output_path = self._prepare_output_path(tokenized_file) + filter_func = make_filter_func(scores, self._thresholds) + filter_dataset(src_path=tokenized_file, dst_path=output_path, filter_func=filter_func, sample_key="input_ids") + + def _prepare_output_path(self, tokenized_file: Path) -> Path: + tokenized_file_rel = tokenized_file.relative_to(self._tokenized_data_path) + output_path = self._output_folder / tokenized_file_rel.with_suffix(".filtered.pbin") + output_path.parent.mkdir(parents=True, exist_ok=True) + return output_path + + +def make_filter_func( + scores: list[dict[str, float]], thresholds: dict[str, float] +) -> Callable[[tuple[int, dict[str, NDArray[np.int_]]]], bool]: + """ + Creates a filter function that checks if the scores of each sample meet the specified thresholds. + Args: + scores (list[dict[str, float]]): A list of dictionaries containing scores for each sample. + thresholds (dict[str, float]): A dictionary where keys are score names and values are the thresholds to + filter samples. + Returns: + Callable[[tuple[int, dict[str, NDArray[np.int_]]]], bool]: A function that takes an item (index and + sample) and returns True if the sample meets the thresholds, otherwise False. 
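+
+    Example:
+        An illustrative sketch with made-up score entries and a single threshold:
+
+            keep = make_filter_func([{"score_A": 3.5}, {"score_A": 1.0}], {"score_A": 2.0})
+            keep((0, {"input_ids": np.array([1, 2])}))  # -> True  (3.5 >= 2.0)
+            keep((1, {"input_ids": np.array([1, 2])}))  # -> False (1.0 < 2.0)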
+ """ + + def filter_func(item: tuple[int, dict[str, NDArray[np.int_]]]) -> bool: + idx, _ = item + score_entry = scores[idx] + for score_key, threshold in thresholds.items(): + if score_entry[score_key] < threshold: + return False + return True + + return filter_func diff --git a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py new file mode 100644 index 00000000..f1ee14a8 --- /dev/null +++ b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py @@ -0,0 +1,127 @@ +import json +from pathlib import Path +from typing import Callable, Iterable, Literal + +from datatrove.data import DocumentsPipeline +from datatrove.io import DataFileLike, DataFolderLike +from datatrove.pipeline.readers.base import BaseDiskReader + + +class ScoresParser(BaseDiskReader): + """ + A parser that reads a JSONL file containing scores for samples and maps them to the + corresponding tokenized data files. Each entry in the JSONL file is expected to have + a "document_id" field that contains a base file hash and an index, and the scores + for that sample. + """ + + name = "ScoresParser" + # type = "Parser" + _requires_dependencies = [] + + SCORE_ENTRIES_KEY = "score_entries" + TOKENIZED_FILE_KEY = "tokenized_file" + + def __init__( + self, + data_folder: DataFolderLike, + score_keys: Iterable[str], + hash_to_base_file: dict[str, Path], + tokenized_data_path: Path, + base_file_prefix: Path = Path(""), + tokenized_data_extension: str = ".pbin", + compression: Literal["infer", "gzip", "zstd"] | None = "infer", + paths_file: DataFileLike | None = None, + limit: int = -1, + skip: int = 0, + file_progress: bool = False, + doc_progress: bool = False, + adapter: Callable | None = None, + text_key: str = "text", + id_key: str = "id", + default_metadata: dict | None = None, + recursive: bool = True, + glob_pattern: str | None = None, + shuffle_files: bool = False, + ): + super().__init__( + data_folder=data_folder, + paths_file=paths_file, + limit=limit, + skip=skip, + file_progress=file_progress, + doc_progress=doc_progress, + adapter=adapter, + text_key=text_key, + id_key=id_key, + default_metadata=default_metadata, + recursive=recursive, + glob_pattern=glob_pattern, + shuffle_files=shuffle_files, + ) + self._score_keys = list(score_keys) + assert len(self._score_keys) > 0, "At least one score key must be provided." + self._hash_to_base_file = hash_to_base_file + self._tokenized_data_path = tokenized_data_path + self._base_file_prefix = base_file_prefix + self._tokenized_data_extension = tokenized_data_extension + self._compression = compression + + def read_file(self, filepath: str) -> DocumentsPipeline: + """ + Turns a given JSONL file into a Document object containing the path to the corresponding tokenized data file + and a list of dictionaries with the scores for each sample in the file. + Args: + filepath: path of the file to read + + Returns: generator of Document + """ + base_file_hash, scores_as_list = self._parse_scores_jsonl_file(filepath) + tokenized_data_path = self._map_to_tokenized_data_path(base_file_hash) + if not tokenized_data_path.exists(): + raise FileNotFoundError(f"Tokenized data file {tokenized_data_path} does not exist.") + doc_content = { + "text": ".", # Text needs to be non-empty. 
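+            # The entries below are stored in the Document's metadata, where the DataFiltering step reads them.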
+ self.SCORE_ENTRIES_KEY: scores_as_list, + self.TOKENIZED_FILE_KEY: tokenized_data_path, + } + document = self.get_document_from_dict(doc_content, filepath, 0) + return [document] + + def _parse_scores_jsonl_file(self, filepath: str) -> tuple[str, list[dict[str, float]]]: + scores_for_idx: dict[int, dict[str, float]] = {} + hashes: set[str] = set() + with self.data_folder.open(filepath, "r", compression=self._compression) as f: + for line in f: + file_data = json.loads(line) + base_file_hash, document_idx = file_data["document_id"].rsplit("_") + scores_for_idx[int(document_idx)] = {k: file_data[k] for k in self._score_keys} + hashes.add(base_file_hash) + self._verify_file_format(scores_for_idx, hashes) + scores_as_list = list(map(lambda x: x[1], sorted(scores_for_idx.items(), key=lambda x: x[0]))) + base_file_hash = next(iter(hashes)) + return base_file_hash, scores_as_list + + def _verify_file_format(self, scores_for_idx: dict[int, dict[str, float]], hashes: set[str]): + assert len(hashes) == 1, "All entries in the score file must refer to the same base file." + assert min(scores_for_idx.keys()) == 0 and max(scores_for_idx.keys()) + 1 == len( + scores_for_idx + ), "All indices in the score file must be continuous." + + def _map_to_tokenized_data_path(self, base_file_hash: str) -> Path: + """ + Maps a base file hash to the corresponding tokenized data path. + Args: + base_file_hash (str): The hash of the base file. + Returns: + Path: The path to the tokenized data file. + """ + if base_file_hash not in self._hash_to_base_file: + raise ValueError(f"Base file hash {base_file_hash} not found in the provided hash mapping.") + base_file = self._hash_to_base_file[base_file_hash] + base_file_rel = base_file.relative_to(self._base_file_prefix) + tokenized_rel = base_file_rel.with_suffix(self._tokenized_data_extension) + tokenized_data_path = self._tokenized_data_path / tokenized_rel + if not tokenized_data_path.exists(): + raise FileNotFoundError(f"Tokenized data file {tokenized_data_path} does not exist.") + return tokenized_data_path From 81aafa82579ead0c61f453ed85c52fabf2620a64 Mon Sep 17 00:00:00 2001 From: BlueCrescent <7198877+BlueCrescent@users.noreply.github.com> Date: Fri, 25 Jul 2025 10:43:29 +0200 Subject: [PATCH 2/9] chore(filtering): More robust doc id parsing. 
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../data_processing/score_based_filtering/step_score_parsing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py index f1ee14a8..13298e44 100644 --- a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py +++ b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py @@ -94,7 +94,7 @@ def _parse_scores_jsonl_file(self, filepath: str) -> tuple[str, list[dict[str, f with self.data_folder.open(filepath, "r", compression=self._compression) as f: for line in f: file_data = json.loads(line) - base_file_hash, document_idx = file_data["document_id"].rsplit("_") + base_file_hash, document_idx = file_data["document_id"].rsplit("_", 1) scores_for_idx[int(document_idx)] = {k: file_data[k] for k in self._score_keys} hashes.add(base_file_hash) self._verify_file_format(scores_for_idx, hashes) From b1d1a46fc74f94a7b19ad8454027391616d9d6f2 Mon Sep 17 00:00:00 2001 From: Timm Ruland Date: Fri, 25 Jul 2025 10:50:27 +0200 Subject: [PATCH 3/9] fix(filtering): Removed duplicate file exists check. --- .../data_processing/score_based_filtering/step_score_parsing.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py index 13298e44..3db1ae8f 100644 --- a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py +++ b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py @@ -78,8 +78,6 @@ def read_file(self, filepath: str) -> DocumentsPipeline: """ base_file_hash, scores_as_list = self._parse_scores_jsonl_file(filepath) tokenized_data_path = self._map_to_tokenized_data_path(base_file_hash) - if not tokenized_data_path.exists(): - raise FileNotFoundError(f"Tokenized data file {tokenized_data_path} does not exist.") doc_content = { "text": ".", # Text needs to be non-empty. self.SCORE_ENTRIES_KEY: scores_as_list, From af891823cb557e02bb9a285a3791bdb87893f7db Mon Sep 17 00:00:00 2001 From: BlueCrescent <7198877+BlueCrescent@users.noreply.github.com> Date: Fri, 25 Jul 2025 10:52:01 +0200 Subject: [PATCH 4/9] fix(filtering): fixed docstring Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../data_processing/score_based_filtering/filter_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py b/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py index 977ab070..ec1e5410 100644 --- a/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py +++ b/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py @@ -185,7 +185,7 @@ def build_pipeline( output_folder (Path): The folder where the filtered datasets will be saved. thresholds (dict[str, float]): A dictionary where keys are score names and values are the thresholds to filter samples. - hash_to_base_file (dict[str, Path]): A mapping from base file hashes to their corresponding paths. + hash_to_base_file_mapping_csv (Path): A CSV file mapping base file hashes to their corresponding paths. base_file_prefix (Path): The prefix path for the base files. tokenized_data_extension (str): The file extension for the tokenized data files. 
Returns: From bb65badda5f34c323638192b097fe10ba4dbff06 Mon Sep 17 00:00:00 2001 From: ale25663 Date: Fri, 26 Sep 2025 11:07:16 +0200 Subject: [PATCH 5/9] feat(pipeline): add language sampling and filtering pipeline with configuration and job submission scripts --- .../example_sampling_pipeline_config.yaml | 14 + .../run_language_pipeline.py | 291 +++++++++++++ .../run_language_pipeline.slurm | 25 ++ .../lang_based_sampling/sampling_utils.py | 392 ++++++++++++++++++ .../lang_based_sampling/submit_jobs.sh | 12 + 5 files changed, 734 insertions(+) create mode 100644 configs/data_processing/example_sampling_pipeline_config.yaml create mode 100644 src/ml_filter/data_processing/lang_based_sampling/run_language_pipeline.py create mode 100755 src/ml_filter/data_processing/lang_based_sampling/run_language_pipeline.slurm create mode 100644 src/ml_filter/data_processing/lang_based_sampling/sampling_utils.py create mode 100755 src/ml_filter/data_processing/lang_based_sampling/submit_jobs.sh diff --git a/configs/data_processing/example_sampling_pipeline_config.yaml b/configs/data_processing/example_sampling_pipeline_config.yaml new file mode 100644 index 00000000..57b2dc13 --- /dev/null +++ b/configs/data_processing/example_sampling_pipeline_config.yaml @@ -0,0 +1,14 @@ +paths: + annotations_base: "/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/annotations" + tokenized_base: "/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/tokenized" + output_folder: "/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/output_filtered" + base_file_prefix: "/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/jsonl_files" #"/data/horse/ws/s6690609-gptx_traindata/s6690609-gptx_traindata-1744239642/eurolingua/data/raw_data/fineweb2" + csv_path: "/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/hashes.csv" + tokenizer_model: "/data/horse/ws/alju972f-tokenization_at_scale/eurolingua_tokenization/tokenizer/tueken2_tokenizer_model.model" + +sampling: + language_distribution: + als_Latn: 25 + deu_Latn: 75 + total_sample_size: 10 + random_seed: 42 \ No newline at end of file diff --git a/src/ml_filter/data_processing/lang_based_sampling/run_language_pipeline.py b/src/ml_filter/data_processing/lang_based_sampling/run_language_pipeline.py new file mode 100644 index 00000000..3577eaf3 --- /dev/null +++ b/src/ml_filter/data_processing/lang_based_sampling/run_language_pipeline.py @@ -0,0 +1,291 @@ +"""Language sampling, filtering, and validation pipeline. + +Refactored for clarity, testability, and CLI flexibility. 
+""" + +from __future__ import annotations + +import argparse +import logging +import os +import sys +import json +import random +from pathlib import Path +from typing import Dict, Tuple, Set + +import yaml +import sentencepiece as spm +from pydantic import BaseModel, Field, validator +from ml_filter.src.ml_filter.data_processing.lang_based_sampling.sampling_utils import ( + load_hash_mapping, + invert_hash_mapping, + load_jsonl_counts, + compute_target_samples, + sample_documents, + TokenizedFilterer, +) +from modalities.dataloader.dataset import PackedMemMapDatasetBase + +EOD_TOKEN_ID = 3 # Sentence end token appended by pipeline, to be ignored in validation + +logger = logging.getLogger("language_pipeline") + + +def setup_logging(level: str) -> None: + """Configure root logging once.""" + logging.basicConfig( + level=getattr(logging, level.upper(), logging.INFO), + format="%(asctime)s | %(levelname)s | %(name)s | %(message)s", + ) + + +class PathsConfig(BaseModel): + tokenized_base: Path + output_folder: Path + base_file_prefix: Path + csv_path: Path + tokenizer_model: Path + + @validator("tokenized_base", "output_folder", "base_file_prefix", "csv_path", "tokenizer_model", pre=True) + def _to_path(cls, v): # type: ignore + return Path(v) + + +class SamplingConfig(BaseModel): + language_distribution: Dict[str, int] + total_sample_size: int = Field(gt=0) + + @validator("language_distribution") + def non_empty_distribution(cls, v): # type: ignore + if not v: + raise ValueError("language_distribution cannot be empty") + return v + + +class PipelineConfig(BaseModel): + paths: PathsConfig + sampling: SamplingConfig + + +def load_config(path: Path) -> PipelineConfig: + """Load YAML config into strongly typed PipelineConfig.""" + with open(path) as f: + raw = yaml.safe_load(f) + return PipelineConfig(**raw) + + +def run_sampling_and_filter( + lang: str, + cfg: PipelineConfig, + use_wc: bool, +) -> Tuple[Set[str], Dict[str, Dict[Path, int]], dict, dict]: + """Run sampling for all languages, return selected IDs for requested language and related mappings.""" + paths_cfg = cfg.paths + sampling_cfg = cfg.sampling + tokenized_base = paths_cfg.tokenized_base + output_folder = paths_cfg.output_folder + base_file_prefix = paths_cfg.base_file_prefix + csv_path = paths_cfg.csv_path + + language_distribution = sampling_cfg.language_distribution + total_sample_size = sampling_cfg.total_sample_size + logger.info( + f"Sampling config total_sample_size={total_sample_size} distribution={language_distribution}" + ) + + hash_mapping = load_hash_mapping(csv_path) + inv_hash_mapping = invert_hash_mapping(hash_mapping) + lang_to_files = load_jsonl_counts(base_file_prefix, use_wc=use_wc) + targets = compute_target_samples(language_distribution, total_sample_size) + selected_doc_ids_all = sample_documents(lang_to_files, targets, inv_hash_mapping) + logger.info( + "Sampling done: " + ", ".join(f"{k}:{len(v)}" for k, v in selected_doc_ids_all.items()) + ) + if lang not in selected_doc_ids_all: + raise KeyError( + f"Language {lang} not present in sampled IDs. 
Available={list(selected_doc_ids_all.keys())}" + ) + selected_ids = set(selected_doc_ids_all[lang]) + logger.info(f"Selected {len(selected_ids)} synthetic IDs for {lang}") + + # Filtering + filterer = TokenizedFilterer( + tokenized_base, + output_folder, + hash_mapping, + inv_hash_mapping, + base_file_prefix, + ) + files_filtered = 0 + for annotated_file in lang_to_files[lang].keys(): + filterer.filter_document(annotated_file, selected_ids) + files_filtered += 1 + logger.info(f"Filtering complete: files_processed={files_filtered}") + return selected_ids, lang_to_files, hash_mapping, inv_hash_mapping + + +def validate_filtered( + lang: str, + selected_ids: Set[str], + lang_to_files: Dict[str, Dict[Path, int]], + mappings: Tuple[dict, dict], + cfg: PipelineConfig, + samples_per_file: int | None = None, +) -> int: + """Validate filtered tokenized output against SentencePiece encoding. + + Returns number of validated documents. + """ + hash_mapping, inv_hash_mapping = mappings + tokenizer_model = cfg.paths.tokenizer_model + base_file_prefix = cfg.paths.base_file_prefix + output_folder = cfg.paths.output_folder + + logger.info("Loading SentencePiece model for validation") + sp = spm.SentencePieceProcessor(model_file=str(tokenizer_model)) + logger.info("Validation phase starting") + + def _filter_ids_for_file(file_path: Path, selected: set[str]): + base_md5 = inv_hash_mapping.get(file_path) + if not base_md5: + return [], {} + prefix = base_md5 + "_" + file_ids = [sid for sid in selected if sid.startswith(prefix)] + if not file_ids: + return [], {} + rows: dict[int, str] = {} + for sid in file_ids: + try: + rows[int(sid.rsplit('_', 1)[1])] = sid + except (ValueError, IndexError): + logging.warning(f"Malformed doc_id (skipping): {sid}") + continue + return file_ids, rows + + validation_docs = 0 + for data_file in lang_to_files[lang].keys(): + filtered_ids, target_rows = _filter_ids_for_file(data_file, selected_ids) + if not filtered_ids: + continue + rel = data_file.relative_to(base_file_prefix) + filtered_file = output_folder / rel.with_suffix(".filtered.pbin") + logger.info( + f"Validating: src_jsonl={data_file} filtered_pbin={filtered_file} ids={len(filtered_ids)}" + ) + try: + source_data = PackedMemMapDatasetBase( + filtered_file, sample_key="input_ids", load_index=True + ) + except FileNotFoundError: + logger.error(f"Filtered pbin not found: {filtered_file}") + continue + selected_lines: list[tuple[int, dict]] = [] + with open(data_file) as f: + for idx, line in enumerate(f): + if idx not in target_rows: + continue + try: + rec = json.loads(line) + except json.JSONDecodeError as e: + logger.warning( + f"JSON decode error at line {idx} in {data_file}: {e}" + ) + continue + selected_lines.append((idx, rec)) + + # Optionally subsample validation lines for this file + if samples_per_file is not None and len(selected_lines) > samples_per_file: + selected_lines = random.sample(selected_lines, samples_per_file) + + #logger the selected lines + logger.debug(f"Selected lines for validation in {data_file}: {[idx for idx, _ in selected_lines]}") + + for out_idx, (row_idx, rec) in enumerate(selected_lines): + if out_idx >= len(source_data): + break + pipeline_tokens = source_data[out_idx]["input_ids"].tolist() + ref_tokens = sp.encode(rec["text"], out_type=int) + had_trailing_eod = False + if ( + pipeline_tokens + and pipeline_tokens[-1] == EOD_TOKEN_ID + and (len(pipeline_tokens) - 1) == len(ref_tokens) + ): + had_trailing_eod = True + logger.debug(f"Trailing EOD token found in line {row_idx} of 
{data_file}, ignoring for validation") + compare_pipeline = pipeline_tokens[:-1] + else: + compare_pipeline = pipeline_tokens + base_md5 = inv_hash_mapping.get(data_file) + synthetic_id = f"{base_md5}_{row_idx}" if base_md5 else f"UNKNOWN_{row_idx}" + for i, (p_tok, r_tok) in enumerate(zip(compare_pipeline, ref_tokens)): + if p_tok != r_tok: + logger.error( + f"Token mismatch file={data_file} line={row_idx} out_idx={out_idx} doc_id={synthetic_id} pos={i} pipeline_tok={p_tok} ref_tok={r_tok}" + ) + raise AssertionError(f"Token mismatch for line {row_idx}") + if len(compare_pipeline) != len(ref_tokens): + logger.error( + f"Length mismatch file={data_file} line={row_idx} out_idx={out_idx} doc_id={synthetic_id} pipeline_len={len(compare_pipeline)} ref_len={len(ref_tokens)} had_trailing_eod={had_trailing_eod} original_pipeline_len={len(pipeline_tokens)}" + ) + raise AssertionError(f"Length mismatch for line {row_idx}") + validation_docs += 1 + logger.info( + f"Validated file {data_file} ok (docs_validated={len(selected_lines)})" + ) + logger.info(f"Validation complete: total_validated_docs={validation_docs}") + return validation_docs + + +def parse_args(argv: list[str]) -> argparse.Namespace: + p = argparse.ArgumentParser( + description="Run language sampling/filtering/validation pipeline" + ) + p.add_argument("lang", help="Language code to process (must exist in distribution)") + p.add_argument( + "--config", "-c", default=os.environ.get("PIPELINE_CONFIG", "config.yaml"), + help="Path to YAML config (default env PIPELINE_CONFIG or config.yaml)", + ) + p.add_argument( + "--log-level", default=os.environ.get("LOG_LEVEL", "INFO"), + help="Logging level (default from LOG_LEVEL env or INFO)", + ) + p.add_argument( + "--skip-validation", action="store_true", help="Skip token-level validation phase" + ) + p.add_argument( + "--disable-wc", action="store_true", help="Disable fast wc -l counting" + ) + p.add_argument( + "--validation-samples-per-file", type=int, default=5, + help="Maximum number of documents to validate per file (random sample). If omitted, validate all." 
+ ) + return p.parse_args(argv) + + +def main(argv: list[str] | None = None) -> int: + args = parse_args(argv or sys.argv[1:]) + setup_logging(args.log_level) + logger.info(f"Starting pipeline lang={args.lang}") + cfg = load_config(Path(args.config)) + selected_ids, lang_to_files, hash_mapping, inv_hash_mapping = run_sampling_and_filter( + args.lang, cfg, use_wc=not args.disable_wc + ) + if args.skip_validation: + logger.info("Validation skipped by flag") + else: + validate_filtered( + args.lang, + selected_ids, + lang_to_files, + (hash_mapping, inv_hash_mapping), + cfg, + samples_per_file=args.validation_samples_per_file, + ) + logger.info(f"Job for {args.lang} completed successfully ✅") + return 0 + + +if __name__ == "__main__": # pragma: no cover + raise SystemExit(main()) diff --git a/src/ml_filter/data_processing/lang_based_sampling/run_language_pipeline.slurm b/src/ml_filter/data_processing/lang_based_sampling/run_language_pipeline.slurm new file mode 100755 index 00000000..1d579385 --- /dev/null +++ b/src/ml_filter/data_processing/lang_based_sampling/run_language_pipeline.slurm @@ -0,0 +1,25 @@ +#!/bin/bash +#SBATCH --job-name=sampling_pipeline +#SBATCH --output=logs/%x_%j.out +#SBATCH --error=logs/%x_%j.err +#SBATCH --partition=barnard +#SBATCH --account=p_gptx +#SBATCH --time=04:00:00 +#SBATCH --nodes=1 +#SBATCH --cpus-per-task=100 +#SBATCH --ntasks=1 +#SBATCH --mem-per-cpu=4500 + +# Load modules and activate environment +module purge + +module --ignore_cache load release/24.04 +module load GCC/13.2.0 +module load Python/3.11.5 + +source /data/horse/ws/alju972f-tokenization_at_scale/env_tokenization/bin/activate + +LANG=$1 +echo "Running pipeline for $LANG" + +python3 run_language_pipeline.py $LANG diff --git a/src/ml_filter/data_processing/lang_based_sampling/sampling_utils.py b/src/ml_filter/data_processing/lang_based_sampling/sampling_utils.py new file mode 100644 index 00000000..5895363d --- /dev/null +++ b/src/ml_filter/data_processing/lang_based_sampling/sampling_utils.py @@ -0,0 +1,392 @@ +import json +import csv +import logging +from pathlib import Path +from collections import defaultdict +from typing import Callable +import random +import numpy as np +import subprocess +from typing import Dict, List, Optional +from modalities.dataloader.filter_packed_data import filter_dataset +from modalities.dataloader.dataset import PackedMemMapDatasetBase + + +logger = logging.getLogger(__name__) + +def compute_target_samples(language_distribution: dict[str, int], total_sample_size: int) -> dict[str, int]: + targets = {lang: round(total_sample_size * pct / 100) for lang, pct in language_distribution.items()} + logger.info(f"Computed targets (total={total_sample_size}): {targets}") + return targets + + +def load_hash_mapping(csv_path: Path) -> dict[str, Path]: + """ + CSV has columns: file_path,md5 + Returns: {md5: Path(raw_jsonl)} + """ + mapping = {} + with open(csv_path, "r") as f: + reader = csv.DictReader(f) + n = 0 + for row in reader: + mapping[row["md5"]] = Path(row["file_path"]) + n += 1 + logger.info(f"Loaded hash mapping entries={n} from {csv_path}") + return mapping + + +def invert_hash_mapping(hash_mapping: dict[str, Path]) -> dict[Path, str]: + """Return inverse mapping: {Path(raw_jsonl): md5}. 
+ """ + inverse: dict[Path, str] = {} + for md5, p in hash_mapping.items(): + if p in inverse: + logger.warning(f"Duplicate path in inversion: {p} (old={inverse[p]}, new={md5}) overwriting") + inverse[p] = md5 + return inverse + + +def load_jsonl_counts(annotated_base: Path, use_wc: bool = True) -> dict[str, dict[Path, int]]: + """Traverse annotated_base//... and count docs in each JSONL. + + Args: + annotated_base: Root directory containing per-language subdirectories with JSONL files. + use_wc: If True (default), attempt fast line counting via external `wc -l` command. + Falls back to Python iteration on failure. + + Returns: + {lang: {annotated_file_path: num_docs}} + """ + lang_to_files: dict[str, dict[Path, int]] = defaultdict(dict) + + def count_lines_py(path: Path) -> int: + with open(path, "r") as f: + return sum(1 for _ in f) + + for lang_dir in annotated_base.iterdir(): + if not lang_dir.is_dir(): + continue + lang = lang_dir.name + jsonl_files = list(lang_dir.rglob("*.jsonl")) + if not jsonl_files: + logger.debug(f"No JSONL files found under {lang_dir}") + continue + if use_wc: + logger.info(f"Using 'wc -l' to count lines in {len(jsonl_files)} files for language '{lang}'") + try: + cmd = ["wc", "-l", *[str(p) for p in jsonl_files]] + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + for line in result.stdout.strip().splitlines(): + line = line.strip() + if not line or line.startswith("total"): + continue + parts = line.split() + if len(parts) < 2: + continue + try: + count = int(parts[0]) + except ValueError: + continue + fname = " ".join(parts[1:]) + fpath = Path(fname) + if fpath in jsonl_files: + lang_to_files[lang][fpath] = count + + for p in jsonl_files: + if p not in lang_to_files[lang]: + lang_to_files[lang][p] = count_lines_py(p) + except Exception as e: + logger.warning(f"wc -l failed for lang={lang} falling back to Python counting: {e}") + for p in jsonl_files: + lang_to_files[lang][p] = count_lines_py(p) + else: + for p in jsonl_files: + lang_to_files[lang][p] = count_lines_py(p) + + total_files = sum(len(files) for files in lang_to_files.values()) + logger.info( + f"Counted JSONL files total_files={total_files} details=" + + ", ".join(f"{lg}:{len(files)}" for lg, files in lang_to_files.items()) + ) + return lang_to_files + + +def sample_documents( + lang_to_files: Dict[str, Dict[Path, int]], + targets: Dict[str, int], + file_to_hash: dict[Path, str], + rng: Optional[random.Random] = None +) -> Dict[str, List[str]]: + """Sample synthetic document IDs of the form _. + + Args: + lang_to_files: { lang: { Path(jsonl): n_docs, ... }, ... } + targets: { lang: target_count, ... } + file_to_hash: { Path(jsonl): md5 } mapping (inverse of the CSV md5->path mapping) + rng: optional random.Random instance for reproducibility + + Returns: + { lang: [ "_", ... ], ... 
} + """ + if rng is None: + rng = random.Random() + + selected_doc_ids: Dict[str, List[str]] = defaultdict(list) + + for lang, target in targets.items(): + files_items = list(lang_to_files.get(lang, {}).items()) + if not files_items: + raise ValueError(f"No files found for language '{lang}'") + + total_docs = sum(n_docs for _, n_docs in files_items) + if total_docs < target: + raise ValueError(f"Not enough documents for {lang}: target={target}, available={total_docs}") + if target == 0: + continue + + logger.info(f"Sampling lang={lang} target={target} files={len(files_items)} total_docs={total_docs}") + + # Step 1: proportional allocation (floating) + raw_allocs = [target * (n_docs / total_docs) for _, n_docs in files_items] + allocs = [int(x) for x in raw_allocs] + + # Step 2: distribute remainder by largest fractional part (tie-break random) + remainder = target - sum(allocs) + if remainder > 0: + frac_info = [(i, raw_allocs[i] - allocs[i]) for i in range(len(files_items))] + rng.shuffle(frac_info) + frac_info.sort(key=lambda x: x[1], reverse=True) + for i, _ in frac_info: + if remainder <= 0: + break + if allocs[i] < files_items[i][1]: + allocs[i] += 1 + remainder -= 1 + logger.debug(f"ALLOC_INITIAL lang={lang} allocs={allocs}") + + # Step 3: cap allocations and reclaim overflow + remainder = 0 + for i, (_, n_docs) in enumerate(files_items): + if allocs[i] > n_docs: + remainder += allocs[i] - n_docs + allocs[i] = n_docs + + # Step 3b: redistribute reclaimed remainder greedily + while remainder > 0: + candidates = [i for i, (_, n_docs) in enumerate(files_items) if allocs[i] < n_docs] + if not candidates: + raise RuntimeError(f"Could not redistribute {remainder} samples for {lang}") + rng.shuffle(candidates) + candidates.sort(key=lambda i: (files_items[i][1] - allocs[i]), reverse=True) + for i in candidates: + if remainder <= 0: + break + allocs[i] += 1 + remainder -= 1 + logger.debug(f"ALLOC_FINAL lang={lang} allocs={allocs}") + + # Step 4: sample line indices for each file and synthesize IDs + for (fpath, n_docs), quota in zip(files_items, allocs): + if quota <= 0: + continue + doc_hash = file_to_hash.get(fpath) + if doc_hash is None: + raise KeyError(f"Hash for file {fpath} not found in file_to_hash mapping") + # sample unique line indices (0-based) + if quota > n_docs: + raise ValueError(f"Quota {quota} exceeds available lines {n_docs} in {fpath}") + if quota == n_docs: + indices = list(range(n_docs)) + else: + indices = rng.sample(range(n_docs), quota) + ids = [f"{doc_hash}_{idx}" for idx in indices] + selected_doc_ids[lang].extend(ids) + logger.debug( + f"SAMPLED lang={lang} file={fpath.name} hash={doc_hash} quota={quota} first_ids={ids[:5]}" + ) + logger.info(f"Completed sampling lang={lang} selected={len(selected_doc_ids[lang])}") + + return selected_doc_ids + + + +# --- Helper to make filter function --- +def make_filter_func_from_ids(doc_ids: list[str], selected_ids: set[str]) -> Callable[[tuple[int, dict[str, np.ndarray]]], bool]: + def filter_func(item: tuple[int, dict[str, np.ndarray]]) -> bool: + idx, _ = item + try: + return doc_ids[idx] in selected_ids + except IndexError: + logging.error(f"Index {idx} not found in doc_ids list") + return False + return filter_func + + +def _filter_ids_for_file(file_path: Path, selected: set[str], inv_map: dict[Path, str]): + """Return (filtered_ids, target_rows) for a given annotated file. + filtered_ids: list of selected ids whose md5 prefix matches this file's md5. + target_rows: {row_index: full_id} + Assumes doc_id format: _. 
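+
+    Example (hypothetical values): with inv_map[file_path] == "abc123" and
+    selected == {"abc123_0", "abc123_7", "ffff42_3"}, this returns
+    (["abc123_0", "abc123_7"], {0: "abc123_0", 7: "abc123_7"})
+    (the order of filtered_ids depends on set iteration order).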
+ """ + base_md5 = inv_map.get(file_path) + if not base_md5: + raise ValueError(f"File path {file_path} not found in inverse hash mapping") + prefix = base_md5 + "_" + file_ids = [sid for sid in selected if sid.startswith(prefix)] + if not file_ids: + return [], {} + rows: dict[int, str] = {} + for sid in file_ids: + try: + rows[int(sid.rsplit('_', 1)[1])] = sid + except (ValueError, IndexError): + logging.warning(f"Malformed doc_id (skipping): {sid}") + continue + logger.debug(f"FILTER_IDS file={file_path} hash={base_md5} count={len(file_ids)}") + return file_ids, rows + +# --- Tokenized filterer --- +class TokenizedFilterer: + def __init__(self, tokenized_base: Path, output_folder: Path, hash_mapping: dict[str, Path], inverse_mapping: dict[Path, str], base_file_prefix: Path): + self._tokenized_base = tokenized_base + self._output_folder = output_folder + self._hash_mapping = hash_mapping # md5 -> raw_path + self._inverse_mapping = inverse_mapping # Path(jsonl) -> md5 + self._base_file_prefix = base_file_prefix + + def _prepare_output_path(self, tokenized_file: Path) -> Path: + tokenized_file_rel = tokenized_file.relative_to(self._tokenized_base) + output_path = self._output_folder / tokenized_file_rel.with_suffix(".filtered.pbin") + output_path.parent.mkdir(parents=True, exist_ok=True) + return output_path + + def filter_document(self, annotated_file: Path, selected_ids: set[str]): + """Filter a tokenized file using synthetic IDs (_).""" + base_hash = self._inverse_mapping.get(annotated_file) + if not base_hash: + logger.info(f"Skipping {annotated_file}: no hash mapping found") + return + + # fast check: do any selected ids reference this hash? + prefix = base_hash + "_" + if not any(sid.startswith(prefix) for sid in selected_ids): + logger.info(f"Skipping {annotated_file}: no selected ids for hash {base_hash}") + return + + # Build doc_ids list for index alignment (no need to parse JSON; just count lines) + with open(annotated_file, "r") as f: + doc_ids = [f"{base_hash}_{i}" for i, _ in enumerate(f)] + logger.debug( + f"FILTER file={annotated_file.name} hash={base_hash} total_lines={len(doc_ids)} selected_in_hash={sum(1 for i in selected_ids if i.startswith(prefix))}" + ) + + if base_hash not in self._hash_mapping: + raise ValueError(f"Base hash {base_hash} not found in CSV hash mapping") + + raw_path = self._hash_mapping[base_hash] + try: + rel = raw_path.relative_to(self._base_file_prefix) + tokenized_file = (self._tokenized_base / rel).with_suffix(".pbin") + except ValueError: + raise ValueError(f"Raw path {raw_path} is not under base prefix {self._base_file_prefix}") + output_path = self._prepare_output_path(tokenized_file) + + filter_func = make_filter_func_from_ids(doc_ids, selected_ids) + logger.info(f"Filtering hash={base_hash} src={tokenized_file} -> dst={output_path}") + filter_dataset(src_path=tokenized_file, dst_path=output_path, filter_func=filter_func) + logger.info(f"Finished filtering hash={base_hash} output={output_path}") + + + +# # --- Main pipeline --- +if __name__ == "__main__": + csv_path = Path("/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/hashes.csv") + annotations_base = Path("/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/annotations") + tokenized_base = Path("/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/tokenized") + output_folder = Path("/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/output_filtered") + base_file_prefix = 
Path("/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/jsonl_files") + + language_distribution = {"als_Latn": 25, "deu_Latn": 75} + total_sample_size = 10 + + # # Load CSV hash mapping + hash_mapping = load_hash_mapping(csv_path) + inv_hash_mapping = invert_hash_mapping(hash_mapping) + # # Count annotated JSONL docs + lang_to_files = load_jsonl_counts(base_file_prefix) + + # Compute targets + targets = compute_target_samples(language_distribution, total_sample_size) + + # Sample documents + selected_doc_ids = sample_documents(lang_to_files, targets, inv_hash_mapping) + + # Filter tokenized files + filterer = TokenizedFilterer(tokenized_base, output_folder, hash_mapping, inv_hash_mapping, base_file_prefix) + for lang, ids in selected_doc_ids.items(): + selected_set = set(ids) + for annotated_file in lang_to_files[lang].keys(): + filterer.filter_document(annotated_file, selected_set) + + + # # --- Validation step: re-tokenize and compare --- + # import json + import sentencepiece as spm + # from pathlib import Path + + # # --- Validation step: re-tokenize and compare --- + # from collections import defaultdict + + # selected_doc_ids = defaultdict(list) + + # # selected_doc_ids["als_Latn"].append("29d82196d55803ab9c792e45b59919bf_273561") + # # selected_doc_ids["deu_Latn"].append("6f174ddca737f54cea5f34da31e15178_931238") + + sp = spm.SentencePieceProcessor( + model_file="/data/horse/ws/alju972f-tokenization_at_scale/eurolingua_tokenization/tokenizer/tueken2_tokenizer_model.model" + ) + + for lang, ids in selected_doc_ids.items(): + for data_file in lang_to_files[lang].keys(): + filtered_ids, target_rows = _filter_ids_for_file(data_file, ids, inv_hash_mapping) + if not filtered_ids: + continue + + rel = data_file.relative_to(base_file_prefix) + filtered_file = output_folder / rel.with_suffix(".filtered.pbin") + source_data = PackedMemMapDatasetBase(filtered_file, sample_key="input_ids", load_index=True) + + # print out which files are being compared + print(f"Validating {data_file} against {filtered_file} for language {lang}") + + selected_lines: list[tuple[int, dict]] = [] + with open(data_file) as f: + for idx, line in enumerate(f): + if idx not in target_rows: + continue + rec = json.loads(line) + selected_lines.append((idx, rec)) + + if len(selected_lines) != len(source_data): + logging.warning( + f"Length mismatch for {annotated_file}: filtered_pbin={len(source_data)} selected_lines={len(selected_lines)}" + ) + + selected_lines = random.sample(selected_lines, 3) + for out_idx, (row_idx, rec) in enumerate(selected_lines): + if out_idx >= len(source_data): + break + pipeline_tokens = source_data[out_idx]["input_ids"].tolist() + ref_tokens = sp.encode(rec["text"], out_type=int) + # First check length mismatch (often most informative / faster) + if len(pipeline_tokens) != len(ref_tokens): + print( + f"Length mismatch for line {row_idx}: pipeline_len={len(pipeline_tokens)} ref_len={len(ref_tokens)}" + ) + raise AssertionError(f"❌ Length mismatch for line {row_idx}") + for i, (p_tok, r_tok) in enumerate(zip(pipeline_tokens, ref_tokens)): + if p_tok != r_tok: + print(f"Token mismatch at position {i} for line {row_idx}: pipeline={p_tok} ref={r_tok}") + break + raise AssertionError(f"❌ Token mismatch for line {row_idx}") diff --git a/src/ml_filter/data_processing/lang_based_sampling/submit_jobs.sh b/src/ml_filter/data_processing/lang_based_sampling/submit_jobs.sh new file mode 100755 index 00000000..a5488afb --- /dev/null +++ 
b/src/ml_filter/data_processing/lang_based_sampling/submit_jobs.sh @@ -0,0 +1,12 @@ +#!/bin/bash +set -euo pipefail + +mkdir -p logs + +echo "Submitting jobs :" +for lang in $(python3 -c "import yaml, os; cfg=yaml.safe_load(open(os.environ['PIPELINE_CONFIG'])); print(' '.join(cfg['sampling']['language_distribution'].keys()))"); do + echo "Submitting job for $lang" + sbatch run_language_pipeline.slurm "$lang" +done + +echo "✅ All jobs submitted." From c057ecd0be34dcf57f79f484835c28862c9cb6c5 Mon Sep 17 00:00:00 2001 From: ale25663 Date: Fri, 26 Sep 2025 11:55:07 +0200 Subject: [PATCH 6/9] feat(config): update filter and sampling pipeline configurations with new paths and parameters --- .../example_filter_pipeline_config.yaml | 44 +++- .../example_sampling_pipeline_config.yaml | 6 +- .../run_language_pipeline.py | 45 ++--- .../lang_based_sampling/sampling_utils.py | 188 ++++-------------- 4 files changed, 96 insertions(+), 187 deletions(-) diff --git a/configs/data_processing/example_filter_pipeline_config.yaml b/configs/data_processing/example_filter_pipeline_config.yaml index 73156384..66f8e09c 100644 --- a/configs/data_processing/example_filter_pipeline_config.yaml +++ b/configs/data_processing/example_filter_pipeline_config.yaml @@ -1,17 +1,45 @@ params: - score_path: /path/to/annotations - tokenized_data_path: /path/to/tokenized - output_folder: /path/to/filtered + score_path: /data/horse/ws/alju972f-tokenization_at_scale/annotated_scores/sample/annotations + tokenized_data_path: /data/horse/ws/alju972f-tokenization_at_scale/eurolingua_tokenization/raw_data_tokenized/ + output_folder: /data/horse/ws/alju972f-tokenization_at_scale/annotated_scores/output thresholds: score_Gemma_Snowflake: 3.0 - score_Llama_Snowflake: 2.0 - hash_to_base_file_mapping_csv: path/to/hashes.csv - base_file_prefix: /path/to/raw + hash_to_base_file_mapping_csv: /data/horse/ws/alju972f-tokenization_at_scale/annotated_scores/sample/hashes.csv + base_file_prefix: /data/horse/ws/s6690609-gptx_traindata/s6690609-gptx_traindata-1744239642/eurolingua/data/raw_data tokenized_data_extension: .pbin -running_on_slurm: false +params: + score_path: /data/horse/ws/alju972f-tokenization_at_scale/annotated_scores/annotations + tokenized_data_path: /data/horse/ws/alju972f-tokenization_at_scale/eurolingua_tokenization/raw_data_tokenized/ + output_folder: /data/horse/ws/alju972f-tokenization_at_scale/annotated_scores/output + + thresholds: + score_Mistral_Snowflake: 3.0 + + hash_to_base_file_mapping_csv: /data/horse/ws/alju972f-tokenization_at_scale/scripts/hashing/hashes_fineweb_final.csv + base_file_prefix: /data/horse/ws/s6690609-gptx_traindata/s6690609-gptx_traindata-1744239642/eurolingua/data/raw_data + tokenized_data_extension: .pbin + +# running_on_slurm: true + +# slurm_settings: +# tasks: 10 +# sbatch_args: +# account: "p_gptx" +# nodes: "1" +# ntasks: "1" +# partition: "barnard" +# time: "02:00:00" +# cpus_per_task: 40 +# mem_per_cpu: 2500 +# job_name: "filter_tokenized_data" +# output: /data/horse/ws/alju972f-tokenization_at_scale/final_run/logs/filtering/output_dir/%j.out +# error: /data/horse/ws/alju972f-tokenization_at_scale/final_run/logs/filtering/error_dir/%j.err +# venv_path: /data/horse/ws/alju972f-tokenization_at_scale/env_filtering/bin/activate + +running_on_slurm: false # set false to run locally local_settings: tasks: 1 @@ -19,4 +47,4 @@ local_settings: local_rank_offset: 0 logging_dir: null -slurm_settings: null + diff --git a/configs/data_processing/example_sampling_pipeline_config.yaml 
b/configs/data_processing/example_sampling_pipeline_config.yaml index 57b2dc13..e5612d31 100644 --- a/configs/data_processing/example_sampling_pipeline_config.yaml +++ b/configs/data_processing/example_sampling_pipeline_config.yaml @@ -1,8 +1,7 @@ paths: - annotations_base: "/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/annotations" tokenized_base: "/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/tokenized" output_folder: "/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/output_filtered" - base_file_prefix: "/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/jsonl_files" #"/data/horse/ws/s6690609-gptx_traindata/s6690609-gptx_traindata-1744239642/eurolingua/data/raw_data/fineweb2" + base_file_prefix: "/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/jsonl_files" # location of raw jsonl files csv_path: "/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/hashes.csv" tokenizer_model: "/data/horse/ws/alju972f-tokenization_at_scale/eurolingua_tokenization/tokenizer/tueken2_tokenizer_model.model" @@ -10,5 +9,4 @@ sampling: language_distribution: als_Latn: 25 deu_Latn: 75 - total_sample_size: 10 - random_seed: 42 \ No newline at end of file + total_sample_size: 10 \ No newline at end of file diff --git a/src/ml_filter/data_processing/lang_based_sampling/run_language_pipeline.py b/src/ml_filter/data_processing/lang_based_sampling/run_language_pipeline.py index 3577eaf3..d1b87f50 100644 --- a/src/ml_filter/data_processing/lang_based_sampling/run_language_pipeline.py +++ b/src/ml_filter/data_processing/lang_based_sampling/run_language_pipeline.py @@ -1,8 +1,3 @@ -"""Language sampling, filtering, and validation pipeline. - -Refactored for clarity, testability, and CLI flexibility. 
-""" - from __future__ import annotations import argparse @@ -16,7 +11,7 @@ import yaml import sentencepiece as spm -from pydantic import BaseModel, Field, validator +from pydantic import BaseModel, Field from ml_filter.src.ml_filter.data_processing.lang_based_sampling.sampling_utils import ( load_hash_mapping, invert_hash_mapping, @@ -33,7 +28,7 @@ def setup_logging(level: str) -> None: - """Configure root logging once.""" + """Configure root logging.""" logging.basicConfig( level=getattr(logging, level.upper(), logging.INFO), format="%(asctime)s | %(levelname)s | %(name)s | %(message)s", @@ -41,35 +36,28 @@ def setup_logging(level: str) -> None: class PathsConfig(BaseModel): + """Filesystem paths used by the pipeline.""" tokenized_base: Path output_folder: Path base_file_prefix: Path csv_path: Path tokenizer_model: Path - @validator("tokenized_base", "output_folder", "base_file_prefix", "csv_path", "tokenizer_model", pre=True) - def _to_path(cls, v): # type: ignore - return Path(v) - class SamplingConfig(BaseModel): + """Sampling parameters: language distribution and total size.""" language_distribution: Dict[str, int] total_sample_size: int = Field(gt=0) - @validator("language_distribution") - def non_empty_distribution(cls, v): # type: ignore - if not v: - raise ValueError("language_distribution cannot be empty") - return v - class PipelineConfig(BaseModel): + """Top-level pipeline configuration object.""" paths: PathsConfig sampling: SamplingConfig def load_config(path: Path) -> PipelineConfig: - """Load YAML config into strongly typed PipelineConfig.""" + """Load YAML config into PipelineConfig.""" with open(path) as f: raw = yaml.safe_load(f) return PipelineConfig(**raw) @@ -80,7 +68,7 @@ def run_sampling_and_filter( cfg: PipelineConfig, use_wc: bool, ) -> Tuple[Set[str], Dict[str, Dict[Path, int]], dict, dict]: - """Run sampling for all languages, return selected IDs for requested language and related mappings.""" + """Run sampling and filtering, returning selected ids and metadata.""" paths_cfg = cfg.paths sampling_cfg = cfg.sampling tokenized_base = paths_cfg.tokenized_base @@ -109,7 +97,6 @@ def run_sampling_and_filter( selected_ids = set(selected_doc_ids_all[lang]) logger.info(f"Selected {len(selected_ids)} synthetic IDs for {lang}") - # Filtering filterer = TokenizedFilterer( tokenized_base, output_folder, @@ -133,10 +120,7 @@ def validate_filtered( cfg: PipelineConfig, samples_per_file: int | None = None, ) -> int: - """Validate filtered tokenized output against SentencePiece encoding. - - Returns number of validated documents. 
- """ + """Validate filtered tokenized output; return count validated.""" hash_mapping, inv_hash_mapping = mappings tokenizer_model = cfg.paths.tokenizer_model base_file_prefix = cfg.paths.base_file_prefix @@ -194,11 +178,8 @@ def _filter_ids_for_file(file_path: Path, selected: set[str]): continue selected_lines.append((idx, rec)) - # Optionally subsample validation lines for this file if samples_per_file is not None and len(selected_lines) > samples_per_file: - selected_lines = random.sample(selected_lines, samples_per_file) - - #logger the selected lines + selected_lines = selected_lines[:samples_per_file] logger.debug(f"Selected lines for validation in {data_file}: {[idx for idx, _ in selected_lines]}") for out_idx, (row_idx, rec) in enumerate(selected_lines): @@ -213,7 +194,7 @@ def _filter_ids_for_file(file_path: Path, selected: set[str]): and (len(pipeline_tokens) - 1) == len(ref_tokens) ): had_trailing_eod = True - logger.debug(f"Trailing EOD token found in line {row_idx} of {data_file}, ignoring for validation") + logger.debug(f"Trailing EOD token line={row_idx} file={data_file}") compare_pipeline = pipeline_tokens[:-1] else: compare_pipeline = pipeline_tokens @@ -239,6 +220,7 @@ def _filter_ids_for_file(file_path: Path, selected: set[str]): def parse_args(argv: list[str]) -> argparse.Namespace: + """Parse CLI arguments.""" p = argparse.ArgumentParser( description="Run language sampling/filtering/validation pipeline" ) @@ -259,12 +241,13 @@ def parse_args(argv: list[str]) -> argparse.Namespace: ) p.add_argument( "--validation-samples-per-file", type=int, default=5, - help="Maximum number of documents to validate per file (random sample). If omitted, validate all." + help="Max docs to validate per file." ) return p.parse_args(argv) def main(argv: list[str] | None = None) -> int: + """Entry point for sampling + filtering + optional validation.""" args = parse_args(argv or sys.argv[1:]) setup_logging(args.log_level) logger.info(f"Starting pipeline lang={args.lang}") @@ -287,5 +270,5 @@ def main(argv: list[str] | None = None) -> int: return 0 -if __name__ == "__main__": # pragma: no cover +if __name__ == "__main__": raise SystemExit(main()) diff --git a/src/ml_filter/data_processing/lang_based_sampling/sampling_utils.py b/src/ml_filter/data_processing/lang_based_sampling/sampling_utils.py index 5895363d..b7f74576 100644 --- a/src/ml_filter/data_processing/lang_based_sampling/sampling_utils.py +++ b/src/ml_filter/data_processing/lang_based_sampling/sampling_utils.py @@ -15,16 +15,19 @@ logger = logging.getLogger(__name__) def compute_target_samples(language_distribution: dict[str, int], total_sample_size: int) -> dict[str, int]: + """Return per-language target counts from percentage distribution. 
+ + language_distribution: mapping language -> percent (0-100) + total_sample_size: total desired documents across all languages + Returns: mapping language -> integer target count (rounded) + """ targets = {lang: round(total_sample_size * pct / 100) for lang, pct in language_distribution.items()} logger.info(f"Computed targets (total={total_sample_size}): {targets}") return targets def load_hash_mapping(csv_path: Path) -> dict[str, Path]: - """ - CSV has columns: file_path,md5 - Returns: {md5: Path(raw_jsonl)} - """ + """Load md5 -> raw file path mapping from a CSV with columns md5,file_path.""" mapping = {} with open(csv_path, "r") as f: reader = csv.DictReader(f) @@ -37,8 +40,7 @@ def load_hash_mapping(csv_path: Path) -> dict[str, Path]: def invert_hash_mapping(hash_mapping: dict[str, Path]) -> dict[Path, str]: - """Return inverse mapping: {Path(raw_jsonl): md5}. - """ + """Invert md5->path mapping to path->md5, warning on duplicates.""" inverse: dict[Path, str] = {} for md5, p in hash_mapping.items(): if p in inverse: @@ -48,15 +50,11 @@ def invert_hash_mapping(hash_mapping: dict[str, Path]) -> dict[Path, str]: def load_jsonl_counts(annotated_base: Path, use_wc: bool = True) -> dict[str, dict[Path, int]]: - """Traverse annotated_base//... and count docs in each JSONL. - - Args: - annotated_base: Root directory containing per-language subdirectories with JSONL files. - use_wc: If True (default), attempt fast line counting via external `wc -l` command. - Falls back to Python iteration on failure. + """Return nested mapping lang -> jsonl file -> line count. - Returns: - {lang: {annotated_file_path: num_docs}} + Uses `wc -l` when use_wc is True for speed, falling back to Python counting on failure. + annotated_base: root containing per-language subdirectories. + use_wc: attempt shell wc -l acceleration. """ lang_to_files: dict[str, dict[Path, int]] = defaultdict(dict) @@ -73,7 +71,7 @@ def count_lines_py(path: Path) -> int: logger.debug(f"No JSONL files found under {lang_dir}") continue if use_wc: - logger.info(f"Using 'wc -l' to count lines in {len(jsonl_files)} files for language '{lang}'") + logger.info(f"Using wc -l for {len(jsonl_files)} files lang={lang}") try: cmd = ["wc", "-l", *[str(p) for p in jsonl_files]] result = subprocess.run(cmd, capture_output=True, text=True, check=True) @@ -118,16 +116,13 @@ def sample_documents( file_to_hash: dict[Path, str], rng: Optional[random.Random] = None ) -> Dict[str, List[str]]: - """Sample synthetic document IDs of the form _. + """Sample document line indices proportionally per file to meet per-language targets. - Args: - lang_to_files: { lang: { Path(jsonl): n_docs, ... }, ... } - targets: { lang: target_count, ... } - file_to_hash: { Path(jsonl): md5 } mapping (inverse of the CSV md5->path mapping) - rng: optional random.Random instance for reproducibility - - Returns: - { lang: [ "_", ... ], ... 
} + lang_to_files: lang -> file -> line count + targets: lang -> desired sample size + file_to_hash: file path -> md5 hash used in synthetic id prefix + rng: optional random.Random for deterministic tests + Returns: lang -> list of synthetic ids _ """ if rng is None: rng = random.Random() @@ -147,11 +142,8 @@ def sample_documents( logger.info(f"Sampling lang={lang} target={target} files={len(files_items)} total_docs={total_docs}") - # Step 1: proportional allocation (floating) raw_allocs = [target * (n_docs / total_docs) for _, n_docs in files_items] allocs = [int(x) for x in raw_allocs] - - # Step 2: distribute remainder by largest fractional part (tie-break random) remainder = target - sum(allocs) if remainder > 0: frac_info = [(i, raw_allocs[i] - allocs[i]) for i in range(len(files_items))] @@ -164,15 +156,11 @@ def sample_documents( allocs[i] += 1 remainder -= 1 logger.debug(f"ALLOC_INITIAL lang={lang} allocs={allocs}") - - # Step 3: cap allocations and reclaim overflow remainder = 0 for i, (_, n_docs) in enumerate(files_items): if allocs[i] > n_docs: remainder += allocs[i] - n_docs allocs[i] = n_docs - - # Step 3b: redistribute reclaimed remainder greedily while remainder > 0: candidates = [i for i, (_, n_docs) in enumerate(files_items) if allocs[i] < n_docs] if not candidates: @@ -185,15 +173,12 @@ def sample_documents( allocs[i] += 1 remainder -= 1 logger.debug(f"ALLOC_FINAL lang={lang} allocs={allocs}") - - # Step 4: sample line indices for each file and synthesize IDs for (fpath, n_docs), quota in zip(files_items, allocs): if quota <= 0: continue doc_hash = file_to_hash.get(fpath) if doc_hash is None: raise KeyError(f"Hash for file {fpath} not found in file_to_hash mapping") - # sample unique line indices (0-based) if quota > n_docs: raise ValueError(f"Quota {quota} exceeds available lines {n_docs} in {fpath}") if quota == n_docs: @@ -210,9 +195,8 @@ def sample_documents( return selected_doc_ids - -# --- Helper to make filter function --- def make_filter_func_from_ids(doc_ids: list[str], selected_ids: set[str]) -> Callable[[tuple[int, dict[str, np.ndarray]]], bool]: + """Build predicate for filtering PackedMemMap items by synthetic id membership.""" def filter_func(item: tuple[int, dict[str, np.ndarray]]) -> bool: idx, _ = item try: @@ -224,11 +208,7 @@ def filter_func(item: tuple[int, dict[str, np.ndarray]]) -> bool: def _filter_ids_for_file(file_path: Path, selected: set[str], inv_map: dict[Path, str]): - """Return (filtered_ids, target_rows) for a given annotated file. - filtered_ids: list of selected ids whose md5 prefix matches this file's md5. - target_rows: {row_index: full_id} - Assumes doc_id format: _. - """ + """Return (ids_for_file, index_map) for one annotated file based on selected ids.""" base_md5 = inv_map.get(file_path) if not base_md5: raise ValueError(f"File path {file_path} not found in inverse hash mapping") @@ -246,35 +226,50 @@ def _filter_ids_for_file(file_path: Path, selected: set[str], inv_map: dict[Path logger.debug(f"FILTER_IDS file={file_path} hash={base_md5} count={len(file_ids)}") return file_ids, rows -# --- Tokenized filterer --- + class TokenizedFilterer: + """Filter tokenized packed dataset files based on selected synthetic ids. + + Maps annotated jsonl files (line-based) to corresponding tokenized .pbin files + and writes filtered copies preserving only selected document indices. 
+ """ def __init__(self, tokenized_base: Path, output_folder: Path, hash_mapping: dict[str, Path], inverse_mapping: dict[Path, str], base_file_prefix: Path): + """Create filterer. + + tokenized_base: root directory of tokenized .pbin files + output_folder: destination root for filtered outputs + hash_mapping: md5 -> raw path mapping (from CSV) + inverse_mapping: raw path -> md5 mapping + base_file_prefix: common prefix directory of raw paths for relative resolution + """ self._tokenized_base = tokenized_base self._output_folder = output_folder - self._hash_mapping = hash_mapping # md5 -> raw_path - self._inverse_mapping = inverse_mapping # Path(jsonl) -> md5 + self._hash_mapping = hash_mapping + self._inverse_mapping = inverse_mapping self._base_file_prefix = base_file_prefix def _prepare_output_path(self, tokenized_file: Path) -> Path: + """Return output path for a tokenized file ensuring parent directories exist.""" tokenized_file_rel = tokenized_file.relative_to(self._tokenized_base) output_path = self._output_folder / tokenized_file_rel.with_suffix(".filtered.pbin") output_path.parent.mkdir(parents=True, exist_ok=True) return output_path def filter_document(self, annotated_file: Path, selected_ids: set[str]): - """Filter a tokenized file using synthetic IDs (_).""" + """Filter one annotated jsonl file's corresponding tokenized file using selected ids. + + annotated_file: path to the annotated jsonl + selected_ids: full set of synthetic ids to retain (across languages) + """ base_hash = self._inverse_mapping.get(annotated_file) if not base_hash: logger.info(f"Skipping {annotated_file}: no hash mapping found") return - # fast check: do any selected ids reference this hash? prefix = base_hash + "_" if not any(sid.startswith(prefix) for sid in selected_ids): logger.info(f"Skipping {annotated_file}: no selected ids for hash {base_hash}") return - - # Build doc_ids list for index alignment (no need to parse JSON; just count lines) with open(annotated_file, "r") as f: doc_ids = [f"{base_hash}_{i}" for i, _ in enumerate(f)] logger.debug( @@ -291,102 +286,7 @@ def filter_document(self, annotated_file: Path, selected_ids: set[str]): except ValueError: raise ValueError(f"Raw path {raw_path} is not under base prefix {self._base_file_prefix}") output_path = self._prepare_output_path(tokenized_file) - filter_func = make_filter_func_from_ids(doc_ids, selected_ids) logger.info(f"Filtering hash={base_hash} src={tokenized_file} -> dst={output_path}") filter_dataset(src_path=tokenized_file, dst_path=output_path, filter_func=filter_func) - logger.info(f"Finished filtering hash={base_hash} output={output_path}") - - - -# # --- Main pipeline --- -if __name__ == "__main__": - csv_path = Path("/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/hashes.csv") - annotations_base = Path("/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/annotations") - tokenized_base = Path("/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/tokenized") - output_folder = Path("/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/output_filtered") - base_file_prefix = Path("/data/horse/ws/alju972f-tokenization_at_scale/sampling_pipeline_test/jsonl_files") - - language_distribution = {"als_Latn": 25, "deu_Latn": 75} - total_sample_size = 10 - - # # Load CSV hash mapping - hash_mapping = load_hash_mapping(csv_path) - inv_hash_mapping = invert_hash_mapping(hash_mapping) - # # Count annotated JSONL docs - lang_to_files = load_jsonl_counts(base_file_prefix) - - # 
Compute targets - targets = compute_target_samples(language_distribution, total_sample_size) - - # Sample documents - selected_doc_ids = sample_documents(lang_to_files, targets, inv_hash_mapping) - - # Filter tokenized files - filterer = TokenizedFilterer(tokenized_base, output_folder, hash_mapping, inv_hash_mapping, base_file_prefix) - for lang, ids in selected_doc_ids.items(): - selected_set = set(ids) - for annotated_file in lang_to_files[lang].keys(): - filterer.filter_document(annotated_file, selected_set) - - - # # --- Validation step: re-tokenize and compare --- - # import json - import sentencepiece as spm - # from pathlib import Path - - # # --- Validation step: re-tokenize and compare --- - # from collections import defaultdict - - # selected_doc_ids = defaultdict(list) - - # # selected_doc_ids["als_Latn"].append("29d82196d55803ab9c792e45b59919bf_273561") - # # selected_doc_ids["deu_Latn"].append("6f174ddca737f54cea5f34da31e15178_931238") - - sp = spm.SentencePieceProcessor( - model_file="/data/horse/ws/alju972f-tokenization_at_scale/eurolingua_tokenization/tokenizer/tueken2_tokenizer_model.model" - ) - - for lang, ids in selected_doc_ids.items(): - for data_file in lang_to_files[lang].keys(): - filtered_ids, target_rows = _filter_ids_for_file(data_file, ids, inv_hash_mapping) - if not filtered_ids: - continue - - rel = data_file.relative_to(base_file_prefix) - filtered_file = output_folder / rel.with_suffix(".filtered.pbin") - source_data = PackedMemMapDatasetBase(filtered_file, sample_key="input_ids", load_index=True) - - # print out which files are being compared - print(f"Validating {data_file} against {filtered_file} for language {lang}") - - selected_lines: list[tuple[int, dict]] = [] - with open(data_file) as f: - for idx, line in enumerate(f): - if idx not in target_rows: - continue - rec = json.loads(line) - selected_lines.append((idx, rec)) - - if len(selected_lines) != len(source_data): - logging.warning( - f"Length mismatch for {annotated_file}: filtered_pbin={len(source_data)} selected_lines={len(selected_lines)}" - ) - - selected_lines = random.sample(selected_lines, 3) - for out_idx, (row_idx, rec) in enumerate(selected_lines): - if out_idx >= len(source_data): - break - pipeline_tokens = source_data[out_idx]["input_ids"].tolist() - ref_tokens = sp.encode(rec["text"], out_type=int) - # First check length mismatch (often most informative / faster) - if len(pipeline_tokens) != len(ref_tokens): - print( - f"Length mismatch for line {row_idx}: pipeline_len={len(pipeline_tokens)} ref_len={len(ref_tokens)}" - ) - raise AssertionError(f"❌ Length mismatch for line {row_idx}") - for i, (p_tok, r_tok) in enumerate(zip(pipeline_tokens, ref_tokens)): - if p_tok != r_tok: - print(f"Token mismatch at position {i} for line {row_idx}: pipeline={p_tok} ref={r_tok}") - break - raise AssertionError(f"❌ Token mismatch for line {row_idx}") + logger.info(f"Finished filtering hash={base_hash} output={output_path}") \ No newline at end of file From 0389d2beed68164c4f0ffa013c5fc43ce9314d18 Mon Sep 17 00:00:00 2001 From: ale25663 Date: Fri, 26 Sep 2025 12:20:39 +0200 Subject: [PATCH 7/9] fix(imports): update import path for sampling_utils to simplify module reference --- .../lang_based_sampling/run_language_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ml_filter/data_processing/lang_based_sampling/run_language_pipeline.py b/src/ml_filter/data_processing/lang_based_sampling/run_language_pipeline.py index d1b87f50..5641884b 100644 --- 
a/src/ml_filter/data_processing/lang_based_sampling/run_language_pipeline.py +++ b/src/ml_filter/data_processing/lang_based_sampling/run_language_pipeline.py @@ -12,7 +12,7 @@ import yaml import sentencepiece as spm from pydantic import BaseModel, Field -from ml_filter.src.ml_filter.data_processing.lang_based_sampling.sampling_utils import ( +from sampling_utils import ( load_hash_mapping, invert_hash_mapping, load_jsonl_counts, From 4c82fdf2c4ddbd20f0dbb817f5e8c63417cc87cc Mon Sep 17 00:00:00 2001 From: ale25663 Date: Fri, 26 Sep 2025 12:36:35 +0200 Subject: [PATCH 8/9] refactor(pipeline): remove score-based filtering files and configurations --- .../example_filter_pipeline_config.yaml | 50 ---- .../score_based_filtering/__init__.py | 0 .../score_based_filtering/filter_pipeline.py | 245 ------------------ .../step_data_filtering.py | 98 ------- .../step_score_parsing.py | 125 --------- 5 files changed, 518 deletions(-) delete mode 100644 configs/data_processing/example_filter_pipeline_config.yaml delete mode 100644 src/ml_filter/data_processing/score_based_filtering/__init__.py delete mode 100644 src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py delete mode 100644 src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py delete mode 100644 src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py diff --git a/configs/data_processing/example_filter_pipeline_config.yaml b/configs/data_processing/example_filter_pipeline_config.yaml deleted file mode 100644 index 66f8e09c..00000000 --- a/configs/data_processing/example_filter_pipeline_config.yaml +++ /dev/null @@ -1,50 +0,0 @@ -params: - score_path: /data/horse/ws/alju972f-tokenization_at_scale/annotated_scores/sample/annotations - tokenized_data_path: /data/horse/ws/alju972f-tokenization_at_scale/eurolingua_tokenization/raw_data_tokenized/ - output_folder: /data/horse/ws/alju972f-tokenization_at_scale/annotated_scores/output - - thresholds: - score_Gemma_Snowflake: 3.0 - - hash_to_base_file_mapping_csv: /data/horse/ws/alju972f-tokenization_at_scale/annotated_scores/sample/hashes.csv - base_file_prefix: /data/horse/ws/s6690609-gptx_traindata/s6690609-gptx_traindata-1744239642/eurolingua/data/raw_data - tokenized_data_extension: .pbin - -params: - score_path: /data/horse/ws/alju972f-tokenization_at_scale/annotated_scores/annotations - tokenized_data_path: /data/horse/ws/alju972f-tokenization_at_scale/eurolingua_tokenization/raw_data_tokenized/ - output_folder: /data/horse/ws/alju972f-tokenization_at_scale/annotated_scores/output - - thresholds: - score_Mistral_Snowflake: 3.0 - - hash_to_base_file_mapping_csv: /data/horse/ws/alju972f-tokenization_at_scale/scripts/hashing/hashes_fineweb_final.csv - base_file_prefix: /data/horse/ws/s6690609-gptx_traindata/s6690609-gptx_traindata-1744239642/eurolingua/data/raw_data - tokenized_data_extension: .pbin - -# running_on_slurm: true - -# slurm_settings: -# tasks: 10 -# sbatch_args: -# account: "p_gptx" -# nodes: "1" -# ntasks: "1" -# partition: "barnard" -# time: "02:00:00" -# cpus_per_task: 40 -# mem_per_cpu: 2500 -# job_name: "filter_tokenized_data" -# output: /data/horse/ws/alju972f-tokenization_at_scale/final_run/logs/filtering/output_dir/%j.out -# error: /data/horse/ws/alju972f-tokenization_at_scale/final_run/logs/filtering/error_dir/%j.err -# venv_path: /data/horse/ws/alju972f-tokenization_at_scale/env_filtering/bin/activate - -running_on_slurm: false # set false to run locally - -local_settings: - tasks: 1 - local_tasks: 1 - local_rank_offset: 
0 - logging_dir: null - - diff --git a/src/ml_filter/data_processing/score_based_filtering/__init__.py b/src/ml_filter/data_processing/score_based_filtering/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py b/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py deleted file mode 100644 index ec1e5410..00000000 --- a/src/ml_filter/data_processing/score_based_filtering/filter_pipeline.py +++ /dev/null @@ -1,245 +0,0 @@ -from __future__ import annotations - -import csv -import os -import sys -from pathlib import Path - -from datatrove.executor import LocalPipelineExecutor, SlurmPipelineExecutor -from datatrove.pipeline.base import PipelineStep -from pydantic import BaseModel, Field, model_validator -from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, SettingsConfigDict, YamlConfigSettingsSource - -from ml_filter.data_processing.score_based_filtering.step_data_filtering import DataFiltering -from ml_filter.data_processing.score_based_filtering.step_score_parsing import ScoresParser - - -class FilterPipelineBuilder(BaseSettings): - """Configuration parameters and building for the score-based filtering pipeline. - This class defines the settings for running a data filtering pipeline that processes datasets based on scores. - It includes parameters for both local and Slurm execution environments. - The pipeline consists of steps for parsing scores and filtering datasets based on those scores. - - Besides initializing this class directly, it can also be configured using a YAML file or environment variables. - The YAML file can be specified using the `FILTER_PIPELINE_YAML_FILE` environment variable. - If no YAML file is provided, the class will use default settings and environment variables. 
- """ - - model_config = SettingsConfigDict(env_prefix="filter_pipeline_", env_nested_delimiter="__") - - # Pipeline configuration parameters - params: FilterPipelineParameters - - # Execution parameters - running_on_slurm: bool = False - local_settings: LocalExecutionSettings | None = None - slurm_settings: SlurmExecutionSettings | None = None - - @model_validator(mode="after") - def slurm_vs_local(self): - if self.running_on_slurm and self.local_settings is not None: - raise ValueError("Running on Slurm requires slurm execution settings, not local settings.") - if self.running_on_slurm and self.slurm_settings is None: - self.slurm_settings = SlurmExecutionSettings() - elif not self.running_on_slurm and self.slurm_settings is not None: - raise ValueError("Running locally requires local execution settings, not Slurm settings.") - if not self.running_on_slurm and self.local_settings is None: - self.local_settings = LocalExecutionSettings() - return self - - @model_validator(mode="after") - def set_logging_dir(self): - if self.local_settings is not None and self.local_settings.logging_dir is None: - self.local_settings.logging_dir = str(self.params.output_folder / "logs") - if self.slurm_settings is not None and self.slurm_settings.logging_dir is None: - self.slurm_settings.logging_dir = str(self.params.output_folder / "logs") - return self - - def build_pipeline_executor(self) -> LocalPipelineExecutor | SlurmPipelineExecutor: - """Builds the appropriate pipeline executor based on the execution settings.""" - pipeline = self._build_pipeline() - if self.running_on_slurm: - return SlurmPipelineExecutor(pipeline=pipeline, **self.slurm_settings.model_dump()) - else: - return LocalPipelineExecutor(pipeline=pipeline, **self.local_settings.model_dump()) - - def _build_pipeline(self) -> list[PipelineStep]: - """Builds the pipeline based on the provided configuration.""" - return build_pipeline( - score_path=self.params.score_path, - tokenized_data_path=self.params.tokenized_data_path, - output_folder=self.params.output_folder, - thresholds=self.params.thresholds, - hash_to_base_file_mapping_csv=self.params.hash_to_base_file_mapping_csv, - base_file_prefix=self.params.base_file_prefix, - tokenized_data_extension=self.params.tokenized_data_extension, - ) - - @classmethod - def settings_customise_sources( - cls, - settings_cls: type[BaseSettings], - init_settings: PydanticBaseSettingsSource, - env_settings: PydanticBaseSettingsSource, - dotenv_settings: PydanticBaseSettingsSource, - file_secret_settings: PydanticBaseSettingsSource, - ) -> tuple[PydanticBaseSettingsSource, ...]: - return ( - init_settings, - env_settings, - YamlConfigSettingsSource(settings_cls, yaml_file=os.getenv("FILTER_PIPELINE_YAML_FILE")), - dotenv_settings, - file_secret_settings, - ) - - -class FilterPipelineParameters(BaseModel): - """Parameters for the score-based filtering pipeline.""" - - score_path: Path = Field(..., description="The path to the directory containing JSONL files with scores.") - tokenized_data_path: Path = Field(..., description="The path for the tokenized data files.") - output_folder: Path = Field(..., description="The folder where the filtered datasets will be saved.") - thresholds: dict[str, float] = Field( - ..., description="Dictionary where keys are score names and values are thresholds to filter samples." - ) - hash_to_base_file_mapping_csv: Path = Field( - ..., description="CSV file mapping base file hashes to their corresponding paths." 
- ) - base_file_prefix: Path = Field( - default=Path(""), - description="The prefix path for the raw/base files. This prefix will be removed " - "when mapping from the raw files to the corresponding tokenized files", - ) - tokenized_data_extension: str = Field( - default=".pbin", description="The file extension for the tokenized data files." - ) - - -class LocalExecutionSettings(BaseModel): - """Settings for running the pipeline locally.""" - - tasks: int = 1 - local_tasks: int = 1 - local_rank_offset: int = 0 - logging_dir: str | None = None - - -class SlurmExecutionSettings(BaseModel): - """Settings for running the pipeline on a Slurm cluster.""" - - tasks: int = 1 - time: str = "00:15:00" - partition: str = "default" - account: str | None = None # FIXME is this supported? - cpus_per_task: int = 1 - mem_per_cpu_gb: int = 2 - workers: int = -1 - job_name: str = "data_processing" - qos: str = "normal" - env_command: str | None = None - condaenv: str | None = None - venv_path: str | None = None - sbatch_args: dict[str, str] | None = None - max_array_size: int = 1001 - depends_job_id: str | None = None - job_id_position: int = -1 - # job_id_retriever: Callable | None = None - logging_dir: str | None = None - skip_completed: bool = True - slurm_logs_folder: str | None = None - max_array_launch_parallel: bool = False - stagger_max_array_jobs: int = 0 - run_on_dependency_fail: bool = False - randomize_start_duration: int = 0 - requeue_signals: tuple[str] | None = ("SIGUSR1",) - mail_type: str = "ALL" - mail_user: str | None = None - requeue: bool = True - srun_args: dict[str, str] | None = None - tasks_per_job: int = 1 - - -def run_pipeline(args: FilterPipelineBuilder) -> None: - """Runs a datatrove pipeline to filter datasets based on scores. - Args: - args (PipelineArgs): The configuration parameters for the pipeline. - """ - executor = args.build_pipeline_executor() - executor.run() - - -def build_pipeline( - score_path: Path, - tokenized_data_path: Path, - output_folder: Path, - thresholds: dict[str, float], - hash_to_base_file_mapping_csv: Path, - base_file_prefix: Path = Path(""), - tokenized_data_extension: str = ".pbin", -) -> list[PipelineStep]: - """ - Builds a datatrove pipeline for filtering datasets based on scores. - Args: - score_path (Path): The path to the JSONL file containing scores. - tokenized_data_path (Path): The path for the tokenized data files. - output_folder (Path): The folder where the filtered datasets will be saved. - thresholds (dict[str, float]): A dictionary where keys are score names and values are the - thresholds to filter samples. - hash_to_base_file_mapping_csv (Path): A CSV file mapping base file hashes to their corresponding paths. - base_file_prefix (Path): The prefix path for the base files. - tokenized_data_extension (str): The file extension for the tokenized data files. - Returns: - list[PipelineStep]: A list containing the pipeline steps for filtering datasets. - """ - assert score_path.is_dir(), f"Score path {score_path} must be a directory." - assert output_folder.is_dir(), f"Output folder {output_folder} must be a directory." - assert len(thresholds) > 0, "At least one threshold must be provided." - assert ( - hash_to_base_file_mapping_csv.is_file() - ), f"Hash to base file mapping {hash_to_base_file_mapping_csv} must be a file." 
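    # Illustrative only: the mapping CSV consumed on the next line is expected to carry a
    # header row with the "file_path" and "md5" columns used by read_hash_to_base_file_mapping,
    # for example:
    #
    #   file_path,md5
    #   /path/to/raw_data/als_Latn/part_0000.jsonl,29d82196d55803ab9c792e45b59919bf
    #   /path/to/raw_data/deu_Latn/part_0001.jsonl,6f174ddca737f54cea5f34da31e15178
    #
    # The concrete paths and hashes above are placeholders.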
- hash_to_base_file = read_hash_to_base_file_mapping(hash_to_base_file_mapping_csv) - pipeline: list[PipelineStep] = [ - ScoresParser( - data_folder=str(score_path), - score_keys=list(thresholds.keys()), - hash_to_base_file=hash_to_base_file, - tokenized_data_path=tokenized_data_path, - base_file_prefix=base_file_prefix, - tokenized_data_extension=tokenized_data_extension, - ), - DataFiltering( - output_folder=output_folder, - thresholds=thresholds, - tokenized_data_path=tokenized_data_path, - ), - ] - return pipeline - - -def read_hash_to_base_file_mapping(csv_file: Path) -> dict[str, Path]: - """ - Reads a CSV file containing a mapping from base file hashes to their corresponding paths. - Args: - csv_file (Path): The path to the CSV file. - Returns: - dict[str, Path]: A dictionary mapping base file hashes to their corresponding paths. - """ - hash_to_base_file: dict[str, Path] = {} - with open(csv_file, "r") as f: - reader = csv.DictReader(f) - for row in reader: - hash_to_base_file[row["md5"]] = Path(row["file_path"]) - return hash_to_base_file - - -if __name__ == "__main__": - if len(sys.argv) > 1 or not (yaml_file := os.getenv("FILTER_PIPELINE_YAML_FILE")) or not os.path.isfile(yaml_file): - print( - "This script is intended to be used with a YAML configuration " - "file set via the environment variable `FILTER_PIPELINE_YAML_FILE`.\n" - "If you want to run it without a YAML file, please import from it " - "and use the FilterPipelineBuilder class directly." - ) - exit(1) - args = FilterPipelineBuilder() - run_pipeline(args) diff --git a/src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py b/src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py deleted file mode 100644 index 9f445316..00000000 --- a/src/ml_filter/data_processing/score_based_filtering/step_data_filtering.py +++ /dev/null @@ -1,98 +0,0 @@ -import dataclasses -import logging -from pathlib import Path -from typing import Callable - -import numpy as np -from datatrove.data import Document, DocumentsPipeline -from datatrove.pipeline.base import PipelineStep -from numpy.typing import NDArray - -from ml_filter.data_processing.score_based_filtering.step_score_parsing import ScoresParser - -try: - from modalities.dataloader.filter_packed_data import filter_dataset -except ImportError: - logging.error("The filtering pipeline requires the 'modalities' package to be installed.") - exit(1) - - -class DataFiltering(PipelineStep): - """ - A class to filter datasets based on scores and specified thresholds. - This class is designed to be used within a datatrove pipeline. - For a given list of score dictionaries, it filters the corresponding tokenized dataset files - based on the provided thresholds for each score. - The resulting filtered datasets are saved in the specified output folder. - Args: - output_folder (Path): The folder where the filtered datasets will be saved. - thresholds (dict[str, float]): A dictionary where keys are score names and values are the - thresholds to filter samples. - tokenized_data_path (Path): The path for the tokenized data files. - Raises: - AssertionError: If the output folder is not a directory or if no thresholds are provided. 
- """ - - name = "DataFiltering" - type = "Filter" - _requires_dependencies = [] - - def __init__(self, output_folder: Path, thresholds: dict[str, float], tokenized_data_path: Path = Path("")): - super().__init__() - self._output_folder = output_folder - assert self._output_folder.is_dir(), f"Output folder {self._output_folder} must be a directory." - self._thresholds = thresholds - assert len(self._thresholds) > 0, "At least one threshold must be provided." - self._tokenized_data_path = tokenized_data_path - - def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline: - for document in data: - with self.track_time(): - self._filter_document(document) - yield document - - def _filter_document(self, document: Document): - """ - Filters a single, tokenized dataset based on the scores contained in the document. - Args: - document (Document): The document containing scores and the path to the tokenized data file. - Raises: - ValueError: If the document does not contain the required keys or if the tokenized file path is invalid. - """ - document: dict[str, list[dict[str, float]] | str] = dataclasses.asdict(document) - scores: list[dict[str, float]] = document["metadata"][ScoresParser.SCORE_ENTRIES_KEY] - tokenized_file = Path(document["metadata"][ScoresParser.TOKENIZED_FILE_KEY]) - output_path = self._prepare_output_path(tokenized_file) - filter_func = make_filter_func(scores, self._thresholds) - filter_dataset(src_path=tokenized_file, dst_path=output_path, filter_func=filter_func, sample_key="input_ids") - - def _prepare_output_path(self, tokenized_file: Path) -> Path: - tokenized_file_rel = tokenized_file.relative_to(self._tokenized_data_path) - output_path = self._output_folder / tokenized_file_rel.with_suffix(".filtered.pbin") - output_path.parent.mkdir(parents=True, exist_ok=True) - return output_path - - -def make_filter_func( - scores: list[dict[str, float]], thresholds: dict[str, float] -) -> Callable[[tuple[int, dict[str, NDArray[np.int_]]]], bool]: - """ - Creates a filter function that checks if the scores of each sample meet the specified thresholds. - Args: - scores (list[dict[str, float]]): A list of dictionaries containing scores for each sample. - thresholds (dict[str, float]): A dictionary where keys are score names and values are the thresholds to - filter samples. - Returns: - Callable[[tuple[int, dict[str, NDArray[np.int_]]]], bool]: A function that takes an item (index and - sample) and returns True if the sample meets the thresholds, otherwise False. 
- """ - - def filter_func(item: tuple[int, dict[str, NDArray[np.int_]]]) -> bool: - idx, _ = item - score_entry = scores[idx] - for score_key, threshold in thresholds.items(): - if score_entry[score_key] < threshold: - return False - return True - - return filter_func diff --git a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py b/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py deleted file mode 100644 index 3db1ae8f..00000000 --- a/src/ml_filter/data_processing/score_based_filtering/step_score_parsing.py +++ /dev/null @@ -1,125 +0,0 @@ -import json -from pathlib import Path -from typing import Callable, Iterable, Literal - -from datatrove.data import DocumentsPipeline -from datatrove.io import DataFileLike, DataFolderLike -from datatrove.pipeline.readers.base import BaseDiskReader - - -class ScoresParser(BaseDiskReader): - """ - A parser that reads a JSONL file containing scores for samples and maps them to the - corresponding tokenized data files. Each entry in the JSONL file is expected to have - a "document_id" field that contains a base file hash and an index, and the scores - for that sample. - """ - - name = "ScoresParser" - # type = "Parser" - _requires_dependencies = [] - - SCORE_ENTRIES_KEY = "score_entries" - TOKENIZED_FILE_KEY = "tokenized_file" - - def __init__( - self, - data_folder: DataFolderLike, - score_keys: Iterable[str], - hash_to_base_file: dict[str, Path], - tokenized_data_path: Path, - base_file_prefix: Path = Path(""), - tokenized_data_extension: str = ".pbin", - compression: Literal["infer", "gzip", "zstd"] | None = "infer", - paths_file: DataFileLike | None = None, - limit: int = -1, - skip: int = 0, - file_progress: bool = False, - doc_progress: bool = False, - adapter: Callable | None = None, - text_key: str = "text", - id_key: str = "id", - default_metadata: dict | None = None, - recursive: bool = True, - glob_pattern: str | None = None, - shuffle_files: bool = False, - ): - super().__init__( - data_folder=data_folder, - paths_file=paths_file, - limit=limit, - skip=skip, - file_progress=file_progress, - doc_progress=doc_progress, - adapter=adapter, - text_key=text_key, - id_key=id_key, - default_metadata=default_metadata, - recursive=recursive, - glob_pattern=glob_pattern, - shuffle_files=shuffle_files, - ) - self._score_keys = list(score_keys) - assert len(self._score_keys) > 0, "At least one score key must be provided." - self._hash_to_base_file = hash_to_base_file - self._tokenized_data_path = tokenized_data_path - self._base_file_prefix = base_file_prefix - self._tokenized_data_extension = tokenized_data_extension - self._compression = compression - - def read_file(self, filepath: str) -> DocumentsPipeline: - """ - Turns a given JSONL file into a Document object containing the path to the corresponding tokenized data file - and a list of dictionaries with the scores for each sample in the file. - Args: - filepath: path of the file to read - - Returns: generator of Document - """ - base_file_hash, scores_as_list = self._parse_scores_jsonl_file(filepath) - tokenized_data_path = self._map_to_tokenized_data_path(base_file_hash) - doc_content = { - "text": ".", # Text needs to be non-empty. 
- self.SCORE_ENTRIES_KEY: scores_as_list, - self.TOKENIZED_FILE_KEY: tokenized_data_path, - } - document = self.get_document_from_dict(doc_content, filepath, 0) - return [document] - - def _parse_scores_jsonl_file(self, filepath: str) -> tuple[str, list[dict[str, float]]]: - scores_for_idx: dict[int, dict[str, float]] = {} - hashes: set[str] = set() - with self.data_folder.open(filepath, "r", compression=self._compression) as f: - for line in f: - file_data = json.loads(line) - base_file_hash, document_idx = file_data["document_id"].rsplit("_", 1) - scores_for_idx[int(document_idx)] = {k: file_data[k] for k in self._score_keys} - hashes.add(base_file_hash) - self._verify_file_format(scores_for_idx, hashes) - scores_as_list = list(map(lambda x: x[1], sorted(scores_for_idx.items(), key=lambda x: x[0]))) - base_file_hash = next(iter(hashes)) - return base_file_hash, scores_as_list - - def _verify_file_format(self, scores_for_idx: dict[int, dict[str, float]], hashes: set[str]): - assert len(hashes) == 1, "All entries in the score file must refer to the same base file." - assert min(scores_for_idx.keys()) == 0 and max(scores_for_idx.keys()) + 1 == len( - scores_for_idx - ), "All indices in the score file must be continuous." - - def _map_to_tokenized_data_path(self, base_file_hash: str) -> Path: - """ - Maps a base file hash to the corresponding tokenized data path. - Args: - base_file_hash (str): The hash of the base file. - Returns: - Path: The path to the tokenized data file. - """ - if base_file_hash not in self._hash_to_base_file: - raise ValueError(f"Base file hash {base_file_hash} not found in the provided hash mapping.") - base_file = self._hash_to_base_file[base_file_hash] - base_file_rel = base_file.relative_to(self._base_file_prefix) - tokenized_rel = base_file_rel.with_suffix(self._tokenized_data_extension) - tokenized_data_path = self._tokenized_data_path / tokenized_rel - if not tokenized_data_path.exists(): - raise FileNotFoundError(f"Tokenized data file {tokenized_data_path} does not exist.") - return tokenized_data_path From 88435ffe6584973708edb8c0129d33c5a9fe5961 Mon Sep 17 00:00:00 2001 From: ale25663 Date: Fri, 26 Sep 2025 12:37:26 +0200 Subject: [PATCH 9/9] fix(dependencies): remove unused 'datatrove' dependency from project --- pyproject.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 508ace55..f598b939 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,8 +28,7 @@ dependencies = [ "seaborn", "python-dotenv", "jq", - "tabulate", - "datatrove", + "tabulate" ] [project.optional-dependencies]
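
For reference, the allocation scheme implemented in sampling_utils.sample_documents can be summarized with the short, self-contained sketch below: a per-language target is split across files proportionally to their line counts, leftover slots are assigned by largest fractional part, and synthetic ids of the form <md5>_<line_index> are drawn from each file. The helper names (allocate_quotas, draw_synthetic_ids) and the demo counts and hashes are illustrative, and the sketch deliberately omits the quota capping/redistribution step and the per-language loop of the real implementation.

import random

def allocate_quotas(file_counts: dict[str, int], target: int, rng: random.Random) -> dict[str, int]:
    """Split `target` across files proportionally to line counts (largest-remainder method)."""
    total = sum(file_counts.values())
    items = list(file_counts.items())
    raw = [target * n / total for _, n in items]
    quotas = [int(x) for x in raw]
    # Hand out the remaining slots to the largest fractional parts (random tie-break).
    leftovers = sorted(range(len(items)), key=lambda i: (-(raw[i] - quotas[i]), rng.random()))
    for i in leftovers[: target - sum(quotas)]:
        quotas[i] += 1
    return {path: q for (path, _), q in zip(items, quotas)}

def draw_synthetic_ids(quotas: dict[str, int], file_counts: dict[str, int],
                       file_to_md5: dict[str, str], rng: random.Random) -> list[str]:
    """Sample unique 0-based line indices per file and emit ids of the form '<md5>_<line_index>'."""
    ids: list[str] = []
    for path, quota in quotas.items():
        lines = rng.sample(range(file_counts[path]), quota)
        ids.extend(f"{file_to_md5[path]}_{idx}" for idx in sorted(lines))
    return ids

if __name__ == "__main__":
    rng = random.Random(42)
    counts = {"a.jsonl": 120, "b.jsonl": 40, "c.jsonl": 40}            # hypothetical line counts
    md5s = {"a.jsonl": "29d82196d55803ab9c792e45b59919bf",
            "b.jsonl": "6f174ddca737f54cea5f34da31e15178",
            "c.jsonl": "ffffffffffffffffffffffffffffffff"}             # placeholder hashes
    quotas = allocate_quotas(counts, target=10, rng=rng)                # {'a.jsonl': 6, 'b.jsonl': 2, 'c.jsonl': 2}
    print(quotas, draw_synthetic_ids(quotas, counts, md5s, rng))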