v2.0: Marks old storages as dirty and uncleaned in clean_accounts() (backport of #3737) by mergify[bot] · Pull Request #3747 · anza-xyz/agave · GitHub

v2.0: Marks old storages as dirty and uncleaned in clean_accounts() (backport of #3737) #3747

Merged · merged 2 commits on Nov 25, 2024
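In short, this backport removes the mark_old_slots_as_dirty() pass that previously ran during accounts-hash calculation and instead has clean_accounts() itself mark storages older than roughly one epoch as dirty and uncleaned, controlled by a new OldStoragesPolicy parameter. A condensed sketch of the new call shape, using only names visible in the diff below and mirroring the clean_accounts_for_tests() call site (the concrete argument values are illustrative, not prescriptive):

    // Sketch only: mirrors the call sites shown in the diff below.
    let old_storages_policy = if accounts_db.ancient_append_vec_offset.is_some() {
        // Ancient storages are expected (skip-rewrites), so leave old storages alone.
        OldStoragesPolicy::Leave
    } else {
        // No ancient storages expected: mark storages older than ~1 epoch dirty/uncleaned.
        OldStoragesPolicy::Clean
    };
    accounts_db.clean_accounts(
        None,                      // max_clean_root_inclusive
        false,                     // is_startup
        None,                      // last_full_snapshot_slot
        &EpochSchedule::default(),
        old_storages_policy,
    );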
180 changes: 130 additions & 50 deletions accounts-db/src/accounts_db.rs
@@ -3050,11 +3050,30 @@ impl AccountsDb {
last_full_snapshot_slot: Option<Slot>,
timings: &mut CleanKeyTimings,
epoch_schedule: &EpochSchedule,
old_storages_policy: OldStoragesPolicy,
) -> (Vec<Pubkey>, Option<Slot>) {
let oldest_non_ancient_slot = self.get_oldest_non_ancient_slot(epoch_schedule);
let mut dirty_store_processing_time = Measure::start("dirty_store_processing");
let max_slot_inclusive =
max_clean_root_inclusive.unwrap_or_else(|| self.accounts_index.max_root_inclusive());
let max_root_inclusive = self.accounts_index.max_root_inclusive();
let max_slot_inclusive = max_clean_root_inclusive.unwrap_or(max_root_inclusive);

if old_storages_policy == OldStoragesPolicy::Clean {
let slot_one_epoch_old =
max_root_inclusive.saturating_sub(epoch_schedule.slots_per_epoch);
// do nothing special for these 100 old storages that will likely get cleaned up shortly
let acceptable_straggler_slot_count = 100;
let old_slot_cutoff =
slot_one_epoch_old.saturating_sub(acceptable_straggler_slot_count);
let (old_storages, old_slots) = self.get_snapshot_storages(..old_slot_cutoff);
let num_old_storages = old_storages.len();
self.accounts_index
.add_uncleaned_roots(old_slots.iter().copied());
for (old_slot, old_storage) in std::iter::zip(old_slots, old_storages) {
self.dirty_stores.entry(old_slot).or_insert(old_storage);
}
info!("Marked {num_old_storages} old storages as dirty");
}

let mut dirty_stores = Vec::with_capacity(self.dirty_stores.len());
// find the oldest dirty slot
// we'll add logging if that append vec cannot be marked dead
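The old_slot_cutoff computed in the hunk above keeps the newest epoch of slots, plus a 100-slot straggler allowance, out of the forced marking. A minimal standalone sketch of that arithmetic (the free function and the slot numbers are illustrative, not part of the PR; Slot is a u64):

    /// Everything strictly older than (max_root_inclusive - slots_per_epoch - 100)
    /// gets marked dirty and uncleaned when OldStoragesPolicy::Clean is in effect.
    fn old_slot_cutoff(max_root_inclusive: u64, slots_per_epoch: u64) -> u64 {
        let acceptable_straggler_slot_count = 100;
        let slot_one_epoch_old = max_root_inclusive.saturating_sub(slots_per_epoch);
        slot_one_epoch_old.saturating_sub(acceptable_straggler_slot_count)
    }

    fn main() {
        // With 432_000 slots per epoch and a max root of 1_000_000, storages at
        // slots below 567_900 are marked.
        assert_eq!(old_slot_cutoff(1_000_000, 432_000), 567_900);
        // Early in a cluster's life the saturating subtraction bottoms out at 0,
        // so nothing is marked.
        assert_eq!(old_slot_cutoff(50_000, 432_000), 0);
    }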
@@ -3160,7 +3179,17 @@ impl AccountsDb {

/// Call clean_accounts() with the common parameters that tests/benches use.
pub fn clean_accounts_for_tests(&self) {
self.clean_accounts(None, false, None, &EpochSchedule::default())
self.clean_accounts(
None,
false,
None,
&EpochSchedule::default(),
if self.ancient_append_vec_offset.is_some() {
OldStoragesPolicy::Leave
} else {
OldStoragesPolicy::Clean
},
)
}

/// called with cli argument to verify refcounts are correct on all accounts
@@ -3244,6 +3273,7 @@ impl AccountsDb {
is_startup: bool,
last_full_snapshot_slot: Option<Slot>,
epoch_schedule: &EpochSchedule,
old_storages_policy: OldStoragesPolicy,
) {
if self.exhaustively_verify_refcounts {
self.exhaustively_verify_refcounts(max_clean_root_inclusive);
@@ -3265,6 +3295,7 @@
last_full_snapshot_slot,
&mut key_timings,
epoch_schedule,
old_storages_policy,
);

let mut sort = Measure::start("sort");
@@ -4882,6 +4913,10 @@ impl AccountsDb {
let _guard = self.active_stats.activate(ActiveStatItem::Shrink);
const DIRTY_STORES_CLEANING_THRESHOLD: usize = 10_000;
const OUTER_CHUNK_SIZE: usize = 2000;
// Leave any old storages alone for now. Once the validator is running
// normally, calls to clean_accounts() will have the correct policy based
// on whether ancient storages are enabled or not.
let old_storages_policy = OldStoragesPolicy::Leave;
if is_startup {
let slots = self.all_slots_in_storage();
let threads = num_cpus::get();
@@ -4893,14 +4928,26 @@
}
});
if self.dirty_stores.len() > DIRTY_STORES_CLEANING_THRESHOLD {
self.clean_accounts(None, is_startup, last_full_snapshot_slot, epoch_schedule);
self.clean_accounts(
None,
is_startup,
last_full_snapshot_slot,
epoch_schedule,
old_storages_policy,
);
}
});
} else {
for slot in self.all_slots_in_storage() {
self.shrink_slot_forced(slot);
if self.dirty_stores.len() > DIRTY_STORES_CLEANING_THRESHOLD {
self.clean_accounts(None, is_startup, last_full_snapshot_slot, epoch_schedule);
self.clean_accounts(
None,
is_startup,
last_full_snapshot_slot,
epoch_schedule,
old_storages_policy,
);
}
}
}
@@ -7177,40 +7224,6 @@ impl AccountsDb {
.collect()
}

/// storages are sorted by slot and have range info.
/// add all stores older than slots_per_epoch to dirty_stores so clean visits these slots
fn mark_old_slots_as_dirty(
&self,
storages: &SortedStorages,
slots_per_epoch: Slot,
stats: &mut crate::accounts_hash::HashStats,
) {
// Nothing to do if ancient append vecs are enabled.
// Ancient slots will be visited by the ancient append vec code and dealt with correctly.
// we expect these ancient append vecs to be old and keeping accounts
// We can expect the normal processes will keep them cleaned.
// If we included them here then ALL accounts in ALL ancient append vecs will be visited by clean each time.
if self.ancient_append_vec_offset.is_some() {
return;
}

let mut mark_time = Measure::start("mark_time");
let mut num_dirty_slots: usize = 0;
let max = storages.max_slot_inclusive();
let acceptable_straggler_slot_count = 100; // do nothing special for these old stores which will likely get cleaned up shortly
let sub = slots_per_epoch + acceptable_straggler_slot_count;
let in_epoch_range_start = max.saturating_sub(sub);
for (slot, storage) in storages.iter_range(&(..in_epoch_range_start)) {
if let Some(storage) = storage {
self.dirty_stores.insert(slot, storage.clone());
num_dirty_slots += 1;
}
}
mark_time.stop();
stats.mark_time_us = mark_time.as_us();
stats.num_dirty_slots = num_dirty_slots;
}

pub fn calculate_accounts_hash_from(
&self,
data_source: CalcAccountsHashDataSource,
@@ -7583,8 +7596,6 @@ impl AccountsDb {
let storages_start_slot = storages.range().start;
stats.oldest_root = storages_start_slot;

self.mark_old_slots_as_dirty(storages, config.epoch_schedule.slots_per_epoch, &mut stats);

let slot = storages.max_slot_inclusive();
let use_bg_thread_pool = config.use_bg_thread_pool;
let accounts_hash_cache_path = self.accounts_hash_cache_path.clone();
@@ -9435,6 +9446,20 @@ pub(crate) enum UpdateIndexThreadSelection {
PoolWithThreshold,
}

/// How should old storages be handled in clean_accounts()?
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum OldStoragesPolicy {
/// Clean all old storages, even if they were not explicitly marked as dirty.
///
/// This is the default behavior when not skipping rewrites.
Clean,
/// Leave all old storages.
///
/// When skipping rewrites, we intentionally will have ancient storages.
/// Do not clean them up automatically in clean_accounts().
Leave,
}

// These functions/fields are only usable from a dev context (i.e. tests and benches)
#[cfg(feature = "dev-context-only-utils")]
impl AccountStorageEntry {
@@ -12048,13 +12073,25 @@ pub mod tests {
// updates in later slots in slot 1
assert_eq!(accounts.alive_account_count_in_slot(0), 1);
assert_eq!(accounts.alive_account_count_in_slot(1), 1);
accounts.clean_accounts(Some(0), false, None, &EpochSchedule::default());
accounts.clean_accounts(
Some(0),
false,
None,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
assert_eq!(accounts.alive_account_count_in_slot(0), 1);
assert_eq!(accounts.alive_account_count_in_slot(1), 1);
assert!(accounts.accounts_index.contains_with(&pubkey, None, None));

// Now the account can be cleaned up
accounts.clean_accounts(Some(1), false, None, &EpochSchedule::default());
accounts.clean_accounts(
Some(1),
false,
None,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
assert_eq!(accounts.alive_account_count_in_slot(0), 0);
assert_eq!(accounts.alive_account_count_in_slot(1), 0);

@@ -13536,7 +13573,13 @@ pub mod tests {
db.add_root_and_flush_write_cache(1);

// Only clean zero lamport accounts up to slot 0
db.clean_accounts(Some(0), false, None, &EpochSchedule::default());
db.clean_accounts(
Some(0),
false,
None,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);

// Should still be able to find zero lamport account in slot 1
assert_eq!(
@@ -14689,7 +14732,13 @@ pub mod tests {
db.calculate_accounts_delta_hash(1);

// Clean to remove outdated entry from slot 0
db.clean_accounts(Some(1), false, None, &EpochSchedule::default());
db.clean_accounts(
Some(1),
false,
None,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);

// Shrink Slot 0
{
@@ -14708,7 +14757,13 @@
// Should be one store before clean for slot 0
db.get_and_assert_single_storage(0);
db.calculate_accounts_delta_hash(2);
db.clean_accounts(Some(2), false, None, &EpochSchedule::default());
db.clean_accounts(
Some(2),
false,
None,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);

// No stores should exist for slot 0 after clean
assert_no_storages_at_slot(&db, 0);
@@ -15587,13 +15642,31 @@ pub mod tests {

assert_eq!(accounts_db.ref_count_for_pubkey(&pubkey), 3);

accounts_db.clean_accounts(Some(slot2), false, Some(slot2), &EpochSchedule::default());
accounts_db.clean_accounts(
Some(slot2),
false,
Some(slot2),
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
assert_eq!(accounts_db.ref_count_for_pubkey(&pubkey), 2);

accounts_db.clean_accounts(None, false, Some(slot2), &EpochSchedule::default());
accounts_db.clean_accounts(
None,
false,
Some(slot2),
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
assert_eq!(accounts_db.ref_count_for_pubkey(&pubkey), 1);

accounts_db.clean_accounts(None, false, Some(slot3), &EpochSchedule::default());
accounts_db.clean_accounts(
None,
false,
Some(slot3),
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
assert_eq!(accounts_db.ref_count_for_pubkey(&pubkey), 0);
}
);
@@ -17902,7 +17975,13 @@ pub mod tests {

// calculate the full accounts hash
let full_accounts_hash = {
accounts_db.clean_accounts(Some(slot - 1), false, None, &EpochSchedule::default());
accounts_db.clean_accounts(
Some(slot - 1),
false,
None,
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
let (storages, _) = accounts_db.get_snapshot_storages(..=slot);
let storages = SortedStorages::new(&storages);
accounts_db.calculate_accounts_hash(
Expand Down Expand Up @@ -17972,6 +18051,7 @@ pub mod tests {
false,
Some(full_accounts_hash_slot),
&EpochSchedule::default(),
OldStoragesPolicy::Leave,
);
let (storages, _) =
accounts_db.get_snapshot_storages(full_accounts_hash_slot + 1..=slot);
33 changes: 25 additions & 8 deletions runtime/src/bank.rs
@@ -70,7 +70,8 @@ use {
accounts::{AccountAddressFilter, Accounts, PubkeyAccountSlot},
accounts_db::{
AccountShrinkThreshold, AccountStorageEntry, AccountsDb, AccountsDbConfig,
CalcAccountsHashDataSource, PubkeyHashAccount, VerifyAccountsHashAndLamportsConfig,
CalcAccountsHashDataSource, OldStoragesPolicy, PubkeyHashAccount,
VerifyAccountsHashAndLamportsConfig,
},
accounts_hash::{
AccountHash, AccountsHash, CalcAccountsHashConfig, HashStats, IncrementalAccountsHash,
@@ -5944,6 +5945,7 @@ impl Bank {
true,
Some(last_full_snapshot_slot),
self.epoch_schedule(),
self.clean_accounts_old_storages_policy(),
);
info!("Cleaning... Done.");
} else {
@@ -6285,6 +6287,7 @@ impl Bank {
false,
last_full_snapshot_slot,
self.epoch_schedule(),
self.clean_accounts_old_storages_policy(),
);
}

@@ -6300,23 +6303,37 @@ impl Bank {
}

pub(crate) fn shrink_ancient_slots(&self) {
let can_skip_rewrites = self.bank_hash_skips_rent_rewrites();
let test_skip_rewrites_but_include_in_bank_hash = self
.rc
.accounts
.accounts_db
.test_skip_rewrites_but_include_in_bank_hash;
// Invoke ancient slot shrinking only when the validator is
// explicitly configured to do so. This condition may be
// removed when the skip rewrites feature is enabled.
if can_skip_rewrites || test_skip_rewrites_but_include_in_bank_hash {
if self.are_ancient_storages_enabled() {
self.rc
.accounts
.accounts_db
.shrink_ancient_slots(self.epoch_schedule())
}
}

/// Returns if ancient storages are enabled or not
pub fn are_ancient_storages_enabled(&self) -> bool {
let can_skip_rewrites = self.bank_hash_skips_rent_rewrites();
let test_skip_rewrites_but_include_in_bank_hash = self
.rc
.accounts
.accounts_db
.test_skip_rewrites_but_include_in_bank_hash;
can_skip_rewrites || test_skip_rewrites_but_include_in_bank_hash
}

/// Returns how clean_accounts() should handle old storages
fn clean_accounts_old_storages_policy(&self) -> OldStoragesPolicy {
if self.are_ancient_storages_enabled() {
OldStoragesPolicy::Leave
} else {
OldStoragesPolicy::Clean
}
}
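The two helpers above fold the skip-rewrites configuration into a single predicate and map it onto the new policy. A standalone sketch of the same decision rule (the free function, its argument names, and the local copy of the enum are illustrative, not part of the PR):

    #[derive(Debug, Copy, Clone, Eq, PartialEq)]
    enum OldStoragesPolicy {
        Clean,
        Leave,
    }

    // Either skip-rewrites flag implies ancient storages exist, so clean_accounts()
    // must leave old storages for the ancient-storage code to handle.
    fn old_storages_policy(
        can_skip_rewrites: bool,
        test_skip_rewrites_but_include_in_bank_hash: bool,
    ) -> OldStoragesPolicy {
        if can_skip_rewrites || test_skip_rewrites_but_include_in_bank_hash {
            OldStoragesPolicy::Leave
        } else {
            OldStoragesPolicy::Clean
        }
    }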

pub fn validate_fee_collector_account(&self) -> bool {
self.feature_set
.is_active(&feature_set::validate_fee_collector_account::id())