13 changes: 11 additions & 2 deletions configs/annotation/lorem_ipsum_annotation_pipeline.yaml
@@ -1,4 +1,5 @@
 params:
+  glob_pattern: "**/*.h5"
   embeddings_directory: /raid/s3/opengptx/jude/repos/ml_filter/data/throughput_analysis/output/validation_embeddings
   output_dir: /raid/s3/opengptx/jude/repos/ml_filter/data/throughput_analysis/output/annotations
 
@@ -7,12 +8,20 @@ params:
     Llama_Snowflake: /raid/s3/opengptx/jude/repos/ml_filter/embedding_ablations/training/final #/raid/s3/opengptx/jude/repos/ml_filter/hessanAI/checkpoints/checkpoints/edu-llama-snowflake-balanced.ckpt
     Mistral_Snowflake: /raid/s3/opengptx/jude/repos/ml_filter/embedding_ablations/training/final #/raid/s3/opengptx/jude/repos/ml_filter/hessanAI/checkpoints/checkpoints/edu-mistral-snowflake-balanced.ckpt
   batch_size: 1000
-  hdf5_dataset_name: train
-  output_keys: ["document_id", "score_Gemma_Snowflake", "score_Llama_Snowflake", "score_Mistral_Snowflake"]
+  hdf5_dataset_name: fineweb2
 
+  # Metadata key customization
+  embedding_key: embeddings
+  label_key: labels
+  document_id_key: document_ids
+  score_prefix: "score_"
+
+  output_keys: [document_ids]
+  model_dtype: bfloat16
+  embedding_dtype: bfloat16
+  label_dtype: bfloat16
+  compression: gzip
 
 running_on_slurm: false
 
 local_settings:
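With score_prefix in play, output_keys no longer has to enumerate every score column (compare the removed output_keys line above): the pipeline appends one "score_<head>" key per entry in regression_head_checkpoints. A minimal sketch of the resulting record shape — field names follow this config, while the id and score values are invented placeholders:

# Sketch only: key names come from this config; values are made up.
heads = ["Gemma_Snowflake", "Llama_Snowflake", "Mistral_Snowflake"]
record = {"document_ids": "doc-0001"}
record.update({f"score_{name}": 3.25 for name in heads})
# -> {'document_ids': 'doc-0001', 'score_Gemma_Snowflake': 3.25, ...}
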
28 changes: 21 additions & 7 deletions configs/annotation/lorem_ipsum_annotation_pipeline_slurm.yaml
@@ -1,11 +1,26 @@
 params:
-  embeddings_directory: /raid/s3/opengptx/jude/repos/ml_filter/data/input_dir_hierarchy
-  output_dir: /raid/s3/opengptx/jude/repos/ml_filter/data/embedding_output_dir/annotations_new/
+  glob_pattern: "**/*.h5"
+  embeddings_directory: /data/horse/ws/alju972f-tokenization_at_scale/data_ml_filter/embedding/training_embeddings
+  output_dir: /data/horse/ws/alju972f-tokenization_at_scale/data_ml_filter/annotations
 
   regression_head_checkpoints:
-    Gemma_Snowflake: /raid/s3/opengptx/jude/repos/ml_filter/hessanAI/checkpoints/checkpoints/edu-gemma-snowflake-balanced.ckpt
-    Llama_Snowflake: /raid/s3/opengptx/jude/repos/ml_filter/hessanAI/checkpoints/checkpoints/edu-llama-snowflake-balanced.ckpt
+    Gemma_Snowflake: /data/horse/ws/alju972f-tokenization_at_scale/data_ml_filter/final_regression_head
+    Llama_Snowflake: /data/horse/ws/alju972f-tokenization_at_scale/data_ml_filter/final_regression_head
+    Mistral_Snowflake: /data/horse/ws/alju972f-tokenization_at_scale/data_ml_filter/final_regression_head
   batch_size: 1000
+  hdf5_dataset_name: fineweb2
+
+  # Metadata key customization
+  embedding_key: embeddings
+  label_key: labels
+  document_id_key: document_ids
+  score_prefix: "score_"
+
+  output_keys: [document_ids]
+  model_dtype: bfloat16
+  embedding_dtype: bfloat16
+  label_dtype: bfloat16
+  compression: gzip
 
 running_on_slurm: true
 
@@ -21,11 +36,10 @@ slurm_settings:
   time: "04:00:00"
   cpus_per_task: 8
   mem_per_cpu_gb: 2
-  gpus_per_task: 1
   job_name: "MMbert_embedder"
   output: /data/cat/ws/alju972f-regression_heads/dataset/mmbet_embeddings/mmber_logs/%j.out
   error: /data/cat/ws/alju972f-regression_heads/dataset/mmbet_embeddings/mmber_logs/%j.err
   qos: "normal"
   venv_path: /data/cat/ws/alju972f-regression_heads/envs/env_regression_heads/bin/activate
-  tasks: 10
-  workers: 1001
+  tasks: 1
+  workers: 1
29 changes: 17 additions & 12 deletions configs/annotation/lorem_ipsum_embedding.yaml
@@ -1,38 +1,43 @@
-dataset_name: validation
+dataset_name: train
 
 params:
   # File selection
   glob_pattern: "**/*.jsonl"
-  input_dir: /raid/s3/opengptx/jude/repos/ml_filter/data/throughput_analysis/input #/raid/s3/opengptx/abbas/processed_data_natural/${dataset_name}_set
+  input_dir: /raid/s3/opengptx/jude/repos/ml_filter/soofi_classifier/sampled_output/Llama-3.3-70B-Instruct_aggregated_score/training_set #/raid/s3/opengptx/abbas/processed_data_natural/${dataset_name}_set
 
   # Output
-  output_dir: /raid/s3/opengptx/jude/repos/ml_filter/data/throughput_analysis/output
+  output_dir: /raid/s3/opengptx/jude/repos/ml_filter/data/output_testing
   text_field: text
-  keys_to_index: ["id", "aggregation_type"]
+  label_field: score
+  keys_to_index: ["id"]
   embedding_dir: ${dataset_name}_embeddings
   compression: gzip
 
   # Precision
   embedding_dtype: float32
   label_dtype: int8
   model_dtype: bfloat16
 
   # Model and embedding parameters
-  embedding_model: jhu-clsp/mmBERT-base
-  batch_size: 128
+  embedding_model: Snowflake/snowflake-arctic-embed-m-v2.0 #jhu-clsp/mmBERT-base
+  batch_size: 256
   writer_batch_size: 1000
-  hdf5_dataset_name: train
-  save_labels: false
+  hdf5_dataset_name: fineweb2
+  save_labels: true
   max_length: 8192
   padding: true
   truncation: true
 
+  # Metadata key customization
+  embedding_key: embedding
+  label_key: score
+  document_id_key: document_id
+
 running_on_slurm: false
 
 local_settings:
-  tasks: 2
-  workers: 2
-  local_tasks: 2
+  tasks: 1
+  workers: 1
+  local_tasks: 1
   local_rank_offset: 0
 
 slurm_settings: null
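For reference, a minimal sketch of the HDF5 layout these keys suggest. The group-per-dataset-name structure here is an assumption for illustration, not taken from this PR; only the dataset and key names (hdf5_dataset_name, embedding_key, label_key, document_id_key, compression) come from the config above:

# Assumed layout: one group per hdf5_dataset_name, one array per configured key.
import h5py
import numpy as np

with h5py.File("example_embeddings.h5", "w") as f:
    grp = f.create_group("fineweb2")  # hdf5_dataset_name
    grp.create_dataset("embedding", data=np.zeros((4, 768), dtype=np.float32),
                       compression="gzip")  # embedding_key / compression
    grp.create_dataset("score", data=np.zeros(4, dtype=np.int8))  # label_key (save_labels: true)
    grp.create_dataset("document_id",
                       data=np.array([b"doc-0", b"doc-1", b"doc-2", b"doc-3"]))  # document_id_key
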
21 changes: 12 additions & 9 deletions configs/annotation/lorem_ipsum_embedding_pipeline_slurm.yaml
@@ -3,22 +3,27 @@ dataset_name: training
 params:
   # File selection
   glob_pattern: "**/*.jsonl"
-  input_dir: /data/cat/ws/alju972f-regression_heads/repos/data/embedding_creation/input #/data/cat/ws/alju972f-regression_heads/dataset/Regression_head/abbas/processed_data_natural/${dataset_name}_set
+  input_dir: /data/horse/ws/alju972f-tokenization_at_scale/data_ml_filter/input
 
   # Output
-  output_dir: /data/cat/ws/alju972f-regression_heads/repos/data/embedding_creation/output
+  output_dir: /data/horse/ws/alju972f-tokenization_at_scale/data_ml_filter/embedding
   keys_to_index: ["id", "aggregation_type"]
   text_field: text
   embedding_dir: ${dataset_name}_embeddings
-  compression: gzip
+  compression: null
 
   # Precision
   embedding_dtype: float32
   label_dtype: int8
   model_dtype: bfloat16
 
+  # Metadata key customization
+  embedding_key: embeddings
+  label_key: labels
+  document_id_key: document_ids
+
   # Model and embedding parameters
-  embedding_model: jhu-clsp/mmBERT-base
+  embedding_model: Snowflake/snowflake-arctic-embed-m-v2.0
   batch_size: 512
   writer_batch_size: 1000
   hdf5_dataset_name: train
@@ -36,13 +41,11 @@ slurm_settings:
   account: "p_gptx"
   nodes: 1
   ntasks: 1
-  gres: gpu:4
-  exclusive: user
+  gres: gpu:1
   partition: "capella"
   time: "04:00:00"
-  cpus_per_task: 32
-  mem_per_cpu_gb: 8
-  gpus_per_task: 4
+  cpus_per_task: 8
+  mem_per_cpu_gb: 2
   job_name: "MMbert_embedder"
   output: ${params.output_dir}/logs/%j.out
   error: ${params.output_dir}/logs/%j.err
62 changes: 45 additions & 17 deletions src/ml_filter/annotation/annotation_pipeline.py
@@ -38,21 +38,28 @@ class SlurmExecutionSettings(BaseModel):
     workers: int
     job_name: str
     qos: str
-    env_command: Optional[str]
-    condaenv: Optional[str]
-    venv_path: Optional[str]
-    sbatch_args: Optional[dict[str, str | int | float | bool]]
-    max_array_size: int
-    depends_job_id: Optional[str]
-    job_id_position: int
-    logging_dir: Optional[str]
-    skip_completed: bool
-    slurm_logs_folder: Optional[str]
-    mail_type: str
-    mail_user: Optional[str]
-    requeue: bool
-    srun_args: Optional[dict[str, str | int | float | bool]]
-    tasks_per_job: int
+    env_command: str | None = None
+    condaenv: str | None = None
+    venv_path: str | None = None
+    # Allow users to supply any sbatch arg (e.g. nodes, ntasks, gres, account, output, error, gpus-per-task, etc.)
+    # using either snake_case or dash-case. Primitive values get coerced to strings.
+    sbatch_args: dict[str, str | int | float | bool] | None = None
+    max_array_size: int = 1001
+    depends_job_id: str | None = None
+    job_id_position: int = -1
+    logging_dir: str | None = None
+    skip_completed: bool = True
+    slurm_logs_folder: str | None = None
+    max_array_launch_parallel: bool = False
+    stagger_max_array_jobs: int = 0
+    run_on_dependency_fail: bool = False
+    randomize_start_duration: int = 0
+    requeue_signals: tuple[str] | None = ("SIGUSR1",)
+    mail_type: str = "ALL"
+    mail_user: str | None = None
+    requeue: bool = True
+    srun_args: dict[str, str | int | float | bool] | None = None
+    tasks_per_job: int = 1
 
     @model_validator(mode="before")
     def _normalize_sbatch(cls, values):
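The _normalize_sbatch body is collapsed in this view. A minimal sketch of what the comment above it describes (snake_case or dash-case keys accepted, primitive values coerced to strings) — an assumption for illustration, not the PR's actual implementation:

def _normalize_sbatch_sketch(values: dict) -> dict:
    # Hypothetical stand-in for the collapsed validator body.
    raw = values.get("sbatch_args") or {}
    # "gpus_per_task" and "gpus-per-task" both become "gpus-per-task": "1".
    values["sbatch_args"] = {k.replace("_", "-"): str(v) for k, v in raw.items()}
    return values
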
@@ -78,6 +85,7 @@ def _normalize_sbatch(cls, values):
 # ---------------------------------------------------------------------------
 
 class AnnotationPipelineParameters(BaseModel):
+    glob_pattern: str = Field(..., description="Glob pattern to match embedding files in the embeddings directory.")
     embeddings_directory: str = Field(..., description="Path to directory containing HDF5 embedding files.")
     output_keys: list[str] = Field(..., description="List of metadata keys to include in the annotated output files.")
     output_dir: Path = Field(..., description="Output directory for annotated JSONL files.")
@@ -88,6 +96,10 @@ class AnnotationPipelineParameters(BaseModel):
     embedding_dtype: str = Field(..., description="Storage dtype for embeddings (float32, float16, bfloat16->float32).")
     label_dtype: str | None = Field(..., description="Storage dtype for labels (e.g., int8, float32). Optional.")
     model_dtype: str = Field(..., description="Model compute dtype (float32, float16, bfloat16).")
+    embedding_key: str = Field("embedding", description="Metadata key for embedding vectors in HDF5 reader.")
+    document_id_key: str = Field("document_id", description="Metadata key for document id in reader output.")
+    label_key: str | None = Field("label", description="Metadata key for labels if present.")
+    score_prefix: str = Field("score_", description="Prefix applied to regression head score keys.")
 
     @property
     def annotated_output_dir(self) -> Path:
@@ -151,6 +163,7 @@ def _p(name: str, default=None):
         return params_cfg.get(name, default)
 
     params = AnnotationPipelineParameters(
+        glob_pattern=_p("glob_pattern"),
         embeddings_directory=_p("embeddings_directory"),
         output_dir=_p("output_dir"),
         output_keys=_p("output_keys"),
@@ -161,6 +174,10 @@ def _p(name: str, default=None):
         embedding_dtype=_p("embedding_dtype"),
         label_dtype=_p("label_dtype"),
         model_dtype=_p("model_dtype"),
+        embedding_key=_p("embedding_key"),
+        document_id_key=_p("document_id_key"),
+        label_key=_p("label_key"),
+        score_prefix=_p("score_prefix"),
     )
 
     local_settings_obj = None
@@ -189,8 +206,18 @@ def build_pipeline(self) -> list[PipelineStep]:
             'embedding_dtype': p.embedding_dtype,
             'label_dtype': p.label_dtype,
         }, pipeline="annotation_pipeline")
+        base_output_keys = p.output_keys
+        dynamic_score_keys = [f"{p.score_prefix}{name}" for name in p.regression_head_checkpoints.keys()]
+        output_keys = list(dict.fromkeys(base_output_keys + dynamic_score_keys))
+
         pipeline = [
-            JQLEmbeddingReader(data_folder=p.embeddings_directory, dataset_name=p.dataset_name),
+            JQLEmbeddingReader(
+                data_folder=p.embeddings_directory,
+                dataset_name=p.dataset_name,
+                embedding_key=p.embedding_key,
+                document_id_key=p.document_id_key,
+                glob_pattern=p.glob_pattern,
+            ),
             JQLHead(
                 regression_head_checkpoints=p.regression_head_checkpoints,
                 batch_size=p.batch_size,
@@ -199,10 +226,11 @@ def build_pipeline(self) -> list[PipelineStep]:
                 "embedding_dtype": _resolved["embedding_dtype"],
                 "label_dtype": _resolved["label_dtype"],
                 },
+                score_prefix=p.score_prefix,
                 stats_writer=JsonlWriter(
                     output_folder=str(p.annotated_output_dir),
                     output_filename="${source_filename}.jsonl",
-                    adapter=partial(stats_adapter, output_keys=p.output_keys),
+                    adapter=partial(stats_adapter, output_keys=output_keys),
                     expand_metadata=True,
                 ),
             ),
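One note on the output_keys merge added above: dict.fromkeys deduplicates while preserving order, so a score key the user already listed in output_keys is emitted exactly once. A small worked example of that line, with head names taken from the configs in this PR:

base = ["document_ids", "score_Gemma_Snowflake"]
dynamic = ["score_Gemma_Snowflake", "score_Llama_Snowflake", "score_Mistral_Snowflake"]
merged = list(dict.fromkeys(base + dynamic))
# -> ['document_ids', 'score_Gemma_Snowflake', 'score_Llama_Snowflake', 'score_Mistral_Snowflake']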