diff --git a/runtime/ops/mapper/__init__.py b/runtime/ops/mapper/__init__.py index 4b970199..67069447 100644 --- a/runtime/ops/mapper/__init__.py +++ b/runtime/ops/mapper/__init__.py @@ -47,6 +47,21 @@ def _import_operators(): from . import remove_duplicate_sentences from . import knowledge_relation_slice from . import pii_ner_detection + from . import audio_format_convert + from . import audio_anomaly_filter + from . import audio_dc_offset_removal + from . import audio_pre_emphasis + from . import audio_simple_agc + from . import audio_noise_gate + from . import audio_soft_peak_limiter + from . import audio_trim_silence_edges + from . import audio_rms_loudness_normalize + from . import audio_hum_notch + from . import audio_telephony_bandpass + from . import audio_gtcrn_denoise + from . import audio_quantize_encode + from . import audio_fast_lang_id + from . import audio_asr_pipeline _import_operators() diff --git a/runtime/ops/mapper/audio_anomaly_filter/README.md b/runtime/ops/mapper/audio_anomaly_filter/README.md new file mode 100644 index 00000000..87242f7c --- /dev/null +++ b/runtime/ops/mapper/audio_anomaly_filter/README.md @@ -0,0 +1,40 @@ +# AudioAnomalyFilter 异常语音检测与过滤算子 + +## 概述 + +AudioAnomalyFilter 用于对音频做快速质量检测,计算时长与静音帧比例,并给出 `quality_flag`。当判定为异常时,可选择直接“过滤”(清空 `text/data`)或“保留但打标”(仅写入报告)。 + +## 功能特性 + +- **时长检测**:支持最小时长/最大时长阈值 +- **静音比例检测**:基于短时 RMS 统计静音帧占比 +- **过滤策略可控**:支持保留异常文件(仅打标)或直接过滤 +- **结果结构化输出**:报告写入 `ext_params.audio_quality` + +## 参数说明 + +| 参数 | 类型 | 默认值 | 说明 | +|---|---|---:|---| +| minDur | inputNumber | 1.0 | 最小时长(秒),小于该值视为异常 | +| maxDur | inputNumber | 20000.0 | 最大时长(秒),大于该值视为异常 | +| silenceRatioTh | slider | 0.8 | 静音帧比例阈值(0~1),>= 阈值视为异常 | +| silenceRmsRatioTh | slider | 0.05 | 静音判定阈值 = global_rms * 该比例 | +| keepInvalid | switch | false | true=保留异常文件仅打标;false=异常则清空 text/data 便于过滤 | + +## 输入输出 + +- **输入**:`sample["filePath"]`(音频文件路径) +- **输出**: + - `sample["ext_params"]["audio_quality"]`: + - `quality_flag`: `ok/invalid` + - 
`duration/silence_ratio/global_rms/reason` + - 若 `keepInvalid=false` 且 `quality_flag=invalid`:清空 `sample["text"]` 与 `sample["data"]` + +## 依赖说明 + +- **Python 依赖**:优先 `torchaudio`,兜底 `soundfile` + +## 版本历史 + +- **v1.0.0**:首次发布,支持时长/静音比例检测与过滤策略配置 + diff --git a/runtime/ops/mapper/audio_anomaly_filter/__init__.py b/runtime/ops/mapper/audio_anomaly_filter/__init__.py new file mode 100644 index 00000000..76476ccf --- /dev/null +++ b/runtime/ops/mapper/audio_anomaly_filter/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- + +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="AudioAnomalyFilter", + module_path="ops.mapper.audio_anomaly_filter.process", +) + diff --git a/runtime/ops/mapper/audio_anomaly_filter/metadata.yml b/runtime/ops/mapper/audio_anomaly_filter/metadata.yml new file mode 100644 index 00000000..a6c6cb42 --- /dev/null +++ b/runtime/ops/mapper/audio_anomaly_filter/metadata.yml @@ -0,0 +1,60 @@ +name: '异常语音检测与过滤' +name_en: 'Audio Anomaly Detect & Filter' +description: '对音频做快速异常检测:时长范围与静音帧比例。若判定为异常,将清空 sample 的 text/data 以便后续过滤,并在 ext_params 中写入报告字段。' +description_en: 'Fast audio anomaly detection (duration and silence ratio). If invalid, clears text/data so downstream can filter, and writes report to ext_params.' 
+language: 'python' +vendor: 'huawei' +raw_id: 'AudioAnomalyFilter' +version: '1.0.0' +types: + - 'cleanse' +modal: 'audio' +inputs: 'audio' +outputs: 'audio' +settings: + minDur: + name: '最小时长(秒)' + type: 'inputNumber' + description: '小于该值视为异常。' + defaultVal: 1.0 + min: 0 + max: 36000 + step: 0.1 + maxDur: + name: '最大时长(秒)' + type: 'inputNumber' + description: '大于该值视为异常。' + defaultVal: 20000.0 + min: 0 + max: 360000 + step: 1 + silenceRatioTh: + name: '静音帧比例阈值' + type: 'slider' + description: '静音帧比例 >= 阈值 时视为异常。' + defaultVal: 0.8 + min: 0 + max: 1 + step: 0.01 + silenceRmsRatioTh: + name: '静音判定比例' + type: 'slider' + description: '静音判定阈值 = global_rms * 该比例。' + defaultVal: 0.05 + min: 0 + max: 1 + step: 0.01 + keepInvalid: + name: '保留异常文件' + description: '开启后不清空 text/data,仅打标 quality_flag=invalid。' + type: 'switch' + defaultVal: 'false' + required: false + checkedLabel: '保留' + unCheckedLabel: '过滤' +runtime: + memory: 104857600 + cpu: 0.2 + gpu: 0 + npu: 0 + diff --git a/runtime/ops/mapper/audio_anomaly_filter/process.py b/runtime/ops/mapper/audio_anomaly_filter/process.py new file mode 100644 index 00000000..f6042694 --- /dev/null +++ b/runtime/ops/mapper/audio_anomaly_filter/process.py @@ -0,0 +1,136 @@ +# -- encoding: utf-8 -- + +import math +import time +from pathlib import Path +from typing import Dict, Any, List, Tuple + +from loguru import logger + +from datamate.core.base_op import Mapper + + +def _as_bool(v: object) -> bool: + if isinstance(v, bool): + return v + s = str(v).strip().lower() + return s in {"1", "true", "yes", "y", "on"} + + +def _load_wave_mono(path: Path) -> Tuple[List[float], int]: + """ + 尽量少依赖:优先 torchaudio,其次 soundfile。 + 返回 mono waveform(list[float]) 与采样率。 + """ + try: + import torchaudio # type: ignore + + wav, sr = torchaudio.load(str(path)) + if wav.ndim > 1: + wav = wav.mean(dim=0, keepdim=True) + mono = wav.squeeze(0).float().tolist() + return mono, int(sr) + except Exception: + try: + import soundfile as sf # type: ignore + + 
data, sr = sf.read(str(path), always_2d=False) + if getattr(data, "ndim", 1) > 1: + data = data.mean(axis=1) + return data.tolist(), int(sr) + except Exception as e: + raise RuntimeError(f"读取音频失败: {path}, error={e}") from e + + +def _frame_rms(x: List[float], sr: int, frame_ms: float, hop_ms: float) -> Tuple[List[float], float]: + if not x or sr <= 0: + return [], 0.0 + frame_len = max(1, int(sr * frame_ms / 1000.0)) + hop = max(1, int(sr * hop_ms / 1000.0)) + n = len(x) + total_sq = 0.0 + for v in x: + total_sq += float(v) * float(v) + global_rms = math.sqrt(total_sq / max(1, n)) + rms_list: List[float] = [] + for start in range(0, n, hop): + end = min(start + frame_len, n) + if end <= start: + continue + s = 0.0 + cnt = 0 + for v in x[start:end]: + s += float(v) * float(v) + cnt += 1 + rms_list.append(math.sqrt(s / cnt) if cnt else 0.0) + return rms_list, global_rms + + +class AudioAnomalyFilter(Mapper): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.min_dur = float(kwargs.get("minDur", 1.0)) + self.max_dur = float(kwargs.get("maxDur", 20000.0)) + self.silence_ratio_th = float(kwargs.get("silenceRatioTh", 0.8)) + self.silence_rms_ratio_th = float(kwargs.get("silenceRmsRatioTh", 0.05)) + self.keep_invalid = _as_bool(kwargs.get("keepInvalid", False)) + + def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: + start = time.time() + wav_path = Path(sample.get(self.filepath_key, "")).resolve() + if not wav_path.exists(): + raise FileNotFoundError(f"输入音频不存在: {wav_path}") + + wav, sr = _load_wave_mono(wav_path) + n = len(wav) + duration = float(n) / float(sr) if sr > 0 else 0.0 + rms_frames, global_rms = _frame_rms(wav, sr, frame_ms=25.0, hop_ms=10.0) + if not rms_frames or global_rms <= 0.0: + silence_ratio = 1.0 + else: + th = max(1e-8, global_rms * float(self.silence_rms_ratio_th)) + silent = sum(1 for r in rms_frames if r < th) + silence_ratio = float(silent) / float(len(rms_frames)) + + reasons: List[str] = [] + 
quality_flag = "ok" + if duration <= 0.0: + quality_flag = "invalid" + reasons.append("duration_le_zero") + elif duration < self.min_dur: + quality_flag = "invalid" + reasons.append("too_short") + elif duration > self.max_dur: + quality_flag = "invalid" + reasons.append("too_long") + if silence_ratio >= self.silence_ratio_th: + quality_flag = "invalid" + reasons.append("too_much_silence") + + report = { + "quality_flag": quality_flag, + "duration": round(duration, 3), + "silence_ratio": round(silence_ratio, 4), + "global_rms": round(global_rms, 6), + "reason": ",".join(reasons) if reasons else "", + } + ext = sample.get(self.ext_params_key, {}) + if not isinstance(ext, dict): + ext = {"_raw": ext} + ext["audio_quality"] = report + sample[self.ext_params_key] = ext + + if quality_flag == "invalid" and not self.keep_invalid: + # 清空内容以便后续被框架过滤(Mapper 的“空内容过滤”逻辑) + sample[self.text_key] = "" + sample[self.data_key] = b"" + else: + if not sample.get(self.text_key): + sample[self.text_key] = "ok" + sample[self.data_key] = b"" + + logger.info( + f"fileName: {sample.get(self.filename_key)}, method: AudioAnomalyFilter costs {time.time() - start:6f} s" + ) + return sample + diff --git a/runtime/ops/mapper/audio_anomaly_filter/requirements.txt b/runtime/ops/mapper/audio_anomaly_filter/requirements.txt new file mode 100644 index 00000000..fd0cf60b --- /dev/null +++ b/runtime/ops/mapper/audio_anomaly_filter/requirements.txt @@ -0,0 +1,2 @@ +torchaudio +soundfile diff --git a/runtime/ops/mapper/audio_asr_pipeline/README.md b/runtime/ops/mapper/audio_asr_pipeline/README.md new file mode 100644 index 00000000..bdd1b5f7 --- /dev/null +++ b/runtime/ops/mapper/audio_asr_pipeline/README.md @@ -0,0 +1,52 @@ +# AudioAsrPipeline 音频预处理与中英ASR流水线算子 + +## 概述 + +AudioAsrPipeline 将 `audio_preprocessor` 的推荐流水线封装为一个 DataMate Mapper 算子:标准化、(可选)降噪、(可选)异常过滤、语言识别、切分、ASR 识别与合并。最终合并文本写入 `sample["text"]`,并在 `ext_params` 中记录中间产物路径,便于排查与验收。 + +## 功能特性 + +- **端到端流水线**:normalization →(可选)GTCRN →(可选)异常过滤 → 
LID → split → ASR → merge +- **可配置**:每个关键步骤参数化(降噪开关、过滤阈值、LID 截断秒数、切分长度、ASR 设备等) +- **结果可追溯**:中间产物路径记录在 `ext_params.audio_asr.artifacts` +- **面向验收**:输出合并转写文本到 `sample["text"]` + +## 参数说明 + +| 参数 | 类型 | 默认值 | 说明 | +|---|---|---:|---| +| doDenoise | switch | false | 是否启用 GTCRN 降噪 | +| denoiseModelPath | input | (空) | GTCRN ONNX 模型绝对路径(启用降噪时必填) | +| doAnomalyFilter | switch | true | 是否启用异常语音检测与过滤 | +| minDur | inputNumber | 1.0 | 最小时长(秒) | +| maxDur | inputNumber | 20000.0 | 最大时长(秒) | +| silenceRatioTh | slider | 0.8 | 静音帧比例阈值(0~1) | +| silenceRmsRatioTh | slider | 0.05 | 静音判定阈值比例 | +| lidModelSource | input | (空) | SpeechBrain LID 模型 source(本地目录或 HF repo) | +| lidDevice | select | cpu | LID 推理设备(cpu/cuda/npu) | +| lidMaxSeconds | inputNumber | 3.0 | LID 只取前 N 秒,0=全长 | +| maxSegmentSeconds | inputNumber | 120 | 切分最大秒数 | +| asrDevice | select | auto | ASR 设备参数(auto/cpu/npu) | + +## 输入输出 + +- **输入**:`sample["filePath"]`(音频文件路径) +- **输出**: + - `sample["text"]`:合并后的转写文本(来自 `merged_text.txt`) + - `sample["ext_params"]["audio_asr"]`: + - `lang`:LID 结果(zh/en) + - `artifacts`:中间产物路径(normalized/denoise/lid/split/asr/merged_text) + +## 依赖说明 + +- **Python 依赖**(按启用功能而定): + - normalization/切分:`pydub`、`soundfile`、`numpy` + - LID:`torch`、`torchaudio`、`speechbrain` + - 降噪:`onnxruntime`(以及 GTCRN 模型文件) +- **系统依赖**: + - `pydub` 通常需要 `ffmpeg` + +## 版本历史 + +- **v1.0.0**:首次发布,支持音频标准化/(可选)降噪/过滤/LID/切分/ASR/合并 + diff --git a/runtime/ops/mapper/audio_asr_pipeline/__init__.py b/runtime/ops/mapper/audio_asr_pipeline/__init__.py new file mode 100644 index 00000000..f5f89074 --- /dev/null +++ b/runtime/ops/mapper/audio_asr_pipeline/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- + +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="AudioAsrPipeline", + module_path="ops.mapper.audio_asr_pipeline.process", +) + diff --git a/runtime/ops/mapper/audio_asr_pipeline/metadata.yml b/runtime/ops/mapper/audio_asr_pipeline/metadata.yml new file mode 100644 index 
00000000..cc3d3d9a --- /dev/null +++ b/runtime/ops/mapper/audio_asr_pipeline/metadata.yml @@ -0,0 +1,115 @@ +name: '音频预处理与中英ASR(流水线)' +name_en: 'Audio Preprocess & ASR Pipeline' +description: '调用 audio_preprocessor 的 normalization→(可选)GTCRN→(可选)异常过滤→LID→切分→ASR→合并,输出 merged_text 写入 sample.text,并在 ext_params 中记录中间产物路径。' +description_en: 'Run audio_preprocessor pipeline: normalization→(optional)GTCRN→(optional)anomaly filter→LID→split→ASR→merge. Writes merged_text into sample.text and records artifacts in ext_params.' +language: 'python' +vendor: 'huawei' +raw_id: 'AudioAsrPipeline' +version: '1.0.0' +types: + - 'annotation' +modal: 'audio' +inputs: 'audio' +outputs: 'text' +settings: + doDenoise: + name: '启用降噪' + type: 'switch' + description: '是否启用 GTCRN 降噪。' + defaultVal: 'false' + required: false + checkedLabel: '开启' + unCheckedLabel: '关闭' + denoiseModelPath: + name: '降噪模型路径' + type: 'input' + description: 'GTCRN ONNX 模型绝对路径(启用降噪时必填)。' + defaultVal: '' + required: false + doAnomalyFilter: + name: '启用异常过滤' + type: 'switch' + description: '是否启用异常语音检测与过滤(时长/静音比例)。' + defaultVal: 'true' + required: false + checkedLabel: '开启' + unCheckedLabel: '关闭' + minDur: + name: '最小时长(秒)' + type: 'inputNumber' + defaultVal: 1.0 + min: 0 + max: 36000 + step: 0.1 + maxDur: + name: '最大时长(秒)' + type: 'inputNumber' + defaultVal: 20000.0 + min: 0 + max: 360000 + step: 1 + silenceRatioTh: + name: '静音帧比例阈值' + type: 'slider' + defaultVal: 0.8 + min: 0 + max: 1 + step: 0.01 + silenceRmsRatioTh: + name: '静音判定比例' + type: 'slider' + defaultVal: 0.05 + min: 0 + max: 1 + step: 0.01 + lidModelSource: + name: 'LID 模型源' + type: 'input' + description: 'SpeechBrain LID 模型 source(本地目录或 HF repo)。留空则用 audio_preprocessor 默认值。' + defaultVal: '' + required: false + lidDevice: + name: 'LID 设备' + type: 'select' + defaultVal: 'cpu' + required: true + options: + - label: 'cpu' + value: 'cpu' + - label: 'cuda' + value: 'cuda' + - label: 'npu' + value: 'npu' + lidMaxSeconds: + name: 'LID 截断秒数' + type: 
'inputNumber' + defaultVal: 3.0 + min: 0 + max: 60 + step: 0.5 + maxSegmentSeconds: + name: '切分最大秒数' + type: 'inputNumber' + defaultVal: 120 + min: 5 + max: 3600 + step: 1 + asrDevice: + name: 'ASR 设备' + type: 'select' + description: '传给 recognize_monitor 的 device 参数(auto/npu/cpu)。' + defaultVal: 'auto' + required: true + options: + - label: 'auto' + value: 'auto' + - label: 'cpu' + value: 'cpu' + - label: 'npu' + value: 'npu' +runtime: + memory: 4294967296 + cpu: 1.0 + gpu: 0 + npu: 0 + diff --git a/runtime/ops/mapper/audio_asr_pipeline/process.py b/runtime/ops/mapper/audio_asr_pipeline/process.py new file mode 100644 index 00000000..48c8ebe9 --- /dev/null +++ b/runtime/ops/mapper/audio_asr_pipeline/process.py @@ -0,0 +1,289 @@ +# -- encoding: utf-8 -- + +import json +import os +import shutil +import tempfile +import time +from pathlib import Path +from typing import Dict, Any + +from loguru import logger + +from datamate.core.base_op import Mapper + + +def _as_bool(v: object) -> bool: + if isinstance(v, bool): + return v + s = str(v).strip().lower() + return s in {"1", "true", "yes", "y", "on"} + + +def _repo_root() -> Path: + return Path(__file__).resolve().parents[6] + + +def _audio_preprocessor_root() -> Path: + return _repo_root() / "audio_preprocessor" + + +def _ensure_sys_path(p: Path) -> None: + import sys + + sp = str(p) + if sp not in sys.path: + sys.path.insert(0, sp) + + +class AudioAsrPipeline(Mapper): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.do_denoise = _as_bool(kwargs.get("doDenoise", False)) + self.denoise_model_path = str(kwargs.get("denoiseModelPath", "")).strip() + + self.do_anomaly_filter = _as_bool(kwargs.get("doAnomalyFilter", True)) + self.min_dur = float(kwargs.get("minDur", 1.0)) + self.max_dur = float(kwargs.get("maxDur", 20000.0)) + self.silence_ratio_th = float(kwargs.get("silenceRatioTh", 0.8)) + self.silence_rms_ratio_th = float(kwargs.get("silenceRmsRatioTh", 0.05)) + + 
self.lid_model_source = str(kwargs.get("lidModelSource", "")).strip() + self.lid_device = str(kwargs.get("lidDevice", "cpu")).strip() + self.lid_max_seconds = float(kwargs.get("lidMaxSeconds", 3.0)) + + self.max_segment_seconds = int(float(kwargs.get("maxSegmentSeconds", 120))) + self.asr_device = str(kwargs.get("asrDevice", "auto")).strip() + + def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: + start = time.time() + + ap_root = _audio_preprocessor_root() + if not ap_root.exists(): + raise FileNotFoundError(f"audio_preprocessor 不存在: {ap_root}") + + in_path = Path(sample.get(self.filepath_key, "")).resolve() + if not in_path.exists(): + raise FileNotFoundError(f"输入音频不存在: {in_path}") + + # 用临时工作区隔离每个 sample,避免污染 audio_preprocessor 自身的 output_data + with tempfile.TemporaryDirectory(prefix="dm_audio_asr_") as td: + work = Path(td) + input_dir = work / "input_data" / "audio_raw" + out_norm = work / "output_data" / "normalization" + out_denoise = work / "output_data" / "denoise" + out_lid = work / "output_data" / "lid" + out_split = work / "output_data" / "split" + out_asr = work / "output_data" / "asr" + + input_dir.mkdir(parents=True, exist_ok=True) + out_norm.mkdir(parents=True, exist_ok=True) + out_denoise.mkdir(parents=True, exist_ok=True) + out_lid.mkdir(parents=True, exist_ok=True) + out_split.mkdir(parents=True, exist_ok=True) + out_asr.mkdir(parents=True, exist_ok=True) + + # 复制输入音频到 pipeline 输入目录 + src_name = in_path.name + local_in = input_dir / src_name + shutil.copy2(str(in_path), str(local_in)) + + # 1) normalization(调用 audio_preprocessor 的 normalization.main,但用我们自己的 input/output_dir) + _ensure_sys_path(ap_root / "scripts" / "audio_convert") + _ensure_sys_path(ap_root / "src" / "utils") + _ensure_sys_path(ap_root / "src" / "pipeline") + + import sys + + from audio_preprocessor.src.pipeline import normalization as _norm # type: ignore + + argv_backup = sys.argv[:] + try: + sys.argv = [ + sys.argv[0], + "--input_dir", + str(input_dir), + 
"--output_dir", + str(out_norm), + "--overwrite", + ] + rc = _norm.main() + if rc != 0: + raise RuntimeError(f"normalization 失败,返回码: {rc}") + finally: + sys.argv = argv_backup + + # 归一化输出文件(按 stem) + norm_candidates = sorted(out_norm.glob(f"{Path(src_name).stem}.*")) + if not norm_candidates: + # 兜底:取目录内第一个文件 + norm_candidates = sorted([p for p in out_norm.iterdir() if p.is_file()]) + if not norm_candidates: + raise RuntimeError(f"normalization 未生成输出: {out_norm}") + norm_file = norm_candidates[0] + + current_audio_dir = out_norm + + # 2) (可选) GTCRN denoise(直接复用工具类) + if self.do_denoise: + if not self.denoise_model_path: + raise ValueError("启用降噪时必须提供 denoiseModelPath(GTCRN onnx 绝对路径)") + model = Path(self.denoise_model_path).expanduser().resolve() + if not model.exists(): + raise FileNotFoundError(f"GTCRN 模型不存在: {model}") + + _ensure_sys_path(ap_root / "src" / "utils") + from audio_preprocessor.src.utils.gtcrn_denoise import OnnxGtcrnDenoiser, process_one # type: ignore + + denoiser = OnnxGtcrnDenoiser(model) + den_out = out_denoise / f"{norm_file.stem}.wav" + process_one(norm_file, den_out, denoiser) + current_audio_dir = out_denoise + + # 3) (可选) anomaly_filter(复用其模块 main,通过 argv 注入参数) + quality_list = out_denoise / "item_with_quality.list" + if self.do_anomaly_filter: + from audio_preprocessor.src.pipeline import anomaly_filter as _af # type: ignore + + argv_backup = sys.argv[:] + try: + sys.argv = [ + sys.argv[0], + "--audio_dir", + str(current_audio_dir), + "--output", + str(quality_list), + "--min_dur", + str(self.min_dur), + "--max_dur", + str(self.max_dur), + "--silence_ratio_th", + str(self.silence_ratio_th), + "--silence_rms_ratio_th", + str(self.silence_rms_ratio_th), + ] + rc = _af.main() + if rc != 0: + raise RuntimeError(f"anomaly_filter 失败,返回码: {rc}") + finally: + sys.argv = argv_backup + + # 4) LID:fast_lang_id(用 input_list,保证只处理本文件) + from audio_preprocessor.src.utils import fast_lang_id as _lid # type: ignore + + lid_in_list = out_lid / 
"_single_item.list" + lid_in_list.write_text( + json.dumps({"key": norm_file.stem, "wav": str((current_audio_dir / norm_file.name).resolve()), "txt": ""}, ensure_ascii=False) + + "\n", + encoding="utf-8", + ) + lid_out_list = out_lid / "item_with_lang.list" + argv_backup = sys.argv[:] + try: + sys.argv = [ + sys.argv[0], + "--input_list", + str(lid_in_list), + "--output", + str(lid_out_list), + "--device", + self.lid_device, + "--batch_size", + "1", + "--max_seconds", + str(self.lid_max_seconds), + ] + if self.lid_model_source: + sys.argv += ["--model_source", self.lid_model_source] + rc = _lid.main() + if rc != 0: + raise RuntimeError(f"fast_lang_id 失败,返回码: {rc}") + finally: + sys.argv = argv_backup + + lid_line = lid_out_list.read_text(encoding="utf-8").splitlines()[0].strip() + lid_row = json.loads(lid_line) + lang = str(lid_row.get("lang", "en")) + + # 5) split_and_tag + from audio_preprocessor.src.pipeline import split_and_tag as _split # type: ignore + + argv_backup = sys.argv[:] + try: + sys.argv = [ + sys.argv[0], + "--input_dir", + str(current_audio_dir), + "--output_dir", + str(out_split), + "--list_file", + str(lid_out_list), + "--from_list", + "--max_seconds", + str(max(1, self.max_segment_seconds)), + ] + rc = _split.main() + if rc != 0: + raise RuntimeError(f"split_and_tag 失败,返回码: {rc}") + finally: + sys.argv = argv_backup + + split_list = out_split / "item_with_lang.list" + if not split_list.exists(): + raise RuntimeError(f"split 输出清单不存在: {split_list}") + + # 6) recognize_monitor + from audio_preprocessor.src.pipeline import recognize_monitor as _rm # type: ignore + + argv_backup = sys.argv[:] + try: + sys.argv = [ + sys.argv[0], + "--split_dir", + str(out_split), + "--asr_root", + str(out_asr), + "--device", + self.asr_device, + ] + rc = _rm.main() + if rc != 0: + raise RuntimeError(f"recognize_monitor 失败,返回码: {rc}") + finally: + sys.argv = argv_backup + + merged = out_asr / "merged_text.txt" + if not merged.exists(): + raise RuntimeError(f"ASR 
合并结果不存在: {merged}") + + merged_text = merged.read_text(encoding="utf-8", errors="ignore").strip() + if not merged_text: + merged_text = "" + + # 写回 sample + sample[self.text_key] = merged_text + sample[self.data_key] = b"" + + ext = sample.get(self.ext_params_key, {}) + if not isinstance(ext, dict): + ext = {"_raw": ext} + ext["audio_asr"] = { + "lang": lang, + "artifacts": { + "work_dir": str(work), + "normalized_dir": str(out_norm), + "denoise_dir": str(out_denoise) if self.do_denoise else "", + "lid_list": str(lid_out_list), + "split_dir": str(out_split), + "asr_dir": str(out_asr), + "merged_text": str(merged), + }, + } + sample[self.ext_params_key] = ext + + logger.info( + f"fileName: {sample.get(self.filename_key)}, method: AudioAsrPipeline costs {time.time() - start:6f} s" + ) + return sample + diff --git a/runtime/ops/mapper/audio_asr_pipeline/requirements.txt b/runtime/ops/mapper/audio_asr_pipeline/requirements.txt new file mode 100644 index 00000000..b0f833bc --- /dev/null +++ b/runtime/ops/mapper/audio_asr_pipeline/requirements.txt @@ -0,0 +1,7 @@ +torch +torchaudio +speechbrain +pydub +soundfile +onnxruntime +numpy diff --git a/runtime/ops/mapper/audio_dc_offset_removal/README.md b/runtime/ops/mapper/audio_dc_offset_removal/README.md new file mode 100644 index 00000000..b23cd4dd --- /dev/null +++ b/runtime/ops/mapper/audio_dc_offset_removal/README.md @@ -0,0 +1,34 @@ +# AudioDcOffsetRemoval 去直流分量算子 + +## 概述 + +AudioDcOffsetRemoval 对音频波形做直流分量去除(减去全段均值),常用于采集链路的偏置修正。算子会把处理后的音频写入 `export_path` 并更新 `sample` 文件字段。 + +## 功能特性 + +- **直流偏置消除**:对全段做减均值处理 +- **文件输出更新**:输出写入 `export_path`,并更新 `filePath/fileType/fileName/fileSize` +- **覆盖策略**:可选择是否覆盖同名输出 + +## 参数说明 + +| 参数 | 类型 | 默认值 | 说明 | +|---|---|---:|---| +| outFormat | select | wav | 输出格式(扩展名) | +| overwrite | switch | false | 输出同名文件是否覆盖 | + +## 输入输出 + +- **输入**:`sample["filePath"]` +- **输出**: + - 输出文件:写入 `sample["export_path"]` + - 更新字段:`sample["filePath"]` / `sample["fileType"]` / `sample["fileName"]` / 
`sample["fileSize"]` + +## 依赖说明 + +- **Python 依赖**:`soundfile`、`numpy` + +## 版本历史 + +- **v1.0.0**:首次发布,支持去直流分量 + diff --git a/runtime/ops/mapper/audio_dc_offset_removal/__init__.py b/runtime/ops/mapper/audio_dc_offset_removal/__init__.py new file mode 100644 index 00000000..69bbad9a --- /dev/null +++ b/runtime/ops/mapper/audio_dc_offset_removal/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- + +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="AudioDcOffsetRemoval", + module_path="ops.mapper.audio_dc_offset_removal.process", +) + diff --git a/runtime/ops/mapper/audio_dc_offset_removal/metadata.yml b/runtime/ops/mapper/audio_dc_offset_removal/metadata.yml new file mode 100644 index 00000000..a6d3213b --- /dev/null +++ b/runtime/ops/mapper/audio_dc_offset_removal/metadata.yml @@ -0,0 +1,39 @@ +name: '去直流分量' +name_en: 'DC Offset Removal' +description: '去除音频直流分量(减均值),写出处理后的 wav 到 export_path 并更新 filePath。' +description_en: 'Remove DC offset (subtract mean). Writes processed wav to export_path and updates filePath.' 
+language: 'python' +vendor: 'huawei' +raw_id: 'AudioDcOffsetRemoval' +version: '1.0.0' +types: + - 'cleanse' +modal: 'audio' +inputs: 'audio' +outputs: 'audio' +settings: + outFormat: + name: '输出格式' + description: '输出扩展名(建议 wav)。' + type: 'select' + defaultVal: 'wav' + required: true + options: + - label: 'wav' + value: 'wav' + - label: 'flac' + value: 'flac' + overwrite: + name: '覆盖输出' + type: 'switch' + description: '若 export_path 下已存在同名文件,是否覆盖。' + defaultVal: 'false' + required: false + checkedLabel: '覆盖' + unCheckedLabel: '不覆盖' +runtime: + memory: 104857600 + cpu: 0.1 + gpu: 0 + npu: 0 + diff --git a/runtime/ops/mapper/audio_dc_offset_removal/process.py b/runtime/ops/mapper/audio_dc_offset_removal/process.py new file mode 100644 index 00000000..cadea5f1 --- /dev/null +++ b/runtime/ops/mapper/audio_dc_offset_removal/process.py @@ -0,0 +1,88 @@ +# -- encoding: utf-8 -- + +import os +import time +from pathlib import Path +from typing import Dict, Any, Tuple + +from loguru import logger + +from datamate.core.base_op import Mapper + + +def _as_bool(v: object) -> bool: + if isinstance(v, bool): + return v + s = str(v).strip().lower() + return s in {"1", "true", "yes", "y", "on"} + + +def _load_audio(path: Path) -> Tuple["object", int]: + try: + import soundfile as sf # type: ignore + + data, sr = sf.read(str(path), always_2d=False) + return data, int(sr) + except Exception as e: + raise RuntimeError(f"读取音频失败(需要 soundfile): {path}, error={e}") from e + + +def _save_audio(path: Path, data: "object", sr: int, fmt: str) -> None: + try: + import soundfile as sf # type: ignore + + path.parent.mkdir(parents=True, exist_ok=True) + sf.write(str(path), data, int(sr), format=fmt.upper() if fmt else None) + except Exception as e: + raise RuntimeError(f"写入音频失败(需要 soundfile): {path}, error={e}") from e + + +class AudioDcOffsetRemoval(Mapper): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.out_format = str(kwargs.get("outFormat", 
"wav")).strip().lower().lstrip(".") + self.overwrite = _as_bool(kwargs.get("overwrite", False)) + + def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: + start = time.time() + + in_path = Path(sample.get(self.filepath_key, "")).resolve() + if not in_path.exists(): + raise FileNotFoundError(f"输入音频不存在: {in_path}") + + data, sr = _load_audio(in_path) + try: + import numpy as np + + x = np.asarray(data, dtype=np.float32) + if x.ndim > 1: + x = x.mean(axis=1) + y = x - float(np.mean(x)) if x.size else x + except Exception as e: + raise RuntimeError(f"处理失败(需要 numpy): {e}") from e + + export_dir = Path(os.path.abspath(str(sample.get(self.export_path_key, ".")))) + export_dir.mkdir(parents=True, exist_ok=True) + stem = Path(str(sample.get(self.filename_key, in_path.name))).stem + out_path = export_dir / f"{stem}.{self.out_format}" + if out_path.exists() and not self.overwrite: + raise FileExistsError(f"输出文件已存在且未启用覆盖: {out_path}") + + _save_audio(out_path, y, sr, self.out_format) + + sample[self.filepath_key] = str(out_path.resolve()) + sample[self.filetype_key] = self.out_format + sample[self.filename_key] = out_path.name + try: + sample[self.filesize_key] = str(out_path.stat().st_size) + except Exception: + pass + sample[self.data_key] = b"" + if not sample.get(self.text_key): + sample[self.text_key] = "processed" + + logger.info( + f"fileName: {sample.get(self.filename_key)}, method: AudioDcOffsetRemoval costs {time.time() - start:6f} s" + ) + return sample + diff --git a/runtime/ops/mapper/audio_dc_offset_removal/requirements.txt b/runtime/ops/mapper/audio_dc_offset_removal/requirements.txt new file mode 100644 index 00000000..17e9d57d --- /dev/null +++ b/runtime/ops/mapper/audio_dc_offset_removal/requirements.txt @@ -0,0 +1,2 @@ +soundfile +numpy diff --git a/runtime/ops/mapper/audio_fast_lang_id/README.md b/runtime/ops/mapper/audio_fast_lang_id/README.md new file mode 100644 index 00000000..1cf69978 --- /dev/null +++ 
b/runtime/ops/mapper/audio_fast_lang_id/README.md @@ -0,0 +1,38 @@ +# AudioFastLangId 快速语言识别(中英)算子 + +## 概述 + +AudioFastLangId 用于对音频做快速语言识别(仅输出 `zh/en`),复用 `audio_preprocessor/src/utils/fast_lang_id.py` 的 SpeechBrain 推理逻辑。算子不会改写音频文件,仅将识别结果写入 `ext_params`,方便后续分流到不同 ASR 模型或处理链路。 + +## 功能特性 + +- **快速推理**:支持只截取前 N 秒进行判断 +- **仅输出 zh/en**:中文相关语言码统一映射为 `zh`,其他映射为 `en` +- **结构化输出**:结果写入 `ext_params.audio_lid.lang` + +## 参数说明 + +| 参数 | 类型 | 默认值 | 说明 | +|---|---|---:|---| +| modelSource | input | (空) | SpeechBrain 模型 source(本地目录或 HuggingFace repo);为空使用默认 | +| modelSavedir | input | (空) | 模型缓存目录;为空使用默认 | +| device | select | cpu | 推理设备(cpu/cuda/npu) | +| batchSize | inputNumber | 1 | 批大小(单文件时通常为 1) | +| maxSeconds | inputNumber | 3.0 | 只取前 N 秒做判断,0=全长 | + +## 输入输出 + +- **输入**:`sample["filePath"]` +- **输出**: + - `sample["ext_params"]["audio_lid"]["lang"] = "zh" | "en"` + - 不修改 `filePath`(不重写音频) + +## 依赖说明 + +- **Python 依赖**:`torch`、`torchaudio`、`speechbrain` +- **模型依赖**:SpeechBrain LID 权重需在环境中可访问(本地目录或可联网拉取) + +## 版本历史 + +- **v1.0.0**:首次发布,支持中英二分类 LID 输出 + diff --git a/runtime/ops/mapper/audio_fast_lang_id/__init__.py b/runtime/ops/mapper/audio_fast_lang_id/__init__.py new file mode 100644 index 00000000..70855035 --- /dev/null +++ b/runtime/ops/mapper/audio_fast_lang_id/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- + +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="AudioFastLangId", + module_path="ops.mapper.audio_fast_lang_id.process", +) + diff --git a/runtime/ops/mapper/audio_fast_lang_id/metadata.yml b/runtime/ops/mapper/audio_fast_lang_id/metadata.yml new file mode 100644 index 00000000..41183267 --- /dev/null +++ b/runtime/ops/mapper/audio_fast_lang_id/metadata.yml @@ -0,0 +1,61 @@ +name: '快速语言识别(中英)' +name_en: 'Fast Language ID (zh/en)' +description: '调用 audio_preprocessor 的 SpeechBrain LID,输出 zh/en,并写入 ext_params.audio_lid.lang。' +description_en: 'Run SpeechBrain LID from audio_preprocessor and output zh/en; writes 
# -- encoding: utf-8 --
"""AudioFastLangId operator: fast zh/en language identification.

Delegates to audio_preprocessor's SpeechBrain-based ``fast_lang_id`` CLI
(reused via a faked ``sys.argv``) and stores the detected language in
``sample[ext_params]["audio_lid"]["lang"]``. The audio file itself is not
modified or rewritten.
"""

import json
import time
from pathlib import Path
from typing import Any, Dict

from loguru import logger

from datamate.core.base_op import Mapper


def _repo_root() -> Path:
    # .../runtime/ops/mapper/audio_fast_lang_id/process.py -> repository root
    return Path(__file__).resolve().parents[6]


def _audio_preprocessor_root() -> Path:
    """Location of the sibling ``audio_preprocessor`` project inside the repo."""
    return _repo_root() / "audio_preprocessor"


class AudioFastLangId(Mapper):
    """Run SpeechBrain LID on one audio file and tag it ``zh``/``en``.

    Settings (from metadata.yml):
        modelSource:  SpeechBrain model source (local dir or HF repo); '' = default.
        modelSavedir: model cache dir; '' = default.
        device:       torch device string (cpu/cuda/npu).
        batchSize:    batch size (clamped to >= 1 when used).
        maxSeconds:   only the first N seconds are analyzed; 0 = full length.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.model_source = str(kwargs.get("modelSource", "")).strip()
        self.model_savedir = str(kwargs.get("modelSavedir", "")).strip()
        self.device = str(kwargs.get("device", "cpu")).strip()
        self.batch_size = int(float(kwargs.get("batchSize", 1)))
        self.max_seconds = float(kwargs.get("maxSeconds", 3.0))

    def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]:
        """Detect the language of ``sample[filePath]``; write result to ext_params.

        Raises:
            FileNotFoundError: audio_preprocessor tree or the input file is missing.
            RuntimeError: the LID CLI returned non-zero or produced no output file.
        """
        start = time.time()

        ap_root = _audio_preprocessor_root()
        if not ap_root.exists():
            raise FileNotFoundError(f"audio_preprocessor 不存在: {ap_root}")

        wav_path = Path(sample.get(self.filepath_key, "")).resolve()
        if not wav_path.exists():
            raise FileNotFoundError(f"输入音频不存在: {wav_path}")

        import sys

        utils_dir = ap_root / "src" / "utils"
        if str(utils_dir) not in sys.path:
            sys.path.insert(0, str(utils_dir))

        import fast_lang_id  # type: ignore

        # Reuse fast_lang_id's CLI entry point by temporarily swapping sys.argv.
        argv_backup = sys.argv[:]
        try:
            out_path = (ap_root / "output_data" / "lid" / "item_with_lang.list").resolve()
            in_list = (ap_root / "output_data" / "lid" / "_single_item.list").resolve()
            in_list.parent.mkdir(parents=True, exist_ok=True)
            # BUGFIX: build the JSONL record with json.dumps instead of a
            # hand-rolled f-string. The previous code wrote a literal
            # backslash-n ("\\n") rather than a newline, and produced invalid
            # JSON for any path containing quotes or backslashes (e.g. Windows).
            record = {"key": wav_path.stem, "wav": str(wav_path), "txt": ""}
            in_list.write_text(json.dumps(record, ensure_ascii=False) + "\n", encoding="utf-8")

            sys.argv = [
                sys.argv[0],
                "--input_list",
                str(in_list),
                "--output",
                str(out_path),
                "--device",
                self.device,
                "--batch_size",
                str(max(1, self.batch_size)),
                "--max_seconds",
                str(self.max_seconds),
            ]
            if self.model_source:
                sys.argv += ["--model_source", self.model_source]
            if self.model_savedir:
                sys.argv += ["--model_savedir", self.model_savedir]

            rc = fast_lang_id.main()
            if rc != 0:
                raise RuntimeError(f"fast_lang_id 失败,返回码: {rc}")

            # Single-item input list -> read the first output line only.
            if not out_path.exists():
                raise RuntimeError(f"LID 输出不存在: {out_path}")
            line = out_path.read_text(encoding="utf-8").splitlines()[0].strip()
            d = json.loads(line)
            lang = str(d.get("lang", "en"))
        finally:
            # Always restore argv, even when the CLI raises.
            sys.argv = argv_backup

        ext = sample.get(self.ext_params_key, {})
        if not isinstance(ext, dict):
            # Preserve whatever non-dict value was there under a raw key.
            ext = {"_raw": ext}
        ext["audio_lid"] = {"lang": lang}
        sample[self.ext_params_key] = ext

        # Avoid BaseOp treating the sample as empty content.
        sample[self.data_key] = b""
        if not sample.get(self.text_key):
            sample[self.text_key] = "lid"

        logger.info(
            f"fileName: {sample.get(self.filename_key)}, method: AudioFastLangId costs {time.time() - start:6f} s"
        )
        return sample
datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="AudioFormatConvert", + module_path="ops.mapper.audio_format_convert.process", +) + diff --git a/runtime/ops/mapper/audio_format_convert/metadata.yml b/runtime/ops/mapper/audio_format_convert/metadata.yml new file mode 100644 index 00000000..9173e99e --- /dev/null +++ b/runtime/ops/mapper/audio_format_convert/metadata.yml @@ -0,0 +1,63 @@ +name: '音频格式转换与重采样' +name_en: 'Audio Format Convert & Resample' +description: '将常见音频格式互相转换,并可选重采样、声道转换。输出文件写入 export_path,并更新 sample 的 filePath/fileType/fileName。' +description_en: 'Convert between common audio formats with optional resampling and channel conversion. Writes output to export_path and updates filePath/fileType/fileName.' +language: 'python' +vendor: 'huawei' +raw_id: 'AudioFormatConvert' +version: '1.0.0' +types: + - 'cleanse' +modal: 'audio' +inputs: 'audio' +outputs: 'audio' +settings: + targetFormat: + name: '目标格式' + description: '输出音频格式(扩展名),如 wav/flac/mp3/aac/m4a/ogg。' + type: 'select' + defaultVal: 'wav' + required: true + options: + - label: 'wav' + value: 'wav' + - label: 'flac' + value: 'flac' + - label: 'mp3' + value: 'mp3' + - label: 'aac' + value: 'aac' + - label: 'm4a' + value: 'm4a' + - label: 'ogg' + value: 'ogg' + sampleRate: + name: '采样率' + description: '目标采样率(Hz)。0 表示保持原采样率。' + type: 'inputNumber' + defaultVal: 16000 + min: 0 + max: 192000 + step: 1 + channels: + name: '声道数' + description: '目标声道数:1=单声道,2=双声道,0=保持原声道。' + type: 'inputNumber' + defaultVal: 1 + min: 0 + max: 2 + step: 1 + overwrite: + name: '覆盖输出' + description: '若 export_path 下已存在同名文件,是否覆盖。' + type: 'switch' + defaultVal: 'false' + required: false + checkedLabel: '覆盖' + unCheckedLabel: '不覆盖' +runtime: + memory: 104857600 + cpu: 0.2 + gpu: 0 + npu: 0 + diff --git a/runtime/ops/mapper/audio_format_convert/process.py b/runtime/ops/mapper/audio_format_convert/process.py new file mode 100644 index 00000000..6bcf88a5 --- /dev/null +++ 
# -- encoding: utf-8 --
"""AudioFormatConvert operator.

Converts an input audio file to a target container format with optional
resampling and channel conversion, writes the result into the sample's
export path, and repoints the sample's file fields at the new file.
Prefers pydub (ffmpeg) and falls back to soundfile for wav/flac/ogg.
"""

import os
import shutil
import time
from pathlib import Path
from typing import Dict, Any, Optional, Tuple

from loguru import logger

from datamate.core.base_op import Mapper

# pydub passes ``format`` straight to ffmpeg as the muxer name. ffmpeg has no
# muxer named "aac" or "m4a": raw AAC streams use the "adts" muxer and .m4a
# files use the "ipod" (MP4) muxer, so map the user-facing extensions here.
_PYDUB_EXPORT_FORMAT = {"aac": "adts", "m4a": "ipod"}


def _as_bool(v: object) -> bool:
    """Interpret bools and common truthy strings ('1', 'true', ...) as True."""
    if isinstance(v, bool):
        return v
    s = str(v).strip().lower()
    return s in {"1", "true", "yes", "y", "on"}


def _safe_export_path(sample: Dict[str, Any], key: str, default: str = "") -> str:
    """Fetch ``sample[key]`` as a string, mapping missing/None to ``default``."""
    v = sample.get(key, default)
    if v is None:
        return default
    return str(v)


def _load_audio_backend() -> Tuple[Optional[object], Optional[object]]:
    """Probe the available audio backends.

    Returns:
        (AudioSegment, sf):
        - AudioSegment: pydub.AudioSegment class if importable, else None
        - sf: the soundfile module if importable, else None
    """
    audiosegment = None
    sf = None
    try:
        from pydub import AudioSegment  # type: ignore

        audiosegment = AudioSegment
    except Exception:
        audiosegment = None

    try:
        import soundfile as _sf  # type: ignore

        sf = _sf
    except Exception:
        sf = None

    return audiosegment, sf


def _convert_with_soundfile(
    src: Path, dst: Path, target_sr: int, channels: int, fmt: str, overwrite: bool
) -> None:
    """Convert ``src`` -> ``dst`` using soundfile (wav/flac/ogg class formats).

    Raises:
        RuntimeError: soundfile unavailable, or resampling support missing.
        FileExistsError: ``dst`` exists and ``overwrite`` is False.
    """
    audiosegment, sf = _load_audio_backend()
    _ = audiosegment
    if sf is None:
        raise RuntimeError("soundfile 不可用,无法使用 soundfile 转换")

    if dst.exists() and not overwrite:
        raise FileExistsError(f"输出文件已存在且未启用覆盖: {dst}")

    data, sr = sf.read(str(src), always_2d=True)  # (T, C)
    if channels == 1 and data.shape[1] > 1:
        data = data.mean(axis=1, keepdims=True)
    elif channels == 2 and data.shape[1] == 1:
        data = data.repeat(2, axis=1)

    if target_sr and target_sr > 0 and int(sr) != int(target_sr):
        try:
            import numpy as np
            import torch  # type: ignore

            # NOTE: linear interpolation resampling; no anti-aliasing filter,
            # acceptable only as a fallback when pydub/ffmpeg is unavailable.
            x = torch.from_numpy(data.astype("float32")).transpose(0, 1).unsqueeze(0)  # (1,C,T)
            new_len = int(round(x.shape[-1] * float(target_sr) / float(sr)))
            y = torch.nn.functional.interpolate(x, size=new_len, mode="linear", align_corners=False)
            data = y.squeeze(0).transpose(0, 1).cpu().numpy().astype(np.float32)
            sr = int(target_sr)
        except Exception as e:
            raise RuntimeError(f"重采样失败(需要 torch/numpy 支持),src_sr={sr}, target_sr={target_sr}: {e}") from e

    dst.parent.mkdir(parents=True, exist_ok=True)
    sf.write(str(dst), data, int(sr), format=fmt.upper() if fmt else None)


def _convert_with_pydub(
    src: Path, dst: Path, target_sr: int, channels: int, fmt: str, overwrite: bool
) -> None:
    """Convert ``src`` -> ``dst`` using pydub/ffmpeg (all advertised formats).

    Raises:
        RuntimeError: pydub unavailable.
        FileExistsError: ``dst`` exists and ``overwrite`` is False.
    """
    audiosegment, sf = _load_audio_backend()
    _ = sf
    if audiosegment is None:
        raise RuntimeError("pydub 不可用,无法使用 pydub 转换")

    if dst.exists() and not overwrite:
        raise FileExistsError(f"输出文件已存在且未启用覆盖: {dst}")

    audio = audiosegment.from_file(str(src))
    if target_sr and target_sr > 0:
        audio = audio.set_frame_rate(int(target_sr))
    if channels == 1:
        audio = audio.set_channels(1)
    elif channels == 2:
        audio = audio.set_channels(2)
    dst.parent.mkdir(parents=True, exist_ok=True)
    # BUGFIX: "aac"/"m4a" are not ffmpeg muxer names; translate to the real
    # muxers ("adts"/"ipod") so those target formats actually work.
    audio.export(str(dst), format=_PYDUB_EXPORT_FORMAT.get(fmt, fmt))


class AudioFormatConvert(Mapper):
    """Format/sample-rate/channel conversion operator.

    Settings (from metadata.yml):
        targetFormat: output extension (wav/flac/mp3/aac/m4a/ogg).
        sampleRate:   target Hz; 0 keeps the original rate.
        channels:     1=mono, 2=stereo, 0=keep original.
        overwrite:    whether to overwrite an existing output file.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.target_format = str(kwargs.get("targetFormat", "wav")).strip().lower().lstrip(".")
        self.sample_rate = int(float(kwargs.get("sampleRate", 16000)))
        self.channels = int(float(kwargs.get("channels", 1)))
        self.overwrite = _as_bool(kwargs.get("overwrite", False))

    def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]:
        """Convert the sample's audio file and update its file fields.

        Raises:
            FileNotFoundError: input file missing.
            RuntimeError / FileExistsError: conversion impossible or output blocked.
        """
        start = time.time()

        src_path = Path(_safe_export_path(sample, self.filepath_key, "")).resolve()
        if not src_path.exists():
            raise FileNotFoundError(f"输入音频不存在: {src_path}")

        export_dir = Path(os.path.abspath(_safe_export_path(sample, self.export_path_key, ".")))
        export_dir.mkdir(parents=True, exist_ok=True)

        in_stem = Path(_safe_export_path(sample, self.filename_key, src_path.name)).stem
        out_name = f"{in_stem}.{self.target_format}"
        out_path = export_dir / out_name

        audiosegment, sf = _load_audio_backend()
        try:
            if audiosegment is not None:
                _convert_with_pydub(
                    src=src_path,
                    dst=out_path,
                    target_sr=self.sample_rate,
                    channels=self.channels,
                    fmt=self.target_format,
                    overwrite=self.overwrite,
                )
            else:
                # soundfile 支持的格式较少,优先兜底 wav/flac/ogg 等
                if sf is None:
                    raise RuntimeError("pydub/soundfile 均不可用,无法转换")
                if self.target_format not in {"wav", "flac", "ogg"}:
                    raise RuntimeError(f"当前环境无 pydub 时不支持转换到: {self.target_format}")
                _convert_with_soundfile(
                    src=src_path,
                    dst=out_path,
                    target_sr=self.sample_rate,
                    channels=self.channels,
                    fmt=self.target_format,
                    overwrite=self.overwrite,
                )
        except Exception:
            # Last resort: same-format plain copy (only when target == source ext).
            if src_path.suffix.lower().lstrip(".") == self.target_format:
                if out_path.exists() and not self.overwrite:
                    raise
                shutil.copy2(str(src_path), str(out_path))
            else:
                # FIX: bare raise preserves the original traceback ("raise e" did not).
                raise

        # Repoint the sample at the converted file.
        sample[self.filepath_key] = str(out_path.resolve())
        sample[self.filetype_key] = self.target_format
        sample[self.filename_key] = out_path.name
        try:
            sample[self.filesize_key] = str(out_path.stat().st_size)
        except Exception:
            pass

        # 避免 BaseOp 误判为空内容
        if not sample.get(self.text_key):
            sample[self.text_key] = "converted"
        sample[self.data_key] = b""

        logger.info(
            f"fileName: {sample.get(self.filename_key)}, method: AudioFormatConvert costs {time.time() - start:6f} s"
        )
        return sample
智能降噪算子 + +## 概述 + +AudioGtcrnDenoise 封装 `audio_preprocessor` 的 GTCRN ONNX 降噪逻辑,对输入音频进行智能降噪处理。算子会将降噪后的音频写入 `export_path`,并更新 `sample` 文件字段,便于后续链路继续处理。 + +## 功能特性 + +- **GTCRN ONNX 推理**:复用 `audio_preprocessor/src/utils/gtcrn_denoise.py` +- **输出文件更新**:写入 `export_path`,更新 `filePath/fileType/fileName/fileSize` +- **覆盖策略**:可选覆盖同名输出 + +## 参数说明 + +| 参数 | 类型 | 默认值 | 说明 | +|---|---|---:|---| +| modelPath | input | (空) | GTCRN `.onnx` 模型绝对路径;为空则使用默认路径(若存在) | +| outFormat | select | wav | 输出格式(建议 wav) | +| overwrite | switch | false | 输出同名文件是否覆盖 | + +## 输入输出 + +- **输入**:`sample["filePath"]` +- **输出**: + - 输出文件:写入 `sample["export_path"]` + - 更新字段:`sample["filePath"]` / `sample["fileType"]` / `sample["fileName"]` / `sample["fileSize"]` + +## 依赖说明 + +- **Python 依赖**:`onnxruntime`、`soundfile`、`numpy`、`torch` +- **模型依赖**:需提供 GTCRN ONNX 模型文件(通常放在模型仓并在运行环境挂载) + +## 版本历史 + +- **v1.0.0**:首次发布,支持 GTCRN ONNX 降噪 + diff --git a/runtime/ops/mapper/audio_gtcrn_denoise/__init__.py b/runtime/ops/mapper/audio_gtcrn_denoise/__init__.py new file mode 100644 index 00000000..0aaa897b --- /dev/null +++ b/runtime/ops/mapper/audio_gtcrn_denoise/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- + +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="AudioGtcrnDenoise", + module_path="ops.mapper.audio_gtcrn_denoise.process", +) + diff --git a/runtime/ops/mapper/audio_gtcrn_denoise/metadata.yml b/runtime/ops/mapper/audio_gtcrn_denoise/metadata.yml new file mode 100644 index 00000000..02a23638 --- /dev/null +++ b/runtime/ops/mapper/audio_gtcrn_denoise/metadata.yml @@ -0,0 +1,43 @@ +name: 'GTCRN 智能降噪' +name_en: 'GTCRN Denoise' +description: '调用 audio_preprocessor 的 GTCRN ONNX 降噪工具对音频降噪,输出写入 export_path 并更新 filePath。' +description_en: 'Run GTCRN ONNX denoiser from audio_preprocessor, write output to export_path and update filePath.' 
# -- encoding: utf-8 --
"""AudioGtcrnDenoise operator: GTCRN ONNX speech denoising.

Calls the GTCRN denoiser bundled with the sibling ``audio_preprocessor``
project, writes the denoised file into the sample's export path and
repoints the sample's file fields at it.
"""

import os
import time
from pathlib import Path
from typing import Dict, Any

from loguru import logger

from datamate.core.base_op import Mapper


def _as_bool(v: object) -> bool:
    """Interpret bools and common truthy strings as booleans."""
    if isinstance(v, bool):
        return v
    return str(v).strip().lower() in {"1", "true", "yes", "y", "on"}


def _repo_root() -> Path:
    # .../DataMate/runtime/ops/mapper/<op>/process.py -> repository root
    return Path(__file__).resolve().parents[6]


def _audio_preprocessor_root() -> Path:
    """Location of the sibling ``audio_preprocessor`` project."""
    return _repo_root() / "audio_preprocessor"


class AudioGtcrnDenoise(Mapper):
    """Denoise one audio file through the GTCRN ONNX model.

    Settings: ``modelPath`` (optional .onnx path), ``outFormat`` (output
    extension, normally wav) and ``overwrite`` (replace existing output).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.model_path = str(kwargs.get("modelPath", "")).strip()
        self.out_format = str(kwargs.get("outFormat", "wav")).strip().lower().lstrip(".")
        self.overwrite = _as_bool(kwargs.get("overwrite", False))

    def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]:
        """Run GTCRN denoising and update the sample's file fields.

        Raises:
            FileNotFoundError: preprocessor tree, input file or model missing.
            FileExistsError: output exists and overwrite is disabled.
        """
        t0 = time.time()

        preproc_root = _audio_preprocessor_root()
        if not preproc_root.exists():
            raise FileNotFoundError(f"audio_preprocessor 不存在: {preproc_root}")

        src = Path(sample.get(self.filepath_key, "")).resolve()
        if not src.exists():
            raise FileNotFoundError(f"输入音频不存在: {src}")

        out_dir = Path(os.path.abspath(str(sample.get(self.export_path_key, "."))))
        out_dir.mkdir(parents=True, exist_ok=True)

        base = Path(str(sample.get(self.filename_key, src.name))).stem
        dst = out_dir / f"{base}.{self.out_format}"
        if dst.exists() and not self.overwrite:
            raise FileExistsError(f"输出文件已存在且未启用覆盖: {dst}")

        if self.model_path:
            model_file = Path(self.model_path).expanduser()
        else:
            model_file = preproc_root / "models" / "gtcrn" / "gtcrn.onnx"
        model_file = model_file.resolve()
        if not model_file.exists():
            raise FileNotFoundError(f"GTCRN ONNX 模型不存在: {model_file}")

        # 直接调用 audio_preprocessor 的工具函数,避免 subprocess 路径/环境差异
        import sys

        tools_dir = preproc_root / "src" / "utils"
        if str(tools_dir) not in sys.path:
            sys.path.insert(0, str(tools_dir))

        from gtcrn_denoise import OnnxGtcrnDenoiser, process_one  # type: ignore

        engine = OnnxGtcrnDenoiser(model_file)
        process_one(src, dst, engine)

        sample[self.filepath_key] = str(dst.resolve())
        sample[self.filetype_key] = self.out_format
        sample[self.filename_key] = dst.name
        try:
            sample[self.filesize_key] = str(dst.stat().st_size)
        except Exception:
            pass
        sample[self.data_key] = b""
        if not sample.get(self.text_key):
            sample[self.text_key] = "denoised"

        logger.info(
            f"fileName: {sample.get(self.filename_key)}, method: AudioGtcrnDenoise costs {time.time() - t0:6f} s"
        )
        return sample
a/runtime/ops/mapper/audio_hum_notch/README.md b/runtime/ops/mapper/audio_hum_notch/README.md new file mode 100644 index 00000000..91f180f4 --- /dev/null +++ b/runtime/ops/mapper/audio_hum_notch/README.md @@ -0,0 +1,36 @@ +# AudioHumNotch 工频陷波算子 + +## 概述 + +AudioHumNotch 用于抑制 50/60Hz 工频哼声,通过陷波滤波器降低电源噪声对语音可懂度的影响。算子会将处理后的音频写入 `export_path` 并更新 `sample` 文件字段。 + +## 功能特性 + +- **工频陷波**:支持 50/60Hz 选择 +- **可调 Q 值**:控制陷波带宽 +- **输出文件更新**:写入 `export_path`,更新 `filePath/fileType/fileName/fileSize` + +## 参数说明 + +| 参数 | 类型 | 默认值 | 说明 | +|---|---|---:|---| +| freqHz | select | 50 | 中心频率(Hz):50/60 | +| q | slider | 30 | 品质因数,越大陷波越窄 | +| outFormat | select | wav | 输出格式(扩展名) | +| overwrite | switch | false | 输出同名文件是否覆盖 | + +## 输入输出 + +- **输入**:`sample["filePath"]` +- **输出**: + - 输出文件:写入 `sample["export_path"]` + - 更新字段:`sample["filePath"]` / `sample["fileType"]` / `sample["fileName"]` / `sample["fileSize"]` + +## 依赖说明 + +- **Python 依赖**:`soundfile`、`numpy`、`scipy`(`scipy.signal`) + +## 版本历史 + +- **v1.0.0**:首次发布,支持 50/60Hz 工频陷波 + diff --git a/runtime/ops/mapper/audio_hum_notch/__init__.py b/runtime/ops/mapper/audio_hum_notch/__init__.py new file mode 100644 index 00000000..222716c0 --- /dev/null +++ b/runtime/ops/mapper/audio_hum_notch/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- + +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="AudioHumNotch", + module_path="ops.mapper.audio_hum_notch.process", +) + diff --git a/runtime/ops/mapper/audio_hum_notch/metadata.yml b/runtime/ops/mapper/audio_hum_notch/metadata.yml new file mode 100644 index 00000000..cbeda801 --- /dev/null +++ b/runtime/ops/mapper/audio_hum_notch/metadata.yml @@ -0,0 +1,58 @@ +name: '工频陷波' +name_en: 'Hum Notch Filter' +description: '50/60Hz 工频陷波抑制。需要 scipy.signal;写出处理后的 wav 到 export_path 并更新 filePath。' +description_en: 'Notch filter for 50/60Hz hum suppression. Requires scipy.signal; writes processed wav to export_path and updates filePath.' 
# -- encoding: utf-8 --
"""AudioHumNotch operator: suppress 50/60 Hz mains hum with an IIR notch."""

import os
import time
from pathlib import Path
from typing import Dict, Any, Tuple

from loguru import logger

from datamate.core.base_op import Mapper


def _as_bool(v: object) -> bool:
    """Interpret bools and common truthy strings as booleans."""
    if isinstance(v, bool):
        return v
    return str(v).strip().lower() in {"1", "true", "yes", "y", "on"}


def _load_audio(path: Path) -> Tuple["object", int]:
    """Read an audio file via soundfile; return (samples, sample_rate)."""
    try:
        import soundfile as sf  # type: ignore

        samples, rate = sf.read(str(path), always_2d=False)
    except Exception as e:
        raise RuntimeError(f"读取音频失败(需要 soundfile): {path}, error={e}") from e
    return samples, int(rate)


def _save_audio(path: Path, data: "object", sr: int, fmt: str) -> None:
    """Write audio via soundfile, creating parent directories as needed."""
    try:
        import soundfile as sf  # type: ignore

        path.parent.mkdir(parents=True, exist_ok=True)
        sf.write(str(path), data, int(sr), format=fmt.upper() if fmt else None)
    except Exception as e:
        raise RuntimeError(f"写入音频失败(需要 soundfile): {path}, error={e}") from e


class AudioHumNotch(Mapper):
    """Notch out mains hum (50/60 Hz) from the sample's audio file.

    Settings: ``freqHz`` (notch center), ``q`` (quality factor, higher =
    narrower), ``outFormat`` and ``overwrite``. Multi-channel input is
    mixed down to mono before filtering.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.freq_hz = float(kwargs.get("freqHz", 50))
        self.q = float(kwargs.get("q", 30))
        self.out_format = str(kwargs.get("outFormat", "wav")).strip().lower().lstrip(".")
        self.overwrite = _as_bool(kwargs.get("overwrite", False))

    def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]:
        """Filter the audio, write it to export_path and repoint the sample.

        Raises:
            FileNotFoundError / FileExistsError: I/O preconditions violated.
            RuntimeError: scipy missing or filtering failed.
        """
        t0 = time.time()
        src = Path(sample.get(self.filepath_key, "")).resolve()
        if not src.exists():
            raise FileNotFoundError(f"输入音频不存在: {src}")

        raw, sr = _load_audio(src)
        try:
            import numpy as np
            from scipy.signal import iirnotch, lfilter  # type: ignore

            mono = np.asarray(raw, dtype=np.float32)
            if mono.ndim > 1:
                mono = mono.mean(axis=1)  # mix down to mono
            if mono.size == 0:
                cleaned = mono  # nothing to filter
            else:
                nyquist = float(sr) / 2.0
                center = float(self.freq_hz) / nyquist  # normalized frequency
                num, den = iirnotch(center, float(self.q))
                cleaned = lfilter(num, den, mono).astype(np.float32)
                cleaned = np.clip(cleaned, -1.0, 1.0)
        except ImportError as e:
            raise RuntimeError("AudioHumNotch 需要 scipy.signal(iirnotch/lfilter)") from e
        except Exception as e:
            raise RuntimeError(f"处理失败: {e}") from e

        out_dir = Path(os.path.abspath(str(sample.get(self.export_path_key, "."))))
        out_dir.mkdir(parents=True, exist_ok=True)
        base = Path(str(sample.get(self.filename_key, src.name))).stem
        dst = out_dir / f"{base}.{self.out_format}"
        if dst.exists() and not self.overwrite:
            raise FileExistsError(f"输出文件已存在且未启用覆盖: {dst}")

        _save_audio(dst, cleaned, sr, self.out_format)

        sample[self.filepath_key] = str(dst.resolve())
        sample[self.filetype_key] = self.out_format
        sample[self.filename_key] = dst.name
        try:
            sample[self.filesize_key] = str(dst.stat().st_size)
        except Exception:
            pass
        sample[self.data_key] = b""
        if not sample.get(self.text_key):
            sample[self.text_key] = "processed"

        logger.info(f"fileName: {sample.get(self.filename_key)}, method: AudioHumNotch costs {time.time() - t0:6f} s")
        return sample
# -- encoding: utf-8 --
"""AudioNoiseGate operator: attenuate low-energy (noise-only) frames.

Frames whose short-time RMS falls below a threshold (expressed in dB
relative to the clip's peak) are attenuated by ``floorRatio``.
"""

import os
import time
from pathlib import Path
from typing import Dict, Any, Tuple

from loguru import logger

from datamate.core.base_op import Mapper


def _as_bool(v: object) -> bool:
    """Interpret bools and common truthy strings ('1', 'true', ...) as True."""
    if isinstance(v, bool):
        return v
    s = str(v).strip().lower()
    return s in {"1", "true", "yes", "y", "on"}


def _load_audio(path: Path) -> Tuple["object", int]:
    """Read an audio file via soundfile; return (samples, sample_rate)."""
    try:
        import soundfile as sf  # type: ignore

        data, sr = sf.read(str(path), always_2d=False)
        return data, int(sr)
    except Exception as e:
        raise RuntimeError(f"读取音频失败(需要 soundfile): {path}, error={e}") from e


def _save_audio(path: Path, data: "object", sr: int, fmt: str) -> None:
    """Write audio via soundfile, creating parent directories as needed."""
    try:
        import soundfile as sf  # type: ignore

        path.parent.mkdir(parents=True, exist_ok=True)
        sf.write(str(path), data, int(sr), format=fmt.upper() if fmt else None)
    except Exception as e:
        raise RuntimeError(f"写入音频失败(需要 soundfile): {path}, error={e}") from e


class AudioNoiseGate(Mapper):
    """Simple RMS noise gate.

    Settings (from metadata.yml):
        thresholdDb: gate threshold in dB relative to the clip's peak.
        frameMs / hopMs: analysis frame length and hop.
        floorRatio: gain applied to gated samples (0..1).
        outFormat / overwrite: output extension and overwrite policy.
    Multi-channel input is mixed down to mono.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.threshold_db = float(kwargs.get("thresholdDb", -45))
        self.frame_ms = float(kwargs.get("frameMs", 20))
        self.hop_ms = float(kwargs.get("hopMs", 10))
        self.floor_ratio = float(kwargs.get("floorRatio", 0.05))
        self.out_format = str(kwargs.get("outFormat", "wav")).strip().lower().lstrip(".")
        self.overwrite = _as_bool(kwargs.get("overwrite", False))

    def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]:
        """Gate the audio, write it to export_path and repoint the sample.

        Raises:
            FileNotFoundError / FileExistsError: I/O preconditions violated.
            RuntimeError: numpy missing or processing failed.
        """
        start = time.time()
        in_path = Path(sample.get(self.filepath_key, "")).resolve()
        if not in_path.exists():
            raise FileNotFoundError(f"输入音频不存在: {in_path}")

        data, sr = _load_audio(in_path)
        try:
            import numpy as np

            x = np.asarray(data, dtype=np.float32)
            if x.ndim > 1:
                x = x.mean(axis=1)  # mix down to mono
            if x.size == 0:
                y = x
            else:
                # Threshold in linear amplitude, relative to the clip's peak.
                peak = float(np.max(np.abs(x))) + 1e-12
                th = peak * (10.0 ** (float(self.threshold_db) / 20.0))
                frame_len = max(1, int(sr * self.frame_ms / 1000.0))
                hop = max(1, int(sr * self.hop_ms / 1000.0))
                # BUGFIX: with hop < frame_len, frames overlap. The previous
                # code multiplied y[st:ed] by floor_ratio once per quiet frame,
                # attenuating overlapped samples by floor_ratio**k instead of
                # floor_ratio. Build a per-sample gain mask and apply it once.
                gain = np.ones_like(x)
                for st in range(0, len(x), hop):
                    ed = min(st + frame_len, len(x))
                    frame = x[st:ed]
                    rms = float(np.sqrt(np.mean(frame * frame) + 1e-12))
                    if rms < th:
                        gain[st:ed] = np.minimum(gain[st:ed], float(self.floor_ratio))
                y = np.clip(x * gain, -1.0, 1.0)
        except Exception as e:
            raise RuntimeError(f"处理失败(需要 numpy): {e}") from e

        export_dir = Path(os.path.abspath(str(sample.get(self.export_path_key, "."))))
        export_dir.mkdir(parents=True, exist_ok=True)
        stem = Path(str(sample.get(self.filename_key, in_path.name))).stem
        out_path = export_dir / f"{stem}.{self.out_format}"
        if out_path.exists() and not self.overwrite:
            raise FileExistsError(f"输出文件已存在且未启用覆盖: {out_path}")

        _save_audio(out_path, y, sr, self.out_format)

        sample[self.filepath_key] = str(out_path.resolve())
        sample[self.filetype_key] = self.out_format
        sample[self.filename_key] = out_path.name
        try:
            sample[self.filesize_key] = str(out_path.stat().st_size)
        except Exception:
            pass
        sample[self.data_key] = b""
        if not sample.get(self.text_key):
            sample[self.text_key] = "processed"

        logger.info(f"fileName: {sample.get(self.filename_key)}, method: AudioNoiseGate costs {time.time() - start:6f} s")
        return sample
输出格式(扩展名) | +| overwrite | switch | false | 输出同名文件是否覆盖 | + +## 输入输出 + +- **输入**:`sample["filePath"]` +- **输出**: + - 输出文件:写入 `sample["export_path"]` + - 更新字段:`sample["filePath"]` / `sample["fileType"]` / `sample["fileName"]` / `sample["fileSize"]` + +## 依赖说明 + +- **Python 依赖**:`soundfile`、`numpy` + +## 版本历史 + +- **v1.0.0**:首次发布,支持一阶预加重 + diff --git a/runtime/ops/mapper/audio_pre_emphasis/__init__.py b/runtime/ops/mapper/audio_pre_emphasis/__init__.py new file mode 100644 index 00000000..fdf03b14 --- /dev/null +++ b/runtime/ops/mapper/audio_pre_emphasis/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- + +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="AudioPreEmphasis", + module_path="ops.mapper.audio_pre_emphasis.process", +) + diff --git a/runtime/ops/mapper/audio_pre_emphasis/metadata.yml b/runtime/ops/mapper/audio_pre_emphasis/metadata.yml new file mode 100644 index 00000000..a9cb641a --- /dev/null +++ b/runtime/ops/mapper/audio_pre_emphasis/metadata.yml @@ -0,0 +1,47 @@ +name: '预加重' +name_en: 'Pre-Emphasis' +description: '一阶预加重滤波 \(y[n]=x[n]-coef*x[n-1]\)。写出处理后的 wav 到 export_path 并更新 filePath。' +description_en: 'First-order pre-emphasis \(y[n]=x[n]-coef*x[n-1]\). Writes processed wav to export_path and updates filePath.' 
# -- encoding: utf-8 --

import os
import time
from pathlib import Path
from typing import Any, Dict, Tuple

from loguru import logger

from datamate.core.base_op import Mapper

# String spellings accepted as "true" for switch-style parameters.
_TRUTHY = {"1", "true", "yes", "y", "on"}


def _as_bool(v: object) -> bool:
    """Interpret a raw switch value (bool or string-like) as a boolean."""
    return v if isinstance(v, bool) else str(v).strip().lower() in _TRUTHY


def _load_audio(path: Path) -> Tuple["object", int]:
    """Decode an audio file via soundfile; returns (samples, sample_rate)."""
    try:
        import soundfile as sf  # type: ignore

        samples, rate = sf.read(str(path), always_2d=False)
    except Exception as e:
        raise RuntimeError(f"读取音频失败(需要 soundfile): {path}, error={e}") from e
    return samples, int(rate)


def _save_audio(path: Path, data: "object", sr: int, fmt: str) -> None:
    """Encode samples to disk via soundfile, creating parent dirs as needed."""
    try:
        import soundfile as sf  # type: ignore

        audio_format = fmt.upper() if fmt else None
        path.parent.mkdir(parents=True, exist_ok=True)
        sf.write(str(path), data, int(sr), format=audio_format)
    except Exception as e:
        raise RuntimeError(f"写入音频失败(需要 soundfile): {path}, error={e}") from e


class AudioPreEmphasis(Mapper):
    """First-order pre-emphasis filter: y[n] = x[n] - coef * x[n-1].

    Writes the processed audio to export_path and updates the sample's
    filePath/fileType/fileName/fileSize fields.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Filter coefficient; 0.9–0.99 is the usual ASR front-end range.
        self.coef = float(kwargs.get("coef", 0.97))
        self.out_format = str(kwargs.get("outFormat", "wav")).strip().lower().lstrip(".")
        self.overwrite = _as_bool(kwargs.get("overwrite", False))

    def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]:
        """Apply pre-emphasis to the sample's audio file.

        Raises FileNotFoundError / FileExistsError / RuntimeError.
        """
        start = time.time()
        src = Path(sample.get(self.filepath_key, "")).resolve()
        if not src.exists():
            raise FileNotFoundError(f"输入音频不存在: {src}")

        raw, sr = _load_audio(src)
        try:
            import numpy as np

            signal = np.asarray(raw, dtype=np.float32)
            if signal.ndim > 1:
                signal = signal.mean(axis=1)  # downmix to mono
            if signal.size == 0:
                filtered = signal
            else:
                # y[0] = x[0]; y[n] = x[n] - coef * x[n-1] for n >= 1
                filtered = np.concatenate(
                    (signal[:1], signal[1:] - float(self.coef) * signal[:-1])
                )
        except Exception as e:
            raise RuntimeError(f"处理失败(需要 numpy): {e}") from e

        out_dir = Path(os.path.abspath(str(sample.get(self.export_path_key, "."))))
        out_dir.mkdir(parents=True, exist_ok=True)
        base = Path(str(sample.get(self.filename_key, src.name))).stem
        dst = out_dir / f"{base}.{self.out_format}"
        if dst.exists() and not self.overwrite:
            raise FileExistsError(f"输出文件已存在且未启用覆盖: {dst}")

        _save_audio(dst, filtered, sr, self.out_format)

        sample[self.filepath_key] = str(dst.resolve())
        sample[self.filetype_key] = self.out_format
        sample[self.filename_key] = dst.name
        try:
            sample[self.filesize_key] = str(dst.stat().st_size)
        except Exception:
            # File-size bookkeeping is best-effort only.
            pass
        sample[self.data_key] = b""
        if not sample.get(self.text_key):
            sample[self.text_key] = "processed"

        logger.info(f"fileName: {sample.get(self.filename_key)}, method: AudioPreEmphasis costs {time.time() - start:6f} s")
        return sample
a/runtime/ops/mapper/audio_quantize_encode/README.md b/runtime/ops/mapper/audio_quantize_encode/README.md new file mode 100644 index 00000000..695679a6 --- /dev/null +++ b/runtime/ops/mapper/audio_quantize_encode/README.md @@ -0,0 +1,37 @@ +# AudioQuantizeEncode 量化编码与重采样算子 + +## 概述 + +AudioQuantizeEncode 用于将音频重采样到指定采样率,并编码为 WAV PCM(8/16/24/32 bit)。输出文件写入 `export_path`,并更新 `sample` 中的文件路径与类型字段,便于后续算子继续处理或直接导出。 + +## 功能特性 + +- **重采样**:支持转换到指定采样率(Hz) +- **量化编码**:支持 WAV PCM 8/16/24/32 bit +- **声道转换**:支持 mono/stereo(或保持原声道) +- **覆盖策略**:可选覆盖同名输出 + +## 参数说明 + +| 参数 | 类型 | 默认值 | 说明 | +|---|---|---:|---| +| sampleRate | inputNumber | 16000 | 目标采样率(Hz),0=保持原采样率 | +| bitDepth | select | 16 | WAV PCM 位深:8/16/24/32 | +| channels | inputNumber | 1 | 目标声道数:1/2,0=保持 | +| overwrite | switch | false | 输出同名文件是否覆盖 | + +## 输入输出 + +- **输入**:`sample["filePath"]` +- **输出**: + - 输出文件:写入 `sample["export_path"]`(输出扩展名固定为 `.wav`) + - 更新字段:`sample["filePath"]` / `sample["fileType"]="wav"` / `sample["fileName"]` / `sample["fileSize"]` + +## 依赖说明 + +- **Python 依赖**:`soundfile`、`numpy`、`torch`(用于重采样实现) + +## 版本历史 + +- **v1.0.0**:首次发布,支持重采样与 WAV PCM 量化编码 + diff --git a/runtime/ops/mapper/audio_quantize_encode/__init__.py b/runtime/ops/mapper/audio_quantize_encode/__init__.py new file mode 100644 index 00000000..104c3a40 --- /dev/null +++ b/runtime/ops/mapper/audio_quantize_encode/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- + +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="AudioQuantizeEncode", + module_path="ops.mapper.audio_quantize_encode.process", +) + diff --git a/runtime/ops/mapper/audio_quantize_encode/metadata.yml b/runtime/ops/mapper/audio_quantize_encode/metadata.yml new file mode 100644 index 00000000..b974f91f --- /dev/null +++ b/runtime/ops/mapper/audio_quantize_encode/metadata.yml @@ -0,0 +1,59 @@ +name: '量化编码与重采样' +name_en: 'Quantize Encode & Resample' +description: '将音频重采样到指定采样率,并量化编码为 8/16/24/32-bit PCM(WAV)。输出写入 export_path 并更新 
# -- encoding: utf-8 --

import os
import time
from pathlib import Path
from typing import Dict, Any, Tuple

from loguru import logger

from datamate.core.base_op import Mapper

# Mapping from requested PCM bit depth to the libsndfile WAV subtype name.
_SUBTYPE_BY_DEPTH = {
    8: "PCM_U8",
    16: "PCM_16",
    24: "PCM_24",
    32: "PCM_32",
}


def _as_bool(v: object) -> bool:
    """Interpret a raw switch value (bool or string-like) as a boolean."""
    if isinstance(v, bool):
        return v
    s = str(v).strip().lower()
    return s in {"1", "true", "yes", "y", "on"}


def _load_audio(path: Path) -> Tuple["object", int]:
    """Decode an audio file as a (T, C) array plus its sample rate."""
    try:
        import soundfile as sf  # type: ignore

        data, sr = sf.read(str(path), always_2d=True)  # (T, C)
        return data, int(sr)
    except Exception as e:
        raise RuntimeError(f"读取音频失败(需要 soundfile): {path}, error={e}") from e


def _save_wav_pcm(path: Path, data: "object", sr: int, subtype: str) -> None:
    """Write samples as a WAV file with the given PCM subtype."""
    try:
        import soundfile as sf  # type: ignore

        path.parent.mkdir(parents=True, exist_ok=True)
        sf.write(str(path), data, int(sr), format="WAV", subtype=subtype)
    except Exception as e:
        raise RuntimeError(f"写入 WAV 失败(需要 soundfile,subtype={subtype}): {path}, error={e}") from e


def _resample_linear(data: "object", src_sr: int, tgt_sr: int) -> "object":
    """Linearly resample (T, C) samples from src_sr to tgt_sr.

    Returns the input unchanged when either rate is non-positive or the
    rates already match. Raises RuntimeError if torch/numpy are missing.
    """
    if src_sr <= 0 or tgt_sr <= 0 or int(src_sr) == int(tgt_sr):
        return data
    try:
        import numpy as np
        import torch  # type: ignore

        x = np.asarray(data, dtype=np.float32)  # (T, C)
        if x.ndim != 2:
            x = x.reshape((-1, 1))
        # interpolate expects (batch, channels, time)
        xt = torch.from_numpy(x).transpose(0, 1).unsqueeze(0)
        new_len = int(round(xt.shape[-1] * float(tgt_sr) / float(src_sr)))
        yt = torch.nn.functional.interpolate(xt, size=new_len, mode="linear", align_corners=False)
        y = yt.squeeze(0).transpose(0, 1).cpu().numpy().astype(np.float32)
        return y
    except Exception as e:
        raise RuntimeError(f"重采样失败(需要 torch/numpy): {e}") from e


class AudioQuantizeEncode(Mapper):
    """Resample audio and encode it as 8/16/24/32-bit PCM WAV."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.sample_rate = int(float(kwargs.get("sampleRate", 16000)))  # 0 = keep source rate
        self.bit_depth = int(float(kwargs.get("bitDepth", 16)))
        self.channels = int(float(kwargs.get("channels", 1)))  # 0 = keep source channels
        self.overwrite = _as_bool(kwargs.get("overwrite", False))

    def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]:
        """Convert the sample's audio file; updates file fields to the new WAV.

        Raises FileNotFoundError / ValueError / FileExistsError / RuntimeError.
        """
        start = time.time()
        in_path = Path(sample.get(self.filepath_key, "")).resolve()
        if not in_path.exists():
            raise FileNotFoundError(f"输入音频不存在: {in_path}")

        # Fail fast on an unsupported bit depth BEFORE the expensive decode
        # and resample (the original validated it only after processing).
        if self.bit_depth not in _SUBTYPE_BY_DEPTH:
            raise ValueError(f"不支持的 bitDepth: {self.bit_depth},仅支持 8/16/24/32")

        data, sr = _load_audio(in_path)  # (T, C)
        try:
            import numpy as np

            x = np.asarray(data, dtype=np.float32)
            if self.channels == 1 and x.shape[1] > 1:
                x = x.mean(axis=1, keepdims=True)  # downmix to mono
            elif self.channels == 2 and x.shape[1] == 1:
                x = x.repeat(2, axis=1)  # duplicate mono to stereo
            x = _resample_linear(x, sr, self.sample_rate) if self.sample_rate > 0 else x
            out_sr = int(self.sample_rate) if self.sample_rate > 0 else int(sr)
        except Exception as e:
            raise RuntimeError(f"预处理失败: {e}") from e

        export_dir = Path(os.path.abspath(str(sample.get(self.export_path_key, "."))))
        export_dir.mkdir(parents=True, exist_ok=True)
        stem = Path(str(sample.get(self.filename_key, in_path.name))).stem
        out_path = export_dir / f"{stem}.wav"
        if out_path.exists() and not self.overwrite:
            raise FileExistsError(f"输出文件已存在且未启用覆盖: {out_path}")

        _save_wav_pcm(out_path, x, out_sr, subtype=_SUBTYPE_BY_DEPTH[self.bit_depth])

        sample[self.filepath_key] = str(out_path.resolve())
        sample[self.filetype_key] = "wav"
        sample[self.filename_key] = out_path.name
        try:
            sample[self.filesize_key] = str(out_path.stat().st_size)
        except Exception:
            # File-size bookkeeping is best-effort only.
            pass
        sample[self.data_key] = b""
        if not sample.get(self.text_key):
            sample[self.text_key] = "encoded"

        logger.info(
            f"fileName: {sample.get(self.filename_key)}, method: AudioQuantizeEncode costs {time.time() - start:6f} s"
        )
        return sample
`targetRms` +- **峰值顶限**:按 `peakCeiling` 限制峰值 +- **输出文件更新**:写入 `export_path`,更新 `filePath/fileType/fileName/fileSize` + +## 参数说明 + +| 参数 | 类型 | 默认值 | 说明 | +|---|---|---:|---| +| targetRms | slider | 0.08 | 目标 RMS(线性) | +| peakCeiling | slider | 0.99 | 峰值顶限(0~1) | +| outFormat | select | wav | 输出格式(扩展名) | +| overwrite | switch | false | 输出同名文件是否覆盖 | + +## 输入输出 + +- **输入**:`sample["filePath"]` +- **输出**: + - 输出文件:写入 `sample["export_path"]` + - 更新字段:`sample["filePath"]` / `sample["fileType"]` / `sample["fileName"]` / `sample["fileSize"]` + +## 依赖说明 + +- **Python 依赖**:`soundfile`、`numpy` + +## 版本历史 + +- **v1.0.0**:首次发布,支持 RMS 归一与峰值顶限 + diff --git a/runtime/ops/mapper/audio_rms_loudness_normalize/__init__.py b/runtime/ops/mapper/audio_rms_loudness_normalize/__init__.py new file mode 100644 index 00000000..c3adf284 --- /dev/null +++ b/runtime/ops/mapper/audio_rms_loudness_normalize/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- + +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="AudioRmsLoudnessNormalize", + module_path="ops.mapper.audio_rms_loudness_normalize.process", +) + diff --git a/runtime/ops/mapper/audio_rms_loudness_normalize/metadata.yml b/runtime/ops/mapper/audio_rms_loudness_normalize/metadata.yml new file mode 100644 index 00000000..1bfe6d3f --- /dev/null +++ b/runtime/ops/mapper/audio_rms_loudness_normalize/metadata.yml @@ -0,0 +1,55 @@ +name: '整段RMS归一 + 峰值顶限' +name_en: 'RMS Loudness Normalize' +description: '将整段 RMS 对齐到目标,再按峰值顶限缩放。写出处理后的 wav 到 export_path 并更新 filePath。' +description_en: 'Normalize full-utterance RMS to target and apply peak ceiling. Writes processed wav to export_path and updates filePath.' 
# -- encoding: utf-8 --

import os
import time
from pathlib import Path
from typing import Any, Dict, Tuple

from loguru import logger

from datamate.core.base_op import Mapper

# String spellings accepted as "true" for switch-style parameters.
_TRUTHY = {"1", "true", "yes", "y", "on"}


def _as_bool(v: object) -> bool:
    """Interpret a raw switch value (bool or string-like) as a boolean."""
    return v if isinstance(v, bool) else str(v).strip().lower() in _TRUTHY


def _load_audio(path: Path) -> Tuple["object", int]:
    """Decode an audio file via soundfile; returns (samples, sample_rate)."""
    try:
        import soundfile as sf  # type: ignore

        samples, rate = sf.read(str(path), always_2d=False)
    except Exception as e:
        raise RuntimeError(f"读取音频失败(需要 soundfile): {path}, error={e}") from e
    return samples, int(rate)


def _save_audio(path: Path, data: "object", sr: int, fmt: str) -> None:
    """Encode samples to disk via soundfile, creating parent dirs as needed."""
    try:
        import soundfile as sf  # type: ignore

        audio_format = fmt.upper() if fmt else None
        path.parent.mkdir(parents=True, exist_ok=True)
        sf.write(str(path), data, int(sr), format=audio_format)
    except Exception as e:
        raise RuntimeError(f"写入音频失败(需要 soundfile): {path}, error={e}") from e


class AudioRmsLoudnessNormalize(Mapper):
    """Normalize full-utterance RMS to a target level, then apply a peak ceiling."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.target_rms = float(kwargs.get("targetRms", 0.08))      # linear RMS target
        self.peak_ceiling = float(kwargs.get("peakCeiling", 0.99))  # post-gain peak cap
        self.out_format = str(kwargs.get("outFormat", "wav")).strip().lower().lstrip(".")
        self.overwrite = _as_bool(kwargs.get("overwrite", False))

    def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize the sample's audio file and update its file fields.

        Raises FileNotFoundError / FileExistsError / RuntimeError.
        """
        start = time.time()
        src = Path(sample.get(self.filepath_key, "")).resolve()
        if not src.exists():
            raise FileNotFoundError(f"输入音频不存在: {src}")

        raw, sr = _load_audio(src)
        try:
            import numpy as np

            mono = np.asarray(raw, dtype=np.float32)
            if mono.ndim > 1:
                mono = mono.mean(axis=1)  # downmix to mono
            if mono.size == 0:
                out = mono
            else:
                eps = 1e-8
                # Gain that brings the whole utterance to the target RMS.
                level = float(np.sqrt(np.mean(mono * mono) + eps))
                out = mono * (float(self.target_rms) / max(eps, level))
                # Scale back down if the normalized peak exceeds the ceiling.
                peak = float(np.max(np.abs(out)) + eps)
                ceiling = max(1e-6, min(1.0, float(self.peak_ceiling)))
                if peak > ceiling:
                    out = out * (ceiling / peak)
                out = np.clip(out, -1.0, 1.0)
        except Exception as e:
            raise RuntimeError(f"处理失败(需要 numpy): {e}") from e

        out_dir = Path(os.path.abspath(str(sample.get(self.export_path_key, "."))))
        out_dir.mkdir(parents=True, exist_ok=True)
        base = Path(str(sample.get(self.filename_key, src.name))).stem
        dst = out_dir / f"{base}.{self.out_format}"
        if dst.exists() and not self.overwrite:
            raise FileExistsError(f"输出文件已存在且未启用覆盖: {dst}")

        _save_audio(dst, out, sr, self.out_format)

        sample[self.filepath_key] = str(dst.resolve())
        sample[self.filetype_key] = self.out_format
        sample[self.filename_key] = dst.name
        try:
            sample[self.filesize_key] = str(dst.stat().st_size)
        except Exception:
            # File-size bookkeeping is best-effort only.
            pass
        sample[self.data_key] = b""
        if not sample.get(self.text_key):
            sample[self.text_key] = "processed"

        logger.info(
            f"fileName: {sample.get(self.filename_key)}, method: AudioRmsLoudnessNormalize costs {time.time() - start:6f} s"
        )
        return sample
# -- encoding: utf-8 --

import os
import time
from pathlib import Path
from typing import Dict, Any, Tuple

from loguru import logger

from datamate.core.base_op import Mapper


def _as_bool(v: object) -> bool:
    """Interpret a raw switch value (bool or string-like) as a boolean."""
    if isinstance(v, bool):
        return v
    s = str(v).strip().lower()
    return s in {"1", "true", "yes", "y", "on"}


def _load_audio(path: Path) -> Tuple["object", int]:
    """Decode an audio file via soundfile; returns (samples, sample_rate)."""
    try:
        import soundfile as sf  # type: ignore

        data, sr = sf.read(str(path), always_2d=False)
        return data, int(sr)
    except Exception as e:
        raise RuntimeError(f"读取音频失败(需要 soundfile): {path}, error={e}") from e


def _save_audio(path: Path, data: "object", sr: int, fmt: str) -> None:
    """Encode samples to disk via soundfile, creating parent dirs as needed."""
    try:
        import soundfile as sf  # type: ignore

        path.parent.mkdir(parents=True, exist_ok=True)
        sf.write(str(path), data, int(sr), format=fmt.upper() if fmt else None)
    except Exception as e:
        raise RuntimeError(f"写入音频失败(需要 soundfile): {path}, error={e}") from e


def _frame_agc(x, sr, target_rms, frame_ms, hop_ms, max_gain):
    """Apply frame-wise RMS AGC to a mono float signal and return the result.

    The gain is estimated over a ``frame_ms`` analysis window but applied only
    to the ``hop_ms`` segment at the start of that window. The previous
    implementation applied each gain to the whole analysis window, so with
    the default 50 ms frame / 25 ms hop every sample was scaled by two
    overlapping gains (roughly the gain squared), overshooting the target.
    """
    import numpy as np

    n = len(x)
    frame_len = max(1, int(sr * frame_ms / 1000.0))
    hop = max(1, int(sr * hop_ms / 1000.0))
    y = x.copy()
    eps = 1e-8
    # Clamp gain to [1/max_gain, max_gain] to avoid over-amplifying noise.
    min_gain = 1.0 / max(1.0, max_gain)
    for st in range(0, n, hop):
        frame = x[st:min(st + frame_len, n)]
        rms = float(np.sqrt(np.mean(frame * frame) + eps))
        g = float(target_rms) / max(eps, rms)
        g = max(min_gain, min(float(max_gain), g))
        # Apply the gain to this hop's segment only (no overlap compounding).
        y[st:st + hop] = y[st:st + hop] * g
    # Hard safety clamp against clipping.
    return np.clip(y, -1.0, 1.0)


class AudioSimpleAgc(Mapper):
    """Frame-wise RMS automatic gain control with a maximum-gain limit."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.target_rms = float(kwargs.get("targetRms", 0.05))  # linear RMS target
        self.frame_ms = float(kwargs.get("frameMs", 50))        # analysis window (ms)
        self.hop_ms = float(kwargs.get("hopMs", 25))            # gain update step (ms)
        self.max_gain = float(kwargs.get("maxGain", 10))        # linear gain cap
        self.out_format = str(kwargs.get("outFormat", "wav")).strip().lower().lstrip(".")
        self.overwrite = _as_bool(kwargs.get("overwrite", False))

    def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]:
        """Apply AGC to the sample's audio file and update its file fields.

        Raises FileNotFoundError / FileExistsError / RuntimeError.
        """
        start = time.time()
        in_path = Path(sample.get(self.filepath_key, "")).resolve()
        if not in_path.exists():
            raise FileNotFoundError(f"输入音频不存在: {in_path}")

        data, sr = _load_audio(in_path)
        try:
            import numpy as np

            x = np.asarray(data, dtype=np.float32)
            if x.ndim > 1:
                x = x.mean(axis=1)  # downmix to mono
            if x.size == 0:
                y = x
            else:
                y = _frame_agc(x, sr, self.target_rms, self.frame_ms, self.hop_ms, self.max_gain)
        except Exception as e:
            raise RuntimeError(f"处理失败(需要 numpy): {e}") from e

        export_dir = Path(os.path.abspath(str(sample.get(self.export_path_key, "."))))
        export_dir.mkdir(parents=True, exist_ok=True)
        stem = Path(str(sample.get(self.filename_key, in_path.name))).stem
        out_path = export_dir / f"{stem}.{self.out_format}"
        if out_path.exists() and not self.overwrite:
            raise FileExistsError(f"输出文件已存在且未启用覆盖: {out_path}")

        _save_audio(out_path, y, sr, self.out_format)

        sample[self.filepath_key] = str(out_path.resolve())
        sample[self.filetype_key] = self.out_format
        sample[self.filename_key] = out_path.name
        try:
            sample[self.filesize_key] = str(out_path.stat().st_size)
        except Exception:
            # File-size bookkeeping is best-effort only.
            pass
        sample[self.data_key] = b""
        if not sample.get(self.text_key):
            sample[self.text_key] = "processed"

        logger.info(f"fileName: {sample.get(self.filename_key)}, method: AudioSimpleAgc costs {time.time() - start:6f} s")
        return sample
+|---|---|---:|---| +| threshold | slider | 0.92 | 线性区阈值(0~1) | +| knee | slider | 0.08 | 过渡宽度(0~1),越大越柔和 | +| outFormat | select | wav | 输出格式(扩展名) | +| overwrite | switch | false | 输出同名文件是否覆盖 | + +## 输入输出 + +- **输入**:`sample["filePath"]` +- **输出**: + - 输出文件:写入 `sample["export_path"]` + - 更新字段:`sample["filePath"]` / `sample["fileType"]` / `sample["fileName"]` / `sample["fileSize"]` + +## 依赖说明 + +- **Python 依赖**:`soundfile`、`numpy` + +## 版本历史 + +- **v1.0.0**:首次发布,支持软限幅处理 + diff --git a/runtime/ops/mapper/audio_soft_peak_limiter/__init__.py b/runtime/ops/mapper/audio_soft_peak_limiter/__init__.py new file mode 100644 index 00000000..358e4dd1 --- /dev/null +++ b/runtime/ops/mapper/audio_soft_peak_limiter/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- + +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="AudioSoftPeakLimiter", + module_path="ops.mapper.audio_soft_peak_limiter.process", +) + diff --git a/runtime/ops/mapper/audio_soft_peak_limiter/metadata.yml b/runtime/ops/mapper/audio_soft_peak_limiter/metadata.yml new file mode 100644 index 00000000..a3248082 --- /dev/null +++ b/runtime/ops/mapper/audio_soft_peak_limiter/metadata.yml @@ -0,0 +1,55 @@ +name: '软限幅' +name_en: 'Soft Peak Limiter' +description: '软饱和限制峰值(tanh 近似),减轻硬削波。写出处理后的 wav 到 export_path 并更新 filePath。' +description_en: 'Soft limiting using tanh-like saturation to reduce clipping. Writes processed wav to export_path and updates filePath.' 
# -- encoding: utf-8 --

import os
import time
from pathlib import Path
from typing import Any, Dict, Tuple

from loguru import logger

from datamate.core.base_op import Mapper

# String spellings accepted as "true" for switch-style parameters.
_TRUTHY = {"1", "true", "yes", "y", "on"}


def _as_bool(v: object) -> bool:
    """Interpret a raw switch value (bool or string-like) as a boolean."""
    return v if isinstance(v, bool) else str(v).strip().lower() in _TRUTHY


def _load_audio(path: Path) -> Tuple["object", int]:
    """Decode an audio file via soundfile; returns (samples, sample_rate)."""
    try:
        import soundfile as sf  # type: ignore

        samples, rate = sf.read(str(path), always_2d=False)
    except Exception as e:
        raise RuntimeError(f"读取音频失败(需要 soundfile): {path}, error={e}") from e
    return samples, int(rate)


def _save_audio(path: Path, data: "object", sr: int, fmt: str) -> None:
    """Encode samples to disk via soundfile, creating parent dirs as needed."""
    try:
        import soundfile as sf  # type: ignore

        audio_format = fmt.upper() if fmt else None
        path.parent.mkdir(parents=True, exist_ok=True)
        sf.write(str(path), data, int(sr), format=audio_format)
    except Exception as e:
        raise RuntimeError(f"写入音频失败(需要 soundfile): {path}, error={e}") from e


class AudioSoftPeakLimiter(Mapper):
    """Soft peak limiter using tanh-style saturation above a threshold."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.threshold = float(kwargs.get("threshold", 0.92))  # linear-region limit
        self.knee = float(kwargs.get("knee", 0.08))            # transition width
        self.out_format = str(kwargs.get("outFormat", "wav")).strip().lower().lstrip(".")
        self.overwrite = _as_bool(kwargs.get("overwrite", False))

    def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]:
        """Soft-limit the sample's audio file and update its file fields.

        Raises FileNotFoundError / FileExistsError / RuntimeError.
        """
        start = time.time()
        src = Path(sample.get(self.filepath_key, "")).resolve()
        if not src.exists():
            raise FileNotFoundError(f"输入音频不存在: {src}")

        raw, sr = _load_audio(src)
        try:
            import numpy as np

            x = np.asarray(raw, dtype=np.float32)
            if x.ndim > 1:
                x = x.mean(axis=1)  # downmix to mono
            if x.size == 0:
                limited = x
            else:
                th = max(1e-6, min(1.0, float(self.threshold)))
                knee = max(0.0, min(1.0, float(self.knee)))
                # Samples above the threshold are squashed by a tanh curve;
                # a larger knee widens the transition and softens it.
                slope = 1.0 / max(1e-6, (1.0 - th + knee))
                magnitude = np.abs(x)
                squashed = np.sign(x) * (th + (1.0 - th) * np.tanh((magnitude - th) * slope))
                limited = np.where(magnitude > th, squashed, x)
                limited = np.clip(limited, -1.0, 1.0)
        except Exception as e:
            raise RuntimeError(f"处理失败(需要 numpy): {e}") from e

        out_dir = Path(os.path.abspath(str(sample.get(self.export_path_key, "."))))
        out_dir.mkdir(parents=True, exist_ok=True)
        base = Path(str(sample.get(self.filename_key, src.name))).stem
        dst = out_dir / f"{base}.{self.out_format}"
        if dst.exists() and not self.overwrite:
            raise FileExistsError(f"输出文件已存在且未启用覆盖: {dst}")

        _save_audio(dst, limited, sr, self.out_format)

        sample[self.filepath_key] = str(dst.resolve())
        sample[self.filetype_key] = self.out_format
        sample[self.filename_key] = dst.name
        try:
            sample[self.filesize_key] = str(dst.stat().st_size)
        except Exception:
            # File-size bookkeeping is best-effort only.
            pass
        sample[self.data_key] = b""
        if not sample.get(self.text_key):
            sample[self.text_key] = "processed"

        logger.info(
            f"fileName: {sample.get(self.filename_key)}, method: AudioSoftPeakLimiter costs {time.time() - start:6f} s"
        )
        return sample
+OPERATORS.register_module( + module_name="AudioTelephonyBandpass", + module_path="ops.mapper.audio_telephony_bandpass.process", +) + diff --git a/runtime/ops/mapper/audio_telephony_bandpass/metadata.yml b/runtime/ops/mapper/audio_telephony_bandpass/metadata.yml new file mode 100644 index 00000000..edb1c314 --- /dev/null +++ b/runtime/ops/mapper/audio_telephony_bandpass/metadata.yml @@ -0,0 +1,63 @@ +name: '电话带通' +name_en: 'Telephony Bandpass' +description: '模拟窄带话机频带(默认 300–3400Hz)。需要 scipy.signal;写出处理后的 wav 到 export_path 并更新 filePath。' +description_en: 'Simulate telephony bandpass (default 300–3400Hz). Requires scipy.signal; writes processed wav to export_path and updates filePath.' +language: 'python' +vendor: 'huawei' +raw_id: 'AudioTelephonyBandpass' +version: '1.0.0' +types: + - 'cleanse' +modal: 'audio' +inputs: 'audio' +outputs: 'audio' +settings: + lowHz: + name: '下截止(Hz)' + type: 'inputNumber' + description: '带通下截止频率。' + defaultVal: 300 + min: 1 + max: 20000 + step: 1 + highHz: + name: '上截止(Hz)' + type: 'inputNumber' + description: '带通上截止频率。' + defaultVal: 3400 + min: 1 + max: 20000 + step: 1 + order: + name: '阶数' + type: 'inputNumber' + description: 'Butterworth 阶数(建议 2~6)。' + defaultVal: 4 + min: 1 + max: 12 + step: 1 + outFormat: + name: '输出格式' + type: 'select' + description: '输出扩展名(建议 wav)。' + defaultVal: 'wav' + required: true + options: + - label: 'wav' + value: 'wav' + - label: 'flac' + value: 'flac' + overwrite: + name: '覆盖输出' + type: 'switch' + description: '若 export_path 下已存在同名文件,是否覆盖。' + defaultVal: 'false' + required: false + checkedLabel: '覆盖' + unCheckedLabel: '不覆盖' +runtime: + memory: 104857600 + cpu: 0.2 + gpu: 0 + npu: 0 + diff --git a/runtime/ops/mapper/audio_telephony_bandpass/process.py b/runtime/ops/mapper/audio_telephony_bandpass/process.py new file mode 100644 index 00000000..141b9088 --- /dev/null +++ b/runtime/ops/mapper/audio_telephony_bandpass/process.py @@ -0,0 +1,103 @@ +# -- encoding: utf-8 -- + +import os +import time +from 
pathlib import Path +from typing import Dict, Any, Tuple + +from loguru import logger + +from datamate.core.base_op import Mapper + + +def _as_bool(v: object) -> bool: + if isinstance(v, bool): + return v + s = str(v).strip().lower() + return s in {"1", "true", "yes", "y", "on"} + + +def _load_audio(path: Path) -> Tuple["object", int]: + try: + import soundfile as sf # type: ignore + + data, sr = sf.read(str(path), always_2d=False) + return data, int(sr) + except Exception as e: + raise RuntimeError(f"读取音频失败(需要 soundfile): {path}, error={e}") from e + + +def _save_audio(path: Path, data: "object", sr: int, fmt: str) -> None: + try: + import soundfile as sf # type: ignore + + path.parent.mkdir(parents=True, exist_ok=True) + sf.write(str(path), data, int(sr), format=fmt.upper() if fmt else None) + except Exception as e: + raise RuntimeError(f"写入音频失败(需要 soundfile): {path}, error={e}") from e + + +class AudioTelephonyBandpass(Mapper): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.low_hz = float(kwargs.get("lowHz", 300)) + self.high_hz = float(kwargs.get("highHz", 3400)) + self.order = int(float(kwargs.get("order", 4))) + self.out_format = str(kwargs.get("outFormat", "wav")).strip().lower().lstrip(".") + self.overwrite = _as_bool(kwargs.get("overwrite", False)) + + def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: + start = time.time() + in_path = Path(sample.get(self.filepath_key, "")).resolve() + if not in_path.exists(): + raise FileNotFoundError(f"输入音频不存在: {in_path}") + + data, sr = _load_audio(in_path) + try: + import numpy as np + from scipy.signal import butter, lfilter # type: ignore + + x = np.asarray(data, dtype=np.float32) + if x.ndim > 1: + x = x.mean(axis=1) + if x.size == 0: + y = x + else: + nyq = float(sr) / 2.0 + low = max(1.0, float(self.low_hz)) / nyq + high = min(nyq - 1.0, float(self.high_hz)) / nyq + if not (0.0 < low < high < 1.0): + raise ValueError(f"非法带通范围: low={self.low_hz}, high={self.high_hz}, 
sr={sr}") + b, a = butter(max(1, int(self.order)), [low, high], btype="bandpass") + y = lfilter(b, a, x).astype(np.float32) + y = np.clip(y, -1.0, 1.0) + except ImportError as e: + raise RuntimeError("AudioTelephonyBandpass 需要 scipy.signal(butter/lfilter)") from e + except Exception as e: + raise RuntimeError(f"处理失败: {e}") from e + + export_dir = Path(os.path.abspath(str(sample.get(self.export_path_key, ".")))) + export_dir.mkdir(parents=True, exist_ok=True) + stem = Path(str(sample.get(self.filename_key, in_path.name))).stem + out_path = export_dir / f"{stem}.{self.out_format}" + if out_path.exists() and not self.overwrite: + raise FileExistsError(f"输出文件已存在且未启用覆盖: {out_path}") + + _save_audio(out_path, y, sr, self.out_format) + + sample[self.filepath_key] = str(out_path.resolve()) + sample[self.filetype_key] = self.out_format + sample[self.filename_key] = out_path.name + try: + sample[self.filesize_key] = str(out_path.stat().st_size) + except Exception: + pass + sample[self.data_key] = b"" + if not sample.get(self.text_key): + sample[self.text_key] = "processed" + + logger.info( + f"fileName: {sample.get(self.filename_key)}, method: AudioTelephonyBandpass costs {time.time() - start:6f} s" + ) + return sample + diff --git a/runtime/ops/mapper/audio_telephony_bandpass/requirements.txt b/runtime/ops/mapper/audio_telephony_bandpass/requirements.txt new file mode 100644 index 00000000..843a926a --- /dev/null +++ b/runtime/ops/mapper/audio_telephony_bandpass/requirements.txt @@ -0,0 +1,3 @@ +soundfile +numpy +scipy diff --git a/runtime/ops/mapper/audio_trim_silence_edges/README.md b/runtime/ops/mapper/audio_trim_silence_edges/README.md new file mode 100644 index 00000000..8b8a72b5 --- /dev/null +++ b/runtime/ops/mapper/audio_trim_silence_edges/README.md @@ -0,0 +1,38 @@ +# AudioTrimSilenceEdges 首尾静音裁剪算子 + +## 概述 + +AudioTrimSilenceEdges 用于裁剪音频首尾静音区域:通过短时能量阈值判定静音帧,找到首个与末个“有效语音”区间后进行裁剪,并可保留两端 `padMs` 的 padding,避免过度切断。 + +## 功能特性 + +- **首尾静音裁剪**:按帧能量阈值从两端向内收缩 +- **可保留 
padding**:裁剪后两端可保留固定毫秒数 +- **输出文件更新**:写入 `export_path`,更新 `filePath/fileType/fileName/fileSize` + +## 参数说明 + +| 参数 | 类型 | 默认值 | 说明 | +|---|---|---:|---| +| frameMs | inputNumber | 30 | 帧长(ms) | +| hopMs | inputNumber | 10 | 帧移(ms) | +| threshDb | slider | -50 | 能量阈值(dB,相对全段峰值) | +| padMs | inputNumber | 50 | 裁剪后两端各保留的静音(ms) | +| outFormat | select | wav | 输出格式(扩展名) | +| overwrite | switch | false | 输出同名文件是否覆盖 | + +## 输入输出 + +- **输入**:`sample["filePath"]` +- **输出**: + - 输出文件:写入 `sample["export_path"]` + - 更新字段:`sample["filePath"]` / `sample["fileType"]` / `sample["fileName"]` / `sample["fileSize"]` + +## 依赖说明 + +- **Python 依赖**:`soundfile`、`numpy` + +## 版本历史 + +- **v1.0.0**:首次发布,支持首尾静音裁剪 + diff --git a/runtime/ops/mapper/audio_trim_silence_edges/__init__.py b/runtime/ops/mapper/audio_trim_silence_edges/__init__.py new file mode 100644 index 00000000..0fa95cee --- /dev/null +++ b/runtime/ops/mapper/audio_trim_silence_edges/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- + +from datamate.core.base_op import OPERATORS + +OPERATORS.register_module( + module_name="AudioTrimSilenceEdges", + module_path="ops.mapper.audio_trim_silence_edges.process", +) + diff --git a/runtime/ops/mapper/audio_trim_silence_edges/metadata.yml b/runtime/ops/mapper/audio_trim_silence_edges/metadata.yml new file mode 100644 index 00000000..57589f84 --- /dev/null +++ b/runtime/ops/mapper/audio_trim_silence_edges/metadata.yml @@ -0,0 +1,71 @@ +name: '首尾静音裁剪' +name_en: 'Trim Silence Edges' +description: '从首尾向内裁剪静音,保留可选 padding。写出处理后的 wav 到 export_path 并更新 filePath。' +description_en: 'Trim leading/trailing silence with optional padding. Writes processed wav to export_path and updates filePath.' 
+language: 'python' +vendor: 'huawei' +raw_id: 'AudioTrimSilenceEdges' +version: '1.0.0' +types: + - 'cleanse' +modal: 'audio' +inputs: 'audio' +outputs: 'audio' +settings: + frameMs: + name: '帧长(ms)' + type: 'inputNumber' + description: '分析帧长。' + defaultVal: 30 + min: 5 + max: 500 + step: 1 + hopMs: + name: '帧移(ms)' + type: 'inputNumber' + description: '帧移。' + defaultVal: 10 + min: 1 + max: 500 + step: 1 + threshDb: + name: '能量阈值(dB)' + type: 'slider' + description: '相对全段峰值的帧能量阈值(dB)。' + defaultVal: -50 + min: -80 + max: 0 + step: 1 + padMs: + name: '保留静音(ms)' + type: 'inputNumber' + description: '裁剪后两端各保留的 padding(毫秒)。' + defaultVal: 50 + min: 0 + max: 5000 + step: 1 + outFormat: + name: '输出格式' + type: 'select' + description: '输出扩展名(建议 wav)。' + defaultVal: 'wav' + required: true + options: + - label: 'wav' + value: 'wav' + - label: 'flac' + value: 'flac' + overwrite: + name: '覆盖输出' + type: 'switch' + description: '若 export_path 下已存在同名文件,是否覆盖。' + defaultVal: 'false' + required: false + checkedLabel: '覆盖' + unCheckedLabel: '不覆盖' +runtime: + memory: 104857600 + cpu: 0.15 + gpu: 0 + npu: 0 + diff --git a/runtime/ops/mapper/audio_trim_silence_edges/process.py b/runtime/ops/mapper/audio_trim_silence_edges/process.py new file mode 100644 index 00000000..b1acfc33 --- /dev/null +++ b/runtime/ops/mapper/audio_trim_silence_edges/process.py @@ -0,0 +1,116 @@ +# -- encoding: utf-8 -- + +import os +import time +from pathlib import Path +from typing import Dict, Any, Tuple + +from loguru import logger + +from datamate.core.base_op import Mapper + + +def _as_bool(v: object) -> bool: + if isinstance(v, bool): + return v + s = str(v).strip().lower() + return s in {"1", "true", "yes", "y", "on"} + + +def _load_audio(path: Path) -> Tuple["object", int]: + try: + import soundfile as sf # type: ignore + + data, sr = sf.read(str(path), always_2d=False) + return data, int(sr) + except Exception as e: + raise RuntimeError(f"读取音频失败(需要 soundfile): {path}, error={e}") from e + + +def 
_save_audio(path: Path, data: "object", sr: int, fmt: str) -> None: + try: + import soundfile as sf # type: ignore + + path.parent.mkdir(parents=True, exist_ok=True) + sf.write(str(path), data, int(sr), format=fmt.upper() if fmt else None) + except Exception as e: + raise RuntimeError(f"写入音频失败(需要 soundfile): {path}, error={e}") from e + + +class AudioTrimSilenceEdges(Mapper): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.frame_ms = float(kwargs.get("frameMs", 30)) + self.hop_ms = float(kwargs.get("hopMs", 10)) + self.thresh_db = float(kwargs.get("threshDb", -50)) + self.pad_ms = float(kwargs.get("padMs", 50)) + self.out_format = str(kwargs.get("outFormat", "wav")).strip().lower().lstrip(".") + self.overwrite = _as_bool(kwargs.get("overwrite", False)) + + def execute(self, sample: Dict[str, Any]) -> Dict[str, Any]: + start = time.time() + in_path = Path(sample.get(self.filepath_key, "")).resolve() + if not in_path.exists(): + raise FileNotFoundError(f"输入音频不存在: {in_path}") + + data, sr = _load_audio(in_path) + try: + import numpy as np + + x = np.asarray(data, dtype=np.float32) + if x.ndim > 1: + x = x.mean(axis=1) + if x.size == 0: + y = x + else: + peak = float(np.max(np.abs(x))) + 1e-12 + th = peak * (10.0 ** (float(self.thresh_db) / 20.0)) + frame_len = max(1, int(sr * self.frame_ms / 1000.0)) + hop = max(1, int(sr * self.hop_ms / 1000.0)) + + # 找到首个/末个“非静音”帧 + rms = [] + for st in range(0, len(x), hop): + ed = min(st + frame_len, len(x)) + f = x[st:ed] + rms.append(float(np.sqrt(np.mean(f * f) + 1e-12))) + keep = [i for i, r in enumerate(rms) if r >= th] + if not keep: + y = x[:0] + else: + first = keep[0] + last = keep[-1] + start_samp = first * hop + end_samp = min(len(x), last * hop + frame_len) + pad = int(sr * self.pad_ms / 1000.0) + start_samp = max(0, start_samp - pad) + end_samp = min(len(x), end_samp + pad) + y = x[start_samp:end_samp] + except Exception as e: + raise RuntimeError(f"处理失败(需要 numpy): {e}") from e + + 
export_dir = Path(os.path.abspath(str(sample.get(self.export_path_key, ".")))) + export_dir.mkdir(parents=True, exist_ok=True) + stem = Path(str(sample.get(self.filename_key, in_path.name))).stem + out_path = export_dir / f"{stem}.{self.out_format}" + if out_path.exists() and not self.overwrite: + raise FileExistsError(f"输出文件已存在且未启用覆盖: {out_path}") + + _save_audio(out_path, y, sr, self.out_format) + + sample[self.filepath_key] = str(out_path.resolve()) + sample[self.filetype_key] = self.out_format + sample[self.filename_key] = out_path.name + try: + sample[self.filesize_key] = str(out_path.stat().st_size) + except Exception: + pass + sample[self.data_key] = b"" + if not sample.get(self.text_key): + sample[self.text_key] = "processed" + + logger.info( + f"fileName: {sample.get(self.filename_key)}, method: AudioTrimSilenceEdges costs {time.time() - start:6f} s" + ) + return sample + diff --git a/runtime/ops/mapper/audio_trim_silence_edges/requirements.txt b/runtime/ops/mapper/audio_trim_silence_edges/requirements.txt new file mode 100644 index 00000000..17e9d57d --- /dev/null +++ b/runtime/ops/mapper/audio_trim_silence_edges/requirements.txt @@ -0,0 +1,2 @@ +soundfile +numpy