Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 37 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[workspace]
resolver = "2"
members = ["crates/stem"]
members = ["crates/atom"]
32 changes: 16 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Stem

Off-chain runtime for the [Stem smart contract](src/Stem.sol).
Off-chain runtime for the [Atom smart contract](src/Atom.sol).

Stem indexes `HeadUpdated` events emitted by an on-chain anchor contract,
finalizes them with reorg safety (configurable confirmation depth), and exposes
Expand All @@ -24,7 +24,7 @@ HeadUpdated → Indexer → Finalizer → Epoch → Membrane

The runtime is a four-stage pipeline:

### 1. Stem contract (`src/Stem.sol`)
### 1. Atom contract (`src/Atom.sol`)

On-chain anchor. The owner calls `setHead(newCid)` to advance a monotonic
`seq` and emit a `HeadUpdated` event. The `head()` view returns the canonical
Expand All @@ -39,7 +39,7 @@ event HeadUpdated(
);
```

### 2. Indexer (`StemIndexer`)
### 2. Indexer (`AtomIndexer`)

Subscribes to `HeadUpdated` via WebSocket for live events and backfills
missed blocks via HTTP `eth_getLogs` on startup and reconnect. Broadcasts
Expand All @@ -57,7 +57,7 @@ Consumes observed events from the indexer and outputs only those that are
- **Eligibility** is decided by a pluggable `Strategy` trait. The built-in
`ConfirmationDepth(K)` strategy requires `tip >= event.block_number + K`.
- **Canonical cross-check**: after eligibility, the finalizer calls
`Stem.head()` and only emits if the on-chain `(seq, cid)` matches the
`Atom.head()` and only emits if the on-chain `(seq, cid)` matches the
candidate event.
- **Deduplication** by `(tx_hash, log_index)` ensures exactly-once delivery
across reconnects and backfills.
Expand Down Expand Up @@ -91,14 +91,14 @@ call `graft()` again to obtain a fresh session under the new epoch.

```bash
forge build
cargo build -p stem
cargo build -p atom
```

### Test

```bash
forge test
cargo test -p stem
cargo test -p atom
```

## Deploy (local)
Expand All @@ -119,24 +119,24 @@ forge script script/Deploy.s.sol \
--private-key 0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80

# Note the deployed address from the output, then set the first head:
cast send <STEM_ADDRESS> "setHead(bytes)" "0x$(echo -n 'ipfs://first' | xxd -p)" \
cast send <ATOM_ADDRESS> "setHead(bytes)" "0x$(echo -n 'ipfs://first' | xxd -p)" \
--rpc-url http://127.0.0.1:8545 \
--private-key 0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80
```

## Examples

All examples live in `crates/stem/examples/` and connect to a running node
with a deployed Stem contract.
All examples live in `crates/atom/examples/` and connect to a running node
with a deployed Atom contract.

### `stem_indexer` — raw observed events
### `atom_indexer` — raw observed events

Prints every `HeadUpdated` event as it arrives (no finalization).

```bash
cargo run -p stem --example stem_indexer -- \
cargo run -p atom --example atom_indexer -- \
--rpc-url http://127.0.0.1:8545 \
--contract <STEM_ADDRESS>
--contract <ATOM_ADDRESS>
```

### `finalizer` — finalized events as JSON
Expand All @@ -145,10 +145,10 @@ Runs the full indexer + finalizer pipeline and prints one JSON object per
finalized event.

```bash
cargo run -p stem --example finalizer -- \
cargo run -p atom --example finalizer -- \
--ws-url ws://127.0.0.1:8545 \
--http-url http://127.0.0.1:8545 \
--contract <STEM_ADDRESS> \
--contract <ATOM_ADDRESS> \
--depth 2
```

Expand All @@ -160,10 +160,10 @@ fails with a `staleEpoch` error; the example re-grafts and polls successfully
under the new epoch.

```bash
cargo run -p stem --example membrane_poll -- \
cargo run -p atom --example membrane_poll -- \
--ws-url ws://127.0.0.1:8545 \
--http-url http://127.0.0.1:8545 \
--contract <STEM_ADDRESS> \
--contract <ATOM_ADDRESS> \
--depth 2
```

Expand Down
4 changes: 2 additions & 2 deletions capnp/stem.capnp
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
@0x9bce094a026970c4;

struct Epoch {
seq @0 :UInt64; # Monotonic epoch sequence number (from Stem.seq).
head @1 :Data; # Opaque head bytes from the Stem contract.
seq @0 :UInt64; # Monotonic epoch sequence number (from Atom.seq).
head @1 :Data; # Opaque head bytes from the Atom contract.
adoptedBlock @2 :UInt64;# Block number at which this epoch was adopted.
}

Expand Down
12 changes: 5 additions & 7 deletions crates/stem/Cargo.toml → crates/atom/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
[package]
name = "stem"
name = "atom"
version = "0.1.0"
edition = "2021"
description = "Off-chain Stem runtime: head-following, finalization, and caching for the Stem contract"
description = "Off-chain Atom runtime: head-following, finalization, and caching for the Atom contract"


[build-dependencies]
capnpc = "0.23.3"

[dependencies]
membrane = { git = "https://github.com/wetware/rs.git", branch = "feat/membrane-crate" }
alloy = { version = "0.4", features = ["rpc-types", "sol-types"] }
capnp = "0.23.2"
anyhow = "1"
Expand All @@ -32,8 +30,8 @@ rlp = "0.5"
tokio-test = "0.4"

[[example]]
name = "stem_indexer"
path = "examples/stem_indexer.rs"
name = "atom_indexer"
path = "examples/atom_indexer.rs"

[[example]]
name = "finalizer"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
//! Example: connect to an RPC endpoint and log Stem head updates.
//! Example: connect to an RPC endpoint and log Atom head updates.
//!
//! Imports the stem lib, runs StemIndexer against a Stem contract, and prints each
//! Imports the atom lib, runs AtomIndexer against an Atom contract, and prints each
//! HeadUpdated event (seq, block, writer, cid length). WebSocket URL is derived from
//! the HTTP RPC URL (http -> ws, https -> wss).
//!
//! Usage:
//!
//! cargo run -p stem --example stem_indexer -- --rpc-url <HTTP_URL> --contract <STEM_ADDRESS>
//! cargo run -p atom --example atom_indexer -- --rpc-url <HTTP_URL> --contract <ATOM_ADDRESS>
//!
//! Getting the contract address: deploy Stem with Foundry, then use the printed address:
//! Getting the contract address: deploy Atom with Foundry, then use the printed address:
//!
//! anvil
//! forge script script/Deploy.s.sol --rpc-url http://127.0.0.1:8545 --broadcast --private-key 0xac0974...
//! # "Stem deployed at: 0x..." is the address to pass as --contract
//! # "Atom deployed at: 0x..." is the address to pass as --contract
//!
//! cargo run -p stem --example stem_indexer -- --rpc-url http://127.0.0.1:8545 --contract 0x...
//! cargo run -p atom --example atom_indexer -- --rpc-url http://127.0.0.1:8545 --contract 0x...

use stem::{IndexerConfig, StemIndexer};
use atom::{IndexerConfig, AtomIndexer};
use std::sync::Arc;

fn main() -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -37,8 +37,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
"--help" | "-h" => {
eprintln!(
"Usage: stem_indexer --rpc-url <HTTP_URL> --contract <STEM_ADDRESS>\n\
Logs HeadUpdated events from the Stem contract. WS URL is derived from RPC URL."
"Usage: atom_indexer --rpc-url <HTTP_URL> --contract <ATOM_ADDRESS>\n\
Logs HeadUpdated events from the Atom contract. WS URL is derived from RPC URL."
);
std::process::exit(0);
}
Expand All @@ -47,7 +47,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
i += 1;
}
if rpc_url.is_empty() || contract.is_empty() {
eprintln!("Usage: stem_indexer --rpc-url <HTTP_URL> --contract <STEM_ADDRESS>");
eprintln!("Usage: atom_indexer --rpc-url <HTTP_URL> --contract <ATOM_ADDRESS>");
eprintln!(" (WebSocket URL is derived from the RPC URL)");
std::process::exit(1);
}
Expand All @@ -73,7 +73,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
getlogs_max_range: 1000,
reconnection: Default::default(),
};
let indexer = Arc::new(StemIndexer::new(config));
let indexer = Arc::new(AtomIndexer::new(config));
let mut recv = indexer.subscribe();
let indexer_clone = Arc::clone(&indexer);
std::thread::spawn(move || {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
//!
//! Usage:
//!
//! cargo run -p stem --example finalizer -- --ws-url <WS_URL> --http-url <HTTP_URL> --contract <STEM_ADDRESS>
//! cargo run -p atom --example finalizer -- --ws-url <WS_URL> --http-url <HTTP_URL> --contract <ATOM_ADDRESS>
//!
//! Options:
//! --depth <K> Confirmation depth (number of blocks after event before considering finalized). Default: 6.
//! --cursor <path> Path to file containing start block (one line, decimal). If missing or invalid, start from 0.

use stem::{FinalizerBuilder, IndexerConfig, StemIndexer};
use atom::{FinalizerBuilder, IndexerConfig, AtomIndexer};
use std::io::BufRead;
use std::sync::Arc;

Expand Down Expand Up @@ -74,7 +74,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
"--help" | "-h" => {
eprintln!(
"Usage: finalizer --ws-url <WS_URL> --http-url <HTTP_URL> --contract <STEM_ADDRESS> [--depth K] [--cursor <path>]\n\
"Usage: finalizer --ws-url <WS_URL> --http-url <HTTP_URL> --contract <ATOM_ADDRESS> [--depth K] [--cursor <path>]\n\
Prints one-line JSON per finalized HeadUpdated event (confirmation-depth strategy).\n\
--depth K Confirmation depth (blocks after event before finalized). Default: 6.\n\
--cursor Path to file with start block (one line, decimal). Optional.\n\
Expand All @@ -87,7 +87,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
i += 1;
}
if ws_url.is_empty() || http_url.is_empty() || contract.is_empty() {
eprintln!("Usage: finalizer --ws-url <WS_URL> --http-url <HTTP_URL> --contract <STEM_ADDRESS> [--depth K] [--cursor <path>]");
eprintln!("Usage: finalizer --ws-url <WS_URL> --http-url <HTTP_URL> --contract <ATOM_ADDRESS> [--depth K] [--cursor <path>]");
std::process::exit(1);
}
let contract_address = match parse_contract_address(&contract) {
Expand All @@ -111,7 +111,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
getlogs_max_range: 1000,
reconnection: Default::default(),
};
let indexer = Arc::new(StemIndexer::new(config));
let indexer = Arc::new(AtomIndexer::new(config));
let mut recv = indexer.subscribe();
let indexer_clone = Arc::clone(&indexer);
std::thread::spawn(move || {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
//!
//! Usage:
//!
//! cargo run -p stem --example membrane_poll -- --ws-url <WS_URL> --http-url <HTTP_URL> --contract <STEM_ADDRESS>
//! cargo run -p atom --example membrane_poll -- --ws-url <WS_URL> --http-url <HTTP_URL> --contract <ATOM_ADDRESS>
//!
//! Options:
//! --depth <K> Confirmation depth (blocks after event before finalized). Default: 2.

use capnp_rpc::new_client;
use stem::stem_capnp;
use stem::{current_block_number, FinalizerBuilder, IndexerConfig, StemIndexer, Epoch};
use stem::{FinalizedEvent, membrane_client};
use atom::stem_capnp;
use atom::{current_block_number, FinalizerBuilder, IndexerConfig, AtomIndexer, Epoch};
use atom::{FinalizedEvent, membrane_client};
use std::sync::Arc;
use tokio::sync::watch;

Expand Down Expand Up @@ -83,7 +83,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
"--help" | "-h" => {
eprintln!(
"Usage: membrane_poll --ws-url <WS_URL> --http-url <HTTP_URL> --contract <STEM_ADDRESS> [--depth K]\n\
"Usage: membrane_poll --ws-url <WS_URL> --http-url <HTTP_URL> --contract <ATOM_ADDRESS> [--depth K]\n\
Runs indexer → finalizer → membrane → graft → pollStatus (live-only from current block). Use small --depth (1–2) on a dev chain.\n\
After first epoch, run the printed cast send in another terminal to trigger staleness + re-graft."
);
Expand All @@ -94,7 +94,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
i += 1;
}
if ws_url.is_empty() || http_url.is_empty() || contract.is_empty() {
eprintln!("Usage: membrane_poll --ws-url <WS_URL> --http-url <HTTP_URL> --contract <STEM_ADDRESS> [--depth K]");
eprintln!("Usage: membrane_poll --ws-url <WS_URL> --http-url <HTTP_URL> --contract <ATOM_ADDRESS> [--depth K]");
std::process::exit(1);
}
let contract_address = match parse_contract_address(&contract) {
Expand Down Expand Up @@ -127,7 +127,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
getlogs_max_range: 1000,
reconnection: Default::default(),
};
let indexer = Arc::new(StemIndexer::new(config));
let indexer = Arc::new(AtomIndexer::new(config));
let mut recv = indexer.subscribe();
let indexer_clone = Arc::clone(&indexer);
std::thread::spawn(move || {
Expand Down
2 changes: 1 addition & 1 deletion crates/stem/src/abi.rs → crates/atom/src/abi.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! ABI types and decoding for the Stem contract.
//! ABI types and decoding for the Atom contract.
//!
//! HeadUpdated event and head() view. Decode from JSON-RPC log shape and eth_call return.
//! Uses alloy sol-types for ABI decoding; head() returns (uint64, bytes), event HeadUpdated(seq, writer, cid, cidHash).
Expand Down
Loading