fix(node): fix sharing provider between async runtimes by GCdePaula · Pull Request #168 · cartesi/dave · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

fix(node): fix sharing provider between async runtimes #168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ alloy = { version = "0.12.4", features = [
"sol-types",
] }
alloy-transport = { version = "0.12", features = ["throttle"] }
alloy-chains = { version = "0.1" }
ruint = "1.12"
tiny-keccak = { version = "2.0", features = ["keccak"] }

Expand Down
76 changes: 37 additions & 39 deletions cartesi-rollups/node/blockchain-reader/src/lib.rs
<tr data-hunk="88e768772200ea44c777f7a2bdf09eb8ecd0b90e77966ccc0171685a8817036f" class="show-top-border">
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use alloy::{
eips::BlockNumberOrTag::Finalized,
hex::ToHexExt,
primitives::{Address, U256},
providers::{DynProvider, Provider},
providers::Provider,
rpc::types::{Log, Topic},
sol_types::SolEvent,
};
Expand Down Expand Up @@ -129,56 +129,30 @@ impl AddressBook {

pub struct BlockchainReader<SM: StateManager> {
state_manager: SM,
provider: DynProvider,
address_book: AddressBook,
input_reader: EventReader<InputAdded>,
epoch_reader: EventReader<EpochSealed>,
sleep_duration: Duration,
}

impl<SM: StateManager> BlockchainReader<SM> {
pub fn new(
state_manager: SM,
provider: DynProvider,
address_book: AddressBook,
sleep_duration: Duration,
) -> Self {
pub fn new(state_manager: SM, address_book: AddressBook, sleep_duration: Duration) -> Self {
Self {
state_manager,
address_book,
provider,
input_reader: EventReader::<InputAdded>::default(),
epoch_reader: EventReader::<EpochSealed>::default(),
sleep_duration,
}
}

pub fn start(self, watch: Watch) -> Result<()> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("`BlockchainReader` runtime build failure");

rt.block_on(async move { self.execution_loop(watch).await })
}

async fn execution_loop(mut self, watch: Watch) -> Result<()> {
// TODO set genesis at main.rs
let input_box_creation =
find_contract_creation_block(&self.provider, self.address_book.input_box)
.await
.map_err(|e| ProviderErrors(vec![Error::TransportError(e)]))?;
let latest_processed = self.state_manager.latest_processed_block()?;
if input_box_creation > latest_processed {
self.state_manager.set_genesis(input_box_creation)?;
}

pub async fn execution_loop(mut self, watch: Watch, provider: impl Provider) -> Result<()> {
loop {
let current_block = latest_finalized_block(&self.provider).await?;
let current_block = latest_finalized_block(&provider).await?;
let prev_block = self.state_manager.latest_processed_block()?;

if current_block > prev_block {
self.advance(prev_block, current_block).await?;
self.advance(&provider, prev_block, current_block).await?;
}

if matches!(watch.wait(self.sleep_duration), ControlFlow::Break(_)) {
Expand All @@ -187,8 +161,15 @@ impl<SM: StateManager> BlockchainReader<SM> {
}
}

async fn advance(&mut self, prev_block: u64, current_block: u64) -> Result<()> {
let (inputs, epochs) = self.collect_events(prev_block, current_block).await?;
async fn advance(
&mut self,
provider: &impl Provider,
prev_block: u64,
current_block: u64,
) -> Result<()> {
let (inputs, epochs) = self
.collect_events(provider, prev_block, current_block)
.await?;

self.state_manager.insert_consensus_data(
current_block,
Expand All @@ -201,12 +182,13 @@ impl<SM: StateManager> BlockchainReader<SM> {

async fn collect_events(
&mut self,
provider: &impl Provider,
prev_block: u64,
current_block: u64,
) -> Result<(Vec<Input>, Vec<Epoch>)> {
// read sealed epochs from blockchain
let sealed_epochs: Vec<Epoch> = self
.collect_sealed_epochs(prev_block, current_block)
.collect_sealed_epochs(provider, prev_block, current_block)
.await?;

let last_sealed_epoch_opt = self.state_manager.last_sealed_epoch()?;
Expand All @@ -222,21 +204,27 @@ impl<SM: StateManager> BlockchainReader<SM> {

// read inputs from blockchain
let inputs = self
.collect_inputs(prev_block, current_block, merged_sealed_epochs_iter)
.collect_inputs(
provider,
prev_block,
current_block,
merged_sealed_epochs_iter,
)
.await?;

Ok((inputs, sealed_epochs))
}

async fn collect_sealed_epochs(
&self,
provider: &impl Provider,
prev_block: u64,
current_block: u64,
) -> Result<Vec<Epoch>> {
Ok(self
.epoch_reader
.next(
&self.provider,
provider,
None,
&self.address_book.consensus,
prev_block,
Expand Down Expand Up @@ -268,6 +256,7 @@ impl<SM: StateManager> BlockchainReader<SM> {

async fn collect_inputs(
&mut self,
provider: &impl Provider,
prev_block: u64,
current_block: u64,
sealed_epochs_iter: impl Iterator<Item = &Epoch>,
Expand All @@ -276,7 +265,7 @@ impl<SM: StateManager> BlockchainReader<SM> {
let input_events: Vec<_> = self
.input_reader
.next(
&self.provider,
provider,
Some(&self.address_book.app.into_word().into()),
&self.address_book.input_box,
prev_block,
Expand Down Expand Up @@ -751,12 +740,21 @@ mod blockchain_reader_tests {
let r = thread::spawn(move || {
let blockchain_reader = BlockchainReader::new(
PersistentStateAccess::new(handle.path()).unwrap(),
provider,
address_book,
Duration::from_secs(1),
);

blockchain_reader.start(watch_0).unwrap();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("`BlockchainReader` runtime build failure");

rt.block_on(async move {
blockchain_reader
.execution_loop(watch_0, provider)
.await
.unwrap();
})
});

read_inputs_from_db_until_count(&mut state_manager, 0, 1).await?;
Expand Down
2 changes: 2 additions & 0 deletions cartesi-rollups/node/cartesi-rollups-prt-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ cartesi-prt-core = { workspace = true }

alloy = { workspace = true }
alloy-transport = { workspace = true }
alloy-chains = { workspace = true }

anyhow = { workspace = true }
clap = { workspace = true }
futures = { workspace = true }
Expand Down
24 changes: 21 additions & 3 deletions cartesi-rollups/node/cartesi-rollups-prt-node/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

use alloy::{primitives::Address, providers::DynProvider, transports::http::reqwest::Url};
use alloy_chains::NamedChain;
use clap::{ArgGroup, Parser, Subcommand};
use rollups_blockchain_reader::AddressBook;
use rollups_state_manager::{
Expand Down Expand Up @@ -96,22 +97,26 @@ pub struct PRTConfig {
pub machine_path: PathBuf,

// Provider
pub chain_id: NamedChain,
pub ethereum_gateway: Url,
pub signer_address: Address,
pub provider: DynProvider,

// State
pub state_dir: PathBuf,

// Misc
pub sleep_duration: Duration,

// private
signer: SignerArgs,
}

impl fmt::Display for PRTConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.address_book)?;
writeln!(f, "Machine path: {}", self.machine_path.display())?;
writeln!(f, "Signer address: {}", self.signer_address)?;
writeln!(f, "Chain Id: {}", self.chain_id)?;
writeln!(f, "Ethereum gateway: <redacted>")?;
writeln!(f, "State directory: {}", self.state_dir.display())?;
writeln!(
Expand All @@ -136,10 +141,22 @@ impl PRTConfig {
PersistentStateAccess::new(&self.state_dir)
}

/// Builds a fresh provider from this configuration.
///
/// Calls `create_provider` with the stored Ethereum gateway URL, chain id
/// and signer arguments, and returns only the provider half of the pair it
/// yields. The first element of the pair is discarded.
pub async fn provider(&self) -> DynProvider {
    let (_, provider) =
        create_provider(&self.ethereum_gateway, self.chain_id, &self.signer).await;
    provider
}

async fn _setup() -> (Self, PersistentStateAccess) {
let args = PRTArgs::parse();

let (signer_address, provider) = create_provider(&args).await;
let chain_id = args
.web3_chain_id
.try_into()
.expect("fail to convert chain id");

let (signer_address, provider) =
create_provider(&args.web3_rpc_url, chain_id, &args.signer).await;
let address_book = AddressBook::new(args.app_address, &provider).await;

let mut state_manager = PersistentStateAccess::migrate(
Expand All @@ -164,10 +181,11 @@ impl PRTConfig {
.canonicalize()
.expect("could not canonicalize state directory"),
machine_path: args.machine_path,
chain_id,
signer_address,
provider,
ethereum_gateway: args.web3_rpc_url,
sleep_duration: Duration::from_secs(args.sleep_duration_seconds),
signer: args.signer,
},
state_manager,
)
Expand Down
77 changes: 45 additions & 32 deletions cartesi-rollups/node/cartesi-rollups-prt-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,54 +35,67 @@ macro_rules! notify_all {
}};
}

/// Builds a current-thread Tokio runtime for the named service.
///
/// The runtime is configured with `enable_all`.
///
/// # Panics
///
/// Panics if the runtime cannot be built; the panic message names `service`
/// and includes the underlying build error.
fn create_runtime(service: &str) -> tokio::runtime::Runtime {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();

    match builder.build() {
        Ok(runtime) => runtime,
        Err(e) => panic!("`{}` runtime build failure: {e}", service),
    }
}

pub fn create_blockchain_reader_task(
watch: Watch,
parameters: &PRTConfig,
) -> thread::JoinHandle<()> {
let params = parameters.clone();
let inner_watch = watch.clone();

thread::spawn(move || {
let res = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let state_manager = params.state_access().unwrap();

let blockchain_reader = BlockchainReader::new(
state_manager,
params.provider,
params.address_book,
params.sleep_duration,
);

blockchain_reader
.start(watch.clone())
.inspect_err(|e| error!("{e}"))
}));
let res = std::panic::catch_unwind(|| {
let rt = create_runtime("BlockchainReader");

rt.block_on(async move {
let state_manager = params.state_access().unwrap();
let blockchain_reader = BlockchainReader::new(
state_manager,
params.address_book,
params.sleep_duration,
);

blockchain_reader
.execution_loop(inner_watch, params.provider().await)
.await
})
.inspect_err(|e| error!("{e}"))
});

notify_all!("Blockchain reader", watch, res);
})
}

pub fn create_epoch_manager_task(watch: Watch, parameters: &PRTConfig) -> thread::JoinHandle<()> {
let params = parameters.clone();
let inner_watch = watch.clone();

thread::spawn(move || {
let res = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let arena_sender = EthArenaSender::new(params.provider.clone())
.expect("could not create arena sender");

let state_manager = params.state_access().unwrap();

let epoch_manager = EpochManager::new(
arena_sender,
params.provider,
params.address_book.consensus,
state_manager,
params.sleep_duration,
);

epoch_manager
.start(watch.clone())
.inspect_err(|e| error!("{e}"))
}));
let res = std::panic::catch_unwind(|| {
let rt = create_runtime("EpochManager");
rt.block_on(async move {
let state_manager = params.state_access().unwrap();
let provider = params.provider().await;
let arena_sender =
EthArenaSender::new(provider.clone()).expect("could not create arena sender");

let epoch_manager = EpochManager::new(
arena_sender,
params.address_book.consensus,
state_manager,
params.sleep_duration,
);

epoch_manager.execution_loop(inner_watch, provider).await
})
.inspect_err(|e| error!("{e}"))
});

notify_all!("Epoch manager", watch, res);
})
Expand Down
Loading
0