8000 Pinning Agent refactor to `new_epoch` event by klercker · Pull Request #4499 · aeternity/aeternity · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Pinning Agent refactor to new_epoch event #4499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 3 additions & 21 deletions apps/aecore/src/aec_consensus_hc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ start(Config, _) ->
start_dependency(aec_parent_chain_cache, [StartHeight, RetryInterval,
fun target_parent_heights/1, %% prefetch the next parent block
CacheSize, Confirmations]),
start_dependency(aec_pinning_agent, [get_contract_pubkey(?ELECTION_CONTRACT), default_pinning_behavior()]),
ok.

start_btc(StakersEncoded, ParentConnMod) ->
Expand Down Expand Up @@ -205,6 +206,7 @@ stop() ->
aec_preset_keys:stop(),
aec_parent_connector:stop(),
aec_parent_chain_cache:stop(),
aec_pinning_agent:stop(),
ok.

%% Always false: this module reports that it provides no extra HTTP endpoints.
is_providing_extra_http_endpoints() -> false.
Expand Down Expand Up @@ -309,7 +311,6 @@ state_pre_transform_node(Type, Height, PrevNode, Trees) ->
cache_validators_for_epoch({TxEnv, Trees1}, Seed, Epoch + 2),
Trees2 = step_eoe(TxEnv, Trees1, Leader, Seed, 0, -1, CarryOverFlag),
{ok, NextEpochInfo} = aec_chain_hc:epoch_info({TxEnv, Trees2}),
start_default_pinning_process(TxEnv, Trees2, Height, NextEpochInfo),
{Trees2, Events ++ [{new_epoch, NextEpochInfo}]};
{error, _} ->
lager:debug("Entropy hash for height ~p is not in cache, attempting to resync", [Height]),
Expand All @@ -324,25 +325,6 @@ state_pre_transform_node(Type, Height, PrevNode, Trees) ->
step_micro(TxEnv, Trees, Leader)
end.

%% Start the default pinning agent for the upcoming epoch.
%%
%% Does nothing (returns ok) unless default_pinning_behavior() is true.
%% The epoch-info match and the leader lookup for the epoch's last height are
%% asserted and crash on failure. The parent-account check and the agent
%% spawn are best effort: any exception raised there is only logged at
%% debug level.
start_default_pinning_process(TxEnv, Trees, _Height, NextEpochInfo) ->
    Enabled = default_pinning_behavior(),
    if
        Enabled =:= true ->
            #{epoch := Epoch, last := Last, validators := _} = NextEpochInfo,
            {ok, LastLeader} = leader_for_height(Last, {TxEnv, Trees}),
            lager:debug("AGENT: Trying to start pinning agent... for: ~p in epoch ~p", [LastLeader, Epoch]),
            try
                %% The case stays inside the try so that an unexpected
                %% (non-boolean) answer is logged rather than propagated.
                case aec_parent_connector:has_parent_account(LastLeader) of
                    true ->
                        ElectionContract = get_contract_pubkey(?ELECTION_CONTRACT),
                        aec_pinning_agent:spawn_for_epoch(NextEpochInfo, ElectionContract, LastLeader);
                    false ->
                        lager:debug("AGENT: No parent chain account found for ~p", [LastLeader])
                end
            catch
                T:E -> lager:debug("AGENT throws: ~p:~p", [T,E])
            end;
        true ->
            ok
    end.

cache_child_epoch_info(Epoch, Height, StartTime) ->
%% if the leader is running on the same node, and the current process
%% is not the leader, then the epoch info should already be cached
Expand Down Expand Up @@ -970,7 +952,7 @@ validate_pin(TxEnv, Trees, CurEpochInfo) ->
end
end.

add_pin_reward(Trees, TxEnv, Leader, #{epoch := CurEpoch, last := Last} = _EpochInfo) ->
add_pin_reward(Trees, TxEnv, Leader, #{epoch := CurEpoch, last := Last}) ->
#{cur_pin_reward := Reward} = aec_chain_hc:pin_reward_info({TxEnv, Trees}),
Event = {pin, {pin_accepted, #{reward => Reward, recipient => Leader, epoch => CurEpoch, height => Last}}},
ATrees = aec_trees:accounts(Trees),
Expand Down
1 change: 1 addition & 0 deletions apps/aecore/src/aec_events.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
| oracle_query_tx_created
| oracle_response_tx_created
| pin
| new_epoch
| {tx_event, any()}.

-spec publish(event(), any()) -> ok.
Expand Down
180 changes: 106 additions & 74 deletions apps/aecore/src/aec_pinning_agent.erl
9E88
Original file line number Diff line number Diff line change
Expand Up @@ -8,115 +8,147 @@

-module(aec_pinning_agent).
-author("mans.af.klercker@happihacking.se").
-behaviour(gen_server).

%%%=============================================================================
%%% Export and Defs
%%%=============================================================================

%% External API
-export([
spawn_for_epoch/3,
start/3
]).
start_link/2,
stop/0
]).

%% Callbacks
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).

-define(SERVER, ?MODULE).

%% Loop state of the pinning agent gen_server.
-record(state, {
    %% Contract pubkey given to start_link/2; passed to post_pin_proof/4.
    contract,
    %% true while top_changed events should be acted upon for the current epoch.
    pinning_mode,
    %% Height compared against each top_changed height to decide when to
    %% post the pin proof (set to Last - 1 of the announced epoch).
    next_last,
    %% Value returned by post_pin_to_pc/2 for the current epoch.
    pc_pin,
    %% Result of maybe_post_pin_to_cc/4; false until the pin was noted on CC.
    cc_note,
    %% Leader returned by aec_consensus_hc:leader_for_height/1 for the
    %% epoch's last height.
    last_leader
}).
%% The type must denote the record; a bare `state` would only be the atom.
-type state() :: #state{}.

%%%=============================================================================
%%% API
%%%=============================================================================

%% Start the pinning agent as a locally registered gen_server.
%% Contract and PinningBehavior are handed to init/1 unchanged.
-spec start_link(term(), atom()) -> {ok, pid()} | {error, {already_started, pid()}} | ignore | {error, Reason::any()}.
start_link(Contract, PinningBehavior) ->
    ServerName = {local, ?SERVER},
    InitArgs = [Contract, PinningBehavior],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, []).

%% Spawn the per-epoch pinning agent unless one is already registered.
%%
%% Returns the pid of the newly spawned agent, or ok when an agent is
%% already registered or when spawning/registering failed (failures are
%% logged at debug level only; this is deliberately best effort).
spawn_for_epoch(EpochInfo, Contract, LastLeader) ->
    lager:debug("AGENT: Trying to spawn pinning agent...", []),
    try
        %% Look up the same name the agent is registered under below.
        %% Checking any other name would never find a running agent, so
        %% every call would spawn a process whose register/2 then fails.
        case whereis(?MODULE) of
            undefined ->
                Pid = spawn(aec_pinning_agent, start, [EpochInfo, Contract, LastLeader]),
                register(?MODULE, Pid),
                Pid;
            Pid when is_pid(Pid) ->
                lager:debug("AGENT: already started", []),
                ok
        end
    catch
        T:E -> lager:debug("AGENT: Broke!! ~p:~p", [T,E]), ok
    end.

%% Entry point of a spawned agent process: subscribe to chain events and
%% enter the wait loop with no parent-chain pin posted yet.
%% The height handed to the loop is First + Length - 2 of the given epoch.
start(EpochInfo, Contract, LastLeader) ->
    #{first := First, epoch := _, length := Length, validators := _} = EpochInfo,
    subscribe(),
    lager:debug("AGENT: started ~p", [self()]),
    TargetHeight = First + Length - 2,
    wait_for_top_changed(TargetHeight, none, false, Contract, LastLeader).
%% Stop the pinning agent server, if it is running.
%%
%% init/1 returns `ignore` when default pinning is switched off, in which
%% case no process is registered under ?SERVER and gen_server:stop/1 exits
%% with `noproc`. That is not an error for the caller, so it is mapped to ok.
%% Any other exit reason still propagates.
stop() ->
    try
        gen_server:stop(?SERVER)
    catch
        exit:noproc -> ok
    end.

%%%=============================================================================
%%% Internal functions
%%% Gen Server Callbacks
%%%=============================================================================

%%%=============================================================================
%%% FSM
%%%=============================================================================
%% gen_server init callback.
%% With PinningBehavior = true the agent subscribes to new_epoch and
%% top_changed events and starts idle (pinning_mode = false).
%% With PinningBehavior = false no server is started (`ignore`).
init([Contract, PinningBehavior]) ->
    case PinningBehavior of
        false ->
            lager:debug("default pinning off - no agent started", []),
            ignore;
        true ->
            lager:debug("started pinning agent"),
            aec_events:subscribe(new_epoch),
            aec_events:subscribe(top_changed),
            {ok, #state{contract = Contract, pinning_mode = false}}
    end.


%% No synchronous requests are supported; every call is answered with ok
%% and leaves the state untouched.
handle_call(_Request, _From, State) ->
    {reply, ok, State}.

%% Casts are not used by this server; they are dropped without changing state.
handle_cast(_Msg, State) ->
    {noreply, State}.

%% Event handling.
%%
%% new_epoch: look up the leader of the epoch's last height. If that leader
%% has a parent-chain account, post a pin to the parent chain right away and
%% switch pinning mode on; otherwise switch it off for this epoch.
%%
%% top_changed (only while pinning mode is on): try to note the pin on the
%% child chain if that has not happened yet. When the new top is the stored
%% height (Last - 1 of the epoch), post the pin proof if the pin was noted,
%% and leave pinning mode either way.
%%
%% Anything else is ignored.
handle_info({gproc_ps_event, new_epoch, #{info := #{last := Last, first := First, epoch := Epoch}}}, State) ->
    {ok, LastLeader} = aec_consensus_hc:leader_for_height(Last),
    NextState =
        case aec_parent_connector:has_parent_account(LastLeader) of
            true ->
                PCPinTx = post_pin_to_pc(LastLeader, First),
                State#state{pinning_mode = true,
                            next_last = Last-1,
                            pc_pin = PCPinTx,
                            cc_note = false,
                            last_leader = LastLeader};
            false ->
                lager:debug("no pin in epoch ~p, no parent account for ~p", [Epoch, LastLeader]),
                State#state{pinning_mode = false}
        end,
    {noreply, NextState};
handle_info({gproc_ps_event, top_changed, #{info := #{height := Height}}},
            #state{pinning_mode = true,
                   contract = Contract,
                   next_last = ProofHeight,
                   pc_pin = PCPinTx,
                   cc_note = AlreadyNoted,
                   last_leader = LastLeader} = State) ->
    NowNoted = maybe_post_pin_to_cc(PCPinTx, AlreadyNoted, LastLeader, Height),
    KeepPinning =
        if
            %% not on the deciding block yet, keep reacting to top_changed
            Height =/= ProofHeight ->
                true;
            %% deciding block reached and the pin is noted: post the proof, then stop
            NowNoted =:= true ->
                post_pin_proof(Contract, PCPinTx, LastLeader, ProofHeight),
                false;
            %% deciding block reached but the parent-chain pin tx is not finalized: bow out
            NowNoted =:= false ->
                false;
            true ->
                true
        end,
    {noreply, State#state{pinning_mode = KeepPinning, cc_note = NowNoted}};
handle_info(_Info, State) ->
    {noreply, State}.

%% Nothing to clean up on shutdown.
terminate(_Reason, _State) ->
    ok.

%% Hot code upgrade: the state is carried over unchanged.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Subscribe to `top_changed` events through aec_events.
%% NOTE(review): presumably this subscribes the calling process - confirm
%% against aec_events:subscribe/1.
subscribe() ->
aec_events:subscribe(top_changed).

%%%=============================================================================
%%% INTERNALS
%%%=============================================================================

%% Post a pin to the parent chain on behalf of LastLeader and return whatever
%% aec_parent_connector:pin_to_pc/3 returns. Height is used for logging only.
post_pin_to_pc(LastLeader, Height) ->
    Self = self(),
    lager:debug("AGENT: (~p) Time to send to PC ~p", [Self, Height]),
    FeeArg = 1000000 * min_gas_price(),
    Result = aec_parent_connector:pin_to_pc(LastLeader, 1, FeeArg),
    lager:debug("AGENT: (~p) pc-pinned ~p", [Self, Result]),
    lager:debug("Pinned to PC @~p: ~p", [Height, Result]),
    Result.

post_pin_pctx_to_cc(PinTx, LastLeader, Height) ->
lager:debug("AGENT: (~p) noting on CC @~p", [self(), Height]),
try
aec_parent_connector:pin_tx_to_cc(PinTx, LastLeader, 1, 1000000 * min_gas_price())
aec_parent_connector:pin_tx_to_cc(PinTx, LastLeader, 1, 1000000 * min_gas_price()),
lager:debug("noting on CC @~p", [Height])
catch
T:E -> lager:debug("CRASHHHH: ~p:~p", [T,E]), ok
T:E -> lager:debug("Pin to CC failed: ~p:~p", [T,E]), ok
end.

%% Log and submit the pin proof: call the contract identified by
%% ContractPubkey with PinTx through aec_parent_connector:pin_contract_call/5,
%% acting as LastLeader. Height is used for logging only.
post_pin_proof(ContractPubkey, PinTx, LastLeader, Height) ->
    lager:debug("AGENT: (~p) pin proof @~p ~p", [self(), Height, PinTx]),
    lager:debug("pin proof @~p ~p", [Height, PinTx]),
    FeeArg = 1000000 * min_gas_price(),
    aec_parent_connector:pin_contract_call(ContractPubkey, PinTx, LastLeader, 0, FeeArg).

wait_for_top_changed(Last, none, _, Contract, LastLeader) -> % no pin on PC yet, then we post one once the next CC block is done
receive
{gproc_ps_event, top_changed, #{info := #{ height := Height }}} ->
PCPinTx = post_pin_to_pc(LastLeader, Height),
wait_for_top_changed(Last, PCPinTx, false, Contract, LastLeader)
end;
wait_for_top_changed(Last, PCPinTx, false, Contract, LastLeader) -> %% if PC pin tx done, post to CC if it's on PC
receive
{gproc_ps_event, top_changed, #{info := Info}} ->
CCPosted = % always try to post to CC if we haven't
case aec_parent_connector:get_pin_by_tx_hash(PCPinTx) of
{ok, #{pc_height := -1}} -> % Not on PC yet, let's wait for next PC generation
false;
{ok,_} -> % it's on PC
post_pin_pctx_to_cc(PCPinTx, LastLeader, maps:get(height, Info)), true;
_ -> false
end,
case Info of
#{ height := Last } when CCPosted == true -> % if we posted to CC and we're on last, post proof and die
post_pin_proof(Contract, PCPinTx, LastLeader, Last);
#{ height := Last } when CCPosted == false ->
ok; % just exit
_ -> % otherwise, we just carry on...
wait_for_top_changed(Last, PCPinTx, CCPosted, Contract, LastLeader)
end
maybe_post_pin_to_cc(PCPinTx, false, LastLeader, Height) ->
case aec_parent_connector:get_pin_by_tx_hash(PCPinTx) of
{ok, #{pc_height := -1}} -> false;
{ok, _} -> post_pin_pctx_to_cc(PCPinTx, LastLeader, Height), true;
_ -> false
end;
wait_for_top_changed(Last, PCPinTx, true, Contract, LastLeader) ->
receive
{gproc_ps_event, top_changed, #{info := Info}} ->
case Info of
#{ height := Last } -> % we're on last, post proof and die
post_pin_proof(Contract, PCPinTx, LastLeader, Last);
_ -> % something else, we just carry on...
wait_for_top_changed(Last, PCPinTx, true, Contract, LastLeader)
end
end.
maybe_post_pin_to_cc(_, CCPosted, _, _) ->
CCPosted.


%%%=============================================================================
%%% Helpers, communication
Expand Down
23 changes: 11 additions & 12 deletions apps/aehttp/src/aehttpc_aeternity.erl
Original file line number Diff line number Diff line change
Expand Up @@ -204,17 +204,17 @@ post_pin_tx(SignedSpendTx, NodeSpec) ->
pin_contract_call(ContractPubkey, PinTx, Who, Amount, _Fee, SignModule) ->
Nonce = get_local_nonce(Who),
{ok, CallData} = aeb_fate_abi:create_calldata("pin", [{bytes, PinTx}]),
ABI = ?ABI_FATE_SOPHIA_1, % not really nice, what is the supported version of getting the latest ABI version
ABI = ?ABI_FATE_SOPHIA_1,
TxSpec =
#{ caller_id => aeser_id:create(account, Who)
, nonce => Nonce
, contract_id => aeser_id:create(contract, ContractPubkey)
, abi_version => ABI
, fee => 1000000 * min_gas_price()
, amount => Amount
, gas => 1000000
, gas_price => min_gas_price()
, call_data => CallData},
#{caller_id => aeser_id:create(account, Who)
, nonce => Nonce
Comment on lines +209 to +210
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indentation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix: Space after the curly, and comma aligns with curly...

, contract_id => aeser_id:create(contract, ContractPubkey)
, abi_version => ABI
, fee => 1000000 * min_gas_price()
, amount => Amount
, gas => 1000000
, gas_price => min_gas_price()
, call_data => CallData},
{ok, Tx} = aect_call_tx:new(TxSpec),
NetworkId = aec_governance:get_network_id(),
SignedCallTx = sign_tx(Tx, NetworkId, Who, SignModule),
Expand All @@ -231,8 +231,7 @@ get_pin_by_tx_hash(TxHashEnc, NodeSpec) ->
{ok, TxHash} ->
TxPath = <<"/v3/transactions/", TxHash/binary>>,
case get_request(TxPath, NodeSpec, 5000) of
{ok, #{<<"tx">> := #{<<"payload">> := EncPin}, <<"block_height">> := Height}} = _Tx ->
%lager:debug("TXXXX: ~p", [Tx]),
{ok, #{<<"tx">> := #{<<"payload">> := EncPin}, <<"block_height">> := Height}} ->
{ok, Pin} = aeser_api_encoder:safe_decode(bytearray, EncPin),
{ok, DecPin} = decode_parent_pin_payload(Pin),
{ok, maps:put(pc_height, Height, DecPin)}; % add the pc block height to pin map, -1 = not on chain yet.
Expand Down
0