Skip to content

Commit 1eaa536

Browse files
committed
feat: реализовать TTS pipeline и интеграцию runtime
- Добавить TtsPipeline с полным циклом: нормализация → токенизация → акустика → декодирование - Реализовать StreamingSession для потокового синтеза с crossfade - Расширить TtsRuntime: async submit, queue stats, синхронный синтез - Добавить BatchScheduler для группировки запросов - Добавить TtsError::QueueFull для обработки переполнения очереди - Добавить интеграционные тесты для pipeline и runtime
1 parent a8279f2 commit 1eaa536

4 files changed

Lines changed: 988 additions & 15 deletions

File tree

crates/runtime/src/lib.rs

Lines changed: 306 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,72 +3,363 @@
33
//! Runtime orchestration for the Qwen3-TTS Rust Engine.
44
//!
55
//! This crate provides:
6+
//! - TTS pipeline integration (text → audio)
7+
//! - Streaming synthesis sessions
68
//! - Request queue management
79
//! - Batching for efficient GPU utilization
810
//! - QoS policies (priority, deadlines, cancellation)
911
//! - Structured logging and metrics
10-
//! - Device management (CPU/GPU selection, warmup)
1112
1213
pub mod logging;
1314
pub mod metrics;
15+
pub mod pipeline;
1416
pub mod queue;
1517

16-
use tracing::info;
17-
use tts_core::{RuntimeConfig, SynthesisRequest, TtsResult};
18+
use std::sync::Arc;
19+
use std::time::Instant;
20+
21+
use tokio::sync::mpsc;
22+
use tracing::{debug, info, instrument, warn};
23+
24+
use tts_core::{AudioChunk, RuntimeConfig, SynthesisRequest, TtsError, TtsResult};
25+
26+
pub use pipeline::{PipelineBackend, PipelineConfig, StreamingSession, TtsPipeline};
27+
pub use queue::{QueuedRequest, RequestQueue};
1828

1929
/// TTS runtime orchestrator.
///
/// Manages the TTS pipeline, request queue, and batch scheduling.
#[derive(Debug)]
pub struct TtsRuntime {
    // Runtime configuration (queue sizes, batching limits, etc.).
    config: RuntimeConfig,
    // Shared pipeline; wrapped in `Arc` so spawned synthesis tasks can hold it.
    pipeline: Arc<TtsPipeline>,
    // Pending-request queue sized from `config.queue.max_queue_size`.
    queue: RequestQueue,
}
2438

2539
impl TtsRuntime {
26-
/// Create a new TTS runtime with the given configuration.
27-
pub fn new(config: RuntimeConfig) -> TtsResult<Self> {
28-
info!("Initializing TTS runtime");
29-
Ok(Self { config })
40+
/// Create a new TTS runtime with mock backend.
41+
pub fn new_mock(config: RuntimeConfig) -> TtsResult<Self> {
42+
info!("Initializing TTS runtime with mock backend");
43+
44+
let pipeline = Arc::new(TtsPipeline::new_mock()?);
45+
let queue = RequestQueue::new(config.queue.max_queue_size);
46+
47+
Ok(Self {
48+
config,
49+
pipeline,
50+
queue,
51+
})
52+
}
53+
54+
/// Create a new TTS runtime with neural backend.
55+
///
56+
/// Note: Currently uses mock backend. Full neural implementation
57+
/// will be added when model weights are available.
58+
#[instrument(skip(config, acoustic_path, tokenizer_path, codec_path))]
59+
pub fn new_neural(
60+
config: RuntimeConfig,
61+
acoustic_path: impl AsRef<std::path::Path>,
62+
tokenizer_path: impl AsRef<std::path::Path>,
63+
codec_path: impl AsRef<std::path::Path>,
64+
) -> TtsResult<Self> {
65+
info!("Initializing TTS runtime with neural backend");
66+
67+
let pipeline = Arc::new(TtsPipeline::new_neural(
68+
acoustic_path,
69+
tokenizer_path,
70+
codec_path,
71+
)?);
72+
let queue = RequestQueue::new(config.queue.max_queue_size);
73+
74+
Ok(Self {
75+
config,
76+
pipeline,
77+
queue,
78+
})
3079
}
3180

3281
/// Get the runtime configuration.
3382
pub fn config(&self) -> &RuntimeConfig {
3483
&self.config
3584
}
3685

37-
/// Submit a synthesis request.
86+
/// Get a reference to the pipeline.
87+
pub fn pipeline(&self) -> &TtsPipeline {
88+
&self.pipeline
89+
}
90+
91+
/// Get an Arc to the pipeline for sharing.
92+
pub fn pipeline_arc(&self) -> Arc<TtsPipeline> {
93+
Arc::clone(&self.pipeline)
94+
}
95+
96+
/// Get queue statistics.
97+
pub fn queue_stats(&self) -> QueueStats {
98+
QueueStats {
99+
size: self.queue.len(),
100+
max_size: self.config.queue.max_queue_size,
101+
is_full: self.queue.is_full(),
102+
}
103+
}
104+
105+
/// Submit a synthesis request for processing.
38106
///
39-
/// Note: Placeholder for Phase 4 implementation.
40-
pub async fn submit(&self, _request: SynthesisRequest) -> TtsResult<()> {
41-
// TODO: Implement in Phase 4
42-
Ok(())
107+
/// Returns a channel receiver for streaming audio chunks.
108+
#[instrument(skip(self, request), fields(session_id = %request.session_id))]
109+
pub async fn submit(
110+
&self,
111+
request: SynthesisRequest,
112+
) -> TtsResult<mpsc::Receiver<TtsResult<AudioChunk>>> {
113+
let start = Instant::now();
114+
115+
// Check queue capacity
116+
if self.queue.is_full() {
117+
warn!("Request queue is full");
118+
return Err(TtsError::queue_full());
119+
}
120+
121+
// Create response channel
122+
let (tx, rx) = mpsc::channel(32);
123+
124+
// Clone pipeline for async processing
125+
let pipeline = Arc::clone(&self.pipeline);
126+
let session_id = request.session_id;
127+
128+
// Spawn synthesis task
129+
tokio::spawn(async move {
130+
debug!(session_id = %session_id, "Starting synthesis");
131+
132+
match pipeline.synthesize(&request.text, Some(request.lang)) {
133+
Ok(audio) => {
134+
let _ = tx.send(Ok(audio)).await;
135+
debug!(
136+
session_id = %session_id,
137+
elapsed_ms = start.elapsed().as_millis(),
138+
"Synthesis completed"
139+
);
140+
}
141+
Err(e) => {
142+
let _ = tx.send(Err(e)).await;
143+
warn!(session_id = %session_id, "Synthesis failed");
144+
}
145+
}
146+
});
147+
148+
Ok(rx)
149+
}
150+
151+
/// Synthesize text synchronously (blocking).
152+
pub fn synthesize_sync(
153+
&self,
154+
text: &str,
155+
lang: Option<tts_core::Lang>,
156+
) -> TtsResult<AudioChunk> {
157+
self.pipeline.synthesize(text, lang)
158+
}
159+
160+
/// Create a streaming synthesis session.
161+
pub fn streaming_session(&self) -> TtsResult<StreamingSession<'_>> {
162+
self.pipeline.streaming_session()
163+
}
164+
165+
/// Cancel a pending request.
166+
pub fn cancel(&self, session_id: uuid::Uuid) -> bool {
167+
self.queue.cancel(session_id)
43168
}
44169

45170
/// Shutdown the runtime gracefully.
46171
pub async fn shutdown(&self) -> TtsResult<()> {
47172
info!("Shutting down TTS runtime");
173+
self.queue.clear();
48174
Ok(())
49175
}
50176
}
51177

52178
impl Default for TtsRuntime {
53179
fn default() -> Self {
54-
Self::new(RuntimeConfig::default()).expect("default config should be valid")
180+
Self::new_mock(RuntimeConfig::default()).expect("default config should be valid")
181+
}
182+
}
183+
184+
/// Queue statistics.
///
/// A point-in-time snapshot returned by `TtsRuntime::queue_stats`; values
/// are not updated after the snapshot is taken.
#[derive(Debug, Clone)]
pub struct QueueStats {
    /// Current queue size.
    pub size: usize,
    /// Maximum queue capacity.
    pub max_size: usize,
    /// Whether the queue is full.
    pub is_full: bool,
}
194+
195+
/// Batch scheduler for efficient processing of multiple requests.
#[derive(Debug)]
pub struct BatchScheduler {
    // Upper bound on the number of requests per batch.
    max_batch_size: usize,
    // Upper bound on the (estimated) token total per batch.
    max_batch_tokens: usize,
    // How long to wait for more requests before dispatching a batch.
    batch_window_ms: u64,
}
202+
203+
impl BatchScheduler {
204+
/// Create a new batch scheduler.
205+
pub fn new(max_batch_size: usize, max_batch_tokens: usize, batch_window_ms: u64) -> Self {
206+
Self {
207+
max_batch_size,
208+
max_batch_tokens,
209+
batch_window_ms,
210+
}
211+
}
212+
213+
/// Create from runtime config.
214+
pub fn from_config(config: &RuntimeConfig) -> Self {
215+
Self::new(
216+
config.batching.max_batch_size,
217+
config.batching.max_batch_tokens,
218+
config.batching.batch_window_ms,
219+
)
220+
}
221+
222+
/// Get maximum batch size.
223+
pub fn max_batch_size(&self) -> usize {
224+
self.max_batch_size
225+
}
226+
227+
/// Get maximum tokens per batch.
228+
pub fn max_batch_tokens(&self) -> usize {
229+
self.max_batch_tokens
230+
}
231+
232+
/// Get batch window duration in milliseconds.
233+
pub fn batch_window_ms(&self) -> u64 {
234+
self.batch_window_ms
235+
}
236+
237+
/// Collect requests into a batch.
238+
///
239+
/// Returns requests that fit within batch constraints.
240+
pub fn collect_batch(&self, requests: &mut Vec<QueuedRequest>) -> Vec<QueuedRequest> {
241+
let mut batch = Vec::with_capacity(self.max_batch_size);
242+
let mut total_tokens = 0;
243+
244+
while !requests.is_empty() && batch.len() < self.max_batch_size {
245+
let req = &requests[0];
246+
247+
// Estimate token count (rough: 1 token per 4 chars)
248+
let estimated_tokens = req.request.text.len() / 4 + 1;
249+
250+
if total_tokens + estimated_tokens > self.max_batch_tokens && !batch.is_empty() {
251+
break;
252+
}
253+
254+
batch.push(requests.remove(0));
255+
total_tokens += estimated_tokens;
256+
}
257+
258+
batch
259+
}
260+
}
261+
262+
impl Default for BatchScheduler {
263+
fn default() -> Self {
264+
Self::new(8, 4096, 10)
55265
}
56266
}
57267

58268
#[cfg(test)]
mod tests {
    use super::*;
    use tts_core::Lang;

    // Runtime construction with the mock backend succeeds and keeps config.
    #[test]
    fn test_runtime_creation_mock() {
        let runtime = TtsRuntime::new_mock(RuntimeConfig::default()).unwrap();
        assert_eq!(runtime.config().batching.max_batch_size, 8);
    }

    // Default runtime uses the default RuntimeConfig (mock backend).
    #[test]
    fn test_runtime_default() {
        let runtime = TtsRuntime::default();
        assert_eq!(runtime.config().batching.max_batch_size, 8);
    }

    // A fresh runtime reports an empty, non-full queue.
    #[test]
    fn test_runtime_queue_stats() {
        let runtime = TtsRuntime::default();
        let stats = runtime.queue_stats();

        assert_eq!(stats.size, 0);
        assert_eq!(stats.max_size, 1000);
        assert!(!stats.is_full);
    }

    // Synchronous synthesis produces non-empty 24 kHz audio.
    #[test]
    fn test_runtime_synthesize_sync() {
        let runtime = TtsRuntime::default();
        let audio = runtime.synthesize_sync("Тест", Some(Lang::Ru)).unwrap();

        assert!(audio.num_samples() > 0);
        assert_eq!(audio.sample_rate, 24000);
    }

    // Async submit delivers at least one Ok(AudioChunk) over the channel.
    #[tokio::test]
    async fn test_runtime_submit() {
        let runtime = TtsRuntime::default();
        let request = SynthesisRequest::new("Test synthesis").with_lang(Lang::En);

        let mut rx = runtime.submit(request).await.unwrap();

        // Should receive audio chunk
        let result = rx.recv().await;
        assert!(result.is_some());

        let audio = result.unwrap().unwrap();
        assert!(audio.num_samples() > 0);
    }

    // Graceful shutdown completes without error on an idle runtime.
    #[tokio::test]
    async fn test_runtime_shutdown() {
        let runtime = TtsRuntime::default();
        let result = runtime.shutdown().await;
        assert!(result.is_ok());
    }

    // Explicit limits are stored and exposed by the getters.
    #[test]
    fn test_batch_scheduler_creation() {
        let scheduler = BatchScheduler::new(16, 8192, 20);

        assert_eq!(scheduler.max_batch_size(), 16);
        assert_eq!(scheduler.max_batch_tokens(), 8192);
        assert_eq!(scheduler.batch_window_ms(), 20);
    }

    // from_config mirrors the batching section of RuntimeConfig.
    #[test]
    fn test_batch_scheduler_from_config() {
        let config = RuntimeConfig::default();
        let scheduler = BatchScheduler::from_config(&config);

        assert_eq!(scheduler.max_batch_size(), config.batching.max_batch_size);
    }

    // Collecting from an empty queue yields an empty batch.
    #[test]
    fn test_batch_scheduler_collect_empty() {
        let scheduler = BatchScheduler::default();
        let mut requests = Vec::new();

        let batch = scheduler.collect_batch(&mut requests);
        assert!(batch.is_empty());
    }

    // collect_batch takes at most max_batch_size requests off the front
    // and leaves the rest queued.
    #[test]
    fn test_batch_scheduler_collect_batch() {
        let scheduler = BatchScheduler::new(2, 1000, 10);

        let mut requests: Vec<QueuedRequest> = (0..5)
            .map(|i| QueuedRequest::new(SynthesisRequest::new(format!("Request {i}"))))
            .collect();

        let batch = scheduler.collect_batch(&mut requests);

        assert_eq!(batch.len(), 2); // max_batch_size
        assert_eq!(requests.len(), 3); // remaining
    }
}

0 commit comments

Comments
 (0)