feat(mempool): Mempool payload processing #1367
base: master
Overall looks like good structure. Is there anything else we have to trigger in mempool? Just sampling right?
@@ -194,4 +195,5 @@ pub struct NomosExecutor {
    http: ApiService,
    storage: StorageService,
    system_sig: SystemSigService,
    no_service: NoService,
?
why is this?
If the PR looks good, I won't merge it as-is; I'll first remove the NoOp stuff and add everything required for SignedTxProcessor. It would be great if @zeegomo could take a look before that.
@@ -190,4 +191,5 @@ pub struct Nomos {
    http: ApiService,
    storage: StorageService,
    system_sig: SystemSigService,
    no_service: NoService,
leftovers?
No, I added this to have a fully working PoC without propagating types everywhere in the mempool-related services. When plugging in the SignedTxProcessor we'll need to propagate the <Processor> type. At the moment it's hardcoded here: https://github.com/logos-co/nomos/pull/1367/files#diff-ca797e6e2c0f5ada3af86c41728f417cb7b1fd3428b65780ab51ade0e4dac0b7R36-R58
man these generics are coming after us
for op in &payload.mantle_tx.ops {
    if let Op::Blob(blob_op) = op {
        self.sampling_relay
            .send(DaSamplingServiceMsg::TriggerSampling {
                blob_id: blob_op.blob,
            })
            .await
            .map_err(|(e, _)| SignedTxProcessorError::sampling_error(e))?;
    }
}
Nitpick:
- Filter only blobs
- Map sending the message
- Join all futures (either join or in a futures_unordered)
- Either gather error or fuse from the futures_unordered
Something like this:
let mut all_futures = FuturesUnordered::new();
for op in &payload.mantle_tx.ops {
    if let Op::Blob(blob_op) = op {
        let fut = async {
            self.sampling_relay
                .send(DaSamplingServiceMsg::TriggerSampling {
                    blob_id: blob_op.blob,
                })
                .await
                .map_err(|(e, _)| SignedTxProcessorError::sampling_error(e))
        };
        all_futures.push(fut);
    }
}
let mut errors = Vec::new();
while let Some(result) = all_futures.next().await {
    if let Err(e) = result {
        errors.push(e);
    }
}
if errors.is_empty() {
    Ok(())
} else {
    Err(errors) // Returns Vec<SignedTxProcessorErrors>
}
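The formatting below is rough, but it should be enough to get the idea:
let all_futures: FuturesUnordered<_> = payload
    .mantle_tx
    .ops
    .iter()
    // Keep only blob ops and map each one to its (not yet awaited) send future.
    .filter_map(|op| {
        if let Op::Blob(blob_op) = op {
            Some(self.sampling_relay.send(DaSamplingServiceMsg::TriggerSampling {
                blob_id: blob_op.blob,
            }))
        } else {
            None
        }
    })
    .collect();
// Drives all sends concurrently and stops at the first error.
// WhateverError stands for whatever error type the relay's send returns.
let results: Result<(), WhateverError> = TryStreamExt::try_collect(all_futures).await;
results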
For the moment this is for blobs only, but if there's a need in the future, the processor implementation could be updated and service types added so that the mempool can fetch relays.
Overall looks good, but I feel like we might want to explore some other options rather than generics for everything. 200 lines just to change a type feels a bit overkill.
Some((key, item)) = network_items.next() => {
    if let Err(e) = processor.process(&item).await {
        tracing::debug!("could not process item from network due to: {e:?}");
        continue;
    }
    if let Err(e) = self.pool.add_item(key, item) {
        tracing::debug!("could not add item to the pool due to: {e}");
        continue;
    }
sampling can definitely take a while, we should run sampling concurrently for different txs as well
When a blob operation is processed, it only "triggers" the sampling and returns immediately. The assumption here is that other ops would be checked similarly: if they need to run some background task, the processor triggers it (via a service or in a thread) and returns immediately; otherwise it performs the required validation before adding.
Or do you suggest that we should process in parallel and then remove the item later if something fails?
so it does not actually wait for the result of the sampling?
I would add a comment to the process method to remind implementors of this then
How are you expected to act on the result of the operation, for example if sampling fails?
How are you expected to act on the result of the operation, for example if sampling fails?
In general, the processor implementation decides whether an item can be added to the pool or not.
In the case of a blob operation, the mempool only cares about triggering sampling, not about the result of sampling. This specific implementation returns the result of "did sending the trigger message to the sampling service succeed".
Why don't we wait for the sampling result here?
- because mempool doesn't use this information;
- sampling result is polled by consensus from sampling service when blob is being validated;
- sampling result is polled by consensus when a new block proposal is being formed;
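To make the trigger-and-return contract concrete, here is a minimal synchronous sketch. It is not the code from this PR: `PayloadProcessor`, `TriggerOnlyProcessor`, `SamplingMsg`, `ProcessorError` and the simplified `Op` are hypothetical stand-ins, and a std channel stands in for the sampling relay. It also shows the kind of doc comment on `process` that was asked for above.

```rust
use std::sync::mpsc::Sender;

// Hypothetical stand-ins for the real types; names are illustrative only.
type BlobId = [u8; 32];

#[derive(Debug, PartialEq)]
enum SamplingMsg {
    TriggerSampling { blob_id: BlobId },
}

enum Op {
    Blob { blob: BlobId },
    Other,
}

#[derive(Debug)]
struct ProcessorError(String);

trait PayloadProcessor {
    /// Runs before an item is added to the pool.
    ///
    /// Implementations are expected to return promptly: long-running work
    /// (such as sampling) should only be triggered here, not waited on.
    /// `Ok(())` means the trigger was delivered, not that the background
    /// work succeeded.
    fn process(&self, ops: &[Op]) -> Result<(), ProcessorError>;
}

struct TriggerOnlyProcessor {
    // Assumption: a plain channel models the relay to the sampling service.
    sampling: Sender<SamplingMsg>,
}

impl PayloadProcessor for TriggerOnlyProcessor {
    fn process(&self, ops: &[Op]) -> Result<(), ProcessorError> {
        for op in ops {
            // Only blob ops trigger sampling; every other op is skipped.
            if let Op::Blob { blob } = op {
                self.sampling
                    .send(SamplingMsg::TriggerSampling { blob_id: *blob })
                    .map_err(|e| ProcessorError(e.to_string()))?;
            }
        }
        Ok(())
    }
}
```

In this sketch the only failure the mempool can observe is that the message could not be sent (for example because the receiving side is gone), which matches the "did sending the trigger message succeed" semantics described above.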
well, why would we keep items in the mempool if they're not valid?
Like, we run sampling, and if it fails, shouldn't we remove the tx from the mempool entirely instead of waiting for somebody to request its status? What's the rationale for not doing that?
It's not bad per se, I'm just curious to know the reason
It seems that both solutions would achieve the same goal. I'm not sure how one would be better than the other, apart from the fact that the "triggering" solution is already implemented and tested.
Current solution:
- Mempool receives an item and triggers sampling
- Sampling service samples multiple subnets for the blob
- When consensus is proposing a tx with blob ops:
  - pulls the list of sampled blobs from the sampling service;
  - pulls txs that contain sampled blobs from the mempool;
- When consensus is processing a block:
  - verifies the list of blob ops via the sampling service;
- Items that stay in the pool for too long get pruned
Proposed solution (correct me if I'm wrong):
- Mempool receives items and triggers sampling
- Sampling service samples multiple subnets for the blob
- Instead of consensus, the mempool periodically checks whether blob sampling succeeded (or receives the sampling result via a channel)
- When consensus is proposing a tx with blob ops:
  - pulls any txs from the mempool, because they are already sampled
- When consensus is processing a block:
  - verifies the list of blob ops via the sampling service
- Only items that are proposed or processed in a block are pruned, or some other logic
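The difference between the two flows can be sketched on toy data. Everything below is an assumption made for illustration (`Tx`, `select_for_proposal`, `evict_failed`, integer ids, and the choice that a tx qualifies only when all of its blobs are sampled); it is not how the services in this repository are wired.

```rust
use std::collections::HashSet;

// Hypothetical, simplified types for illustration.
type BlobId = u64;
type TxId = u64;

struct Tx {
    id: TxId,
    blobs: Vec<BlobId>,
}

/// Current flow: at proposal time, consensus gets the set of sampled blob ids
/// from the sampling service and pulls only the txs whose blobs are all in it.
/// Txs without blob ops pass trivially.
fn select_for_proposal<'a>(pool: &'a [Tx], sampled: &HashSet<BlobId>) -> Vec<&'a Tx> {
    pool.iter()
        .filter(|tx| tx.blobs.iter().all(|b| sampled.contains(b)))
        .collect()
}

/// Proposed flow: the mempool learns about failed sampling itself and drops
/// every tx that references a failed blob, so consensus can pull anything.
fn evict_failed(pool: &mut Vec<Tx>, failed: &HashSet<BlobId>) {
    pool.retain(|tx| !tx.blobs.iter().any(|b| failed.contains(b)));
}
```

In the first function the pool keeps unsampled txs and filtering happens on read; in the second the pool itself shrinks, which is where the question of keeping mempools in sync across nodes comes in.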
Indeed one is not necessarily better than the other, however
Items that stay in pool for too long gets pruned
How's that decided? Since mempools need to be in sync across nodes to validate blocks, we need to be sure that I'm not deleting something that some leader might include later
How's that decided? Since mempools need to be in sync across nodes to validate blocks
My mistake: at the moment the transactions are not pruned. We only have a service command to prune, but it's not used anywhere; this part will need to be decided and implemented.
I actually think we need to clean the mempool from invalid transactions regardless of other requests but it can be done in another pr.
Imagine a non-leader node or a node with very little stake which is rarely elected, it will never prune invalid txs from its mempool because those will ofc not be broadcast by incoming blocks.
In this specific case, we have the option of not propagating types: keep the mempool generic over what it's already generic about, but set Processor to SignedTxProcessor directly in the service, just like it's already done with NoOpProcessor: https://github.com/logos-co/nomos/pull/1367/files#diff-ca797e6e2c0f5ada3af86c41728f417cb7b1fd3428b65780ab51ade0e4dac0b7R36-R62 If in the future we need transaction types other than SignedMantleTx, we can then propagate the generics required for the processor.
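A rough sketch of what pinning the processor in the service could look like. The types here (`Processor`, `MempoolCore`, `TxMempoolService`, the one-field `SignedMantleTx`, and the counter inside `SignedTxProcessor`) are simplified stand-ins invented for the example, not the definitions from this repository.

```rust
use std::cell::Cell;

// Hypothetical trait; the real one in the PR is async and has its own error type.
trait Processor<Item> {
    fn process(&self, item: &Item) -> Result<(), String>;
}

/// Keeps the previous behaviour: accepts every item, triggers nothing.
struct NoOpProcessor;

impl<Item> Processor<Item> for NoOpProcessor {
    fn process(&self, _item: &Item) -> Result<(), String> {
        Ok(())
    }
}

// Stand-in for the real transaction type.
struct SignedMantleTx {
    blob_ops: usize,
}

/// Counts how many sampling triggers it would have sent.
struct SignedTxProcessor {
    triggered: Cell<usize>,
}

impl Processor<SignedMantleTx> for SignedTxProcessor {
    fn process(&self, tx: &SignedMantleTx) -> Result<(), String> {
        self.triggered.set(self.triggered.get() + tx.blob_ops);
        Ok(())
    }
}

/// The generic core still takes the processor as a type parameter...
struct MempoolCore<Item, P: Processor<Item>> {
    items: Vec<Item>,
    processor: P,
}

impl<Item, P: Processor<Item>> MempoolCore<Item, P> {
    fn new(processor: P) -> Self {
        Self { items: Vec::new(), processor }
    }

    /// Runs the processor first and stores the item only if it succeeds.
    fn add(&mut self, item: Item) -> Result<(), String> {
        self.processor.process(&item)?;
        self.items.push(item);
        Ok(())
    }

    fn len(&self) -> usize {
        self.items.len()
    }
}

/// ...but the service pins it once, so users of the service never name `P`.
type TxMempoolService = MempoolCore<SignedMantleTx, SignedTxProcessor>;
```

With the alias in place, the node structs only mention `TxMempoolService`; swapping in another processor later means changing one line, at the cost of the service no longer being generic over it.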
1. What does this PR implement?
Proposal for enabling the mempool to trigger actions according to the received transaction or operations before adding it to the mempool.
A NoOp implementation is added to keep the current behaviour; an example SignedTransactionProcessor contains potential code that would trigger sampling for a blob.
2. Does the code have enough context to be clearly understood?
Preparation to remove DA mempool
3. Who are the specification authors and who is accountable for this PR?
@bacv @danielSanchezQ
4. Is the specification accurate and complete?
N/A
5. Does the implementation introduce changes in the specification?
N/A