Commit 1af9aa9

Unify managed rust corpus orchestration
1 parent ecb4f49 commit 1af9aa9

6 files changed

Lines changed: 110 additions & 39 deletions

docs/modernization-handoff.md

Lines changed: 2 additions & 2 deletions
@@ -95,9 +95,9 @@ Delivered:
 - Rust-managed session settings lifecycle (`configure_corpus_session`) with settings-aware orchestration (`run_em_on_session_start`) wired in `Lda::Backends::Rust`
 - Rust session execution refactor to shared session corpus storage + borrowed orchestration helpers, eliminating deep corpus clone overhead on each session EM run
 - unified Rust session orchestration API (`run_em_on_session`) that applies settings + runs EM in one call inside Rust session orchestration
-- `Lda::Backends::Rust` now routes session-path EM through managed Rust session orchestration (`run_em_on_session_with_corpus`), leaving session reuse/recovery decisions in Rust; when session orchestration is unavailable it still prefers direct Rust non-session orchestration (`run_em_with_start_seed`) before legacy Ruby-side beta-input fallback (`run_em`)
+- `Lda::Backends::Rust` now routes cached-corpus EM through managed Rust orchestration (`run_em_on_session_with_corpus`), leaving session reuse/recovery decisions in Rust and preferring that managed path even when no active session id is cached locally; when the managed API is unavailable it still prefers direct Rust non-session orchestration (`run_em_with_start_seed`) before legacy Ruby-side beta-input fallback (`run_em`)
 - direct non-session Rust orchestration now reuses the backend's cached Rust corpus snapshot instead of rebuilding corpus arrays from `@corpus` on each fallback invocation
-- Rust managed-session orchestration API (`run_em_on_session_with_corpus`) added to recreate missing sessions and run EM in one Rust call
+- Rust managed-session orchestration API (`run_em_on_session_with_corpus`) added to recreate missing sessions and run EM in one Rust call, and now directly falls back to start-aware array execution inside Rust if session-backed execution cannot be used
 - Rust session lifecycle replacement API (`replace_corpus_session`) added so corpus reassignment can update existing Rust sessions in place (config reset + corpus swap) instead of Ruby-side drop/recreate
 - `Lda::Backends::Rust` now keeps session-based orchestration on the managed Rust path (`run_em_on_session_with_corpus`) even when sessions are dropped externally
 - parity/compatibility test coverage and rust runtime CI

docs/porting-strategy.md

Lines changed: 2 additions & 1 deletion
@@ -66,7 +66,8 @@ Completed in `codex/experiment-ruby3-modernization`:
 - Direct non-session Rust orchestration now reuses the backend's cached Rust corpus snapshot instead of rebuilding corpus arrays from `@corpus` on each fallback invocation.
 - Rust managed-session orchestration API added (`run_em_on_session_with_corpus`) to recreate missing sessions and execute EM in one Rust call.
 - Rust session lifecycle replacement API added (`replace_corpus_session`) so corpus reassignment can update existing Rust sessions in place (config reset + corpus swap) instead of Ruby-side drop/recreate.
-- `Lda::Backends::Rust` now routes session-path EM through `run_em_on_session_with_corpus`, leaving session reuse/recovery decisions in Rust and reducing fallback to non-session orchestration when sessions are externally dropped.
+- `Lda::Backends::Rust` now routes cached-corpus EM through `run_em_on_session_with_corpus`, leaving session reuse/recovery decisions in Rust, preferring that managed path even when no active session id is cached locally, and reducing Ruby-side fallback branching when sessions are externally dropped.
+- `run_em_on_session_with_corpus` now acts as a unified Rust managed-corpus entrypoint: it attempts session-backed execution first, then falls back to direct start-aware array execution inside Rust when a managed session cannot be used.
 - Dockerized rust runtime workflow added for local parity with CI (`Dockerfile.rust`, `bin/docker-test-rust`).
 - Gem packaging now excludes local Rust cargo build artifacts (`target/**`) for clean release builds.
 - Backend benchmark driver added (`bin/benchmark-backends`) to track pure/native/rust runtime deltas.
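
Reviewer sketch (not part of the commit): the "session first, then direct arrays" behaviour described for `run_em_on_session_with_corpus` can be illustrated roughly as below. Everything here is hypothetical: `Registry`, `Path`, `fake_em`, and the `capacity` field are invented for illustration, and `fake_em` merely returns document lengths in place of real EM output. The only point is the control flow: ensure a session, try it, and fall back to the borrowed arrays when the session path yields nothing.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical stand-in for the corpus data a session shares.
struct Corpus {
    document_words: Vec<Vec<usize>>,
}

/// Hypothetical registry; `capacity` exists only so the sketch can force a failure.
struct Registry {
    next_id: i64,
    capacity: usize,
    sessions: HashMap<i64, Arc<Corpus>>,
}

/// Which path produced the output (illustrative only).
#[derive(Debug, PartialEq)]
enum Path {
    Session(i64),
    DirectArrays,
}

/// Placeholder for EM: returns each document's length.
fn fake_em(words: &[Vec<usize>]) -> Vec<usize> {
    words.iter().map(|doc| doc.len()).collect()
}

impl Registry {
    fn with_capacity(capacity: usize) -> Self {
        Registry { next_id: 0, capacity, sessions: HashMap::new() }
    }

    /// Reuse a live session id, otherwise create one; 0 means "no session available".
    fn ensure(&mut self, session_id: i64, words: &[Vec<usize>]) -> i64 {
        if self.sessions.contains_key(&session_id) {
            return session_id;
        }
        if self.sessions.len() >= self.capacity {
            return 0;
        }
        self.next_id += 1;
        let corpus = Corpus { document_words: words.to_vec() };
        self.sessions.insert(self.next_id, Arc::new(corpus));
        self.next_id
    }

    /// Session-backed run; an empty result signals that the session was unusable.
    fn run_on_session(&self, session_id: i64) -> Vec<usize> {
        self.sessions
            .get(&session_id)
            .map(|corpus| fake_em(&corpus.document_words))
            .unwrap_or_default()
    }

    /// Managed entrypoint: session-backed first, direct array execution second.
    fn run_managed(&mut self, session_id: i64, words: &[Vec<usize>]) -> (Path, Vec<usize>) {
        let active = self.ensure(session_id, words);
        if active > 0 {
            let output = self.run_on_session(active);
            if !output.is_empty() {
                return (Path::Session(active), output);
            }
        }
        (Path::DirectArrays, fake_em(words))
    }
}
```

With a registry that can hold one session, a call with a stale id recreates the session under a new id; with a registry that cannot hold any, the same call still returns output via the direct path. Either way the caller sees identical results, which is the parity property the docs ask for.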

docs/rust-orchestration-guardrails.md

Lines changed: 2 additions & 1 deletion
@@ -13,11 +13,12 @@ Current parity expectations:

 - Rust vs pure backend fixture parity remains exact within existing tolerances used by tests.
 - Session-based orchestration paths (`run_em_on_session`, `run_em_on_session_with_start_seed`, `run_em_on_session_start`, `run_em_on_session_with_corpus`) must match direct non-session orchestration for equivalent settings/seeds.
-- `Lda::Backends::Rust` session-path EM should prefer the managed Rust session entrypoint (`run_em_on_session_with_corpus`) rather than branching in Ruby between session-only and recovery paths.
+- `Lda::Backends::Rust` cached-corpus EM should prefer the managed Rust session entrypoint (`run_em_on_session_with_corpus`) even when no active session id is cached locally, rather than branching in Ruby between session-only, recovery, and direct paths.
 - `Lda::Backends::Rust` non-session fallback should prefer Rust start-aware orchestration (`run_em_with_start_seed`) before legacy beta-input orchestration (`run_em`).
 - Direct non-session fallback should reuse the backend's cached Rust corpus snapshot rather than rebuilding corpus arrays from `@corpus` for each invocation.
 - Rust backend corpus/session lifecycle must not leak session count across corpus replacement.
 - Missing-session recovery in managed session orchestration (`run_em_on_session_with_corpus`) must recreate a usable session and keep parity with direct orchestration.
+- Managed Rust corpus orchestration (`run_em_on_session_with_corpus`) must keep parity with direct orchestration even when it falls back internally from session-backed execution to start-seeded array execution.
 - Corpus reassignment through Rust session replacement lifecycle (`replace_corpus_session`) must preserve stable session count and route subsequent EM runs over updated corpus data.
 - Unknown start-mode handling in seed-aware Rust orchestration must match Ruby's non-seeded fallback behavior when given the same explicit seed.
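
Reviewer sketch (not part of the commit): the "must not leak session count across corpus replacement" guardrail amounts to swapping the stored corpus under an existing id rather than dropping and recreating. A minimal model of that invariant, under stated assumptions, might look like the following. `Sessions`, `SessionData`, `count`, and `shape` are hypothetical names, and returning 0 for a missing id is an assumption of this sketch, since the diff does not show how `replace_corpus_session` handles that case.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical per-session corpus payload.
struct SessionData {
    document_words: Vec<Vec<usize>>,
    terms: usize,
}

/// Hypothetical session table keyed by session id.
#[derive(Default)]
struct Sessions {
    next_id: i64,
    live: HashMap<i64, Arc<SessionData>>,
}

impl Sessions {
    /// Register a new session; 0 signals an invalid corpus (no terms).
    fn create(&mut self, document_words: &[Vec<usize>], terms: usize) -> i64 {
        if terms == 0 {
            return 0;
        }
        self.next_id += 1;
        let data = SessionData { document_words: document_words.to_vec(), terms };
        self.live.insert(self.next_id, Arc::new(data));
        self.next_id
    }

    /// Swap the corpus under an existing id so the session count stays stable.
    /// Assumption of this sketch: a missing id returns 0 instead of creating a session.
    fn replace(&mut self, session_id: i64, document_words: &[Vec<usize>], terms: usize) -> i64 {
        if terms == 0 {
            return 0;
        }
        match self.live.get_mut(&session_id) {
            Some(slot) => {
                *slot = Arc::new(SessionData { document_words: document_words.to_vec(), terms });
                session_id
            }
            None => 0,
        }
    }

    /// Number of live sessions; the guardrail says replacement must not change it.
    fn count(&self) -> usize {
        self.live.len()
    }

    /// (document count, vocabulary size) for a session, if it exists.
    fn shape(&self, session_id: i64) -> Option<(usize, usize)> {
        self.live
            .get(&session_id)
            .map(|data| (data.document_words.len(), data.terms))
    }
}
```

A check in the spirit of the guardrail would create one session, replace its corpus, and then assert that `count()` is still 1 and that `shape()` reflects the new corpus.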

ext/lda-ruby-rust/src/lib.rs

Lines changed: 66 additions & 18 deletions
@@ -407,20 +407,20 @@ fn random_topic_term_probabilities(
 }

 fn corpus_session_data(
-    document_words: Vec<Vec<usize>>,
-    document_counts: Vec<Vec<f64>>,
+    document_words: &[Vec<usize>],
+    document_counts: &[Vec<f64>],
     terms: usize,
 ) -> Arc<CorpusSessionData> {
     Arc::new(CorpusSessionData {
-        document_words,
-        document_counts,
+        document_words: document_words.to_vec(),
+        document_counts: document_counts.to_vec(),
         terms,
     })
 }

-fn create_corpus_session(
-    document_words: Vec<Vec<usize>>,
-    document_counts: Vec<Vec<f64>>,
+fn create_corpus_session_internal(
+    document_words: &[Vec<usize>],
+    document_counts: &[Vec<f64>],
     terms: usize,
 ) -> i64 {
     let session_id = NEXT_CORPUS_SESSION_ID.fetch_add(1, Ordering::Relaxed);
@@ -438,11 +438,19 @@ fn create_corpus_session(
     }
 }

-fn replace_corpus_session(
-    session_id: i64,
+fn create_corpus_session(
     document_words: Vec<Vec<usize>>,
     document_counts: Vec<Vec<f64>>,
     terms: usize,
+) -> i64 {
+    create_corpus_session_internal(document_words.as_slice(), document_counts.as_slice(), terms)
+}
+
+fn replace_corpus_session_internal(
+    session_id: i64,
+    document_words: &[Vec<usize>],
+    document_counts: &[Vec<f64>],
+    terms: usize,
 ) -> i64 {
     if terms == 0 {
         return 0;
@@ -474,11 +482,25 @@ fn replace_corpus_session(
     }
 }

-fn ensure_corpus_session(
+fn replace_corpus_session(
     session_id: i64,
     document_words: Vec<Vec<usize>>,
     document_counts: Vec<Vec<f64>>,
     terms: usize,
+) -> i64 {
+    replace_corpus_session_internal(
+        session_id,
+        document_words.as_slice(),
+        document_counts.as_slice(),
+        terms,
+    )
+}
+
+fn ensure_corpus_session(
+    session_id: i64,
+    document_words: &[Vec<usize>],
+    document_counts: &[Vec<f64>],
+    terms: usize,
 ) -> i64 {
     if terms == 0 {
         return 0;
@@ -488,7 +510,7 @@ fn ensure_corpus_session(
         return session_id;
     }

-    create_corpus_session(document_words, document_counts, terms)
+    create_corpus_session_internal(document_words, document_counts, terms)
 }

 fn drop_corpus_session(session_id: i64) -> bool {
@@ -667,16 +689,42 @@ fn run_em_on_session_with_corpus(
         return empty_managed_session_em_output();
     }

-    let active_session_id =
-        ensure_corpus_session(session_id, document_words, document_counts, terms);
-    if active_session_id <= 0 {
-        return empty_managed_session_em_output();
+    let active_session_id = ensure_corpus_session(
+        session_id,
+        document_words.as_slice(),
+        document_counts.as_slice(),
+        terms,
+    );
+
+    if active_session_id > 0 {
+        let (beta_probabilities, beta_log, gamma, phi) = run_em_on_session(
+            active_session_id,
+            start.clone(),
+            topics,
+            max_iter,
+            convergence,
+            em_max_iter,
+            em_convergence,
+            init_alpha,
+            min_probability,
+            random_seed,
+        );
+
+        if !(beta_probabilities.is_empty()
+            && beta_log.is_empty()
+            && gamma.is_empty()
+            && phi.is_empty())
+        {
+            return (active_session_id, beta_probabilities, beta_log, gamma, phi);
+        }
     }

-    let (beta_probabilities, beta_log, gamma, phi) = run_em_on_session(
-        active_session_id,
-        start,
+    let (beta_probabilities, beta_log, gamma, phi) = run_em_with_start_seed_internal(
+        start.as_str(),
+        document_words.as_slice(),
+        document_counts.as_slice(),
         topics,
+        terms,
         max_iter,
         convergence,
         em_max_iter,
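
Reviewer sketch (not part of the commit): the hunks above split each session function into a by-value wrapper and a `_internal` variant that takes slices. The apparent motivation is that `run_em_on_session_with_corpus` now needs `document_words` and `document_counts` twice, once for `ensure_corpus_session` and once for the array fallback, so the first use can no longer consume them. The pattern in isolation looks roughly like this; `total_tokens`, `total_tokens_internal`, and `try_then_reuse` are hypothetical names chosen for the illustration.

```rust
/// Borrowed core: operates on slices, so callers keep ownership of their data.
fn total_tokens_internal(document_counts: &[Vec<f64>]) -> f64 {
    document_counts.iter().flatten().sum()
}

/// By-value wrapper: keeps the original owned signature and delegates to the borrowed core.
fn total_tokens(document_counts: Vec<Vec<f64>>) -> f64 {
    total_tokens_internal(document_counts.as_slice())
}

/// A caller that needs the data twice borrows for the first use
/// and can still read the vector afterwards.
fn try_then_reuse(document_counts: Vec<Vec<f64>>) -> (f64, usize) {
    let total = total_tokens_internal(&document_counts);
    (total, document_counts.len())
}
```

One trade-off visible in the diff itself: `corpus_session_data` now calls `.to_vec()` on the borrowed slices, so creating a session copies the corpus, whereas the earlier by-value signature could move the vectors into the `Arc` without copying.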

lib/lda-ruby/backends/rust.rb

Lines changed: 8 additions & 13 deletions
@@ -106,18 +106,18 @@ def model
       private

       def rust_orchestrated_em(start)
-        session_orchestrated = rust_orchestrated_em_with_session(start)
-        return true if session_orchestrated
+        managed_orchestrated = rust_orchestrated_em_with_managed_corpus(start)
+        return true if managed_orchestrated

         direct_orchestrated = rust_orchestrated_em_with_start_seed(start)
         return true if direct_orchestrated

         rust_orchestrated_em_with_beta(start)
       end

-      def rust_orchestrated_em_with_session(start)
+      def rust_orchestrated_em_with_managed_corpus(start)
         return false unless defined?(::Lda::RustBackend)
-        return false unless ensure_rust_corpus_session
+        return false unless ensure_rust_corpus_snapshot

         random_seed = Integer(next_random_seed)
         if ::Lda::RustBackend.respond_to?(:run_em_on_session_with_corpus)
@@ -134,12 +134,13 @@ def rust_orchestrated_em_with_session(start)
           return false unless managed_output.is_a?(Array) && managed_output.size == 5

           session_id, beta_probabilities, beta_log, gamma, phi = managed_output
-          return false unless session_id.is_a?(Numeric) && session_id.positive?
-
           output = [beta_probabilities, beta_log, gamma, phi]
           return false unless valid_rust_em_output?(output, @rust_document_lengths, Integer(num_topics), Integer(@rust_corpus_terms))

-          @rust_corpus_session_id = Integer(session_id)
+          @rust_corpus_session_id =
+            if session_id.is_a?(Numeric) && session_id.positive?
+              Integer(session_id)
+            end
           @fallback.apply_em_state(
             beta_probabilities: beta_probabilities,
             beta_log: beta_log,
@@ -349,12 +350,6 @@ def register_rust_corpus_session(previous_session_id = nil)
         drop_rust_corpus_session_by_id(previous_session_id)
       end

-      def ensure_rust_corpus_session
-        ensure_rust_corpus_snapshot
-      rescue StandardError
-        false
-      end
-
       def ensure_rust_corpus_snapshot
         has_session_data = @rust_corpus_terms && @rust_document_lengths && @rust_document_words && @rust_document_counts
         return true if has_session_data

test/rust_orchestration_test.rb

Lines changed: 30 additions & 4 deletions
@@ -623,8 +623,8 @@ def test_rust_backend_non_session_fallback_prefers_run_em_with_start_seed
     backend.em_convergence = @em_convergence
     backend.init_alpha = @init_alpha

-    # Force the direct non-session orchestration path.
-    backend.define_singleton_method(:ensure_rust_corpus_session) { false }
+    # Force the direct non-managed orchestration path.
+    backend.define_singleton_method(:rust_orchestrated_em_with_managed_corpus) { |_start| false }

     rust_singleton = Lda::RustBackend.singleton_class
     run_em_calls = 0
@@ -678,7 +678,7 @@ def test_rust_backend_direct_non_session_path_reuses_cached_corpus_snapshot
     backend.em_convergence = @em_convergence
     backend.init_alpha = @init_alpha

-    backend.define_singleton_method(:rust_orchestrated_em_with_session) { |_start| false }
+    backend.define_singleton_method(:rust_orchestrated_em_with_managed_corpus) { |_start| false }
     backend.define_singleton_method(:rust_em_corpus_input) do
       raise "direct non-session path should reuse cached corpus snapshot"
     end
@@ -689,14 +689,17 @@ def test_rust_backend_direct_non_session_path_reuses_cached_corpus_snapshot
     backend&.corpus = nil
   end

-  def test_rust_backend_session_path_prefers_managed_session_entrypoint
+  def test_rust_backend_prefers_managed_corpus_entrypoint_without_active_session
     backend = nil
     rust_singleton = nil
     run_em_on_session_alias = :__test_original_run_em_on_session_for_managed_preference__
     run_em_on_session_with_corpus_alias = :__test_original_run_em_on_session_with_corpus_for_managed_preference__
+    run_em_with_start_seed_alias = :__test_original_run_em_with_start_seed_for_managed_preference__

     omit("run_em_on_session unavailable") unless Lda::RustBackend.respond_to?(:run_em_on_session)
     omit("run_em_on_session_with_corpus unavailable") unless Lda::RustBackend.respond_to?(:run_em_on_session_with_corpus)
+    omit("run_em_with_start_seed unavailable") unless Lda::RustBackend.respond_to?(:run_em_with_start_seed)
+    omit("drop_corpus_session unavailable") unless Lda::RustBackend.respond_to?(:drop_corpus_session)

     backend = Lda::Backends::Rust.new(random_seed: 1234)
     backend.corpus = Lda::TextCorpus.new(FIXTURE_DOCUMENTS)
@@ -708,13 +711,20 @@ def test_rust_backend_session_path_prefers_managed_session_entrypoint
     backend.em_convergence = @em_convergence
     backend.init_alpha = @init_alpha

+    dropped_session_id = backend.instance_variable_get(:@rust_corpus_session_id)
+    assert_operator dropped_session_id, :>, 0
+    assert_equal true, Lda::RustBackend.drop_corpus_session(dropped_session_id)
+    backend.instance_variable_set(:@rust_corpus_session_id, nil)
+
     rust_singleton = Lda::RustBackend.singleton_class
     run_em_on_session_calls = 0
     run_em_on_session_with_corpus_calls = 0
+    run_em_with_start_seed_calls = 0

     silence_redefinition_warnings do
       rust_singleton.send(:alias_method, run_em_on_session_alias, :run_em_on_session)
       rust_singleton.send(:alias_method, run_em_on_session_with_corpus_alias, :run_em_on_session_with_corpus)
+      rust_singleton.send(:alias_method, run_em_with_start_seed_alias, :run_em_with_start_seed)

       rust_singleton.send(:define_method, :run_em_on_session) do |*args|
         run_em_on_session_calls += 1
@@ -725,14 +735,30 @@ def test_rust_backend_session_path_prefers_managed_session_entrypoint
         run_em_on_session_with_corpus_calls += 1
         public_send(run_em_on_session_with_corpus_alias, *args)
       end
+
+      rust_singleton.send(:define_method, :run_em_with_start_seed) do |*args|
+        run_em_with_start_seed_calls += 1
+        public_send(run_em_with_start_seed_alias, *args)
+      end
     end

     backend.em("seeded")
     assert_equal 0, run_em_on_session_calls
     assert_equal 1, run_em_on_session_with_corpus_calls
+    assert_equal 0, run_em_with_start_seed_calls
     assert_equal @topics, backend.gamma.first.size
+
+    recreated_session_id = backend.instance_variable_get(:@rust_corpus_session_id)
+    assert_operator recreated_session_id, :>, 0
+    assert_not_equal dropped_session_id, recreated_session_id
   ensure
     silence_redefinition_warnings do
+      if defined?(rust_singleton) && rust_singleton.method_defined?(run_em_with_start_seed_alias)
+        rust_singleton.send(:remove_method, :run_em_with_start_seed)
+        rust_singleton.send(:alias_method, :run_em_with_start_seed, run_em_with_start_seed_alias)
+        rust_singleton.send(:remove_method, run_em_with_start_seed_alias)
+      end
+
       if defined?(rust_singleton) && rust_singleton.method_defined?(run_em_on_session_with_corpus_alias)
         rust_singleton.send(:remove_method, :run_em_on_session_with_corpus)
         rust_singleton.send(:alias_method, :run_em_on_session_with_corpus, run_em_on_session_with_corpus_alias)
