8000 feat: Implement attestation subnet subscription by vineetpant · Pull Request #616 · ReamLabs/ream · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat: Implement attestation subnet subscription #616

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/networking/discv5/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ ethereum_ssz_derive.workspace = true
futures.workspace = true
libp2p.workspace = true
rand = "0.8.5"
sha2.workspace = true
ssz_types.workspace = true
tokio.workspace = true
tracing.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/networking/discv5/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use discv5::{ConfigBuilder, Enr, ListenConfig};

use crate::subnet::{AttestationSubnets, SyncCommitteeSubnets};

#[derive(Clone)]
pub struct DiscoveryConfig {
pub discv5_config: discv5::Config,
pub bootnodes: Vec<Enr>,
Expand Down
35 changes: 25 additions & 10 deletions crates/networking/discv5/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
time::Instant,
};

use anyhow::anyhow;
use anyhow::{Result, anyhow};
use discv5::{
Discv5, Enr,
enr::{CombinedKey, NodeId, k256::ecdsa::SigningKey},
Expand All @@ -21,16 +21,16 @@ use libp2p::{
THandlerOutEvent, ToSwarm, dummy::ConnectionHandler,
},
};
use ream_consensus::constants::genesis_validators_root;
use ream_consensus::{constants::genesis_validators_root, misc::compute_epoch_at_slot};
use tokio::sync::mpsc;
use tracing::{error, info, warn};

use crate::{
config::DiscoveryConfig,
eth2::{ENR_ETH2_KEY, EnrForkId},
subnet::{
ATTESTATION_BITFIELD_ENR_KEY, SYNC_COMMITTEE_BITFIELD_ENR_KEY,
attestation_subnet_predicate, sync_committee_subnet_predicate,
ATTESTATION_BITFIELD_ENR_KEY, AttestationSubnets, SYNC_COMMITTEE_BITFIELD_ENR_KEY,
attestation_subnet_predicate, compute_subscribed_subnets, sync_committee_subnet_predicate,
},
};

Expand Down Expand Up @@ -69,7 +69,11 @@ pub struct Discovery {
}

impl Discovery {
pub async fn new(local_key: Keypair, config: &DiscoveryConfig) -> anyhow::Result<Self> {
pub async fn new(
local_key: Keypair,
config: &DiscoveryConfig,
current_slot: u64,
) -> anyhow::Result<Self> {
let enr_local =
convert_to_enr(local_key).map_err(|err| anyhow!("Failed to convert key: {err:?}"))?;

Expand Down Expand Up @@ -101,7 +105,18 @@ impl Discovery {
}
if let Err(err) = discv5.add_enr(enr) {
error!("Failed to add bootnode to Discv5 {err:?}");
};
}
}

// Compute and set attestation subnets
let subnets =
compute_subscribed_subnets(enr.node_id(), compute_epoch_at_slot(current_slot))?;
let mut config = config.clone();
config.attestation_subnets = AttestationSubnets::new();
for subnet_id in subnets {
config
.attestation_subnets
.enable_attestation_subnet(subnet_id)?;
}
Comment on lines +111 to 120
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code is currently not doing anything. could you make a function called update_attestation_subnets, which takes in a slot and updates the nodes enr, instead of doing this in the new() function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, update function seems required here, i didn't use it as i remember from my last PR i removed update function , Should i call it from poll function or it will be called from consensus when the epoch changes


let event_stream = if !config.disable_discovery {
Expand Down Expand Up @@ -358,7 +373,7 @@ mod tests {
config.attestation_subnets.disable_attestation_subnet(1)?; // Set subnet 1
config.disable_discovery = true;

let discovery = Discovery::new(key, &config).await.unwrap();
let discovery = Discovery::new(key, &config, 0).await.unwrap();
let enr: &discv5::enr::Enr<CombinedKey> = discovery.local_enr();
// Check ENR reflects config.subnets
let enr_subnets = enr
Expand All @@ -378,7 +393,7 @@ mod tests {
config.attestation_subnets.disable_attestation_subnet(1)?;
config.disable_discovery = true;

let discovery = Discovery::new(key, &config).await.unwrap();
let discovery = Discovery::new(key, &config, 0).await.unwrap();
let local_enr = discovery.local_enr();

// Predicate for subnet 0 should match
Expand Down Expand Up @@ -407,7 +422,7 @@ mod tests {

config.attestation_subnets.enable_attestation_subnet(0)?; // Local node on subnet 0
config.disable_discovery = false;
let mut discovery = Discovery::new(key, &config).await.unwrap();
let mut discovery = Discovery::new(key, &config, 0).await.unwrap();

// Simulate a peer with another Discovery instance
let peer_key = Keypair::generate_secp256k1();
Expand All @@ -426,7 +441,7 @@ mod tests {
peer_config.socket_port = 9001; // Different port
peer_config.disable_discovery = true;

let peer_discovery = Discovery::new(peer_key, &peer_config).await.unwrap();
let peer_discovery = Discovery::new(peer_key, &peer_config, 0).await.unwrap();
let peer_enr = peer_discovery.local_enr().clone();

// Add peer to discv5
Expand Down
84 changes: 83 additions & 1 deletion crates/networking/discv5/src/subnet.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use alloy_primitives::B256;
use alloy_rlp::{BufMut, Decodable, Encodable, bytes::Bytes};
use anyhow::{anyhow, ensure};
use discv5::Enr;
use discv5::{Enr, enr::NodeId};
use ream_consensus::misc::compute_shuffled_index;
use sha2::{Digest, Sha256};
use ssz::Encode;
use ssz_types::{
BitVector,
Expand All @@ -13,6 +16,11 @@ pub const ATTESTATION_SUBNET_COUNT: usize = 64;
pub const SYNC_COMMITTEE_BITFIELD_ENR_KEY: &str = "syncnets";
pub const SYNC_COMMITTEE_SUBNET_COUNT: usize = 4;

// Subscription constants
const SUBNETS_PER_NODE: usize = 2;
pub const EPOCHS_PER_SUBNET_SUBSCRIPTION: u64 = 256;
const ATTESTATION_SUBNET_PREFIX_BITS: u32 = 8;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be 6? ATTESTATION_SUBNET_EXTRA_BITS = 0

ceillog2(ATTESTATION_SUBNET_COUNT) = 6


/// Represents the attestation subnets a node is subscribed to
///
/// This directly wraps a BitVector<U64> for the attestation subnet bitfield
Expand Down Expand Up @@ -137,6 +145,38 @@ impl Decodable for SyncCommitteeSubnets {
}
}

/// Compute a single subscribed subnet based on node_id, epoch, and index
pub fn compute_subscribed_subnet(node_id: NodeId, epoch: u64, index: usize) -> anyhow::Result<u8> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn compute_subscribed_subnet(node_id: NodeId, epoch: u64, index: usize) -> anyhow::Result<u8> {
pub fn compute_subscribed_subnet(node_id: NodeId, epoch: u64, index: usize) -> anyhow::Result<u64> {

SubnetID is supposed to be a u64 not a u8

// Extract prefix from first 8 bytes of node_id
let mut node_id_prefix_bytes = [0u8; 8];
node_id_prefix_bytes.copy_from_slice(&node_id.raw()[..8]);
let node_id_prefix =
u64::from_be_bytes(node_id_prefix_bytes) >> (64 - ATTESTATION_SUBNET_PREFIX_BITS);
Comment on lines +150 to +154
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Extract prefix from first 8 bytes of node_id
let mut node_id_prefix_bytes = [0u8; 8];
node_id_prefix_bytes.copy_from_slice(&node_id.raw()[..8]);
let node_id_prefix =
u64::from_be_bytes(node_id_prefix_bytes) >> (64 - ATTESTATION_SUBNET_PREFIX_BITS);
// Extract prefix from first 8 bytes of node_id
let mut node_id_prefix_bytes = [0u8; 8];
node_id_prefix_bytes.copy_from_slice(&node_id.raw()[..8]);
let node_id_prefix =
u64::from_be_bytes(node_id_prefix_bytes) >> (NODE_ID_BITS - ATTESTATION_SUBNET_PREFIX_BITS);

we should be using B256 here no? instead of of converting this to a u64


let mut node_offset_bytes = [0u8; 8];
node_offset_bytes.copy_from_slice(&node_id.raw()[24..32]);
let node_offset = u64::from_be_bytes(node_offset_bytes) % EPOCHS_PER_SUBNET_SUBSCRIPTION;
Comment on lines +156 to +158
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let mut node_offset_bytes = [0u8; 8];
node_offset_bytes.copy_from_slice(&node_id.raw()[24..32]);
let node_offset = u64::from_be_bytes(node_offset_bytes) % EPOCHS_PER_SUBNET_SUBSCRIPTION;
let mut node_offset_bytes = [0u8; 8];
node_offset_bytes.copy_from_slice(&node_id.raw()[24..32]);
let node_offset = u64::from_be_bytes(node_offset_bytes) % EPOCHS_PER_SUBNET_SUBSCRIPTION;

same comment here we should be doing B256 % B256, NodeId is a B256


let permutation_seed =
Sha256::digest(((epoch + node_offset) / EPOCHS_PER_SUBNET_SUBSCRIPTION).to_le_bytes());

let permutated_prefix = compute_shuffled_index(
node_id_prefix as usize,
1 << ATTESTATION_SUBNET_PREFIX_BITS,
B256::from_slice(permutation_seed.as_slice()),
)?;
Ok(((permutated_prefix + index) % ATTESTATION_SUBNET_COUNT) as u8)
}
Comment on lines +148 to +169
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

anyways let me know about what I said above, I might be missing a few insights let me know


/// Compute all subscribed subnets for a node
pub fn compute_subscribed_subnets(node_id: NodeId, epoch: u64) -> anyhow::Result<Vec<u8>> {
(0..SUBNETS_PER_NODE).try_fold(Vec::new(), |mut acc, i| {
let subnet = compute_subscribed_subnet(node_id, epoch, i)?;
acc.push(subnet);
Ok(acc)
})
}

pub fn attestation_subnet_predicate(subnets: Vec<u8>) -> impl Fn(&Enr) -> bool + Send + Sync {
move |enr: &Enr| {
if subnets.is_empty() {
Expand Down Expand Up @@ -490,4 +530,46 @@ mod tests {
};
assert!(!combined_subnet_predicate_fn(&enr));
}

#[test]
fn test_compute_subscribed_subnet() {
let node_id = NodeId::random();
let epoch = 1000;
let index = 0;

// Test valid subnet
let subnet = compute_subscribed_subnet(node_id, epoch, index).unwrap();
assert!(
subnet < ATTESTATION_SUBNET_COUNT as u8,
"Subnet ID out of bounds: {}",
subnet
);

// Test determinism
let subnet_same = compute_subscribed_subnet(node_id, epoch, index).unwrap();
assert_eq!(subnet, subnet_same, "Non-deterministic subnet");

// Test different epoch
let subnet_diff = compute_subscribed_subnet(node_id, epoch + 256, index).unwrap();
// Subnets may differ after 256 epochs due to seed change
if subnet == subnet_diff {
println!("Note: Same subnet for different epochs (possible but rare)");
}

// Test index
let subnet_index1 = compute_subscribed_subnet(node_id, epoch, 1).unwrap();
// Subnets may be same or different (spec allows either)
assert!(
subnet_index1 < ATTESTATION_SUBNET_COUNT as u8,
"Subnet ID for index 1 out of bounds: {}",
subnet_index1
);

// Test edge cases
let subnet_epoch_zero = compute_subscribed_subnet(node_id, 0, index).unwrap();
assert!(
subnet_epoch_zero < ATTESTATION_SUBNET_COUNT as u8,
"Subnet ID for epoch 0 out of bounds"
);
}
}
8 changes: 6 additions & 2 deletions crates/networking/p2p/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,12 @@ impl Network {
let local_key = secp256k1::Keypair::generate();

let discovery = {
let mut discovery =
Discovery::new(Keypair::from(local_key.clone()), &config.discv5_config).await?;
let mut discovery = Discovery::new(
Keypair::from(local_key.clone()),
&config.discv5_config,
status.head_slot,
)
.await?;
discovery.discover_peers(QueryType::Peers, 16);
discovery
};
Expand Down
Loading
0