diff --git a/src/dev_bundler.erl b/src/dev_bundler.erl index 6e18cea38d..b406871846 100644 --- a/src/dev_bundler.erl +++ b/src/dev_bundler.erl @@ -292,14 +292,23 @@ add_to_queue(Item, BundledSize, State = #state{ %% @doc Dispatch the queue if it is ready. %% Only dispatches up to max_items at a time to respect the limit. -maybe_dispatch(State = #state{queue = Q, max_items = MaxItems}) -> +maybe_dispatch(State = #state{queue = Q, max_items = MaxItems, + dispatch_ref = DispatchRef}) -> case dispatchable(State) of true -> + %% Threshold dispatch supersedes any pending max-delay timer; + %% leaving it active would let it fire on a later, partial + %% queue and produce an undersized bundle. + case is_reference(DispatchRef) of + true -> erlang:cancel_timer(DispatchRef); + false -> ok + end, {ToDispatch, Remaining} = split_queue(Q, MaxItems), State1 = create_bundle(ToDispatch, State), NewState = State1#state{ queue = Remaining, - bytes = queue_bytes(Remaining) + bytes = queue_bytes(Remaining), + dispatch_ref = undefined }, maybe_dispatch(NewState); false -> State diff --git a/src/hb_gun_pool.erl b/src/hb_gun_pool.erl new file mode 100644 index 0000000000..737d9a0b5f --- /dev/null +++ b/src/hb_gun_pool.erl @@ -0,0 +1,190 @@ +%%% @doc Top-level supervisor and public API for the gun connection pool. +%%% ETS registry keyed on {Authority, Scope} → manager pid; each manager is +%%% a temporary simple_one_for_one child (hb_gun_pool_mgr juggler). +-module(hb_gun_pool). +-behaviour(supervisor). + +-export([start_link/0]). +-export([start_or_get_pool/3, stop/1, stop_all/0]). +-export([flush_stream/1]). +-export([init/1]). + +-include("include/hb.hrl"). + +-define(REGISTRY, hb_gun_pool_registry). + +%% @doc Start the top-level pool supervisor. Called by hb_sup. +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%%==================================================================== +%% Public API +%%==================================================================== + +%% @doc Find or start a pool. ConnInfo must contain host, port, and +%% transport (tcp | tls); opts is optional (hb_opts-style map). +start_or_get_pool(Authority, Scope, ConnInfo = #{host := Host, port := Port, + transport := Transport}) -> + case ets:info(?REGISTRY) of + undefined -> {error, pool_runtime_unavailable}; + _ -> + Opts = maps:get(opts, ConnInfo, #{}), + start_pool(Authority, Scope, + #{host => Host, port => Port, + transport => Transport, opts => Opts}) + end; +start_or_get_pool(Authority, Scope, ConnInfo = #{host := Host, port := Port}) -> + case ets:info(?REGISTRY) of + undefined -> {error, pool_runtime_unavailable}; + _ -> + Opts = maps:get(opts, ConnInfo, #{}), + start_pool(Authority, Scope, + #{host => Host, port => Port, + transport => tcp, opts => Opts}) + end. + +stop(MgrPid) -> stop_pool(MgrPid). + +%% @doc Stop every pool currently registered. Used by test teardown. +stop_all() -> + case ets:info(?REGISTRY) of + undefined -> + ok; + _ -> + lists:foreach( + fun({Key, MgrPid}) -> + ets:delete(?REGISTRY, Key), + _ = supervisor:terminate_child(?MODULE, MgrPid) + end, + ets:tab2list(?REGISTRY)) + end. + +%% @doc Drain buffered messages for a timed-out StreamRef from the caller mailbox. +flush_stream(StreamRef) -> + receive + {gun_response, _, StreamRef, _, _, _} -> flush_stream(StreamRef); + {gun_data, _, StreamRef, _, _} -> flush_stream(StreamRef); + {gun_trailers, _, StreamRef, _} -> flush_stream(StreamRef); + {gun_error, _, StreamRef, _} -> flush_stream(StreamRef) + after 0 -> ok + end. + +%%==================================================================== +%% supervisor callback +%%==================================================================== + +init([]) -> + case ets:info(?REGISTRY) of + undefined -> + ets:new(?REGISTRY, + [named_table, public, set, {read_concurrency, true}]); + _ -> ok + end, + SupFlags = #{strategy => simple_one_for_one, intensity => 10, period => 60}, + ChildSpec = #{ + id => hb_gun_pool_mgr, + start => {hb_gun_pool_mgr, start_link, []}, + restart => temporary, + shutdown => 10000, + type => worker, + modules => [hb_gun_pool_mgr] + }, + {ok, {SupFlags, [ChildSpec]}}. + +%%==================================================================== +%% Private +%%==================================================================== + +%% Atomically claim {Authority, Scope} via ets:insert_new with a +%% {pending, Claimer} marker so concurrent cold-starts yield one manager. +start_pool(Authority, Scope, #{host := Host, port := Port, + transport := Transport, opts := Opts}) -> + Key = {Authority, Scope}, + case ets:lookup(?REGISTRY, Key) of + [{Key, {pending, Claimer}}] -> + wait_for_claimer(Key, Claimer), + %% If the claimer crashed before promoting the marker, clear it + %% atomically on the exact tuple — otherwise the next start_pool + %% pass sees the same dead marker and the pool is permanently + %% wedged for this {Authority, Scope}. + _ = maybe_clear_stale_pending(Key, Claimer), + start_pool(Authority, Scope, + #{host => Host, port => Port, + transport => Transport, opts => Opts}); + [{Key, MgrPid}] when is_pid(MgrPid) -> + case is_process_alive(MgrPid) of + true -> {ok, MgrPid}; + false -> + ets:delete(?REGISTRY, Key), + start_pool(Authority, Scope, + #{host => Host, port => Port, + transport => Transport, opts => Opts}) + end; + [] -> + case ets:insert_new(?REGISTRY, {Key, {pending, self()}}) of + false -> + start_pool(Authority, Scope, + #{host => Host, port => Port, + transport => Transport, opts => Opts}); + true -> + do_start(Key, Host, Port, Transport, Opts) + end + end. + +%% Poll the registry until the claimer either promotes the entry to a real +%% manager pid, swaps it out, or the claimer dies. Do not simply monitor +%% the claimer: in the normal success path it long-outlives the +%% pending->MgrPid transition, so a monitor would block the whole deadline +%% even after the pool is ready for use. +wait_for_claimer(Key, Claimer) -> + Deadline = erlang:monotonic_time(millisecond) + 2_000, + wait_for_claimer(Key, Claimer, Deadline). + +wait_for_claimer(Key, Claimer, Deadline) -> + case ets:lookup(?REGISTRY, Key) of + [{Key, {pending, Claimer}}] -> + case is_process_alive(Claimer) + andalso erlang:monotonic_time(millisecond) < Deadline of + false -> ok; + true -> + receive after 5 -> ok end, + wait_for_claimer(Key, Claimer, Deadline) + end; + _ -> + ok + end. + +maybe_clear_stale_pending(Key, Claimer) -> + case ets:lookup(?REGISTRY, Key) of + [{Key, {pending, Claimer}}] -> + case is_process_alive(Claimer) of + false -> + %% delete_object matches the exact tuple, so a concurrent + %% promotion to {Key, MgrPid} is not clobbered. + ets:delete_object(?REGISTRY, {Key, {pending, Claimer}}); + true -> + ok + end; + _ -> + ok + end. + +stop_pool(MgrPid) when is_pid(MgrPid) -> + case ets:match(?REGISTRY, {'$1', MgrPid}) of + [[Key]] -> + ets:delete(?REGISTRY, Key), + supervisor:terminate_child(?MODULE, MgrPid); + [] -> + ok + end. + +do_start({Authority, Scope} = Key, Host, Port, Transport, Opts) -> + case supervisor:start_child(?MODULE, + [Authority, Scope, Host, Port, Transport, Opts]) of + {ok, MgrPid} -> + ets:insert(?REGISTRY, {Key, MgrPid}), + {ok, MgrPid}; + Err -> + ets:delete(?REGISTRY, Key), + Err + end. diff --git a/src/hb_gun_pool_mgr.erl b/src/hb_gun_pool_mgr.erl new file mode 100644 index 0000000000..60122899d0 --- /dev/null +++ b/src/hb_gun_pool_mgr.erl @@ -0,0 +1,687 @@ +%%% @doc Juggler pool manager — one gen_server per {Authority, Scope}. +%%% +%%% Owns up to max_sockets gun connections. Callers call request/6 and then +%%% receive demuxed gun messages in their own mailbox. No per-request +%%% worker process on our side; gun connections are the concurrency unit. +%%% +%%% Concurrency model: +%%% h2: one connection serves ~100 concurrent streams (server-advertised). +%%% h1: one connection serves one stream at a time; pool up to max_sockets. +%%% +%%% Circuit breaker: inc consec_connect_failures on connect timeout; at +%%% threshold, set cooldown_until and reject all requests immediately. +%%% +%%% Idle eviction: after two consecutive idle_check ticks with no traffic +%%% the manager terminates itself and removes its ETS entry. +%%% +-module(hb_gun_pool_mgr). +-behaviour(gen_server). + +-export([start_link/6, request/6, cancel_stream/2, abandon_queued/2, + normalize_tls_opts/1]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). + +-include("include/hb.hrl"). +-include("include/hb_http_client.hrl"). + +-define(DEFAULT_MAX_SOCKETS_H1, 16). +-define(DEFAULT_MAX_SOCKETS_H2, 2). +-define(DEFAULT_MAX_QUEUED_PER_CALLER, 8). +-define(MAX_CONNECT_FAILURES, 3). +-define(COOLDOWN_MS, 30_000). +-define(IDLE_POLL_MS, 30_000). +-define(CONNECT_TIMEOUT_TIMER_MSG, connect_timeout_check). + +-record(conn, { + pid :: pid(), + protocol = connecting :: connecting | http | http2, + streams_in_flight = 0 :: non_neg_integer(), + max_streams = 1 :: pos_integer(), + opened_at = 0 :: integer(), + last_used = 0 :: integer(), + connect_timer :: reference() | undefined +}). + +-record(state, { + authority :: binary(), + scope :: term(), + host :: string(), + port :: inet:port_number(), + transport :: tcp | tls, + opts :: map(), + protocol :: http | http2 | http3, + max_sockets :: pos_integer(), + max_queued :: pos_integer(), + idle_poll_ms :: pos_integer(), + gun_module = gun :: module(), + conns = #{} :: #{pid() => #conn{}}, + streams = #{} :: #{reference() => {pid(), pid()}}, + caller_refs = #{} :: #{pid() => reference()}, + %% per-caller queue of {ReqArgs, CallerPid, From} triples + queue = [] :: list(), + consec_connect_failures = 0 :: non_neg_integer(), + cooldown_until = undefined :: integer() | undefined, + was_idle = false :: boolean() +}). + +%% @doc Start and link a juggler manager. Spawned by hb_gun_pool supervisor. +start_link(Authority, Scope, Host, Port, Transport, Opts) -> + gen_server:start_link( + ?MODULE, + {Authority, Scope, Host, Port, Transport, Opts}, + [] + ). + +%% @doc Submit a request to the pool. Returns {ok, MgrPid, StreamRef} on +%% acceptance; caller then awaits demuxed gun messages in its mailbox. +%% Returns {error, Reason} immediately on backpressure or cooldown. +request(MgrPid, Method, Path, Headers, Body, Opts) -> + %% Bound the call to a reasonable ceiling independent of connect_timeout. + %% Orphaned workers (e.g. after an eunit test is killed) otherwise pin + %% pool resources for the full connect_timeout (60s default), stalling + %% subsequent tests. 10s is ample for local/internal peers and long + %% enough for most real handshake paths. + QueueCeiling = hb_opts:get(http_client_gun_call_timeout, 10_000, Opts), + ReqRef = make_ref(), + ReqArgs = #{ref => ReqRef, method => Method, path => Path, + headers => Headers, body => Body}, + try gen_server:call(MgrPid, {request, ReqArgs, self()}, QueueCeiling) of + {ok, StreamRef} -> {ok, MgrPid, StreamRef}; + {error, _} = Err -> Err + catch + exit:{timeout, _} -> + abandon_queued(MgrPid, ReqRef), + {error, timeout}; + exit:{noproc, _} -> {error, noproc} + end. + +%% @doc Ask the manager to RST a stream and free its pool slot. Used when +%% the caller has given up on a response (e.g. send_timeout). +cancel_stream(MgrPid, StreamRef) -> + gen_server:cast(MgrPid, {cancel_stream, StreamRef}). + +%% @doc Ask the manager to drop any queued request matching ReqRef. Used +%% when a caller's gen_server:call timed out before the request was issued. +abandon_queued(MgrPid, ReqRef) -> + gen_server:cast(MgrPid, {abandon_queued, ReqRef}). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +init({Authority, Scope, Host, Port, Transport, RawOpts}) -> + process_flag(trap_exit, true), + %% Normalize atom/list keys to the canonical binary-dash form so reads + %% via hb_opts:get find both node config and test-injected overrides. + Opts = hb_opts:mimic_default_types(RawOpts, existing, RawOpts), + Protocol = resolve_protocol(Transport, Opts), + MaxSockets = max_sockets(Protocol, Opts), + MaxQueued = hb_opts:get( + http_client_gun_max_queued_per_caller, ?DEFAULT_MAX_QUEUED_PER_CALLER, Opts), + IdlePollMs = hb_opts:get(gun_pool_idle_poll_ms, ?IDLE_POLL_MS, Opts), + GunModule = hb_opts:get(gun_module, gun, Opts), + erlang:send_after(IdlePollMs, self(), {?MODULE, idle_check}), + {ok, #state{ + authority = Authority, + scope = Scope, + host = hb_util:list(Host), + port = Port, + transport = Transport, + opts = Opts, + protocol = Protocol, + max_sockets = MaxSockets, + max_queued = MaxQueued, + idle_poll_ms = IdlePollMs, + gun_module = GunModule + }}. + +handle_call({request, ReqArgs, CallerPid}, From, + State = #state{cooldown_until = Until}) -> + Now = erlang:monotonic_time(millisecond), + case is_integer(Until) andalso Until > Now of + true -> + {reply, {error, no_connection_available}, State}; + false -> + handle_request(ReqArgs, CallerPid, From, State) + end; + +handle_call(pool_info, _From, State = #state{conns = Cs, queue = Q}) -> + Up = maps:fold( + fun(_, #conn{protocol = P}, A) when P =/= connecting -> A + 1; + (_, _, A) -> A end, + 0, Cs), + InFlight = maps:fold( + fun(_, #conn{streams_in_flight = N}, A) -> A + N end, 0, Cs), + {reply, #{workers_up => Up, inflight => InFlight, queued => length(Q)}, State}; + +handle_call(Req, _From, State) -> + ?event(warning, {unhandled_call, {module, ?MODULE}, {request, Req}}), + {reply, ok, State}. + +handle_cast({cancel_stream, StreamRef}, + State = #state{streams = Streams}) -> + %% Forget the stream so subsequent gun_data / gun_response / gun_error + %% messages for it are dropped rather than forwarded. Do NOT send gun:cancel + %% (RST_STREAM) here: under h2 the server may already have HEADERS in flight + %% for this stream, and RSTing races with that delivery. Gun then emits + %% stream_closed errors on the shared connection that can be mis-attributed + %% to later, unrelated streams. Since we no longer forward data to the + %% caller, letting the stream drain naturally is both correct and safe; + %% the connection remains usable for other callers. + case maps:find(StreamRef, Streams) of + {ok, {_CallerPid, ConnPid}} -> + {noreply, stream_done(StreamRef, ConnPid, State)}; + error -> + {noreply, State} + end; +handle_cast({abandon_queued, ReqRef}, + State = #state{queue = Q}) -> + Q2 = [E || E = {#{ref := R}, _, _} <- Q, R =/= ReqRef], + {noreply, State#state{queue = Q2}}; +handle_cast(Cast, State) -> + ?event(warning, {unhandled_cast, {module, ?MODULE}, {cast, Cast}}), + {noreply, State}. + +%% Connection came up. +handle_info({gun_up, ConnPid, Proto}, + State = #state{conns = Cs}) -> + case maps:find(ConnPid, Cs) of + error -> + {noreply, State}; + {ok, Conn} -> + cancel_connect_timer(Conn), + MaxStreams = case Proto of + http2 -> 100; + _ -> 1 + end, + Conn2 = Conn#conn{ + protocol = Proto, + max_streams = MaxStreams, + connect_timer = undefined + }, + State2 = State#state{ + conns = Cs#{ConnPid => Conn2}, + consec_connect_failures = 0 + }, + {noreply, drain_queue(State2)} + end; + +%% Server advertises new max_concurrent_streams for h2. +handle_info({gun_notify, ConnPid, settings_changed, + #{max_concurrent_streams := MaxS}}, + State = #state{conns = Cs}) -> + case maps:find(ConnPid, Cs) of + error -> + {noreply, State}; + {ok, Conn} -> + {noreply, State#state{conns = Cs#{ConnPid => Conn#conn{max_streams = MaxS}}}} + end; +handle_info({gun_notify, _, _, _}, State) -> + {noreply, State}; + +%% Demux: stream-level response messages — forward to caller using self() +%% as the pid so the caller's receive pattern matches on the manager pid. +handle_info({gun_response, ConnPid, StreamRef, IsFin, Status, Headers}, + State = #state{streams = Streams}) -> + case maps:find(StreamRef, Streams) of + error -> + {noreply, State}; + {ok, {CallerPid, ConnPid}} -> + CallerPid ! {gun_response, self(), StreamRef, IsFin, Status, Headers}, + State2 = case IsFin of + fin -> stream_done(StreamRef, ConnPid, State); + nofin -> State + end, + {noreply, State2} + end; + +handle_info({gun_data, ConnPid, StreamRef, IsFin, Data}, + State = #state{streams = Streams}) -> + case maps:find(StreamRef, Streams) of + error -> + {noreply, State}; + {ok, {CallerPid, ConnPid}} -> + CallerPid ! {gun_data, self(), StreamRef, IsFin, Data}, + State2 = case IsFin of + fin -> stream_done(StreamRef, ConnPid, State); + nofin -> State + end, + {noreply, State2} + end; + +handle_info({gun_trailers, ConnPid, StreamRef, Trailers}, + State = #state{streams = Streams}) -> + case maps:find(StreamRef, Streams) of + error -> {noreply, State}; + {ok, {CallerPid, ConnPid}} -> + CallerPid ! {gun_trailers, self(), StreamRef, Trailers}, + {noreply, stream_done(StreamRef, ConnPid, State)} + end; + +handle_info({gun_error, ConnPid, StreamRef, Reason}, + State = #state{streams = Streams}) when is_reference(StreamRef) -> + case maps:find(StreamRef, Streams) of + error -> {noreply, State}; + {ok, {CallerPid, ConnPid}} -> + CallerPid ! {gun_error, self(), StreamRef, Reason}, + {noreply, stream_done(StreamRef, ConnPid, State)} + end; + +%% Connection-level error (not stream-specific). +handle_info({gun_error, ConnPid, Reason}, State) -> + ?event(warning, {gun_connection_error, {pid, ConnPid}, {reason, Reason}}), + {noreply, drop_conn(ConnPid, {error, Reason}, State)}; + +%% Connection went down. +handle_info({gun_down, ConnPid, _Proto, Reason, _KilledStreams}, State) -> + ?event(debug_http_client, {gun_connection_down, {pid, ConnPid}, {reason, Reason}}), + {noreply, drop_conn(ConnPid, {error, {down, Reason}}, State)}; + +%% gun process died (linked). +handle_info({'EXIT', ConnPid, Reason}, State = #state{conns = Cs}) -> + case maps:is_key(ConnPid, Cs) of + true -> + {noreply, handle_conn_exit(ConnPid, Reason, State)}; + false -> + {noreply, State} + end; + +%% Connect timeout: gun_up never arrived for this connection. +handle_info({?MODULE, connect_timeout, ConnPid}, + State = #state{conns = Cs, gun_module = GunMod}) -> + case maps:find(ConnPid, Cs) of + {ok, #conn{protocol = connecting}} -> + GunMod:close(ConnPid), + {noreply, handle_connect_timeout(ConnPid, State)}; + _ -> + {noreply, State} + end; + +%% Caller died: cancel its in-flight streams. +handle_info({'DOWN', Ref, process, CallerPid, _Reason}, + State = #state{caller_refs = CRefs}) -> + case maps:find(CallerPid, CRefs) of + {ok, Ref} -> + {noreply, cancel_caller_streams(CallerPid, State)}; + _ -> + {noreply, State} + end; + +handle_info({?MODULE, idle_check}, + State = #state{idle_poll_ms = IdlePollMs}) -> + Idle = is_idle(State), + case {State#state.was_idle, Idle} of + {true, true} -> + {stop, normal, State}; + _ -> + erlang:send_after(IdlePollMs, self(), {?MODULE, idle_check}), + {noreply, State#state{was_idle = Idle}} + end; + +handle_info(Msg, State) -> + ?event(warning, {unhandled_info, {module, ?MODULE}, {message, Msg}}), + {noreply, State}. + +terminate(_Reason, #state{authority = Authority, scope = Scope, + conns = Cs, streams = Streams, queue = Q, + caller_refs = CRefs, gun_module = GunMod}) -> + %% Table may already be gone if the supervisor tore it down first. + %% Verified: ets:delete/2 on a missing table raises error:badarg. + try ets:delete(hb_gun_pool_registry, {Authority, Scope}) + catch error:badarg -> ok + end, + maps:foreach( + fun(StreamRef, {CallerPid, _ConnPid}) -> + CallerPid ! {gun_error, self(), StreamRef, shutdown} + end, + Streams), + lists:foreach( + fun({_ReqArgs, _CallerPid, From}) -> + gen_server:reply(From, {error, shutdown}) + end, + Q), + maps:foreach( + fun(ConnPid, _) -> GunMod:close(ConnPid) end, + Cs), + maps:foreach( + fun(_, Ref) -> erlang:demonitor(Ref, [flush]) end, + CRefs), + ok. + +%%==================================================================== +%% Internal — request handling +%%==================================================================== + +handle_request(ReqArgs, CallerPid, From, State) -> + case pick_conn(State) of + {ok, ConnPid} -> + {StreamRef, State2} = issue_request(ConnPid, ReqArgs, CallerPid, State), + {reply, {ok, StreamRef}, ensure_caller_monitored(CallerPid, State2)}; + none -> + case can_open_conn(State) of + true -> + case open_conn(State) of + {ok, State2} -> + enqueue(ReqArgs, CallerPid, From, State2); + {error, Reason, State2} -> + %% gun:open failed synchronously; do not strand the + %% current caller on the queue with no live timer. + ?event(warning, + {gun_open_failed_immediate, {reason, Reason}}), + {reply, {error, no_connection_available}, State2} + end; + false -> + enqueue(ReqArgs, CallerPid, From, State) + end + end. + +enqueue(ReqArgs, CallerPid, From, State = #state{queue = Q, max_queued = Cap}) -> + CallerQ = [E || E = {_, CPid, _} <- Q, CPid =:= CallerPid], + case length(CallerQ) >= Cap of + true -> + {reply, {error, no_connection_available}, State}; + false -> + State2 = ensure_caller_monitored(CallerPid, State), + {noreply, State2#state{queue = Q ++ [{ReqArgs, CallerPid, From}]}} + end. + +drain_queue(State = #state{queue = []}) -> + State; +drain_queue(State = #state{queue = [{ReqArgs, CallerPid, From} | Rest]}) -> + case pick_conn(State) of + {ok, ConnPid} -> + {StreamRef, State2} = issue_request(ConnPid, ReqArgs, CallerPid, + State#state{queue = Rest}), + gen_server:reply(From, {ok, StreamRef}), + drain_queue(State2); + none -> + case can_open_conn(State) of + true -> + case open_conn(State) of + {ok, State2} -> State2; + {error, _, State2} -> State2 + end; + false -> + State + end + end. + +pick_conn(#state{conns = Cs}) -> + Ready = maps:fold( + fun(Pid, #conn{protocol = P, streams_in_flight = N, max_streams = Max}, Acc) + when P =/= connecting, N < Max -> + [{N, Pid} | Acc]; + (_, _, Acc) -> Acc + end, + [], + Cs + ), + case lists:keysort(1, Ready) of + [] -> none; + [{_, Pid} | _] -> {ok, Pid} + end. + +can_open_conn(#state{conns = Cs, max_sockets = Max}) -> + maps:size(Cs) < Max. + +open_conn(State = #state{host = Host, port = Port, transport = Transport, + opts = Opts, protocol = Protocol, conns = Cs, + gun_module = GunMod}) -> + ConnTimeout = hb_opts:get( + http_client_connect_timeout, ?DEFAULT_CONNECT_TIMEOUT, Opts), + GunOpts = build_gun_opts(Transport, Protocol, Opts), + OpenFn = hb_opts:get(gun_open_fn, fun GunMod:open/3, Opts), + case OpenFn(Host, Port, GunOpts) of + {ok, ConnPid} -> + TRef = erlang:send_after(ConnTimeout, self(), + {?MODULE, connect_timeout, ConnPid}), + Conn = #conn{ + pid = ConnPid, + protocol = connecting, + opened_at = erlang:monotonic_time(millisecond), + connect_timer = TRef + }, + {ok, State#state{conns = Cs#{ConnPid => Conn}}}; + {error, Reason} -> + %% Surface the error to the triggering caller without crashing + %% the manager and without incrementing consec_connect_failures: + %% a synchronous gun:open error is usually a config/DNS issue, + %% not a load signal. Tripping the 30s cooldown here causes + %% unrelated callers sharing this Authority to see spurious + %% no_connection_available for the whole cooldown window. + ?event(warning, {gun_open_failed, {host, Host}, {port, Port}, + {reason, Reason}}), + {error, Reason, State} + end. + +issue_request(ConnPid, ReqArgs, CallerPid, + State = #state{conns = Cs, streams = Streams, + gun_module = GunMod}) -> + #{method := Method, path := Path, headers := Headers, body := Body} = ReqArgs, + HeaderList = case is_list(Headers) of + true -> Headers; + false -> maps:to_list(Headers) + end, + StreamRef = GunMod:request(ConnPid, Method, Path, HeaderList, Body), + Conn = maps:get(ConnPid, Cs), + Conn2 = Conn#conn{ + streams_in_flight = Conn#conn.streams_in_flight + 1, + last_used = erlang:monotonic_time(millisecond) + }, + State2 = State#state{ + conns = Cs#{ConnPid => Conn2}, + streams = Streams#{StreamRef => {CallerPid, ConnPid}} + }, + {StreamRef, State2}. + +%%==================================================================== +%% Internal — stream lifecycle +%%==================================================================== + +stream_done(StreamRef, ConnPid, State = #state{conns = Cs, streams = Streams}) -> + State2 = State#state{streams = maps:remove(StreamRef, Streams)}, + case maps:find(ConnPid, Cs) of + error -> + drain_queue(State2); + {ok, Conn} -> + N = max(0, Conn#conn.streams_in_flight - 1), + Conn2 = Conn#conn{streams_in_flight = N}, + drain_queue(State2#state{conns = Cs#{ConnPid => Conn2}}) + end. + +drop_conn(ConnPid, ErrorReason, State = #state{conns = Cs, streams = Streams}) -> + %% Notify callers whose streams were on this conn and drop those streams + %% in a single pass. filter's predicate-with-side-effect is idiomatic + %% when the predicate is the selector for whom to notify. + Streams2 = maps:filter( + fun(Ref, {CallerPid, CConn}) when CConn =:= ConnPid -> + CallerPid ! {gun_error, self(), Ref, ErrorReason}, + false; + (_, _) -> true + end, Streams), + drain_queue(State#state{conns = maps:remove(ConnPid, Cs), + streams = Streams2}). + +handle_conn_exit(ConnPid, _Reason, State = #state{conns = Cs}) -> + case maps:find(ConnPid, Cs) of + {ok, #conn{protocol = connecting}} -> + handle_connect_timeout(ConnPid, State); + _ -> + drop_conn(ConnPid, {error, shutdown}, State) + end. + +handle_connect_timeout(ConnPid, State = #state{conns = Cs, + consec_connect_failures = NF}) -> + case maps:find(ConnPid, Cs) of + {ok, Conn} -> + cancel_connect_timer(Conn), + ok; + error -> + ok + end, + Cs2 = maps:remove(ConnPid, Cs), + NF2 = NF + 1, + State2 = State#state{conns = Cs2, consec_connect_failures = NF2}, + case NF2 >= ?MAX_CONNECT_FAILURES of + true -> + Until = erlang:monotonic_time(millisecond) + ?COOLDOWN_MS, + fail_queued_requests(State2#state{cooldown_until = Until}); + false -> + drain_queue(State2) + end. + +fail_queued_requests(State = #state{queue = Q}) -> + lists:foreach( + fun({_ReqArgs, _CallerPid, From}) -> + gen_server:reply(From, {error, no_connection_available}) + end, + Q), + State#state{queue = []}. + +cancel_caller_streams(CallerPid, State = #state{streams = Streams, + conns = Cs, + caller_refs = CRefs, + queue = Q}) -> + %% Streams owned by the dead caller. + OwnStreams = maps:fold( + fun(Ref, {CPid, ConnPid}, Acc) when CPid =:= CallerPid -> + [{Ref, ConnPid} | Acc]; + (_, _, Acc) -> Acc + end, [], Streams), + %% Forget the stream locally so subsequent gun_* messages are dropped + %% instead of forwarded. Do NOT send gun:cancel (RST_STREAM): under h2, + %% late HEADERS/DATA already in flight race with the RST and gun emits + %% stream_closed errors that can corrupt unrelated streams on the + %% shared connection. Letting the stream drain naturally is safe since + %% nothing is listening for its messages anymore. + AffectedRefs = [R || {R, _} <- OwnStreams], + Streams2 = maps:without(AffectedRefs, Streams), + %% Decrement per-conn streams_in_flight for each RST'd stream. + PerConnDec = lists:foldl( + fun({_, ConnPid}, Acc) -> + maps:update_with(ConnPid, fun(N) -> N + 1 end, 1, Acc) + end, #{}, OwnStreams), + Cs2 = maps:fold( + fun(ConnPid, Dec, AccCs) -> + case maps:find(ConnPid, AccCs) of + {ok, Conn} -> + N = max(0, Conn#conn.streams_in_flight - Dec), + AccCs#{ConnPid => Conn#conn{streams_in_flight = N}}; + error -> AccCs + end + end, Cs, PerConnDec), + %% Drop queued (not yet dispatched) requests from this caller. + Q2 = [E || E = {_, CPid, _} <- Q, CPid =/= CallerPid], + case maps:take(CallerPid, CRefs) of + {MonRef, CRefs2} -> + erlang:demonitor(MonRef, [flush]), + drain_queue(State#state{streams = Streams2, conns = Cs2, + caller_refs = CRefs2, queue = Q2}); + error -> + drain_queue(State#state{streams = Streams2, conns = Cs2, + queue = Q2}) + end. + +%%==================================================================== +%% Internal — helpers +%%==================================================================== + +ensure_caller_monitored(CallerPid, State = #state{caller_refs = CRefs}) -> + case maps:is_key(CallerPid, CRefs) of + true -> + State; + false -> + MonRef = erlang:monitor(process, CallerPid), + State#state{caller_refs = CRefs#{CallerPid => MonRef}} + end. + +cancel_connect_timer(#conn{connect_timer = undefined}) -> ok; +cancel_connect_timer(#conn{connect_timer = TRef}) -> + erlang:cancel_timer(TRef), + ok. + +is_idle(#state{conns = Cs, queue = Q, streams = Streams}) -> + Q =:= [] andalso + maps:size(Streams) =:= 0 andalso + maps:fold( + fun(_, #conn{streams_in_flight = N}, A) -> A + N end, 0, Cs) =:= 0. + +%% @doc Normalize TLS opts for stable scope hashing. +%% +%% Fields with known semantics are extracted and sorted independently: +%% verify, cacerts/cacertfile, cert/certfile, key/keyfile, +%% server_name_indication, versions, alpn_advertised_protocols, +%% customize_hostname_check. +%% ciphers list order is preserved (server-preference matters). +%% verify_fun closures are replaced by their erlang:fun_info uniq so +%% functionally equivalent funs from the same module/function hash equally +%% even if the closure captures different variables. +%% Unknown opts are passed through via lists:sort on {Key, Value} pairs. +normalize_tls_opts(Opts) when is_list(Opts) -> + normalize_tls_opts(maps:from_list(Opts)); +normalize_tls_opts(Opts) when is_map(Opts) -> + Known = [verify, cacerts, cacertfile, cert, certfile, key, keyfile, + server_name_indication, versions, alpn_advertised_protocols, + customize_hostname_check, ciphers], + KnownPairs = lists:filtermap( + fun(K) -> + case maps:find(K, Opts) of + {ok, V} -> {true, {K, normalize_tls_value(K, V)}}; + error -> false + end + end, + Known), + UnknownPairs = lists:sort( + [{K, normalize_tls_value(K, V)} || {K, V} <- maps:to_list(Opts), + not lists:member(K, Known)]), + lists:sort(KnownPairs) ++ UnknownPairs. + +normalize_tls_value(verify_fun, {Fun, _State}) -> + {verify_fun, erlang:fun_info(Fun, uniq)}; +normalize_tls_value(verify_fun, Fun) when is_function(Fun) -> + {verify_fun, erlang:fun_info(Fun, uniq)}; +normalize_tls_value(ciphers, Ciphers) -> + Ciphers; +normalize_tls_value(_Key, Value) -> + Value. + +resolve_protocol(tls, Opts) -> + hb_opts:get(protocol, http2, Opts); +resolve_protocol(tcp, Opts) -> + case hb_opts:get(protocol, http2, Opts) of + http3 -> http2; + Other -> Other + end. + +max_sockets(http2, Opts) -> + hb_opts:get(http_client_gun_max_sockets_h2, ?DEFAULT_MAX_SOCKETS_H2, Opts); +max_sockets(_, Opts) -> + hb_opts:get(http_client_gun_max_sockets_h1, ?DEFAULT_MAX_SOCKETS_H1, Opts). + +build_gun_opts(Transport, Protocol, Opts) -> + Keepalive = hb_opts:get(http_client_keepalive, ?DEFAULT_KEEPALIVE_TIMEOUT, Opts), + ConnTimeout = hb_opts:get( + http_client_connect_timeout, ?DEFAULT_CONNECT_TIMEOUT, Opts), + Base = #{ + http_opts => #{keepalive => Keepalive}, + http2_opts => #{keepalive => Keepalive, + notify_settings_changed => true}, + retry => 0, + connect_timeout => ConnTimeout + }, + ProtoOpts = case Protocol of + http3 -> Base#{protocols => [http3], transport => quic}; + http2 -> Base#{protocols => [http2]}; + _ -> Base#{protocols => [http]} + end, + TlsOpts = hb_opts:get(tls_opts, [], Opts), + case {Protocol, Transport} of + {http3, _} -> + %% Keep transport => quic from ProtoOpts; gun still honors + %% tls_opts for the underlying TLS/QUIC handshake. + ProtoOpts#{tls_opts => TlsOpts}; + {_, tls} -> + ProtoOpts#{transport => tls, tls_opts => TlsOpts}; + {_, tcp} -> + ProtoOpts + end. diff --git a/src/hb_gun_pool_mgr_tests.erl b/src/hb_gun_pool_mgr_tests.erl new file mode 100644 index 0000000000..fb4443f083 --- /dev/null +++ b/src/hb_gun_pool_mgr_tests.erl @@ -0,0 +1,330 @@ +%%% @doc Unit tests for hb_gun_pool_mgr using hb_gun_test_fake. +%%% +%%% Each test is hermetic: no real gun, no cowboy, no arweave.net. +%%% The fake-gun harness (hb_gun_test_fake) is injected via the gun_open_fn opt. +-module(hb_gun_pool_mgr_tests). +-include("include/hb.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +%%==================================================================== +%% Helpers +%%==================================================================== + +%% Start a standalone manager (not under hb_gun_pool supervisor) with a +%% gun_open_fn that returns the given FakePid on each open call. +%% Build a gun_open_fn that starts a new hb_gun_test_fake with the calling +%% process (the manager) as Owner, using Script as the behaviour script. +script_open_fn(Script) -> + script_open_fn(Script, http2). + +script_open_fn(Script, Proto) -> + fun(_Host, _Port, _Opts) -> + hb_gun_test_fake:open(self(), Script, Proto) + end. + +start_mgr(Script) -> + start_mgr(Script, #{}). + +start_mgr(Script, ExtraOpts) -> + start_mgr(Script, http2, ExtraOpts). + +start_mgr(Script, Proto, ExtraOpts) -> + Opts = maps:merge( + #{gun_open_fn => script_open_fn(Script, Proto), + gun_module => hb_gun_test_fake, + http_client_connect_timeout => 500, + gun_pool_idle_poll_ms => 200}, + ExtraOpts), + {ok, Pid} = gen_server:start_link( + hb_gun_pool_mgr, + {<<"localhost:9999">>, test_scope, "localhost", 9999, tcp, Opts}, + []), + Pid. + +%% Submit a request synchronously and collect the demuxed response. +do_request(MgrPid) -> + do_request(MgrPid, 5000). + +do_request(MgrPid, Timeout) -> + ReqArgs = #{method => <<"GET">>, path => <<"/test">>, + headers => #{}, body => <<>>}, + case gen_server:call(MgrPid, {request, ReqArgs, self()}, Timeout) of + {error, _} = Err -> Err; + {ok, StreamRef} -> collect(MgrPid, StreamRef, Timeout) + end. + +collect(MgrPid, StreamRef, Timeout) -> + receive + {gun_response, MgrPid, StreamRef, fin, S, H} -> + {ok, S, H, <<>>}; + {gun_response, MgrPid, StreamRef, nofin, S, H} -> + collect_body(MgrPid, StreamRef, Timeout, S, H, <<>>); + {gun_data, MgrPid, StreamRef, fin, Data} -> + {ok, undefined, undefined, Data}; + {gun_error, MgrPid, StreamRef, Reason} -> + {error, Reason} + after Timeout -> + {error, timeout} + end. + +collect_body(MgrPid, StreamRef, Timeout, Status, Headers, Acc) -> + receive + {gun_data, MgrPid, StreamRef, fin, Data} -> + {ok, Status, Headers, iolist_to_binary([Acc, Data])}; + {gun_data, MgrPid, StreamRef, nofin, Data} -> + collect_body(MgrPid, StreamRef, Timeout, Status, Headers, + iolist_to_binary([Acc, Data])); + {gun_trailers, MgrPid, StreamRef, _} -> + {ok, Status, Headers, Acc}; + {gun_error, MgrPid, StreamRef, Reason} -> + {error, Reason} + after Timeout -> + {error, timeout} + end. + +%%==================================================================== +%% Scenario 1 — happy path h2: single request returns 200 +%%==================================================================== + +happy_path_h2_test_() -> + {timeout, 10, fun() -> + Script = [{reply, 200, [], <<"hello">>}], + MgrPid = start_mgr(Script), + MgrRef = erlang:monitor(process, MgrPid), + ?assertMatch({ok, 200, _, _}, do_request(MgrPid)), + gen_server:stop(MgrPid), + receive {'DOWN', MgrRef, process, MgrPid, _} -> ok after 2000 -> ok end + end}. + +%%==================================================================== +%% Scenario 2 — concurrent streams h2: 10 concurrent requests +%%==================================================================== + +concurrent_streams_h2_test_() -> + {timeout, 15, fun() -> + Script = lists:duplicate(10, {reply, 200, [], <<"ok">>}), + MgrPid = start_mgr(Script, http2, #{http_client_gun_max_sockets_h2 => 1}), + Parent = self(), + [spawn(fun() -> + R = do_request(MgrPid), + Parent ! {result, R} + end) || _ <- lists:seq(1, 10)], + Results = [receive {result, R} -> R after 10000 -> timeout end + || _ <- lists:seq(1, 10)], + ?assertEqual(0, length([x || timeout <- Results])), + ?assert(lists:all(fun({ok, 200, _, _}) -> true; (_) -> false end, Results)), + gen_server:stop(MgrPid) + end}. + +%%==================================================================== +%% Scenario 3 — caller DOWN cancels in-flight stream +%%==================================================================== + +caller_down_cancels_streams_test_() -> + {timeout, 10, fun() -> + %% connect_timeout: gun_up never fires so the request stays queued. + %% Large idle_poll_ms prevents idle eviction from firing during test. + Script = [connect_timeout], + Opts = #{http_client_connect_timeout => 5000, + http_client_gun_max_sockets_h2 => 1, + gun_pool_idle_poll_ms => 60000}, + MgrPid = start_mgr(Script, http2, Opts), + ReqArgs = #{method => <<"GET">>, path => <<"/slow">>, + headers => #{}, body => <<>>}, + CallerPid = spawn(fun() -> + gen_server:call(MgrPid, {request, ReqArgs, self()}, 8000) + end), + MgrMonRef = erlang:monitor(process, MgrPid), + %% Give the request time to be enqueued. + receive after 50 -> ok end, + exit(CallerPid, kill), + receive {'DOWN', MgrMonRef, process, MgrPid, _} -> + ?assert(false, "manager crashed after caller exit") + after 500 -> ok + end, + ?assert(is_process_alive(MgrPid)), + gen_server:stop(MgrPid) + end}. + +%%==================================================================== +%% Scenario 4 — gun_down mid-stream: in-flight callers get gun_error +%%==================================================================== + +gun_down_mid_stream_test_() -> + {timeout, 10, fun() -> + Script = [conn_down], + MgrPid = start_mgr(Script), + Parent = self(), + spawn(fun() -> + R = do_request(MgrPid, 3000), + Parent ! {caller_result, R} + end), + Result = receive + {caller_result, R} -> R + after 5000 -> timeout + end, + ?assertMatch({error, _}, Result), + gen_server:stop(MgrPid) + end}. + +%%==================================================================== +%% Scenario 5 — connect_timeout trips cooldown +%%==================================================================== + +connect_timeout_trips_cooldown_test_() -> + {timeout, 10, fun() -> + %% connect_timeout in script: fake sends no gun_up. + Script = [connect_timeout], + Opts = #{http_client_connect_timeout => 50, + http_client_gun_max_sockets_h2 => 1}, + MgrPid = start_mgr(Script, http2, Opts), + Req = #{method => <<"GET">>, path => <<"/">>, headers => #{}, body => <<>>}, + Results = [catch gen_server:call(MgrPid, {request, Req, self()}, 2000) + || _ <- lists:seq(1, 4)], + ?assert(lists:any(fun({error, no_connection_available}) -> true; + (_) -> false end, + Results)), + gen_server:stop(MgrPid) + end}. + +%%==================================================================== +%% Scenario 6 — cooldown expiry: manager accepts new requests after expiry +%%==================================================================== + +cooldown_expiry_test_() -> + {timeout, 10, fun() -> + Script = [connect_timeout], + Opts = #{http_client_connect_timeout => 50, + http_client_gun_max_sockets_h2 => 1}, + MgrPid = start_mgr(Script, http2, Opts), + Req = #{method => <<"GET">>, path => <<"/">>, headers => #{}, body => <<>>}, + %% Trip cooldown. + [catch gen_server:call(MgrPid, {request, Req, self()}, 2000) + || _ <- lists:seq(1, 4)], + %% Manager must still be alive after cooldown trips. + ?assert(is_process_alive(MgrPid)), + gen_server:stop(MgrPid) + end}. + +%%==================================================================== +%% Scenario 7 — idle eviction: 2 idle ticks → manager self-terminates +%%==================================================================== + +idle_eviction_test_() -> + {timeout, 5, fun() -> + MgrPid = start_mgr([], http2, #{gun_pool_idle_poll_ms => 50}), + MgrRef = erlang:monitor(process, MgrPid), + receive + {'DOWN', MgrRef, process, MgrPid, normal} -> ok + after 2000 -> + ?assert(false, "manager did not self-terminate via idle eviction") + end + end}. + +%%==================================================================== +%% Scenario 8 — trap_exit on gun crash: manager survives abnormal exit +%%==================================================================== + +trap_exit_on_gun_crash_test_() -> + {timeout, 10, fun() -> + %% Large idle_poll_ms so idle eviction doesn't interfere. + Script = [{reply, 200, [], <<"ok">>}], + MgrPid = start_mgr(Script, http2, #{gun_pool_idle_poll_ms => 60000}), + MgrRef = erlang:monitor(process, MgrPid), + %% Issue a request so a connection is opened. + _ = do_request(MgrPid), + %% Find the connected gun pids via pool_info — then crash one directly. + %% Since the manager traps exits, it should survive. + %% We simulate by sending an EXIT signal from a linked process. + Crasher = spawn(fun() -> exit(simulated_crash) end), + MgrPid ! {'EXIT', Crasher, simulated_crash}, + receive + {'DOWN', MgrRef, process, MgrPid, _Reason} -> + ?assert(false, "manager crashed when gun conn exited abnormally") + after 500 -> ok + end, + ?assert(is_process_alive(MgrPid)), + gen_server:stop(MgrPid) + end}. + +%%==================================================================== +%% Restored behaviour: FIFO dispatch order +%%==================================================================== + +caller_fifo_order_test_() -> + {timeout, 10, fun() -> + Parent = self(), + %% delay_up gives callers time to queue before the conn becomes ready. + %% 3 reply entries satisfy 3 queued callers in order. + Script = [{delay_up, 150}] ++ + lists:duplicate(3, {reply, 200, [], <<"ok">>}), + Opts = #{http_client_connect_timeout => 5000, + http_client_gun_max_sockets_h2 => 1, + gun_pool_idle_poll_ms => 60000}, + MgrPid = start_mgr(Script, http2, Opts), + Req = #{method => <<"GET">>, path => <<"/">>, headers => #{}, body => <<>>}, + [spawn(fun() -> + R = gen_server:call(MgrPid, {request, Req, self()}, 8000), + Parent ! {queued_reply, I, R} + end) || I <- [1, 2, 3]], + %% All 3 requests queue while conn is still connecting (delay_up fires + %% after 150ms, by which time all 3 are enqueued). + Order = [receive {queued_reply, I, _} -> I after 4000 -> timeout end + || _ <- [1, 2, 3]], + ?assertEqual([1, 2, 3], Order), + gen_server:stop(MgrPid) + end}. + +%%==================================================================== +%% Restored behaviour: queued caller exit removes queue entry +%%==================================================================== + +queued_cancellation_on_caller_exit_test_() -> + {timeout, 10, fun() -> + Script = [connect_timeout], + Opts = #{http_client_connect_timeout => 5000, + http_client_gun_max_sockets_h2 => 1, + gun_pool_idle_poll_ms => 60000}, + MgrPid = start_mgr(Script, http2, Opts), + Req = #{method => <<"GET">>, path => <<"/">>, headers => #{}, body => <<>>}, + CallerPid = spawn(fun() -> + gen_server:call(MgrPid, {request, Req, self()}, 8000) + end), + receive after 50 -> ok end, + exit(CallerPid, kill), + receive after 100 -> ok end, + #{queued := Q} = gen_server:call(MgrPid, pool_info), + ?assertEqual(0, Q), + gen_server:stop(MgrPid) + end}. + +%%==================================================================== +%% Restored behaviour: flush_stream drains mailbox after timeout +%%==================================================================== + +request_timeout_drains_mailbox_test_() -> + {timeout, 10, fun() -> + %% Use a chunked reply so the response arrives as multiple messages. + %% Simulate "caller timed out": wait for the first message to land, + %% then call flush_stream and verify nothing remains. + Script = [{reply_chunked, 200, [], [<<"chunk1">>, <<"chunk2">>]}], + MgrPid = start_mgr(Script, http2, #{gun_pool_idle_poll_ms => 60000}), + Req = #{method => <<"GET">>, path => <<"/">>, headers => #{}, body => <<>>}, + {ok, StreamRef} = gen_server:call(MgrPid, {request, Req, self()}, 5000), + %% Wait for at least one message to be in the mailbox (precondition). + receive + {gun_response, _, StreamRef, _, _, _} -> ok + after 2000 -> + ?assert(false, "no gun_response arrived within 2s") + end, + %% Now flush remaining messages for this stream. + hb_gun_pool:flush_stream(StreamRef), + receive + {gun_data, _, StreamRef, _, _} -> + ?assert(false, "stale gun_data after flush"); + {gun_error, _, StreamRef, _} -> + ?assert(false, "stale gun_error after flush") + after 200 -> ok + end, + gen_server:stop(MgrPid) + end}. diff --git a/src/hb_gun_test_fake.erl b/src/hb_gun_test_fake.erl new file mode 100644 index 0000000000..15bcd450fe --- /dev/null +++ b/src/hb_gun_test_fake.erl @@ -0,0 +1,125 @@ +%%% @doc Scriptable fake-gun process for pool unit tests. +%%% +%%% Mimics gun's message protocol (gun_up, gun_down, gun_response, gun_data, +%%% gun_error, gun_notify) without real sockets. Each fake connection is a +%%% gen_server whose behaviour is controlled by a script supplied at start. +%%% +%%% Script is a list of actions executed in order when requests arrive: +%%% {reply, Status, Headers, Body} — respond fin immediately +%%% {reply_chunked, Status, Headers, [Chunk,...]} — nofin + fin chunks +%%% {error, Reason} — stream-level error +%%% conn_down — simulate gun_down +%%% {delay_up, Ms} — delay gun_up by Ms milliseconds +%%% connect_timeout — never send gun_up (simulates timeout) +%%% +%%% Usage: +%%% {ok, FakePid} = hb_gun_test_fake:open(Owner, Script) +%%% %% FakePid sends {gun_up, FakePid, http} to Owner automatically +%%% %% (unless script starts with delay_up/connect_timeout) +%%% +-module(hb_gun_test_fake). +-behaviour(gen_server). + +-include("include/hb.hrl"). + +-export([open/2, open/3, close/1, request/5, cancel/2, cancelled_refs/1]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). + +-record(state, { + owner :: pid(), + protocol = http :: http | http2, + script = [] :: list(), + streams = #{} :: #{reference() => pid()}, + cancelled = [] :: [reference()] +}). + +%% @doc Open a fake gun connection. Sends {gun_up, Pid, Proto} to Owner +%% unless the first script entry is connect_timeout or {delay_up, Ms}. +open(Owner, Script) -> + open(Owner, Script, http). + +open(Owner, Script, Proto) -> + {ok, Pid} = gen_server:start_link(?MODULE, {Owner, Script, Proto}, []), + {ok, Pid}. + +close(Pid) -> + gen_server:stop(Pid, normal, 1000). + +%% @doc Mimic gun:request/5. Returns a StreamRef. +request(Pid, _Method, _Path, _Headers, _Body) -> + Ref = make_ref(), + gen_server:cast(Pid, {request, Ref, self()}), + Ref. + +%% @doc Mimic gun:cancel/2. Records the ref; owner can query via cancelled_refs/1. +cancel(Pid, Ref) -> + gen_server:cast(Pid, {cancel, Ref}). + +%% @doc Return list of all StreamRefs that were cancelled. +cancelled_refs(Pid) -> + gen_server:call(Pid, cancelled_refs). + +%%==================================================================== +init({Owner, Script, Proto}) -> + State = #state{owner = Owner, script = Script, protocol = Proto}, + case Script of + [connect_timeout | _] -> + {ok, State#state{script = tl(Script)}}; + [{delay_up, Ms} | Rest] -> + erlang:send_after(Ms, self(), send_gun_up), + {ok, State#state{script = Rest}}; + _ -> + Owner ! {gun_up, self(), Proto}, + {ok, State} + end. + +handle_call(cancelled_refs, _From, State = #state{cancelled = C}) -> + {reply, C, State}; +handle_call(Req, _From, State) -> + ?event(warning, {unhandled_call, {module, ?MODULE}, {request, Req}}), + {reply, ok, State}. + +handle_cast({request, Ref, CallerPid}, + State = #state{owner = Owner, script = Script, protocol = Proto}) -> + case Script of + [] -> + CallerPid ! {gun_error, self(), Ref, no_script}, + {noreply, State}; + [conn_down | Rest] -> + Owner ! {gun_down, self(), Proto, conn_down, []}, + {noreply, State#state{script = Rest}}; + [{reply, Status, Headers, Body} | Rest] -> + CallerPid ! {gun_response, self(), Ref, fin, Status, Headers}, + _ = Body, + {noreply, State#state{script = Rest}}; + [{reply_chunked, Status, Headers, Chunks} | Rest] -> + CallerPid ! {gun_response, self(), Ref, nofin, Status, Headers}, + send_chunks(CallerPid, self(), Ref, Chunks), + {noreply, State#state{script = Rest}}; + [{error, Reason} | Rest] -> + CallerPid ! {gun_error, self(), Ref, Reason}, + {noreply, State#state{script = Rest}} + end; + +handle_cast({cancel, Ref}, State = #state{cancelled = C}) -> + {noreply, State#state{cancelled = [Ref | C]}}; +handle_cast(Cast, State) -> + ?event(warning, {unhandled_cast, {module, ?MODULE}, {cast, Cast}}), + {noreply, State}. + +handle_info(send_gun_up, State = #state{owner = Owner, protocol = Proto}) -> + Owner ! {gun_up, self(), Proto}, + {noreply, State}; + +handle_info(Msg, State) -> + ?event(warning, {unhandled_info, {module, ?MODULE}, {message, Msg}}), + {noreply, State}. + +terminate(_Reason, _State) -> ok. + +send_chunks(_CallerPid, _GunPid, _Ref, []) -> ok; +send_chunks(CallerPid, GunPid, Ref, [Last]) -> + CallerPid ! {gun_data, GunPid, Ref, fin, Last}; +send_chunks(CallerPid, GunPid, Ref, [H | T]) -> + CallerPid ! {gun_data, GunPid, Ref, nofin, H}, + send_chunks(CallerPid, GunPid, Ref, T). diff --git a/src/hb_http_client.erl b/src/hb_http_client.erl index 17fea34093..a63956fdd7 100644 --- a/src/hb_http_client.erl +++ b/src/hb_http_client.erl @@ -59,8 +59,13 @@ request(Args, RemainingRetries, Opts) -> do_request(Args, Opts) -> case hb_opts:get(http_client, ?DEFAULT_HTTP_CLIENT, Opts) of - gun -> gun_req(Args, Opts); - httpc -> httpc_req(Args, Opts); + gun -> + UsePool = hb_opts:get(http_client_gun_use_pool, true, Opts), + case UsePool of + true -> gun_pool_req(Args, Opts); + false -> gun_req(Args, Opts) + end; + httpc -> httpc_req(Args, Opts); hackney -> hackney_req(Args, Opts) end. @@ -100,10 +105,10 @@ httpc_req(Args, Opts) -> ?event({httpc_req, Args}), case parse_peer(Peer, Opts) of {error, _} = Err -> Err; - {ok, {Host, Port}} -> - Scheme = case Port of - 443 -> "https"; - _ -> "http" + {ok, {Host, Port, Transport}} -> + Scheme = case Transport of + tls -> "https"; + tcp -> "http" end, ?event(debug_http_client, {httpc_req, {explicit, Args}}), URL = binary_to_list(iolist_to_binary([Scheme, "://", Host, ":", integer_to_binary(Port), Path])), @@ -175,10 +180,10 @@ hackney_req(Args, Opts) -> ?event({hackney_req, Args}), case parse_peer(Peer, Opts) of {error, _} = Err -> Err; - {ok, {Host, Port}} -> - Scheme = case Port of - 443 -> <<"https">>; - _ -> <<"http">> + {ok, {Host, Port, Transport}} -> + Scheme = case Transport of + tls -> <<"https">>; + tcp -> <<"http">> end, URL = < ), Response. +gun_pool_req(Args, Opts) -> + StartTime = os:system_time(native), + #{ path := Path, method := Method } = Args, + Headers = hb_maps:get(headers, Args, #{}, Opts), + Body = hb_maps:get(body, Args, <<>>, Opts), + case parse_peer(hb_maps:get(peer, Args, undefined), Opts) of + {error, _} = Err -> + Err; + {ok, {Host, Port, Transport}} -> + Authority = <<(hb_util:bin(Host))/binary, ":", (integer_to_binary(Port))/binary>>, + %% Scope: {Transport, Protocol, TlsOptsHash}. + %% ConnectTimeout and Keepalive are NOT in the scope — they configure + %% connections inside a pool, not the identity of the pool. Callers + %% with different tls_opts must NOT share connections. + DefaultProto = case hb_features:http3() of + true -> http3; false -> http2 + end, + TlsOptsHash = erlang:phash2( + hb_gun_pool_mgr:normalize_tls_opts( + hb_opts:get(tls_opts, [], Opts))), + Scope = {Transport, + hb_opts:get(protocol, DefaultProto, Opts), + TlsOptsHash}, + ConnInfo = #{host => hb_util:bin(Host), port => Port, + transport => Transport, opts => Opts}, + case hb_gun_pool:start_or_get_pool(Authority, Scope, ConnInfo) of + {error, pool_runtime_unavailable} -> + case hb_opts:get( + http_client_gun_allow_unpooled_fallback, + false, Opts) of + true -> + ?event(warning, + {gun_pool_runtime_unavailable_fallback, + {host, Host}, {port, Port}}), + gun_req(Args, Opts); + false -> + ?event(warning, + {gun_pool_runtime_unavailable, + {host, Host}, {port, Port}}), + {error, pool_runtime_unavailable} + end; + {error, _} = Err -> + Err; + {ok, MgrPid} -> + NormHeaders = normalize_gun_headers(Headers, Opts), + Response = do_gun_pool_request( + MgrPid, Args, NormHeaders, Body, Opts), + EndTime = os:system_time(native), + record_duration(#{ + <<"request-method">> => method_to_bin(Method), + <<"request-path">> => hb_util:bin(Path), + <<"status-class">> => get_status_class(Response), + <<"duration">> => EndTime - StartTime + }, + Opts + ), + record_response_status(Method, Response, Path), + Response + end + end. + +%% @doc Issue request to the juggler pool and await demuxed response. +%% The manager forwards gun messages with MgrPid as the pid, so passing +%% MgrPid to gun:await matches the {gun_*, MgrPid, StreamRef, …} shapes. +%% cancel_fn asks the manager to RST the stream and free its slot, then +%% drains stale messages from the caller mailbox. +do_gun_pool_request(MgrPid, Args, Headers, Body, Opts) -> + #{method := Method, path := Path} = Args, + case hb_gun_pool_mgr:request(MgrPid, Method, Path, Headers, Body, Opts) of + {error, _} = Err -> + Err; + {ok, MgrPid, StreamRef} -> + SendTimeout = hb_opts:get(http_client_send_timeout, 300_000, Opts), + Timer = inet:start_timer(SendTimeout), + ResponseArgs = #{ + pid => MgrPid, + stream_ref => StreamRef, + timer => Timer, + limit => hb_maps:get(limit, Args, infinity, Opts), + counter => 0, + acc => [], + method => Method, + path => Path, + start => os:system_time(microsecond), + is_peer_request => hb_maps:get(is_peer_request, Args, true, Opts), + cancel_fn => + fun(_, R) -> + hb_gun_pool_mgr:cancel_stream(MgrPid, R), + hb_gun_pool:flush_stream(R) + end + }, + Response = await_response(ResponseArgs, Opts), + inet:stop_timer(Timer), + Response + end. + +%% @doc Expand a header map into a proplist suitable for gun:request/5, +%% splitting any list-valued <<"cookie">> entry into one header per line +%% (gun does not collapse list values; pooled and non-pooled paths must +%% produce the same wire format). +normalize_gun_headers(HeaderMap, Opts) -> + HeadersWithoutCookie = + hb_maps:to_list( + hb_maps:without([<<"cookie">>], HeaderMap, Opts), + Opts + ), + CookieLines = + case hb_maps:get(<<"cookie">>, HeaderMap, [], Opts) of + BinCookie when is_binary(BinCookie) -> [BinCookie]; + Lines when is_list(Lines) -> Lines + end, + HeadersWithoutCookie + ++ [{<<"cookie">>, Line} || Line <- CookieLines]. + %% @doc Start the hackney connection pool with default settings. %% Overridden at runtime by setup_conn/1 once node config is available. init_hackney_pool() -> @@ -345,7 +464,7 @@ handle_info({gun_error, PID, Reason}, State) -> ?event(warning, {gun_connection_error, {pid, PID}, {reason, Reason}}), {noreply, State}; -handle_info({gun_down, PID, Protocol, Reason, _KilledStreams, _UnprocessedStreams}, State) -> +handle_info({gun_down, PID, Protocol, Reason, _KilledStreams}, State) -> ?event(warning, {gun_connection_down, {pid, PID}, {protocol, Protocol}, {reason, Reason}}), {noreply, State}; @@ -367,7 +486,7 @@ terminate(_Reason, _State) -> open_connection(#{ peer := Peer }, Opts) -> case parse_peer(Peer, Opts) of {error, _} = Err -> Err; - {ok, {Host, Port}} -> open_connection_gun(Host, Port, Peer, Opts) + {ok, {Host, Port, _Transport}} -> open_connection_gun(Host, Port, Peer, Opts) end. open_connection_gun(Host, Port, Peer, Opts) -> @@ -421,16 +540,17 @@ open_connection_gun(Host, Port, Peer, Opts) -> parse_peer(Peer, Opts) -> Parsed = uri_string:parse(Peer), case Parsed of - #{ host := Host, port := Port } -> - {ok, {hb_util:list(Host), Port}}; + #{ host := Host, port := Port } = URI -> + Scheme = maps:get(scheme, URI, <<>>), + Transport = case Scheme of <<"https">> -> tls; _ -> tcp end, + {ok, {hb_util:list(Host), Port, Transport}}; URI = #{ host := Host } -> - {ok, { - hb_util:list(Host), - case hb_maps:get(scheme, URI, undefined, Opts) of - <<"https">> -> 443; - _ -> hb_opts:get(port, 8734, Opts) - end - }}; + case hb_maps:get(scheme, URI, undefined, Opts) of + <<"https">> -> + {ok, {hb_util:list(Host), 443, tls}}; + _ -> + {ok, {hb_util:list(Host), hb_opts:get(port, 8734, Opts), tcp}} + end; _ -> {error, {bad_peer, Peer}} end. @@ -443,20 +563,7 @@ do_gun_request(PID, Args, Opts) -> Method = hb_maps:get(method, Args, undefined, Opts), Path = hb_maps:get(path, Args, undefined, Opts), HeaderMap = hb_maps:get(headers, Args, #{}, Opts), - % Normalize cookie header lines from the header map. We support both - % lists of cookie lines and a single cookie line. - HeadersWithoutCookie = - hb_maps:to_list( - hb_maps:without([<<"cookie">>], HeaderMap, Opts), - Opts - ), - CookieLines = - case hb_maps:get(<<"cookie">>, HeaderMap, [], Opts) of - BinCookieLine when is_binary(BinCookieLine) -> [BinCookieLine]; - CookieLinesList -> CookieLinesList - end, - CookieHeaders = [ {<<"cookie">>, CookieLine} || CookieLine <- CookieLines ], - Headers = HeadersWithoutCookie ++ CookieHeaders, + Headers = normalize_gun_headers(HeaderMap, Opts), Body = hb_maps:get(body, Args, <<>>, Opts), ?event( http_client, @@ -478,7 +585,8 @@ do_gun_request(PID, Args, Opts) -> counter => 0, acc => [], start => os:system_time(microsecond), - is_peer_request => hb_maps:get(is_peer_request, Args, true, Opts) + is_peer_request => hb_maps:get(is_peer_request, Args, true, Opts), + cancel_fn => fun(P, R) -> gun:cancel(P, R) end }, Response = await_response(hb_maps:merge(Args, ResponseArgs, Opts), Opts), record_response_status(Method, Response, Path, Opts), @@ -488,6 +596,7 @@ do_gun_request(PID, Args, Opts) -> await_response(Args, Opts) -> #{ pid := PID, stream_ref := Ref, timer := Timer, limit := Limit, counter := Counter, acc := Acc, method := Method, path := Path } = Args, + CancelFn = maps:get(cancel_fn, Args, fun(P, R) -> gun:cancel(P, R) end), case gun:await(PID, Ref, inet:timeout(Timer)) of {response, fin, Status, Headers} -> upload_metric(Args, Opts), @@ -528,12 +637,12 @@ await_response(Args, Opts) -> {error, timeout} = Response -> record_response_status(Method, Response, Path, Opts), ?event(http_outbound, {gun_cancel, {path, Path}}), - gun:cancel(PID, Ref), + CancelFn(PID, Ref), log(warning, gun_await_process_down, Args, timeout, Opts), Response; {error,{connection_error,{stream_closed, Message}}} = Response -> ?event(http_outbound, {gun_cancel, {path, Path}, {message, Message}}), - gun:cancel(PID, Ref), + CancelFn(PID, Ref), Response; {error, Reason} = Response when is_tuple(Reason) -> record_response_status(Method, Response, Path), @@ -546,7 +655,8 @@ await_response(Args, Opts) -> end. %% @doc Debug `http` state logging. -log(Type, Event, #{method := Method, peer := Peer, path := Path}, Reason, Opts) -> +log(Type, Event, #{method := Method, path := Path} = Args, Reason, Opts) -> + Peer = maps:get(peer, Args, undefined), ?event( Type, {gun_log, @@ -639,8 +749,13 @@ record_duration(Details, Opts) -> <<"status-class">>, <<"request-category">> ]), + DurationNative = maps:get(<<"duration">>, Details), + DurationSeconds = + erlang:convert_time_unit( + DurationNative, native, microsecond) + / 1_000_000, hb_prometheus:observe( - maps:get(<<"duration">>, Details), + DurationSeconds, http_client_duration_seconds, Labels ); @@ -739,6 +854,8 @@ get_status_class({ok, {{Status, _}, _, _, _, _}}) -> get_status_class(Status); get_status_class({ok, Status, _RespondeHeaders, _Body}) -> get_status_class(Status); +get_status_class({error, no_connection_available}) -> + <<"pool-exhausted">>; get_status_class({error, closed}) -> <<"closed">>; get_status_class({error, checkout_timeout}) -> diff --git a/src/hb_http_client_tests.erl b/src/hb_http_client_tests.erl index 51a52fe1b1..fa1bbee70e 100644 --- a/src/hb_http_client_tests.erl +++ b/src/hb_http_client_tests.erl @@ -3,6 +3,11 @@ -include("include/hb_http_client.hrl"). -include_lib("eunit/include/eunit.hrl"). +%%==================================================================== +%% Hackney tests (live arweave.net — consistent with other modules in +%% the suite; if arweave.net is unreachable these legitimately fail) +%%==================================================================== + hackney_basic_request_test_() -> {timeout, 30, fun() -> application:ensure_all_started(hb), @@ -56,6 +61,389 @@ hackney_post_test_() -> ?assertMatch({ok, _, _, _}, Result) end}. +%%==================================================================== +%% Gun pool tests — juggler model +%%==================================================================== + +%% Queue fills up when pool cannot connect (port 1); the +%% (MaxQueued+1)th request from the same caller gets rejected. +gun_pool_backpressure_no_workers_test_() -> + {timeout, 15, fun() -> + gun_pool_ensure_sup(), + {ok, MgrPid} = hb_gun_pool:start_or_get_pool( + <<"localhost:1">>, no_workers_scope, + #{host => <<"localhost">>, port => 1}), + MaxQueued = 8, + %% Use a long enough timeout so the gen_server:call doesn't time out + %% while the request is queued; the test is about queue-cap rejection, + %% not connect speed. Calls return {error, shutdown} when MgrPid is + %% stopped — we ignore their results here. + MgrRef = erlang:monitor(process, MgrPid), + [catch gen_server:call(MgrPid, {request, gun_pool_base_req(<<"/">>), self()}, + 5000) + || _ <- lists:seq(1, MaxQueued)], + ?assertEqual( + {error, no_connection_available}, + gen_server:call(MgrPid, + {request, gun_pool_base_req(<<"/">>), self()}, 5000)), + hb_gun_pool:stop(MgrPid), + receive {'DOWN', MgrRef, process, MgrPid, _} -> ok after 2000 -> ok end + end}. + +gun_pool_basic_request_test_() -> + {timeout, 30, fun() -> + gun_pool_with_mock_server( + [{"/ping", ping, {200, <<"pong">>}}], + fun(Port) -> + {ok, 200, _, _} = gun_pool_request(Port, <<"/ping">>) + end) + end}. + +gun_pool_synchronous_stop_releases_sockets_test_() -> + {timeout, 30, fun() -> + gun_pool_with_mock_server( + [{"/ping", ping, {200, <<"pong">>}}], + fun(Port) -> + MgrPid = gun_pool_start_for(Port), + Before = erlang:system_info(port_count), + _ = do_pool_request(MgrPid, <<"/ping">>, + #{http_client_connect_timeout => 10000}), + hb_gun_pool:stop(MgrPid), + hb_util:wait_until(fun() -> + erlang:system_info(port_count) =< Before + 2 + end, 2000), + ?assert(erlang:system_info(port_count) =< Before + 2) + end) + end}. + +gun_pool_caller_down_cancels_streams_test_() -> + {timeout, 30, fun() -> + gun_pool_with_mock_server( + [{"/ping", ping, {200, <<"pong">>}}], + fun(Port) -> + MgrPid = gun_pool_start_for(Port), + CallerPid = spawn(fun() -> + gen_server:call( + MgrPid, {request, gun_pool_base_req(<<"/ping">>), self()}, 5000) + end), + hb_util:wait_until(fun() -> + #{inflight := I, queued := Q} = + gen_server:call(MgrPid, pool_info), + I + Q >= 1 + end, 2000), + Ref = erlang:monitor(process, CallerPid), + exit(CallerPid, kill), + receive {'DOWN', Ref, process, CallerPid, _} -> ok + after 2000 -> error(caller_did_not_die) end, + hb_util:wait_until(fun() -> + #{inflight := I, queued := Q} = + gen_server:call(MgrPid, pool_info), + I =:= 0 andalso Q =:= 0 + end, 2000), + ?assert(is_map(gen_server:call(MgrPid, pool_info))) + end) + end}. + +gun_pool_registry_idempotent_test_() -> + {timeout, 15, fun() -> + gun_pool_with_mock_server( + [{"/ping", ping, {200, <<"ok">>}}], + fun(Port) -> + Authority = iolist_to_binary(["localhost:", integer_to_list(Port)]), + ConnInfo = #{host => <<"localhost">>, port => Port}, + {ok, Pid1} = hb_gun_pool:start_or_get_pool(Authority, reg_test, ConnInfo), + {ok, Pid2} = hb_gun_pool:start_or_get_pool(Authority, reg_test, ConnInfo), + ?assertEqual(Pid1, Pid2) + end) + end}. + +gun_pool_distinct_scopes_test_() -> + {timeout, 15, fun() -> + gun_pool_with_mock_server( + [{"/ping", ping, {200, <<"ok">>}}], + fun(Port) -> + Authority = iolist_to_binary(["localhost:", integer_to_list(Port)]), + ConnInfo = #{host => <<"localhost">>, port => Port}, + {ok, Pid1} = hb_gun_pool:start_or_get_pool(Authority, scope_a, ConnInfo), + {ok, Pid2} = hb_gun_pool:start_or_get_pool(Authority, scope_b, ConnInfo), + ?assertNotEqual(Pid1, Pid2), + ?assertMatch([_], ets:lookup(hb_gun_pool_registry, {Authority, scope_a})), + ?assertMatch([_], ets:lookup(hb_gun_pool_registry, {Authority, scope_b})) + end) + end}. + +%% Idle eviction: pool self-terminates after two consecutive idle polls. +gun_pool_idle_eviction_test_() -> + {timeout, 30, fun() -> + gun_pool_with_mock_server( + [{"/ping", ping, {200, <<"pong">>}}], + fun(Port) -> + Authority = iolist_to_binary(["localhost:", integer_to_list(Port)]), + ConnInfo = #{host => <<"localhost">>, port => Port, + transport => tcp, + opts => #{gun_pool_idle_poll_ms => 100}}, + {ok, MgrPid} = hb_gun_pool:start_or_get_pool( + Authority, idle_eviction_scope, ConnInfo), + MgrRef = erlang:monitor(process, MgrPid), + receive + {'DOWN', MgrRef, process, MgrPid, normal} -> ok + after 800 -> + ?assert(false, "manager did not self-terminate via idle eviction") + end, + ?assertEqual([], ets:lookup(hb_gun_pool_registry, + {Authority, idle_eviction_scope})) + end) + end}. + +%% Connection error surfaces as {error, _} and manager survives. +gun_pool_connection_error_surfaces_test_() -> + {timeout, 15, fun() -> + gun_pool_ensure_sup(), + ConnInfo = #{host => <<"localhost">>, port => 1, + opts => #{http_client_connect_timeout => 500}}, + {ok, MgrPid} = hb_gun_pool:start_or_get_pool( + <<"localhost:1">>, conn_err_scope, ConnInfo), + %% connect_timeout=500ms: after MAX_CONNECT_FAILURES rounds the + %% manager trips cooldown and fail_queued_requests replies to queued + %% callers with {error, no_connection_available}. Use a call timeout + %% longer than the connect-failure window (~2s for h2 with 2 sockets). + Result = do_pool_request(MgrPid, <<"/">>, + #{http_client_connect_timeout => 500, + http_client_send_timeout => 10000}), + ?assertMatch({error, _}, Result), + ?assert(is_process_alive(MgrPid)), + hb_gun_pool:stop_all() + end}. + +%% tls_opts divergence: two callers with different tls_opts must get +%% separate pool managers (different TlsOptsHash in scope). +gun_pool_tls_scope_isolation_test_() -> + {timeout, 10, fun() -> + gun_pool_ensure_sup(), + Authority = <<"localhost:8443">>, + ConnInfo1 = #{host => <<"localhost">>, port => 8443, transport => tls, + opts => #{tls_opts => [{verify, verify_peer}]}}, + ConnInfo2 = #{host => <<"localhost">>, port => 8443, transport => tls, + opts => #{tls_opts => [{verify, verify_none}]}}, + TlsHash1 = erlang:phash2(hb_gun_pool_mgr:normalize_tls_opts([{verify, verify_peer}])), + TlsHash2 = erlang:phash2(hb_gun_pool_mgr:normalize_tls_opts([{verify, verify_none}])), + Scope1 = {tls, http2, TlsHash1}, + Scope2 = {tls, http2, TlsHash2}, + {ok, Pid1} = hb_gun_pool:start_or_get_pool(Authority, Scope1, ConnInfo1), + {ok, Pid2} = hb_gun_pool:start_or_get_pool(Authority, Scope2, ConnInfo2), + ?assertNotEqual(Pid1, Pid2), + hb_gun_pool:stop_all() + end}. + +%% Flush stream: selective drain of demuxed messages. +gun_pool_flush_stream_test_() -> + {timeout, 10, fun() -> + gun_pool_ensure_sup(), + Ref = make_ref(), + Ref2 = make_ref(), + FakePid = self(), + self() ! {gun_response, FakePid, Ref, fin, 200, []}, + self() ! {gun_data, FakePid, Ref, fin, <<"chunk">>}, + self() ! {gun_error, FakePid, Ref, some_reason}, + self() ! {gun_response, FakePid, Ref2, fin, 200, []}, + hb_gun_pool:flush_stream(Ref), + receive + {gun_response, _, Ref, _, _, _} -> + ?assert(false, "stale gun_response for Ref after flush"); + {gun_data, _, Ref, _, _} -> + ?assert(false, "stale gun_data for Ref after flush"); + {gun_error, _, Ref, _} -> + ?assert(false, "stale gun_error for Ref after flush") + after 0 -> ok + end, + receive {gun_response, _, Ref2, _, _, _} -> ok + after 0 -> ?assert(false, "Ref2 message was incorrectly drained") + end, + hb_gun_pool:stop_all() + end}. + +%% End-to-end: scheme determines transport (tcp vs tls), not port. +gun_pool_scheme_transport_e2e_test_() -> + {timeout, 15, fun() -> + gun_pool_with_mock_server( + [{"/ping", ping, {200, <<"ok">>}}], + fun(Port) -> + Authority = iolist_to_binary( + ["localhost:", integer_to_list(Port)]), + Opts = #{http_client => gun, http_client_gun_use_pool => true, + http_retry => 0}, + Args = #{peer => iolist_to_binary( + ["http://localhost:", integer_to_list(Port)]), + path => <<"/ping">>, method => <<"GET">>, + headers => #{}, body => <<>>}, + ?assertMatch({ok, 200, _, _}, + hb_http_client:request(Args, Opts)), + [{_, MgrPid} | _] = + ets:match_object(hb_gun_pool_registry, {{Authority, '_'}, '_'}), + ?assertMatch(#{}, gen_server:call(MgrPid, pool_info)) + end) + end}. + +%% Backpressure via hb_http_client when pool is cooling down. +gun_pool_no_connection_available_fast_fail_test_() -> + {timeout, 15, fun() -> + gun_pool_ensure_sup(), + {ok, MgrPid} = hb_gun_pool:start_or_get_pool( + <<"localhost:1">>, fast_fail_scope, + #{host => <<"localhost">>, port => 1}), + MaxQueued = 8, + [catch gen_server:call(MgrPid, {request, gun_pool_base_req(<<"/">>), self()}, + 5000) + || _ <- lists:seq(1, MaxQueued)], + R = gen_server:call(MgrPid, + {request, gun_pool_base_req(<<"/">>), self()}, 5000), + ?assertEqual({error, no_connection_available}, R), + hb_gun_pool:stop_all() + end}. + +%% Concurrent requests: multiple callers to same pool all complete. +gun_pool_concurrent_requests_test_() -> + {timeout, 30, fun() -> + gun_pool_with_mock_server( + [{"/ping", ping, {200, <<"pong">>}}], + fun(Port) -> + MgrPid = gun_pool_start_for(Port), + Parent = self(), + N = 5, + [spawn(fun() -> + R = do_pool_request(MgrPid, <<"/ping">>, + #{http_client_connect_timeout => 10000}), + Parent ! {result, R} + end) || _ <- lists:seq(1, N)], + Results = [receive {result, R} -> R after 15000 -> timeout end + || _ <- lists:seq(1, N)], + ?assertEqual(0, length([x || timeout <- Results])), + ?assert(lists:all(fun({ok, 200, _, _}) -> true; (_) -> false end, + Results)) + end) + end}. + +%%==================================================================== +%% Helpers +%%==================================================================== + +gun_pool_ensure_sup() -> + application:ensure_all_started(gun), + application:ensure_all_started(cowboy), + case whereis(hb_gun_pool) of + undefined -> {ok, _} = hb_gun_pool:start_link(); + _ -> ok + end. + +gun_pool_with_mock_server(Endpoints, Fun) -> + gun_pool_ensure_sup(), + {ok, _URL, Handle = {_, ListenerID}} = hb_mock_server:start(Endpoints), + Port = ranch:get_port(ListenerID), + try Fun(Port) + after + hb_gun_pool:stop_all(), + hb_mock_server:stop(Handle) + end. + +gun_pool_start_for(Port) -> + Authority = iolist_to_binary(["localhost:", integer_to_list(Port)]), + {ok, MgrPid} = hb_gun_pool:start_or_get_pool( + Authority, default, #{host => <<"localhost">>, port => Port}), + MgrPid. + +%% Issue one request through the juggler and collect the demuxed response. +%% The gen_server:call timeout is the send timeout (how long we wait for the +%% manager to hand us a StreamRef), not the connect timeout. +do_pool_request(MgrPid, Path, Opts) -> + ReqArgs = gun_pool_base_req(Path), + CallTimeout = hb_opts:get(http_client_send_timeout, 10000, Opts), + case gen_server:call(MgrPid, {request, ReqArgs, self()}, CallTimeout) of + {error, _} = Err -> + Err; + {ok, StreamRef} -> + RecvTimeout = hb_opts:get(http_client_send_timeout, 10000, Opts), + collect_demuxed(MgrPid, StreamRef, RecvTimeout, <<>>, undefined, undefined) + end. + +collect_demuxed(MgrPid, StreamRef, Timeout, Acc, Status, Headers) -> + receive + {gun_response, MgrPid, StreamRef, fin, S, H} -> + {ok, S, H, <<>>}; + {gun_response, MgrPid, StreamRef, nofin, S, H} -> + collect_demuxed(MgrPid, StreamRef, Timeout, Acc, S, H); + {gun_data, MgrPid, StreamRef, fin, Data} -> + {ok, Status, Headers, iolist_to_binary([Acc, Data])}; + {gun_data, MgrPid, StreamRef, nofin, Data} -> + collect_demuxed(MgrPid, StreamRef, Timeout, + iolist_to_binary([Acc, Data]), Status, Headers); + {gun_trailers, MgrPid, StreamRef, _} -> + {ok, Status, Headers, Acc}; + {gun_error, MgrPid, StreamRef, Reason} -> + {error, Reason} + after Timeout -> + {error, timeout} + end. + +%% Helper: start pool and send a request end-to-end via hb_http_client. +gun_pool_request(Port, Path) -> + Opts = #{http_client => gun, + http_client_gun_use_pool => true, + http_retry => 0, + http_client_connect_timeout => 10000}, + URL = iolist_to_binary(["http://localhost:", integer_to_list(Port)]), + Args = #{peer => URL, path => Path, + method => <<"GET">>, headers => #{}, body => <<>>}, + hb_http_client:request(Args, Opts). + +%%==================================================================== +%% TLS scope normalization tests +%%==================================================================== + +%% Same opts in different order produce the same hash. +tls_scope_normalization_order_test_() -> + {timeout, 5, fun() -> + Opts1 = [{verify, verify_peer}, {cacertfile, "/etc/ssl/certs/ca.pem"}], + Opts2 = [{cacertfile, "/etc/ssl/certs/ca.pem"}, {verify, verify_peer}], + H1 = erlang:phash2(hb_gun_pool_mgr:normalize_tls_opts(Opts1)), + H2 = erlang:phash2(hb_gun_pool_mgr:normalize_tls_opts(Opts2)), + ?assertEqual(H1, H2) + end}. + +%% Different cacertfile produces a different hash. +tls_scope_normalization_cacertfile_test_() -> + {timeout, 5, fun() -> + Opts1 = [{cacertfile, "/etc/ssl/certs/ca1.pem"}], + Opts2 = [{cacertfile, "/etc/ssl/certs/ca2.pem"}], + H1 = erlang:phash2(hb_gun_pool_mgr:normalize_tls_opts(Opts1)), + H2 = erlang:phash2(hb_gun_pool_mgr:normalize_tls_opts(Opts2)), + ?assertNotEqual(H1, H2) + end}. + +%% Different SNI produces a different hash. +tls_scope_normalization_sni_test_() -> + {timeout, 5, fun() -> + Opts1 = [{server_name_indication, "host-a.example.com"}], + Opts2 = [{server_name_indication, "host-b.example.com"}], + H1 = erlang:phash2(hb_gun_pool_mgr:normalize_tls_opts(Opts1)), + H2 = erlang:phash2(hb_gun_pool_mgr:normalize_tls_opts(Opts2)), + ?assertNotEqual(H1, H2) + end}. + +%% Cipher order is preserved (server-preference matters). +tls_scope_normalization_cipher_order_test_() -> + {timeout, 5, fun() -> + C1 = "ECDHE-RSA-AES256-GCM-SHA384", + C2 = "ECDHE-RSA-AES128-GCM-SHA256", + Opts1 = [{ciphers, [C1, C2]}], + Opts2 = [{ciphers, [C2, C1]}], + H1 = erlang:phash2(hb_gun_pool_mgr:normalize_tls_opts(Opts1)), + H2 = erlang:phash2(hb_gun_pool_mgr:normalize_tls_opts(Opts2)), + ?assertNotEqual(H1, H2) + end}. + +gun_pool_base_req(Path) -> + #{path => Path, method => <<"GET">>, headers => #{}, body => <<>>}. + summarize({caught, C, R}) when is_tuple(R) -> {caught, C, element(1, R)}; summarize({caught, C, R}) -> diff --git a/src/hb_metrics_collector.erl b/src/hb_metrics_collector.erl index 2a9d9783fd..5e848ba122 100644 --- a/src/hb_metrics_collector.erl +++ b/src/hb_metrics_collector.erl @@ -8,6 +8,7 @@ ] ). -behaviour(prometheus_collector). +-include("include/hb.hrl"). -include("include/hb_http_client.hrl"). %%==================================================================== %% Collector API @@ -60,6 +61,29 @@ collect_mf(_Registry, Callback) -> ) ), + {GunUp, GunInflight, GunQueued} = gun_pool_stats(), + Callback( + create_gauge( + hb_gun_pool_workers_up, + "Gun pool workers currently in the up state across all pools", + GunUp + ) + ), + Callback( + create_gauge( + hb_gun_pool_inflight, + "Total in-flight gun requests across all pools", + GunInflight + ) + ), + Callback( + create_gauge( + hb_gun_pool_queued, + "Total queued requests across all gun pools", + GunQueued + ) + ), + ok. collect_metrics(system_load, SystemLoad) -> %% Return the gauge metric with no labels @@ -76,6 +100,12 @@ collect_metrics(hackney_pool_in_use, Value) -> collect_metrics(hackney_pool_free, Value) -> prometheus_model_helpers:gauge_metrics([{[], Value}]); collect_metrics(hackney_pool_queue, Value) -> + prometheus_model_helpers:gauge_metrics([{[], Value}]); +collect_metrics(hb_gun_pool_workers_up, Value) -> + prometheus_model_helpers:gauge_metrics([{[], Value}]); +collect_metrics(hb_gun_pool_inflight, Value) -> + prometheus_model_helpers:gauge_metrics([{[], Value}]); +collect_metrics(hb_gun_pool_queued, Value) -> prometheus_model_helpers:gauge_metrics([{[], Value}]). %%==================================================================== @@ -116,5 +146,37 @@ hackney_pool_stats() -> catch _:_ -> {0, 0, 0} end. +%% @doc Sum workers_up, inflight, and queued across all gun pools at scrape time. +gun_pool_stats() -> + %% Dead managers (stale ETS row during pool shutdown) or slow managers + %% (busy draining a queue) must contribute zero rather than crash the + %% scrape. Empirically observed classes: exit:{normal,_} on exited pid, + %% exit:{noproc,_} on never-registered name, exit:{timeout,_} on busy + %% mailbox, error:badarg on missing ETS table. + try ets:tab2list(hb_gun_pool_registry) of + Entries -> + lists:foldl( + fun({_Key, MgrPid}, {Up0, In0, Qd0}) when is_pid(MgrPid) -> + try gen_server:call(MgrPid, pool_info, 1000) of + #{workers_up := U, inflight := I, queued := Q} -> + {Up0 + U, In0 + I, Qd0 + Q} + catch + exit:{normal, _} -> {Up0, In0, Qd0}; + exit:{noproc, _} -> {Up0, In0, Qd0}; + exit:{timeout, _} -> + ?event(debug_http_client, + {pool_info_scrape_timeout, {mgr, MgrPid}}), + {Up0, In0, Qd0} + end; + (_PendingEntry, Acc) -> + %% {Key, {pending, Claimer}} appears transiently during + %% pool cold-start; it is not a manager yet. + Acc + end, + {0, 0, 0}, + Entries) + catch error:badarg -> {0, 0, 0} + end. + create_gauge(Name, Help, Data) -> - prometheus_model_helpers:create_mf(Name, Help, gauge, ?MODULE, Data). \ No newline at end of file + prometheus_model_helpers:create_mf(Name, Help, gauge, ?MODULE, Data). diff --git a/src/hb_opts.erl b/src/hb_opts.erl index 3069af75d2..43eedb0cd3 100644 --- a/src/hb_opts.erl +++ b/src/hb_opts.erl @@ -189,6 +189,28 @@ raw_default_message() -> %% What HTTP client should the node use? %% Options: gun, httpc, hackney <<"http-client">> => ?DEFAULT_HTTP_CLIENT, + %% Maximum gun connections per authority for h1. h1 is one stream per + %% connection, so this is the concurrency cap for h1 peers. Set close + %% to the prior unpooled-gun concurrency floor so external-network + %% test suites don't cascade-fail when one peer stream is reset. + <<"http-client-gun-max-sockets-h1">> => 64, + %% Maximum gun connections per authority for h2. h2 multiplexes up to + %% ~100 streams per connection, so a small cap is sufficient in + %% steady state — but under concurrent cancel/peer-reset churn a + %% single connection GOAWAY cascades to every stream sharing it, so + %% we keep the cap high enough that blast radius stays bounded on + %% high-parallelism test suites. + <<"http-client-gun-max-sockets-h2">> => 16, + %% Maximum queued requests per caller before backpressure kicks in. + <<"http-client-gun-max-queued-per-caller">> => 8, + %% Ceiling on gen_server:call to a pool manager. Bounds how long a + %% request blocks while queued behind a pending conn or full pool. + <<"http-client-gun-call-timeout">> => 10_000, + %% If the pool supervisor is not running (no hb_sup), whether to fall + %% back to an unpooled gun:open per request. Default is false: opening + %% connections outside the pool defeats the cap, so the fallback is + %% opt-in for standalone tools/tests that cannot start hb_sup. + <<"http-client-gun-allow-unpooled-fallback">> => false, %% Scheduling mode: Determines when the SU should inform the recipient %% that an assignment has been scheduled for a message. %% Options: aggressive(!), local_confirmation, remote_confirmation, diff --git a/src/hb_prometheus.erl b/src/hb_prometheus.erl index f4917da774..de7c8f82d4 100644 --- a/src/hb_prometheus.erl +++ b/src/hb_prometheus.erl @@ -75,9 +75,9 @@ measure_and_report(Fun, Metric, Labels) when is_function(Fun) -> observe(DurationNative, Metric, Labels) end. -observe(Duration, Metric) when is_integer(Duration) -> +observe(Duration, Metric) when is_number(Duration) -> observe(Duration, Metric, []). -observe(Duration, Metric, Labels) when is_integer(Duration) -> +observe(Duration, Metric, Labels) when is_number(Duration) -> case ensure_started() of ok -> try prometheus_histogram:observe(Metric, Labels, Duration) diff --git a/src/hb_store_arweave.erl b/src/hb_store_arweave.erl index b62d403172..d71f3d17ab 100644 --- a/src/hb_store_arweave.erl +++ b/src/hb_store_arweave.erl @@ -212,7 +212,7 @@ load_item(ExpectedID, StartOffset, Length, Opts) -> load_tx(ID, StartOffset, Length, Opts) -> hb_prometheus:measure_and_report( fun() -> - {ok, StructuredTXHeader} = hb_ao:resolve( + case hb_ao:resolve( #{ <<"device">> => <<"arweave@2.9">> }, #{ <<"path">> => <<"tx">>, @@ -220,39 +220,52 @@ load_tx(ID, StartOffset, Length, Opts) -> <<"exclude-data">> => true }, Opts - ), - TXHeader = - hb_message:convert( - StructuredTXHeader, - <<"tx@1.0">>, - <<"structured@1.0">>, - Opts - ), - case Length of - 0 -> - {ok, hb_message:convert( - TXHeader, - <<"structured@1.0">>, - <<"tx@1.0">>, - Opts)}; - _ -> - case read_chunks(StartOffset, Length, Opts) of - {ok, Data} -> - {ok, hb_message:convert( - TXHeader#tx{data = Data}, - <<"structured@1.0">>, - <<"tx@1.0">>, - Opts - )}; - {error, Reason} -> - {error, Reason} - end + ) of + {ok, StructuredTXHeader} -> + load_tx_with_header( + StructuredTXHeader, StartOffset, Length, Opts); + {error, _} = HeaderErr -> + %% Stale offset index (reorg, forgotten peer, etc.) — + %% do_read's outer case matches only {ok, _} / {error, _}; + %% normalize bare not_found to that shape so the caller's + %% "not_found" branch fires instead of a case_clause crash. + HeaderErr; + not_found -> + {error, not_found} end end, hb_store_arweave_chunk_fetch_duration_seconds, [load_tx] ). +load_tx_with_header(StructuredTXHeader, StartOffset, Length, Opts) -> + TXHeader = + hb_message:convert( + StructuredTXHeader, + <<"tx@1.0">>, + <<"structured@1.0">>, + Opts + ), + case Length of + 0 -> + {ok, hb_message:convert( + TXHeader, + <<"structured@1.0">>, + <<"tx@1.0">>, + Opts)}; + _ -> + case read_chunks(StartOffset, Length, Opts) of + {ok, Data} -> + {ok, hb_message:convert( + TXHeader#tx{data = Data}, + <<"structured@1.0">>, + <<"tx@1.0">>, + Opts)}; + {error, Reason} -> + {error, Reason} + end + end. + %% @doc Read the chunks from the given start offset and length using the %% `~arweave@2.9` device. read_chunks(StartOffset, Length, Opts) -> diff --git a/src/hb_store_arweave_offset.erl b/src/hb_store_arweave_offset.erl index 7b4d5a9144..1b5837d13f 100644 --- a/src/hb_store_arweave_offset.erl +++ b/src/hb_store_arweave_offset.erl @@ -47,7 +47,7 @@ path(ID) -> throw({cannot_encode_path, ID}). encode(Type, StartOffset, Length) when (Type == true orelse Type == false orelse is_binary(Type)) - andalso ?IN_BIT_RANGE(StartOffset, ?OFFSET_SZ*8) + andalso ?IN_BIT_RANGE(StartOffset, ?OFFSET_SZ) andalso is_integer(Length) andalso Length >= 0 -> << diff --git a/src/hb_sup.erl b/src/hb_sup.erl index 28b0176ebd..7e58f6c95f 100644 --- a/src/hb_sup.erl +++ b/src/hb_sup.erl @@ -32,7 +32,16 @@ init(Opts) -> type => worker, modules => [hb_http_client] }, - {ok, {SupFlags, [GunChild | StoreChildren]}}. + GunPoolSupChild = + #{ + id => hb_gun_pool, + start => {hb_gun_pool, start_link, []}, + restart => permanent, + shutdown => 15000, + type => supervisor, + modules => [hb_gun_pool] + }, + {ok, {SupFlags, [GunChild, GunPoolSupChild | StoreChildren]}}. %% @doc Generate a child spec for stores in the given Opts. store_children(Store) when not is_list(Store) -> diff --git a/src/include/hb_opts.hrl b/src/include/hb_opts.hrl index b150615136..a0eb30283b 100644 --- a/src/include/hb_opts.hrl +++ b/src/include/hb_opts.hrl @@ -1,2 +1,2 @@ --define(DEFAULT_HTTP_CLIENT, hackney). +-define(DEFAULT_HTTP_CLIENT, gun).