From b07c6ed626d3ebe554c856ce73296c229d99e831 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Wed, 4 Mar 2026 20:54:21 +0530 Subject: [PATCH 01/32] feat: add admissibility checking for permissionless TX serving --- src/dev_arweave.erl | 70 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/src/dev_arweave.erl b/src/dev_arweave.erl index 377b5f2a1..9fa76dd1c 100644 --- a/src/dev_arweave.erl +++ b/src/dev_arweave.erl @@ -130,6 +130,14 @@ get_tx(Base, Request, Opts) -> request( <<"GET">>, <<"/tx/", TXID/binary>>, + #{ + <<"multirequest-admissible">> => + #{ + <<"device">> => <<"arweave@2.9">>, + <<"path">> => <<"is-tx-admissible">>, + <<"tx">> => TXID + } + }, Opts#{ exclude_data => hb_util:bool( @@ -144,6 +152,19 @@ get_tx(Base, Request, Opts) -> ) end. +%% @doc Check whether a response to a `GET /tx/ID' request is valid. +%% The TXID is passed through the admissible message (Base), and the response +%% (Request) is verified to have a commitment matching that TXID. +is_tx_admissible(Base, Request, Opts) -> + maybe + {ok, TXID} ?= hb_maps:find(<<"tx">>, Base, Opts), + {ok, CommittedMsg} ?= hb_message:with_only_committed(Request, Opts), + hb_message:verify(CommittedMsg, #{ <<"commitment-ids">> => [TXID] }, Opts) + else + _ -> false + end. + + %% @doc A router for range requests by method. Both `HEAD` and `GET` requests %% are supported. raw(Base, Request, Opts) -> @@ -2156,3 +2177,52 @@ assert_chunk_range(Type, ID, StartOffset, ExpectedLength, ExpectedHash, Opts) -> ?event(debug_test, {data, {explicit, hb_util:encode(crypto:hash(sha256, Data))}}), ?assertEqual(ExpectedHash, hb_util:encode(crypto:hash(sha256, Data))), ok. + +is_admissible_routed_test() -> + hb_http_client:init_prometheus(), + %% Start Node1 which serves cached messages, and Node2 which has nothing. + W1 = ar_wallet:new(), + Node1 = hb_http_server:start_node(#{ priv_wallet => W1 }), + Node2 = hb_http_server:start_node(#{ priv_wallet => ar_wallet:new() }), + %% Get Node1's opts (including its store) from cowboy env and cache messages + Node1ServerID = hb_util:human_id(ar_wallet:to_address(W1)), + Node1Opts = cowboy:get_env(Node1ServerID, node_msg, #{}), + Msg1 = hb_message:commit(#{ <<"a">> => 1 }, Node1Opts, <<"ans104@1.0">>), + {ok, Msg1RawID} = hb_cache:write(Msg1, Node1Opts), + Msg1ID = hb_util:human_id(Msg1RawID), + Msg2 = hb_message:commit(#{ <<"b">> => 1 }, Node1Opts, <<"ans104@1.0">>), + {ok, Msg2RawID} = hb_cache:write(Msg2, Node1Opts), + Msg2ID = hb_util:human_id(Msg2RawID), + %% Start RoutingNode with routes to both Node1 and Node2. + %% Node2 has no data, so admissibility checks will reject its responses. + %% The router will find admissible responses from Node1. + RoutingNode = hb_http_server:start_node(#{ + priv_wallet => ar_wallet:new(), + routes => [ + #{ + <<"template">> => <<"^/arweave/tx">>, + <<"strategy">> => <<"All">>, + <<"nodes">> => + [ + #{ + <<"match">> => <<"/arweave/tx/">>, + <<"with">> => Node1 + }, + #{ + <<"match">> => <<"/arweave/tx/">>, + <<"with">> => Node2 + } + ] + } + ] + }), + %% Fetch Msg1 and Msg2 via RoutingNode with admissibility verification. + ?assertMatch( + {ok, _}, + hb_http:get(RoutingNode, <<"~arweave@2.9/tx=", Msg1ID/binary>>, #{}) + ), + ?assertMatch( + {ok, _}, + hb_http:get(RoutingNode, <<"~arweave@2.9/tx=", Msg2ID/binary>>, #{}) + ), + ok. From 164f6fb969f855cad41804869b38668f16d8f894 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Thu, 5 Mar 2026 19:02:59 +0530 Subject: [PATCH 02/32] fix: use content hash for TX admissibility instead of commitment IDs --- src/dev_arweave.erl | 90 ++++++++++++++++++++++++++++----------------- 1 file changed, 57 insertions(+), 33 deletions(-) diff --git a/src/dev_arweave.erl b/src/dev_arweave.erl index 9fa76dd1c..5e422432c 100644 --- a/src/dev_arweave.erl +++ b/src/dev_arweave.erl @@ -154,17 +154,18 @@ get_tx(Base, Request, Opts) -> %% @doc Check whether a response to a `GET /tx/ID' request is valid. %% The TXID is passed through the admissible message (Base), and the response -%% (Request) is verified to have a commitment matching that TXID. +%% (Request) is verified to have a content hash matching that TXID. is_tx_admissible(Base, Request, Opts) -> maybe {ok, TXID} ?= hb_maps:find(<<"tx">>, Base, Opts), {ok, CommittedMsg} ?= hb_message:with_only_committed(Request, Opts), - hb_message:verify(CommittedMsg, #{ <<"commitment-ids">> => [TXID] }, Opts) + BareMsg = hb_maps:without([<<"commitments">>], CommittedMsg, Opts), + ContentID = hb_message:id(BareMsg, unsigned, Opts), + true ?= (ContentID == TXID) else _ -> false end. - %% @doc A router for range requests by method. Both `HEAD` and `GET` requests %% are supported. raw(Base, Request, Opts) -> @@ -906,7 +907,20 @@ request(Method, Path, Extra, LogExtra, Opts) -> cache_control => [<<"no-cache">>, <<"no-store">>] } ), - to_message(Path, Method, best_response(Res), LogExtra, Opts). + case to_message(Path, Method, best_response(Res), LogExtra, Opts) of + {ok, Msg} -> + case hb_maps:get( + <<"multirequest-admissible">>, Extra, undefined, Opts + ) of + undefined -> {ok, Msg}; + Admissible -> + case is_tx_admissible(Admissible, Msg, Opts) of + true -> {ok, Msg}; + false -> {error, not_admissible} + end + end; + Error -> Error + end. %% @doc Select the best response from a list of responses by sorting them %% ascending by HTTP status code. Returns the first (best) response tuple. @@ -2179,50 +2193,60 @@ assert_chunk_range(Type, ID, StartOffset, ExpectedLength, ExpectedHash, Opts) -> ok. is_admissible_routed_test() -> - hb_http_client:init_prometheus(), - %% Start Node1 which serves cached messages, and Node2 which has nothing. - W1 = ar_wallet:new(), - Node1 = hb_http_server:start_node(#{ priv_wallet => W1 }), - Node2 = hb_http_server:start_node(#{ priv_wallet => ar_wallet:new() }), - %% Get Node1's opts (including its store) from cowboy env and cache messages - Node1ServerID = hb_util:human_id(ar_wallet:to_address(W1)), - Node1Opts = cowboy:get_env(Node1ServerID, node_msg, #{}), - Msg1 = hb_message:commit(#{ <<"a">> => 1 }, Node1Opts, <<"ans104@1.0">>), - {ok, Msg1RawID} = hb_cache:write(Msg1, Node1Opts), - Msg1ID = hb_util:human_id(Msg1RawID), - Msg2 = hb_message:commit(#{ <<"b">> => 1 }, Node1Opts, <<"ans104@1.0">>), - {ok, Msg2RawID} = hb_cache:write(Msg2, Node1Opts), - Msg2ID = hb_util:human_id(Msg2RawID), - %% Start RoutingNode with routes to both Node1 and Node2. - %% Node2 has no data, so admissibility checks will reject its responses. - %% The router will find admissible responses from Node1. + AliceWallet = ar_wallet:new(), + BobWallet = ar_wallet:new(), + AliceNode = hb_http_server:start_node(#{ priv_wallet => AliceWallet }), + BobNode = hb_http_server:start_node(#{ priv_wallet => BobWallet }), + AliceNodeOpts = hb_http_server:get_opts(#{ + http_server => hb_util:human_id(ar_wallet:to_address(AliceWallet)) + }), + BobNodeOpts = hb_http_server:get_opts(#{ + http_server => hb_util:human_id(ar_wallet:to_address(BobWallet)) + }), + AliceMsg = + hb_message:commit(#{ + <<"a">> => 1 }, + AliceNodeOpts, + <<"ans104@1.0">> + ), + {ok, AliceMsgRawID} = hb_cache:write(AliceMsg, AliceNodeOpts), + AliceMsgID = hb_util:human_id(AliceMsgRawID), + BobMsg = + hb_message:commit(#{ + <<"b">> => 1 }, + BobNodeOpts, + <<"ans104@1.0">> + ), + {ok, BobMsgRawID} = hb_cache:write(BobMsg, BobNodeOpts), + BobMsgID = hb_util:human_id(BobMsgRawID), + %% Start RoutingNode with routes to both AliceNode and BobNode. RoutingNode = hb_http_server:start_node(#{ priv_wallet => ar_wallet:new(), routes => [ #{ <<"template">> => <<"^/arweave/tx">>, - <<"strategy">> => <<"All">>, + <<"strategy">> => <<"Random">>, + <<"choose">> => 2, + <<"parallel">> => true, <<"nodes">> => [ #{ <<"match">> => <<"/arweave/tx/">>, - <<"with">> => Node1 + <<"with">> => AliceNode }, #{ <<"match">> => <<"/arweave/tx/">>, - <<"with">> => Node2 + <<"with">> => BobNode } ] } ] }), - %% Fetch Msg1 and Msg2 via RoutingNode with admissibility verification. - ?assertMatch( - {ok, _}, - hb_http:get(RoutingNode, <<"~arweave@2.9/tx=", Msg1ID/binary>>, #{}) - ), - ?assertMatch( - {ok, _}, - hb_http:get(RoutingNode, <<"~arweave@2.9/tx=", Msg2ID/binary>>, #{}) - ), + %% Fetch Alice's and Bob's messages via RoutingNode with admissibility check. + {ok, AliceRes} = + hb_http:get(RoutingNode, <<"~arweave@2.9/tx=", AliceMsgID/binary>>, #{}), + ?assertMatch(#{ <<"a">> := 1 }, AliceRes), + {ok, BobRes} = + hb_http:get(RoutingNode, <<"~arweave@2.9/tx=", BobMsgID/binary>>, #{}), + ?assertMatch(#{ <<"b">> := 1 }, BobRes), ok. From 7ce2a4f1ac31f3b5938d61dc453b6e97d7553929 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Fri, 6 Mar 2026 00:18:34 +0530 Subject: [PATCH 03/32] feat: support commitment-based ID check in is_tx_admissible Arweave v2 TXIDs are SHA-256(signature), which differs from the unsigned content hash used by HyperBEAM messages. Add a fallback in is_tx_admissible that checks the commitment-based ID via hb_message:id(CommittedMsg, all, Opts) when the unsigned content hash doesn't match. --- src/dev_arweave.erl | 44 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/src/dev_arweave.erl b/src/dev_arweave.erl index 5e422432c..3958bf585 100644 --- a/src/dev_arweave.erl +++ b/src/dev_arweave.erl @@ -11,6 +11,7 @@ %%% Helper functions -export([get_chunk/2, bundle_header/2, bundle_header/3]). -include("include/hb.hrl"). +-include("include/hb_arweave_nodes.hrl"). -include_lib("eunit/include/eunit.hrl"). -define(IS_BLOCK_ID(X), (is_binary(X) andalso byte_size(X) == 64)). @@ -161,7 +162,10 @@ is_tx_admissible(Base, Request, Opts) -> {ok, CommittedMsg} ?= hb_message:with_only_committed(Request, Opts), BareMsg = hb_maps:without([<<"commitments">>], CommittedMsg, Opts), ContentID = hb_message:id(BareMsg, unsigned, Opts), - true ?= (ContentID == TXID) + true ?= + ((ContentID == TXID) orelse + (hb_message:id(CommittedMsg, all, Opts) == TXID)) + % and (hb_message:verify(CommittedMsg, all, Opts)) else _ -> false end. @@ -2250,3 +2254,41 @@ is_admissible_routed_test() -> hb_http:get(RoutingNode, <<"~arweave@2.9/tx=", BobMsgID/binary>>, #{}), ?assertMatch(#{ <<"b">> := 1 }, BobRes), ok. + +is_admissible_real_gateway_test_() -> + {timeout, 30, fun() -> + application:ensure_all_started(hb), + TXID = <<"ptBC0UwDmrUTBQX3MqZ1lB57ex20ygwzkjjCrQjIx3o">>, + RouteOpts = #{ + priv_wallet => ar_wallet:new(), + routes => [ + #{ + <<"template">> => <<"^/arweave/tx">>, + <<"strategy">> => <<"Random">>, + <<"choose">> => 10, + <<"parallel">> => true, + <<"nodes">> => ?ARWEAVE_BOOTSTRAP_CHAIN_NODES + }, + #{ + <<"template">> => <<"^/arweave">>, + <<"nodes">> => ?ARWEAVE_BOOTSTRAP_CHAIN_NODES, + <<"parallel">> => true, + <<"admissible-status">> => 200 + } + ] + }, + {ok, Res} = get_tx( + #{ <<"tx">> => TXID, <<"exclude-data">> => true }, + #{}, + RouteOpts + ), + ?assertMatch(#{ <<"reward">> := <<"482143296">> }, Res), + ?assertMatch( + #{ <<"anchor">> := + <<"XTzaU2_m_hRYDLiXkcleOC4zf5MVTXIeFWBOsJSRrtEZ8kM6Oz7EKLhZY7fTAvKq">> + }, + Res + ), + ?assertMatch(#{ <<"content-type">> := <<"application/json">> }, Res), + ok + end}. From 881e3e2b58dfd201592f72d6991b08773e644f79 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Fri, 6 Mar 2026 13:26:51 +0530 Subject: [PATCH 04/32] fix: correct way to verify while checking is_admissible --- src/dev_arweave.erl | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/dev_arweave.erl b/src/dev_arweave.erl index 3958bf585..4bd2efb1d 100644 --- a/src/dev_arweave.erl +++ b/src/dev_arweave.erl @@ -163,9 +163,15 @@ is_tx_admissible(Base, Request, Opts) -> BareMsg = hb_maps:without([<<"commitments">>], CommittedMsg, Opts), ContentID = hb_message:id(BareMsg, unsigned, Opts), true ?= - ((ContentID == TXID) orelse - (hb_message:id(CommittedMsg, all, Opts) == TXID)) - % and (hb_message:verify(CommittedMsg, all, Opts)) + (ContentID == TXID) orelse + ( + (hb_message:id(CommittedMsg, all, Opts) == TXID) + andalso hb_message:verify( + CommittedMsg, + #{ <<"commitment-ids">> => [TXID] }, + Opts + ) + ) else _ -> false end. From 547acfd7a655fbcbfb5e1efa23953c20c483d430 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Fri, 6 Mar 2026 14:36:13 +0530 Subject: [PATCH 05/32] feat: enable hooks so that node can sign when vouching for a tx --- src/dev_arweave.erl | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/dev_arweave.erl b/src/dev_arweave.erl index 4bd2efb1d..1fb40ed7f 100644 --- a/src/dev_arweave.erl +++ b/src/dev_arweave.erl @@ -154,8 +154,10 @@ get_tx(Base, Request, Opts) -> end. %% @doc Check whether a response to a `GET /tx/ID' request is valid. -%% The TXID is passed through the admissible message (Base), and the response -%% (Request) is verified to have a content hash matching that TXID. +%% 1. Unsigned content hash == TXID: for HyperBEAM-served messages. +%% The hash itself proves content integrity +%% 2. For signed Arweave TXs where TXID = SHA-256(signature). +%% Cryptographically verifies the matching commitment is_tx_admissible(Base, Request, Opts) -> maybe {ok, TXID} ?= hb_maps:find(<<"tx">>, Base, Opts), @@ -925,7 +927,16 @@ request(Method, Path, Extra, LogExtra, Opts) -> undefined -> {ok, Msg}; Admissible -> case is_tx_admissible(Admissible, Msg, Opts) of - true -> {ok, Msg}; + true -> + case dev_hook:on( + <<"tx-admissible">>, + #{ <<"body">> => Msg }, + Opts + ) of + {ok, #{ <<"body">> := ResultMsg }} -> + {ok, ResultMsg}; + {error, Reason} -> {error, Reason} + end; false -> {error, not_admissible} end end; From 5acfd280d044931e82f7bf7ff31a575136876759 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Sat, 7 Mar 2026 01:00:58 +0530 Subject: [PATCH 06/32] fix: test to use http rather than get_tx --- src/dev_arweave.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/dev_arweave.erl b/src/dev_arweave.erl index 1fb40ed7f..3c6a291db 100644 --- a/src/dev_arweave.erl +++ b/src/dev_arweave.erl @@ -2276,7 +2276,7 @@ is_admissible_real_gateway_test_() -> {timeout, 30, fun() -> application:ensure_all_started(hb), TXID = <<"ptBC0UwDmrUTBQX3MqZ1lB57ex20ygwzkjjCrQjIx3o">>, - RouteOpts = #{ + Node = hb_http_server:start_node(#{ priv_wallet => ar_wallet:new(), routes => [ #{ @@ -2293,11 +2293,11 @@ is_admissible_real_gateway_test_() -> <<"admissible-status">> => 200 } ] - }, - {ok, Res} = get_tx( - #{ <<"tx">> => TXID, <<"exclude-data">> => true }, - #{}, - RouteOpts + }), + {ok, Res} = hb_http:get( + Node, + <<"~arweave@2.9/tx=", TXID/binary, "&exclude-data=true">>, + #{} ), ?assertMatch(#{ <<"reward">> := <<"482143296">> }, Res), ?assertMatch( From 9226f1cca8c196b0ad2b435f0a088d1a8278bd32 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Sat, 7 Mar 2026 01:17:52 +0530 Subject: [PATCH 07/32] fix: simplify is_tx_admissible to check TXID membership in commitment IDs --- src/dev_arweave.erl | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/src/dev_arweave.erl b/src/dev_arweave.erl index 3c6a291db..e5f2ea207 100644 --- a/src/dev_arweave.erl +++ b/src/dev_arweave.erl @@ -154,26 +154,15 @@ get_tx(Base, Request, Opts) -> end. %% @doc Check whether a response to a `GET /tx/ID' request is valid. -%% 1. Unsigned content hash == TXID: for HyperBEAM-served messages. -%% The hash itself proves content integrity -%% 2. For signed Arweave TXs where TXID = SHA-256(signature). -%% Cryptographically verifies the matching commitment +%% Verifies that the requested TXID exists as a commitment ID in the +%% response message, and that all commitments are cryptographically valid. is_tx_admissible(Base, Request, Opts) -> maybe {ok, TXID} ?= hb_maps:find(<<"tx">>, Base, Opts), - {ok, CommittedMsg} ?= hb_message:with_only_committed(Request, Opts), - BareMsg = hb_maps:without([<<"commitments">>], CommittedMsg, Opts), - ContentID = hb_message:id(BareMsg, unsigned, Opts), + CommIDs = maps:keys(maps:get(<<"commitments">>, Request, #{})), true ?= - (ContentID == TXID) orelse - ( - (hb_message:id(CommittedMsg, all, Opts) == TXID) - andalso hb_message:verify( - CommittedMsg, - #{ <<"commitment-ids">> => [TXID] }, - Opts - ) - ) + lists:member(TXID, CommIDs) andalso + hb_message:verify(Request, all, Opts) else _ -> false end. @@ -2308,4 +2297,4 @@ is_admissible_real_gateway_test_() -> ), ?assertMatch(#{ <<"content-type">> := <<"application/json">> }, Res), ok - end}. + end}. \ No newline at end of file From bd2df460ea028d64f5b1a0355f95bef2b389505b Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Mon, 9 Mar 2026 05:04:17 +0530 Subject: [PATCH 08/32] feat: add router-perf@1.0 Erlang execution device for performance-based routing Introduces dev_router_perf.erl, an Erlang execution device for process@1.0 that replaces dynamic-router.lua for Arweave gateway routing. Fixes gaps in the Lua version: supports match/with route format. Key functionality: - register: adds nodes to routes with http_reference for the feedback loop - duration: updates node performance via exponential weighted average (EMA) - recalculate: recomputes weights using decay-based percentile scoring - Supports both match/with and prefix Registers router-perf@1.0 in hb_opts preloaded_devices. Comments added in the file by: Claude Opus 4.6 --- src/dev_router_perf.erl | 490 ++++++++++++++++++++++++++++++++++++++++ src/hb_opts.erl | 1 + 2 files changed, 491 insertions(+) create mode 100644 src/dev_router_perf.erl diff --git a/src/dev_router_perf.erl b/src/dev_router_perf.erl new file mode 100644 index 000000000..7b6b0e516 --- /dev/null +++ b/src/dev_router_perf.erl @@ -0,0 +1,490 @@ +%%% @doc An Erlang execution device for `process@1.0' that provides +%%% performance-based routing. Replaces the Lua `dynamic-router.lua' for +%%% Arweave gateway routing, fixing two gaps: +%%% 1. Supports `match'/`with' route format (not just `prefix'/`price'/`topup'). +%%% 2. Handles monitor duration POSTs that lack an `action' field. +%%% +%%% Device state is stored in the Base message (standard HyperBEAM architecture). +%%% Configuration keys read from Base: +%%% - `performance-period': EMA smoothing period (default 1000). +%%% - `initial-performance': Starting perf score for new nodes (default 30000). +%%% - `sampling-rate': Fraction of random sampling (default 0.1). +%%% - `performance-weight': Weight factor for perf scoring (default 1). +%%% - `pricing-weight': Weight factor for price scoring (default 1). +%%% - `score-preference': Decay exponent for scoring (default 1). +-module(dev_router_perf). +-export([init/3, compute/3, snapshot/3, normalize/3]). +-export([duration/3, register/3, recalculate/3]). +-export([to_float/1]). +-include("include/hb.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +%% @doc Compute dispatcher for process@1.0 compatibility. +compute(Base, Req, Opts) -> + Body = hb_ao:get(<<"body">>, Req, Req, Opts), + %% TODO: THIS IS WIERD IS THERE A BETTER WAY??? + %% Falls back to `path' because hb_http_client:maybe_invoke_monitor + %% sends duration data with `path => <<"duration">>' (not `action'). + Action = + case hb_ao:get(<<"action">>, Body, not_found, Opts) of + not_found -> hb_ao:get(<<"path">>, Body, not_found, Opts); + A -> A + end, + case Action of + <<"register">> -> register(Base, Body, Opts); + <<"recalculate">> -> recalculate(Base, Body, Opts); + <<"duration">> -> duration(Base, Body, Opts); + _ -> + ?event({erroe_report, <<"Action not supported.">>}), + {ok, Base} + end. + +%% @doc Initialize the device state with defaults if not already set. +init(Base, _Req, Opts) -> + {ok, ensure_defaults(Base, Opts)}. + +%% @doc Return a snapshot of the execution device state for caching. +snapshot(Base, _Req, _Opts) -> + {ok, Base}. + +%% @doc Restore execution device state from a snapshot. +normalize(Base, _Req, _Opts) -> + {ok, Base}. + +%% @doc Update the performance score for a node identified by `reference'. +%% Uses exponential weighted average: +%% new_perf = current * (1 - 1/period) + duration * (1/period) +duration(RawBase, Req, Opts) -> + Base = ensure_defaults(RawBase, Opts), + Body = hb_ao:get(<<"body">>, Req, Req, Opts), + case hb_ao:get(<<"reference">>, Body, not_found, Opts) of + not_found -> {ok, Base}; + Reference -> + Duration = to_float(hb_ao:get(<<"duration">>, Body, Opts)), + Period = to_float( + hb_ao:get(<<"performance-period">>, Base, 1000, Opts) + ), + ChangeFactor = 1.0 / Period, + Routes = hb_ao:get(<<"routes">>, Base, [], Opts), + UpdatedRoutes = update_node_performance( + Routes, + Reference, + Duration, + ChangeFactor, + Opts + ), + {ok, hb_ao:set(Base, #{ <<"routes">> => UpdatedRoutes }, Opts)} + end. + +%% @doc Register a new node on a route. Supports both: +%% - `match'/`with' format +%% - `prefix'/`price'/`topup' format +%% Generates `http_reference' from the node's URL (with or prefix). +register(RawBase, Req, Opts) -> + Base = ensure_defaults(RawBase, Opts), + Body = hb_ao:get(<<"body">>, Req, Req, Opts), + Route = hb_ao:get(<<"route">>, Body, Opts), + Template = hb_ao:get(<<"template">>, Route, Opts), + Routes = hb_ao:get(<<"routes">>, Base, [], Opts), + InitialPerf = + to_float(hb_ao:get(<<"initial-performance">>, Base, 30000, Opts)), + HttpRef = node_url(Route, Opts), + Node = build_node(Route, HttpRef, InitialPerf, Opts), + RouteConfig = extract_route_config(Route, Opts), + {UpdatedRoutes, _} = + add_node_to_route(Routes, Template, Node, RouteConfig, Opts), + NewBase = hb_ao:set(Base, #{ <<"routes">> => UpdatedRoutes }, Opts), + recalculate(NewBase, Req, Opts). + +%% @doc Recompute weights for all nodes in all routes. +%% Weight = inverse performance -- lower ms = higher weight, normalized. +recalculate(RawBase, _Req, Opts) -> + Base = ensure_defaults(RawBase, Opts), + Routes = hb_ao:get(<<"routes">>, Base, [], Opts), + SamplingRate = to_float(hb_ao:get(<<"sampling-rate">>, Base, 0.1, Opts)), + PerfWeight = to_float(hb_ao:get(<<"performance-weight">>, Base, 1, Opts)), + PricingWeight = to_float(hb_ao:get(<<"pricing-weight">>, Base, 1, Opts)), + ScorePref = to_float(hb_ao:get(<<"score-preference">>, Base, 1, Opts)), + ScoringParams = #{ + sampling_rate => SamplingRate, + perf_weight => PerfWeight, + pricing_weight => PricingWeight, + score_pref => ScorePref + }, + UpdatedRoutes = lists:map( + fun(R) -> + Nodes = hb_ao:get(<<"nodes">>, R, [], Opts), + recalculate_route(R, ScoringParams, Opts, Nodes) + end, + Routes + ), + {ok, hb_ao:set(Base, #{ <<"routes">> => UpdatedRoutes }, Opts)}. + +%%% Internal functions + +-define( + ROUTE_CONFIG_KEYS, + [ + <<"strategy">>, + <<"parallel">>, + <<"choose">>, + <<"admissible-status">> + ] +). + +%% @doc Extract route-level configuration keys from a registration message. +extract_route_config(Route, Opts) -> + lists:foldl( + fun(Key, Acc) -> + case hb_ao:get(Key, Route, not_found, Opts) of + not_found -> Acc; + Value -> Acc#{ Key => Value } + end + end, + #{}, + ?ROUTE_CONFIG_KEYS + ). + +ensure_defaults(Base, Opts) -> + Defaults = [ + {<<"routes">>, []}, + {<<"sampling-rate">>, 0.1}, + {<<"pricing-weight">>, 1}, + {<<"performance-weight">>, 1}, + {<<"score-preference">>, 1}, + {<<"performance-period">>, 1000}, + {<<"initial-performance">>, 30000} + ], + lists:foldl( + fun({Key, Default}, Acc) -> + case hb_ao:get(Key, Acc, not_found, Opts) of + not_found -> hb_ao:set(Acc, #{Key => Default}, Opts); + _ -> Acc + end + end, + Base, + Defaults + ). + + +node_url(Node, Opts) -> + case hb_ao:get(<<"with">>, Node, not_found, Opts) of + not_found -> hb_ao:get(<<"prefix">>, Node, <<"unknown">>, Opts); + With -> With + end. + +%% @doc Build a node map from a registration route. +%% `http_reference' is stored at the top level AND in opts. +build_node(Route, HttpRef, InitialPerf, Opts) -> + ExistingNodeOpts = + case hb_ao:get(<<"opts">>, Route, not_found, Opts) of + NodeOpts when is_map(NodeOpts) -> + %% TODO: PLEASE CROSS CHECK ONCE I THINK IT's OKAY BUT VERIFY + %% Strip old commitments so http_reference gets included + %% when the process pipeline commits the full state. + hb_maps:without([<<"commitments">>], NodeOpts, Opts); + _ -> #{} + end, + Node = #{ + <<"performance">> => InitialPerf, + <<"price">> => to_float(hb_ao:get(<<"price">>, Route, 0, Opts)), + <<"weight">> => 1.0, + <<"http_reference">> => HttpRef, + <<"opts">> => ExistingNodeOpts#{ <<"http_reference">> => HttpRef } + }, + WithMatch = + case hb_ao:get(<<"with">>, Route, not_found, Opts) of + not_found -> #{}; + With -> + #{ + <<"with">> => With, + <<"match">> => hb_ao:get(<<"match">>, Route, <<"">>, Opts) + } + end, + WithPrefix = + case hb_ao:get(<<"prefix">>, Route, not_found, Opts) of + not_found -> #{}; + Prefix -> #{ <<"prefix">> => Prefix } + end, + hb_maps:merge(hb_maps:merge(Node, WithMatch, Opts), WithPrefix, Opts). + +%% @doc Append a node to an existing route or create a new one. +add_node_to_route(Routes, Template, Node, RouteConfig, Opts) -> + case find_route_index(Routes, Template, Opts) of + not_found -> + BaseRoute = #{ + <<"template">> => Template, + <<"strategy">> => <<"By-Weight">>, + <<"nodes">> => [Node] + }, + NewRoute = hb_maps:merge(BaseRoute, RouteConfig, Opts), + {Routes ++ [NewRoute], length(Routes) + 1}; + Idx -> + Route = lists:nth(Idx, Routes), + ExistingNodes = hb_ao:get(<<"nodes">>, Route, [], Opts), + UpdatedRoute = + hb_ao:set( + Route, + #{ <<"nodes">> => ExistingNodes ++ [Node] }, + Opts + ), + MergedRoute = hb_maps:merge(UpdatedRoute, RouteConfig, Opts), + {list_replace(Routes, Idx, MergedRoute), Idx} + end. + +%% @doc Find the 1-based index of the route matching Template, or not_found. +find_route_index(Routes, Template, Opts) -> + find_route_index(Routes, Template, 1, Opts). +find_route_index([], _Template, _Idx, _Opts) -> not_found; +find_route_index([Route | Rest], Template, Idx, Opts) -> + case hb_ao:get(<<"template">>, Route, undefined, Opts) of + Template -> Idx; + _ -> find_route_index(Rest, Template, Idx + 1, Opts) + end. + +%% @doc Replace the element at 1-based Idx in List with Value. +list_replace(List, Idx, Value) -> + {Before, [_ | After]} = lists:split(Idx - 1, List), + Before ++ [Value | After]. + +%% @doc Apply a duration update to the node matching Reference across all routes. +update_node_performance(Routes, Reference, Duration, ChangeFactor, Opts) -> + lists:map( + fun(Route) -> + Nodes = hb_ao:get(<<"nodes">>, Route, [], Opts), + UpdatedNodes = + [ + apply_node_perf(Node, Reference, Duration, ChangeFactor, Opts) + || + Node <- Nodes + ], + hb_ao:set(Route, #{ <<"nodes">> => UpdatedNodes }, Opts) + end, + Routes + ). + +%% @doc Update a single node's performance if its http_reference matches. +apply_node_perf(Node, Reference, Duration, ChangeFactor, Opts) -> + case hb_ao:get(<<"http_reference">>, Node, not_found, Opts) of + Reference -> + OldPerf = to_float(hb_ao:get(<<"performance">>, Node, 30000, Opts)), + NewPerf = (OldPerf * (1.0 - ChangeFactor)) + (Duration * ChangeFactor), + hb_ao:set(Node, #{ <<"performance">> => NewPerf }, Opts); + _ -> + Node + end. + +%% @doc Recompute weights for all nodes in a single route. +recalculate_route(Route, ScoringParams, Opts, Nodes) when is_list(Nodes) -> + #{ + perf_weight := PerfW, + pricing_weight := PriceW, + sampling_rate := SamplingRate, + score_pref := ScorePref + } = ScoringParams, + TotalWeight = PerfW + PriceW, + PerfFactor = PerfW / TotalWeight, + PriceFactor = PriceW / TotalWeight, + PerfValues = + [ + to_float(hb_ao:get(<<"performance">>, N, 30000, Opts)) + || + N <- Nodes + ], + PriceValues = + [ + to_float(hb_ao:get(<<"price">>, N, 0, Opts)) + || + N <- Nodes + ], + SortedPerf = lists:sort(PerfValues), + SortedPrice = lists:sort(PriceValues), + ScoredNodes = + lists:map( + fun(Node) -> + Perf = + to_float(hb_ao:get(<<"performance">>, Node, 30000, Opts)), + Price = + to_float(hb_ao:get(<<"price">>, Node, 0, Opts)), + PerfPercentile = percentile(Perf, SortedPerf), + PricePercentile = percentile(Price, SortedPrice), + PerfScore = + (decay(ScorePref, PerfPercentile) * (1.0 - SamplingRate)) + + SamplingRate, + PriceScore = decay(ScorePref, PricePercentile), + Weight = (PerfScore * PerfFactor) + (PriceScore * PriceFactor), + hb_ao:set(Node, #{ <<"weight">> => Weight }, Opts) + end, + Nodes + ), + hb_ao:set(Route, #{ <<"nodes">> => ScoredNodes }, Opts); +recalculate_route(Route, _ScoringParams, _Opts, _Nodes) -> + Route. + +%% @doc Exponential decay function for score weighting. +decay(Preference, Score) -> + math:exp(-Preference * Score). + +%% @doc Compute the percentile rank of Value in a sorted list. +percentile(_Value, []) -> 0.0; +percentile(_Value, [_]) -> 0.0; +percentile(Value, Sorted) -> + Pos = count_less_or_equal(Value, Sorted, 0), + (Pos - 1) / length(Sorted). + +count_less_or_equal(_Value, [], Count) -> Count; +count_less_or_equal(Value, [H | T], Count) when H =< Value -> + count_less_or_equal(Value, T, Count + 1); +count_less_or_equal(_Value, _Rest, Count) -> Count. + +to_float(V) when is_float(V) -> V; +to_float(V) when is_integer(V) -> float(V); +to_float(V) when is_binary(V) -> + try binary_to_float(V) + catch _:_ -> + try float(binary_to_integer(V)) + catch _:_ -> 0.0 + end + end; +to_float(V) when is_list(V) -> + try list_to_float(V) + catch _:_ -> + try float(list_to_integer(V)) + catch _:_ -> 0.0 + end + end; +to_float(_) -> 0.0. + +%%% ============================================================ +%%% Tests +%%% ============================================================ + +register_test() -> + Base = #{}, + Req = #{ + <<"route">> => #{ + <<"template">> => <<"/test-key">>, + <<"prefix">> => <<"host1">>, + <<"price">> => 5 + } + }, + {ok, Base2} = register(Base, Req, #{}), + Routes = hb_maps:get(<<"routes">>, Base2, [], #{}), + ?assertEqual(1, length(Routes)), + [Route] = Routes, + Nodes = hb_maps:get(<<"nodes">>, Route, [], #{}), + ?assertEqual(1, length(Nodes)), + [Node] = Nodes, + ?assertEqual(<<"host1">>, hb_maps:get(<<"prefix">>, Node, undefined, #{})), + ?assertEqual(5.0, hb_maps:get(<<"price">>, Node, undefined, #{})). + +register_match_with_test() -> + Base = #{}, + Req = #{ + <<"route">> => #{ + <<"template">> => <<"^/arweave">>, + <<"match">> => <<"^/arweave">>, + <<"with">> => <<"http://chain-1.arweave.xyz:1984">> + } + }, + {ok, Base2} = register(Base, Req, #{}), + Routes = hb_maps:get(<<"routes">>, Base2, [], #{}), + ?assertEqual(1, length(Routes)), + [Route] = Routes, + Nodes = hb_maps:get(<<"nodes">>, Route, [], #{}), + ?assertEqual(1, length(Nodes)), + [Node] = Nodes, + ?assertEqual(<<"http://chain-1.arweave.xyz:1984">>, + hb_maps:get(<<"with">>, Node, undefined, #{})), + ?assertEqual(<<"^/arweave">>, + hb_maps:get(<<"match">>, Node, undefined, #{})), + ?assertEqual(<<"http://chain-1.arweave.xyz:1984">>, + hb_maps:get(<<"http_reference">>, Node, undefined, #{})), + NodeOpts = hb_maps:get(<<"opts">>, Node, #{}, #{}), + ?assertEqual(<<"http://chain-1.arweave.xyz:1984">>, + hb_maps:get(<<"http_reference">>, NodeOpts, undefined, #{})). + +duration_test() -> + Base = #{ + <<"performance-period">> => 2, + <<"initial-performance">> => 1000, + <<"routes">> => [ + #{ + <<"template">> => <<"^/arweave">>, + <<"strategy">> => <<"By-Weight">>, + <<"nodes">> => [ + #{ + <<"with">> => <<"http://chain-1.arweave.xyz:1984">>, + <<"match">> => <<"^/arweave">>, + <<"performance">> => 1000.0, + <<"price">> => 0, + <<"weight">> => 1.0, + <<"http_reference">> => + <<"http://chain-1.arweave.xyz:1984">>, + <<"opts">> => #{ + <<"http_reference">> => + <<"http://chain-1.arweave.xyz:1984">> + } + } + ] + } + ] + }, + Req = #{ + <<"duration">> => 200, + <<"reference">> => <<"http://chain-1.arweave.xyz:1984">> + }, + {ok, Base2} = duration(Base, Req, #{}), + Routes = hb_maps:get(<<"routes">>, Base2, [], #{}), + [Route] = Routes, + [Node] = hb_maps:get(<<"nodes">>, Route, [], #{}), + %% With period=2, change_factor=0.5. + %% new_perf = 1000 * 0.5 + 200 * 0.5 = 600. + ?assertEqual(600.0, hb_maps:get(<<"performance">>, Node, undefined, #{})). + +duration_no_reference_test() -> + Base = #{ <<"routes">> => [] }, + Req = #{ + <<"duration">> => 200 + }, + {ok, Base2} = duration(Base, Req, #{}), + ?assertEqual(Base2, ensure_defaults(Base, #{})). + +recalculate_test() -> + Base = #{ + <<"performance-period">> => 6, + <<"initial-performance">> => 30000, + <<"performance-weight">> => 1, + <<"pricing-weight">> => 1, + <<"sampling-rate">> => 0.1, + <<"score-preference">> => 1, + <<"routes">> => [ + #{ + <<"template">> => <<"/test">>, + <<"strategy">> => <<"By-Weight">>, + <<"nodes">> => [ + #{ + <<"prefix">> => <<"host1">>, + <<"performance">> => 200.0, + <<"price">> => 5, + <<"weight">> => 1.0, + <<"http_reference">> => <<"host1">> + }, + #{ + <<"prefix">> => <<"host2">>, + <<"performance">> => 55500.0, + <<"price">> => 5, + <<"weight">> => 1.0, + <<"http_reference">> => <<"host2">> + } + ] + } + ] + }, + {ok, Base2} = recalculate(Base, #{}, #{}), + Routes = hb_maps:get(<<"routes">>, Base2, [], #{}), + [Route] = Routes, + [N1, N2] = hb_maps:get(<<"nodes">>, Route, [], #{}), + W1 = hb_maps:get(<<"weight">>, N1, undefined, #{}), + W2 = hb_maps:get(<<"weight">>, N2, undefined, #{}), + ?assert(W1 > W2). diff --git a/src/hb_opts.erl b/src/hb_opts.erl index 4d169cf88..88a94e21c 100644 --- a/src/hb_opts.erl +++ b/src/hb_opts.erl @@ -218,6 +218,7 @@ default_message() -> #{<<"name">> => <<"rate-limit@1.0">>, <<"module">> => dev_rate_limit}, #{<<"name">> => <<"relay@1.0">>, <<"module">> => dev_relay}, #{<<"name">> => <<"router@1.0">>, <<"module">> => dev_router}, + #{<<"name">> => <<"router-perf@1.0">>, <<"module">> => dev_router_perf}, #{<<"name">> => <<"scheduler@1.0">>, <<"module">> => dev_scheduler}, #{<<"name">> => <<"simple-pay@1.0">>, <<"module">> => dev_simple_pay}, #{<<"name">> => <<"snp@1.0">>, <<"module">> => dev_snp}, From d8992575a78df8f7c840e66a9316a99310599fda Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Mon, 9 Mar 2026 05:14:22 +0530 Subject: [PATCH 09/32] feat: add integration test for router-perf@1.0 and clean up device code Add `is_admissible_hook_routed_test_` that validates the full perf-router feedback loop: gateway registration, TX fetch through routed stack, async monitor duration posts, and performance score updates via EMA. --- src/dev_arweave.erl | 92 +++++++++++++++++++++++++++++++++++++++++ src/dev_router_perf.erl | 1 + 2 files changed, 93 insertions(+) diff --git a/src/dev_arweave.erl b/src/dev_arweave.erl index e5f2ea207..426954027 100644 --- a/src/dev_arweave.erl +++ b/src/dev_arweave.erl @@ -2261,6 +2261,98 @@ is_admissible_routed_test() -> ?assertMatch(#{ <<"b">> := 1 }, BobRes), ok. +is_admissible_hook_routed_test_() -> + {timeout, 60, fun() -> + application:ensure_all_started(hb), + TXID = <<"ptBC0UwDmrUTBQX3MqZ1lB57ex20ygwzkjjCrQjIx3o">>, + PerfProcess = <<"/perf-router~node-process@1.0">>, + SchedulePath = <>, + RoutesPath = <>, + NodeWallet = ar_wallet:new(), + NodeAddr = hb_util:human_id(NodeWallet), + Opts = #{ + store => hb_test_utils:test_store(), + priv_wallet => NodeWallet, + http_monitor => #{ + <<"method">> => <<"POST">>, + <<"path">> => SchedulePath + }, + router_opts => #{ <<"provider">> => #{ <<"path">> => RoutesPath } }, + node_processes => #{ + <<"perf-router">> => #{ + <<"device">> => <<"process@1.0">>, + <<"execution-device">> => <<"router-perf@1.0">>, + <<"scheduler-device">> => <<"scheduler@1.0">>, + <<"performance-period">> => 2, + <<"initial-performance">> => 1000 + } + }, + routes => [ + #{ + <<"template">> => <<"^/arweave">>, + <<"nodes">> => ?ARWEAVE_BOOTSTRAP_CHAIN_NODES, + <<"parallel">> => true, + <<"admissible-status">> => 200 + } + ] + }, + Node = hb_http_server:start_node(Opts), + %% Register gateways with the perf-router process. + RouteConfig = #{ + <<"template">> => <<"^/arweave">>, + <<"parallel">> => true, + <<"strategy">> => <<"Random">>, + <<"choose">> => 10, + <<"admissible-status">> => 200 + }, + lists:foreach( + fun(GatewayNode) -> + Body = + hb_message:commit( + #{ + <<"action">> => <<"register">>, + <<"route">> => maps:merge(GatewayNode, RouteConfig) + }, + Opts + ), + {ok, _} = + hb_http:post( + Node, + #{ + <<"path">> => SchedulePath, + <<"method">> => <<"POST">>, + <<"body">> => Body + }, + Opts + ) + end, + ?ARWEAVE_BOOTSTRAP_DATA_NODES + ), + %% Trigger compute to process register messages. + {ok, _} = hb_http:get(Node, RoutesPath, Opts), + %% Verify initial performance. + PerfPath = <>, + {ok, InitPerf} = hb_http:get(Node, PerfPath, Opts), + ?assertEqual(1000.0, dev_router_perf:to_float(InitPerf)), + %% Fetch TX through the full stack. + {ok, Res} = + hb_http:get( + Node, + <<"~arweave@2.9/tx=", TXID/binary, "&exclude-data=true">>, + Opts + ), + ?assertMatch(#{ <<"reward">> := <<"482143296">> }, Res), + ?assert(hb_message:verify(Res, all, #{})), + ?assertNot(lists:member(NodeAddr, hb_message:signers(Res, #{}))), + %% Wait for async monitor duration posts, then recompute. + timer:sleep(1000), + {ok, _} = hb_http:get(Node, RoutesPath, Opts), + %% Verify performance score changed from initial value. + {ok, UpdatedPerf} = hb_http:get(Node, PerfPath, Opts), + ?assertNotEqual(1000.0, dev_router_perf:to_float(UpdatedPerf)), + ok + end}. + is_admissible_real_gateway_test_() -> {timeout, 30, fun() -> application:ensure_all_started(hb), diff --git a/src/dev_router_perf.erl b/src/dev_router_perf.erl index 7b6b0e516..ab1d680ab 100644 --- a/src/dev_router_perf.erl +++ b/src/dev_router_perf.erl @@ -15,6 +15,7 @@ -module(dev_router_perf). -export([init/3, compute/3, snapshot/3, normalize/3]). -export([duration/3, register/3, recalculate/3]). +%%% Helper API -export([to_float/1]). -include("include/hb.hrl"). -include_lib("eunit/include/eunit.hrl"). From 3d6921c44011953cca3439b676035999ebb10ba0 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Tue, 10 Mar 2026 04:36:26 +0530 Subject: [PATCH 10/32] fix: calculating the weights post req --- src/dev_router_perf.erl | 115 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 114 insertions(+), 1 deletion(-) diff --git a/src/dev_router_perf.erl b/src/dev_router_perf.erl index ab1d680ab..880d2aeaa 100644 --- a/src/dev_router_perf.erl +++ b/src/dev_router_perf.erl @@ -74,7 +74,8 @@ duration(RawBase, Req, Opts) -> ChangeFactor, Opts ), - {ok, hb_ao:set(Base, #{ <<"routes">> => UpdatedRoutes }, Opts)} + NewBase = hb_ao:set(Base, #{ <<"routes">> => UpdatedRoutes }, Opts), + recalculate(NewBase, Req, Opts) end. %% @doc Register a new node on a route. Supports both: @@ -489,3 +490,115 @@ recalculate_test() -> W1 = hb_maps:get(<<"weight">>, N1, undefined, #{}), W2 = hb_maps:get(<<"weight">>, N2, undefined, #{}), ?assert(W1 > W2). + +hb_gateway_load_balancer_test_() -> + {timeout, 60, fun() -> + application:ensure_all_started(hb), + %% A known ID that exists on HB gateways. + ID = <<"ptBC0UwDmrUTBQX3MqZ1lB57ex20ygwzkjjCrQjIx3o">>, + PerfProcess = <<"/perf-router~node-process@1.0">>, + SchedulePath = <>, + RoutesPath = <>, + NodeWallet = ar_wallet:new(), + HBGateways = [ + #{ + <<"match">> => <<"^/">>, + <<"with">> => <<"https://blue.hyperbeam.zephyrdev.xyz/">> + }, + #{ + <<"match">> => <<"^/">>, + <<"with">> => <<"https://neo.hyperbeam.zephyrdev.xyz/">> + }, + #{ + <<"match">> => <<"^/">>, + <<"with">> => <<"https://neo2.hyperbeam.zephyrdev.xyz/">> + } + ], + Opts = #{ + store => hb_test_utils:test_store(), + priv_wallet => NodeWallet, + http_monitor => #{ + <<"method">> => <<"POST">>, + <<"path">> => SchedulePath + }, + router_opts => #{ + <<"provider">> => #{ <<"path">> => RoutesPath } + }, + on => #{ + <<"request">> => #{ + <<"device">> => <<"router@1.0">>, + <<"path">> => <<"preprocess">> + } + }, + node_processes => #{ + <<"perf-router">> => #{ + <<"device">> => <<"process@1.0">>, + <<"execution-device">> => <<"router-perf@1.0">>, + <<"scheduler-device">> => <<"scheduler@1.0">>, + <<"performance-period">> => 2, + <<"initial-performance">> => 1000 + } + } + }, + Node = hb_http_server:start_node(Opts), + %% Register HB gateways with perf-router. + %% Use ^/(?!.*~) to exclude internal device paths from routing. + RouteConfig = #{ + <<"template">> => <<"^/(?!.*~)">>, + <<"strategy">> => <<"By-Weight">>, + <<"choose">> => 2 + }, + lists:foreach( + fun(GatewayNode) -> + Body = + hb_message:commit( + #{ + <<"action">> => <<"register">>, + <<"route">> => maps:merge(GatewayNode, RouteConfig) + }, + Opts + ), + {ok, _} = + hb_http:post( + Node, + #{ + <<"path">> => SchedulePath, + <<"method">> => <<"POST">>, + <<"body">> => Body + }, + Opts + ) + end, + HBGateways + ), + %% Trigger compute to process register messages. + {ok, _} = hb_http:get(Node, RoutesPath, Opts), + %% Verify 3 nodes registered with initial performance. + PerfPath = + <>, + {ok, InitPerf} = hb_http:get(Node, PerfPath, Opts), + ?assertEqual(1000.0, dev_router_perf:to_float(InitPerf)), + %% Fetch ID through the load-balanced routing. + %% The request goes through: on.request -> preprocess -> relay + %% -> hb_http_multi -> HB gateway -> response. + {ok, Res} = hb_http:get(Node, <<"/", ID/binary>>, Opts), + ?assert(is_map(Res)), + %% Wait for async monitor duration posts, then recompute. + timer:sleep(2000), + {ok, _} = hb_http:get(Node, RoutesPath, Opts), + %% Check that at least one node's performance changed. + %% Routing selects 2 of 3 nodes randomly, so we check all. + Perfs = lists:map( + fun(I) -> + IB = integer_to_binary(I), + P = <>, + {ok, V} = hb_http:get(Node, P, Opts), + dev_router_perf:to_float(V) + end, + [1, 2, 3] + ), + ?assert(lists:any(fun(P) -> P =/= 1000.0 end, Perfs)), + ok + end}. From 45bc93fbf6994ddc12afb70dd4330573e9084562 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Tue, 10 Mar 2026 12:45:10 +0530 Subject: [PATCH 11/32] Revert "fix: calculating the weights post req" This reverts commit dba1bd3be9cf318218068f465d61f5249699f9a8. --- src/dev_router_perf.erl | 115 +--------------------------------------- 1 file changed, 1 insertion(+), 114 deletions(-) diff --git a/src/dev_router_perf.erl b/src/dev_router_perf.erl index 880d2aeaa..ab1d680ab 100644 --- a/src/dev_router_perf.erl +++ b/src/dev_router_perf.erl @@ -74,8 +74,7 @@ duration(RawBase, Req, Opts) -> ChangeFactor, Opts ), - NewBase = hb_ao:set(Base, #{ <<"routes">> => UpdatedRoutes }, Opts), - recalculate(NewBase, Req, Opts) + {ok, hb_ao:set(Base, #{ <<"routes">> => UpdatedRoutes }, Opts)} end. %% @doc Register a new node on a route. Supports both: @@ -490,115 +489,3 @@ recalculate_test() -> W1 = hb_maps:get(<<"weight">>, N1, undefined, #{}), W2 = hb_maps:get(<<"weight">>, N2, undefined, #{}), ?assert(W1 > W2). - -hb_gateway_load_balancer_test_() -> - {timeout, 60, fun() -> - application:ensure_all_started(hb), - %% A known ID that exists on HB gateways. - ID = <<"ptBC0UwDmrUTBQX3MqZ1lB57ex20ygwzkjjCrQjIx3o">>, - PerfProcess = <<"/perf-router~node-process@1.0">>, - SchedulePath = <>, - RoutesPath = <>, - NodeWallet = ar_wallet:new(), - HBGateways = [ - #{ - <<"match">> => <<"^/">>, - <<"with">> => <<"https://blue.hyperbeam.zephyrdev.xyz/">> - }, - #{ - <<"match">> => <<"^/">>, - <<"with">> => <<"https://neo.hyperbeam.zephyrdev.xyz/">> - }, - #{ - <<"match">> => <<"^/">>, - <<"with">> => <<"https://neo2.hyperbeam.zephyrdev.xyz/">> - } - ], - Opts = #{ - store => hb_test_utils:test_store(), - priv_wallet => NodeWallet, - http_monitor => #{ - <<"method">> => <<"POST">>, - <<"path">> => SchedulePath - }, - router_opts => #{ - <<"provider">> => #{ <<"path">> => RoutesPath } - }, - on => #{ - <<"request">> => #{ - <<"device">> => <<"router@1.0">>, - <<"path">> => <<"preprocess">> - } - }, - node_processes => #{ - <<"perf-router">> => #{ - <<"device">> => <<"process@1.0">>, - <<"execution-device">> => <<"router-perf@1.0">>, - <<"scheduler-device">> => <<"scheduler@1.0">>, - <<"performance-period">> => 2, - <<"initial-performance">> => 1000 - } - } - }, - Node = hb_http_server:start_node(Opts), - %% Register HB gateways with perf-router. - %% Use ^/(?!.*~) to exclude internal device paths from routing. - RouteConfig = #{ - <<"template">> => <<"^/(?!.*~)">>, - <<"strategy">> => <<"By-Weight">>, - <<"choose">> => 2 - }, - lists:foreach( - fun(GatewayNode) -> - Body = - hb_message:commit( - #{ - <<"action">> => <<"register">>, - <<"route">> => maps:merge(GatewayNode, RouteConfig) - }, - Opts - ), - {ok, _} = - hb_http:post( - Node, - #{ - <<"path">> => SchedulePath, - <<"method">> => <<"POST">>, - <<"body">> => Body - }, - Opts - ) - end, - HBGateways - ), - %% Trigger compute to process register messages. - {ok, _} = hb_http:get(Node, RoutesPath, Opts), - %% Verify 3 nodes registered with initial performance. - PerfPath = - <>, - {ok, InitPerf} = hb_http:get(Node, PerfPath, Opts), - ?assertEqual(1000.0, dev_router_perf:to_float(InitPerf)), - %% Fetch ID through the load-balanced routing. - %% The request goes through: on.request -> preprocess -> relay - %% -> hb_http_multi -> HB gateway -> response. - {ok, Res} = hb_http:get(Node, <<"/", ID/binary>>, Opts), - ?assert(is_map(Res)), - %% Wait for async monitor duration posts, then recompute. - timer:sleep(2000), - {ok, _} = hb_http:get(Node, RoutesPath, Opts), - %% Check that at least one node's performance changed. - %% Routing selects 2 of 3 nodes randomly, so we check all. - Perfs = lists:map( - fun(I) -> - IB = integer_to_binary(I), - P = <>, - {ok, V} = hb_http:get(Node, P, Opts), - dev_router_perf:to_float(V) - end, - [1, 2, 3] - ), - ?assert(lists:any(fun(P) -> P =/= 1000.0 end, Perfs)), - ok - end}. From 544eebb51a031378e7301c4d88fde2e1c59bdb51 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Wed, 11 Mar 2026 16:32:09 +0530 Subject: [PATCH 12/32] test: added test for load balancing on gateways --- src/dev_router_perf.erl | 144 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 144 insertions(+) diff --git a/src/dev_router_perf.erl b/src/dev_router_perf.erl index ab1d680ab..c25ce3ceb 100644 --- a/src/dev_router_perf.erl +++ b/src/dev_router_perf.erl @@ -489,3 +489,147 @@ recalculate_test() -> W1 = hb_maps:get(<<"weight">>, N1, undefined, #{}), W2 = hb_maps:get(<<"weight">>, N2, undefined, #{}), ?assert(W1 > W2). + +hb_gateway_load_balancer_test_() -> + {timeout, 60, fun() -> + application:ensure_all_started(hb), + ID = <<"ptBC0UwDmrUTBQX3MqZ1lB57ex20ygwzkjjCrQjIx3o">>, + PerfProcess = <<"/perf-router~node-process@1.0">>, + SchedulePath = <>, + RoutesPath = <>, + HBGateways = [ + #{ + <<"match">> => <<"^/">>, + <<"with">> => <<"https://blue.hyperbeam.zephyrdev.xyz/">> + }, + #{ + <<"match">> => <<"^/">>, + <<"with">> => <<"https://neo.hyperbeam.zephyrdev.xyz/">> + }, + #{ <<"match">> => <<"^/">>, + <<"with">> => <<"https://neo2.hyperbeam.zephyrdev.xyz/">> } + ], + Opts = #{ + store => hb_test_utils:test_store(), + priv_wallet => ar_wallet:new(), + http_monitor => #{ + <<"method">> => <<"POST">>, + <<"path">> => SchedulePath + }, + router_opts => #{ + <<"provider">> => #{ <<"path">> => RoutesPath } + }, + on => #{ + <<"request">> => #{ + <<"device">> => <<"router@1.0">>, + <<"path">> => <<"preprocess">> + } + }, + node_processes => #{ + <<"perf-router">> => #{ + <<"device">> => <<"process@1.0">>, + <<"execution-device">> => <<"router-perf@1.0">>, + <<"scheduler-device">> => <<"scheduler@1.0">>, + <<"performance-period">> => 2, + <<"initial-performance">> => 1000 + } + } + }, + Node = hb_http_server:start_node(Opts), + %% Register HB gateways with the perf-router process. + RouteConfig = #{ + <<"template">> => <<"^/(?!.*~)">>, + <<"strategy">> => <<"By-Weight">>, + <<"choose">> => 2 + }, + lists:foreach( + fun(GW) -> + Body = + hb_message:commit( + #{ + <<"action">> => <<"register">>, + <<"route">> => maps:merge(GW, RouteConfig) + }, + Opts + ), + {ok, _} = + hb_http:post( + Node, + #{ + <<"path">> => SchedulePath, + <<"method">> => <<"POST">>, + <<"body">> => Body + }, + Opts + ) + end, + HBGateways + ), + {ok, _} = hb_http:get(Node, RoutesPath, Opts), + {ok, InitPerf} = + hb_http:get( + Node, + <>, + Opts + ), + ?assertEqual(1000.0, to_float(InitPerf)), + {ok, Res} = hb_http:get(Node, <<"/", ID/binary>>, Opts), + ?assert(is_map(Res)), + timer:sleep(2000), + {ok, _} = hb_http:get(Node, RoutesPath, Opts), + Perfs = + lists:map( + fun(I) -> + IB = integer_to_binary(I), + {ok, V} = + hb_http:get( + Node, + << + PerfProcess/binary, + "/now/routes/1/nodes/", + IB/binary, + "/performance" + >>, + Opts + ), + to_float(V) + end, + [1, 2, 3] + ), + ?assert(lists:any(fun(P) -> P =/= 1000.0 end, Perfs)), + RecalcBody = + hb_message:commit(#{ <<"action">> => <<"recalculate">> }, Opts), + {ok, _} = + hb_http:post( + Node, + #{ + <<"path">> => SchedulePath, + <<"method">> => <<"POST">>, + <<"body">> => RecalcBody + }, + Opts + ), + {ok, _} = hb_http:get(Node, RoutesPath, Opts), + Weights = + lists:map( + fun(I) -> + IB = integer_to_binary(I), + {ok, W} = + hb_http:get( + Node, + << + PerfProcess/binary, + "/now/routes/1/nodes/", + IB/binary, + "/weight" + >>, + Opts + ), + to_float(W) + end, + [1, 2, 3] + ), + UniqueWeights = lists:usort(Weights), + ?assert(length(UniqueWeights) > 1), + ok + end}. \ No newline at end of file From a920927395b2516a198f1ab45ee6fe1f787090c7 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Sun, 15 Mar 2026 06:18:49 +0530 Subject: [PATCH 13/32] feat: add monitor-sampler@1.0 device for probabilistic sampling Lightweight device that gates HTTP monitor invocations via 1-in-N probabilistic sampling. Reads `sample-rate` from the request message and rolls `rand:uniform(Rate) =:= 1`. If absent, all requests pass. --- src/dev_monitor_sampler.erl | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 src/dev_monitor_sampler.erl diff --git a/src/dev_monitor_sampler.erl b/src/dev_monitor_sampler.erl new file mode 100644 index 000000000..078e0f195 --- /dev/null +++ b/src/dev_monitor_sampler.erl @@ -0,0 +1,22 @@ +-module(dev_monitor_sampler). +-export([info/1, should_sample/3]). +-include("include/hb.hrl"). + + +%%% A lightweight device that gates HTTP monitor invocations via probabilistic +%%% sampling. When `sample-rate` is set to N in the monitor config, only +%%% ~1-in-N requests will be forwarded. If absent, every request is forwarded. + +info(_Base) -> + #{default => fun should_sample/3}. + +should_sample(_Base, Req, Opts) -> + ?event({req, Req}), + case hb_maps:get(<<"sample-rate">>, Req, not_found, Opts) of + not_found -> + {ok, true}; + Rate when is_integer(Rate), Rate > 0 -> + {ok, rand:uniform(Rate) =:= 1}; + _Other -> + {ok, true} + end. From 37ff1998f3fdfce2795faf091ae8bcb2ea0a6cb5 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Sun, 15 Mar 2026 06:19:09 +0530 Subject: [PATCH 14/32] feat: register monitor-sampler@1.0 in preloaded devices --- src/hb_opts.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/hb_opts.erl b/src/hb_opts.erl index 88a94e21c..0a0111a0b 100644 --- a/src/hb_opts.erl +++ b/src/hb_opts.erl @@ -205,6 +205,7 @@ default_message() -> #{<<"name">> => <<"message@1.0">>, <<"module">> => dev_message}, #{<<"name">> => <<"meta@1.0">>, <<"module">> => dev_meta}, #{<<"name">> => <<"monitor@1.0">>, <<"module">> => dev_monitor}, + #{<<"name">> => <<"monitor-sampler@1.0">>, <<"module">> => dev_monitor_sampler}, #{<<"name">> => <<"multipass@1.0">>, <<"module">> => dev_multipass}, #{<<"name">> => <<"name@1.0">>, <<"module">> => dev_name}, #{<<"name">> => <<"node-process@1.0">>, <<"module">> => dev_node_process}, From 41c4c9cc6156fd33602bd5bd116e239004f34c77 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Sun, 15 Mar 2026 06:19:39 +0530 Subject: [PATCH 15/32] feat: integrate monitor sampling and path sanitization in http_client - Gate monitor invocations via monitor-sampler@1.0 when `sample-rate` is set in the http_monitor config - When `is-wasm-process` is set, strip the reserved `path` key from the monitor body and move it to `monitor-type`. opts example --- ``` json "http_monitor": { "device": "relay@1.0", "method": "POST", "peer": "http://localhost:9000", "path": "call", "relay-path": "//push", "commit-request": true, "sample-rate": 10, "is-wasm-process": true } ``` --- src/hb_http_client.erl | 89 ++++++++++++++++++++++++++---------------- 1 file changed, 56 insertions(+), 33 deletions(-) diff --git a/src/hb_http_client.erl b/src/hb_http_client.erl index e02135d39..a925bf125 100644 --- a/src/hb_http_client.erl +++ b/src/hb_http_client.erl @@ -261,43 +261,66 @@ init_hackney_pool() -> {timeout, ?DEFAULT_KEEPALIVE_TIMEOUT} ]). -%% @doc Invoke the HTTP monitor message with AO-Core, if it is set in the -%% node message key. We invoke the given message with the `body' set to a signed -%% version of the details. This allows node operators to configure their machine -%% to record duration statistics into customized data stores, computations, or -%% processes etc. Additionally, we include the `http_reference' value, if set in -%% the given `opts'. -%% -%% We use `hb_ao:get' rather than `hb_opts:get', as settings configured -%% by the `~router@1.0' route `opts' key are unable to generate atoms. +%% @doc Invoke the HTTP monitor if configured. Optionally gates invocations +%% via `monitor-sampler@1.0' when `sample-rate' is set in the monitor config. +%% When the target is a WASM process (`is-wasm-process' in the monitor config), +%% the reserved `path' key is moved to `monitor-type' to avoid interfering +%% with AO-Core routing. maybe_invoke_monitor(Details, Opts) -> case hb_ao:get(<<"http_monitor">>, Opts, Opts) of not_found -> ok; Monitor -> - % We have a monitor message. Place the `details' into the body, set - % the `method' to "POST", add the `http_reference' (if applicable) - % and sign the request. We use the node message's wallet as the - % source of the key. - MaybeWithReference = - case hb_ao:get(<<"http_reference">>, Opts, Opts) of - not_found -> Details; - Ref -> Details#{ <<"reference">> => Ref } - end, - Req = - Monitor#{ - <<"body">> => - hb_message:commit( - MaybeWithReference#{ - <<"method">> => <<"POST">> - }, - Opts - ) - }, - % Use the singleton parse to generate the message sequence to - % execute. - ReqMsgs = hb_singleton:from(Req, Opts), - Res = hb_ao:resolve_many(ReqMsgs, Opts), - ?event(debug_http_monitor, {resolved_monitor, Res}) + case should_forward(Monitor, Opts) of + false -> ok; + true -> do_invoke_monitor(Monitor, Details, Opts) + end + end. + +%% @doc Check whether this request should be forwarded to the monitor. +%% If `sample-rate' is not set in the monitor config, all requests are forwarded. +should_forward(Monitor, Opts) -> + case hb_maps:get(<<"sample-rate">>, Monitor, not_found, Opts) of + not_found -> true; + _ -> + case hb_ao:resolve( + #{ <<"device">> => <<"monitor-sampler@1.0">> }, + Monitor#{ <<"path">> => <<"should-sample">> }, + Opts + ) of + {ok, true} -> true; + {ok, false} -> false; + Error -> Error + end + end. + +%% @doc Build and dispatch the monitor message. +do_invoke_monitor(Monitor, Details, Opts) -> + WithRef = + case hb_ao:get(<<"http_reference">>, Opts, Opts) of + not_found -> Details; + Ref -> Details#{ <<"reference">> => Ref } + end, + BodyData = sanitize_body(WithRef, Monitor, Opts), + Req = Monitor#{ + <<"body">> => + hb_message:commit( + BodyData#{ <<"method">> => <<"POST">> }, Opts + ) + }, + ReqMsgs = hb_singleton:from(Req, Opts), + Res = hb_ao:resolve_many(ReqMsgs, Opts), + ?event(http_monitor, {resolved_monitor, Res}). + +%% @doc When the monitor target is a WASM process, `path' is a reserved +%% AO-Core routing key that the scheduler copies into the assignment. +%% Move it to `monitor-type' to avoid breaking execution dispatch. +sanitize_body(Body, Monitor, Opts) -> + case hb_ao:get(<<"is-wasm-process">>, Monitor, Opts) of + true -> + {PathVal, Rest} = hb_maps:take(<<"path">>, Body, Opts), + Rest#{ <<"monitor-type">> => PathVal }; + _ -> + Body end. %%% ================================================================== From 07b72a4c6b0f863e29f289c79215eabe9e0aca6b Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Sun, 15 Mar 2026 06:34:06 +0530 Subject: [PATCH 16/32] impr: right key for acrion --- src/hb_http_client.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/hb_http_client.erl b/src/hb_http_client.erl index a925bf125..be14247cd 100644 --- a/src/hb_http_client.erl +++ b/src/hb_http_client.erl @@ -313,12 +313,12 @@ do_invoke_monitor(Monitor, Details, Opts) -> %% @doc When the monitor target is a WASM process, `path' is a reserved %% AO-Core routing key that the scheduler copies into the assignment. -%% Move it to `monitor-type' to avoid breaking execution dispatch. +%% Move it to `Action' to avoid breaking execution dispatch. sanitize_body(Body, Monitor, Opts) -> case hb_ao:get(<<"is-wasm-process">>, Monitor, Opts) of true -> {PathVal, Rest} = hb_maps:take(<<"path">>, Body, Opts), - Rest#{ <<"monitor-type">> => PathVal }; + Rest#{ <<"Action">> => PathVal }; _ -> Body end. From 00dd9812cbb9e48bab601474915805380c767f1b Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Wed, 18 Mar 2026 19:32:05 +0530 Subject: [PATCH 17/32] feat: support nested hook names via deep_get in dev_hook Change find/3 to use hb_util:deep_get instead of maps:get so that slash-separated hook names like <<"http-client/response">> resolve to nested keys in the on config map --- src/dev_hook.erl | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/src/dev_hook.erl b/src/dev_hook.erl index 1696745ec..ddf6b1999 100644 --- a/src/dev_hook.erl +++ b/src/dev_hook.erl @@ -92,7 +92,7 @@ find(HookName, Opts) -> find(#{}, #{ <<"target">> => <<"body">>, <<"body">> => HookName }, Opts). find(_Base, Req, Opts) -> HookName = maps:get(maps:get(<<"target">>, Req, <<"body">>), Req), - case maps:get(HookName, hb_opts:get(on, #{}, Opts), []) of + case hb_util:deep_get(HookName, hb_opts:get(on, #{}, Opts), [], Opts) of Handler when is_map(Handler) -> case hb_util:is_ordered_list(Handler, Opts) of true -> @@ -169,7 +169,7 @@ execute_handler(HookName, Handler, Req, Opts) -> <<"method">> => hb_maps:get(<<"method">>, Handler, <<"GET">>, Opts) }, - CommitReqBin = + CommitReqBin = hb_util:bin( hb_util:deep_get( <<"hook/commit-request">>, @@ -316,4 +316,26 @@ halt_on_error_test() -> Req = #{ <<"test">> => <<"value">> }, Opts = #{ on => #{ <<"test-hook">> => [Handler1, Handler2, Handler3] }}, {error, Result} = on(<<"test-hook">>, Req, Opts), - ?assertEqual(<<"Error in handler2">>, Result). \ No newline at end of file + ?assertEqual(<<"Error in handler2">>, Result). + +%% @doc Test that nested hook names (slash-separated) resolve correctly +nested_hook_name_test() -> + Handler = #{ + <<"device">> => #{ + nested_key => + fun(_, Req, _) -> + {ok, Req#{ <<"nested_executed">> => true }} + end + }, + <<"path">> => <<"nested-key">> + }, + Req = #{ <<"test">> => <<"value">> }, + Opts = #{ + on => #{ + <<"http-client">> => #{ + <<"response">> => Handler + } + } + }, + {ok, Result} = on(<<"http-client/response">>, Req, Opts), + ?assertEqual(true, maps:get(<<"nested_executed">>, Result)). \ No newline at end of file From fa611fd24647ce04d09c94e4d7cd7c52e8827933 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Wed, 18 Mar 2026 19:32:46 +0530 Subject: [PATCH 18/32] feat: add generic chance@1.0 gate device, replace monitor-sampler Add dev_chance.erl as a composable 1-in-N probabilistic gate using the 4-arity default handler pattern. --- src/dev_chance.erl | 23 +++++++++++++++++++++++ src/dev_monitor_sampler.erl | 22 ---------------------- src/hb_opts.erl | 2 +- 3 files changed, 24 insertions(+), 23 deletions(-) create mode 100644 src/dev_chance.erl delete mode 100644 src/dev_monitor_sampler.erl diff --git a/src/dev_chance.erl b/src/dev_chance.erl new file mode 100644 index 000000000..87d7a0751 --- /dev/null +++ b/src/dev_chance.erl @@ -0,0 +1,23 @@ +%%% @doc A generic probabilistic gate device. When accessed as +%%% `~chance@1.0/N', the key `N' is parsed as an integer rate. +%%% A 1-in-N random check is performed: +%%% +%%% {ok, Req} -- check passed, hook chain continues +%%% {error, _} -- check failed, hook chain halts +%%% +%%% This device uses the 4-arity default handler pattern so that the +%%% rate is supplied via the URL path rather than a config key. +-module(dev_chance). +-export([info/1]). +-include("include/hb.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +info(_) -> #{ default => fun default/4 }. + +default(Key, _Base, Req, _Opts) -> + Rate = binary_to_integer(Key), + ?event({request, {rate, Rate}}), + case Rate > 0 andalso rand:uniform(Rate) =:= 1 of + true -> {ok, Req}; + false -> {error, <<"Filtered by chance gate.">>} + end. diff --git a/src/dev_monitor_sampler.erl b/src/dev_monitor_sampler.erl deleted file mode 100644 index 078e0f195..000000000 --- a/src/dev_monitor_sampler.erl +++ /dev/null @@ -1,22 +0,0 @@ --module(dev_monitor_sampler). --export([info/1, should_sample/3]). --include("include/hb.hrl"). - - -%%% A lightweight device that gates HTTP monitor invocations via probabilistic -%%% sampling. When `sample-rate` is set to N in the monitor config, only -%%% ~1-in-N requests will be forwarded. If absent, every request is forwarded. - -info(_Base) -> - #{default => fun should_sample/3}. - -should_sample(_Base, Req, Opts) -> - ?event({req, Req}), - case hb_maps:get(<<"sample-rate">>, Req, not_found, Opts) of - not_found -> - {ok, true}; - Rate when is_integer(Rate), Rate > 0 -> - {ok, rand:uniform(Rate) =:= 1}; - _Other -> - {ok, true} - end. diff --git a/src/hb_opts.erl b/src/hb_opts.erl index 0a0111a0b..7fc4b0b09 100644 --- a/src/hb_opts.erl +++ b/src/hb_opts.erl @@ -205,7 +205,7 @@ default_message() -> #{<<"name">> => <<"message@1.0">>, <<"module">> => dev_message}, #{<<"name">> => <<"meta@1.0">>, <<"module">> => dev_meta}, #{<<"name">> => <<"monitor@1.0">>, <<"module">> => dev_monitor}, - #{<<"name">> => <<"monitor-sampler@1.0">>, <<"module">> => dev_monitor_sampler}, + #{<<"name">> => <<"chance@1.0">>, <<"module">> => dev_chance}, #{<<"name">> => <<"multipass@1.0">>, <<"module">> => dev_multipass}, #{<<"name">> => <<"name@1.0">>, <<"module">> => dev_name}, #{<<"name">> => <<"node-process@1.0">>, <<"module">> => dev_node_process}, From 949f29afefc97c26aec92aa20fc12ac61d2c91a1 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Wed, 18 Mar 2026 19:33:20 +0530 Subject: [PATCH 19/32] feat: replace custom monitor code with hook-based architecture - Replace ~60 lines of custom monitor invocation (maybe_invoke_monitor, should_forward, do_invoke_monitor, sanitize_body) with a single `dev_hook:on(<<"http-client/response">>, ...)` call. - Reorder relay-path priority in dev_relay:call so that the explicit relay-path key is checked before the generic path key, fixing a conflict where the handler's dispatch path shadowed the relay URL. --- src/dev_relay.erl | 6 +-- src/hb_http_client.erl | 106 ++++++++++++++++++----------------------- 2 files changed, 50 insertions(+), 62 deletions(-) diff --git a/src/dev_relay.erl b/src/dev_relay.erl index c5782ff1f..9760fb63c 100644 --- a/src/dev_relay.erl +++ b/src/dev_relay.erl @@ -36,10 +36,10 @@ call(M1, RawM2, Opts) -> RelayPath = hb_ao:get_first( [ - {M1, <<"path">>}, - {{as, <<"message@1.0">>, BaseTarget}, <<"path">>}, {RawM2, <<"relay-path">>}, - {M1, <<"relay-path">>} + {M1, <<"relay-path">>}, + {M1, <<"path">>}, + {{as, <<"message@1.0">>, BaseTarget}, <<"path">>} ], Opts ), diff --git a/src/hb_http_client.erl b/src/hb_http_client.erl index be14247cd..3065e6210 100644 --- a/src/hb_http_client.erl +++ b/src/hb_http_client.erl @@ -261,66 +261,54 @@ init_hackney_pool() -> {timeout, ?DEFAULT_KEEPALIVE_TIMEOUT} ]). -%% @doc Invoke the HTTP monitor if configured. Optionally gates invocations -%% via `monitor-sampler@1.0' when `sample-rate' is set in the monitor config. -%% When the target is a WASM process (`is-wasm-process' in the monitor config), -%% the reserved `path' key is moved to `monitor-type' to avoid interfering -%% with AO-Core routing. -maybe_invoke_monitor(Details, Opts) -> - case hb_ao:get(<<"http_monitor">>, Opts, Opts) of - not_found -> ok; - Monitor -> - case should_forward(Monitor, Opts) of - false -> ok; - true -> do_invoke_monitor(Monitor, Details, Opts) - end - end. - -%% @doc Check whether this request should be forwarded to the monitor. -%% If `sample-rate' is not set in the monitor config, all requests are forwarded. -should_forward(Monitor, Opts) -> - case hb_maps:get(<<"sample-rate">>, Monitor, not_found, Opts) of - not_found -> true; - _ -> - case hb_ao:resolve( - #{ <<"device">> => <<"monitor-sampler@1.0">> }, - Monitor#{ <<"path">> => <<"should-sample">> }, - Opts - ) of - {ok, true} -> true; - {ok, false} -> false; - Error -> Error - end - end. +%% @doc Record the duration of the request in an async process. We write the +%% data to prometheus if the application is enabled, as well as firing the +%% `http-client/response' hook if configured. +record_duration(Details, Opts) -> + spawn( + fun() -> + % First, write to prometheus if it is enabled. Prometheus works + % only with strings as lists, so we encode the data before granting + % it. + GetFormat = + fun + (<<"request-category">>) -> + path_to_category(maps:get(<<"request-path">>, Details)); + (Key) -> + hb_util:list(maps:get(Key, Details)) + end, + case application:get_application(prometheus) of + undefined -> ok; + _ -> + prometheus_histogram:observe( + http_request_duration_seconds, + lists:map( + GetFormat, + [ + <<"request-method">>, + <<"status-class">>, + <<"request-category">> + ] + ), + maps:get(<<"duration">>, Details) + ) + end, + HookReq = + #{ <<"body">> => + hb_message:commit( + maybe_add_reference(Details, Opts), + Opts + ) + }, + dev_hook:on(<<"http-client/response">>, HookReq, Opts) + end + ). -%% @doc Build and dispatch the monitor message. -do_invoke_monitor(Monitor, Details, Opts) -> - WithRef = - case hb_ao:get(<<"http_reference">>, Opts, Opts) of - not_found -> Details; - Ref -> Details#{ <<"reference">> => Ref } - end, - BodyData = sanitize_body(WithRef, Monitor, Opts), - Req = Monitor#{ - <<"body">> => - hb_message:commit( - BodyData#{ <<"method">> => <<"POST">> }, Opts - ) - }, - ReqMsgs = hb_singleton:from(Req, Opts), - Res = hb_ao:resolve_many(ReqMsgs, Opts), - ?event(http_monitor, {resolved_monitor, Res}). - -%% @doc When the monitor target is a WASM process, `path' is a reserved -%% AO-Core routing key that the scheduler copies into the assignment. -%% Move it to `Action' to avoid breaking execution dispatch. -sanitize_body(Body, Monitor, Opts) -> - case hb_ao:get(<<"is-wasm-process">>, Monitor, Opts) of - true -> - {PathVal, Rest} = hb_maps:take(<<"path">>, Body, Opts), - Rest#{ <<"Action">> => PathVal }; - _ -> - Body +%% @doc Attach the `http_reference' from Opts to the details map, if present. +maybe_add_reference(Details, Opts) -> + case hb_opts:get(http_reference, not_found, Opts) of + not_found -> Details; + Ref -> Details#{ <<"reference">> => Ref } end. %%% ================================================================== From abbcc3dbe359e94b782d227d1dcec894895d7f7c Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Wed, 18 Mar 2026 19:34:22 +0530 Subject: [PATCH 20/32] refactor: migrate tests from http_monitor to hook-based config --- src/dev_arweave.erl | 15 ++++++++++++--- src/dev_router.erl | 21 ++++++++++++++------- src/dev_router_perf.erl | 15 +++++++++++---- 3 files changed, 37 insertions(+), 14 deletions(-) diff --git a/src/dev_arweave.erl b/src/dev_arweave.erl index 426954027..5bb4ed2ca 100644 --- a/src/dev_arweave.erl +++ b/src/dev_arweave.erl @@ -2273,9 +2273,18 @@ is_admissible_hook_routed_test_() -> Opts = #{ store => hb_test_utils:test_store(), priv_wallet => NodeWallet, - http_monitor => #{ - <<"method">> => <<"POST">>, - <<"path">> => SchedulePath + on => #{ + <<"http-client">> => #{ + <<"response">> => [ + #{ + <<"device">> => <<"relay@1.0">>, + <<"path">> => <<"call">>, + <<"method">> => <<"POST">>, + <<"relay-path">> => SchedulePath, + <<"hook/result">> => <<"ignore">> + } + ] + } }, router_opts => #{ <<"provider">> => #{ <<"path">> => RoutesPath } }, node_processes => #{ diff --git a/src/dev_router.erl b/src/dev_router.erl index 1e87a65d5..89d1920f8 100644 --- a/src/dev_router.erl +++ b/src/dev_router.erl @@ -1328,7 +1328,7 @@ dynamic_routing_by_performance() -> BenchRoutes = 16, TestPath = <<"/worker">>, % Start the main node for the test, loading the `dynamic-router' script and - % the http_monitor to generate performance messages. + % the http-client/response hook to generate performance messages. {ok, Script} = file:read_file(<<"scripts/dynamic-router.lua">>), Node = hb_http_server:start_node(Opts = #{ relay_http_client => gun, @@ -1358,12 +1358,19 @@ dynamic_routing_by_performance() -> <<"initial-performance">> => 1000 } }, - % Define the request that should be called in order to record performance - % information into the process. The `body' of the `http_monitor' message - % is filled with the signed performance report. - http_monitor => #{ - <<"method">> => <<"POST">>, - <<"path">> => <<"/perf-router~node-process@1.0/schedule">> + on => #{ + <<"http-client">> => #{ + <<"response">> => [ + #{ + <<"device">> => <<"relay@1.0">>, + <<"path">> => <<"call">>, + <<"method">> => <<"POST">>, + <<"relay-path">> => + <<"/perf-router~node-process@1.0/schedule">>, + <<"hook/result">> => <<"ignore">> + } + ] + } } }), % Start and add a series of nodes with decreasing performance, via lag diff --git a/src/dev_router_perf.erl b/src/dev_router_perf.erl index c25ce3ceb..747ee5bbc 100644 --- a/src/dev_router_perf.erl +++ b/src/dev_router_perf.erl @@ -512,10 +512,6 @@ hb_gateway_load_balancer_test_() -> Opts = #{ store => hb_test_utils:test_store(), priv_wallet => ar_wallet:new(), - http_monitor => #{ - <<"method">> => <<"POST">>, - <<"path">> => SchedulePath - }, router_opts => #{ <<"provider">> => #{ <<"path">> => RoutesPath } }, @@ -523,6 +519,17 @@ hb_gateway_load_balancer_test_() -> <<"request">> => #{ <<"device">> => <<"router@1.0">>, <<"path">> => <<"preprocess">> + }, + <<"http-client">> => #{ + <<"response">> => [ + #{ + <<"device">> => <<"relay@1.0">>, + <<"path">> => <<"call">>, + <<"method">> => <<"POST">>, + <<"relay-path">> => SchedulePath, + <<"hook/result">> => <<"ignore">> + } + ] } }, node_processes => #{ From 9bc4e83f6902284940b9029ab7762c126a1b293f Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Wed, 18 Mar 2026 19:58:33 +0530 Subject: [PATCH 21/32] refactor: move tx admissibility logic from generic request/5 into get_tx The multirequest-admissible check was in the generic request/5 function which also handles /raw paths that don't emit the user's message as-is. Moved the admissibility + tx-admissible hook logic into get_tx where it belongs. --- src/dev_arweave.erl | 75 ++++++++++++++++++--------------------------- 1 file changed, 29 insertions(+), 46 deletions(-) diff --git a/src/dev_arweave.erl b/src/dev_arweave.erl index 5bb4ed2ca..791390a71 100644 --- a/src/dev_arweave.erl +++ b/src/dev_arweave.erl @@ -128,29 +128,34 @@ get_tx(Base, Request, Opts) -> case find_key(<<"tx">>, Base, Request, Opts) of not_found -> {error, not_found}; TXID -> - request( - <<"GET">>, - <<"/tx/", TXID/binary>>, - #{ - <<"multirequest-admissible">> => - #{ - <<"device">> => <<"arweave@2.9">>, - <<"path">> => <<"is-tx-admissible">>, - <<"tx">> => TXID - } - }, - Opts#{ - exclude_data => - hb_util:bool( - find_key( - <<"exclude-data">>, - Base, - Request, - Opts - ) - ) - } - ) + TxOpts = Opts#{ + exclude_data => + hb_util:bool( + find_key(<<"exclude-data">>, Base, Request, Opts) + ) + }, + case request(<<"GET">>, <<"/tx/", TXID/binary>>, TxOpts) of + {ok, Msg} -> + Admissible = #{ + <<"device">> => <<"arweave@2.9">>, + <<"path">> => <<"is-tx-admissible">>, + <<"tx">> => TXID + }, + case is_tx_admissible(Admissible, Msg, TxOpts) of + true -> + case dev_hook:on( + <<"tx-admissible">>, + #{ <<"body">> => Msg }, + TxOpts + ) of + {ok, #{ <<"body">> := ResultMsg }} -> + {ok, ResultMsg}; + {error, Reason} -> {error, Reason} + end; + false -> {error, not_admissible} + end; + Error -> Error + end end. %% @doc Check whether a response to a `GET /tx/ID' request is valid. @@ -908,29 +913,7 @@ request(Method, Path, Extra, LogExtra, Opts) -> cache_control => [<<"no-cache">>, <<"no-store">>] } ), - case to_message(Path, Method, best_response(Res), LogExtra, Opts) of - {ok, Msg} -> - case hb_maps:get( - <<"multirequest-admissible">>, Extra, undefined, Opts - ) of - undefined -> {ok, Msg}; - Admissible -> - case is_tx_admissible(Admissible, Msg, Opts) of - true -> - case dev_hook:on( - <<"tx-admissible">>, - #{ <<"body">> => Msg }, - Opts - ) of - {ok, #{ <<"body">> := ResultMsg }} -> - {ok, ResultMsg}; - {error, Reason} -> {error, Reason} - end; - false -> {error, not_admissible} - end - end; - Error -> Error - end. + to_message(Path, Method, best_response(Res), LogExtra, Opts). %% @doc Select the best response from a list of responses by sorting them %% ascending by HTTP status code. Returns the first (best) response tuple. From fc0725c66a5021386f38e0a858b680e03ff07099 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Fri, 10 Apr 2026 14:30:05 -0400 Subject: [PATCH 22/32] fix: clean up hb_http_client record_duration after rebase - Remove duplicate record_duration/2 and dead maybe_invoke_monitor/2 left by a botched merge during the rebase onto neo/edge. - Switch the surviving record_duration/2 to use hb_prometheus:observe/3 with the declared metric name http_client_duration_seconds (was calling prometheus_histogram:observe directly with an undeclared name). - Add <<"action">> => <<"duration">> to the hook body so dev_router_perf can dispatch performance updates from the http-client/response hook. --- src/hb_http_client.erl | 74 ++++++++++-------------------------------- 1 file changed, 18 insertions(+), 56 deletions(-) diff --git a/src/hb_http_client.erl b/src/hb_http_client.erl index 3065e6210..b0d8cc401 100644 --- a/src/hb_http_client.erl +++ b/src/hb_http_client.erl @@ -277,29 +277,25 @@ record_duration(Details, Opts) -> (Key) -> hb_util:list(maps:get(Key, Details)) end, - case application:get_application(prometheus) of - undefined -> ok; - _ -> - prometheus_histogram:observe( - http_request_duration_seconds, - lists:map( - GetFormat, - [ - <<"request-method">>, - <<"status-class">>, - <<"request-category">> - ] - ), - maps:get(<<"duration">>, Details) - ) - end, - HookReq = - #{ <<"body">> => - hb_message:commit( - maybe_add_reference(Details, Opts), - Opts - ) + Labels = lists:map( + GetFormat, + [ + <<"request-method">>, + <<"status-class">>, + <<"request-category">> + ] + ), + hb_prometheus:observe( + maps:get(<<"duration">>, Details), + http_client_duration_seconds, + Labels + ), + BodyDetails = + (maybe_add_reference(Details, Opts))#{ + <<"action">> => <<"duration">> }, + HookReq = + #{ <<"body">> => hb_message:commit(BodyDetails, Opts) }, dev_hook:on(<<"http-client/response">>, HookReq, Opts) end ). @@ -626,40 +622,6 @@ init_prometheus() -> ?event(started), ok. -%% @doc Record the duration of the request in an async process. We write the -%% data to prometheus if the application is enabled, as well as invoking the -%% `http_monitor' if appropriate. -record_duration(Details, Opts) -> - spawn( - fun() -> - % Prometheus works only with strings as lists, so we encode the - % data before granting it. - GetFormat = - fun - (<<"request-category">>) -> - path_to_category(maps:get(<<"request-path">>, Details)); - (Key) -> - hb_util:list(maps:get(Key, Details)) - end, - Labels = lists:map( - GetFormat, - [ - <<"request-method">>, - <<"status-class">>, - <<"request-category">> - ]), - hb_prometheus:observe( - maps:get(<<"duration">>, Details), - http_client_duration_seconds, - Labels - ), - maybe_invoke_monitor( - Details#{ <<"path">> => <<"duration">> }, - Opts - ) - end - ). - record_response_status(Method, Response) -> record_response_status(Method, Response, undefined). record_response_status(Method, Response, Path) -> From 3b11d4ebd2bcb346bf355dd20516e1e731c5211d Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Fri, 10 Apr 2026 14:44:45 -0400 Subject: [PATCH 23/32] feat: gate router-perf register with trusted-peer + is-admissible Restores the admission control gate that the Lua dynamic-router had but the Erlang port dropped. Without this anyone could POST a registration and inject themselves into a route. - Add trusted-peer check: if any signer of the request matches the configured trusted-peer wallet, registration is accepted unconditionally. - Otherwise resolve the configured is-admissible device against the body. If absent, registration is open (matches prior behaviour). - Document trusted-peer and is-admissible at the module level. - Drop the path-fallback dispatch and TODO from compute/3 now that hb_http_client always sets <<"action">> on the hook body. --- src/dev_router_perf.erl | 92 ++++++++++++++++++++++++++++++----------- 1 file changed, 68 insertions(+), 24 deletions(-) diff --git a/src/dev_router_perf.erl b/src/dev_router_perf.erl index 747ee5bbc..a832810dc 100644 --- a/src/dev_router_perf.erl +++ b/src/dev_router_perf.erl @@ -12,6 +12,11 @@ %%% - `performance-weight': Weight factor for perf scoring (default 1). %%% - `pricing-weight': Weight factor for price scoring (default 1). %%% - `score-preference': Decay exponent for scoring (default 1). +%%% - `trusted-peer': Wallet address whose signed registrations bypass the +%%% admissibility check. +%%% - `is-admissible': Device resolved against a registration body to decide +%%% whether the request may register a new node. Default device resolves +%%% to `<<"true">>' (open registration). -module(dev_router_perf). -export([init/3, compute/3, snapshot/3, normalize/3]). -export([duration/3, register/3, recalculate/3]). @@ -23,20 +28,12 @@ %% @doc Compute dispatcher for process@1.0 compatibility. compute(Base, Req, Opts) -> Body = hb_ao:get(<<"body">>, Req, Req, Opts), - %% TODO: THIS IS WIERD IS THERE A BETTER WAY??? - %% Falls back to `path' because hb_http_client:maybe_invoke_monitor - %% sends duration data with `path => <<"duration">>' (not `action'). - Action = - case hb_ao:get(<<"action">>, Body, not_found, Opts) of - not_found -> hb_ao:get(<<"path">>, Body, not_found, Opts); - A -> A - end, - case Action of + case hb_ao:get(<<"action">>, Body, not_found, Opts) of <<"register">> -> register(Base, Body, Opts); <<"recalculate">> -> recalculate(Base, Body, Opts); <<"duration">> -> duration(Base, Body, Opts); _ -> - ?event({erroe_report, <<"Action not supported.">>}), + ?event(router_perf, {action_not_supported, {body, Body}}), {ok, Base} end. @@ -78,24 +75,71 @@ duration(RawBase, Req, Opts) -> end. %% @doc Register a new node on a route. Supports both: -%% - `match'/`with' format -%% - `prefix'/`price'/`topup' format +%% - `match'/`with' format +%% - `prefix'/`price'/`topup' format %% Generates `http_reference' from the node's URL (with or prefix). +%% +%% Registration is gated by either of: +%% - `trusted-peer': if any committer of the request matches the configured +%% trusted peer wallet address, the registration is accepted unconditionally. +%% - `is-admissible': otherwise the configured admissibility device is +%% resolved against the request body. The default device returns `true' for +%% backward compatibility, so deployments must opt in to a stricter check. register(RawBase, Req, Opts) -> Base = ensure_defaults(RawBase, Opts), Body = hb_ao:get(<<"body">>, Req, Req, Opts), - Route = hb_ao:get(<<"route">>, Body, Opts), - Template = hb_ao:get(<<"template">>, Route, Opts), - Routes = hb_ao:get(<<"routes">>, Base, [], Opts), - InitialPerf = - to_float(hb_ao:get(<<"initial-performance">>, Base, 30000, Opts)), - HttpRef = node_url(Route, Opts), - Node = build_node(Route, HttpRef, InitialPerf, Opts), - RouteConfig = extract_route_config(Route, Opts), - {UpdatedRoutes, _} = - add_node_to_route(Routes, Template, Node, RouteConfig, Opts), - NewBase = hb_ao:set(Base, #{ <<"routes">> => UpdatedRoutes }, Opts), - recalculate(NewBase, Req, Opts). + case is_admissible(Base, Body, Opts) of + true -> + Route = hb_ao:get(<<"route">>, Body, Opts), + Template = hb_ao:get(<<"template">>, Route, Opts), + Routes = hb_ao:get(<<"routes">>, Base, [], Opts), + InitialPerf = + to_float( + hb_ao:get(<<"initial-performance">>, Base, 30000, Opts) + ), + HttpRef = node_url(Route, Opts), + Node = build_node(Route, HttpRef, InitialPerf, Opts), + RouteConfig = extract_route_config(Route, Opts), + {UpdatedRoutes, _} = + add_node_to_route(Routes, Template, Node, RouteConfig, Opts), + NewBase = + hb_ao:set(Base, #{ <<"routes">> => UpdatedRoutes }, Opts), + recalculate(NewBase, Req, Opts); + false -> + ?event(router_perf, + {register_rejected, {body, Body}}), + {ok, Base} + end. + +%% @doc Check whether a registration request is admissible. +%% Returns `true' if either the request is signed by the configured +%% `trusted-peer' or the configured `is-admissible' device resolves to +%% `<<"true">>'. +is_admissible(Base, Body, Opts) -> + case is_trusted_peer(Base, Body, Opts) of + true -> true; + false -> resolve_is_admissible(Base, Body, Opts) + end. + +is_trusted_peer(Base, Body, Opts) -> + case hb_ao:get(<<"trusted-peer">>, Base, not_found, Opts) of + not_found -> false; + TrustedPeer -> + Signers = hb_message:signers(Body, Opts), + lists:member(TrustedPeer, Signers) + end. + +resolve_is_admissible(Base, Body, Opts) -> + Device = hb_ao:get(<<"is-admissible">>, Base, not_found, Opts), + case Device of + not_found -> true; + _ -> + case hb_ao:resolve(Device, Body, Opts) of + {ok, <<"true">>} -> true; + {ok, true} -> true; + _ -> false + end + end. %% @doc Recompute weights for all nodes in all routes. %% Weight = inverse performance -- lower ms = higher weight, normalized. From 69839dd99af035bf30a13973178580f7ed7a2461 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Fri, 10 Apr 2026 14:46:10 -0400 Subject: [PATCH 24/32] fix: handle hook failure return in dev_arweave get_tx dev_hook:on/3 can return {failure, Reason} when a handler returns an unexpected result. Without this clause get_tx crashes with case_clause on hook misconfiguration. Treat it the same as {error, Reason}. --- src/dev_arweave.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/dev_arweave.erl b/src/dev_arweave.erl index 791390a71..b90e10096 100644 --- a/src/dev_arweave.erl +++ b/src/dev_arweave.erl @@ -150,7 +150,8 @@ get_tx(Base, Request, Opts) -> ) of {ok, #{ <<"body">> := ResultMsg }} -> {ok, ResultMsg}; - {error, Reason} -> {error, Reason} + {error, Reason} -> {error, Reason}; + {failure, Reason} -> {error, Reason} end; false -> {error, not_admissible} end; From c187ce9708c96c03fc6f3f663e14d7441b41baa7 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Fri, 10 Apr 2026 14:48:37 -0400 Subject: [PATCH 25/32] docs: flag cross-route http_reference collision in update_node_performance --- src/dev_router_perf.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/dev_router_perf.erl b/src/dev_router_perf.erl index a832810dc..e4a4fc754 100644 --- a/src/dev_router_perf.erl +++ b/src/dev_router_perf.erl @@ -292,7 +292,10 @@ list_replace(List, Idx, Value) -> {Before, [_ | After]} = lists:split(Idx - 1, List), Before ++ [Value | After]. -%% @doc Apply a duration update to the node matching Reference across all routes. +%% @doc Apply a duration update to every node whose `http_reference' matches +%% Reference, across all routes. +%% TODO: scope by (route, reference) so the same URL in multiple routes does +%% not cross-contaminate weights. update_node_performance(Routes, Reference, Duration, ChangeFactor, Opts) -> lists:map( fun(Route) -> From ed13824202925265a743d62d3f74c8fe282f4b39 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Fri, 10 Apr 2026 14:49:35 -0400 Subject: [PATCH 26/32] fix: validate chance@1.0 rate before parsing binary_to_integer/1 throws badarg on a non-integer path. Wrap in try/catch and reject non-positive rates explicitly so callers get a proper error tuple instead of a crash. --- src/dev_chance.erl | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/dev_chance.erl b/src/dev_chance.erl index 87d7a0751..6d99ad969 100644 --- a/src/dev_chance.erl +++ b/src/dev_chance.erl @@ -15,9 +15,16 @@ info(_) -> #{ default => fun default/4 }. default(Key, _Base, Req, _Opts) -> - Rate = binary_to_integer(Key), - ?event({request, {rate, Rate}}), - case Rate > 0 andalso rand:uniform(Rate) =:= 1 of - true -> {ok, Req}; - false -> {error, <<"Filtered by chance gate.">>} + try binary_to_integer(Key) of + Rate when Rate > 0 -> + ?event({request, {rate, Rate}}), + case rand:uniform(Rate) =:= 1 of + true -> {ok, Req}; + false -> {error, <<"Filtered by chance gate.">>} + end; + _ -> + {error, <<"chance@1.0 rate must be a positive integer.">>} + catch + error:badarg -> + {error, <<"chance@1.0 rate must be a valid integer.">>} end. From 9a29e1eabaf418ea750ba6dd24faf41f7f523757 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Fri, 10 Apr 2026 21:05:59 -0400 Subject: [PATCH 27/32] feat: pass raw response to http-client/response hook via priv --- src/hb_http_client.erl | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/src/hb_http_client.erl b/src/hb_http_client.erl index b0d8cc401..abd394798 100644 --- a/src/hb_http_client.erl +++ b/src/hb_http_client.erl @@ -155,6 +155,7 @@ httpc_req(Args, Opts) -> <<"status-class">> => get_status_class(Status), <<"duration">> => EndTime - StartTime }, + response_to_map({ok, Status, RespHeaders, RespBody}), Opts ), {ok, Status, RespHeaders, RespBody}; @@ -217,6 +218,7 @@ hackney_req(Args, Opts) -> <<"status-class">> => get_status_class(Response), <<"duration">> => EndTime - StartTime }, + response_to_map(Response), Opts ), record_response_status(Method, Response, Path), @@ -249,6 +251,7 @@ gun_req(Args, Opts) -> <<"status-class">> => get_status_class(Response), <<"duration">> => EndTime - StartTime }, + response_to_map(Response), Opts ), Response. @@ -264,7 +267,7 @@ init_hackney_pool() -> %% @doc Record the duration of the request in an async process. We write the %% data to prometheus if the application is enabled, as well as firing the %% `http-client/response' hook if configured. -record_duration(Details, Opts) -> +record_duration(Details, Response, Opts) -> spawn( fun() -> % First, write to prometheus if it is enabled. Prometheus works @@ -290,12 +293,15 @@ record_duration(Details, Opts) -> http_client_duration_seconds, Labels ), - BodyDetails = - (maybe_add_reference(Details, Opts))#{ - <<"action">> => <<"duration">> - }, HookReq = - #{ <<"body">> => hb_message:commit(BodyDetails, Opts) }, + #{ + <<"body">> => + hb_message:commit( + maybe_add_reference(Details, Opts), + Opts + ), + <<"priv">> => #{<<"response">> => Response} + }, dev_hook:on(<<"http-client/response">>, HookReq, Opts) end ). @@ -307,6 +313,20 @@ maybe_add_reference(Details, Opts) -> Ref -> Details#{ <<"reference">> => Ref } end. +%% @doc Convert a raw HTTP response tuple into a HB map of lowercased +%% header keys and a `<<"body">>' field, suitable for passing into +%% `dev_codec_httpsig:from/3'. +response_to_map({ok, _Status, Headers, Body}) -> + HeaderMap = + maps:from_list( + [ + {string:lowercase(hb_util:bin(K)), hb_util:bin(V)} + || + {K, V} <- Headers + ] + ), + HeaderMap#{<<"body">> => Body}. + %%% ================================================================== %%% gen_server callbacks. %%% ================================================================== From e5b81baa5aa1f2e7b4fc15460ac8a5d7870ddbb2 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Fri, 10 Apr 2026 21:06:35 -0400 Subject: [PATCH 28/32] feat: add is_tx_admissible_hook adapter to dev_arweave --- src/dev_arweave.erl | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/src/dev_arweave.erl b/src/dev_arweave.erl index b90e10096..4fa39acc8 100644 --- a/src/dev_arweave.erl +++ b/src/dev_arweave.erl @@ -8,6 +8,7 @@ -export([tx/3, raw/3, chunk/3, block/3, current/3, status/3, price/3, tx_anchor/3]). -export([pending/3]). -export([post_tx_header/2, post_tx/3, post_tx/4, post_chunk/2]). +-export([is_tx_admissible_hook/3]). %%% Helper functions -export([get_chunk/2, bundle_header/2, bundle_header/3]). -include("include/hb.hrl"). @@ -167,12 +168,42 @@ is_tx_admissible(Base, Request, Opts) -> {ok, TXID} ?= hb_maps:find(<<"tx">>, Base, Opts), CommIDs = maps:keys(maps:get(<<"commitments">>, Request, #{})), true ?= - lists:member(TXID, CommIDs) andalso + lists:member(TXID, CommIDs) andalso hb_message:verify(Request, all, Opts) else _ -> false end. +%% @doc Hook adapter for the `http-client/response' chain. Decodes the raw +%% upstream response via `dev_codec_httpsig:from/3' and runs +%% `is_tx_admissible/3' against the result. +is_tx_admissible_hook(_Base, HookReq, Opts) -> + Body = hb_maps:get(<<"body">>, HookReq, #{}, Opts), + Path = hb_maps:get(<<"request-path">>, Body, <<>>, Opts), + Priv = hb_maps:get(<<"priv">>, HookReq, #{}, Opts), + Response = hb_maps:get(<<"response">>, Priv, #{}, Opts), + case extract_txid_from_path(Path) of + {ok, TXID} -> + case dev_codec_httpsig:from(Response, #{}, Opts) of + {ok, Decoded} -> + case is_tx_admissible(#{<<"tx">> => TXID}, Decoded, Opts) of + true -> {ok, HookReq}; + false -> {error, not_admissible} + end; + _ -> + {error, decode_failed} + end; + error -> + {ok, HookReq} + end. + +extract_txid_from_path(<<"/", TXID:43/binary>>) -> + {ok, TXID}; +extract_txid_from_path(<<"/", TXID:43/binary, "/", _/binary>>) -> + {ok, TXID}; +extract_txid_from_path(_) -> + error. + %% @doc A router for range requests by method. Both `HEAD` and `GET` requests %% are supported. raw(Base, Request, Opts) -> From 1809db8e4dd1a52ad2591a0ff4ce0a24d95cd4e4 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Fri, 10 Apr 2026 21:06:51 -0400 Subject: [PATCH 29/32] fix: restore path fallback in router-perf compute dispatcher Falls back to <<"path">> when <<"action">> is not present on the hook payload --- src/dev_router_perf.erl | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/dev_router_perf.erl b/src/dev_router_perf.erl index e4a4fc754..c4c6186bc 100644 --- a/src/dev_router_perf.erl +++ b/src/dev_router_perf.erl @@ -28,7 +28,15 @@ %% @doc Compute dispatcher for process@1.0 compatibility. compute(Base, Req, Opts) -> Body = hb_ao:get(<<"body">>, Req, Req, Opts), - case hb_ao:get(<<"action">>, Body, not_found, Opts) of + %% TODO: THIS IS WIERD IS THERE A BETTER WAY??? + %% Falls back to `path' because hb_http_client:maybe_invoke_monitor + %% sends duration data with `path => <<"duration">>' (not `action'). + Action = + case hb_ao:get(<<"action">>, Body, not_found, Opts) of + not_found -> hb_ao:get(<<"path">>, Body, not_found, Opts); + A -> A + end, + case Action of <<"register">> -> register(Base, Body, Opts); <<"recalculate">> -> recalculate(Base, Body, Opts); <<"duration">> -> duration(Base, Body, Opts); From 8b5c56de273d9da360f6579f8af23f14fa7f549b Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Fri, 10 Apr 2026 21:13:25 -0400 Subject: [PATCH 30/32] refactor: flatten is_tx_admissible_hook using maybe --- src/dev_arweave.erl | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/src/dev_arweave.erl b/src/dev_arweave.erl index 4fa39acc8..ad6f9256a 100644 --- a/src/dev_arweave.erl +++ b/src/dev_arweave.erl @@ -178,23 +178,19 @@ is_tx_admissible(Base, Request, Opts) -> %% upstream response via `dev_codec_httpsig:from/3' and runs %% `is_tx_admissible/3' against the result. is_tx_admissible_hook(_Base, HookReq, Opts) -> - Body = hb_maps:get(<<"body">>, HookReq, #{}, Opts), - Path = hb_maps:get(<<"request-path">>, Body, <<>>, Opts), - Priv = hb_maps:get(<<"priv">>, HookReq, #{}, Opts), + Body = hb_maps:get(<<"body">>, HookReq, #{}, Opts), + Path = hb_maps:get(<<"request-path">>, Body, <<>>, Opts), + Priv = hb_maps:get(<<"priv">>, HookReq, #{}, Opts), Response = hb_maps:get(<<"response">>, Priv, #{}, Opts), - case extract_txid_from_path(Path) of - {ok, TXID} -> - case dev_codec_httpsig:from(Response, #{}, Opts) of - {ok, Decoded} -> - case is_tx_admissible(#{<<"tx">> => TXID}, Decoded, Opts) of - true -> {ok, HookReq}; - false -> {error, not_admissible} - end; - _ -> - {error, decode_failed} - end; - error -> - {ok, HookReq} + maybe + {ok, TXID} ?= extract_txid_from_path(Path), + {ok, Decoded} ?= dev_codec_httpsig:from(Response, #{}, Opts), + true ?= is_tx_admissible(#{<<"tx">> => TXID}, Decoded, Opts), + {ok, HookReq} + else + error -> {ok, HookReq}; + false -> {error, not_admissible}; + _ -> {error, decode_failed} end. extract_txid_from_path(<<"/", TXID:43/binary>>) -> From d0ebf8b5dc43bf27f0963a637ee65376fa4a6716 Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Mon, 13 Apr 2026 13:40:50 -0400 Subject: [PATCH 31/32] fix: handle error tuples in response_to_map response_to_map/1 only matched {ok, _, _, _}. Transport errors from hackney/gun crashed with function_clause before record_duration could fire the hook. Add a catch-all that returns an empty map so the hook chain still runs (and the adapter rejects the empty response as not admissible). --- src/hb_http_client.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/hb_http_client.erl b/src/hb_http_client.erl index abd394798..b00f757fc 100644 --- a/src/hb_http_client.erl +++ b/src/hb_http_client.erl @@ -325,7 +325,9 @@ response_to_map({ok, _Status, Headers, Body}) -> {K, V} <- Headers ] ), - HeaderMap#{<<"body">> => Body}. + HeaderMap#{<<"body">> => Body}; +response_to_map(_) -> + #{}. %%% ================================================================== %%% gen_server callbacks. From 145afb96f456aaa5d9a0a3a25f4fc69d2cd162ae Mon Sep 17 00:00:00 2001 From: Ayush Agrawal Date: Mon, 13 Apr 2026 13:40:50 -0400 Subject: [PATCH 32/32] fix: handle error tuples in response_to_map --- src/hb_http_client.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/hb_http_client.erl b/src/hb_http_client.erl index abd394798..b00f757fc 100644 --- a/src/hb_http_client.erl +++ b/src/hb_http_client.erl @@ -325,7 +325,9 @@ response_to_map({ok, _Status, Headers, Body}) -> {K, V} <- Headers ] ), - HeaderMap#{<<"body">> => Body}. + HeaderMap#{<<"body">> => Body}; +response_to_map(_) -> + #{}. %%% ================================================================== %%% gen_server callbacks.