Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/core/http/hb_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -788,9 +788,8 @@ encode_reply(Status, TABMReq, Message, Opts) ->
%% 1. If the `content-type' field is present in the response message, we always
%% use `httpsig@1.0', as the device is expected to have already encoded the
%% message and the `body' field.
%% 2. The `accept-codec' field in the original request.
%% 3. The `accept' field in the original request.
%% 4. The default codec
%% 2. The `accept' field in the original request.
%% 3. The default codec
%% Options can be specified in mime-type format (`application/*') or in
%% AO device format (`device@1.0').
accept_to_codec(OriginalReq, Opts) ->
Expand All @@ -803,7 +802,7 @@ accept_to_codec(OriginalReq, Reply = #{ <<"content-type">> := Link }, Opts) when
Reply#{ <<"content-type">> => hb_cache:ensure_loaded(Link, Opts) },
Opts
);
accept_to_codec(_OriginalReq, #{ <<"content-type">> := CT }, _Opts) ->
accept_to_codec(_OriginalReq, #{ <<"content-type">> := _CT }, _Opts) ->
<<"httpsig@1.0">>;
accept_to_codec(OriginalReq, _, Opts) ->
Accept = hb_maps:get(<<"accept">>, OriginalReq, <<"*/*">>, Opts),
Expand Down
5 changes: 4 additions & 1 deletion src/preloaded/arweave/dev_arweave.erl
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,10 @@ to_message(Path = <<"/block/", _/binary>>, <<"GET">>, {ok, #{ <<"body">> := Body
hb_message:convert(
Body,
<<"structured@1.0">>,
<<"json@1.0">>,
#{
<<"device">> => <<"json@1.0">>,
<<"bundle">> => true
},
Opts
),
CacheRes =
Expand Down
24 changes: 10 additions & 14 deletions src/preloaded/codec/dev_json.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,16 @@ from(JSON, Req, Opts) ->
ConvOpts
),
?event(debug_json, {structured, Structured}, Opts),
case hb_maps:get(<<"accept-codec">>, Req, undefined, Opts) of
<<"structured@1.0">> -> {ok, Structured};
_ ->
% Re-encode the structured message back to TABM for the caller.
TABM =
hb_message:convert(
Structured,
tabm,
Req#{ <<"device">> => <<"structured@1.0">> },
ConvOpts
),
?event(debug_json, {tabm, TABM}, Opts),
{ok, TABM}
end.
% Re-encode the structured message back to TABM for the caller.
TABM =
hb_message:convert(
Structured,
tabm,
Req#{ <<"device">> => <<"structured@1.0">> },
ConvOpts
),
?event(debug_json, {tabm, TABM}, Opts),
{ok, TABM}.

%% @doc Route commitments through `httpsig@1.0'.
commit(Msg, Req, Opts) ->
Expand Down
43 changes: 31 additions & 12 deletions src/preloaded/query/dev_copycat_arweave.erl
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ process_block(BlockRes, Current, To, Opts) ->
TotalTXs = maps:get(total_txs, Results, 0),
BundleTXs = maps:get(bundle_count, Results, 0),
SkippedTXs = maps:get(skipped_count, Results, 0),
SkippedReasons = maps:get(skipped_reasons, Results, #{}),
?event(
copycat_short,
{arweave_block_indexed,
Expand All @@ -242,6 +243,7 @@ process_block(BlockRes, Current, To, Opts) ->
{total_txs, TotalTXs},
{bundle_txs, BundleTXs},
{skipped_txs, SkippedTXs},
{skipped_reasons, SkippedReasons},
{target, To}
}
)
Expand Down Expand Up @@ -302,7 +304,7 @@ parallel_map(Items, Fun, Opts) ->
hb_pmap:parallel_map(Items, Fun, MaxWorkers).

%% @doc Process a single transaction and return its contribution to the counters.
%% Returns a map with keys: items_count, bundle_count, skipped_count
%% Returns a map with keys: items_count, bundle_count, skipped_count.
process_tx({{padding, _PaddingRoot}, _EndOffset}, _BlockStartOffset, _Opts) ->
#{items_count => 0, bundle_count => 0, skipped_count => 0};
process_tx({{TX, _TXDataRoot}, EndOffset}, BlockStartOffset, Opts) ->
Expand Down Expand Up @@ -367,22 +369,20 @@ process_tx({{TX, _TXDataRoot}, EndOffset}, BlockStartOffset, Opts) ->
record_event_metrics(<<"item_indexed">>, ItemsCount, TotalTime),
#{items_count => ItemsCount, bundle_count => 1, skipped_count => 0};
{error, Reason} ->
?event(
copycat_short,
{arweave_bundle_skipped,
{tx_id, {explicit, TXID}},
{reason, Reason}
}
),
#{items_count => 0, bundle_count => 1, skipped_count => 1}
#{
items_count => 0,
bundle_count => 1,
skipped_count => 1,
skipped_reasons => #{Reason => 1}
}
end
end.

%% @doc Process transactions: spawn workers and manage the worker pool.
%% This function processes transactions in parallel using parallel_map.
%% When arweave_index_workers <= 1, processes sequentially (one worker at a time).
%% When arweave_index_workers > 1, processes in parallel with the specified concurrency limit.
%% Returns a map with keys: items_count, bundle_count, skipped_count.
%% Returns counters for indexed items, bundle TXs, skipped TXs and skip reasons.
process_txs(ValidTXs, BlockStartOffset, Opts) ->
Results = parallel_map(
ValidTXs,
Expand All @@ -394,13 +394,32 @@ process_txs(ValidTXs, BlockStartOffset, Opts) ->
#{
items_count => maps:get(items_count, Result, 0) + maps:get(items_count, Acc, 0),
bundle_count => maps:get(bundle_count, Result, 0) + maps:get(bundle_count, Acc, 0),
skipped_count => maps:get(skipped_count, Result, 0) + maps:get(skipped_count, Acc, 0)
skipped_count => maps:get(skipped_count, Result, 0) + maps:get(skipped_count, Acc, 0),
skipped_reasons =>
merge_skipped_reasons(
maps:get(skipped_reasons, Result, #{}),
maps:get(skipped_reasons, Acc, #{})
)
}
end,
#{items_count => 0, bundle_count => 0, skipped_count => 0},
#{
items_count => 0,
bundle_count => 0,
skipped_count => 0,
skipped_reasons => #{}
},
Results
).

merge_skipped_reasons(New, Acc) ->
maps:fold(
fun(Reason, Count, Merged) ->
Merged#{Reason => Count + maps:get(Reason, Merged, 0)}
end,
Acc,
New
).

%% @doc Check whether a TX header indicates bundle content.
is_bundle_tx(TX, _Opts) ->
ar_tx:type(TX) =/= binary.
Expand Down