-
Notifications
You must be signed in to change notification settings - Fork 25
feat(mempool): Mempool payload processing #1367
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,6 +47,7 @@ pub use nomos_system_sig::SystemSig; | |
use nomos_time::backends::NtpTimeBackend; | ||
#[cfg(feature = "tracing")] | ||
pub use nomos_tracing_service::Tracing; | ||
use nomos_utils::noop_service::NoService; | ||
use overwatch::derive_services; | ||
use rand_chacha::ChaCha20Rng; | ||
use serde::{de::DeserializeOwned, Serialize}; | ||
|
@@ -241,4 +242,5 @@ pub struct Nomos { | |
http: ApiService, | ||
storage: StorageService, | ||
system_sig: SystemSigService, | ||
no_service: NoService, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. leftovers? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, added this to have fully working PoC without propagating types everywhere in mempool related services - when plugging in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. man these generics are coming after us |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,3 @@ | ||
mod gas; | ||
mod ops; | ||
pub mod ops; | ||
pub mod tx; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
pub mod noop; | ||
pub mod tx; | ||
|
||
use std::error::Error; | ||
|
||
use overwatch::services::{relay::OutboundRelay, ServiceData}; | ||
|
||
#[async_trait::async_trait] | ||
pub trait PayloadProcessor { | ||
type Payload; | ||
type Settings; | ||
type Error: Error; | ||
|
||
type DaSamplingService: ServiceData; | ||
|
||
fn new( | ||
settings: Self::Settings, | ||
outbound_relay: OutboundRelay<<Self::DaSamplingService as ServiceData>::Message>, | ||
) -> Self; | ||
|
||
/// Executes required procedures before adding payload to the pool. | ||
async fn process(&self, payload: &Self::Payload) -> Result<(), Vec<Self::Error>>; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
use std::{convert::Infallible, marker::PhantomData}; | ||
|
||
use overwatch::services::{relay::OutboundRelay, ServiceData}; | ||
|
||
use super::PayloadProcessor; | ||
|
||
pub type NoOpPayloadProcessor<Service, Payload> = PhantomData<(Service, Payload)>; | ||
|
||
#[async_trait::async_trait] | ||
impl<Service, Payload> PayloadProcessor for NoOpPayloadProcessor<Service, Payload> | ||
where | ||
Payload: Send + Sync, | ||
Service: ServiceData + Send + Sync, | ||
{ | ||
type Payload = Payload; | ||
type Settings = (); | ||
type Error = Infallible; | ||
|
||
type DaSamplingService = Service; | ||
|
||
fn new( | ||
(): Self::Settings, | ||
_: OutboundRelay<<Self::DaSamplingService as ServiceData>::Message>, | ||
) -> Self { | ||
Self | ||
} | ||
|
||
async fn process(&self, _: &Self::Payload) -> Result<(), Vec<Self::Error>> { | ||
Ok(()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
use std::error::Error; | ||
|
||
use futures::{stream::FuturesUnordered, StreamExt}; | ||
use nomos_core::da::BlobId; | ||
use nomos_da_sampling::DaSamplingServiceMsg; | ||
use nomos_mantle_core::{ops::Op, tx::SignedMantleTx}; | ||
use overwatch::services::{relay::OutboundRelay, ServiceData}; | ||
|
||
use super::PayloadProcessor; | ||
|
||
#[derive(thiserror::Error, Debug)] | ||
pub enum SignedTxProcessorError { | ||
#[error("Error from sampling relay {0}")] | ||
Sampling(Box<dyn Error + Send>), | ||
} | ||
|
||
impl SignedTxProcessorError { | ||
fn sampling_error(err: impl Error + Send + 'static) -> Self { | ||
Self::Sampling(Box::new(err)) | ||
} | ||
} | ||
|
||
pub struct SignedTxProcessor<SamplingService> | ||
where | ||
SamplingService: ServiceData, | ||
{ | ||
sampling_relay: OutboundRelay<<SamplingService as ServiceData>::Message>, | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl<SamplingService> PayloadProcessor for SignedTxProcessor<SamplingService> | ||
where | ||
SamplingService: ServiceData<Message = DaSamplingServiceMsg<BlobId>> + Send + Sync, | ||
{ | ||
type Payload = SignedMantleTx; | ||
type Settings = (); | ||
type Error = SignedTxProcessorError; | ||
|
||
type DaSamplingService = SamplingService; | ||
|
||
fn new( | ||
(): Self::Settings, | ||
sampling_relay: OutboundRelay<<Self::DaSamplingService as ServiceData>::Message>, | ||
) -> Self { | ||
Self { sampling_relay } | ||
} | ||
|
||
async fn process(&self, payload: &Self::Payload) -> Result<(), Vec<Self::Error>> { | ||
let all_futures: FuturesUnordered<_> = payload | ||
.mantle_tx | ||
.ops | ||
.iter() | ||
.filter_map(|op| { | ||
if let Op::Blob(blob_op) = op { | ||
Some(async { | ||
self.sampling_relay | ||
.send(DaSamplingServiceMsg::TriggerSampling { | ||
blob_id: blob_op.blob, | ||
}) | ||
.await | ||
.map_err(|(e, _)| SignedTxProcessorError::sampling_error(e)) | ||
}) | ||
} else { | ||
None | ||
} | ||
}) | ||
.collect(); | ||
|
||
let errors: Vec<SignedTxProcessorError> = StreamExt::collect::<Vec<_>>(all_futures) | ||
.await | ||
.into_iter() | ||
.filter_map(Result::err) | ||
.collect(); | ||
|
||
if errors.is_empty() { | ||
Ok(()) | ||
} else { | ||
Err(errors) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
why is this?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If PR looks good, I won't merge it, but remove Noop stuff and add all the stuff required to
SignedTxProcessor
, would be great @zeegomo to take a look before this.