From 0896382bc5830a140dbc4b1370c6e46f3bda731a Mon Sep 17 00:00:00 2001
From: alex-jude
Date: Mon, 24 Nov 2025 12:57:53 +0100
Subject: [PATCH 01/11] feat: add metadata key customization for embeddings
 and labels

---
 .../lorem_ipsum_annotation_pipeline.yaml      | 10 ++++-
 configs/annotation/lorem_ipsum_embedding.yaml |  6 ++-
 .../annotation/annotation_pipeline.py         | 22 +++++++++-
 .../annotation/datatrove_jql_annotator.py     | 40 ++++++++++++++-----
 .../annotation/embedding_pipeline.py          | 14 ++++++-
 5 files changed, 76 insertions(+), 16 deletions(-)

diff --git a/configs/annotation/lorem_ipsum_annotation_pipeline.yaml b/configs/annotation/lorem_ipsum_annotation_pipeline.yaml
index b56c8662..d25b66f4 100644
--- a/configs/annotation/lorem_ipsum_annotation_pipeline.yaml
+++ b/configs/annotation/lorem_ipsum_annotation_pipeline.yaml
@@ -8,11 +8,19 @@ params:
     Mistral_Snowflake: /raid/s3/opengptx/jude/repos/ml_filter/embedding_ablations/training/final #/raid/s3/opengptx/jude/repos/ml_filter/hessanAI/checkpoints/checkpoints/edu-mistral-snowflake-balanced.ckpt
   batch_size: 1000
   hdf5_dataset_name: train
-  output_keys: ["document_id", "score_Gemma_Snowflake", "score_Llama_Snowflake", "score_Mistral_Snowflake"]
+
+  # Metadata key customization
+  embedding_key: embeddings
+  label_key: labels
+  document_id_key: document_ids
+  score_prefix: "score_"
+
+  output_keys: [document_ids]
   model_dtype: bfloat16
   embedding_dtype: bfloat16
   label_dtype: bfloat16
   compression: gzip
+
 running_on_slurm: false

 local_settings:
diff --git a/configs/annotation/lorem_ipsum_embedding.yaml b/configs/annotation/lorem_ipsum_embedding.yaml
index 75bdb408..5d9a0179 100644
--- a/configs/annotation/lorem_ipsum_embedding.yaml
+++ b/configs/annotation/lorem_ipsum_embedding.yaml
@@ -21,11 +21,15 @@ params:
   embedding_model: jhu-clsp/mmBERT-base
   batch_size: 128
   writer_batch_size: 1000
-  hdf5_dataset_name: train
+  hdf5_dataset_name: fineweb2
   save_labels: false
   max_length: 8192
   padding: true
   truncation: true
+  # Metadata key customization
+  embedding_key: embeddings
+  label_key: labels
+  document_id_key: document_ids

 running_on_slurm: false
Optional.") model_dtype: str = Field(..., description="Model compute dtype (float32, float16, bfloat16).") + embedding_key: str = Field("embedding", description="Metadata key for embedding vectors in HDF5 reader.") + document_id_key: str = Field("document_id", description="Metadata key for document id in reader output.") + label_key: str | None = Field("label", description="Metadata key for labels if present.") + score_prefix: str = Field("score_", description="Prefix applied to regression head score keys.") @property def annotated_output_dir(self) -> Path: @@ -161,6 +165,10 @@ def _p(name: str, default=None): embedding_dtype=_p("embedding_dtype"), label_dtype=_p("label_dtype"), model_dtype=_p("model_dtype"), + embedding_key=_p("embedding_key"), + document_id_key=_p("document_id_key"), + label_key=_p("label_key"), + score_prefix=_p("score_prefix"), ) local_settings_obj = None @@ -189,8 +197,17 @@ def build_pipeline(self) -> list[PipelineStep]: 'embedding_dtype': p.embedding_dtype, 'label_dtype': p.label_dtype, }, pipeline="annotation_pipeline") + base_output_keys = p.output_keys + dynamic_score_keys = [f"{p.score_prefix}{name}" for name in p.regression_head_checkpoints.keys()] + output_keys = list(dict.fromkeys(base_output_keys + dynamic_score_keys)) + pipeline = [ - JQLEmbeddingReader(data_folder=p.embeddings_directory, dataset_name=p.dataset_name), + JQLEmbeddingReader( + data_folder=p.embeddings_directory, + dataset_name=p.dataset_name, + embedding_key=p.embedding_key, + document_id_key=p.document_id_key, + ), JQLHead( regression_head_checkpoints=p.regression_head_checkpoints, batch_size=p.batch_size, @@ -199,10 +216,11 @@ def build_pipeline(self) -> list[PipelineStep]: "embedding_dtype": _resolved["embedding_dtype"], "label_dtype": _resolved["label_dtype"], }, + score_prefix=p.score_prefix, stats_writer=JsonlWriter( output_folder=str(p.annotated_output_dir), output_filename="${source_filename}.jsonl", - adapter=partial(stats_adapter, output_keys=p.output_keys), + adapter=partial(stats_adapter, output_keys=output_keys), expand_metadata=True, ), ), diff --git a/src/ml_filter/annotation/datatrove_jql_annotator.py b/src/ml_filter/annotation/datatrove_jql_annotator.py index 85d29bb5..1e924a3b 100644 --- a/src/ml_filter/annotation/datatrove_jql_annotator.py +++ b/src/ml_filter/annotation/datatrove_jql_annotator.py @@ -91,6 +91,8 @@ def __init__( glob_pattern: str | None = None, shuffle_files: bool = False, save_labels: bool = True, + document_id_key: str = "document_id", + label_key: str = "label", ): super().__init__( data_folder, @@ -109,6 +111,8 @@ def __init__( ) self.compression = compression self.save_labels = save_labels + self.document_id_key = document_id_key + self.label_key = label_key self.keys_to_index = keys_to_index self.n_keys = len(keys_to_index) @@ -146,12 +150,12 @@ def read_file(self, filepath: str): document = self.get_document_from_dict(orjson.loads(line), filepath, li) if not document: continue - document.metadata["document_id"] = self._combine_metadata_keys(document.metadata) + document.metadata[self.document_id_key] = self._combine_metadata_keys(document.metadata) document.metadata["source_filename"] = _get_file_path(document) if self.save_labels: # copy score into label for downstream consumers if enabled if "score" in document.metadata: - document.metadata["label"] = document.metadata["score"] + document.metadata[self.label_key] = document.metadata["score"] else: raise ValueError("No 'score' field found in document metadata to copy to 'label'.") except (EOFError, 
diff --git a/src/ml_filter/annotation/datatrove_jql_annotator.py b/src/ml_filter/annotation/datatrove_jql_annotator.py
index 85d29bb5..1e924a3b 100644
--- a/src/ml_filter/annotation/datatrove_jql_annotator.py
+++ b/src/ml_filter/annotation/datatrove_jql_annotator.py
@@ -91,6 +91,8 @@ def __init__(
         glob_pattern: str | None = None,
         shuffle_files: bool = False,
         save_labels: bool = True,
+        document_id_key: str = "document_id",
+        label_key: str = "label",
     ):
         super().__init__(
             data_folder,
@@ -109,6 +111,8 @@ def __init__(
         )
         self.compression = compression
         self.save_labels = save_labels
+        self.document_id_key = document_id_key
+        self.label_key = label_key
         self.keys_to_index = keys_to_index
         self.n_keys = len(keys_to_index)
@@ -146,12 +150,12 @@ def read_file(self, filepath: str):
                     document = self.get_document_from_dict(orjson.loads(line), filepath, li)
                     if not document:
                         continue
-                    document.metadata["document_id"] = self._combine_metadata_keys(document.metadata)
+                    document.metadata[self.document_id_key] = self._combine_metadata_keys(document.metadata)
                     document.metadata["source_filename"] = _get_file_path(document)
                     if self.save_labels:
                         # copy score into label for downstream consumers if enabled
                         if "score" in document.metadata:
-                            document.metadata["label"] = document.metadata["score"]
+                            document.metadata[self.label_key] = document.metadata["score"]
                         else:
                             raise ValueError("No 'score' field found in document metadata to copy to 'label'.")
                 except (EOFError, JSONDecodeError) as e:
@@ -194,6 +198,7 @@ def __init__(
         truncation: bool | str,
         device_overwrite: Optional[str] = None,
         stats_writer: DiskWriter = None,
+        embedding_key: str = "embedding",
     ):
         super().__init__()
         self.embedder_model_id = embedder_model_id
@@ -204,6 +209,7 @@ def __init__(
         self.padding = padding
         self.truncation = truncation
         self.model_dtype = model_dtype
+        self.embedding_key = embedding_key

     def run(self, doc_pipeline: DocumentsPipeline, rank: int = 0, world_size: int = 1, **kwargs) -> DocumentsPipeline:
         if not cuda.is_available():
@@ -234,7 +240,7 @@ def run(self, doc_pipeline: DocumentsPipeline, rank: int = 0, world_size: int = 1, **kwargs) -> DocumentsPipeline:
             )
             for idx, (doc, embedding) in enumerate(zip(doc_batch, embeddings)):
                 # doc.metadata["source_filename"] = _get_file_path(doc)
-                doc.metadata["embedding"] = embedding
+                doc.metadata[self.embedding_key] = embedding
                 if writer:
                     writer.write(doc, rank)
                 yield doc
@@ -305,6 +311,9 @@ def __init__(
         dataset_name: str = "train",
         save_labels: bool = True,
         compression: str = None,
+        embedding_key: str = "embedding",
+        document_id_key: str = "document_id",
+        label_key: str = "label",
     ):
         super().__init__(
             output_folder,
@@ -323,16 +332,19 @@ def __init__(
         self.dataset_name = dataset_name
         self.save_labels = save_labels
         self.compression = compression
+        self.embedding_key = embedding_key
+        self.document_id_key = document_id_key
+        self.label_key = label_key

     def _write_batch(self, filename: str):
         if not self._batches[filename]:
             return
         batch = self._batches.pop(filename)
-        embeddings = np.stack([doc["metadata"]["embedding"] for doc in batch], dtype=self.dtype_schema["embedding_dtype"])
-        document_id = [doc["metadata"]["document_id"] for doc in batch]
-        if self.save_labels and "label" in batch[0]["metadata"]:
-            labels = np.array([doc["metadata"]["label"] for doc in batch], dtype=self.dtype_schema["label_dtype"])
+        embeddings = np.stack([doc["metadata"][self.embedding_key] for doc in batch], dtype=self.dtype_schema["embedding_dtype"])
+        document_id = [doc["metadata"][self.document_id_key] for doc in batch]
+        if self.save_labels and self.label_key in batch[0]["metadata"]:
+            labels = np.array([doc["metadata"][self.label_key] for doc in batch], dtype=self.dtype_schema["label_dtype"])
         else:
             labels = None
@@ -479,6 +491,8 @@ def __init__(
         recursive: bool = True,
         glob_pattern: str | None = "**/*.h5",
         shuffle_files: bool = False,
+        embedding_key: str = "embedding",
+        document_id_key: str = "document_id",
     ):
         super().__init__(
             data_folder=data_folder,
@@ -496,6 +510,8 @@ def __init__(
             shuffle_files=shuffle_files,
         )
         self.dataset_name = dataset_name
+        self.embedding_key = embedding_key
+        self.document_id_key = document_id_key

     def read_file(self, filepath: str):
         """
@@ -539,11 +555,11 @@ def read_file(self, filepath: str):
             with self.track_time():
                 doc_dict = {
                     "id": str(i),
-                    "embeddings": embeddings[i].tolist(),
-                    "document_id": document_ids[i],
+                    self.text_key: embeddings[i].tolist(),
+                    self.document_id_key: document_ids[i],
                 }
                 doc = self.get_document_from_dict(doc_dict, filepath, i)
-                doc.metadata["document_id"] = document_ids[i].decode('utf-8')
+                doc.metadata[self.document_id_key] = document_ids[i].decode('utf-8')
                 doc.metadata["source_filename"] = str(Path(doc.metadata.get("file_path")).relative_to(self.data_folder.path))
                 yield doc
@@ -577,6 +593,7 @@ def __init__(
         device_overwrite: Optional[str] = None,
         stats_writer: DiskWriter = None,
         dtype_schema: Any = None,
+        score_prefix: str = "score_",
     ):
         super().__init__()
         if regression_head_checkpoints is None:
@@ -588,6 +605,7 @@ def __init__(
         self.stats_writer = stats_writer
         self.output_keys = output_keys
         self.dtype_schema = dtype_schema
+        self.score_prefix = score_prefix

     def run(self, doc_pipeline: DocumentsPipeline, rank: int = 0, world_size: int = 1, **kwargs) -> DocumentsPipeline:
         """
@@ -632,7 +650,7 @@ def run(self, doc_pipeline: DocumentsPipeline, rank: int = 0, world_size: int = 1, **kwargs) -> DocumentsPipeline:
             scores = {}
             with no_grad():
                 for name, regression_head in self.regression_heads.items():
-                    scores[f'score_{name}'] = regression_head(embeddings_tensor).logits.cpu().squeeze(1)
+                    scores[f'{self.score_prefix}{name}'] = regression_head(embeddings_tensor).logits.cpu().squeeze(1)

             for batch_idx, doc in enumerate(doc_batch):
                 for name, score in scores.items():
diff --git a/src/ml_filter/annotation/embedding_pipeline.py b/src/ml_filter/annotation/embedding_pipeline.py
index 67e466db..747ee478 100644
--- a/src/ml_filter/annotation/embedding_pipeline.py
+++ b/src/ml_filter/annotation/embedding_pipeline.py
@@ -38,6 +38,9 @@ class EmbeddingPipelineParameters(BaseModel):
     padding: bool | str = Field(..., description="Padding strategy.")
     truncation: bool | str = Field(..., description="Truncation strategy.")
     save_labels: bool = Field(..., description="Copy score->label if present when writing.")
+    embedding_key: str = Field("embedding", description="Metadata key name for embedding vector.")
+    label_key: str = Field("label", description="Metadata key name for label value when saved.")
+    document_id_key: str = Field("document_id", description="Metadata key used for combined/document id.")

     @property
     def embedding_output_dir(self) -> Path:
@@ -202,7 +205,10 @@ def _p(name: str, default=None):
         max_length=_p("max_length"),
         padding=_p("padding"),
         truncation=_p("truncation"),
-        save_labels=_p("save_labels")
+        save_labels=_p("save_labels"),
+        embedding_key=_p("embedding_key"),
+        label_key=_p("label_key"),
+        document_id_key=_p("document_id_key"),
     )

     builder_kwargs = {"params": params, "running_on_slurm": rs}
@@ -236,6 +242,8 @@ def build_pipeline(self) -> list[PipelineStep]:
             glob_pattern=p.glob_pattern,
             text_key=p.text_field,
             save_labels=p.save_labels,
+            document_id_key=p.document_id_key,
+            label_key=p.label_key,
         ),
         JQLEmbedder(
             embedder_model_id=p.embedding_model,
@@ -255,7 +263,11 @@ def build_pipeline(self) -> list[PipelineStep]:
                     "embedding_dtype": _resolved["embedding_dtype"],
                     "label_dtype": _resolved["label_dtype"],
                 },
+                embedding_key=p.embedding_key,
+                document_id_key=p.document_id_key,
+                label_key=p.label_key,
             ),
+            embedding_key=p.embedding_key,
         ),
     ]
     return pipeline
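PATCH 01 stops hard-coding one output column per regression head in the config: build_pipeline derives the score keys from score_prefix and the configured heads, then merges them with the user-supplied output_keys. dict.fromkeys is what keeps that merge deduplicated while preserving first-seen order. A standalone sketch of the merge, with illustrative head names and config values:

    # Sketch of the output-key merge in build_pipeline (values illustrative).
    score_prefix = "score_"
    regression_head_checkpoints = {"Gemma_Snowflake": "...", "Llama_Snowflake": "..."}
    base_output_keys = ["document_ids", "score_Gemma_Snowflake"]  # user config may already name a score key

    dynamic_score_keys = [f"{score_prefix}{name}" for name in regression_head_checkpoints]
    # dict.fromkeys deduplicates while preserving insertion order (unlike set()).
    output_keys = list(dict.fromkeys(base_output_keys + dynamic_score_keys))
    assert output_keys == ["document_ids", "score_Gemma_Snowflake", "score_Llama_Snowflake"]
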
From f2e33c7ce49b6a3cd98c1384272681ac1fa09333 Mon Sep 17 00:00:00 2001
From: alex-jude
Date: Mon, 24 Nov 2025 15:29:41 +0100
Subject: [PATCH 02/11] feat: add glob pattern parameter for embedding file
 matching

---
 configs/annotation/lorem_ipsum_annotation_pipeline.yaml | 5 +++--
 configs/annotation/lorem_ipsum_embedding.yaml           | 3 ++-
 src/ml_filter/annotation/annotation_pipeline.py         | 3 +++
 3 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/configs/annotation/lorem_ipsum_annotation_pipeline.yaml b/configs/annotation/lorem_ipsum_annotation_pipeline.yaml
index d25b66f4..7a8f568e 100644
--- a/configs/annotation/lorem_ipsum_annotation_pipeline.yaml
+++ b/configs/annotation/lorem_ipsum_annotation_pipeline.yaml
@@ -1,4 +1,5 @@
 params:
+  glob_pattern: "**/*.h5"
   embeddings_directory: /raid/s3/opengptx/jude/repos/ml_filter/data/throughput_analysis/output/validation_embeddings
   output_dir: /raid/s3/opengptx/jude/repos/ml_filter/data/throughput_analysis/output/annotations
@@ -5,16 +6,16 @@ params:
   regression_head_checkpoints:
     Gemma_Snowflake: /raid/s3/opengptx/jude/repos/ml_filter/embedding_ablations/training/final #/raid/s3/opengptx/jude/repos/ml_filter/hessanAI/checkpoints/checkpoints/edu-gemma-snowflake-balanced.ckpt
     Llama_Snowflake: /raid/s3/opengptx/jude/repos/ml_filter/embedding_ablations/training/final #/raid/s3/opengptx/jude/repos/ml_filter/hessanAI/checkpoints/checkpoints/edu-llama-snowflake-balanced.ckpt
     Mistral_Snowflake: /raid/s3/opengptx/jude/repos/ml_filter/embedding_ablations/training/final #/raid/s3/opengptx/jude/repos/ml_filter/hessanAI/checkpoints/checkpoints/edu-mistral-snowflake-balanced.ckpt
   batch_size: 1000
-  hdf5_dataset_name: train
+  hdf5_dataset_name: fineweb2

   # Metadata key customization
   embedding_key: embeddings
   label_key: labels
   document_id_key: document_ids
   score_prefix: "score_"
-  
+
   output_keys: [document_ids]
   model_dtype: bfloat16
   embedding_dtype: bfloat16
diff --git a/configs/annotation/lorem_ipsum_embedding.yaml b/configs/annotation/lorem_ipsum_embedding.yaml
index 5d9a0179..0147b584 100644
--- a/configs/annotation/lorem_ipsum_embedding.yaml
+++ b/configs/annotation/lorem_ipsum_embedding.yaml
@@ -10,7 +10,7 @@ params:
   text_field: text
   keys_to_index: ["id", "aggregation_type"]
   embedding_dir: ${dataset_name}_embeddings
-  compression: gzip
+  compression: null

   # Precision
   embedding_dtype: float32
@@ -26,6 +26,7 @@ params:
   max_length: 8192
   padding: true
   truncation: true
+
   # Metadata key customization
   embedding_key: embeddings
   label_key: labels
   document_id_key: document_ids

 running_on_slurm: false
diff --git a/src/ml_filter/annotation/annotation_pipeline.py b/src/ml_filter/annotation/annotation_pipeline.py
index 8beb8d15..1197e509 100644
--- a/src/ml_filter/annotation/annotation_pipeline.py
+++ b/src/ml_filter/annotation/annotation_pipeline.py
@@ -78,6 +78,7 @@ def _normalize_sbatch(cls, values):
 # ---------------------------------------------------------------------------

 class AnnotationPipelineParameters(BaseModel):
+    glob_pattern: str = Field(..., description="Glob pattern to match embedding files in the embeddings directory.")
     embeddings_directory: str = Field(..., description="Path to directory containing HDF5 embedding files.")
     output_keys: list[str] = Field(..., description="List of metadata keys to include in the annotated output files.")
     output_dir: Path = Field(..., description="Output directory for annotated JSONL files.")
@@ -155,6 +156,7 @@ def _p(name: str, default=None):
         return params_cfg.get(name, default)

     params = AnnotationPipelineParameters(
+        glob_pattern=_p("glob_pattern"),
         embeddings_directory=_p("embeddings_directory"),
         output_dir=_p("output_dir"),
         output_keys=_p("output_keys"),
@@ -207,6 +209,7 @@ def build_pipeline(self) -> list[PipelineStep]:
             dataset_name=p.dataset_name,
             embedding_key=p.embedding_key,
             document_id_key=p.document_id_key,
+            glob_pattern=p.glob_pattern,
         ),
         JQLHead(
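The glob_pattern introduced here flows through to JQLEmbeddingReader's file discovery. The default "**/*.h5" is recursive: ** matches zero or more directory levels, so top-level files are included too, whereas "*.h5" matches only the top level. A pathlib illustration of the difference (datatrove resolves the pattern through its own DataFolder layer, but the ** convention is the same; paths invented):

    from pathlib import Path

    root = Path("embeddings")
    (root / "shard_00").mkdir(parents=True, exist_ok=True)
    (root / "shard_00" / "part.h5").touch()
    (root / "top.h5").touch()

    print(sorted(str(p) for p in root.glob("*.h5")))     # ['embeddings/top.h5']
    print(sorted(str(p) for p in root.glob("**/*.h5")))  # ['embeddings/shard_00/part.h5', 'embeddings/top.h5']
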
compression=self.compression, dtype=self.dtype_schema["embedding_dtype"] + self.embedding_key, shape=(0, embeddings.shape[1]), maxshape=maxshape_emb, compression=self.compression, dtype=self.dtype_schema["embedding_dtype"] ) dt = h5py.string_dtype(encoding="utf-8") - document_ids_dataset = group.create_dataset("document_id", shape=(0,), maxshape=maxshape_ids, compression=self.compression, dtype=dt) + document_ids_dataset = group.create_dataset(self.document_id_key, shape=(0,), maxshape=maxshape_ids, compression=self.compression, dtype=dt) labels_dataset = None if labels is not None: maxshape_labels = (None,) if labels.ndim == 1 else (None, labels.shape[1]) labels_dataset = group.create_dataset( - "labels", shape=(0,) if labels.ndim == 1 else (0, labels.shape[1]), maxshape=maxshape_labels, compression=self.compression, dtype=self.dtype_schema["label_dtype"] + self.label_key, shape=(0,) if labels.ndim == 1 else (0, labels.shape[1]), maxshape=maxshape_labels, compression=self.compression, dtype=self.dtype_schema["label_dtype"] ) else: group = file[group_name] - embeddings_dataset = group["embeddings"] - document_ids_dataset = group["document_id"] - labels_dataset = group.get("labels") if "labels" in group else None + embeddings_dataset = group[self.embedding_key] + document_ids_dataset = group[self.document_id_key] + labels_dataset = group.get(self.label_key) if self.label_key in group else None # Resize for new batch once and append the current in-memory batch to the on-disk datasets. # 1. Capture current length (current_row_count) @@ -391,7 +391,7 @@ def _write_batch(self, filename: str): # Create labels dataset now if first time labels appear maxshape_labels = (None,) if labels.ndim == 1 else (None, labels.shape[1]) labels_dataset = group.create_dataset( - "labels", shape=(0,) if labels.ndim == 1 else (0, labels.shape[1]), maxshape=maxshape_labels, compression=self.compression, dtype=self.dtype_schema["label_dtype"] + self.label_key, shape=(0,) if labels.ndim == 1 else (0, labels.shape[1]), maxshape=maxshape_labels, compression=self.compression, dtype=self.dtype_schema["label_dtype"] ) labels_dataset.resize(updated_row_count, axis=0) labels_dataset[current_row_count:updated_row_count] = labels @@ -538,8 +538,8 @@ def read_file(self, filepath: str): raise KeyError(f"Dataset '{self.dataset_name}' not found in {filepath}") grp = f[self.dataset_name] - embeddings = torch.from_numpy(grp["embeddings"][:]).float() - document_ids = grp["document_id"][:] + embeddings = torch.from_numpy(grp[self.embedding_key][:]).float() + document_ids = grp[self.document_id_key][:] if len(embeddings) != len(document_ids): raise ValueError( From 86e76eeae1b3ed668bd57464c336fdf5addc9275 Mon Sep 17 00:00:00 2001 From: alex-jude Date: Tue, 25 Nov 2025 00:21:53 +0100 Subject: [PATCH 04/11] feat: add customizable keys and glob patterns for embedding and document ID in annotation tests --- tests/annotation/test_annotation_pipeline.py | 6 +++++- tests/annotation/test_embedding_pipeline.py | 11 +++++++---- tests/annotation/test_jql_embedder.py | 4 ++-- tests/annotation/test_jql_embedding_reader.py | 8 ++++---- 4 files changed, 18 insertions(+), 11 deletions(-) diff --git a/tests/annotation/test_annotation_pipeline.py b/tests/annotation/test_annotation_pipeline.py index c1afc0fe..ebf1400d 100644 --- a/tests/annotation/test_annotation_pipeline.py +++ b/tests/annotation/test_annotation_pipeline.py @@ -43,6 +43,10 @@ def setUp(self): OmegaConf.save(config=OmegaConf.create({ "params": { "embeddings_directory": 
From 86e76eeae1b3ed668bd57464c336fdf5addc9275 Mon Sep 17 00:00:00 2001
From: alex-jude
Date: Tue, 25 Nov 2025 00:21:53 +0100
Subject: [PATCH 04/11] feat: add customizable keys and glob patterns for
 embedding and document ID in annotation tests

---
 tests/annotation/test_annotation_pipeline.py  |  6 +++++-
 tests/annotation/test_embedding_pipeline.py   | 11 +++++++----
 tests/annotation/test_jql_embedder.py         |  4 ++--
 tests/annotation/test_jql_embedding_reader.py |  8 ++++----
 4 files changed, 18 insertions(+), 11 deletions(-)

diff --git a/tests/annotation/test_annotation_pipeline.py b/tests/annotation/test_annotation_pipeline.py
index c1afc0fe..ebf1400d 100644
--- a/tests/annotation/test_annotation_pipeline.py
+++ b/tests/annotation/test_annotation_pipeline.py
@@ -43,6 +43,10 @@ def setUp(self):
         OmegaConf.save(config=OmegaConf.create({
             "params": {
                 "embeddings_directory": self.embeddings_dir,
+                "embedding_key": "embeddings",
+                "document_id_key": "document_id",
+                "glob_pattern": "**/*.h5",
+                "score_prefix": "score_",
                 "regression_head_checkpoints": {
                     "Edu-JQL-Mistral-SF": mistral_ckpt_path
                 },
@@ -76,7 +80,7 @@ def test_config_loading(self):

     def test_jql_embedding_reader(self):
         from ml_filter.annotation.datatrove_jql_annotator import JQLEmbeddingReader
-        reader = JQLEmbeddingReader(data_folder=self.embeddings_dir, dataset_name="train")
+        reader = JQLEmbeddingReader(data_folder=self.embeddings_dir, dataset_name="train", embedding_key="embeddings", document_id_key="document_id")
         docs = list(reader.run())
         self.assertGreater(len(docs), 0)
diff --git a/tests/annotation/test_embedding_pipeline.py b/tests/annotation/test_embedding_pipeline.py
index f2367c55..ff209baf 100644
--- a/tests/annotation/test_embedding_pipeline.py
+++ b/tests/annotation/test_embedding_pipeline.py
@@ -66,7 +66,7 @@ def setUp(self):
                 "input_dir": self.input_dir,
                 "output_dir": self.output_dir,
                 "keys_to_index": ["document_id"],
-                "embedding_dir": "embeddings",  # required by builder for output path
+                "embedding_dir": "embeddings",
                 "glob_pattern": "*.jsonl",
                 "embedding_model": "Snowflake/snowflake-arctic-embed-m-v2.0",
                 "hdf5_dataset_name": "train",
@@ -80,11 +80,14 @@ def setUp(self):
                 "embedding_dtype": "float32",
                 "model_dtype": "bfloat16",
                 "label_dtype": "bfloat16",
+                "embedding_key": "embeddings",
+                "label_key": "label",
+                "document_id_key": "document_id",
             },
             "local_settings": {
-                "tasks": 2,
-                "workers": -1,
-                "local_tasks": 2,
+                "tasks": 1,
+                "workers": 1,
+                "local_tasks": 1,
                 "local_rank_offset": 0,
             },
             "slurm_settings": None,
diff --git a/tests/annotation/test_jql_embedder.py b/tests/annotation/test_jql_embedder.py
index 2ee14280..342f36dd 100644
--- a/tests/annotation/test_jql_embedder.py
+++ b/tests/annotation/test_jql_embedder.py
@@ -88,10 +88,10 @@ def test_write_and_verify_hdf5_output(self):
         with h5py.File(h5_path, "r") as f:
             self.assertIn("train", f)
             group = f["train"]
-            self.assertIn("embeddings", group)
+            self.assertIn("embedding", group)
             self.assertIn("document_id", group)

-            embeddings = group["embeddings"][:]
+            embeddings = group["embedding"][:]
             doc_ids = group["document_id"][:]

             self.assertEqual(embeddings.shape[0], len(self.input_docs))
diff --git a/tests/annotation/test_jql_embedding_reader.py b/tests/annotation/test_jql_embedding_reader.py
index 4c3ce323..1159a9f4 100644
--- a/tests/annotation/test_jql_embedding_reader.py
+++ b/tests/annotation/test_jql_embedding_reader.py
@@ -41,22 +41,22 @@ def check_same_data(self, documents, expected_embeddings, expected_document_id,
             np.testing.assert_equal(doc.metadata["document_id"], expected_document_id[i])

     def test_read(self):
-        reader = JQLEmbeddingReader(self.tmp_dir, "train")
+        reader = JQLEmbeddingReader(self.tmp_dir, "train", embedding_key="embeddings", document_id_key="document_id")
         documents = list(reader.run())
         self.check_same_data(documents, self.embeddings, self.labels)

     def test_read_with_limit(self):
-        reader = JQLEmbeddingReader(self.tmp_dir, "train", limit=2)
+        reader = JQLEmbeddingReader(self.tmp_dir, "train", limit=2, embedding_key="embeddings", document_id_key="document_id")
         documents = list(reader.run())
         self.check_same_data(documents, self.embeddings, self.labels, limit=2)

     def test_read_with_skip(self):
-        reader = JQLEmbeddingReader(self.tmp_dir, "train", skip=1)
+        reader = JQLEmbeddingReader(self.tmp_dir, "train", skip=1, embedding_key="embeddings", document_id_key="document_id")
         documents = list(reader.run())
        self.check_same_data(documents, self.embeddings, self.labels, skip=1)

     def test_read_with_limit_and_skip(self):
-        reader = JQLEmbeddingReader(self.tmp_dir, "train", skip=1, limit=1)
+        reader = JQLEmbeddingReader(self.tmp_dir, "train", skip=1, limit=1, embedding_key="embeddings", document_id_key="document_id")
         documents = list(reader.run())
         self.check_same_data(documents, self.embeddings, self.labels, skip=1, limit=1)

From 7086e9ed03023259726dc8ac5bb4f60e45695903 Mon Sep 17 00:00:00 2001
From: alex-jude
Date: Tue, 25 Nov 2025 00:40:37 +0100
Subject: [PATCH 05/11] feat: update annotation pipeline configurations with
 glob pattern and metadata key customization

---
 ...lorem_ipsum_annotation_pipeline_slurm.yaml | 23 +++++++++++++++----
 .../lorem_ipsum_embedding_pipeline_slurm.yaml |  5 ++++
 2 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/configs/annotation/lorem_ipsum_annotation_pipeline_slurm.yaml b/configs/annotation/lorem_ipsum_annotation_pipeline_slurm.yaml
index 980e0da1..54e68a55 100644
--- a/configs/annotation/lorem_ipsum_annotation_pipeline_slurm.yaml
+++ b/configs/annotation/lorem_ipsum_annotation_pipeline_slurm.yaml
@@ -1,11 +1,26 @@
 params:
-  embeddings_directory: /raid/s3/opengptx/jude/repos/ml_filter/data/input_dir_hierarchy
-  output_dir: /raid/s3/opengptx/jude/repos/ml_filter/data/embedding_output_dir/annotations_new/
+  glob_pattern: "**/*.h5"
+  embeddings_directory: /raid/s3/opengptx/jude/repos/ml_filter/data/throughput_analysis/output/validation_embeddings
+  output_dir: /raid/s3/opengptx/jude/repos/ml_filter/data/throughput_analysis/output/annotations

   regression_head_checkpoints:
-    Gemma_Snowflake: /raid/s3/opengptx/jude/repos/ml_filter/hessanAI/checkpoints/checkpoints/edu-gemma-snowflake-balanced.ckpt
-    Llama_Snowflake: /raid/s3/opengptx/jude/repos/ml_filter/hessanAI/checkpoints/checkpoints/edu-llama-snowflake-balanced.ckpt
+    Gemma_Snowflake: /raid/s3/opengptx/jude/repos/ml_filter/embedding_ablations/training/final #/raid/s3/opengptx/jude/repos/ml_filter/hessanAI/checkpoints/checkpoints/edu-gemma-snowflake-balanced.ckpt
+    Llama_Snowflake: /raid/s3/opengptx/jude/repos/ml_filter/embedding_ablations/training/final #/raid/s3/opengptx/jude/repos/ml_filter/hessanAI/checkpoints/checkpoints/edu-llama-snowflake-balanced.ckpt
+    Mistral_Snowflake: /raid/s3/opengptx/jude/repos/ml_filter/embedding_ablations/training/final #/raid/s3/opengptx/jude/repos/ml_filter/hessanAI/checkpoints/checkpoints/edu-mistral-snowflake-balanced.ckpt
   batch_size: 1000
+  hdf5_dataset_name: fineweb2
+
+  # Metadata key customization
+  embedding_key: embeddings
+  label_key: labels
+  document_id_key: document_ids
+  score_prefix: "score_"
+
+  output_keys: [document_ids]
+  model_dtype: bfloat16
+  embedding_dtype: bfloat16
+  label_dtype: bfloat16
+  compression: gzip

 running_on_slurm: true
diff --git a/configs/annotation/lorem_ipsum_embedding_pipeline_slurm.yaml b/configs/annotation/lorem_ipsum_embedding_pipeline_slurm.yaml
index ac0f19eb..cb3c8fd5 100644
--- a/configs/annotation/lorem_ipsum_embedding_pipeline_slurm.yaml
+++ b/configs/annotation/lorem_ipsum_embedding_pipeline_slurm.yaml
@@ -17,6 +17,11 @@ params:
   label_dtype: int8
   model_dtype: bfloat16

+  # Metadata key customization
+  embedding_key: embeddings
+  label_key: labels
+  document_id_key: document_ids
+
   # Model and embedding parameters
   embedding_model: jhu-clsp/mmBERT-base
   batch_size: 512
From dee468c54bee6893028f070af8d173ce4f080917 Mon Sep 17 00:00:00 2001
From: ale25663
Date: Tue, 25 Nov 2025 10:31:11 +0100
Subject: [PATCH 06/11] feat: update Slurm execution settings and adjust paths
 in annotation pipeline configurations

---
 ...lorem_ipsum_annotation_pipeline_slurm.yaml | 15 ++++----
 .../lorem_ipsum_embedding_pipeline_slurm.yaml | 16 ++++----
 .../annotation/annotation_pipeline.py         | 37 +++++++++++--------
 3 files changed, 36 insertions(+), 32 deletions(-)

diff --git a/configs/annotation/lorem_ipsum_annotation_pipeline_slurm.yaml b/configs/annotation/lorem_ipsum_annotation_pipeline_slurm.yaml
index 54e68a55..574f56b7 100644
--- a/configs/annotation/lorem_ipsum_annotation_pipeline_slurm.yaml
+++ b/configs/annotation/lorem_ipsum_annotation_pipeline_slurm.yaml
@@ -1,12 +1,12 @@
 params:
   glob_pattern: "**/*.h5"
-  embeddings_directory: /raid/s3/opengptx/jude/repos/ml_filter/data/throughput_analysis/output/validation_embeddings
-  output_dir: /raid/s3/opengptx/jude/repos/ml_filter/data/throughput_analysis/output/annotations
+  embeddings_directory: /data/horse/ws/alju972f-tokenization_at_scale/data_ml_filter/embedding/training_embeddings
+  output_dir: /data/horse/ws/alju972f-tokenization_at_scale/data_ml_filter/annotations

   regression_head_checkpoints:
-    Gemma_Snowflake: /raid/s3/opengptx/jude/repos/ml_filter/embedding_ablations/training/final #/raid/s3/opengptx/jude/repos/ml_filter/hessanAI/checkpoints/checkpoints/edu-gemma-snowflake-balanced.ckpt
-    Llama_Snowflake: /raid/s3/opengptx/jude/repos/ml_filter/embedding_ablations/training/final #/raid/s3/opengptx/jude/repos/ml_filter/hessanAI/checkpoints/checkpoints/edu-llama-snowflake-balanced.ckpt
-    Mistral_Snowflake: /raid/s3/opengptx/jude/repos/ml_filter/embedding_ablations/training/final #/raid/s3/opengptx/jude/repos/ml_filter/hessanAI/checkpoints/checkpoints/edu-mistral-snowflake-balanced.ckpt
+    Gemma_Snowflake: /data/horse/ws/alju972f-tokenization_at_scale/data_ml_filter/final_regression_head
+    Llama_Snowflake: /data/horse/ws/alju972f-tokenization_at_scale/data_ml_filter/final_regression_head
+    Mistral_Snowflake: /data/horse/ws/alju972f-tokenization_at_scale/data_ml_filter/final_regression_head
   batch_size: 1000
   hdf5_dataset_name: fineweb2
@@ -36,11 +36,10 @@ slurm_settings:
   time: "04:00:00"
   cpus_per_task: 8
   mem_per_cpu_gb: 2
-  gpus_per_task: 1
   job_name: "MMbert_embedder"
   output: /data/cat/ws/alju972f-regression_heads/dataset/mmbet_embeddings/mmber_logs/%j.out
   error: /data/cat/ws/alju972f-regression_heads/dataset/mmbet_embeddings/mmber_logs/%j.err
   qos: "normal"
   venv_path: /data/cat/ws/alju972f-regression_heads/envs/env_regression_heads/bin/activate
-  tasks: 10
-  workers: 1001
+  tasks: 1
+  workers: 1
diff --git a/configs/annotation/lorem_ipsum_embedding_pipeline_slurm.yaml b/configs/annotation/lorem_ipsum_embedding_pipeline_slurm.yaml
index cb3c8fd5..dd018853 100644
--- a/configs/annotation/lorem_ipsum_embedding_pipeline_slurm.yaml
+++ b/configs/annotation/lorem_ipsum_embedding_pipeline_slurm.yaml
@@ -3,14 +3,14 @@ dataset_name: training
 params:
   # File selection
   glob_pattern: "**/*.jsonl"
-  input_dir: /data/cat/ws/alju972f-regression_heads/repos/data/embedding_creation/input #/data/cat/ws/alju972f-regression_heads/dataset/Regression_head/abbas/processed_data_natural/${dataset_name}_set
+  input_dir: /data/horse/ws/alju972f-tokenization_at_scale/data_ml_filter/input

   # Output
-  output_dir: /data/cat/ws/alju972f-regression_heads/repos/data/embedding_creation/output
+  output_dir: /data/horse/ws/alju972f-tokenization_at_scale/data_ml_filter/embedding
   keys_to_index: ["id", "aggregation_type"]
   text_field: text
   embedding_dir: ${dataset_name}_embeddings
-  compression: gzip
+  compression: null

   # Precision
   embedding_dtype: float32
@@ -23,7 +23,7 @@ params:
   document_id_key: document_ids

   # Model and embedding parameters
-  embedding_model: jhu-clsp/mmBERT-base
+  embedding_model: Snowflake/snowflake-arctic-embed-m-v2.0
   batch_size: 512
   writer_batch_size: 1000
   hdf5_dataset_name: train
@@ -41,13 +41,11 @@ slurm_settings:
   account: "p_gptx"
   nodes: 1
   ntasks: 1
-  gres: gpu:4
-  exclusive: user
+  gres: gpu:1
   partition: "capella"
   time: "04:00:00"
-  cpus_per_task: 32
-  mem_per_cpu_gb: 8
-  gpus_per_task: 4
+  cpus_per_task: 8
+  mem_per_cpu_gb: 2
   job_name: "MMbert_embedder"
   output: ${params.output_dir}/logs/%j.out
   error: ${params.output_dir}/logs/%j.err
diff --git a/src/ml_filter/annotation/annotation_pipeline.py b/src/ml_filter/annotation/annotation_pipeline.py
index 1197e509..58df6ecb 100644
--- a/src/ml_filter/annotation/annotation_pipeline.py
+++ b/src/ml_filter/annotation/annotation_pipeline.py
@@ -38,21 +38,28 @@ class SlurmExecutionSettings(BaseModel):
     workers: int
     job_name: str
     qos: str
-    env_command: Optional[str]
-    condaenv: Optional[str]
-    venv_path: Optional[str]
-    sbatch_args: Optional[dict[str, str | int | float | bool]]
-    max_array_size: int
-    depends_job_id: Optional[str]
-    job_id_position: int
-    logging_dir: Optional[str]
-    skip_completed: bool
-    slurm_logs_folder: Optional[str]
-    mail_type: str
-    mail_user: Optional[str]
-    requeue: bool
-    srun_args: Optional[dict[str, str | int | float | bool]]
-    tasks_per_job: int
+    env_command: str | None = None
+    condaenv: str | None = None
+    venv_path: str | None = None
+    # Allow users to supply any sbatch arg (e.g. nodes, ntasks, gres, account, output, error, gpus-per-task, etc.)
+    # using either snake_case or dash-case. Primitive values get coerced to strings.
+    sbatch_args: dict[str, str | int | float | bool] | None = None
+    max_array_size: int = 1001
+    depends_job_id: str | None = None
+    job_id_position: int = -1
+    logging_dir: str | None = None
+    skip_completed: bool = True
+    slurm_logs_folder: str | None = None
+    max_array_launch_parallel: bool = False
+    stagger_max_array_jobs: int = 0
+    run_on_dependency_fail: bool = False
+    randomize_start_duration: int = 0
+    requeue_signals: tuple[str] | None = ("SIGUSR1",)
+    mail_type: str = "ALL"
+    mail_user: str | None = None
+    requeue: bool = True
+    srun_args: dict[str, str | int | float | bool] | None = None
+    tasks_per_job: int = 1

     @model_validator(mode="before")
     def _normalize_sbatch(cls, values):
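PATCH 06 gives most SlurmExecutionSettings fields defaults so that Slurm configs only need to state what differs, and funnels everything else through sbatch_args via the _normalize_sbatch pre-validator. The validator body sits outside this hunk; one plausible shape for such a hook, assuming the snake_case-to-dash-case and string-coercion behavior the new comment describes (class and field names here are a reduced sketch, not the project's model):

    from pydantic import BaseModel, model_validator

    class SketchSettings(BaseModel):
        sbatch_args: dict[str, str] | None = None

        @model_validator(mode="before")
        @classmethod
        def _normalize_sbatch(cls, values: dict) -> dict:
            raw = values.get("sbatch_args") or {}
            # sbatch expects dash-case flags; coerce primitive values to strings.
            values["sbatch_args"] = {k.replace("_", "-"): str(v) for k, v in raw.items()}
            return values

    s = SketchSettings(sbatch_args={"gpus_per_task": 1, "mem_per_cpu": "2G"})
    assert s.sbatch_args == {"gpus-per-task": "1", "mem-per-cpu": "2G"}
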
From b7f79001fe3ed6a3a78577ac7931c0623250a369 Mon Sep 17 00:00:00 2001
From: alex-jude
Date: Tue, 2 Dec 2025 12:32:27 +0100
Subject: [PATCH 07/11] feat: update dataset configuration and metadata
 handling in annotation pipeline

---
 configs/annotation/lorem_ipsum_embedding.yaml | 16 ++++++++--------
 .../annotation/datatrove_jql_annotator.py     |  6 +++---
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/configs/annotation/lorem_ipsum_embedding.yaml b/configs/annotation/lorem_ipsum_embedding.yaml
index 0147b584..d0c6af1a 100644
--- a/configs/annotation/lorem_ipsum_embedding.yaml
+++ b/configs/annotation/lorem_ipsum_embedding.yaml
@@ -1,14 +1,14 @@
-dataset_name: validation
+dataset_name: train

 params:
   # File selection
   glob_pattern: "**/*.jsonl"
-  input_dir: /raid/s3/opengptx/jude/repos/ml_filter/data/throughput_analysis/input #/raid/s3/opengptx/abbas/processed_data_natural/${dataset_name}_set
+  input_dir: /raid/s3/opengptx/jude/repos/ml_filter/soofi_classifier/sampled_output/Llama-3.3-70B-Instruct_aggregated_score/training_set #/raid/s3/opengptx/abbas/processed_data_natural/${dataset_name}_set

   # Output
-  output_dir: /raid/s3/opengptx/jude/repos/ml_filter/data/throughput_analysis/output
+  output_dir: /raid/s3/opengptx/jude/repos/ml_filter/soofi_classifier/embeddings/llama
   text_field: text
-  keys_to_index: ["id", "aggregation_type"]
+  keys_to_index: ["id"]
   embedding_dir: ${dataset_name}_embeddings
   compression: null
@@ -18,18 +18,18 @@ params:
   model_dtype: bfloat16

   # Model and embedding parameters
-  embedding_model: jhu-clsp/mmBERT-base
-  batch_size: 128
+  embedding_model: Snowflake/snowflake-arctic-embed-m-v2.0 #jhu-clsp/mmBERT-base
+  batch_size: 256
   writer_batch_size: 1000
   hdf5_dataset_name: fineweb2
-  save_labels: false
+  save_labels: true
   max_length: 8192
   padding: true
   truncation: true

   # Metadata key customization
   embedding_key: embeddings
-  label_key: labels
+  label_key: Llama-3.3-70B-Instruct_aggregated_score
   document_id_key: document_ids

 running_on_slurm: false
diff --git a/src/ml_filter/annotation/datatrove_jql_annotator.py b/src/ml_filter/annotation/datatrove_jql_annotator.py
index a3b39c8c..0e6028da 100644
--- a/src/ml_filter/annotation/datatrove_jql_annotator.py
+++ b/src/ml_filter/annotation/datatrove_jql_annotator.py
@@ -41,7 +41,7 @@ def _get_file_path(doc: Document) -> str:
     Returns:
         str: The base filename without extension. Defaults to 'default' if no file_path is present.
     """
-    return Path(doc.metadata.get("file_path", "default.jsonl")).stem
+    return Path(doc.metadata.get("source_filename", "default.jsonl")).stem


 class JQLJsonlReader(BaseDiskReader):
@@ -151,7 +151,7 @@ def read_file(self, filepath: str):
                     document.metadata[self.document_id_key] = self._combine_metadata_keys(document.metadata)
-                    document.metadata["source_filename"] = _get_file_path(document)
+                    document.metadata["source_filename"] = filepath
                     if self.save_labels:
                         # copy score into label for downstream consumers if enabled
                         if "score" in document.metadata:
@@ -240,7 +240,7 @@ def run(self, doc_pipeline: DocumentsPipeline, rank: int = 0, world_size: int = 1, **kwargs) -> DocumentsPipeline:
             for idx, (doc, embedding) in enumerate(zip(doc_batch, embeddings)):
-                # doc.metadata["source_filename"] = _get_file_path(doc)
+                doc.metadata["source_filename"] = _get_file_path(doc)
                 doc.metadata[self.embedding_key] = embedding
                 if writer:
                     writer.write(doc, rank)
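After PATCH 07, the JSONL reader stores the full input path in source_filename and the embedder re-derives the shard name from it via _get_file_path, whose Path(...).stem drops both the directory and the extension, so a writer template like ${source_filename}.jsonl expands to a flat file name rather than a nested path. A two-line illustration (path invented):

    from pathlib import Path

    filepath = "training_set/shard_0003.jsonl"  # stored by the reader as source_filename
    print(Path(filepath).stem)                  # "shard_0003"
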
From e496ef872805c339d8c1471fabeea2279498c1c3 Mon Sep 17 00:00:00 2001
From: alex-jude
Date: Tue, 2 Dec 2025 20:29:12 +0100
Subject: [PATCH 08/11] feat: update metadata key customization in
 lorem_ipsum_embedding.yaml

---
 configs/annotation/lorem_ipsum_embedding.yaml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/configs/annotation/lorem_ipsum_embedding.yaml b/configs/annotation/lorem_ipsum_embedding.yaml
index d0c6af1a..ab4a9801 100644
--- a/configs/annotation/lorem_ipsum_embedding.yaml
+++ b/configs/annotation/lorem_ipsum_embedding.yaml
@@ -28,9 +28,9 @@ params:
   truncation: true

   # Metadata key customization
-  embedding_key: embeddings
-  label_key: Llama-3.3-70B-Instruct_aggregated_score
-  document_id_key: document_ids
+  embedding_key: embedding
+  label_key: score
+  document_id_key: document_id

 running_on_slurm: false
From 6b902010e692f2750352dd32e4f9453d1a5d197e Mon Sep 17 00:00:00 2001
From: alex-jude
Date: Mon, 8 Dec 2025 11:58:42 +0100
Subject: [PATCH 09/11] feat: add label_field parameter for score handling in
 embedding pipeline and JQL reader

---
 configs/annotation/lorem_ipsum_embedding.yaml       |  3 ++-
 src/ml_filter/annotation/datatrove_jql_annotator.py | 11 +++++++----
 src/ml_filter/annotation/embedding_pipeline.py      |  3 +++
 3 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/configs/annotation/lorem_ipsum_embedding.yaml b/configs/annotation/lorem_ipsum_embedding.yaml
index ab4a9801..59ffb052 100644
--- a/configs/annotation/lorem_ipsum_embedding.yaml
+++ b/configs/annotation/lorem_ipsum_embedding.yaml
@@ -6,8 +6,9 @@ params:
   input_dir: /raid/s3/opengptx/jude/repos/ml_filter/soofi_classifier/sampled_output/Llama-3.3-70B-Instruct_aggregated_score/training_set #/raid/s3/opengptx/abbas/processed_data_natural/${dataset_name}_set

   # Output
-  output_dir: /raid/s3/opengptx/jude/repos/ml_filter/soofi_classifier/embeddings/llama
+  output_dir: /raid/s3/opengptx/jude/repos/ml_filter/data/output_testing
   text_field: text
+  label_field: score
   keys_to_index: ["id"]
   embedding_dir: ${dataset_name}_embeddings
   compression: null
diff --git a/src/ml_filter/annotation/datatrove_jql_annotator.py b/src/ml_filter/annotation/datatrove_jql_annotator.py
index 0e6028da..dfe351dd 100644
--- a/src/ml_filter/annotation/datatrove_jql_annotator.py
+++ b/src/ml_filter/annotation/datatrove_jql_annotator.py
@@ -93,6 +93,7 @@ def __init__(
         save_labels: bool = True,
         document_id_key: str = "document_id",
         label_key: str = "label",
+        label_field: str = "score",
     ):
         super().__init__(
             data_folder,
@@ -113,6 +114,7 @@ def __init__(
         self.save_labels = save_labels
         self.document_id_key = document_id_key
         self.label_key = label_key
+        self.label_field = label_field
         self.keys_to_index = keys_to_index
         self.n_keys = len(keys_to_index)
@@ -147,17 +149,18 @@ def read_file(self, filepath: str):
             for li, line in enumerate(f):
                 with self.track_time():
                     try:
-                        document = self.get_document_from_dict(orjson.loads(line), filepath, li)
+                        raw_record = orjson.loads(line)
+                        document = self.get_document_from_dict(raw_record, filepath, li)
                         if not document:
                             continue
                         document.metadata[self.document_id_key] = self._combine_metadata_keys(document.metadata)
                         document.metadata["source_filename"] = filepath
                         if self.save_labels:
                             # copy score into label for downstream consumers if enabled
-                            if "score" in document.metadata:
-                                document.metadata[self.label_key] = document.metadata["score"]
+                            if self.label_field in document.metadata:
+                                document.metadata[self.label_key] = document.metadata[self.label_field]
                             else:
-                                raise ValueError("No 'score' field found in document metadata to copy to 'label'.")
+                                raise ValueError(f"No '{self.label_field}' field found in document metadata to copy to '{self.label_key}'.")
                     except (EOFError, JSONDecodeError) as e:
                         logger.warning(f"Error when reading `{filepath}`: {e}")
                         continue
diff --git a/src/ml_filter/annotation/embedding_pipeline.py b/src/ml_filter/annotation/embedding_pipeline.py
index 747ee478..4ba7c452 100644
--- a/src/ml_filter/annotation/embedding_pipeline.py
+++ b/src/ml_filter/annotation/embedding_pipeline.py
@@ -38,6 +38,7 @@ class EmbeddingPipelineParameters(BaseModel):
     padding: bool | str = Field(..., description="Padding strategy.")
     truncation: bool | str = Field(..., description="Truncation strategy.")
     save_labels: bool = Field(..., description="Copy score->label if present when writing.")
+    label_field: str = Field("score", description="Field name in the JSONL input that stores the label value.")
     embedding_key: str = Field("embedding", description="Metadata key name for embedding vector.")
     label_key: str = Field("label", description="Metadata key name for label value when saved.")
     document_id_key: str = Field("document_id", description="Metadata key used for combined/document id.")
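label_field names the column in the incoming JSONL record, while label_key (from the earlier patches) names the metadata slot it is copied into; the two no longer have to coincide. A sketch of the copy the reader now performs, mirroring its error path (record contents invented):

    record_metadata = {"id": "42", "Llama-3.3-70B-Instruct_aggregated_score": 3.5}

    label_field = "Llama-3.3-70B-Instruct_aggregated_score"  # where the label lives in the input
    label_key = "label"                                      # where downstream steps expect it

    if label_field in record_metadata:
        record_metadata[label_key] = record_metadata[label_field]
    else:
        raise ValueError(f"No '{label_field}' field found in document metadata to copy to '{label_key}'.")

    assert record_metadata["label"] == 3.5
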
@@ -206,6 +207,7 @@ def _p(name: str, default=None):
         padding=_p("padding"),
         truncation=_p("truncation"),
         save_labels=_p("save_labels"),
+        label_field=_p("label_field", "score"),
         embedding_key=_p("embedding_key"),
         label_key=_p("label_key"),
         document_id_key=_p("document_id_key"),
@@ -242,6 +244,7 @@ def build_pipeline(self) -> list[PipelineStep]:
             glob_pattern=p.glob_pattern,
             text_key=p.text_field,
             save_labels=p.save_labels,
+            label_field=p.label_field,
             document_id_key=p.document_id_key,
             label_key=p.label_key,
         ),
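
The new "string" entry lets config-level label dtypes resolve to NumPy's unicode scalar type. Arrays built with np.str_ get a fixed-width '<U...' dtype, which is why the in-line comment offers np.object_ as the fully generic alternative; HDF5 output would additionally need h5py.string_dtype() for variable-length strings, as the writer already uses for document ids. A small lookup sketch (mapping excerpted from the hunk above):

    import numpy as np

    LABEL_NUMPY_DTYPE_MAPPING = {
        "float32": np.float32,
        "float16": np.float16,
        "bfloat16": np.float32,  # degrade: NumPy has no native bfloat16
        "string": np.str_,
    }

    labels = np.array(["good", "bad"], dtype=LABEL_NUMPY_DTYPE_MAPPING["string"])
    print(labels.dtype)  # <U4 -- fixed-width unicode, sized to the longest element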