diff --git a/src/dev_arweave.erl b/src/dev_arweave.erl index 377b5f2a1..ad6f9256a 100644 --- a/src/dev_arweave.erl +++ b/src/dev_arweave.erl @@ -8,9 +8,11 @@ -export([tx/3, raw/3, chunk/3, block/3, current/3, status/3, price/3, tx_anchor/3]). -export([pending/3]). -export([post_tx_header/2, post_tx/3, post_tx/4, post_chunk/2]). +-export([is_tx_admissible_hook/3]). %%% Helper functions -export([get_chunk/2, bundle_header/2, bundle_header/3]). -include("include/hb.hrl"). +-include("include/hb_arweave_nodes.hrl"). -include_lib("eunit/include/eunit.hrl"). -define(IS_BLOCK_ID(X), (is_binary(X) andalso byte_size(X) == 64)). @@ -127,23 +129,77 @@ get_tx(Base, Request, Opts) -> case find_key(<<"tx">>, Base, Request, Opts) of not_found -> {error, not_found}; TXID -> - request( - <<"GET">>, - <<"/tx/", TXID/binary>>, - Opts#{ - exclude_data => - hb_util:bool( - find_key( - <<"exclude-data">>, - Base, - Request, - Opts - ) - ) - } - ) + TxOpts = Opts#{ + exclude_data => + hb_util:bool( + find_key(<<"exclude-data">>, Base, Request, Opts) + ) + }, + case request(<<"GET">>, <<"/tx/", TXID/binary>>, TxOpts) of + {ok, Msg} -> + Admissible = #{ + <<"device">> => <<"arweave@2.9">>, + <<"path">> => <<"is-tx-admissible">>, + <<"tx">> => TXID + }, + case is_tx_admissible(Admissible, Msg, TxOpts) of + true -> + case dev_hook:on( + <<"tx-admissible">>, + #{ <<"body">> => Msg }, + TxOpts + ) of + {ok, #{ <<"body">> := ResultMsg }} -> + {ok, ResultMsg}; + {error, Reason} -> {error, Reason}; + {failure, Reason} -> {error, Reason} + end; + false -> {error, not_admissible} + end; + Error -> Error + end end. +%% @doc Check whether a response to a `GET /tx/ID' request is valid. +%% Verifies that the requested TXID exists as a commitment ID in the +%% response message, and that all commitments are cryptographically valid. +is_tx_admissible(Base, Request, Opts) -> + maybe + {ok, TXID} ?= hb_maps:find(<<"tx">>, Base, Opts), + CommIDs = maps:keys(maps:get(<<"commitments">>, Request, #{})), + true ?= + lists:member(TXID, CommIDs) andalso + hb_message:verify(Request, all, Opts) + else + _ -> false + end. + +%% @doc Hook adapter for the `http-client/response' chain. Decodes the raw +%% upstream response via `dev_codec_httpsig:from/3' and runs +%% `is_tx_admissible/3' against the result. +is_tx_admissible_hook(_Base, HookReq, Opts) -> + Body = hb_maps:get(<<"body">>, HookReq, #{}, Opts), + Path = hb_maps:get(<<"request-path">>, Body, <<>>, Opts), + Priv = hb_maps:get(<<"priv">>, HookReq, #{}, Opts), + Response = hb_maps:get(<<"response">>, Priv, #{}, Opts), + maybe + {ok, TXID} ?= extract_txid_from_path(Path), + {ok, Decoded} ?= dev_codec_httpsig:from(Response, #{}, Opts), + true ?= is_tx_admissible(#{<<"tx">> => TXID}, Decoded, Opts), + {ok, HookReq} + else + error -> {ok, HookReq}; + false -> {error, not_admissible}; + _ -> {error, decode_failed} + end. + +extract_txid_from_path(<<"/", TXID:43/binary>>) -> + {ok, TXID}; +extract_txid_from_path(<<"/", TXID:43/binary, "/", _/binary>>) -> + {ok, TXID}; +extract_txid_from_path(_) -> + error. + %% @doc A router for range requests by method. Both `HEAD` and `GET` requests %% are supported. raw(Base, Request, Opts) -> @@ -2156,3 +2212,201 @@ assert_chunk_range(Type, ID, StartOffset, ExpectedLength, ExpectedHash, Opts) -> ?event(debug_test, {data, {explicit, hb_util:encode(crypto:hash(sha256, Data))}}), ?assertEqual(ExpectedHash, hb_util:encode(crypto:hash(sha256, Data))), ok. + +is_admissible_routed_test() -> + AliceWallet = ar_wallet:new(), + BobWallet = ar_wallet:new(), + AliceNode = hb_http_server:start_node(#{ priv_wallet => AliceWallet }), + BobNode = hb_http_server:start_node(#{ priv_wallet => BobWallet }), + AliceNodeOpts = hb_http_server:get_opts(#{ + http_server => hb_util:human_id(ar_wallet:to_address(AliceWallet)) + }), + BobNodeOpts = hb_http_server:get_opts(#{ + http_server => hb_util:human_id(ar_wallet:to_address(BobWallet)) + }), + AliceMsg = + hb_message:commit(#{ + <<"a">> => 1 }, + AliceNodeOpts, + <<"ans104@1.0">> + ), + {ok, AliceMsgRawID} = hb_cache:write(AliceMsg, AliceNodeOpts), + AliceMsgID = hb_util:human_id(AliceMsgRawID), + BobMsg = + hb_message:commit(#{ + <<"b">> => 1 }, + BobNodeOpts, + <<"ans104@1.0">> + ), + {ok, BobMsgRawID} = hb_cache:write(BobMsg, BobNodeOpts), + BobMsgID = hb_util:human_id(BobMsgRawID), + %% Start RoutingNode with routes to both AliceNode and BobNode. + RoutingNode = hb_http_server:start_node(#{ + priv_wallet => ar_wallet:new(), + routes => [ + #{ + <<"template">> => <<"^/arweave/tx">>, + <<"strategy">> => <<"Random">>, + <<"choose">> => 2, + <<"parallel">> => true, + <<"nodes">> => + [ + #{ + <<"match">> => <<"/arweave/tx/">>, + <<"with">> => AliceNode + }, + #{ + <<"match">> => <<"/arweave/tx/">>, + <<"with">> => BobNode + } + ] + } + ] + }), + %% Fetch Alice's and Bob's messages via RoutingNode with admissibility check. + {ok, AliceRes} = + hb_http:get(RoutingNode, <<"~arweave@2.9/tx=", AliceMsgID/binary>>, #{}), + ?assertMatch(#{ <<"a">> := 1 }, AliceRes), + {ok, BobRes} = + hb_http:get(RoutingNode, <<"~arweave@2.9/tx=", BobMsgID/binary>>, #{}), + ?assertMatch(#{ <<"b">> := 1 }, BobRes), + ok. + +is_admissible_hook_routed_test_() -> + {timeout, 60, fun() -> + application:ensure_all_started(hb), + TXID = <<"ptBC0UwDmrUTBQX3MqZ1lB57ex20ygwzkjjCrQjIx3o">>, + PerfProcess = <<"/perf-router~node-process@1.0">>, + SchedulePath = <>, + RoutesPath = <>, + NodeWallet = ar_wallet:new(), + NodeAddr = hb_util:human_id(NodeWallet), + Opts = #{ + store => hb_test_utils:test_store(), + priv_wallet => NodeWallet, + on => #{ + <<"http-client">> => #{ + <<"response">> => [ + #{ + <<"device">> => <<"relay@1.0">>, + <<"path">> => <<"call">>, + <<"method">> => <<"POST">>, + <<"relay-path">> => SchedulePath, + <<"hook/result">> => <<"ignore">> + } + ] + } + }, + router_opts => #{ <<"provider">> => #{ <<"path">> => RoutesPath } }, + node_processes => #{ + <<"perf-router">> => #{ + <<"device">> => <<"process@1.0">>, + <<"execution-device">> => <<"router-perf@1.0">>, + <<"scheduler-device">> => <<"scheduler@1.0">>, + <<"performance-period">> => 2, + <<"initial-performance">> => 1000 + } + }, + routes => [ + #{ + <<"template">> => <<"^/arweave">>, + <<"nodes">> => ?ARWEAVE_BOOTSTRAP_CHAIN_NODES, + <<"parallel">> => true, + <<"admissible-status">> => 200 + } + ] + }, + Node = hb_http_server:start_node(Opts), + %% Register gateways with the perf-router process. + RouteConfig = #{ + <<"template">> => <<"^/arweave">>, + <<"parallel">> => true, + <<"strategy">> => <<"Random">>, + <<"choose">> => 10, + <<"admissible-status">> => 200 + }, + lists:foreach( + fun(GatewayNode) -> + Body = + hb_message:commit( + #{ + <<"action">> => <<"register">>, + <<"route">> => maps:merge(GatewayNode, RouteConfig) + }, + Opts + ), + {ok, _} = + hb_http:post( + Node, + #{ + <<"path">> => SchedulePath, + <<"method">> => <<"POST">>, + <<"body">> => Body + }, + Opts + ) + end, + ?ARWEAVE_BOOTSTRAP_DATA_NODES + ), + %% Trigger compute to process register messages. + {ok, _} = hb_http:get(Node, RoutesPath, Opts), + %% Verify initial performance. + PerfPath = <>, + {ok, InitPerf} = hb_http:get(Node, PerfPath, Opts), + ?assertEqual(1000.0, dev_router_perf:to_float(InitPerf)), + %% Fetch TX through the full stack. + {ok, Res} = + hb_http:get( + Node, + <<"~arweave@2.9/tx=", TXID/binary, "&exclude-data=true">>, + Opts + ), + ?assertMatch(#{ <<"reward">> := <<"482143296">> }, Res), + ?assert(hb_message:verify(Res, all, #{})), + ?assertNot(lists:member(NodeAddr, hb_message:signers(Res, #{}))), + %% Wait for async monitor duration posts, then recompute. + timer:sleep(1000), + {ok, _} = hb_http:get(Node, RoutesPath, Opts), + %% Verify performance score changed from initial value. + {ok, UpdatedPerf} = hb_http:get(Node, PerfPath, Opts), + ?assertNotEqual(1000.0, dev_router_perf:to_float(UpdatedPerf)), + ok + end}. + +is_admissible_real_gateway_test_() -> + {timeout, 30, fun() -> + application:ensure_all_started(hb), + TXID = <<"ptBC0UwDmrUTBQX3MqZ1lB57ex20ygwzkjjCrQjIx3o">>, + Node = hb_http_server:start_node(#{ + priv_wallet => ar_wallet:new(), + routes => [ + #{ + <<"template">> => <<"^/arweave/tx">>, + <<"strategy">> => <<"Random">>, + <<"choose">> => 10, + <<"parallel">> => true, + <<"nodes">> => ?ARWEAVE_BOOTSTRAP_CHAIN_NODES + }, + #{ + <<"template">> => <<"^/arweave">>, + <<"nodes">> => ?ARWEAVE_BOOTSTRAP_CHAIN_NODES, + <<"parallel">> => true, + <<"admissible-status">> => 200 + } + ] + }), + {ok, Res} = hb_http:get( + Node, + <<"~arweave@2.9/tx=", TXID/binary, "&exclude-data=true">>, + #{} + ), + ?assertMatch(#{ <<"reward">> := <<"482143296">> }, Res), + ?assertMatch( + #{ <<"anchor">> := + <<"XTzaU2_m_hRYDLiXkcleOC4zf5MVTXIeFWBOsJSRrtEZ8kM6Oz7EKLhZY7fTAvKq">> + }, + Res + ), + ?assertMatch(#{ <<"content-type">> := <<"application/json">> }, Res), + ok + end}. \ No newline at end of file diff --git a/src/dev_chance.erl b/src/dev_chance.erl new file mode 100644 index 000000000..6d99ad969 --- /dev/null +++ b/src/dev_chance.erl @@ -0,0 +1,30 @@ +%%% @doc A generic probabilistic gate device. When accessed as +%%% `~chance@1.0/N', the key `N' is parsed as an integer rate. +%%% A 1-in-N random check is performed: +%%% +%%% {ok, Req} -- check passed, hook chain continues +%%% {error, _} -- check failed, hook chain halts +%%% +%%% This device uses the 4-arity default handler pattern so that the +%%% rate is supplied via the URL path rather than a config key. +-module(dev_chance). +-export([info/1]). +-include("include/hb.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +info(_) -> #{ default => fun default/4 }. + +default(Key, _Base, Req, _Opts) -> + try binary_to_integer(Key) of + Rate when Rate > 0 -> + ?event({request, {rate, Rate}}), + case rand:uniform(Rate) =:= 1 of + true -> {ok, Req}; + false -> {error, <<"Filtered by chance gate.">>} + end; + _ -> + {error, <<"chance@1.0 rate must be a positive integer.">>} + catch + error:badarg -> + {error, <<"chance@1.0 rate must be a valid integer.">>} + end. diff --git a/src/dev_hook.erl b/src/dev_hook.erl index 1696745ec..ddf6b1999 100644 --- a/src/dev_hook.erl +++ b/src/dev_hook.erl @@ -92,7 +92,7 @@ find(HookName, Opts) -> find(#{}, #{ <<"target">> => <<"body">>, <<"body">> => HookName }, Opts). find(_Base, Req, Opts) -> HookName = maps:get(maps:get(<<"target">>, Req, <<"body">>), Req), - case maps:get(HookName, hb_opts:get(on, #{}, Opts), []) of + case hb_util:deep_get(HookName, hb_opts:get(on, #{}, Opts), [], Opts) of Handler when is_map(Handler) -> case hb_util:is_ordered_list(Handler, Opts) of true -> @@ -169,7 +169,7 @@ execute_handler(HookName, Handler, Req, Opts) -> <<"method">> => hb_maps:get(<<"method">>, Handler, <<"GET">>, Opts) }, - CommitReqBin = + CommitReqBin = hb_util:bin( hb_util:deep_get( <<"hook/commit-request">>, @@ -316,4 +316,26 @@ halt_on_error_test() -> Req = #{ <<"test">> => <<"value">> }, Opts = #{ on => #{ <<"test-hook">> => [Handler1, Handler2, Handler3] }}, {error, Result} = on(<<"test-hook">>, Req, Opts), - ?assertEqual(<<"Error in handler2">>, Result). \ No newline at end of file + ?assertEqual(<<"Error in handler2">>, Result). + +%% @doc Test that nested hook names (slash-separated) resolve correctly +nested_hook_name_test() -> + Handler = #{ + <<"device">> => #{ + nested_key => + fun(_, Req, _) -> + {ok, Req#{ <<"nested_executed">> => true }} + end + }, + <<"path">> => <<"nested-key">> + }, + Req = #{ <<"test">> => <<"value">> }, + Opts = #{ + on => #{ + <<"http-client">> => #{ + <<"response">> => Handler + } + } + }, + {ok, Result} = on(<<"http-client/response">>, Req, Opts), + ?assertEqual(true, maps:get(<<"nested_executed">>, Result)). \ No newline at end of file diff --git a/src/dev_relay.erl b/src/dev_relay.erl index c5782ff1f..9760fb63c 100644 --- a/src/dev_relay.erl +++ b/src/dev_relay.erl @@ -36,10 +36,10 @@ call(M1, RawM2, Opts) -> RelayPath = hb_ao:get_first( [ - {M1, <<"path">>}, - {{as, <<"message@1.0">>, BaseTarget}, <<"path">>}, {RawM2, <<"relay-path">>}, - {M1, <<"relay-path">>} + {M1, <<"relay-path">>}, + {M1, <<"path">>}, + {{as, <<"message@1.0">>, BaseTarget}, <<"path">>} ], Opts ), diff --git a/src/dev_router.erl b/src/dev_router.erl index 1e87a65d5..89d1920f8 100644 --- a/src/dev_router.erl +++ b/src/dev_router.erl @@ -1328,7 +1328,7 @@ dynamic_routing_by_performance() -> BenchRoutes = 16, TestPath = <<"/worker">>, % Start the main node for the test, loading the `dynamic-router' script and - % the http_monitor to generate performance messages. + % the http-client/response hook to generate performance messages. {ok, Script} = file:read_file(<<"scripts/dynamic-router.lua">>), Node = hb_http_server:start_node(Opts = #{ relay_http_client => gun, @@ -1358,12 +1358,19 @@ dynamic_routing_by_performance() -> <<"initial-performance">> => 1000 } }, - % Define the request that should be called in order to record performance - % information into the process. The `body' of the `http_monitor' message - % is filled with the signed performance report. - http_monitor => #{ - <<"method">> => <<"POST">>, - <<"path">> => <<"/perf-router~node-process@1.0/schedule">> + on => #{ + <<"http-client">> => #{ + <<"response">> => [ + #{ + <<"device">> => <<"relay@1.0">>, + <<"path">> => <<"call">>, + <<"method">> => <<"POST">>, + <<"relay-path">> => + <<"/perf-router~node-process@1.0/schedule">>, + <<"hook/result">> => <<"ignore">> + } + ] + } } }), % Start and add a series of nodes with decreasing performance, via lag diff --git a/src/dev_router_perf.erl b/src/dev_router_perf.erl new file mode 100644 index 000000000..c4c6186bc --- /dev/null +++ b/src/dev_router_perf.erl @@ -0,0 +1,697 @@ +%%% @doc An Erlang execution device for `process@1.0' that provides +%%% performance-based routing. Replaces the Lua `dynamic-router.lua' for +%%% Arweave gateway routing, fixing two gaps: +%%% 1. Supports `match'/`with' route format (not just `prefix'/`price'/`topup'). +%%% 2. Handles monitor duration POSTs that lack an `action' field. +%%% +%%% Device state is stored in the Base message (standard HyperBEAM architecture). +%%% Configuration keys read from Base: +%%% - `performance-period': EMA smoothing period (default 1000). +%%% - `initial-performance': Starting perf score for new nodes (default 30000). +%%% - `sampling-rate': Fraction of random sampling (default 0.1). +%%% - `performance-weight': Weight factor for perf scoring (default 1). +%%% - `pricing-weight': Weight factor for price scoring (default 1). +%%% - `score-preference': Decay exponent for scoring (default 1). +%%% - `trusted-peer': Wallet address whose signed registrations bypass the +%%% admissibility check. +%%% - `is-admissible': Device resolved against a registration body to decide +%%% whether the request may register a new node. Default device resolves +%%% to `<<"true">>' (open registration). +-module(dev_router_perf). +-export([init/3, compute/3, snapshot/3, normalize/3]). +-export([duration/3, register/3, recalculate/3]). +%%% Helper API +-export([to_float/1]). +-include("include/hb.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +%% @doc Compute dispatcher for process@1.0 compatibility. +compute(Base, Req, Opts) -> + Body = hb_ao:get(<<"body">>, Req, Req, Opts), + %% TODO: THIS IS WIERD IS THERE A BETTER WAY??? + %% Falls back to `path' because hb_http_client:maybe_invoke_monitor + %% sends duration data with `path => <<"duration">>' (not `action'). + Action = + case hb_ao:get(<<"action">>, Body, not_found, Opts) of + not_found -> hb_ao:get(<<"path">>, Body, not_found, Opts); + A -> A + end, + case Action of + <<"register">> -> register(Base, Body, Opts); + <<"recalculate">> -> recalculate(Base, Body, Opts); + <<"duration">> -> duration(Base, Body, Opts); + _ -> + ?event(router_perf, {action_not_supported, {body, Body}}), + {ok, Base} + end. + +%% @doc Initialize the device state with defaults if not already set. +init(Base, _Req, Opts) -> + {ok, ensure_defaults(Base, Opts)}. + +%% @doc Return a snapshot of the execution device state for caching. +snapshot(Base, _Req, _Opts) -> + {ok, Base}. + +%% @doc Restore execution device state from a snapshot. +normalize(Base, _Req, _Opts) -> + {ok, Base}. + +%% @doc Update the performance score for a node identified by `reference'. +%% Uses exponential weighted average: +%% new_perf = current * (1 - 1/period) + duration * (1/period) +duration(RawBase, Req, Opts) -> + Base = ensure_defaults(RawBase, Opts), + Body = hb_ao:get(<<"body">>, Req, Req, Opts), + case hb_ao:get(<<"reference">>, Body, not_found, Opts) of + not_found -> {ok, Base}; + Reference -> + Duration = to_float(hb_ao:get(<<"duration">>, Body, Opts)), + Period = to_float( + hb_ao:get(<<"performance-period">>, Base, 1000, Opts) + ), + ChangeFactor = 1.0 / Period, + Routes = hb_ao:get(<<"routes">>, Base, [], Opts), + UpdatedRoutes = update_node_performance( + Routes, + Reference, + Duration, + ChangeFactor, + Opts + ), + {ok, hb_ao:set(Base, #{ <<"routes">> => UpdatedRoutes }, Opts)} + end. + +%% @doc Register a new node on a route. Supports both: +%% - `match'/`with' format +%% - `prefix'/`price'/`topup' format +%% Generates `http_reference' from the node's URL (with or prefix). +%% +%% Registration is gated by either of: +%% - `trusted-peer': if any committer of the request matches the configured +%% trusted peer wallet address, the registration is accepted unconditionally. +%% - `is-admissible': otherwise the configured admissibility device is +%% resolved against the request body. The default device returns `true' for +%% backward compatibility, so deployments must opt in to a stricter check. +register(RawBase, Req, Opts) -> + Base = ensure_defaults(RawBase, Opts), + Body = hb_ao:get(<<"body">>, Req, Req, Opts), + case is_admissible(Base, Body, Opts) of + true -> + Route = hb_ao:get(<<"route">>, Body, Opts), + Template = hb_ao:get(<<"template">>, Route, Opts), + Routes = hb_ao:get(<<"routes">>, Base, [], Opts), + InitialPerf = + to_float( + hb_ao:get(<<"initial-performance">>, Base, 30000, Opts) + ), + HttpRef = node_url(Route, Opts), + Node = build_node(Route, HttpRef, InitialPerf, Opts), + RouteConfig = extract_route_config(Route, Opts), + {UpdatedRoutes, _} = + add_node_to_route(Routes, Template, Node, RouteConfig, Opts), + NewBase = + hb_ao:set(Base, #{ <<"routes">> => UpdatedRoutes }, Opts), + recalculate(NewBase, Req, Opts); + false -> + ?event(router_perf, + {register_rejected, {body, Body}}), + {ok, Base} + end. + +%% @doc Check whether a registration request is admissible. +%% Returns `true' if either the request is signed by the configured +%% `trusted-peer' or the configured `is-admissible' device resolves to +%% `<<"true">>'. +is_admissible(Base, Body, Opts) -> + case is_trusted_peer(Base, Body, Opts) of + true -> true; + false -> resolve_is_admissible(Base, Body, Opts) + end. + +is_trusted_peer(Base, Body, Opts) -> + case hb_ao:get(<<"trusted-peer">>, Base, not_found, Opts) of + not_found -> false; + TrustedPeer -> + Signers = hb_message:signers(Body, Opts), + lists:member(TrustedPeer, Signers) + end. + +resolve_is_admissible(Base, Body, Opts) -> + Device = hb_ao:get(<<"is-admissible">>, Base, not_found, Opts), + case Device of + not_found -> true; + _ -> + case hb_ao:resolve(Device, Body, Opts) of + {ok, <<"true">>} -> true; + {ok, true} -> true; + _ -> false + end + end. + +%% @doc Recompute weights for all nodes in all routes. +%% Weight = inverse performance -- lower ms = higher weight, normalized. +recalculate(RawBase, _Req, Opts) -> + Base = ensure_defaults(RawBase, Opts), + Routes = hb_ao:get(<<"routes">>, Base, [], Opts), + SamplingRate = to_float(hb_ao:get(<<"sampling-rate">>, Base, 0.1, Opts)), + PerfWeight = to_float(hb_ao:get(<<"performance-weight">>, Base, 1, Opts)), + PricingWeight = to_float(hb_ao:get(<<"pricing-weight">>, Base, 1, Opts)), + ScorePref = to_float(hb_ao:get(<<"score-preference">>, Base, 1, Opts)), + ScoringParams = #{ + sampling_rate => SamplingRate, + perf_weight => PerfWeight, + pricing_weight => PricingWeight, + score_pref => ScorePref + }, + UpdatedRoutes = lists:map( + fun(R) -> + Nodes = hb_ao:get(<<"nodes">>, R, [], Opts), + recalculate_route(R, ScoringParams, Opts, Nodes) + end, + Routes + ), + {ok, hb_ao:set(Base, #{ <<"routes">> => UpdatedRoutes }, Opts)}. + +%%% Internal functions + +-define( + ROUTE_CONFIG_KEYS, + [ + <<"strategy">>, + <<"parallel">>, + <<"choose">>, + <<"admissible-status">> + ] +). + +%% @doc Extract route-level configuration keys from a registration message. +extract_route_config(Route, Opts) -> + lists:foldl( + fun(Key, Acc) -> + case hb_ao:get(Key, Route, not_found, Opts) of + not_found -> Acc; + Value -> Acc#{ Key => Value } + end + end, + #{}, + ?ROUTE_CONFIG_KEYS + ). + +ensure_defaults(Base, Opts) -> + Defaults = [ + {<<"routes">>, []}, + {<<"sampling-rate">>, 0.1}, + {<<"pricing-weight">>, 1}, + {<<"performance-weight">>, 1}, + {<<"score-preference">>, 1}, + {<<"performance-period">>, 1000}, + {<<"initial-performance">>, 30000} + ], + lists:foldl( + fun({Key, Default}, Acc) -> + case hb_ao:get(Key, Acc, not_found, Opts) of + not_found -> hb_ao:set(Acc, #{Key => Default}, Opts); + _ -> Acc + end + end, + Base, + Defaults + ). + + +node_url(Node, Opts) -> + case hb_ao:get(<<"with">>, Node, not_found, Opts) of + not_found -> hb_ao:get(<<"prefix">>, Node, <<"unknown">>, Opts); + With -> With + end. + +%% @doc Build a node map from a registration route. +%% `http_reference' is stored at the top level AND in opts. +build_node(Route, HttpRef, InitialPerf, Opts) -> + ExistingNodeOpts = + case hb_ao:get(<<"opts">>, Route, not_found, Opts) of + NodeOpts when is_map(NodeOpts) -> + %% TODO: PLEASE CROSS CHECK ONCE I THINK IT's OKAY BUT VERIFY + %% Strip old commitments so http_reference gets included + %% when the process pipeline commits the full state. + hb_maps:without([<<"commitments">>], NodeOpts, Opts); + _ -> #{} + end, + Node = #{ + <<"performance">> => InitialPerf, + <<"price">> => to_float(hb_ao:get(<<"price">>, Route, 0, Opts)), + <<"weight">> => 1.0, + <<"http_reference">> => HttpRef, + <<"opts">> => ExistingNodeOpts#{ <<"http_reference">> => HttpRef } + }, + WithMatch = + case hb_ao:get(<<"with">>, Route, not_found, Opts) of + not_found -> #{}; + With -> + #{ + <<"with">> => With, + <<"match">> => hb_ao:get(<<"match">>, Route, <<"">>, Opts) + } + end, + WithPrefix = + case hb_ao:get(<<"prefix">>, Route, not_found, Opts) of + not_found -> #{}; + Prefix -> #{ <<"prefix">> => Prefix } + end, + hb_maps:merge(hb_maps:merge(Node, WithMatch, Opts), WithPrefix, Opts). + +%% @doc Append a node to an existing route or create a new one. +add_node_to_route(Routes, Template, Node, RouteConfig, Opts) -> + case find_route_index(Routes, Template, Opts) of + not_found -> + BaseRoute = #{ + <<"template">> => Template, + <<"strategy">> => <<"By-Weight">>, + <<"nodes">> => [Node] + }, + NewRoute = hb_maps:merge(BaseRoute, RouteConfig, Opts), + {Routes ++ [NewRoute], length(Routes) + 1}; + Idx -> + Route = lists:nth(Idx, Routes), + ExistingNodes = hb_ao:get(<<"nodes">>, Route, [], Opts), + UpdatedRoute = + hb_ao:set( + Route, + #{ <<"nodes">> => ExistingNodes ++ [Node] }, + Opts + ), + MergedRoute = hb_maps:merge(UpdatedRoute, RouteConfig, Opts), + {list_replace(Routes, Idx, MergedRoute), Idx} + end. + +%% @doc Find the 1-based index of the route matching Template, or not_found. +find_route_index(Routes, Template, Opts) -> + find_route_index(Routes, Template, 1, Opts). +find_route_index([], _Template, _Idx, _Opts) -> not_found; +find_route_index([Route | Rest], Template, Idx, Opts) -> + case hb_ao:get(<<"template">>, Route, undefined, Opts) of + Template -> Idx; + _ -> find_route_index(Rest, Template, Idx + 1, Opts) + end. + +%% @doc Replace the element at 1-based Idx in List with Value. +list_replace(List, Idx, Value) -> + {Before, [_ | After]} = lists:split(Idx - 1, List), + Before ++ [Value | After]. + +%% @doc Apply a duration update to every node whose `http_reference' matches +%% Reference, across all routes. +%% TODO: scope by (route, reference) so the same URL in multiple routes does +%% not cross-contaminate weights. +update_node_performance(Routes, Reference, Duration, ChangeFactor, Opts) -> + lists:map( + fun(Route) -> + Nodes = hb_ao:get(<<"nodes">>, Route, [], Opts), + UpdatedNodes = + [ + apply_node_perf(Node, Reference, Duration, ChangeFactor, Opts) + || + Node <- Nodes + ], + hb_ao:set(Route, #{ <<"nodes">> => UpdatedNodes }, Opts) + end, + Routes + ). + +%% @doc Update a single node's performance if its http_reference matches. +apply_node_perf(Node, Reference, Duration, ChangeFactor, Opts) -> + case hb_ao:get(<<"http_reference">>, Node, not_found, Opts) of + Reference -> + OldPerf = to_float(hb_ao:get(<<"performance">>, Node, 30000, Opts)), + NewPerf = (OldPerf * (1.0 - ChangeFactor)) + (Duration * ChangeFactor), + hb_ao:set(Node, #{ <<"performance">> => NewPerf }, Opts); + _ -> + Node + end. + +%% @doc Recompute weights for all nodes in a single route. +recalculate_route(Route, ScoringParams, Opts, Nodes) when is_list(Nodes) -> + #{ + perf_weight := PerfW, + pricing_weight := PriceW, + sampling_rate := SamplingRate, + score_pref := ScorePref + } = ScoringParams, + TotalWeight = PerfW + PriceW, + PerfFactor = PerfW / TotalWeight, + PriceFactor = PriceW / TotalWeight, + PerfValues = + [ + to_float(hb_ao:get(<<"performance">>, N, 30000, Opts)) + || + N <- Nodes + ], + PriceValues = + [ + to_float(hb_ao:get(<<"price">>, N, 0, Opts)) + || + N <- Nodes + ], + SortedPerf = lists:sort(PerfValues), + SortedPrice = lists:sort(PriceValues), + ScoredNodes = + lists:map( + fun(Node) -> + Perf = + to_float(hb_ao:get(<<"performance">>, Node, 30000, Opts)), + Price = + to_float(hb_ao:get(<<"price">>, Node, 0, Opts)), + PerfPercentile = percentile(Perf, SortedPerf), + PricePercentile = percentile(Price, SortedPrice), + PerfScore = + (decay(ScorePref, PerfPercentile) * (1.0 - SamplingRate)) + + SamplingRate, + PriceScore = decay(ScorePref, PricePercentile), + Weight = (PerfScore * PerfFactor) + (PriceScore * PriceFactor), + hb_ao:set(Node, #{ <<"weight">> => Weight }, Opts) + end, + Nodes + ), + hb_ao:set(Route, #{ <<"nodes">> => ScoredNodes }, Opts); +recalculate_route(Route, _ScoringParams, _Opts, _Nodes) -> + Route. + +%% @doc Exponential decay function for score weighting. +decay(Preference, Score) -> + math:exp(-Preference * Score). + +%% @doc Compute the percentile rank of Value in a sorted list. +percentile(_Value, []) -> 0.0; +percentile(_Value, [_]) -> 0.0; +percentile(Value, Sorted) -> + Pos = count_less_or_equal(Value, Sorted, 0), + (Pos - 1) / length(Sorted). + +count_less_or_equal(_Value, [], Count) -> Count; +count_less_or_equal(Value, [H | T], Count) when H =< Value -> + count_less_or_equal(Value, T, Count + 1); +count_less_or_equal(_Value, _Rest, Count) -> Count. + +to_float(V) when is_float(V) -> V; +to_float(V) when is_integer(V) -> float(V); +to_float(V) when is_binary(V) -> + try binary_to_float(V) + catch _:_ -> + try float(binary_to_integer(V)) + catch _:_ -> 0.0 + end + end; +to_float(V) when is_list(V) -> + try list_to_float(V) + catch _:_ -> + try float(list_to_integer(V)) + catch _:_ -> 0.0 + end + end; +to_float(_) -> 0.0. + +%%% ============================================================ +%%% Tests +%%% ============================================================ + +register_test() -> + Base = #{}, + Req = #{ + <<"route">> => #{ + <<"template">> => <<"/test-key">>, + <<"prefix">> => <<"host1">>, + <<"price">> => 5 + } + }, + {ok, Base2} = register(Base, Req, #{}), + Routes = hb_maps:get(<<"routes">>, Base2, [], #{}), + ?assertEqual(1, length(Routes)), + [Route] = Routes, + Nodes = hb_maps:get(<<"nodes">>, Route, [], #{}), + ?assertEqual(1, length(Nodes)), + [Node] = Nodes, + ?assertEqual(<<"host1">>, hb_maps:get(<<"prefix">>, Node, undefined, #{})), + ?assertEqual(5.0, hb_maps:get(<<"price">>, Node, undefined, #{})). + +register_match_with_test() -> + Base = #{}, + Req = #{ + <<"route">> => #{ + <<"template">> => <<"^/arweave">>, + <<"match">> => <<"^/arweave">>, + <<"with">> => <<"http://chain-1.arweave.xyz:1984">> + } + }, + {ok, Base2} = register(Base, Req, #{}), + Routes = hb_maps:get(<<"routes">>, Base2, [], #{}), + ?assertEqual(1, length(Routes)), + [Route] = Routes, + Nodes = hb_maps:get(<<"nodes">>, Route, [], #{}), + ?assertEqual(1, length(Nodes)), + [Node] = Nodes, + ?assertEqual(<<"http://chain-1.arweave.xyz:1984">>, + hb_maps:get(<<"with">>, Node, undefined, #{})), + ?assertEqual(<<"^/arweave">>, + hb_maps:get(<<"match">>, Node, undefined, #{})), + ?assertEqual(<<"http://chain-1.arweave.xyz:1984">>, + hb_maps:get(<<"http_reference">>, Node, undefined, #{})), + NodeOpts = hb_maps:get(<<"opts">>, Node, #{}, #{}), + ?assertEqual(<<"http://chain-1.arweave.xyz:1984">>, + hb_maps:get(<<"http_reference">>, NodeOpts, undefined, #{})). + +duration_test() -> + Base = #{ + <<"performance-period">> => 2, + <<"initial-performance">> => 1000, + <<"routes">> => [ + #{ + <<"template">> => <<"^/arweave">>, + <<"strategy">> => <<"By-Weight">>, + <<"nodes">> => [ + #{ + <<"with">> => <<"http://chain-1.arweave.xyz:1984">>, + <<"match">> => <<"^/arweave">>, + <<"performance">> => 1000.0, + <<"price">> => 0, + <<"weight">> => 1.0, + <<"http_reference">> => + <<"http://chain-1.arweave.xyz:1984">>, + <<"opts">> => #{ + <<"http_reference">> => + <<"http://chain-1.arweave.xyz:1984">> + } + } + ] + } + ] + }, + Req = #{ + <<"duration">> => 200, + <<"reference">> => <<"http://chain-1.arweave.xyz:1984">> + }, + {ok, Base2} = duration(Base, Req, #{}), + Routes = hb_maps:get(<<"routes">>, Base2, [], #{}), + [Route] = Routes, + [Node] = hb_maps:get(<<"nodes">>, Route, [], #{}), + %% With period=2, change_factor=0.5. + %% new_perf = 1000 * 0.5 + 200 * 0.5 = 600. + ?assertEqual(600.0, hb_maps:get(<<"performance">>, Node, undefined, #{})). + +duration_no_reference_test() -> + Base = #{ <<"routes">> => [] }, + Req = #{ + <<"duration">> => 200 + }, + {ok, Base2} = duration(Base, Req, #{}), + ?assertEqual(Base2, ensure_defaults(Base, #{})). + +recalculate_test() -> + Base = #{ + <<"performance-period">> => 6, + <<"initial-performance">> => 30000, + <<"performance-weight">> => 1, + <<"pricing-weight">> => 1, + <<"sampling-rate">> => 0.1, + <<"score-preference">> => 1, + <<"routes">> => [ + #{ + <<"template">> => <<"/test">>, + <<"strategy">> => <<"By-Weight">>, + <<"nodes">> => [ + #{ + <<"prefix">> => <<"host1">>, + <<"performance">> => 200.0, + <<"price">> => 5, + <<"weight">> => 1.0, + <<"http_reference">> => <<"host1">> + }, + #{ + <<"prefix">> => <<"host2">>, + <<"performance">> => 55500.0, + <<"price">> => 5, + <<"weight">> => 1.0, + <<"http_reference">> => <<"host2">> + } + ] + } + ] + }, + {ok, Base2} = recalculate(Base, #{}, #{}), + Routes = hb_maps:get(<<"routes">>, Base2, [], #{}), + [Route] = Routes, + [N1, N2] = hb_maps:get(<<"nodes">>, Route, [], #{}), + W1 = hb_maps:get(<<"weight">>, N1, undefined, #{}), + W2 = hb_maps:get(<<"weight">>, N2, undefined, #{}), + ?assert(W1 > W2). + +hb_gateway_load_balancer_test_() -> + {timeout, 60, fun() -> + application:ensure_all_started(hb), + ID = <<"ptBC0UwDmrUTBQX3MqZ1lB57ex20ygwzkjjCrQjIx3o">>, + PerfProcess = <<"/perf-router~node-process@1.0">>, + SchedulePath = <>, + RoutesPath = <>, + HBGateways = [ + #{ + <<"match">> => <<"^/">>, + <<"with">> => <<"https://blue.hyperbeam.zephyrdev.xyz/">> + }, + #{ + <<"match">> => <<"^/">>, + <<"with">> => <<"https://neo.hyperbeam.zephyrdev.xyz/">> + }, + #{ <<"match">> => <<"^/">>, + <<"with">> => <<"https://neo2.hyperbeam.zephyrdev.xyz/">> } + ], + Opts = #{ + store => hb_test_utils:test_store(), + priv_wallet => ar_wallet:new(), + router_opts => #{ + <<"provider">> => #{ <<"path">> => RoutesPath } + }, + on => #{ + <<"request">> => #{ + <<"device">> => <<"router@1.0">>, + <<"path">> => <<"preprocess">> + }, + <<"http-client">> => #{ + <<"response">> => [ + #{ + <<"device">> => <<"relay@1.0">>, + <<"path">> => <<"call">>, + <<"method">> => <<"POST">>, + <<"relay-path">> => SchedulePath, + <<"hook/result">> => <<"ignore">> + } + ] + } + }, + node_processes => #{ + <<"perf-router">> => #{ + <<"device">> => <<"process@1.0">>, + <<"execution-device">> => <<"router-perf@1.0">>, + <<"scheduler-device">> => <<"scheduler@1.0">>, + <<"performance-period">> => 2, + <<"initial-performance">> => 1000 + } + } + }, + Node = hb_http_server:start_node(Opts), + %% Register HB gateways with the perf-router process. + RouteConfig = #{ + <<"template">> => <<"^/(?!.*~)">>, + <<"strategy">> => <<"By-Weight">>, + <<"choose">> => 2 + }, + lists:foreach( + fun(GW) -> + Body = + hb_message:commit( + #{ + <<"action">> => <<"register">>, + <<"route">> => maps:merge(GW, RouteConfig) + }, + Opts + ), + {ok, _} = + hb_http:post( + Node, + #{ + <<"path">> => SchedulePath, + <<"method">> => <<"POST">>, + <<"body">> => Body + }, + Opts + ) + end, + HBGateways + ), + {ok, _} = hb_http:get(Node, RoutesPath, Opts), + {ok, InitPerf} = + hb_http:get( + Node, + <>, + Opts + ), + ?assertEqual(1000.0, to_float(InitPerf)), + {ok, Res} = hb_http:get(Node, <<"/", ID/binary>>, Opts), + ?assert(is_map(Res)), + timer:sleep(2000), + {ok, _} = hb_http:get(Node, RoutesPath, Opts), + Perfs = + lists:map( + fun(I) -> + IB = integer_to_binary(I), + {ok, V} = + hb_http:get( + Node, + << + PerfProcess/binary, + "/now/routes/1/nodes/", + IB/binary, + "/performance" + >>, + Opts + ), + to_float(V) + end, + [1, 2, 3] + ), + ?assert(lists:any(fun(P) -> P =/= 1000.0 end, Perfs)), + RecalcBody = + hb_message:commit(#{ <<"action">> => <<"recalculate">> }, Opts), + {ok, _} = + hb_http:post( + Node, + #{ + <<"path">> => SchedulePath, + <<"method">> => <<"POST">>, + <<"body">> => RecalcBody + }, + Opts + ), + {ok, _} = hb_http:get(Node, RoutesPath, Opts), + Weights = + lists:map( + fun(I) -> + IB = integer_to_binary(I), + {ok, W} = + hb_http:get( + Node, + << + PerfProcess/binary, + "/now/routes/1/nodes/", + IB/binary, + "/weight" + >>, + Opts + ), + to_float(W) + end, + [1, 2, 3] + ), + UniqueWeights = lists:usort(Weights), + ?assert(length(UniqueWeights) > 1), + ok + end}. \ No newline at end of file diff --git a/src/hb_http_client.erl b/src/hb_http_client.erl index e02135d39..b00f757fc 100644 --- a/src/hb_http_client.erl +++ b/src/hb_http_client.erl @@ -155,6 +155,7 @@ httpc_req(Args, Opts) -> <<"status-class">> => get_status_class(Status), <<"duration">> => EndTime - StartTime }, + response_to_map({ok, Status, RespHeaders, RespBody}), Opts ), {ok, Status, RespHeaders, RespBody}; @@ -217,6 +218,7 @@ hackney_req(Args, Opts) -> <<"status-class">> => get_status_class(Response), <<"duration">> => EndTime - StartTime }, + response_to_map(Response), Opts ), record_response_status(Method, Response, Path), @@ -249,6 +251,7 @@ gun_req(Args, Opts) -> <<"status-class">> => get_status_class(Response), <<"duration">> => EndTime - StartTime }, + response_to_map(Response), Opts ), Response. @@ -261,45 +264,71 @@ init_hackney_pool() -> {timeout, ?DEFAULT_KEEPALIVE_TIMEOUT} ]). -%% @doc Invoke the HTTP monitor message with AO-Core, if it is set in the -%% node message key. We invoke the given message with the `body' set to a signed -%% version of the details. This allows node operators to configure their machine -%% to record duration statistics into customized data stores, computations, or -%% processes etc. Additionally, we include the `http_reference' value, if set in -%% the given `opts'. -%% -%% We use `hb_ao:get' rather than `hb_opts:get', as settings configured -%% by the `~router@1.0' route `opts' key are unable to generate atoms. -maybe_invoke_monitor(Details, Opts) -> - case hb_ao:get(<<"http_monitor">>, Opts, Opts) of - not_found -> ok; - Monitor -> - % We have a monitor message. Place the `details' into the body, set - % the `method' to "POST", add the `http_reference' (if applicable) - % and sign the request. We use the node message's wallet as the - % source of the key. - MaybeWithReference = - case hb_ao:get(<<"http_reference">>, Opts, Opts) of - not_found -> Details; - Ref -> Details#{ <<"reference">> => Ref } +%% @doc Record the duration of the request in an async process. We write the +%% data to prometheus if the application is enabled, as well as firing the +%% `http-client/response' hook if configured. +record_duration(Details, Response, Opts) -> + spawn( + fun() -> + % First, write to prometheus if it is enabled. Prometheus works + % only with strings as lists, so we encode the data before granting + % it. + GetFormat = + fun + (<<"request-category">>) -> + path_to_category(maps:get(<<"request-path">>, Details)); + (Key) -> + hb_util:list(maps:get(Key, Details)) end, - Req = - Monitor#{ + Labels = lists:map( + GetFormat, + [ + <<"request-method">>, + <<"status-class">>, + <<"request-category">> + ] + ), + hb_prometheus:observe( + maps:get(<<"duration">>, Details), + http_client_duration_seconds, + Labels + ), + HookReq = + #{ <<"body">> => hb_message:commit( - MaybeWithReference#{ - <<"method">> => <<"POST">> - }, + maybe_add_reference(Details, Opts), Opts - ) + ), + <<"priv">> => #{<<"response">> => Response} }, - % Use the singleton parse to generate the message sequence to - % execute. - ReqMsgs = hb_singleton:from(Req, Opts), - Res = hb_ao:resolve_many(ReqMsgs, Opts), - ?event(debug_http_monitor, {resolved_monitor, Res}) + dev_hook:on(<<"http-client/response">>, HookReq, Opts) + end + ). + +%% @doc Attach the `http_reference' from Opts to the details map, if present. +maybe_add_reference(Details, Opts) -> + case hb_opts:get(http_reference, not_found, Opts) of + not_found -> Details; + Ref -> Details#{ <<"reference">> => Ref } end. +%% @doc Convert a raw HTTP response tuple into a HB map of lowercased +%% header keys and a `<<"body">>' field, suitable for passing into +%% `dev_codec_httpsig:from/3'. +response_to_map({ok, _Status, Headers, Body}) -> + HeaderMap = + maps:from_list( + [ + {string:lowercase(hb_util:bin(K)), hb_util:bin(V)} + || + {K, V} <- Headers + ] + ), + HeaderMap#{<<"body">> => Body}; +response_to_map(_) -> + #{}. + %%% ================================================================== %%% gen_server callbacks. %%% ================================================================== @@ -615,40 +644,6 @@ init_prometheus() -> ?event(started), ok. -%% @doc Record the duration of the request in an async process. We write the -%% data to prometheus if the application is enabled, as well as invoking the -%% `http_monitor' if appropriate. -record_duration(Details, Opts) -> - spawn( - fun() -> - % Prometheus works only with strings as lists, so we encode the - % data before granting it. - GetFormat = - fun - (<<"request-category">>) -> - path_to_category(maps:get(<<"request-path">>, Details)); - (Key) -> - hb_util:list(maps:get(Key, Details)) - end, - Labels = lists:map( - GetFormat, - [ - <<"request-method">>, - <<"status-class">>, - <<"request-category">> - ]), - hb_prometheus:observe( - maps:get(<<"duration">>, Details), - http_client_duration_seconds, - Labels - ), - maybe_invoke_monitor( - Details#{ <<"path">> => <<"duration">> }, - Opts - ) - end - ). - record_response_status(Method, Response) -> record_response_status(Method, Response, undefined). record_response_status(Method, Response, Path) -> diff --git a/src/hb_opts.erl b/src/hb_opts.erl index 4d169cf88..7fc4b0b09 100644 --- a/src/hb_opts.erl +++ b/src/hb_opts.erl @@ -205,6 +205,7 @@ default_message() -> #{<<"name">> => <<"message@1.0">>, <<"module">> => dev_message}, #{<<"name">> => <<"meta@1.0">>, <<"module">> => dev_meta}, #{<<"name">> => <<"monitor@1.0">>, <<"module">> => dev_monitor}, + #{<<"name">> => <<"chance@1.0">>, <<"module">> => dev_chance}, #{<<"name">> => <<"multipass@1.0">>, <<"module">> => dev_multipass}, #{<<"name">> => <<"name@1.0">>, <<"module">> => dev_name}, #{<<"name">> => <<"node-process@1.0">>, <<"module">> => dev_node_process}, @@ -218,6 +219,7 @@ default_message() -> #{<<"name">> => <<"rate-limit@1.0">>, <<"module">> => dev_rate_limit}, #{<<"name">> => <<"relay@1.0">>, <<"module">> => dev_relay}, #{<<"name">> => <<"router@1.0">>, <<"module">> => dev_router}, + #{<<"name">> => <<"router-perf@1.0">>, <<"module">> => dev_router_perf}, #{<<"name">> => <<"scheduler@1.0">>, <<"module">> => dev_scheduler}, #{<<"name">> => <<"simple-pay@1.0">>, <<"module">> => dev_simple_pay}, #{<<"name">> => <<"snp@1.0">>, <<"module">> => dev_snp},