diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 0e74681..a10c8db 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -2,6 +2,7 @@ name: Build And Publish on: workflow_dispatch: + pull_request: push: branches: - "**" @@ -41,23 +42,52 @@ jobs: uv run --reinstall-package macloop pytest -q tests - name: Install Rust coverage tooling - if: ${{ github.ref == 'refs/heads/main' }} + if: ${{ github.event_name == 'pull_request' || startsWith(github.ref, 'refs/heads/') }} run: | rustup component add llvm-tools-preview cargo install cargo-llvm-cov --locked + - name: Generate Python coverage report + if: ${{ github.event_name == 'pull_request' || startsWith(github.ref, 'refs/heads/') }} + run: | + uv run --reinstall-package macloop pytest \ + --cov=macloop \ + --cov-report=xml:python-coverage.xml \ + --cov-report=term-missing \ + -q tests/test_public_api.py tests/test_runtime_helpers.py tests/test_e2e_synthetic.py + - name: Generate Rust coverage report (lcov) - if: ${{ github.ref == 'refs/heads/main' }} + if: ${{ github.event_name == 'pull_request' || startsWith(github.ref, 'refs/heads/') }} + shell: bash run: | - cargo llvm-cov --workspace --lcov --output-path lcov.info + SWIFT_RUSTFLAGS="$RUSTFLAGS" + cargo llvm-cov clean --workspace + source <(cargo llvm-cov show-env --export-prefix) + export CARGO_TARGET_DIR="${CARGO_LLVM_COV_TARGET_DIR:-target/llvm-cov-target}" + export RUSTFLAGS="$RUSTFLAGS $SWIFT_RUSTFLAGS" + cargo llvm-cov --workspace --no-report + uv run maturin develop --manifest-path python_ffi/Cargo.toml + uv run pytest -q tests/test_e2e_synthetic.py tests/test_ffi_backend.py + cargo llvm-cov report --lcov --output-path rust-lcov.info - name: Upload coverage to Codecov - if: ${{ github.ref == 'refs/heads/main' }} + if: ${{ github.event_name == 'pull_request' || startsWith(github.ref, 'refs/heads/') }} uses: codecov/codecov-action@v4 with: token: ${{ secrets.CODECOV_TOKEN }} - files: lcov.info + files: rust-lcov.info flags: rust + disable_search: true + fail_ci_if_error: false + + - name: Upload Python coverage to Codecov + if: ${{ github.event_name == 'pull_request' || startsWith(github.ref, 'refs/heads/') }} + uses: codecov/codecov-action@v4 + with: + token: ${{ secrets.CODECOV_TOKEN }} + files: python-coverage.xml + flags: python + disable_search: true fail_ci_if_error: false build: diff --git a/pyproject.toml b/pyproject.toml index c8ec309..71e94dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,6 +36,8 @@ manifest-path = "python_ffi/Cargo.toml" [dependency-groups] dev = [ + "maturin>=1.0,<2.0", "pytest>=7.4.4", "pytest-asyncio>=0.21.2", + "pytest-cov>=5.0.0", ] diff --git a/python_ffi/Cargo.toml b/python_ffi/Cargo.toml index 4c3969c..40171fd 100644 --- a/python_ffi/Cargo.toml +++ b/python_ffi/Cargo.toml @@ -15,4 +15,4 @@ crate-type = ["cdylib"] [dependencies] core_engine = { path = "../core_engine" } numpy = "0.28.0" -pyo3 = { version = "0.28.2", features = ["extension-module", "abi3-py39"] } +pyo3 = { version = "0.28.2", features = ["abi3-py39"] } diff --git a/python_ffi/src/capture.rs b/python_ffi/src/capture.rs deleted file mode 100644 index c318f12..0000000 --- a/python_ffi/src/capture.rs +++ /dev/null @@ -1,182 +0,0 @@ -use crossbeam_channel::Sender; -use core_engine::messages::{AudioFrame, AudioSourceType}; -use core_engine::config::AudioProcessingConfig; -use screencapturekit::prelude::*; -use screencapturekit::stream::output_trait::SCStreamOutputTrait; -use anyhow::{Result, anyhow}; - -#[derive(Clone, Copy)] -pub enum CaptureTarget { - Display(u32), - Process(i32), -} - -struct AudioOutputHandler { - tx: Sender, - source_type: AudioSourceType, -} - -fn cm_time_to_ns(value: i64, timescale: i32) -> u64 { - if timescale <= 0 || value < 0 { - return 0; - } - - let value = value as u64; - let timescale = timescale as u64; - - if value <= u64::MAX / 1_000_000_000 { - (value * 1_000_000_000) / timescale - } else { - // For very large values, divide first to avoid multiplication overflow. - value / timescale * 1_000_000_000 + (value % timescale * 1_000_000_000) / timescale - } -} - -impl SCStreamOutputTrait for AudioOutputHandler { - fn did_output_sample_buffer(&self, sample_buffer: CMSampleBuffer, of_type: SCStreamOutputType) { - let is_target = match (self.source_type, of_type) { - (AudioSourceType::System, SCStreamOutputType::Audio) => true, - (AudioSourceType::Microphone, SCStreamOutputType::Microphone) => true, - _ => false, - }; - - if is_target { - if let Some(audio_data) = sample_buffer.audio_buffer_list() { - let mut interleaved_samples = Vec::new(); - let num_buffers = audio_data.num_buffers(); - - if num_buffers == 0 { return; } - - // Get first buffer to check channels; skip malformed empty payloads. - let Some(first_buffer) = audio_data.get(0) else { return; }; - let channels_per_buffer = first_buffer.number_channels as usize; - - if num_buffers == 1 { - // Already interleaved or mono - let samples: &[f32] = bytemuck::cast_slice(first_buffer.data()); - interleaved_samples.extend_from_slice(samples); - } else { - // Planar data: Buffers [L, R]. We need to interleave them. - let bufs: Vec<&[f32]> = audio_data.iter() - .map(|b| bytemuck::cast_slice::(b.data())) - .collect(); - - let frames = bufs[0].len(); - interleaved_samples.reserve(frames * num_buffers); - - for i in 0..frames { - for b in &bufs { - if i < b.len() { - interleaved_samples.push(b[i]); - } - } - } - } - - if !interleaved_samples.is_empty() { - let pts = sample_buffer.presentation_timestamp(); - let timestamp = cm_time_to_ns(pts.value, pts.timescale); - - let packet = AudioFrame { - source: self.source_type, - samples: interleaved_samples, - sample_rate: 48000, - channels: (num_buffers * channels_per_buffer) as u16, - timestamp, - }; - // Use send() to ensure delivery. Unbounded channel prevents blocking. - let _ = self.tx.send(packet); - } - } - } - } -} - -pub fn spawn_capture_engine( - tx: Sender, - target: Option, - _config: AudioProcessingConfig, - capture_system: bool, - capture_mic: bool -) -> Result { - let content = SCShareableContent::get().map_err(|e| anyhow!("Failed to get shareable content: {}", e))?; - // Even if we only capture mic, SCK needs a filter. We'll use the first display as a base. - let filter = match target { - Some(CaptureTarget::Display(display_id)) => { - let display = content.displays().into_iter() - .find(|d| d.display_id() == display_id) - .ok_or_else(|| anyhow!("Display with ID {} not found", display_id))?; - SCContentFilter::create().with_display(&display).build() - } - - Some(CaptureTarget::Process(pid)) => { - let app = content.applications().into_iter() - .find(|a| a.process_id() == pid) - .ok_or_else(|| anyhow!("Application with PID {} not found", pid))?; - let display = content.displays().into_iter().next().ok_or_else(|| anyhow!("No display found for app capture base"))?; - SCContentFilter::create() - .with_display(&display) - .with_including_applications(&[&app], &[]) - .build() - } - - None => { - let display = content.displays().into_iter().next().ok_or_else(|| anyhow!("No display found"))?; - SCContentFilter::create().with_display(&display).build() - } - }; - - let mut sc_config = SCStreamConfiguration::new(); - sc_config.set_captures_audio(capture_system); - sc_config.set_excludes_current_process_audio(true); - sc_config.set_sample_rate(48000); - sc_config.set_channel_count(2); - - if capture_mic { - sc_config.set_captures_microphone(true); - } - - let mut stream = SCStream::new(&filter, &sc_config); - - if capture_system { - stream.add_output_handler(AudioOutputHandler { - tx: tx.clone(), - source_type: AudioSourceType::System - }, SCStreamOutputType::Audio); - } - - if capture_mic { - stream.add_output_handler(AudioOutputHandler { - tx, - source_type: AudioSourceType::Microphone - }, SCStreamOutputType::Microphone); - } - - stream.start_capture().map_err(|e| anyhow!("Failed to start capture: {}", e))?; - Ok(stream) -} - -#[cfg(test)] -mod tests { - use super::cm_time_to_ns; - - #[test] - fn cm_time_to_ns_converts_basic_values() { - assert_eq!(cm_time_to_ns(1, 1), 1_000_000_000); - assert_eq!(cm_time_to_ns(480, 48_000), 10_000_000); - } - - #[test] - fn cm_time_to_ns_rejects_invalid_input() { - assert_eq!(cm_time_to_ns(-1, 1), 0); - assert_eq!(cm_time_to_ns(1, 0), 0); - assert_eq!(cm_time_to_ns(1, -1), 0); - } - - #[test] - fn cm_time_to_ns_handles_large_values_without_overflow() { - let large = i64::MAX; - let out = cm_time_to_ns(large, 48_000); - assert!(out > 0); - } -} diff --git a/python_ffi/src/config.rs b/python_ffi/src/config.rs deleted file mode 100644 index 643fff0..0000000 --- a/python_ffi/src/config.rs +++ /dev/null @@ -1,112 +0,0 @@ -use core_engine::config::AudioProcessingConfig as CoreConfig; -use pyo3::prelude::*; - -#[pyclass(name = "AudioProcessingConfig", module = "macloop._macloop", from_py_object)] -#[derive(Clone, Debug)] -pub struct PyAudioProcessingConfig { - #[pyo3(get, set)] - pub sample_rate: u32, - #[pyo3(get, set)] - pub channels: u16, - #[pyo3(get, set)] - pub enable_aec: bool, - #[pyo3(get, set)] - pub enable_ns: bool, - #[pyo3(get, set)] - pub sample_format: String, - #[pyo3(get, set)] - pub aec_stream_delay_ms: i32, - #[pyo3(get, set)] - pub aec_auto_delay_tuning: bool, - #[pyo3(get, set)] - pub aec_max_delay_ms: i32, -} - -impl Default for PyAudioProcessingConfig { - fn default() -> Self { - let cfg = CoreConfig::default(); - Self { - sample_rate: cfg.sample_rate, - channels: cfg.channels, - enable_aec: cfg.enable_aec, - enable_ns: cfg.enable_ns, - sample_format: cfg.sample_format, - aec_stream_delay_ms: cfg.aec_stream_delay_ms, - aec_auto_delay_tuning: cfg.aec_auto_delay_tuning, - aec_max_delay_ms: cfg.aec_max_delay_ms, - } - } -} - -impl From for CoreConfig { - fn from(value: PyAudioProcessingConfig) -> Self { - Self { - sample_rate: value.sample_rate, - channels: value.channels, - enable_aec: value.enable_aec, - enable_ns: value.enable_ns, - sample_format: value.sample_format, - aec_stream_delay_ms: value.aec_stream_delay_ms, - aec_auto_delay_tuning: value.aec_auto_delay_tuning, - aec_max_delay_ms: value.aec_max_delay_ms, - } - } -} - -impl From<&PyAudioProcessingConfig> for CoreConfig { - fn from(value: &PyAudioProcessingConfig) -> Self { - Self { - sample_rate: value.sample_rate, - channels: value.channels, - enable_aec: value.enable_aec, - enable_ns: value.enable_ns, - sample_format: value.sample_format.clone(), - aec_stream_delay_ms: value.aec_stream_delay_ms, - aec_auto_delay_tuning: value.aec_auto_delay_tuning, - aec_max_delay_ms: value.aec_max_delay_ms, - } - } -} - -#[pymethods] -impl PyAudioProcessingConfig { - #[new] - #[pyo3(signature = ( - sample_rate=48000, - channels=2, - enable_aec=false, - enable_ns=false, - sample_format="f32".to_string(), - aec_stream_delay_ms=0, - aec_auto_delay_tuning=false, - aec_max_delay_ms=140 - ))] - fn new( - sample_rate: u32, - channels: u16, - enable_aec: bool, - enable_ns: bool, - sample_format: String, - aec_stream_delay_ms: i32, - aec_auto_delay_tuning: bool, - aec_max_delay_ms: i32, - ) -> Self { - Self { - sample_rate, - channels, - enable_aec, - enable_ns, - sample_format, - aec_stream_delay_ms, - aec_auto_delay_tuning, - aec_max_delay_ms, - } - } - - #[pyo3(name = "calibrate_delay")] - fn calibrate_delay(&mut self, measured_system_latency_ms: f32, measured_mic_latency_ms: f32) { - let mut core_cfg = CoreConfig::from(self.clone()); - core_cfg.calibrate_delay(measured_system_latency_ms, measured_mic_latency_ms); - self.aec_stream_delay_ms = core_cfg.aec_stream_delay_ms; - } -} diff --git a/python_ffi/src/lib.rs b/python_ffi/src/lib.rs index 8b748d1..9552c8a 100644 --- a/python_ffi/src/lib.rs +++ b/python_ffi/src/lib.rs @@ -271,12 +271,13 @@ impl PyAudioEngineBackend { ) .map_err(engine_error)?; - let source = AppAudioSource::new(pipeline, AppAudioSourceConfig { pids, display_id }) - .map_err(|e| { - PyRuntimeError::new_err(format!( - "failed to create application audio source: {e}" - )) - })?; + let source = + AppAudioSource::new(pipeline, AppAudioSourceConfig { pids, display_id }) + .map_err(|e| { + PyRuntimeError::new_err(format!( + "failed to create application audio source: {e}" + )) + })?; self.sources.insert( stream_id, @@ -816,3 +817,119 @@ fn _macloop(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(create_wav_sink, m)?)?; Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use pyo3::types::PyDict; + + fn with_python(f: impl FnOnce(Python<'_>) -> T) -> T { + Python::initialize(); + Python::attach(f) + } + + #[test] + fn parse_sample_format_accepts_supported_values() { + assert!(matches!(parse_sample_format("f32"), Ok(SampleFormat::F32))); + assert!(matches!(parse_sample_format("F32"), Ok(SampleFormat::F32))); + assert!(matches!(parse_sample_format("i16"), Ok(SampleFormat::I16))); + assert!(matches!(parse_sample_format("I16"), Ok(SampleFormat::I16))); + } + + #[test] + fn parse_sample_format_rejects_unsupported_values() { + Python::initialize(); + let err = parse_sample_format("u8").unwrap_err(); + assert!(err.to_string().contains("unsupported sample_format 'u8'")); + } + + #[test] + fn duplicate_file_descriptor_rejects_negative_fd() { + let err = duplicate_file_descriptor(-1).unwrap_err(); + assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput); + } + + #[test] + fn dict_helpers_extract_values_and_defaults() { + with_python(|py| { + let dict = PyDict::new(py); + dict.set_item("device_id", 7).unwrap(); + dict.set_item("pids", vec![11_u32, 22_u32]).unwrap(); + dict.set_item("enabled", true).unwrap(); + dict.set_item("size", 123_usize).unwrap(); + dict.set_item("delay_ms", 45_u64).unwrap(); + dict.set_item("gain", 1.5_f32).unwrap(); + + assert_eq!(dict_optional_u32(&dict, "device_id").unwrap(), Some(7)); + assert_eq!(dict_optional_u32(&dict, "missing").unwrap(), None); + assert_eq!(dict_u32_list(&dict, "pids").unwrap(), vec![11, 22]); + assert_eq!(dict_u32_list(&dict, "missing").unwrap(), Vec::::new()); + assert!(dict_bool_with_default(&dict, "enabled", false).unwrap()); + assert!(dict_bool_with_default(&dict, "missing", true).unwrap()); + assert_eq!(dict_usize_with_default(&dict, "size", 9).unwrap(), 123); + assert_eq!(dict_usize_with_default(&dict, "missing", 9).unwrap(), 9); + assert_eq!(dict_u64_with_default(&dict, "delay_ms", 1).unwrap(), 45); + assert_eq!(dict_u64_with_default(&dict, "missing", 1).unwrap(), 1); + assert_eq!(dict_f32_with_default(&dict, "gain", 0.5).unwrap(), 1.5); + assert_eq!(dict_f32_with_default(&dict, "missing", 0.5).unwrap(), 0.5); + }); + } + + #[test] + fn backend_rejects_unsupported_source_kind() { + with_python(|py| { + let mut backend = PyAudioEngineBackend::new(); + let config = PyDict::new(py); + let err = backend + .create_stream("stream".to_string(), "nope".to_string(), config) + .unwrap_err(); + assert!(err.to_string().contains("unsupported source_kind 'nope'")); + }); + } + + #[test] + fn backend_requires_application_audio_pids() { + with_python(|py| { + let mut backend = PyAudioEngineBackend::new(); + let config = PyDict::new(py); + let err = backend + .create_stream( + "app_stream".to_string(), + "application_audio".to_string(), + config, + ) + .unwrap_err(); + assert!(err + .to_string() + .contains("requires at least one pid in 'pids'")); + }); + } + + #[test] + fn backend_rejects_unsupported_processor_kind() { + with_python(|py| { + let mut backend = PyAudioEngineBackend::new(); + let config = PyDict::new(py); + let err = backend + .add_processor( + "stream".to_string(), + "processor".to_string(), + "nope".to_string(), + config, + ) + .unwrap_err(); + assert!(err + .to_string() + .contains("unsupported processor_kind 'nope'")); + }); + } + + #[test] + fn backend_close_marks_engine_closed() { + Python::initialize(); + let mut backend = PyAudioEngineBackend::new(); + backend.close().unwrap(); + let err = backend.ensure_open().unwrap_err(); + assert!(err.to_string().contains("audio engine backend is closed")); + } +} diff --git a/python_ffi/src/stats.rs b/python_ffi/src/stats.rs index b209874..d75c4c9 100644 --- a/python_ffi/src/stats.rs +++ b/python_ffi/src/stats.rs @@ -162,3 +162,127 @@ impl PyWavSinkStats { }) } } + +#[cfg(test)] +mod tests { + use super::*; + use core_engine::{ + AsrInputMetricsSnapshot, LatencyHistogramSnapshot, NodeMetricsSnapshot, + PipelineMetricsSnapshot, StreamMetricsSnapshot, WavSinkMetricsSnapshot, + }; + + fn with_python(f: impl FnOnce(Python<'_>) -> T) -> T { + Python::initialize(); + Python::attach(f) + } + + fn sample_latency() -> LatencyHistogramSnapshot { + LatencyHistogramSnapshot { + last_us: 7, + max_us: 13, + count: 5, + bucket_bounds_us: vec![1, 2, 4, 8, 16], + buckets: vec![0, 1, 1, 2, 1], + p50_us: 8, + p90_us: 16, + p95_us: 16, + p99_us: 16, + } + } + + #[test] + fn latency_stats_conversion_preserves_values() { + let stats = PyLatencyStats::from(sample_latency()); + assert_eq!(stats.last_us, 7); + assert_eq!(stats.max_us, 13); + assert_eq!(stats.count, 5); + assert_eq!(stats.bucket_bounds_us, vec![1, 2, 4, 8, 16]); + assert_eq!(stats.buckets, vec![0, 1, 1, 2, 1]); + assert_eq!(stats.p95_us, 16); + } + + #[test] + fn stream_stats_conversion_preserves_pipeline_and_processors() { + with_python(|py| { + let stream = PyStreamStats::from_snapshot( + py, + StreamMetricsSnapshot { + pipeline: PipelineMetricsSnapshot { + total_callback_time_us: 11, + dropped_frames: 2, + buffer_size: 256, + latency: sample_latency(), + }, + processors: std::iter::once(( + "gain".to_string(), + NodeMetricsSnapshot { + processing_time_us: 3, + max_processing_time_us: 9, + latency: sample_latency(), + }, + )) + .collect(), + }, + ) + .unwrap(); + + let pipeline = stream.pipeline.bind(py).borrow(); + assert_eq!(pipeline.total_callback_time_us, 11); + assert_eq!(pipeline.dropped_frames, 2); + assert_eq!(pipeline.buffer_size, 256); + let pipeline_latency = pipeline.latency.bind(py).borrow(); + assert_eq!(pipeline_latency.p90_us, 16); + + let processors = stream.processors.bind(py); + assert_eq!(processors.len(), 1); + let gain = processors + .get_item("gain") + .unwrap() + .unwrap() + .extract::>() + .unwrap(); + let gain = gain.bind(py).borrow(); + assert_eq!(gain.processing_time_us, 3); + assert_eq!(gain.max_processing_time_us, 9); + }); + } + + #[test] + fn sink_stats_conversion_preserves_values() { + with_python(|py| { + let asr = PyAsrInputStats::from_snapshot( + py, + AsrInputMetricsSnapshot { + chunks_emitted: 4, + frames_emitted: 640, + pending_frames: 32, + poll: sample_latency(), + callback: sample_latency(), + }, + ) + .unwrap(); + assert_eq!(asr.chunks_emitted, 4); + assert_eq!(asr.frames_emitted, 640); + assert_eq!(asr.pending_frames, 32); + assert_eq!(asr.poll.bind(py).borrow().p50_us, 8); + assert_eq!(asr.callback.bind(py).borrow().p99_us, 16); + + let wav = PyWavSinkStats::from_snapshot( + py, + WavSinkMetricsSnapshot { + write_calls: 6, + samples_written: 1200, + frames_written: 600, + write: sample_latency(), + finalize: sample_latency(), + }, + ) + .unwrap(); + assert_eq!(wav.write_calls, 6); + assert_eq!(wav.samples_written, 1200); + assert_eq!(wav.frames_written, 600); + assert_eq!(wav.write.bind(py).borrow().last_us, 7); + assert_eq!(wav.finalize.bind(py).borrow().max_us, 13); + }); + } +} diff --git a/tests/test_ffi_backend.py b/tests/test_ffi_backend.py new file mode 100644 index 0000000..13ceda3 --- /dev/null +++ b/tests/test_ffi_backend.py @@ -0,0 +1,170 @@ +from __future__ import annotations + +import queue +import struct +import threading +from pathlib import Path + +import pytest + +pytest.importorskip("numpy") +ffi = pytest.importorskip("macloop._macloop") + +import numpy as np + + +def _read_float_wav(path: Path) -> tuple[tuple[int, int, int], list[float]]: + data = path.read_bytes() + assert data[:4] == b"RIFF" + assert data[8:12] == b"WAVE" + + fmt_info = None + samples = None + offset = 12 + while offset + 8 <= len(data): + chunk_id = data[offset : offset + 4] + size = struct.unpack_from(" list[tuple[str, int, np.ndarray]]: + return [collected.get(timeout=2.0) for _ in range(count)] + + +def test_low_level_ffi_asr_sink_round_trip() -> None: + engine = ffi._AudioEngineBackend() + engine.create_stream( + "synthetic_stream", + "synthetic", + { + "frames_per_callback": 4, + "callback_count": 2, + "start_value": 1.0, + "step_value": 1.0, + "start_delay_ms": 100, + }, + ) + engine.add_processor("synthetic_stream", "gain_processor", "gain", {"gain": 2.0}) + engine.route("synthetic_route", "synthetic_stream") + + collected: "queue.Queue[tuple[str, int, np.ndarray]]" = queue.Queue() + + def callback(route_id: str, frames: int, samples) -> None: + collected.put((route_id, frames, np.asarray(samples).copy())) + + sink = ffi._create_asr_sink( + engine, + "ffi_asr_sink", + ["synthetic_route"], + 48_000, + 1, + "f32", + 4, + callback, + ) + try: + chunks = _wait_for_chunks(collected, 2) + stats = sink.stats() + finally: + sink.close() + engine.close() + + assert chunks[0][0] == "synthetic_route" + assert chunks[0][1] == 4 + assert np.array_equal(chunks[0][2], np.array([2.0, 4.0, 6.0, 8.0], dtype=np.float32)) + assert np.array_equal(chunks[1][2], np.array([10.0, 12.0, 14.0, 16.0], dtype=np.float32)) + + stream_stats = engine.get_stats()["synthetic_stream"] + assert stream_stats.pipeline.buffer_size == 8 + assert "gain_processor" in stream_stats.processors + assert stream_stats.processors["gain_processor"].processing_time_us >= 0 + + sink_stats = stats["synthetic_route"] + assert sink_stats.chunks_emitted == 2 + assert sink_stats.frames_emitted == 8 + assert sink_stats.callback.count == 2 + + +def test_low_level_ffi_wav_sink_writes_output(tmp_path: Path) -> None: + output_path = tmp_path / "ffi_synthetic.wav" + + engine = ffi._AudioEngineBackend() + engine.create_stream( + "synthetic_stream", + "synthetic", + { + "frames_per_callback": 4, + "callback_count": 3, + "start_value": 1.0, + "step_value": 0.0, + "start_delay_ms": 100, + }, + ) + engine.route("wav_route", "synthetic_stream") + + with output_path.open("w+b") as fileobj: + sink = ffi._create_wav_sink(engine, "ffi_wav_sink", ["wav_route"], fileobj.fileno(), 1.0) + try: + threading.Event().wait(0.5) + stats = sink.stats() + finally: + sink.close() + engine.close() + + fmt_info, samples = _read_float_wav(output_path) + assert fmt_info[0] in {3, 65534} + assert fmt_info[1:] == (2, 48_000) + assert samples == [1.0] * 24 + assert stats.write_calls >= 1 + assert stats.samples_written == 24 + assert stats.frames_written == 12 + + +def test_low_level_ffi_validates_invalid_configs() -> None: + engine = ffi._AudioEngineBackend() + + with pytest.raises(ValueError, match="unsupported source_kind"): + engine.create_stream("bad_stream", "nope", {}) + + with pytest.raises(ValueError, match="requires at least one pid"): + engine.create_stream("bad_app", "application_audio", {}) + + engine.create_stream("synthetic_stream", "synthetic", {}) + + with pytest.raises(ValueError, match="unsupported processor_kind"): + engine.add_processor("synthetic_stream", "bad_processor", "nope", {}) + + engine.route("synthetic_route", "synthetic_stream") + + with pytest.raises(ValueError, match="unsupported sample_format"): + ffi._create_asr_sink( + engine, + "bad_sink", + ["synthetic_route"], + 16_000, + 1, + "u8", + 320, + lambda *_args: None, + ) + + engine.close()