Skip to content

Commit a163904

Browse files
committed
feat: add cron runner worker
1 parent 2fdf44a commit a163904

20 files changed

Lines changed: 944 additions & 39 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/loopforge-cli/src/cli.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ mod commands;
44
mod tests;
55

66
pub(crate) use commands::{
7-
AcpCommand, AgentCommand, AgentKind, ChannelCommand, Cli, Command, ConfigCommand,
7+
AcpCommand, AgentCommand, AgentKind, ChannelCommand, Cli, Command, ConfigCommand, CronCommand,
88
DaemonCommand, HarnessCommand, ReleaseCommand, SkillsCommand,
99
};

crates/loopforge-cli/src/cli/commands.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ mod acp;
22
mod agent;
33
mod channel;
44
mod config;
5+
mod cron;
56
mod daemon;
67
mod harness;
78
mod release;
@@ -16,6 +17,7 @@ pub(crate) use acp::AcpCommand;
1617
pub(crate) use agent::{AgentCommand, AgentKind};
1718
pub(crate) use channel::ChannelCommand;
1819
pub(crate) use config::ConfigCommand;
20+
pub(crate) use cron::CronCommand;
1921
pub(crate) use daemon::DaemonCommand;
2022
pub(crate) use harness::HarnessCommand;
2123
pub(crate) use release::ReleaseCommand;
@@ -76,6 +78,11 @@ pub(crate) enum Command {
7678
#[command(subcommand)]
7779
command: ChannelCommand,
7880
},
81+
/// Cron scheduler helpers (stored jobs + optional runner)
82+
Cron {
83+
#[command(subcommand)]
84+
command: CronCommand,
85+
},
7986
/// ACP event/checkpoint inspection helpers
8087
Acp {
8188
#[command(subcommand)]
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#[derive(Debug, clap::Subcommand)]
2+
pub(crate) enum CronCommand {
3+
/// Run one cron scheduler tick (process due jobs once)
4+
Tick {
5+
/// Max due jobs to execute in this tick
6+
#[arg(long, default_value_t = 20)]
7+
max_due_per_tick: usize,
8+
/// Max catch-up slots per job when recovering after downtime
9+
#[arg(long, default_value_t = 25)]
10+
max_catchup_slots_per_job: u32,
11+
/// Auto-disable a job after this many consecutive failures
12+
#[arg(long, default_value_t = 5)]
13+
max_consecutive_errors: u32,
14+
/// Minimum delay (seconds) before retrying a failing job
15+
#[arg(long, default_value_t = 30)]
16+
min_retry_delay_secs: u64,
17+
/// Per-job execution timeout (milliseconds)
18+
#[arg(long, default_value_t = 10_000)]
19+
job_timeout_ms: u64,
20+
},
21+
/// Run a long-lived cron worker loop
22+
Worker {
23+
/// Seconds between scheduler ticks
24+
#[arg(long, default_value_t = 2)]
25+
interval_secs: u64,
26+
/// Max due jobs to execute per tick
27+
#[arg(long, default_value_t = 20)]
28+
max_due_per_tick: usize,
29+
/// Max catch-up slots per job when recovering after downtime
30+
#[arg(long, default_value_t = 25)]
31+
max_catchup_slots_per_job: u32,
32+
/// Auto-disable a job after this many consecutive failures
33+
#[arg(long, default_value_t = 5)]
34+
max_consecutive_errors: u32,
35+
/// Minimum delay (seconds) before retrying a failing job
36+
#[arg(long, default_value_t = 30)]
37+
min_retry_delay_secs: u64,
38+
/// Per-job execution timeout (milliseconds)
39+
#[arg(long, default_value_t = 10_000)]
40+
job_timeout_ms: u64,
41+
},
42+
}

crates/loopforge-cli/src/cli/tests.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,21 @@ fn cli_parses_onboard_starter_profile() {
144144
"expected `loopforge onboard --starter workspace-brief` to parse, got: {parsed:?}"
145145
);
146146
}
147+
148+
#[test]
149+
fn cli_parses_cron_tick_subcommand() {
150+
let parsed = Cli::try_parse_from(["loopforge", "cron", "tick"]);
151+
assert!(
152+
parsed.is_ok(),
153+
"expected `loopforge cron tick` to parse, got: {parsed:?}"
154+
);
155+
}
156+
157+
#[test]
158+
fn cli_parses_cron_worker_subcommand() {
159+
let parsed = Cli::try_parse_from(["loopforge", "cron", "worker"]);
160+
assert!(
161+
parsed.is_ok(),
162+
"expected `loopforge cron worker` to parse, got: {parsed:?}"
163+
);
164+
}

crates/loopforge-cli/src/dispatch.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ mod acp;
22
mod agent;
33
mod channel;
44
mod config;
5+
mod cron;
56
mod daemon;
67
mod doctor;
78
mod harness;
@@ -31,6 +32,7 @@ pub(crate) async fn run(cli: Cli) -> anyhow::Result<()> {
3132
} => doctor::run(json, strict, timeout_ms).await,
3233
Command::Agent { command } => agent::run(command).await,
3334
Command::Channel { command } => channel::run(command).await,
35+
Command::Cron { command } => cron::run(command).await,
3436
Command::Acp { command } => acp::run(command),
3537
Command::Config { command } => config::run(command),
3638
Command::Skills { command } => skills::run(command).await,
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use crate::{cli::CronCommand, runtime_env};
2+
3+
pub(super) async fn run(command: CronCommand) -> anyhow::Result<()> {
4+
let (_paths, agent) = runtime_env::load_agent_runtime()?;
5+
6+
match command {
7+
CronCommand::Tick {
8+
max_due_per_tick,
9+
max_catchup_slots_per_job,
10+
max_consecutive_errors,
11+
min_retry_delay_secs,
12+
job_timeout_ms,
13+
} => {
14+
let config = rexos::agent::CronRunnerConfig {
15+
tick_interval_secs: 1,
16+
max_due_per_tick,
17+
max_catchup_slots_per_job,
18+
max_consecutive_errors,
19+
min_retry_delay_secs,
20+
job_timeout_ms,
21+
};
22+
let summary = agent.cron_runner_tick(&config).await?;
23+
println!(
24+
"cron: tick_at={} due={} ran={} ok={} failed={} skipped={}",
25+
summary.tick_at,
26+
summary.due,
27+
summary.ran,
28+
summary.ok,
29+
summary.failed,
30+
summary.skipped
31+
);
32+
Ok(())
33+
}
34+
CronCommand::Worker {
35+
interval_secs,
36+
max_due_per_tick,
37+
max_catchup_slots_per_job,
38+
max_consecutive_errors,
39+
min_retry_delay_secs,
40+
job_timeout_ms,
41+
} => {
42+
let config = rexos::agent::CronRunnerConfig {
43+
tick_interval_secs: interval_secs,
44+
max_due_per_tick,
45+
max_catchup_slots_per_job,
46+
max_consecutive_errors,
47+
min_retry_delay_secs,
48+
job_timeout_ms,
49+
};
50+
agent.cron_runner_loop(config).await?;
51+
Ok(())
52+
}
53+
}
54+
}

crates/rexos-memory/src/kv.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use rusqlite::OptionalExtension;
1+
use rusqlite::{OptionalExtension, Transaction, TransactionBehavior};
22

33
use crate::MemoryStore;
44

@@ -20,4 +20,32 @@ impl MemoryStore {
2020
.optional()?;
2121
Ok(value)
2222
}
23+
24+
pub fn kv_update<F>(&self, key: &str, f: F) -> anyhow::Result<Option<String>>
25+
where
26+
F: FnOnce(Option<String>) -> anyhow::Result<Option<String>>,
27+
{
28+
let tx = Transaction::new_unchecked(&self.conn, TransactionBehavior::Immediate)?;
29+
let current = tx
30+
.query_row("SELECT value FROM kv WHERE key=?1", (key,), |row| {
31+
row.get(0)
32+
})
33+
.optional()?;
34+
35+
let next = f(current)?;
36+
match &next {
37+
Some(value) => {
38+
tx.execute(
39+
"INSERT INTO kv (key, value) VALUES (?1, ?2)\n ON CONFLICT(key) DO UPDATE SET value=excluded.value",
40+
(key, value),
41+
)?;
42+
}
43+
None => {
44+
tx.execute("DELETE FROM kv WHERE key=?1", (key,))?;
45+
}
46+
}
47+
48+
tx.commit()?;
49+
Ok(next)
50+
}
2351
}

crates/rexos-runtime/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,6 @@ rexos-kernel = { path = "../rexos-kernel" }
1717
rexos-llm = { path = "../rexos-llm" }
1818
rexos-memory = { path = "../rexos-memory" }
1919
rexos-tools = { path = "../rexos-tools" }
20+
21+
[dev-dependencies]
22+
tempfile.workspace = true

crates/rexos-runtime/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ pub struct AgentRuntime {
5353
}
5454

5555
pub use outbox::{OutboxDispatcher, OutboxDrainSummary};
56+
pub use scheduling::runner::{CronRunnerConfig, CronRunnerTickSummary};
5657

5758
impl AgentRuntime {
5859
pub fn new(memory: MemoryStore, llms: LlmRegistry, router: ModelRouter) -> Self {

0 commit comments

Comments
 (0)