From 5a468e849fe8dbd909d5ccc5d1e99bae83f0a204 Mon Sep 17 00:00:00 2001 From: klercker Date: Thu, 21 Nov 2024 13:22:01 +0100 Subject: [PATCH 1/4] Refac pinning agent to use the `new_epoch` event instead of patching into eoe code. --- apps/aecore/src/aec_consensus_hc.erl | 24 +----- apps/aecore/src/aec_events.erl | 1 + apps/aecore/src/aec_pinning_agent.erl | 118 ++++++++++++++++++++++---- apps/aehttp/src/aehttpc_aeternity.erl | 23 +++-- 4 files changed, 115 insertions(+), 51 deletions(-) diff --git a/apps/aecore/src/aec_consensus_hc.erl b/apps/aecore/src/aec_consensus_hc.erl index 296b3aa7d2..ec239cc178 100644 --- a/apps/aecore/src/aec_consensus_hc.erl +++ b/apps/aecore/src/aec_consensus_hc.erl @@ -132,6 +132,7 @@ start(Config, _) -> start_dependency(aec_parent_chain_cache, [StartHeight, RetryInterval, fun target_parent_heights/1, %% prefetch the next parent block CacheSize, Confirmations]), + start_dependency(aec_pinning_agent, [get_contract_pubkey(?ELECTION_CONTRACT), default_pinning_behavior()]), ok. start_btc(StakersEncoded, ParentConnMod) -> @@ -205,6 +206,7 @@ stop() -> aec_preset_keys:stop(), aec_parent_connector:stop(), aec_parent_chain_cache:stop(), + aec_pinning_agent:stop(), ok. is_providing_extra_http_endpoints() -> false. @@ -309,7 +311,6 @@ state_pre_transform_node(Type, Height, PrevNode, Trees) -> cache_validators_for_epoch({TxEnv, Trees1}, Seed, Epoch + 2), Trees2 = step_eoe(TxEnv, Trees1, Leader, Seed, 0, -1, CarryOverFlag), {ok, NextEpochInfo} = aec_chain_hc:epoch_info({TxEnv, Trees2}), - start_default_pinning_process(TxEnv, Trees2, Height, NextEpochInfo), {Trees2, Events ++ [{new_epoch, NextEpochInfo}]}; {error, _} -> lager:debug("Entropy hash for height ~p is not in cache, attempting to resync", [Height]), @@ -324,25 +325,6 @@ state_pre_transform_node(Type, Height, PrevNode, Trees) -> step_micro(TxEnv, Trees, Leader) end. 
-start_default_pinning_process(TxEnv, Trees, _Height, NextEpochInfo) -> - case default_pinning_behavior() of - true -> - #{ epoch := Epoch - , last := Last - , validators := _Validators} = NextEpochInfo, - {ok, LastLeader} = leader_for_height(Last, {TxEnv, Trees}), - lager:debug("AGENT: Trying to start pinning agent... for: ~p in epoch ~p", [LastLeader, Epoch]), - try - case aec_parent_connector:has_parent_account(LastLeader) of - true -> aec_pinning_agent:spawn_for_epoch(NextEpochInfo, get_contract_pubkey(?ELECTION_CONTRACT), LastLeader); - false -> lager:debug("AGENT: No parent chain account found for ~p", [LastLeader]) - end - catch - T:E -> lager:debug("AGENT throws: ~p:~p", [T,E]) - end; - _ -> ok - end. - cache_child_epoch_info(Epoch, Height, StartTime) -> %% if the leader is running on the same node, and the current process %% is not the leader, then the the epoch info should already be cached @@ -970,7 +952,7 @@ validate_pin(TxEnv, Trees, CurEpochInfo) -> end end. -add_pin_reward(Trees, TxEnv, Leader, #{epoch := CurEpoch, last := Last} = _EpochInfo) -> +add_pin_reward(Trees, TxEnv, Leader, #{epoch := CurEpoch, last := Last}) -> #{cur_pin_reward := Reward} = aec_chain_hc:pin_reward_info({TxEnv, Trees}), Event = {pin, {pin_accepted, #{reward => Reward, recipient => Leader, epoch => CurEpoch, height => Last}}}, ATrees = aec_trees:accounts(Trees), diff --git a/apps/aecore/src/aec_events.erl b/apps/aecore/src/aec_events.erl index 723708edff..845f3c46b9 100644 --- a/apps/aecore/src/aec_events.erl +++ b/apps/aecore/src/aec_events.erl @@ -37,6 +37,7 @@ | oracle_query_tx_created | oracle_response_tx_created | pin + | new_epoch | {tx_event, any()}. -spec publish(event(), any()) -> ok. diff --git a/apps/aecore/src/aec_pinning_agent.erl b/apps/aecore/src/aec_pinning_agent.erl index 8f5ee60c55..253a92dd89 100644 --- a/apps/aecore/src/aec_pinning_agent.erl +++ b/apps/aecore/src/aec_pinning_agent.erl @@ -8,38 +8,122 @@ -module(aec_pinning_agent). 
-author("mans.af.klercker@happihacking.se"). +-behaviour(gen_server). %%%============================================================================= %%% Export and Defs %%%============================================================================= %% External API +-export([ + start_link/2, + stop/0 +]). + +%% Callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). + -export([ spawn_for_epoch/3, start/3 - ]). +]). -define(SERVER, ?MODULE). +-define(WORKER, worker_name()). + +%% Loop state +-record(state, { + contract +}). +-type state() :: state. + +%%%============================================================================= +%%% API +%%%============================================================================= + +-spec start_link(term(), atom()) -> {ok, pid()} | {error, {already_started, pid()}} | ignore | {error, Reason::any()}. +start_link(Contract, PinningBehavior) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [Contract, PinningBehavior], []). + +%%%============================================================================= +%%% Gen Server Callbacks +%%%============================================================================= + +init([Contract, PinningBehavior]) -> + case PinningBehavior of + true -> + State = #state{contract = Contract}, + lager:debug("started pinning agent"), + aec_events:subscribe(new_epoch), + {ok, State}; + false -> + lager:debug("default pinning off - no agent started", []), + ignore + end. + +handle_call(_Request, _From, LoopState) -> + Reply = ok, + {reply, Reply, LoopState}. + +handle_cast(_Msg, LoopState) -> + {noreply, LoopState}. + +handle_info({gproc_ps_event, new_epoch, #{info := EpochInfo}}, #state{contract = Contract} = State) -> + Reply = spawn_for_epoch(EpochInfo, Contract), + {Reply, State}; +handle_info(_Info, LoopState) -> + {noreply, LoopState}. + +terminate(_Reason, _LoopState) -> + ok. 
+ +code_change(_OldVsn, LoopState, _Extra) -> + {ok, LoopState}. + + % case aec_parent_connector:has_parent_account(LastLeader) of + % true -> aec_pinning_agent:spawn_for_epoch(NextEpochInfo, get_contract_pubkey(?ELECTION_CONTRACT), LastLeader); + % false -> lager:debug("AGENT: No parent chain account found for ~p", [LastLeader]) + % end +stop() -> + lager:debug("STOPPED"), + gen_server:stop(?SERVER). %%%============================================================================= %%% API %%%============================================================================= +%%%============================================================================= +%%% Pinning Worker Process +%%%============================================================================= + +spawn_for_epoch(#{last := Last} = EpochInfo, Contract) -> + {ok, LastLeader} = aec_consensus_hc:leader_for_height(Last), + case aec_parent_connector:has_parent_account(LastLeader) of + true -> spawn_for_epoch(EpochInfo, Contract, LastLeader); + false -> lager:debug("no pin in epoch, no parent account for ~p", [LastLeader]), noreply + end. + spawn_for_epoch(EpochInfo, Contract, LastLeader) -> - lager:debug("AGENT: Trying to spawn pinning agent...", []), try case whereis(my_unique_process) of undefined -> Pid = spawn(aec_pinning_agent, start, [EpochInfo, Contract, LastLeader]), - register(?MODULE, Pid), - Pid; + register(?WORKER, Pid), + noreply; Pid when is_pid(Pid) -> - lager:debug("AGENT: already started", []), - ok + lager:debug("pinning worker already started", []), + noreply end catch - T:E -> lager:debug("AGENT: Broke!! ~p:~p", [T,E]), ok + T:E -> lager:debug("Pinning agent worker failed: ~p:~p", [T,E]), noreply end. 
start(EpochInfo, Contract, LastLeader) -> @@ -48,37 +132,32 @@ start(EpochInfo, Contract, LastLeader) -> , length := Length , validators := _Validators} = EpochInfo, subscribe(), - lager:debug("AGENT: started ~p", [self()]), + lager:debug("pinning worker started ~p", [self()]), wait_for_top_changed(First + Length - 2, none, false, Contract, LastLeader). -%%%============================================================================= -%%% Internal functions -%%%============================================================================= %%%============================================================================= %%% FSM %%%============================================================================= - subscribe() -> aec_events:subscribe(top_changed). post_pin_to_pc(LastLeader, Height) -> - lager:debug("AGENT: (~p) Time to send to PC ~p", [self(), Height]), PCPinTx = aec_parent_connector:pin_to_pc(LastLeader, 1, 1000000 * min_gas_price()), - lager:debug("AGENT: (~p) pc-pinned ~p", [self(), PCPinTx]), + lager:debug("(~p) Pinned to PC ~p at height ~p", [self(), PCPinTx, Height]), PCPinTx. post_pin_pctx_to_cc(PinTx, LastLeader, Height) -> - lager:debug("AGENT: (~p) noting on CC @~p", [self(), Height]), try - aec_parent_connector:pin_tx_to_cc(PinTx, LastLeader, 1, 1000000 * min_gas_price()) + aec_parent_connector:pin_tx_to_cc(PinTx, LastLeader, 1, 1000000 * min_gas_price()), + lager:debug("(~p) noting on CC @~p", [self(), Height]) catch - T:E -> lager:debug("CRASHHHH: ~p:~p", [T,E]), ok + T:E -> lager:debug("Pin to CC failed: ~p:~p", [T,E]), ok end. post_pin_proof(ContractPubkey, PinTx, LastLeader, Height) -> - lager:debug("AGENT: (~p) pin proof @~p ~p", [self(), Height, PinTx]), + lager:debug("(~p) pin proof @~p ~p", [self(), Height, PinTx]), aec_parent_connector:pin_contract_call(ContractPubkey, PinTx, LastLeader, 0, 1000000 * min_gas_price()). 
wait_for_top_changed(Last, none, _, Contract, LastLeader) -> % no pin on PC yet, then we post one once the next CC block is done @@ -122,6 +201,9 @@ wait_for_top_changed(Last, PCPinTx, true, Contract, LastLeader) -> %%% Helpers, communication %%%============================================================================= +worker_name() -> + list_to_atom(?MODULE_STRING "_worker"). + min_gas_price() -> Protocol = aec_hard_forks:protocol_effective_at_height(1), max(aec_governance:minimum_gas_price(Protocol), diff --git a/apps/aehttp/src/aehttpc_aeternity.erl b/apps/aehttp/src/aehttpc_aeternity.erl index 4d210e0d5e..df670f03ed 100644 --- a/apps/aehttp/src/aehttpc_aeternity.erl +++ b/apps/aehttp/src/aehttpc_aeternity.erl @@ -204,17 +204,17 @@ post_pin_tx(SignedSpendTx, NodeSpec) -> pin_contract_call(ContractPubkey, PinTx, Who, Amount, _Fee, SignModule) -> Nonce = get_local_nonce(Who), {ok, CallData} = aeb_fate_abi:create_calldata("pin", [{bytes, PinTx}]), - ABI = ?ABI_FATE_SOPHIA_1, % not really nice, what is the supported version of getting the latest ABI version + ABI = ?ABI_FATE_SOPHIA_1, TxSpec = - #{ caller_id => aeser_id:create(account, Who) - , nonce => Nonce - , contract_id => aeser_id:create(contract, ContractPubkey) - , abi_version => ABI - , fee => 1000000 * min_gas_price() - , amount => Amount - , gas => 1000000 - , gas_price => min_gas_price() - , call_data => CallData}, + #{caller_id => aeser_id:create(account, Who) + , nonce => Nonce + , contract_id => aeser_id:create(contract, ContractPubkey) + , abi_version => ABI + , fee => 1000000 * min_gas_price() + , amount => Amount + , gas => 1000000 + , gas_price => min_gas_price() + , call_data => CallData}, {ok, Tx} = aect_call_tx:new(TxSpec), NetworkId = aec_governance:get_network_id(), SignedCallTx = sign_tx(Tx, NetworkId, Who, SignModule), @@ -231,8 +231,7 @@ get_pin_by_tx_hash(TxHashEnc, NodeSpec) -> {ok, TxHash} -> TxPath = <<"/v3/transactions/", TxHash/binary>>, case get_request(TxPath, NodeSpec, 5000) 
of - {ok, #{<<"tx">> := #{<<"payload">> := EncPin}, <<"block_height">> := Height}} = _Tx -> - %lager:debug("TXXXX: ~p", [Tx]), + {ok, #{<<"tx">> := #{<<"payload">> := EncPin}, <<"block_height">> := Height}} -> {ok, Pin} = aeser_api_encoder:safe_decode(bytearray, EncPin), {ok, DecPin} = decode_parent_pin_payload(Pin), {ok, maps:put(pc_height, Height, DecPin)}; % add the pc block height to pin map, -1 = not on chain yet. From 16698569e2ad12bfc38f2e79a30bd13750a91085 Mon Sep 17 00:00:00 2001 From: klercker Date: Thu, 21 Nov 2024 14:10:48 +0100 Subject: [PATCH 2/4] rewrite of inner worker loop --- apps/aecore/src/aec_pinning_agent.erl | 64 +++++++++++---------------- 1 file changed, 27 insertions(+), 37 deletions(-) diff --git a/apps/aecore/src/aec_pinning_agent.erl b/apps/aecore/src/aec_pinning_agent.erl index 253a92dd89..f7ece4a1dc 100644 --- a/apps/aecore/src/aec_pinning_agent.erl +++ b/apps/aecore/src/aec_pinning_agent.erl @@ -127,13 +127,12 @@ spawn_for_epoch(EpochInfo, Contract, LastLeader) -> end. start(EpochInfo, Contract, LastLeader) -> - #{ first := First - , epoch := _Epoch - , length := Length + #{ epoch := _Epoch + , last := Last , validators := _Validators} = EpochInfo, subscribe(), lager:debug("pinning worker started ~p", [self()]), - wait_for_top_changed(First + Length - 2, none, false, Contract, LastLeader). + wait_for_top_changed(Last - 1, none, false, Contract, LastLeader). 
-wait_for_top_changed(Last, none, _, Contract, LastLeader) -> % no pin on PC yet, then we post one once the next CC block is done +wait_for_top_changed(Last, PCPinTx, CCPosted, Contract, LastLeader) -> receive - {gproc_ps_event, top_changed, #{info := #{ height := Height }}} -> - PCPinTx = post_pin_to_pc(LastLeader, Height), - wait_for_top_changed(Last, PCPinTx, false, Contract, LastLeader) - end; -wait_for_top_changed(Last, PCPinTx, false, Contract, LastLeader) -> %% if PC pin tx done, post to CC if it's on PC - receive - {gproc_ps_event, top_changed, #{info := Info}} -> - CCPosted = % always try to post to CC if we haven't - case aec_parent_connector:get_pin_by_tx_hash(PCPinTx) of - {ok, #{pc_height := -1}} -> % Not on PC yet, let's wait for next PC generation - false; - {ok,_} -> % it's on PC - post_pin_pctx_to_cc(PCPinTx, LastLeader, maps:get(height, Info)), true; - _ -> false - end, - case Info of - #{ height := Last } when CCPosted == true -> % if we posted to CC and we're on last, post proof and die - post_pin_proof(Contract, PCPinTx, LastLeader, Last); - #{ height := Last } when CCPosted == false -> - ok; % just exit - _ -> % otherwise, we just carry on... - wait_for_top_changed(Last, PCPinTx, CCPosted, Contract, LastLeader) - end - end; -wait_for_top_changed(Last, PCPinTx, true, Contract, LastLeader) -> - receive - {gproc_ps_event, top_changed, #{info := Info}} -> - case Info of - #{ height := Last } -> % we're on last, post proof and die - post_pin_proof(Contract, PCPinTx, LastLeader, Last); - _ -> % something else, we just carry on... 
- wait_for_top_changed(Last, PCPinTx, true, Contract, LastLeader) + {gproc_ps_event, top_changed, #{info := #{height := Height} = Info}} -> + NewPCPinTx = maybe_post_pin_to_pc(PCPinTx, LastLeader, Height), + NewCCPosted = maybe_post_pin_to_cc(NewPCPinTx, CCPosted, LastLeader, Height), + + case {Height, NewCCPosted} of + {Last, true} -> post_pin_proof(Contract, NewPCPinTx, LastLeader, Last); + {Last, false} -> ok; + _ -> wait_for_top_changed(Last, NewPCPinTx, NewCCPosted, Contract, LastLeader) end end. +maybe_post_pin_to_pc(none, LastLeader, Height) -> + post_pin_to_pc(LastLeader, Height); +maybe_post_pin_to_pc(PCPinTx, _, _) -> + PCPinTx. + +maybe_post_pin_to_cc(PCPinTx, false, LastLeader, Height) -> + case aec_parent_connector:get_pin_by_tx_hash(PCPinTx) of + {ok, #{pc_height := -1}} -> false; + {ok, _} -> post_pin_pctx_to_cc(PCPinTx, LastLeader, Height), true; + _ -> false + end; +maybe_post_pin_to_cc(_, CCPosted, _, _) -> + CCPosted. + + %%%============================================================================= %%% Helpers, communication %%%============================================================================= From 853f616c3fc4102a98df37af3e8a3680c2515b1c Mon Sep 17 00:00:00 2001 From: klercker Date: Fri, 22 Nov 2024 15:25:47 +0100 Subject: [PATCH 3/4] Fix sub process naming, moved pc pin to sub process start (don't wait for anything), cleanup --- apps/aecore/src/aec_pinning_agent.erl | 26 +++++++++----------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/apps/aecore/src/aec_pinning_agent.erl b/apps/aecore/src/aec_pinning_agent.erl index f7ece4a1dc..56be0ce570 100644 --- a/apps/aecore/src/aec_pinning_agent.erl +++ b/apps/aecore/src/aec_pinning_agent.erl @@ -88,10 +88,6 @@ terminate(_Reason, _LoopState) -> code_change(_OldVsn, LoopState, _Extra) -> {ok, LoopState}. 
- % case aec_parent_connector:has_parent_account(LastLeader) of - % true -> aec_pinning_agent:spawn_for_epoch(NextEpochInfo, get_contract_pubkey(?ELECTION_CONTRACT), LastLeader); - % false -> lager:debug("AGENT: No parent chain account found for ~p", [LastLeader]) - % end stop() -> lager:debug("STOPPED"), gen_server:stop(?SERVER). @@ -113,7 +109,7 @@ spawn_for_epoch(#{last := Last} = EpochInfo, Contract) -> spawn_for_epoch(EpochInfo, Contract, LastLeader) -> try - case whereis(my_unique_process) of + case whereis(?WORKER) of undefined -> Pid = spawn(aec_pinning_agent, start, [EpochInfo, Contract, LastLeader]), register(?WORKER, Pid), @@ -129,10 +125,12 @@ spawn_for_epoch(EpochInfo, Contract, LastLeader) -> start(EpochInfo, Contract, LastLeader) -> #{ epoch := _Epoch , last := Last + , first := First , validators := _Validators} = EpochInfo, subscribe(), lager:debug("pinning worker started ~p", [self()]), - wait_for_top_changed(Last - 1, none, false, Contract, LastLeader). + PCPinTx = post_pin_to_pc(LastLeader, First), + wait_for_top_changed(Last - 1, PCPinTx, false, Contract, LastLeader). 
%%%============================================================================= @@ -161,22 +159,16 @@ post_pin_proof(ContractPubkey, PinTx, LastLeader, Height) -> wait_for_top_changed(Last, PCPinTx, CCPosted, Contract, LastLeader) -> receive - {gproc_ps_event, top_changed, #{info := #{height := Height} = Info}} -> - NewPCPinTx = maybe_post_pin_to_pc(PCPinTx, LastLeader, Height), - NewCCPosted = maybe_post_pin_to_cc(NewPCPinTx, CCPosted, LastLeader, Height), + {gproc_ps_event, top_changed, #{info := #{height := Height}}} -> + NewCCPosted = maybe_post_pin_to_cc(PCPinTx, CCPosted, LastLeader, Height), case {Height, NewCCPosted} of - {Last, true} -> post_pin_proof(Contract, NewPCPinTx, LastLeader, Last); - {Last, false} -> ok; - _ -> wait_for_top_changed(Last, NewPCPinTx, NewCCPosted, Contract, LastLeader) + {Last, true} -> post_pin_proof(Contract, PCPinTx, LastLeader, Last); + {Last, false} -> ok; % we're on last, pc pin tx not finalized, we bow out. + _ -> wait_for_top_changed(Last, PCPinTx, NewCCPosted, Contract, LastLeader) end end. -maybe_post_pin_to_pc(none, LastLeader, Height) -> - post_pin_to_pc(LastLeader, Height); -maybe_post_pin_to_pc(PCPinTx, _, _) -> - PCPinTx. - maybe_post_pin_to_cc(PCPinTx, false, LastLeader, Height) -> case aec_parent_connector:get_pin_by_tx_hash(PCPinTx) of {ok, #{pc_height := -1}} -> false; From 427b853f8832624e9762214a04f4816bc370f7de Mon Sep 17 00:00:00 2001 From: klercker Date: Mon, 25 Nov 2024 11:37:40 +0100 Subject: [PATCH 4/4] Refac into no process spawn --- apps/aecore/src/aec_pinning_agent.erl | 122 ++++++++++---------------- 1 file changed, 45 insertions(+), 77 deletions(-) diff --git a/apps/aecore/src/aec_pinning_agent.erl b/apps/aecore/src/aec_pinning_agent.erl index 56be0ce570..3361f63e60 100644 --- a/apps/aecore/src/aec_pinning_agent.erl +++ b/apps/aecore/src/aec_pinning_agent.erl @@ -30,17 +30,16 @@ code_change/3 ]). --export([ - spawn_for_epoch/3, - start/3 -]). - -define(SERVER, ?MODULE). 
--define(WORKER, worker_name()). %% Loop state -record(state, { - contract + contract, + pinning_mode, + next_last, + pc_pin, + cc_note, + last_leader }). -type state() :: state. @@ -52,6 +51,9 @@ start_link(Contract, PinningBehavior) -> gen_server:start_link({local, ?SERVER}, ?MODULE, [Contract, PinningBehavior], []). +stop() -> + gen_server:stop(?SERVER). + %%%============================================================================= %%% Gen Server Callbacks %%%============================================================================= @@ -59,9 +61,10 @@ start_link(Contract, PinningBehavior) -> init([Contract, PinningBehavior]) -> case PinningBehavior of true -> - State = #state{contract = Contract}, + State = #state{contract = Contract, pinning_mode = false}, lager:debug("started pinning agent"), aec_events:subscribe(new_epoch), + aec_events:subscribe(top_changed), {ok, State}; false -> lager:debug("default pinning off - no agent started", []), @@ -76,9 +79,36 @@ handle_call(_Request, _From, LoopState) -> handle_cast(_Msg, LoopState) -> {noreply, LoopState}. 
-handle_info({gproc_ps_event, new_epoch, #{info := EpochInfo}}, #state{contract = Contract} = State) -> - Reply = spawn_for_epoch(EpochInfo, Contract), - {Reply, State}; +handle_info({gproc_ps_event, new_epoch, #{info := #{last := Last, first := First, epoch := Epoch}}}, State) -> + {ok, LastLeader} = aec_consensus_hc:leader_for_height(Last), + case aec_parent_connector:has_parent_account(LastLeader) of + true -> + PCPinTx = post_pin_to_pc(LastLeader, First), + ProcState = State#state{pinning_mode = true, + next_last = Last-1, + pc_pin = PCPinTx, + cc_note = false, + last_leader = LastLeader }, + {noreply, ProcState}; + false -> + lager:debug("no pin in epoch ~p, no parent account for ~p", [Epoch, LastLeader]), + {noreply, State#state{pinning_mode = false}} + end; +handle_info({gproc_ps_event, top_changed, #{info := #{height := Height}}}, + #state{pinning_mode = true, + contract = Contract, + next_last = Last, + pc_pin = PCPinTx, + cc_note = CCPosted, + last_leader = LastLeader } = State) -> + NewCCPosted = maybe_post_pin_to_cc(PCPinTx, CCPosted, LastLeader, Height), + PinningModeCont = + case {Height, NewCCPosted} of + {Last, true} -> post_pin_proof(Contract, PCPinTx, LastLeader, Last), false; + {Last, false} -> false; % we're on last, pc pin tx not finalized, we bow out. + _ -> true % not on last block, we continue triggering on top_changed + end, + {noreply, State#state{pinning_mode = PinningModeCont, cc_note = NewCCPosted}}; handle_info(_Info, LoopState) -> {noreply, LoopState}. @@ -88,87 +118,28 @@ terminate(_Reason, _LoopState) -> code_change(_OldVsn, LoopState, _Extra) -> {ok, LoopState}. -stop() -> - lager:debug("STOPPED"), - gen_server:stop(?SERVER). 
-%%%============================================================================= -%%% API -%%%============================================================================= - %%%============================================================================= -%%% Pinning Worker Process +%%% INTERNALS %%%============================================================================= -spawn_for_epoch(#{last := Last} = EpochInfo, Contract) -> - {ok, LastLeader} = aec_consensus_hc:leader_for_height(Last), - case aec_parent_connector:has_parent_account(LastLeader) of - true -> spawn_for_epoch(EpochInfo, Contract, LastLeader); - false -> lager:debug("no pin in epoch, no parent account for ~p", [LastLeader]), noreply - end. - -spawn_for_epoch(EpochInfo, Contract, LastLeader) -> - try - case whereis(?WORKER) of - undefined -> - Pid = spawn(aec_pinning_agent, start, [EpochInfo, Contract, LastLeader]), - register(?WORKER, Pid), - noreply; - Pid when is_pid(Pid) -> - lager:debug("pinning worker already started", []), - noreply - end - catch - T:E -> lager:debug("Pinning agent worker failed: ~p:~p", [T,E]), noreply - end. - -start(EpochInfo, Contract, LastLeader) -> - #{ epoch := _Epoch - , last := Last - , first := First - , validators := _Validators} = EpochInfo, - subscribe(), - lager:debug("pinning worker started ~p", [self()]), - PCPinTx = post_pin_to_pc(LastLeader, First), - wait_for_top_changed(Last - 1, PCPinTx, false, Contract, LastLeader). - - -%%%============================================================================= -%%% FSM -%%%============================================================================= - -subscribe() -> - aec_events:subscribe(top_changed). - post_pin_to_pc(LastLeader, Height) -> PCPinTx = aec_parent_connector:pin_to_pc(LastLeader, 1, 1000000 * min_gas_price()), - lager:debug("(~p) Pinned to PC ~p at height ~p", [self(), PCPinTx, Height]), + lager:debug("Pinned to PC @~p: ~p", [Height, PCPinTx]), PCPinTx. 
post_pin_pctx_to_cc(PinTx, LastLeader, Height) -> try aec_parent_connector:pin_tx_to_cc(PinTx, LastLeader, 1, 1000000 * min_gas_price()), - lager:debug("(~p) noting on CC @~p", [self(), Height]) + lager:debug("noting on CC @~p", [Height]) catch T:E -> lager:debug("Pin to CC failed: ~p:~p", [T,E]), ok end. post_pin_proof(ContractPubkey, PinTx, LastLeader, Height) -> - lager:debug("(~p) pin proof @~p ~p", [self(), Height, PinTx]), + lager:debug("pin proof @~p ~p", [Height, PinTx]), aec_parent_connector:pin_contract_call(ContractPubkey, PinTx, LastLeader, 0, 1000000 * min_gas_price()). -wait_for_top_changed(Last, PCPinTx, CCPosted, Contract, LastLeader) -> - receive - {gproc_ps_event, top_changed, #{info := #{height := Height}}} -> - NewCCPosted = maybe_post_pin_to_cc(PCPinTx, CCPosted, LastLeader, Height), - - case {Height, NewCCPosted} of - {Last, true} -> post_pin_proof(Contract, PCPinTx, LastLeader, Last); - {Last, false} -> ok; % we're on last, pc pin tx not finalized, we bow out. - _ -> wait_for_top_changed(Last, PCPinTx, NewCCPosted, Contract, LastLeader) - end - end. - maybe_post_pin_to_cc(PCPinTx, false, LastLeader, Height) -> case aec_parent_connector:get_pin_by_tx_hash(PCPinTx) of {ok, #{pc_height := -1}} -> false; @@ -183,9 +154,6 @@ maybe_post_pin_to_cc(_, CCPosted, _, _) -> %%% Helpers, communication %%%============================================================================= -worker_name() -> - list_to_atom(?MODULE_STRING "_worker"). - min_gas_price() -> Protocol = aec_hard_forks:protocol_effective_at_height(1), max(aec_governance:minimum_gas_price(Protocol),