From 34c3591554e162c888f4abd48e67e48573593881 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Wed, 28 May 2025 15:57:10 +0200 Subject: [PATCH 01/27] adds config option for enhanced squelching --- src/xrpld/core/Config.h | 4 ++++ src/xrpld/core/detail/Config.cpp | 3 +++ 2 files changed, 7 insertions(+) diff --git a/src/xrpld/core/Config.h b/src/xrpld/core/Config.h index a58867958b1..db41032d1d2 100644 --- a/src/xrpld/core/Config.h +++ b/src/xrpld/core/Config.h @@ -253,6 +253,10 @@ class Config : public BasicConfig // made the default routing algorithm // std::size_t VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 5; ///////////////// END OF TEMPORARY CODE BLOCK ///////////////////// + // Reduce-relay - these parameters are experimental. + + // Enable enhanced squelching of unique untrusted validator messages + bool VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE = false; // Transaction reduce-relay feature bool TX_REDUCE_RELAY_ENABLE = false; diff --git a/src/xrpld/core/detail/Config.cpp b/src/xrpld/core/detail/Config.cpp index 1a07109b741..9ad0bf69f09 100644 --- a/src/xrpld/core/detail/Config.cpp +++ b/src/xrpld/core/detail/Config.cpp @@ -775,6 +775,9 @@ Config::loadFromString(std::string const& fileContents) "greater than or equal to 3"); ///////////////// !!END OF TEMPORARY CODE BLOCK!! ///////////////////// + VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE = + sec.value_or("vp_enhanced_squelch_enable", false); + TX_REDUCE_RELAY_ENABLE = sec.value_or("tx_enable", false); TX_REDUCE_RELAY_METRICS = sec.value_or("tx_metrics", false); TX_REDUCE_RELAY_MIN_PEERS = sec.value_or("tx_min_peers", 20); From 69aec23e1b3b31c699898511e247711d8a34de35 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Wed, 28 May 2025 16:09:23 +0200 Subject: [PATCH 02/27] adds isTrusted parameter to updateSlotAndSquelch method to differentiate messages from trusted validators --- src/xrpld/overlay/Slot.h | 46 +++++++++-------- src/xrpld/overlay/detail/OverlayImpl.cpp | 35 ++++++++----- src/xrpld/overlay/detail/OverlayImpl.h | 6 +-- src/xrpld/overlay/detail/PeerImp.cpp | 65 +++++++++++------------- 4 files changed, 81 insertions(+), 71 deletions(-) diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index 0956eb06f76..10b18b24a0c 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -121,13 +121,15 @@ class Slot final Slot( SquelchHandler const& handler, beast::Journal journal, - uint16_t maxSelectedPeers) + uint16_t maxSelectedPeers, + bool isTrusted) : reachedThreshold_(0) , lastSelected_(clock_type::now()) , state_(SlotState::Counting) , handler_(handler) , journal_(journal) , maxSelectedPeers_(maxSelectedPeers) + , isTrusted_(isTrusted) { } @@ -139,22 +141,19 @@ class Slot final * MIN_MESSAGE_THRESHOLD then add peer to considered peers pool. If the * number of considered peers who reached MAX_MESSAGE_THRESHOLD is * maxSelectedPeers_ then randomly select maxSelectedPeers_ from - * considered peers, and call squelch handler for each peer, which is not - * selected and not already in Squelched state. Set the state for those - * peers to Squelched and reset the count of all peers. Set slot's state to - * Selected. Message count is not updated when the slot is in Selected - * state. + * considered peers, and call squelch handler for each peer, which is + * not selected and not already in Squelched state. Set the state for + * those peers to Squelched and reset the count of all peers. Set slot's + * state to Selected. Message count is not updated when the slot is in + * Selected state. * @param validator Public key of the source validator * @param id Peer id which received the message - * @param type Message type (Validation and Propose Set only, - * others are ignored, future use) * @param callback A callback to report ignored squelches */ void update( PublicKey const& validator, id_t id, - protocol::MessageType type, ignored_squelch_callback callback); /** Handle peer deletion when a peer disconnects. @@ -257,6 +256,9 @@ class Slot final // the maximum number of peers that should be selected as a validator // message source uint16_t const maxSelectedPeers_; + + // indicate if the slot is for a trusted validator + bool const isTrusted_; }; template @@ -287,7 +289,6 @@ void Slot::update( PublicKey const& validator, id_t id, - protocol::MessageType type, ignored_squelch_callback callback) { using namespace std::chrono; @@ -321,8 +322,7 @@ Slot::update( << " slot state " << static_cast(state_) << " peer state " << static_cast(peer.state) << " count " << peer.count << " last " << duration_cast(now - peer.lastMessage).count() - << " pool " << considered_.size() << " threshold " << reachedThreshold_ - << " " << (type == protocol::mtVALIDATION ? "validation" : "proposal"); + << " pool " << considered_.size() << " threshold " << reachedThreshold_; peer.lastMessage = now; @@ -605,37 +605,39 @@ class Slots final return reduceRelayReady_; } - /** Calls Slot::update of Slot associated with the validator, with a noop - * callback. + /** Calls Slot::update of Slot associated with the validator, with a + * noop callback. * @param key Message's hash * @param validator Validator's public key * @param id Peer's id which received the message - * @param type Received protocol message type + * @param isTrusted Boolean to indicate if the message is from a trusted + * validator */ void updateSlotAndSquelch( uint256 const& key, PublicKey const& validator, id_t id, - protocol::MessageType type) + bool isTrusted) { - updateSlotAndSquelch(key, validator, id, type, []() {}); + updateSlotAndSquelch(key, validator, id, []() {}, isTrusted); } /** Calls Slot::update of Slot associated with the validator. * @param key Message's hash * @param validator Validator's public key * @param id Peer's id which received the message - * @param type Received protocol message type * @param callback A callback to report ignored validations + * @param isTrusted Boolean to indicate if the message is from a trusted + * validator */ void updateSlotAndSquelch( uint256 const& key, PublicKey const& validator, id_t id, - protocol::MessageType type, - typename Slot::ignored_squelch_callback callback); + typename Slot::ignored_squelch_callback callback, + bool isTrusted); /** Check if peers stopped relaying messages * and if slots stopped receiving messages from the validator. @@ -780,8 +782,8 @@ Slots::updateSlotAndSquelch( uint256 const& key, PublicKey const& validator, id_t id, - protocol::MessageType type, - typename Slot::ignored_squelch_callback callback) + typename Slot::ignored_squelch_callback callback, + bool isTrusted) { if (!addPeerMessage(key, id)) return; diff --git a/src/xrpld/overlay/detail/OverlayImpl.cpp b/src/xrpld/overlay/detail/OverlayImpl.cpp index 3cc5b2a0242..ae0fb2c4c09 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.cpp +++ b/src/xrpld/overlay/detail/OverlayImpl.cpp @@ -1415,7 +1415,7 @@ OverlayImpl::updateSlotAndSquelch( uint256 const& key, PublicKey const& validator, std::set&& peers, - protocol::MessageType type) + bool isTrusted) { if (!slots_.baseSquelchReady()) return; @@ -1423,14 +1423,22 @@ OverlayImpl::updateSlotAndSquelch( if (!strand_.running_in_this_thread()) return post( strand_, - [this, key, validator, peers = std::move(peers), type]() mutable { - updateSlotAndSquelch(key, validator, std::move(peers), type); + [this, + key, + validator, + peers = std::move(peers), + isTrusted]() mutable { + updateSlotAndSquelch( + key, validator, std::move(peers), isTrusted); }); for (auto id : peers) - slots_.updateSlotAndSquelch(key, validator, id, type, [&]() { - reportInboundTraffic(TrafficCount::squelch_ignored, 0); - }); + slots_.updateSlotAndSquelch( + key, + validator, + id, + [&]() { reportInboundTraffic(TrafficCount::squelch_ignored, 0); }, + isTrusted); } void @@ -1438,19 +1446,22 @@ OverlayImpl::updateSlotAndSquelch( uint256 const& key, PublicKey const& validator, Peer::id_t peer, - protocol::MessageType type) + bool isTrusted) { if (!slots_.baseSquelchReady()) return; if (!strand_.running_in_this_thread()) - return post(strand_, [this, key, validator, peer, type]() { - updateSlotAndSquelch(key, validator, peer, type); + return post(strand_, [this, key, validator, peer, isTrusted]() { + updateSlotAndSquelch(key, validator, peer, isTrusted); }); - slots_.updateSlotAndSquelch(key, validator, peer, type, [&]() { - reportInboundTraffic(TrafficCount::squelch_ignored, 0); - }); + slots_.updateSlotAndSquelch( + key, + validator, + peer, + [&]() { reportInboundTraffic(TrafficCount::squelch_ignored, 0); }, + isTrusted); } void diff --git a/src/xrpld/overlay/detail/OverlayImpl.h b/src/xrpld/overlay/detail/OverlayImpl.h index 86107fc591c..f6bb172b6c5 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.h +++ b/src/xrpld/overlay/detail/OverlayImpl.h @@ -399,14 +399,14 @@ class OverlayImpl : public Overlay, public reduce_relay::SquelchHandler * @param key Unique message's key * @param validator Validator's public key * @param peers Peers' id to update the slots for - * @param type Received protocol message type + * @param isTrusted Indicate if the validator is trusted */ void updateSlotAndSquelch( uint256 const& key, PublicKey const& validator, std::set&& peers, - protocol::MessageType type); + bool isTrusted); /** Overload to reduce allocation in case of single peer */ @@ -415,7 +415,7 @@ class OverlayImpl : public Overlay, public reduce_relay::SquelchHandler uint256 const& key, PublicKey const& validator, Peer::id_t peer, - protocol::MessageType type); + bool isTrusted); /** Called when the peer is deleted. If the peer was selected to be the * source of messages from the validator then squelched peers have to be diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 68894fb2348..1cb55fccc05 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -1699,21 +1699,6 @@ PeerImp::onMessage(std::shared_ptr const& m) // suppression for 30 seconds to avoid doing a relatively expensive lookup // every time a spam packet is received PublicKey const publicKey{makeSlice(set.nodepubkey())}; - auto const isTrusted = app_.validators().trusted(publicKey); - - // If the operator has specified that untrusted proposals be dropped then - // this happens here I.e. before further wasting CPU verifying the signature - // of an untrusted key - if (!isTrusted) - { - // report untrusted proposal messages - overlay_.reportInboundTraffic( - TrafficCount::category::proposal_untrusted, - Message::messageSize(*m)); - - if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1) - return; - } uint256 const proposeHash{set.currenttxhash()}; uint256 const prevLedger{set.previousledger()}; @@ -1728,7 +1713,9 @@ PeerImp::onMessage(std::shared_ptr const& m) publicKey.slice(), sig); - if (auto [added, relayed] = + auto const isTrusted = app_.validators().trusted(publicKey); + + if (auto const& [added, relayed] = app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_); !added) { @@ -1736,7 +1723,7 @@ PeerImp::onMessage(std::shared_ptr const& m) // receives within IDLED seconds since the message has been relayed. if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED) overlay_.updateSlotAndSquelch( - suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER); + suppression, publicKey, id_, isTrusted); // report duplicate proposal messages overlay_.reportInboundTraffic( @@ -1750,6 +1737,16 @@ PeerImp::onMessage(std::shared_ptr const& m) if (!isTrusted) { + overlay_.reportInboundTraffic( + TrafficCount::category::proposal_untrusted, + Message::messageSize(*m)); + + // If the operator has specified that untrusted proposals be dropped + // then this happens here I.e. before further wasting CPU verifying the + // signature of an untrusted key + if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1) + return; + if (tracking_.load() == Tracking::diverged) { JLOG(p_journal_.debug()) @@ -2358,20 +2355,6 @@ PeerImp::onMessage(std::shared_ptr const& m) auto const isTrusted = app_.validators().trusted(val->getSignerPublic()); - // If the operator has specified that untrusted validations be - // dropped then this happens here I.e. before further wasting CPU - // verifying the signature of an untrusted key - if (!isTrusted) - { - // increase untrusted validations received - overlay_.reportInboundTraffic( - TrafficCount::category::validation_untrusted, - Message::messageSize(*m)); - - if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1) - return; - } - auto key = sha512Half(makeSlice(m->validation())); auto [added, relayed] = @@ -2384,7 +2367,7 @@ PeerImp::onMessage(std::shared_ptr const& m) // relayed. if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED) overlay_.updateSlotAndSquelch( - key, val->getSignerPublic(), id_, protocol::mtVALIDATION); + key, val->getSignerPublic(), id_, isTrusted); // increase duplicate validations received overlay_.reportInboundTraffic( @@ -2395,6 +2378,20 @@ PeerImp::onMessage(std::shared_ptr const& m) return; } + // at this point the message is guaranteed to be unique + if (!isTrusted) + { + overlay_.reportInboundTraffic( + TrafficCount::category::validation_untrusted, + Message::messageSize(*m)); + + // If the operator has specified that untrusted validations be + // dropped then this happens here I.e. before further wasting CPU + // verifying the signature of an untrusted key + if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1) + return; + } + if (!isTrusted && (tracking_.load() == Tracking::diverged)) { JLOG(p_journal_.debug()) @@ -3008,7 +3005,7 @@ PeerImp::checkPropose( peerPos.suppressionID(), peerPos.publicKey(), std::move(haveMessage), - protocol::mtPROPOSE_LEDGER); + isTrusted); } } @@ -3044,7 +3041,7 @@ PeerImp::checkValidation( key, val->getSignerPublic(), std::move(haveMessage), - protocol::mtVALIDATION); + val->isTrusted()); } } } From 5613dab8983915bdf1eac99a7ef24769fe4c1d69 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Wed, 28 May 2025 16:20:01 +0200 Subject: [PATCH 03/27] adds methods to track which peers and validators were squelched --- src/xrpld/overlay/Slot.h | 79 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index 10b18b24a0c..63f7de6f098 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -569,6 +569,11 @@ class Slots final std::unordered_set, clock_type, hardened_hash>; + using validators = beast::aged_unordered_map< + PublicKey, + std::unordered_set, + clock_type, + hardened_hash>; public: /** @@ -718,6 +723,23 @@ class Slots final void deletePeer(id_t id, bool erase); + /** Called to register that a given validator was squelched for a given + * peer. It is expected that this method is called by SquelchHandler. + * + * @param key Validator public key + * @param id peer ID + */ + void + squelchValidator(PublicKey const& key, id_t id) + { + auto it = peersWithValidators_.find(key); + if (it == peersWithValidators_.end()) + peersWithValidators_.emplace(key, std::unordered_set{id}); + + else if (it->second.find(id) == it->second.end()) + it->second.insert(id); + } + private: /** Add message/peer if have not seen this message * from the peer. A message is aged after IDLED seconds. @@ -725,6 +747,56 @@ class Slots final bool addPeerMessage(uint256 const& key, id_t id); + /** + * Updates the last message sent from a validator. + * @param validator the validator public kety + * @return true if the validator was updated, false otherwise + */ + std::optional + updateConsideredValidator(PublicKey const& validator, Peer::id_t peer); + + /** Remove all validators that have become invalid due to selection + * criteria + * @return zero or more validators that have been removed. + */ + std::vector + cleanConsideredValidators(); + + /** Checks whether a given validator is squelched. + * @param key Validator public key + * @return true if a given validator was squelched + */ + bool + validatorSquelched(PublicKey const& key) const + { + beast::expire( + peersWithValidators_, reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT); + + return peersWithValidators_.find(key) != peersWithValidators_.end(); + } + + /** Checks whether a given peer was recently sent a squelch message for + * a given validator. + * @param key Validator public key + * @param id Peer id + * @return true if a given validator was squelched for a given peeru + */ + bool + peerSquelched(PublicKey const& key, id_t id) const + { + beast::expire( + peersWithValidators_, reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT); + + auto const it = peersWithValidators_.find(key); + + // if validator was not squelched, the peer was also not squelched + if (it == peersWithValidators_.end()) + return false; + + // if a peer is found the squelch for it has not expired + return it->second.find(id) != it->second.end(); + } + std::atomic_bool reduceRelayReady_{false}; hash_map> slots_; @@ -741,6 +813,13 @@ class Slots final // after it was relayed is ignored by PeerImp. inline static messages peersWithMessage_{ beast::get_abstract_clock()}; + + // Maintain aged container of validator/peers. This is used to track + // which validator/peer were squelced. A peer that whose squelch + // has expired is removed. + inline static validators peersWithValidators_{ + beast::get_abstract_clock()}; + }; template From 1e02961a636b2f91a1e71ad85a647bddd99822e9 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Wed, 28 May 2025 16:21:47 +0200 Subject: [PATCH 04/27] adds method to SquelchHandler to squelch all peers --- src/xrpld/overlay/Slot.h | 11 ++++++++++- src/xrpld/overlay/detail/OverlayImpl.cpp | 9 +++++++++ src/xrpld/overlay/detail/OverlayImpl.h | 4 ++++ 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index 63f7de6f098..eb117600e7d 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -75,7 +75,7 @@ class SquelchHandler virtual ~SquelchHandler() { } - /** Squelch handler + /** Squelch handler for a single peer * @param validator Public key of the source validator * @param id Peer's id to squelch * @param duration Squelch duration in seconds @@ -83,6 +83,15 @@ class SquelchHandler virtual void squelch(PublicKey const& validator, Peer::id_t id, std::uint32_t duration) const = 0; + + /** Squelch for all peers, the method must call slots.squelchValidator + * to register that a (validator,peer) was squelched + * @param validator Public key of the source validator + * @param duration Squelch duration in seconds + */ + virtual void + squelchAll(PublicKey const& validator, std::uint32_t duration) = 0; + /** Unsquelch handler * @param validator Public key of the source validator * @param id Peer's id to unsquelch diff --git a/src/xrpld/overlay/detail/OverlayImpl.cpp b/src/xrpld/overlay/detail/OverlayImpl.cpp index ae0fb2c4c09..d79ec621f5d 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.cpp +++ b/src/xrpld/overlay/detail/OverlayImpl.cpp @@ -1410,6 +1410,15 @@ OverlayImpl::squelch( } } +void +OverlayImpl::squelchAll(PublicKey const& validator, uint32_t squelchDuration) +{ + for_each([&](std::shared_ptr&& p) { + slots_.squelchValidator(validator, p->id()); + p->send(makeSquelchMessage(validator, true, squelchDuration)); + }); +} + void OverlayImpl::updateSlotAndSquelch( uint256 const& key, diff --git a/src/xrpld/overlay/detail/OverlayImpl.h b/src/xrpld/overlay/detail/OverlayImpl.h index f6bb172b6c5..b27d4a2570d 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.h +++ b/src/xrpld/overlay/detail/OverlayImpl.h @@ -451,6 +451,10 @@ class OverlayImpl : public Overlay, public reduce_relay::SquelchHandler Peer::id_t const id, std::uint32_t squelchDuration) const override; + void + squelchAll(PublicKey const& validator, std::uint32_t squelchDuration) + override; + void unsquelch(PublicKey const& validator, Peer::id_t id) const override; From ba536ebfd8883acfd14c87501349788cd340ccf7 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Wed, 28 May 2025 16:25:15 +0200 Subject: [PATCH 05/27] adds data strcutures and methods to track red validators --- src/xrpld/overlay/Slot.h | 72 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index eb117600e7d..cce4fa99f4d 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -829,6 +829,15 @@ class Slots final inline static validators peersWithValidators_{ beast::get_abstract_clock()}; + struct ValidatorInfo + { + size_t count; // the number of messages sent from this validator + time_point lastMessage; // timestamp of the last message + std::unordered_set peers; // a list of peer IDs that sent a + // message for this validator + }; + + hash_map considered_validators_; }; template @@ -864,6 +873,69 @@ Slots::addPeerMessage(uint256 const& key, id_t id) return true; } +template +std::optional +Slots::updateConsideredValidator( + PublicKey const& validator, + Peer::id_t peer) +{ + auto const now = clock_type::now(); + + auto it = considered_validators_.find(validator); + if (it == considered_validators_.end()) + { + considered_validators_.emplace(std::make_pair( + validator, + ValidatorInfo{ + .count = 1, + .lastMessage = now, + .peers = {peer}, + })); + + return {}; + } + + // the validator idled. Don't update it, it will be cleaned later + if (now - it->second.lastMessage > IDLED) + return {}; + + it->second.peers.insert(peer); + + it->second.lastMessage = now; + ++it->second.count; + + if (it->second.count < MAX_MESSAGE_THRESHOLD || + it->second.peers.size() < reduce_relay::MAX_SELECTED_PEERS) + return {}; + + auto const key = it->first; + considered_validators_.erase(it); + + return key; +} + +template +std::vector +Slots::cleanConsideredValidators() +{ + auto const now = clock_type::now(); + + std::vector keys; + for (auto it = considered_validators_.begin(); + it != considered_validators_.end();) + { + if (now - it->second.lastMessage > IDLED) + { + keys.push_back(it->first); + it = considered_validators_.erase(it); + } + else + ++it; + } + + return keys; +} + template void Slots::updateSlotAndSquelch( From 71871bb9b6dc7531e6c56e65c3cb0d706134fc57 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Wed, 28 May 2025 16:29:33 +0200 Subject: [PATCH 06/27] feature: extend squelching to suppress untrusted validator traffic This feature improves network efficiency by limiting message propagation from untrusted validators. Squelching currently reduces the volume of duplicate messages from validators but does not address the volume of unique messages from untrusted validators, who may not contribute meaningfully to network progress. This change introduces a bounded number of slots for untrusted validators, selected based on message frequency. Once selected, their duplicate messages are subject to standard squelching logic, thereby reducing overall message overhead without impacting trusted validator performance. --- src/xrpld/overlay/ReduceRelayCommon.h | 2 + src/xrpld/overlay/Slot.h | 199 +++++++++++++++++++---- src/xrpld/overlay/detail/OverlayImpl.cpp | 19 +++ src/xrpld/overlay/detail/OverlayImpl.h | 15 ++ src/xrpld/overlay/detail/PeerImp.cpp | 2 + 5 files changed, 209 insertions(+), 28 deletions(-) diff --git a/src/xrpld/overlay/ReduceRelayCommon.h b/src/xrpld/overlay/ReduceRelayCommon.h index 473e5d1527c..efcb0f96901 100644 --- a/src/xrpld/overlay/ReduceRelayCommon.h +++ b/src/xrpld/overlay/ReduceRelayCommon.h @@ -49,6 +49,8 @@ static constexpr uint16_t MIN_MESSAGE_THRESHOLD = 19; static constexpr uint16_t MAX_MESSAGE_THRESHOLD = 20; // Max selected peers to choose as the source of messages from validator static constexpr uint16_t MAX_SELECTED_PEERS = 5; +// Max number of untrusted slots the server will maintain +static constexpr uint16_t MAX_UNTRUSTED_SLOTS = 5; // Wait before reduce-relay feature is enabled on boot up to let // the server establish peer connections static constexpr auto WAIT_ON_BOOTUP = std::chrono::minutes{10}; diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index cce4fa99f4d..d730ad9c70b 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -583,6 +583,7 @@ class Slots final std::unordered_set, clock_type, hardened_hash>; + using slots_map = hash_map>; public: /** @@ -590,14 +591,17 @@ class Slots final * @param handler Squelch/unsquelch implementation * @param config reference to the global config */ - Slots(Logs& logs, SquelchHandler const& handler, Config const& config) + Slots(Logs& logs, SquelchHandler& handler, Config const& config) : handler_(handler) , logs_(logs) , journal_(logs.journal("Slots")) , baseSquelchEnabled_(config.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE) , maxSelectedPeers_(config.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS) + , enhancedSquelchEnabled_( + config.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE) { } + ~Slots() = default; /** Check if base squelching feature is enabled and ready */ @@ -607,6 +611,13 @@ class Slots final return baseSquelchEnabled_ && reduceRelayReady(); } + /** Check if enhanced squelching feature is enabled and ready */ + bool + enhancedSquelchReady() + { + return enhancedSquelchEnabled_ && reduceRelayReady(); + } + /** Check if reduce_relay::WAIT_ON_BOOTUP time passed since startup */ bool reduceRelayReady() @@ -619,6 +630,32 @@ class Slots final return reduceRelayReady_; } + /** Updates untrusted validator slot. Do not call for trusted + * validators. The caller must ensure passed messages are unique. + * @param key Message hash + * @param validator Validator public key + * @param id The ID of the peer that sent the message + */ + void + updateValidatorSlot(uint256 const& key, PublicKey const& validator, id_t id) + { + updateValidatorSlot(key, validator, id, []() {}); + } + + /** Updates untrusted validator slot. Do not call for trusted + * validators. The caller must ensure passed messages are unique. + * @param key Message hash + * @param validator Validator public key + * @param id The ID of the peer that sent the message + * @param callback A callback to report ignored validations + */ + void + updateValidatorSlot( + uint256 const& key, + PublicKey const& validator, + id_t id, + typename Slot::ignored_squelch_callback callback); + /** Calls Slot::update of Slot associated with the validator, with a * noop callback. * @param key Message's hash @@ -758,7 +795,8 @@ class Slots final /** * Updates the last message sent from a validator. - * @param validator the validator public kety + * @param validator The validator public key + * @param peer The peer ID sending the message * @return true if the validator was updated, false otherwise */ std::optional @@ -808,13 +846,16 @@ class Slots final std::atomic_bool reduceRelayReady_{false}; - hash_map> slots_; - SquelchHandler const& handler_; // squelch/unsquelch handler + slots_map slots_; + slots_map untrusted_slots_; + + SquelchHandler& handler_; // squelch/unsquelch handler Logs& logs_; beast::Journal const journal_; bool const baseSquelchEnabled_; uint16_t const maxSelectedPeers_; + bool const enhancedSquelchEnabled_; // Maintain aged container of message/peers. This is required // to discard duplicate message from the same peer. A message @@ -948,50 +989,152 @@ Slots::updateSlotAndSquelch( if (!addPeerMessage(key, id)) return; - auto it = slots_.find(validator); - if (it == slots_.end()) + // If we receive a message from a trusted validator either update an + // existing slot or insert a new one. If we are not running enhanced + // squelching also deduplicate untrusted validator messages + if (isTrusted || !enhancedSquelchEnabled_) { JLOG(journal_.trace()) << "updateSlotAndSquelch: new slot " << Slice(validator); - auto it = - slots_ - .emplace(std::make_pair( - validator, - Slot( - handler_, logs_.journal("Slot"), maxSelectedPeers_))) - .first; - it->second.update(validator, id, type, callback); + auto it = slots_ + .emplace(std::make_pair( + validator, + Slot( + handler_, + logs_.journal("Slot"), + maxSelectedPeers_, + isTrusted))) + .first; + it->second.update(validator, id, callback); } else - it->second.update(validator, id, type, callback); + { + auto it = untrusted_slots_.find(validator); + // If we received a message from a validator that is not + // selected, and is not squelched, there is nothing to do. It + // will be squelched later when `updateValidatorSlot` is called. + if (it == untrusted_slots_.end()) + return; + + it->second.update(validator, id, callback); + } +} + +template +void +Slots::updateValidatorSlot( + uint256 const& key, + PublicKey const& validator, + id_t id, + typename Slot::ignored_squelch_callback callback) +{ + // We received a message from an already selected validator + // we can ignore this message + if (untrusted_slots_.find(validator) != untrusted_slots_.end()) + return; + + // We received a message from an already squelched validator. + // This could happen in few cases: + // 1. It happened so that the squelch for a particular peer expired + // before our local squelch. + // 2. We receive a message from a new peer that did not receive the + // squelch request. + // 3. The peer is ignoring our squelch request and we have not sent + // the controll message in a while. + // In all of these cases we can only send them a squelch request again. + if (validatorSquelched(validator)) + { + if (!peerSquelched(validator, id)) + { + squelchValidator(validator, id); + handler_.squelch( + validator, id, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); + } + return; + } + + // update a slot if the message is from a selected untrusted validator + if (auto const& it = untrusted_slots_.find(validator); + it != untrusted_slots_.end()) + { + it->second.update(validator, id, callback); + return; + } + + // Do we have any available slots for additional untrusted validators? + // This could happen in few cases: + // 1. We received a message from a new untrusted validator, but we + // are at capacity. + // 2. We received a message from a previously squelched validator. + // In all of these cases we send a squelch message to all peers. + // The validator may still be considered by the selector. However, it + // will be eventually cleaned and squelched + if (untrusted_slots_.size() == MAX_UNTRUSTED_SLOTS) + { + handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); + return; + } + + if (auto const v = updateConsideredValidator(validator, id)) + untrusted_slots_.emplace(std::make_pair( + *v, + Slot( + handler_, logs_.journal("Slot"), maxSelectedPeers_, false))); + // When we reach MAX_UNTRUSTED_SLOTS, don't explicitly clean them. + // Since we stop updating their counters, they will idle, and will be + // removed and squelched. } template void Slots::deletePeer(id_t id, bool erase) { - for (auto& [validator, slot] : slots_) - slot.deletePeer(validator, id, erase); + auto deletePeer = [&](slots_map& slots) { + for (auto& [validator, slot] : slots) + slot.deletePeer(validator, id, erase); + }; + + deletePeer(slots_); + deletePeer(untrusted_slots_); } template void Slots::deleteIdlePeers() { - auto now = clock_type::now(); + auto deleteSlots = [&](slots_map& slots) { + auto const now = clock_type::now(); - for (auto it = slots_.begin(); it != slots_.end();) - { - it->second.deleteIdlePeer(it->first); - if (now - it->second.getLastSelected() > MAX_UNSQUELCH_EXPIRE_DEFAULT) + for (auto it = slots.begin(); it != slots.end();) { - JLOG(journal_.trace()) - << "deleteIdlePeers: deleting idle slot " << Slice(it->first); - it = slots_.erase(it); + it->second.deleteIdlePeer(it->first); + if (now - it->second.getLastSelected() > + MAX_UNSQUELCH_EXPIRE_DEFAULT) + { + JLOG(journal_.trace()) << "deleteIdlePeers: deleting idle slot " + << Slice(it->first); + + // if an untrusted validator slot idled - peers stopped + // sending messages for this validator squelch it + if (!it->second.isTrusted_) + handler_.squelchAll( + it->first, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); + + it = slots.erase(it); + } + else + ++it; } - else - ++it; - } + }; + + deleteSlots(slots_); + deleteSlots(untrusted_slots_); + + // remove and squelch all validators that the selector deemed unsuitable + // there might be some good validators in this set that "lapsed". + // However, since these are untrusted validators we're not concerned + for (auto const& validator : cleanConsideredValidators()) + handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); } } // namespace reduce_relay diff --git a/src/xrpld/overlay/detail/OverlayImpl.cpp b/src/xrpld/overlay/detail/OverlayImpl.cpp index d79ec621f5d..3da627fea56 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.cpp +++ b/src/xrpld/overlay/detail/OverlayImpl.cpp @@ -1473,6 +1473,25 @@ OverlayImpl::updateSlotAndSquelch( isTrusted); } +void +OverlayImpl::updateValidatorSlot( + uint256 const& key, + PublicKey const& validator, + Peer::id_t peer) +{ + if (!slots_.enhancedSquelchReady()) + return; + + if (!strand_.running_in_this_thread()) + return post(strand_, [this, key, validator, peer]() { + updateValidatorSlot(key, validator, peer); + }); + + slots_.updateValidatorSlot(key, validator, peer, [&]() { + reportInboundTraffic(TrafficCount::squelch_ignored, 0); + }); +} + void OverlayImpl::deletePeer(Peer::id_t id) { diff --git a/src/xrpld/overlay/detail/OverlayImpl.h b/src/xrpld/overlay/detail/OverlayImpl.h index b27d4a2570d..0bd3fafcdbc 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.h +++ b/src/xrpld/overlay/detail/OverlayImpl.h @@ -417,6 +417,21 @@ class OverlayImpl : public Overlay, public reduce_relay::SquelchHandler Peer::id_t peer, bool isTrusted); + /** Updates the slot information for an untrusted validator. If the + * untrusted validator was previously squelched, sends TMSquelch message to + * the sender of the message. If there are no untrusted slots available + * sends TMSquelch message to all peers to squelch messages from the + * validator. + * @param key Unique message's key + * @param validator Validator's public key + * @param peers Peers' id to update the slots for + */ + void + updateValidatorSlot( + uint256 const& key, + PublicKey const& validator, + Peer::id_t peer); + /** Called when the peer is deleted. If the peer was selected to be the * source of messages from the validator then squelched peers have to be * unsquelched. diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 1cb55fccc05..f7ccfeb8819 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -2385,6 +2385,8 @@ PeerImp::onMessage(std::shared_ptr const& m) TrafficCount::category::validation_untrusted, Message::messageSize(*m)); + overlay_.updateValidatorSlot(key, val->getSignerPublic(), id_); + // If the operator has specified that untrusted validations be // dropped then this happens here I.e. before further wasting CPU // verifying the signature of an untrusted key From 9ecb457e558fdf5f809ab52bfd62ba3ac4cb6cb2 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Wed, 28 May 2025 16:30:59 +0200 Subject: [PATCH 07/27] adds methods to print slot details from command --- src/xrpld/overlay/Slot.h | 133 ++++++++++++++++++++++- src/xrpld/overlay/Squelch.h | 5 +- src/xrpld/overlay/detail/OverlayImpl.cpp | 25 +++-- 3 files changed, 145 insertions(+), 18 deletions(-) diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index d730ad9c70b..12c1375154d 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -52,12 +52,42 @@ enum class PeerState : uint8_t { Selected, // selected to relay, counting if Slot in Counting Squelched, // squelched, doesn't relay }; + +inline std::string +to_string(PeerState state) +{ + switch (state) + { + case PeerState::Counting: + return "counting"; + case PeerState::Selected: + return "selected"; + case PeerState::Squelched: + return "squelched"; + default: + return "unknown"; + } +} /** Slot's State */ enum class SlotState : uint8_t { Counting, // counting messages Selected, // peers selected, stop counting }; +inline std::string +to_string(SlotState state) +{ + switch (state) + { + case SlotState::Counting: + return "counting"; + case SlotState::Selected: + return "selected"; + default: + return "unknown"; + } +} + template Unit epoch(TP const& t) @@ -237,13 +267,17 @@ class Slot final void initCounting(); + void + onWrite(beast::PropertyStream::Map& stream) const; + /** Data maintained for each peer */ struct PeerInfo { - PeerState state; // peer's state - std::size_t count; // message count - time_point expire; // squelch expiration time - time_point lastMessage; // time last message received + PeerState state; // peer's state + std::size_t count; // message count + time_point expire; // squelch expiration time + time_point lastMessage; // time last message received + std::size_t timesSelected; // number of times the peer was selected }; std::unordered_map peers_; // peer's data @@ -308,8 +342,14 @@ Slot::update( { JLOG(journal_.trace()) << "update: adding peer " << Slice(validator) << " " << id; - peers_.emplace( - std::make_pair(id, PeerInfo{PeerState::Counting, 0, now, now})); + peers_.emplace(std::make_pair( + id, + PeerInfo{ + .state = PeerState::Counting, + .count = 0, + .expire = now, + .lastMessage = now, + .timesSelected = 0})); initCounting(); return; } @@ -412,7 +452,11 @@ Slot::update( v.count = 0; if (selected.find(k) != selected.end()) + { v.state = PeerState::Selected; + ++v.timesSelected; + } + else if (v.state != PeerState::Squelched) { if (journal_.trace()) @@ -491,6 +535,34 @@ Slot::deletePeer(PublicKey const& validator, id_t id, bool erase) } } +template +void +Slot::onWrite(beast::PropertyStream::Map& stream) const +{ + auto const now = clock_type::now(); + stream["state"] = to_string(state_); + stream["reachedThreshold"] = reachedThreshold_; + stream["considered"] = considered_.size(); + stream["lastSelected"] = + duration_cast(now - lastSelected_).count(); + stream["isTrusted"] = isTrusted_; + + beast::PropertyStream::Set peers("peers", stream); + + for (auto const& [id, info] : peers_) + { + beast::PropertyStream::Map item(peers); + item["id"] = id; + item["count"] = info.count; + item["expire"] = + duration_cast(info.expire - now).count(); + item["lastMessage"] = + duration_cast(now - info.lastMessage).count(); + item["timesSelected"] = info.timesSelected; + item["state"] = to_string(info.state); + } +} + template void Slot::resetCounts() @@ -786,6 +858,9 @@ class Slots final it->second.insert(id); } + void + onWrite(beast::PropertyStream::Map& stream) const; + private: /** Add message/peer if have not seen this message * from the peer. A message is aged after IDLED seconds. @@ -1137,6 +1212,52 @@ Slots::deleteIdlePeers() handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); } +template +void +Slots::onWrite(beast::PropertyStream::Map& stream) const +{ + auto const writeSlot = + [](beast::PropertyStream::Set& set, + hash_map> const& slots) { + for (auto const& [validator, slot] : slots) + { + beast::PropertyStream::Map item(set); + item["validator"] = toBase58(TokenType::NodePublic, validator); + slot.onWrite(item); + } + }; + + beast::PropertyStream::Map slots("slots", stream); + + { + beast::PropertyStream::Set set("trusted", slots); + writeSlot(set, slots_); + } + + { + beast::PropertyStream::Set set("untrusted", slots); + writeSlot(set, untrusted_slots_); + } + + { + beast::PropertyStream::Set set("considered", slots); + + auto const now = clock_type::now(); + + for (auto const& [validator, info] : considered_validators_) + { + beast::PropertyStream::Map item(set); + item["validator"] = toBase58(TokenType::NodePublic, validator); + item["lastMessage"] = + std::chrono::duration_cast( + now - info.lastMessage) + .count(); + item["messageCount"] = info.count; + item["peers"] = info.peers.size(); + } + } +} + } // namespace reduce_relay } // namespace ripple diff --git a/src/xrpld/overlay/Squelch.h b/src/xrpld/overlay/Squelch.h index 0507bd4d2d9..49d86d67fad 100644 --- a/src/xrpld/overlay/Squelch.h +++ b/src/xrpld/overlay/Squelch.h @@ -22,12 +22,11 @@ #include +#include #include #include -#include #include -#include namespace ripple { @@ -108,7 +107,7 @@ template bool Squelch::expireSquelch(PublicKey const& validator) { - auto now = clock_type::now(); + auto const now = clock_type::now(); auto const& it = squelched_.find(validator); if (it == squelched_.end()) diff --git a/src/xrpld/overlay/detail/OverlayImpl.cpp b/src/xrpld/overlay/detail/OverlayImpl.cpp index 3da627fea56..d037a26742a 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.cpp +++ b/src/xrpld/overlay/detail/OverlayImpl.cpp @@ -578,16 +578,23 @@ OverlayImpl::stop() void OverlayImpl::onWrite(beast::PropertyStream::Map& stream) { - beast::PropertyStream::Set set("traffic", stream); - auto const stats = m_traffic.getCounts(); - for (auto const& pair : stats) { - beast::PropertyStream::Map item(set); - item["category"] = pair.second.name; - item["bytes_in"] = std::to_string(pair.second.bytesIn.load()); - item["messages_in"] = std::to_string(pair.second.messagesIn.load()); - item["bytes_out"] = std::to_string(pair.second.bytesOut.load()); - item["messages_out"] = std::to_string(pair.second.messagesOut.load()); + beast::PropertyStream::Set set("traffic", stream); + auto const stats = m_traffic.getCounts(); + for (auto const& pair : stats) + { + beast::PropertyStream::Map item(set); + item["category"] = pair.second.name; + item["bytes_in"] = std::to_string(pair.second.bytesIn.load()); + item["messages_in"] = std::to_string(pair.second.messagesIn.load()); + item["bytes_out"] = std::to_string(pair.second.bytesOut.load()); + item["messages_out"] = + std::to_string(pair.second.messagesOut.load()); + } + } + + { + slots_.onWrite(stream); } } From d4c6910c8ba31c9de22b549e69f07ca01fad440a Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Wed, 28 May 2025 16:33:42 +0200 Subject: [PATCH 08/27] adds enhanced squelching tests --- src/test/overlay/clock.h | 83 +++ src/test/overlay/enhanced_squelch_test.cpp | 757 +++++++++++++++++++++ 2 files changed, 840 insertions(+) create mode 100644 src/test/overlay/clock.h create mode 100644 src/test/overlay/enhanced_squelch_test.cpp diff --git a/src/test/overlay/clock.h b/src/test/overlay/clock.h new file mode 100644 index 00000000000..25dc69baceb --- /dev/null +++ b/src/test/overlay/clock.h @@ -0,0 +1,83 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2025 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_TEST_OVERLAY_CLOCK_H_INCLUDED +#define RIPPLE_TEST_OVERLAY_CLOCK_H_INCLUDED + +#include + +#include +#include +#include + +namespace ripple { + +namespace test { + +using namespace std::chrono; + +/** Manually advanced clock. */ +class ManualClock +{ +public: + typedef uint64_t rep; + typedef std::milli period; + typedef std::chrono::duration duration; + typedef std::chrono::time_point time_point; + inline static bool const is_steady = false; + + static void + advance(duration d) noexcept + { + now_ += d; + } + + static void + randAdvance(milliseconds min, milliseconds max) + { + now_ += randDuration(min, max); + } + + static void + reset() noexcept + { + now_ = time_point(seconds(0)); + } + + static time_point + now() noexcept + { + return now_; + } + + static duration + randDuration(milliseconds min, milliseconds max) + { + return duration(milliseconds(rand_int(min.count(), max.count()))); + } + + explicit ManualClock() = default; + +private: + inline static time_point now_ = time_point(seconds(0)); +}; +} // namespace test +} // namespace ripple + +#endif \ No newline at end of file diff --git a/src/test/overlay/enhanced_squelch_test.cpp b/src/test/overlay/enhanced_squelch_test.cpp new file mode 100644 index 00000000000..8993ae36b9f --- /dev/null +++ b/src/test/overlay/enhanced_squelch_test.cpp @@ -0,0 +1,757 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2025 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include + +#include + +#include +#include + +#include "test/overlay/clock.h" +#include "xrpld/overlay/Peer.h" +#include "xrpld/overlay/ReduceRelayCommon.h" + +#include +#include +#include +#include +#include + +namespace ripple { +namespace test { + +class TestHandler : public reduce_relay::SquelchHandler +{ +public: + using squelch_method = + std::function; + using squelchAll_method = + std::function; + using unsquelch_method = std::function; + + squelch_method squelch_f_; + squelchAll_method squelchAll_f_; + unsquelch_method unsquelch_f_; + + TestHandler( + squelch_method const& squelch_f, + squelchAll_method const& squelchAll_f, + unsquelch_method const& unsquelch_f) + : squelch_f_(squelch_f) + , squelchAll_f_(squelchAll_f) + , unsquelch_f_(unsquelch_f) + { + } + + TestHandler(TestHandler& copy) + { + squelch_f_ = copy.squelch_f_; + squelchAll_f_ = copy.squelchAll_f_; + unsquelch_f_ = copy.unsquelch_f_; + } + + void + squelch(PublicKey const& validator, Peer::id_t peer, std::uint32_t duration) + const override + { + squelch_f_(validator, peer, duration); + } + + void + squelchAll(PublicKey const& validator, std::uint32_t duration) override + { + squelchAll_f_(validator, duration); + } + + void + unsquelch(PublicKey const& validator, Peer::id_t peer) const override + { + unsquelch_f_(validator, peer); + } +}; + +class enhanced_squelch_test : public beast::unit_test::suite +{ +public: + TestHandler::squelch_method noop_squelch = + [&](PublicKey const&, Peer::id_t, std::uint32_t) { + BEAST_EXPECTS(false, "unexpected call to squelch handler"); + }; + + TestHandler::squelchAll_method noop_squelchAll = [&](PublicKey const&, + std::uint32_t) { + BEAST_EXPECTS(false, "unexpected call to squelchAll handler"); + }; + + TestHandler::unsquelch_method noop_unsquelch = [&](PublicKey const&, + Peer::id_t) { + BEAST_EXPECTS(false, "unexpected call to unsquelch handler"); + }; + + // noop_handler is passed as a place holder Handler to slots + TestHandler noop_handler = { + noop_squelch, + noop_squelchAll, + noop_unsquelch, + }; + + jtx::Env env_; + + enhanced_squelch_test() : env_(*this) + { + env_.app().config().VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE = true; + } + + void + testConfig() + { + testcase("Test Config - enabled enhanced squelching"); + Config c; + + std::string toLoad(R"rippleConfig( +[reduce_relay] +vp_enhanced_squelch_enable=1 +)rippleConfig"); + + c.loadFromString(toLoad); + BEAST_EXPECT(c.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE == true); + + toLoad = R"rippleConfig( +[reduce_relay] +vp_enhanced_squelch_enable=0 +)rippleConfig"; + + c.loadFromString(toLoad); + BEAST_EXPECT(c.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE == false); + + toLoad = R"rippleConfig( +[reduce_relay] +)rippleConfig"; + + c.loadFromString(toLoad); + BEAST_EXPECT(c.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE == false); + } + + /** Tests tracking for squelched validators and peers */ + void + testSquelchTracking() + { + testcase("squelchTracking"); + Peer::id_t squelchedPeerID = 0; + Peer::id_t newPeerID = 1; + reduce_relay::Slots slots( + env_.app().logs(), noop_handler, env_.app().config()); + auto const publicKey = randomKeyPair(KeyType::ed25519).first; + + // a new key should not be squelched + BEAST_EXPECTS( + !slots.validatorSquelched(publicKey), "validator squelched"); + + slots.squelchValidator(publicKey, squelchedPeerID); + + // after squelching a peer, the validator must be squelched + BEAST_EXPECTS( + slots.validatorSquelched(publicKey), "validator not squelched"); + + // the peer must also be squelched + BEAST_EXPECTS( + slots.peerSquelched(publicKey, squelchedPeerID), + "peer not squelched"); + + // a new peer must not be squelched + BEAST_EXPECTS( + !slots.peerSquelched(publicKey, newPeerID), "new peer squelched"); + + // advance the manual clock to after expiration + ManualClock::advance( + reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT + + std::chrono::seconds{11}); + + // validator squelch should expire + BEAST_EXPECTS( + !slots.validatorSquelched(publicKey), + "validator squelched after expiry"); + + // peer squelch should also expire + BEAST_EXPECTS( + !slots.peerSquelched(publicKey, squelchedPeerID), + "validator squelched after expiry"); + } + + void + testUpdateValidatorSlot_newValidator() + { + testcase("updateValidatorSlot_newValidator"); + + reduce_relay::Slots slots( + env_.app().logs(), noop_handler, env_.app().config()); + + Peer::id_t const peerID = 1; + auto const validator = randomKeyPair(KeyType::ed25519).first; + uint256 message{0}; + + slots.updateValidatorSlot(message, validator, peerID); + + // adding untrusted slot does not effect trusted slots + BEAST_EXPECTS(slots.slots_.size() == 0, "trusted slots changed"); + + // we expect that the validator was not added to untrusted slots + BEAST_EXPECTS( + slots.untrusted_slots_.size() == 0, "untrusted slot changed"); + + // we expect that the validator was added to th consideration list + BEAST_EXPECTS( + slots.considered_validators_.contains(validator), + "new validator was not considered"); + } + + void + testUpdateValidatorSlot_squelchedValidator() + { + testcase("testUpdateValidatorSlot_squelchedValidator"); + + Peer::id_t squelchedPeerID = 0; + Peer::id_t newPeerID = 1; + auto const validator = randomKeyPair(KeyType::ed25519).first; + + TestHandler::squelch_method const squelch_f = + [&](PublicKey const& key, Peer::id_t id, std::uint32_t duration) { + BEAST_EXPECTS( + key == validator, + "squelch called for unknown validator key"); + + BEAST_EXPECTS( + id == newPeerID, "squelch called for the wrong peer"); + }; + + TestHandler handler{squelch_f, noop_squelchAll, noop_unsquelch}; + + reduce_relay::Slots slots( + env_.app().logs(), handler, env_.app().config()); + + slots.squelchValidator(validator, squelchedPeerID); + + // this should not trigger squelch assertions, the peer is squelched + slots.updateValidatorSlot( + sha512Half(validator), validator, squelchedPeerID); + + slots.updateValidatorSlot(sha512Half(validator), validator, newPeerID); + + // the squelched peer remained squelched + BEAST_EXPECTS( + slots.peerSquelched(validator, squelchedPeerID), + "peer not squelched"); + + // because the validator was squelched, the new peer was also squelched + BEAST_EXPECTS( + slots.peerSquelched(validator, newPeerID), + "new peer was not squelched"); + + // a squelched validator must not be considered + BEAST_EXPECTS( + !slots.considered_validators_.contains(validator), + "squelched validator was added for consideration"); + } + + void + testUpdateValidatorSlot_slotsFull() + { + testcase("updateValidatorSlot_slotsFull"); + Peer::id_t const peerID = 1; + + // while there are open untrusted slots, no calls should be made to + // squelch any validators + TestHandler handler{noop_handler}; + reduce_relay::Slots slots( + env_.app().logs(), handler, env_.app().config()); + + // saturate validator slots + auto const validators = fillUntrustedSlots(slots); + + // adding untrusted slot does not effect trusted slots + BEAST_EXPECTS(slots.slots_.size() == 0, "trusted slots changed"); + + // simulate additional messages from already selected validators + for (auto const& validator : validators) + for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD; ++i) + slots.updateValidatorSlot( + sha512Half(validator) + static_cast(i), + validator, + peerID); + + // an untrusted slot was added for each validator + BEAST_EXPECT( + slots.untrusted_slots_.size() == + env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS); + + for (auto const& validator : validators) + BEAST_EXPECTS( + !slots.validatorSquelched(validator), + "selected validator was squelched"); + + auto const newValidator = randomKeyPair(KeyType::ed25519).first; + + // once slots are full squelchAll must be called for new peer/validator + handler.squelchAll_f_ = [&](PublicKey const& key, std::uint32_t) { + BEAST_EXPECTS( + key == newValidator, "unexpected validator squelched"); + slots.squelchValidator(key, peerID); + }; + + slots.updateValidatorSlot( + sha512Half(newValidator), newValidator, peerID); + + // Once the slots are saturated every other validator is squelched + BEAST_EXPECTS( + slots.validatorSquelched(newValidator), + "untrusted validator not squelched"); + + BEAST_EXPECTS( + slots.peerSquelched(newValidator, peerID), + "peer for untrusted validator not squelched"); + } + + void + testDeleteIdlePeers_deleteIdleSlots() + { + testcase("deleteIdlePeers"); + TestHandler handler{noop_handler}; + + reduce_relay::Slots slots( + env_.app().logs(), handler, env_.app().config()); + auto keys = fillUntrustedSlots(slots); + + // verify that squelchAll is called for each idled slot validator + handler.squelchAll_f_ = [&](PublicKey const& actualKey, + std::uint32_t duration) { + for (auto it = keys.begin(); it != keys.end(); ++it) + { + if (*it == actualKey) + { + keys.erase(it); + return; + } + } + BEAST_EXPECTS(false, "unexpected key passed to squelchAll"); + }; + + BEAST_EXPECTS( + slots.untrusted_slots_.size() == + env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS, + "unexpected number of untrusted slots"); + + // advance the manual clock to after slot expiration + ManualClock::advance( + reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT + + std::chrono::seconds{1}); + + slots.deleteIdlePeers(); + + BEAST_EXPECTS( + slots.untrusted_slots_.size() == 0, + "unexpected number of untrusted slots"); + + BEAST_EXPECTS(keys.empty(), "not all validators were squelched"); + } + + void + testDeleteIdlePeers_deleteIdleUntrustedPeer() + { + testcase("deleteIdleUntrustedPeer"); + Peer::id_t const peerID = 1; + Peer::id_t const peerID2 = 2; + + reduce_relay::Slots slots( + env_.app().logs(), noop_handler, env_.app().config()); + + // fill one untrustd validator slot + auto const validator = fillUntrustedSlots(slots, 1)[0]; + + BEAST_EXPECTS( + slots.untrusted_slots_.size() == 1, + "unexpected number of untrusted slots"); + + slots.updateSlotAndSquelch( + sha512Half(validator) + static_cast(100), + validator, + peerID, + false); + + slots.updateSlotAndSquelch( + sha512Half(validator) + static_cast(100), + validator, + peerID2, + false); + + slots.deletePeer(peerID, true); + + auto const slotPeers = getUntrustedSlotPeers(validator, slots); + BEAST_EXPECTS( + slotPeers.size() == 1, "untrusted validator slot is missing"); + + BEAST_EXPECTS( + !slotPeers.contains(peerID), + "peer was not removed from untrusted slots"); + + BEAST_EXPECTS( + slotPeers.contains(peerID2), + "peer was removed from untrusted slots"); + } + + /** Test that untrusted validator slots are correctly updated by + * updateSlotAndSquelch + */ + void + testUpdateSlotAndSquelch_untrustedValidator() + { + testcase("updateUntrsutedValidatorSlot"); + TestHandler handler{noop_handler}; + + handler.squelch_f_ = [](PublicKey const&, Peer::id_t, std::uint32_t) {}; + reduce_relay::Slots slots( + env_.app().logs(), handler, env_.app().config()); + + // peers that will be source of validator messages + std::vector peers = {}; + + // prepare n+1 peers, we expect the n+1st peer will be squelched + for (int i = 0; i < + env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS + 1; + ++i) + peers.push_back(i); + + auto const validator = fillUntrustedSlots(slots, 1)[0]; + + // Squelching logic resets all counters each time a new peer is added + // Therfore we need to populate counters for each peer before sending + // new messages + for (auto const& peer : peers) + { + auto const now = ManualClock::now(); + slots.updateSlotAndSquelch( + sha512Half(validator) + + static_cast(now.time_since_epoch().count()), + validator, + peer, + false); + + ManualClock::advance(std::chrono::milliseconds{10}); + } + + // simulate new, unique validator messages sent by peers + for (auto const& peer : peers) + for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD + 1; ++i) + { + auto const now = ManualClock::now(); + slots.updateSlotAndSquelch( + sha512Half(validator) + + static_cast(now.time_since_epoch().count()), + validator, + peer, + false); + + ManualClock::advance(std::chrono::milliseconds{10}); + } + + auto const slotPeers = getUntrustedSlotPeers(validator, slots); + BEAST_EXPECTS( + slotPeers.size() == + env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS + + 1, + "untrusted validator slot is missing"); + + int selected = 0; + int squelched = 0; + for (auto const& [_, info] : slotPeers) + { + switch (info.state) + { + case reduce_relay::PeerState::Selected: + ++selected; + break; + case reduce_relay::PeerState::Squelched: + ++squelched; + break; + case reduce_relay::PeerState::Counting: + BEAST_EXPECTS( + false, "peer should not be in counting state"); + } + } + + BEAST_EXPECTS(squelched == 1, "expected one squelched peer"); + BEAST_EXPECTS( + selected == + env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS, + "wrong number of peers selected"); + } + + void + testUpdateConsideredValidator_newValidator() + { + testcase("testUpdateConsideredValidator_newValidator"); + reduce_relay::Slots slots( + env_.app().logs(), noop_handler, env_.app().config()); + + // insert some random validator key + auto const validator = randomKeyPair(KeyType::ed25519).first; + Peer::id_t const peerID = 0; + Peer::id_t const peerID2 = 1; + + BEAST_EXPECTS( + !slots.updateConsideredValidator(validator, peerID), + "validator was selected with insufficient number of peers"); + + BEAST_EXPECTS( + slots.considered_validators_.contains(validator), + "new validator was not added for consideration"); + + BEAST_EXPECTS( + !slots.updateConsideredValidator(validator, peerID), + "validator was selected with insufficient number of peers"); + + // expect that a peer will be registered once as a message source + BEAST_EXPECTS( + slots.considered_validators_.at(validator).peers.size() == 1, + "duplicate peer was registered"); + + BEAST_EXPECTS( + !slots.updateConsideredValidator(validator, peerID2), + "validator was selected with insufficient number of peers"); + + // expect that each distinct peer will be registered + BEAST_EXPECTS( + slots.considered_validators_.at(validator).peers.size() == 2, + "distinct peers were not registered"); + } + + void + testUpdateConsideredValidator_idleValidator() + { + testcase("testUpdateConsideredValidator_idleValidator"); + reduce_relay::Slots slots( + env_.app().logs(), noop_handler, env_.app().config()); + + // insert some random validator key + auto const validator = randomKeyPair(KeyType::ed25519).first; + Peer::id_t peerID = 0; + + BEAST_EXPECTS( + !slots.updateConsideredValidator(validator, peerID), + "validator was selected with insufficient number of peers"); + + BEAST_EXPECTS( + slots.considered_validators_.contains(validator), + "new validator was not added for consideration"); + + auto const state = slots.considered_validators_.at(validator); + + // simulate a validator sending a new message before the idle timer + ManualClock::advance(reduce_relay::IDLED - std::chrono::seconds(1)); + + BEAST_EXPECTS( + !slots.updateConsideredValidator(validator, peerID), + "validator was selected with insufficient number of peers"); + auto const newState = slots.considered_validators_.at(validator); + + BEAST_EXPECTS( + state.count + 1 == newState.count, + "non-idling validator was updated"); + + // simulate a validator idling + ManualClock::advance(reduce_relay::IDLED + std::chrono::seconds(1)); + + BEAST_EXPECTS( + !slots.updateConsideredValidator(validator, peerID), + "validator was selected with insufficient number of peers"); + + auto const idleState = slots.considered_validators_.at(validator); + // we expect that an idling validator will not be updated + BEAST_EXPECTS( + newState.count == idleState.count, "idling validator was updated"); + } + + void + testUpdateConsideredValidator_selectQualifyingValidator() + { + testcase("testUpdateConsideredValidator_selectQualifyingValidator"); + reduce_relay::Slots slots( + env_.app().logs(), noop_handler, env_.app().config()); + + // insert some random validator key + auto const validator = randomKeyPair(KeyType::ed25519).first; + auto const validator2 = randomKeyPair(KeyType::ed25519).first; + Peer::id_t peerID = 0; + Peer::id_t peerID2 = + env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS; + + // a validator that sends only unique messages, but only from one peer + // must not be selected + for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD + 1; ++i) + { + BEAST_EXPECTS( + !slots.updateConsideredValidator(validator, peerID), + "validator was selected before reaching message threshold"); + BEAST_EXPECTS( + !slots.updateConsideredValidator(validator2, peerID), + "validator was selected before reaching message threshold"); + + ManualClock::advance(reduce_relay::IDLED - std::chrono::seconds(1)); + } + // as long as the peer criteria is not met, the validator most not be + // selected + for (int i = 1; i < + env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS - 1; + ++i) + { + BEAST_EXPECTS( + !slots.updateConsideredValidator(validator, i), + "validator was selected before reaching enough peers"); + BEAST_EXPECTS( + !slots.updateConsideredValidator(validator2, i), + "validator was selected before reaching enough peers"); + + ManualClock::advance(reduce_relay::IDLED - std::chrono::seconds(1)); + } + + auto const consideredValidator = + slots.updateConsideredValidator(validator, peerID2); + BEAST_EXPECTS( + consideredValidator && *consideredValidator == validator, + "expected validator was not selected"); + + // expect that selected peer was removed + BEAST_EXPECTS( + !slots.considered_validators_.contains(validator), + "selected validator was not removed from considered list"); + + BEAST_EXPECTS( + slots.considered_validators_.contains(validator2), + "unqualified validator was removed from considered list"); + } + + void + testCleanConsideredValidators_deleteIdleValidator() + { + testcase("cleanConsideredValidators_deleteIdleValidator"); + + reduce_relay::Slots slots( + env_.app().logs(), noop_handler, env_.app().config()); + + // insert some random validator key + auto const lateValidator = randomKeyPair(KeyType::ed25519).first; + auto const validator = randomKeyPair(KeyType::ed25519).first; + Peer::id_t peerID = 0; + + BEAST_EXPECTS( + !slots.updateConsideredValidator(lateValidator, peerID), + "validator was selected with insufficient number of peers"); + + BEAST_EXPECTS( + slots.considered_validators_.contains(lateValidator), + "new validator was not added for consideration"); + + // simulate a validator idling + ManualClock::advance(reduce_relay::IDLED + std::chrono::seconds(1)); + BEAST_EXPECTS( + !slots.updateConsideredValidator(validator, peerID), + "validator was selected with insufficient number of peers"); + + auto const invalidValidators = slots.cleanConsideredValidators(); + BEAST_EXPECTS( + invalidValidators.size() == 1, + "unexpected number of invalid validators"); + BEAST_EXPECTS( + invalidValidators[0] == lateValidator, "removed invalid validator"); + + BEAST_EXPECTS( + !slots.considered_validators_.contains(lateValidator), + "late validator was not removed"); + BEAST_EXPECTS( + slots.considered_validators_.contains(validator), + "timely validator was removed"); + } + +private: + /** A helper method to fill untrusted slots of a given Slots instance + * with random validator messages*/ + std::vector + fillUntrustedSlots( + reduce_relay::Slots& slots, + int64_t maxSlots = reduce_relay::MAX_UNTRUSTED_SLOTS) + { + std::vector keys; + for (int i = 0; i < maxSlots; ++i) + { + auto const validator = randomKeyPair(KeyType::ed25519).first; + keys.push_back(validator); + for (int j = 0; j < + env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS; + ++j) + // send enough messages so that a validator slot is selected + for (int k = 0; k < reduce_relay::MAX_MESSAGE_THRESHOLD; ++k) + slots.updateValidatorSlot( + sha512Half(validator) + static_cast(k), + validator, + j); + } + + return keys; + } + + std::unordered_map::PeerInfo> + getUntrustedSlotPeers( + PublicKey const& validator, + reduce_relay::Slots const& slots) + { + auto const& it = slots.untrusted_slots_.find(validator); + if (it == slots.untrusted_slots_.end()) + return {}; + + auto r = std:: + unordered_map::PeerInfo>(); + for (auto const& [id, info] : it->second.peers_) + r.emplace(std::make_pair(id, info)); + + return r; + } + + void + run() override + { + testConfig(); + testSquelchTracking(); + testUpdateValidatorSlot_newValidator(); + testUpdateValidatorSlot_slotsFull(); + testUpdateValidatorSlot_squelchedValidator(); + testDeleteIdlePeers_deleteIdleSlots(); + testDeleteIdlePeers_deleteIdleUntrustedPeer(); + testUpdateSlotAndSquelch_untrustedValidator(); + testUpdateConsideredValidator_newValidator(); + testUpdateConsideredValidator_idleValidator(); + testUpdateConsideredValidator_selectQualifyingValidator(); + testCleanConsideredValidators_deleteIdleValidator(); + } +}; + +BEAST_DEFINE_TESTSUITE(enhanced_squelch, overlay, ripple); + +} // namespace test + +} // namespace ripple \ No newline at end of file From a038b70bf462885f3f1ae88dc09eb05047ac958a Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Wed, 28 May 2025 16:35:41 +0200 Subject: [PATCH 09/27] removes member functions from Slots used only for testing, and moves them to tests --- ...e_relay_test.cpp => base_squelch_test.cpp} | 160 ++++++++--------- src/xrpld/overlay/Slot.h | 165 ++---------------- 2 files changed, 89 insertions(+), 236 deletions(-) rename src/test/overlay/{reduce_relay_test.cpp => base_squelch_test.cpp} (94%) diff --git a/src/test/overlay/reduce_relay_test.cpp b/src/test/overlay/base_squelch_test.cpp similarity index 94% rename from src/test/overlay/reduce_relay_test.cpp rename to src/test/overlay/base_squelch_test.cpp index a8aafcfa065..a13efb39d5e 100644 --- a/src/test/overlay/reduce_relay_test.cpp +++ b/src/test/overlay/base_squelch_test.cpp @@ -17,8 +17,8 @@ */ //============================================================================== -#include #include +#include #include #include @@ -33,8 +33,8 @@ #include -#include -#include +#include +#include #include #include @@ -191,52 +191,6 @@ class PeerPartial : public Peer } }; -/** Manually advanced clock. */ -class ManualClock -{ -public: - typedef uint64_t rep; - typedef std::milli period; - typedef std::chrono::duration duration; - typedef std::chrono::time_point time_point; - inline static bool const is_steady = false; - - static void - advance(duration d) noexcept - { - now_ += d; - } - - static void - randAdvance(milliseconds min, milliseconds max) - { - now_ += randDuration(min, max); - } - - static void - reset() noexcept - { - now_ = time_point(seconds(0)); - } - - static time_point - now() noexcept - { - return now_; - } - - static duration - randDuration(milliseconds min, milliseconds max) - { - return duration(milliseconds(rand_int(min.count(), max.count()))); - } - - explicit ManualClock() = default; - -private: - inline static time_point now_ = time_point(seconds(0)); -}; - /** Simulate server's OverlayImpl */ class Overlay { @@ -249,8 +203,7 @@ class Overlay uint256 const& key, PublicKey const& validator, Peer::id_t id, - SquelchCB f, - protocol::MessageType type = protocol::mtVALIDATION) = 0; + SquelchCB f) = 0; virtual void deleteIdlePeers(UnsquelchCB) = 0; @@ -538,8 +491,14 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler std::uint16_t inState(PublicKey const& validator, reduce_relay::PeerState state) { - auto res = slots_.inState(validator, state); - return res ? *res : 0; + auto const& it = slots_.slots_.find(validator); + if (it != slots_.slots_.end()) + return std::count_if( + it->second.peers_.begin(), + it->second.peers_.end(), + [&](auto const& it) { return (it.second.state == state); }); + + return 0; } void @@ -547,11 +506,10 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler uint256 const& key, PublicKey const& validator, Peer::id_t id, - SquelchCB f, - protocol::MessageType type = protocol::mtVALIDATION) override + SquelchCB f) override { squelch_ = f; - slots_.updateSlotAndSquelch(key, validator, id, type); + slots_.updateSlotAndSquelch(key, validator, id, true); } void @@ -632,40 +590,56 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler bool isCountingState(PublicKey const& validator) { - return slots_.inState(validator, reduce_relay::SlotState::Counting); + auto const& it = slots_.slots_.find(validator); + if (it != slots_.slots_.end()) + return it->second.state_ == reduce_relay::SlotState::Counting; + + return false; } std::set getSelected(PublicKey const& validator) { - return slots_.getSelected(validator); + auto const& it = slots_.slots_.find(validator); + if (it == slots_.slots_.end()) + return {}; + + std::set r; + for (auto const& [id, info] : it->second.peers_) + if (info.state == reduce_relay::PeerState::Selected) + r.insert(id); + + return r; } bool isSelected(PublicKey const& validator, Peer::id_t peer) { - auto selected = slots_.getSelected(validator); + auto selected = getSelected(validator); return selected.find(peer) != selected.end(); } id_t getSelectedPeer(PublicKey const& validator) { - auto selected = slots_.getSelected(validator); + auto selected = getSelected(validator); assert(selected.size()); return *selected.begin(); } - std::unordered_map< - id_t, - std::tuple< - reduce_relay::PeerState, - std::uint16_t, - std::uint32_t, - std::uint32_t>> + std::unordered_map::PeerInfo> getPeers(PublicKey const& validator) { - return slots_.getPeers(validator); + auto const& it = slots_.slots_.find(validator); + if (it == slots_.slots_.end()) + return {}; + + auto r = std:: + unordered_map::PeerInfo>(); + for (auto const& [id, info] : it->second.peers_) + r.emplace(std::make_pair(id, info)); + + return r; } std::uint16_t @@ -684,6 +658,13 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler if (auto it = peers_.find(id); it != peers_.end()) squelch_(validator, it->second, squelchDuration); } + + void + squelchAll(PublicKey const& validator, std::uint32_t duration) override + { + for (auto const& [id, peer] : peers_) + squelch_(validator, peer, duration); + } void unsquelch(PublicKey const& validator, Peer::id_t id) const override { @@ -874,8 +855,7 @@ class Network for (auto& [_, v] : peers) { (void)_; - if (std::get(v) == - reduce_relay::PeerState::Squelched) + if (v.state == reduce_relay::PeerState::Squelched) return false; } } @@ -887,7 +867,7 @@ class Network std::vector validators_; }; -class reduce_relay_test : public beast::unit_test::suite +class base_squelch_test : public beast::unit_test::suite { using Slot = reduce_relay::Slot; using id_t = Peer::id_t; @@ -901,8 +881,7 @@ class reduce_relay_test : public beast::unit_test::suite << "num peers " << (int)network_.overlay().getNumPeers() << std::endl; for (auto& [k, v] : peers) - std::cout << k << ":" << (int)std::get(v) - << " "; + std::cout << k << ":" << to_string(v.state) << " "; std::cout << std::endl; } @@ -1074,7 +1053,10 @@ class reduce_relay_test : public beast::unit_test::suite network_.overlay().isSelected(*event.key_, event.peer_); auto peers = network_.overlay().getPeers(*event.key_); auto d = reduce_relay::epoch(now).count() - - std::get<3>(peers[event.peer_]); + reduce_relay::epoch( + peers[event.peer_].lastMessage) + .count(); + mustHandle = event.isSelected_ && d > milliseconds(reduce_relay::IDLED).count() && network_.overlay().inState( @@ -1296,7 +1278,7 @@ class reduce_relay_test : public beast::unit_test::suite [&](PublicKey const& key, PeerWPtr const& peer) { unsquelched++; }); - auto peers = network_.overlay().getPeers(network_.validator(0)); + BEAST_EXPECT( unsquelched == MAX_PEERS - @@ -1317,8 +1299,7 @@ class reduce_relay_test : public beast::unit_test::suite BEAST_EXPECT(propagateAndSquelch(log, true, false)); auto peers = network_.overlay().getPeers(network_.validator(0)); auto it = std::find_if(peers.begin(), peers.end(), [&](auto it) { - return std::get(it.second) == - reduce_relay::PeerState::Squelched; + return it.second.state == reduce_relay::PeerState::Squelched; }); assert(it != peers.end()); std::uint16_t unsquelched = 0; @@ -1514,7 +1495,7 @@ vp_base_squelch_max_selected_peers=2 auto peers = network_.overlay().getPeers(network_.validator(0)); // first message changes Slot state to Counting and is not counted, // hence '-1'. - BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1)); + BEAST_EXPECT(peers[0].count == (nMessages - 1)); // add duplicate uint256 key(nMessages - 1); network_.overlay().updateSlotAndSquelch( @@ -1524,7 +1505,7 @@ vp_base_squelch_max_selected_peers=2 [&](PublicKey const&, PeerWPtr, std::uint32_t) {}); // confirm the same number of messages peers = network_.overlay().getPeers(network_.validator(0)); - BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1)); + BEAST_EXPECT(peers[0].count == (nMessages - 1)); // advance the clock ManualClock::advance(reduce_relay::IDLED + seconds(1)); network_.overlay().updateSlotAndSquelch( @@ -1534,7 +1515,7 @@ vp_base_squelch_max_selected_peers=2 [&](PublicKey const&, PeerWPtr, std::uint32_t) {}); peers = network_.overlay().getPeers(network_.validator(0)); // confirm message number increased - BEAST_EXPECT(std::get<1>(peers[0]) == nMessages); + BEAST_EXPECT(peers[0].count == nMessages); }); } @@ -1550,6 +1531,12 @@ vp_base_squelch_max_selected_peers=2 if (duration > maxDuration_) maxDuration_ = duration; } + + void + squelchAll(PublicKey const&, std::uint32_t) override + { + } + void unsquelch(PublicKey const&, Peer::id_t) const override { @@ -1582,10 +1569,7 @@ vp_base_squelch_max_selected_peers=2 std::uint64_t mid = m * 1000 + peer; uint256 const message{mid}; slots.updateSlotAndSquelch( - message, - validator, - peer, - protocol::MessageType::mtVALIDATION); + message, validator, peer, true); } } // make Slot's internal hash router expire all messages @@ -1703,7 +1687,7 @@ vp_base_squelch_max_selected_peers=2 Network network_; public: - reduce_relay_test() + base_squelch_test() : env_(*this, jtx::envconfig([](std::unique_ptr cfg) { cfg->VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = true; cfg->VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 6; @@ -1732,7 +1716,7 @@ vp_base_squelch_max_selected_peers=2 } }; -class reduce_relay_simulate_test : public reduce_relay_test +class base_squelch_simulate_test : public base_squelch_test { void testRandom(bool log) @@ -1748,8 +1732,8 @@ class reduce_relay_simulate_test : public reduce_relay_test } }; -BEAST_DEFINE_TESTSUITE(reduce_relay, ripple_data, ripple); -BEAST_DEFINE_TESTSUITE_MANUAL(reduce_relay_simulate, ripple_data, ripple); +BEAST_DEFINE_TESTSUITE(base_squelch, ripple_data, ripple); +BEAST_DEFINE_TESTSUITE_MANUAL(base_squelch_simulate, ripple_data, ripple); } // namespace test diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index 12c1375154d..3c2bc49e6a3 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -29,17 +29,23 @@ #include #include #include +#include #include #include #include +#include #include -#include -#include #include #include namespace ripple { +// used to make private members of Slots class accessible for testing +namespace test { +class enhanced_squelch_test; +class base_squelch_test; +class OverlaySim; +} // namespace test namespace reduce_relay { @@ -143,8 +149,10 @@ class SquelchHandler template class Slot final { -private: friend class Slots; + friend class test::enhanced_squelch_test; + friend class test::OverlaySim; + using id_t = Peer::id_t; using time_point = typename clock_type::time_point; @@ -215,32 +223,6 @@ class Slot final return lastSelected_; } - /** Return number of peers in state */ - std::uint16_t - inState(PeerState state) const; - - /** Return number of peers not in state */ - std::uint16_t - notInState(PeerState state) const; - - /** Return Slot's state */ - SlotState - getState() const - { - return state_; - } - - /** Return selected peers */ - std::set - getSelected() const; - - /** Get peers info. Return map of peer's state, count, squelch - * expiration milsec, and last message time milsec. - */ - std:: - unordered_map> - getPeers() const; - /** Check if peers stopped relaying messages. If a peer is * selected peer then call unsquelch handler for all * currently squelched peers and switch the slot to @@ -258,7 +240,6 @@ class Slot final std::chrono::seconds getSquelchDuration(std::size_t npeers); -private: /** Reset counts of peers in Selected or Counting state */ void resetCounts(); @@ -584,65 +565,18 @@ Slot::initCounting() resetCounts(); } -template -std::uint16_t -Slot::inState(PeerState state) const -{ - return std::count_if(peers_.begin(), peers_.end(), [&](auto const& it) { - return (it.second.state == state); - }); -} - -template -std::uint16_t -Slot::notInState(PeerState state) const -{ - return std::count_if(peers_.begin(), peers_.end(), [&](auto const& it) { - return (it.second.state != state); - }); -} - -template -std::set -Slot::getSelected() const -{ - std::set r; - for (auto const& [id, info] : peers_) - if (info.state == PeerState::Selected) - r.insert(id); - return r; -} - -template -std::unordered_map< - typename Peer::id_t, - std::tuple> -Slot::getPeers() const -{ - using namespace std::chrono; - auto r = std::unordered_map< - id_t, - std::tuple>(); - - for (auto const& [id, info] : peers_) - r.emplace(std::make_pair( - id, - std::move(std::make_tuple( - info.state, - info.count, - epoch(info.expire).count(), - epoch(info.lastMessage).count())))); - - return r; -} - /** Slots is a container for validator's Slot and handles Slot update * when a message is received from a validator. It also handles Slot aging - * and checks for peers which are disconnected or stopped relaying the messages. + * and checks for peers which are disconnected or stopped relaying the + * messages. */ template class Slots final { + friend class test::enhanced_squelch_test; + friend class test::base_squelch_test; + friend class test::OverlaySim; + using time_point = typename clock_type::time_point; using id_t = typename Peer::id_t; using messages = beast::aged_unordered_map< @@ -767,71 +701,6 @@ class Slots final */ void deleteIdlePeers(); - - /** Return number of peers in state */ - std::optional - inState(PublicKey const& validator, PeerState state) const - { - auto const& it = slots_.find(validator); - if (it != slots_.end()) - return it->second.inState(state); - return {}; - } - - /** Return number of peers not in state */ - std::optional - notInState(PublicKey const& validator, PeerState state) const - { - auto const& it = slots_.find(validator); - if (it != slots_.end()) - return it->second.notInState(state); - return {}; - } - - /** Return true if Slot is in state */ - bool - inState(PublicKey const& validator, SlotState state) const - { - auto const& it = slots_.find(validator); - if (it != slots_.end()) - return it->second.state_ == state; - return false; - } - - /** Get selected peers */ - std::set - getSelected(PublicKey const& validator) - { - auto const& it = slots_.find(validator); - if (it != slots_.end()) - return it->second.getSelected(); - return {}; - } - - /** Get peers info. Return map of peer's state, count, and squelch - * expiration milliseconds. - */ - std::unordered_map< - typename Peer::id_t, - std::tuple> - getPeers(PublicKey const& validator) - { - auto const& it = slots_.find(validator); - if (it != slots_.end()) - return it->second.getPeers(); - return {}; - } - - /** Get Slot's state */ - std::optional - getState(PublicKey const& validator) - { - auto const& it = slots_.find(validator); - if (it != slots_.end()) - return it->second.getState(); - return {}; - } - /** Called when a peer is deleted. If the peer was selected to be the * source of messages from the validator then squelched peers have to be * unsquelched. From 2a9c38693af4922851f77033a2b51c6d5fda3639 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Wed, 28 May 2025 17:08:30 +0200 Subject: [PATCH 10/27] fixes unittests for windows --- src/test/overlay/enhanced_squelch_test.cpp | 8 +++++--- src/xrpld/core/Config.h | 1 - 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/test/overlay/enhanced_squelch_test.cpp b/src/test/overlay/enhanced_squelch_test.cpp index 8993ae36b9f..477d22e0b50 100644 --- a/src/test/overlay/enhanced_squelch_test.cpp +++ b/src/test/overlay/enhanced_squelch_test.cpp @@ -715,7 +715,7 @@ vp_enhanced_squelch_enable=0 return keys; } - std::unordered_map::PeerInfo> + std::unordered_map::PeerInfo> getUntrustedSlotPeers( PublicKey const& validator, reduce_relay::Slots const& slots) @@ -724,8 +724,10 @@ vp_enhanced_squelch_enable=0 if (it == slots.untrusted_slots_.end()) return {}; - auto r = std:: - unordered_map::PeerInfo>(); + auto r = std::unordered_map< + Peer::id_t, + reduce_relay::Slot::PeerInfo>(); + for (auto const& [id, info] : it->second.peers_) r.emplace(std::make_pair(id, info)); diff --git a/src/xrpld/core/Config.h b/src/xrpld/core/Config.h index db41032d1d2..5e83583f834 100644 --- a/src/xrpld/core/Config.h +++ b/src/xrpld/core/Config.h @@ -253,7 +253,6 @@ class Config : public BasicConfig // made the default routing algorithm // std::size_t VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 5; ///////////////// END OF TEMPORARY CODE BLOCK ///////////////////// - // Reduce-relay - these parameters are experimental. // Enable enhanced squelching of unique untrusted validator messages bool VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE = false; From d0a67d71ab290eb238483c9b1e0a32cf77b724d2 Mon Sep 17 00:00:00 2001 From: Vito Tumas <5780819+Tapanito@users.noreply.github.com> Date: Mon, 23 Jun 2025 15:45:57 +0200 Subject: [PATCH 11/27] Update src/xrpld/overlay/detail/OverlayImpl.h Co-authored-by: Valentin Balaschenko <13349202+vlntb@users.noreply.github.com> --- src/xrpld/overlay/detail/OverlayImpl.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/xrpld/overlay/detail/OverlayImpl.h b/src/xrpld/overlay/detail/OverlayImpl.h index 0bd3fafcdbc..6135881cf57 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.h +++ b/src/xrpld/overlay/detail/OverlayImpl.h @@ -424,7 +424,7 @@ class OverlayImpl : public Overlay, public reduce_relay::SquelchHandler * validator. * @param key Unique message's key * @param validator Validator's public key - * @param peers Peers' id to update the slots for + * @param peer Peer's id to update the slot for */ void updateValidatorSlot( From 4748caab0b7575733ea27872c4e09dc447d3bced Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Mon, 23 Jun 2025 16:11:23 +0200 Subject: [PATCH 12/27] removes duplicate untrusted slot check --- src/xrpld/overlay/Slot.h | 42 +++++++++++++++++----------------------- 1 file changed, 18 insertions(+), 24 deletions(-) diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index 3c2bc49e6a3..001b79f1002 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -713,18 +713,19 @@ class Slots final /** Called to register that a given validator was squelched for a given * peer. It is expected that this method is called by SquelchHandler. * - * @param key Validator public key - * @param id peer ID + * @param validatorKey Validator public key + * @param peerID peer ID */ void - squelchValidator(PublicKey const& key, id_t id) + squelchValidator(PublicKey const& validatorKey, id_t peerID) { - auto it = peersWithValidators_.find(key); + auto it = peersWithValidators_.find(validatorKey); if (it == peersWithValidators_.end()) - peersWithValidators_.emplace(key, std::unordered_set{id}); + peersWithValidators_.emplace( + validatorKey, std::unordered_set{peerID}); - else if (it->second.find(id) == it->second.end()) - it->second.insert(id); + else if (it->second.find(peerID) == it->second.end()) + it->second.insert(peerID); } void @@ -754,38 +755,39 @@ class Slots final cleanConsideredValidators(); /** Checks whether a given validator is squelched. - * @param key Validator public key + * @param validatorKey Validator public key * @return true if a given validator was squelched */ bool - validatorSquelched(PublicKey const& key) const + validatorSquelched(PublicKey const& validatorKey) const { beast::expire( peersWithValidators_, reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT); - return peersWithValidators_.find(key) != peersWithValidators_.end(); + return peersWithValidators_.find(validatorKey) != + peersWithValidators_.end(); } /** Checks whether a given peer was recently sent a squelch message for * a given validator. - * @param key Validator public key - * @param id Peer id - * @return true if a given validator was squelched for a given peeru + * @param validatorKey Validator public key + * @param peerID Peer id + * @return true if a given validator was squelched for a given peer */ bool - peerSquelched(PublicKey const& key, id_t id) const + peerSquelched(PublicKey const& validatorKey, id_t peerID) const { beast::expire( peersWithValidators_, reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT); - auto const it = peersWithValidators_.find(key); + auto const it = peersWithValidators_.find(validatorKey); // if validator was not squelched, the peer was also not squelched if (it == peersWithValidators_.end()) return false; // if a peer is found the squelch for it has not expired - return it->second.find(id) != it->second.end(); + return it->second.find(peerID) != it->second.end(); } std::atomic_bool reduceRelayReady_{false}; @@ -997,14 +999,6 @@ Slots::updateValidatorSlot( return; } - // update a slot if the message is from a selected untrusted validator - if (auto const& it = untrusted_slots_.find(validator); - it != untrusted_slots_.end()) - { - it->second.update(validator, id, callback); - return; - } - // Do we have any available slots for additional untrusted validators? // This could happen in few cases: // 1. We received a message from a new untrusted validator, but we From 8d567a73f61d07b68608e51ac22e6346c191a0c2 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Tue, 24 Jun 2025 14:33:02 +0200 Subject: [PATCH 13/27] refactors squelching to use instanced clock instead of a static clock --- src/test/overlay/base_squelch_test.cpp | 108 +++- src/test/overlay/enhanced_squelch_test.cpp | 94 +-- src/xrpld/overlay/Slot.h | 702 ++------------------- src/xrpld/overlay/Squelch.h | 55 +- src/xrpld/overlay/detail/OverlayImpl.cpp | 4 +- src/xrpld/overlay/detail/OverlayImpl.h | 2 +- src/xrpld/overlay/detail/PeerImp.cpp | 3 +- src/xrpld/overlay/detail/PeerImp.h | 4 +- src/xrpld/overlay/detail/Slot.cpp | 652 +++++++++++++++++++ src/xrpld/overlay/detail/Squelch.cpp | 79 +++ 10 files changed, 919 insertions(+), 784 deletions(-) create mode 100644 src/xrpld/overlay/detail/Slot.cpp create mode 100644 src/xrpld/overlay/detail/Squelch.cpp diff --git a/src/test/overlay/base_squelch_test.cpp b/src/test/overlay/base_squelch_test.cpp index a13efb39d5e..a35941840e6 100644 --- a/src/test/overlay/base_squelch_test.cpp +++ b/src/test/overlay/base_squelch_test.cpp @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -44,6 +45,27 @@ namespace test { using namespace std::chrono; +template +class extended_manual_clock : public beast::manual_clock +{ +public: + using typename beast::manual_clock::duration; + using typename beast::manual_clock::time_point; + + void + reset() + { + this->set(time_point(duration(0))); + } + + void + randAdvance(std::chrono::milliseconds min, std::chrono::milliseconds max) + { + auto ms = ripple::rand_int(min.count(), max.count()); + this->advance(std::chrono::milliseconds(ms)); + } +}; + class Link; using MessageSPtr = std::shared_ptr; @@ -54,6 +76,7 @@ using SquelchCB = std::function; using UnsquelchCB = std::function; using LinkIterCB = std::function; +using TestStopwatch = extended_manual_clock; static constexpr std::uint32_t MAX_PEERS = 10; static constexpr std::uint32_t MAX_VALIDATORS = 10; @@ -208,6 +231,21 @@ class Overlay virtual void deleteIdlePeers(UnsquelchCB) = 0; virtual void deletePeer(Peer::id_t, UnsquelchCB) = 0; + + TestStopwatch& + clock() + { + return clock_; + } + + void + resetClock() + { + clock_ = TestStopwatch{}; + } + +protected: + TestStopwatch clock_; }; class Validator; @@ -415,7 +453,7 @@ class PeerSim : public PeerPartial, public std::enable_shared_from_this public: using id_t = Peer::id_t; PeerSim(Overlay& overlay, beast::Journal journal) - : overlay_(overlay), squelch_(journal) + : overlay_(overlay), squelch_(journal, overlay_.clock()) { id_ = sid_++; } @@ -463,7 +501,7 @@ class PeerSim : public PeerPartial, public std::enable_shared_from_this inline static id_t sid_ = 0; id_t id_; Overlay& overlay_; - reduce_relay::Squelch squelch_; + reduce_relay::Squelch squelch_; }; class OverlaySim : public Overlay, public reduce_relay::SquelchHandler @@ -472,9 +510,9 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler public: using id_t = Peer::id_t; - using clock_type = ManualClock; + using clock_type = TestStopwatch; OverlaySim(Application& app) - : slots_(app.logs(), *this, app.config()), logs_(app.logs()) + : slots_(app.logs(), *this, app.config(), clock_), logs_(app.logs()) { } @@ -484,7 +522,7 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler clear() { peers_.clear(); - ManualClock::advance(hours(1)); + clock_.advance(hours(1)); slots_.deleteIdlePeers(); } @@ -627,15 +665,14 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler return *selected.begin(); } - std::unordered_map::PeerInfo> + std::unordered_map getPeers(PublicKey const& validator) { auto const& it = slots_.slots_.find(validator); if (it == slots_.slots_.end()) return {}; - auto r = std:: - unordered_map::PeerInfo>(); + auto r = std::unordered_map(); for (auto const& [id, info] : it->second.peers_) r.emplace(std::make_pair(id, info)); @@ -671,11 +708,12 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler if (auto it = peers_.find(id); it != peers_.end()) unsquelch_(validator, it->second); } + SquelchCB squelch_; UnsquelchCB unsquelch_; Peers peers_; Peers peersCache_; - reduce_relay::Slots slots_; + reduce_relay::Slots slots_; Logs& logs_; }; @@ -811,7 +849,7 @@ class Network bool resetClock = true) { if (resetClock) - ManualClock::reset(); + overlay_.resetClock(); if (purge) { @@ -821,7 +859,8 @@ class Network for (int m = 0; m < nMessages; ++m) { - ManualClock::randAdvance(milliseconds(1800), milliseconds(2200)); + overlay_.clock().randAdvance( + milliseconds(1800), milliseconds(2200)); for_rand(0, nValidators, [&](std::uint32_t v) { validators_[v].for_links(link); }); @@ -869,7 +908,7 @@ class Network class base_squelch_test : public beast::unit_test::suite { - using Slot = reduce_relay::Slot; + using Slot = reduce_relay::Slot; using id_t = Peer::id_t; protected: @@ -919,7 +958,7 @@ class base_squelch_test : public beast::unit_test::suite Peer::id_t peer_; std::uint16_t validator_; std::optional key_; - time_point time_; + TestStopwatch::time_point time_; bool handled_ = false; }; @@ -931,12 +970,12 @@ class base_squelch_test : public beast::unit_test::suite { std::unordered_map events{ {LinkDown, {}}, {PeerDisconnected, {}}}; - time_point lastCheck = ManualClock::now(); + auto lastCheck = network_.overlay().clock().now(); network_.reset(); network_.propagate([&](Link& link, MessageSPtr m) { auto& validator = link.validator(); - auto now = ManualClock::now(); + auto const now = network_.overlay().clock().now(); bool squelched = false; std::stringstream str; @@ -1151,7 +1190,7 @@ class base_squelch_test : public beast::unit_test::suite void testPeerUnsquelched(bool log) { - ManualClock::advance(seconds(601)); + network_.overlay().clock().advance(seconds(601)); doTest("Peer Unsquelched", log, [this](bool log) { BEAST_EXPECT(propagateNoSquelch(log, 2, true, true, false)); }); @@ -1246,7 +1285,7 @@ class base_squelch_test : public beast::unit_test::suite testSelectedPeerDisconnects(bool log) { doTest("Selected Peer Disconnects", log, [this](bool log) { - ManualClock::advance(seconds(601)); + network_.overlay().clock().advance(seconds(601)); BEAST_EXPECT(propagateAndSquelch(log, true, false)); auto id = network_.overlay().getSelectedPeer(network_.validator(0)); std::uint16_t unsquelched = 0; @@ -1270,9 +1309,10 @@ class base_squelch_test : public beast::unit_test::suite testSelectedPeerStopsRelaying(bool log) { doTest("Selected Peer Stops Relaying", log, [this](bool log) { - ManualClock::advance(seconds(601)); + network_.overlay().clock().advance(seconds(601)); BEAST_EXPECT(propagateAndSquelch(log, true, false)); - ManualClock::advance(reduce_relay::IDLED + seconds(1)); + network_.overlay().clock().advance( + reduce_relay::IDLED + seconds(1)); std::uint16_t unsquelched = 0; network_.overlay().deleteIdlePeers( [&](PublicKey const& key, PeerWPtr const& peer) { @@ -1295,7 +1335,7 @@ class base_squelch_test : public beast::unit_test::suite testSquelchedPeerDisconnects(bool log) { doTest("Squelched Peer Disconnects", log, [this](bool log) { - ManualClock::advance(seconds(601)); + network_.overlay().clock().advance(seconds(601)); BEAST_EXPECT(propagateAndSquelch(log, true, false)); auto peers = network_.overlay().getPeers(network_.validator(0)); auto it = std::find_if(peers.begin(), peers.end(), [&](auto it) { @@ -1450,13 +1490,16 @@ vp_base_squelch_max_selected_peers=2 testBaseSquelchReady(bool log) { doTest("BaseSquelchReady", log, [&](bool log) { - ManualClock::reset(); - auto createSlots = [&](bool baseSquelchEnabled) - -> reduce_relay::Slots { + network_.overlay().resetClock(); + auto createSlots = + [&](bool baseSquelchEnabled) -> reduce_relay::Slots { env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = baseSquelchEnabled; - return reduce_relay::Slots( - env_.app().logs(), network_.overlay(), env_.app().config()); + return reduce_relay::Slots( + env_.app().logs(), + network_.overlay(), + env_.app().config(), + network_.overlay().clock()); }; // base squelching must not be ready if squelching is disabled BEAST_EXPECT(!createSlots(false).baseSquelchReady()); @@ -1465,7 +1508,8 @@ vp_base_squelch_max_selected_peers=2 // bootup BEAST_EXPECT(!createSlots(true).baseSquelchReady()); - ManualClock::advance(reduce_relay::WAIT_ON_BOOTUP + minutes{1}); + network_.overlay().clock().advance( + reduce_relay::WAIT_ON_BOOTUP + minutes{1}); // base squelch enabled and bootup time passed BEAST_EXPECT(createSlots(true).baseSquelchReady()); @@ -1507,7 +1551,8 @@ vp_base_squelch_max_selected_peers=2 peers = network_.overlay().getPeers(network_.validator(0)); BEAST_EXPECT(peers[0].count == (nMessages - 1)); // advance the clock - ManualClock::advance(reduce_relay::IDLED + seconds(1)); + network_.overlay().clock().advance( + reduce_relay::IDLED + seconds(1)); network_.overlay().updateSlotAndSquelch( key, network_.validator(0), @@ -1553,8 +1598,11 @@ vp_base_squelch_max_selected_peers=2 auto run = [&](int npeers) { handler.maxDuration_ = 0; - reduce_relay::Slots slots( - env_.app().logs(), handler, env_.app().config()); + reduce_relay::Slots slots( + env_.app().logs(), + handler, + env_.app().config(), + network_.overlay().clock()); // 1st message from a new peer switches the slot // to counting state and resets the counts of all peers + // MAX_MESSAGE_THRESHOLD + 1 messages to reach the threshold @@ -1573,7 +1621,7 @@ vp_base_squelch_max_selected_peers=2 } } // make Slot's internal hash router expire all messages - ManualClock::advance(hours(1)); + network_.overlay().clock().advance(hours(1)); }; using namespace reduce_relay; diff --git a/src/test/overlay/enhanced_squelch_test.cpp b/src/test/overlay/enhanced_squelch_test.cpp index 477d22e0b50..06eba92712d 100644 --- a/src/test/overlay/enhanced_squelch_test.cpp +++ b/src/test/overlay/enhanced_squelch_test.cpp @@ -19,14 +19,13 @@ #include +#include #include #include #include -#include "test/overlay/clock.h" #include "xrpld/overlay/Peer.h" -#include "xrpld/overlay/ReduceRelayCommon.h" #include #include @@ -156,8 +155,9 @@ vp_enhanced_squelch_enable=0 testcase("squelchTracking"); Peer::id_t squelchedPeerID = 0; Peer::id_t newPeerID = 1; - reduce_relay::Slots slots( - env_.app().logs(), noop_handler, env_.app().config()); + TestStopwatch stopwatch; + reduce_relay::Slots slots( + env_.app().logs(), noop_handler, env_.app().config(), stopwatch); auto const publicKey = randomKeyPair(KeyType::ed25519).first; // a new key should not be squelched @@ -180,7 +180,7 @@ vp_enhanced_squelch_enable=0 !slots.peerSquelched(publicKey, newPeerID), "new peer squelched"); // advance the manual clock to after expiration - ManualClock::advance( + stopwatch.advance( reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT + std::chrono::seconds{11}); @@ -199,9 +199,9 @@ vp_enhanced_squelch_enable=0 testUpdateValidatorSlot_newValidator() { testcase("updateValidatorSlot_newValidator"); - - reduce_relay::Slots slots( - env_.app().logs(), noop_handler, env_.app().config()); + TestStopwatch stopwatch; + reduce_relay::Slots slots( + env_.app().logs(), noop_handler, env_.app().config(), stopwatch); Peer::id_t const peerID = 1; auto const validator = randomKeyPair(KeyType::ed25519).first; @@ -243,8 +243,9 @@ vp_enhanced_squelch_enable=0 TestHandler handler{squelch_f, noop_squelchAll, noop_unsquelch}; - reduce_relay::Slots slots( - env_.app().logs(), handler, env_.app().config()); + TestStopwatch stopwatch; + reduce_relay::Slots slots( + env_.app().logs(), handler, env_.app().config(), stopwatch); slots.squelchValidator(validator, squelchedPeerID); @@ -279,8 +280,10 @@ vp_enhanced_squelch_enable=0 // while there are open untrusted slots, no calls should be made to // squelch any validators TestHandler handler{noop_handler}; - reduce_relay::Slots slots( - env_.app().logs(), handler, env_.app().config()); + + TestStopwatch stopwatch; + reduce_relay::Slots slots( + env_.app().logs(), handler, env_.app().config(), stopwatch); // saturate validator slots auto const validators = fillUntrustedSlots(slots); @@ -333,9 +336,10 @@ vp_enhanced_squelch_enable=0 { testcase("deleteIdlePeers"); TestHandler handler{noop_handler}; + TestStopwatch stopwatch; - reduce_relay::Slots slots( - env_.app().logs(), handler, env_.app().config()); + reduce_relay::Slots slots( + env_.app().logs(), handler, env_.app().config(), stopwatch); auto keys = fillUntrustedSlots(slots); // verify that squelchAll is called for each idled slot validator @@ -358,7 +362,7 @@ vp_enhanced_squelch_enable=0 "unexpected number of untrusted slots"); // advance the manual clock to after slot expiration - ManualClock::advance( + stopwatch.advance( reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT + std::chrono::seconds{1}); @@ -377,9 +381,10 @@ vp_enhanced_squelch_enable=0 testcase("deleteIdleUntrustedPeer"); Peer::id_t const peerID = 1; Peer::id_t const peerID2 = 2; + TestStopwatch stopwatch; - reduce_relay::Slots slots( - env_.app().logs(), noop_handler, env_.app().config()); + reduce_relay::Slots slots( + env_.app().logs(), noop_handler, env_.app().config(), stopwatch); // fill one untrustd validator slot auto const validator = fillUntrustedSlots(slots, 1)[0]; @@ -425,8 +430,10 @@ vp_enhanced_squelch_enable=0 TestHandler handler{noop_handler}; handler.squelch_f_ = [](PublicKey const&, Peer::id_t, std::uint32_t) {}; - reduce_relay::Slots slots( - env_.app().logs(), handler, env_.app().config()); + + TestStopwatch stopwatch; + reduce_relay::Slots slots( + env_.app().logs(), handler, env_.app().config(), stopwatch); // peers that will be source of validator messages std::vector peers = {}; @@ -444,7 +451,7 @@ vp_enhanced_squelch_enable=0 // new messages for (auto const& peer : peers) { - auto const now = ManualClock::now(); + auto const now = stopwatch.now(); slots.updateSlotAndSquelch( sha512Half(validator) + static_cast(now.time_since_epoch().count()), @@ -452,14 +459,14 @@ vp_enhanced_squelch_enable=0 peer, false); - ManualClock::advance(std::chrono::milliseconds{10}); + stopwatch.advance(std::chrono::milliseconds{10}); } // simulate new, unique validator messages sent by peers for (auto const& peer : peers) for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD + 1; ++i) { - auto const now = ManualClock::now(); + auto const now = stopwatch.now(); slots.updateSlotAndSquelch( sha512Half(validator) + static_cast(now.time_since_epoch().count()), @@ -467,7 +474,7 @@ vp_enhanced_squelch_enable=0 peer, false); - ManualClock::advance(std::chrono::milliseconds{10}); + stopwatch.advance(std::chrono::milliseconds{10}); } auto const slotPeers = getUntrustedSlotPeers(validator, slots); @@ -506,8 +513,9 @@ vp_enhanced_squelch_enable=0 testUpdateConsideredValidator_newValidator() { testcase("testUpdateConsideredValidator_newValidator"); - reduce_relay::Slots slots( - env_.app().logs(), noop_handler, env_.app().config()); + TestStopwatch stopwatch; + reduce_relay::Slots slots( + env_.app().logs(), noop_handler, env_.app().config(), stopwatch); // insert some random validator key auto const validator = randomKeyPair(KeyType::ed25519).first; @@ -545,8 +553,9 @@ vp_enhanced_squelch_enable=0 testUpdateConsideredValidator_idleValidator() { testcase("testUpdateConsideredValidator_idleValidator"); - reduce_relay::Slots slots( - env_.app().logs(), noop_handler, env_.app().config()); + TestStopwatch stopwatch; + reduce_relay::Slots slots( + env_.app().logs(), noop_handler, env_.app().config(), stopwatch); // insert some random validator key auto const validator = randomKeyPair(KeyType::ed25519).first; @@ -563,7 +572,7 @@ vp_enhanced_squelch_enable=0 auto const state = slots.considered_validators_.at(validator); // simulate a validator sending a new message before the idle timer - ManualClock::advance(reduce_relay::IDLED - std::chrono::seconds(1)); + stopwatch.advance(reduce_relay::IDLED - std::chrono::seconds(1)); BEAST_EXPECTS( !slots.updateConsideredValidator(validator, peerID), @@ -575,7 +584,7 @@ vp_enhanced_squelch_enable=0 "non-idling validator was updated"); // simulate a validator idling - ManualClock::advance(reduce_relay::IDLED + std::chrono::seconds(1)); + stopwatch.advance(reduce_relay::IDLED + std::chrono::seconds(1)); BEAST_EXPECTS( !slots.updateConsideredValidator(validator, peerID), @@ -591,8 +600,10 @@ vp_enhanced_squelch_enable=0 testUpdateConsideredValidator_selectQualifyingValidator() { testcase("testUpdateConsideredValidator_selectQualifyingValidator"); - reduce_relay::Slots slots( - env_.app().logs(), noop_handler, env_.app().config()); + + TestStopwatch stopwatch; + reduce_relay::Slots slots( + env_.app().logs(), noop_handler, env_.app().config(), stopwatch); // insert some random validator key auto const validator = randomKeyPair(KeyType::ed25519).first; @@ -612,7 +623,7 @@ vp_enhanced_squelch_enable=0 !slots.updateConsideredValidator(validator2, peerID), "validator was selected before reaching message threshold"); - ManualClock::advance(reduce_relay::IDLED - std::chrono::seconds(1)); + stopwatch.advance(reduce_relay::IDLED - std::chrono::seconds(1)); } // as long as the peer criteria is not met, the validator most not be // selected @@ -627,7 +638,7 @@ vp_enhanced_squelch_enable=0 !slots.updateConsideredValidator(validator2, i), "validator was selected before reaching enough peers"); - ManualClock::advance(reduce_relay::IDLED - std::chrono::seconds(1)); + stopwatch.advance(reduce_relay::IDLED - std::chrono::seconds(1)); } auto const consideredValidator = @@ -651,8 +662,9 @@ vp_enhanced_squelch_enable=0 { testcase("cleanConsideredValidators_deleteIdleValidator"); - reduce_relay::Slots slots( - env_.app().logs(), noop_handler, env_.app().config()); + TestStopwatch stopwatch; + reduce_relay::Slots slots( + env_.app().logs(), noop_handler, env_.app().config(), stopwatch); // insert some random validator key auto const lateValidator = randomKeyPair(KeyType::ed25519).first; @@ -668,7 +680,7 @@ vp_enhanced_squelch_enable=0 "new validator was not added for consideration"); // simulate a validator idling - ManualClock::advance(reduce_relay::IDLED + std::chrono::seconds(1)); + stopwatch.advance(reduce_relay::IDLED + std::chrono::seconds(1)); BEAST_EXPECTS( !slots.updateConsideredValidator(validator, peerID), "validator was selected with insufficient number of peers"); @@ -693,7 +705,7 @@ vp_enhanced_squelch_enable=0 * with random validator messages*/ std::vector fillUntrustedSlots( - reduce_relay::Slots& slots, + reduce_relay::Slots& slots, int64_t maxSlots = reduce_relay::MAX_UNTRUSTED_SLOTS) { std::vector keys; @@ -715,18 +727,16 @@ vp_enhanced_squelch_enable=0 return keys; } - std::unordered_map::PeerInfo> + std::unordered_map getUntrustedSlotPeers( PublicKey const& validator, - reduce_relay::Slots const& slots) + reduce_relay::Slots const& slots) { auto const& it = slots.untrusted_slots_.find(validator); if (it == slots.untrusted_slots_.end()) return {}; - auto r = std::unordered_map< - Peer::id_t, - reduce_relay::Slot::PeerInfo>(); + auto r = std::unordered_map(); for (auto const& [id, info] : it->second.peers_) r.emplace(std::make_pair(id, info)); diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index 001b79f1002..f827dac6f10 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -22,22 +22,16 @@ #include #include -#include #include +#include #include -#include #include #include #include #include -#include -#include -#include #include -#include -#include namespace ripple { // used to make private members of Slots class accessible for testing @@ -49,7 +43,9 @@ class OverlaySim; namespace reduce_relay { -template +using clock_type = beast::abstract_clock; +using time_point = clock_type::time_point; + class Slots; /** Peer's State */ @@ -146,16 +142,12 @@ class SquelchHandler * unsquelched, disconnected peer, or idling peer may transition Slot to * Counting state. */ -template class Slot final { - friend class Slots; + friend class Slots; friend class test::enhanced_squelch_test; friend class test::OverlaySim; - using id_t = Peer::id_t; - using time_point = typename clock_type::time_point; - // a callback to report ignored squelches using ignored_squelch_callback = std::function; @@ -169,14 +161,16 @@ class Slot final SquelchHandler const& handler, beast::Journal journal, uint16_t maxSelectedPeers, - bool isTrusted) + bool isTrusted, + clock_type& clock) : reachedThreshold_(0) - , lastSelected_(clock_type::now()) + , lastSelected_(clock.now()) , state_(SlotState::Counting) , handler_(handler) , journal_(journal) , maxSelectedPeers_(maxSelectedPeers) , isTrusted_(isTrusted) + , clock_(clock) { } @@ -200,7 +194,7 @@ class Slot final void update( PublicKey const& validator, - id_t id, + Peer::id_t id, ignored_squelch_callback callback); /** Handle peer deletion when a peer disconnects. @@ -214,7 +208,7 @@ class Slot final * disconnects */ void - deletePeer(PublicKey const& validator, id_t id, bool erase); + deletePeer(PublicKey const& validator, Peer::id_t id, bool erase); /** Get the time of the last peer selection round */ time_point const& @@ -261,17 +255,17 @@ class Slot final std::size_t timesSelected; // number of times the peer was selected }; - std::unordered_map peers_; // peer's data + std::unordered_map peers_; // peer's data // pool of peers considered as the source of messages // from validator - peers that reached MIN_MESSAGE_THRESHOLD - std::unordered_set considered_; + std::unordered_set considered_; // number of peers that reached MAX_MESSAGE_THRESHOLD std::uint16_t reachedThreshold_; // last time peers were selected, used to age the slot - typename clock_type::time_point lastSelected_; + time_point lastSelected_; SlotState state_; // slot's state SquelchHandler const& handler_; // squelch/unsquelch handler @@ -283,313 +277,32 @@ class Slot final // indicate if the slot is for a trusted validator bool const isTrusted_; -}; - -template -void -Slot::deleteIdlePeer(PublicKey const& validator) -{ - using namespace std::chrono; - auto now = clock_type::now(); - for (auto it = peers_.begin(); it != peers_.end();) - { - auto& peer = it->second; - auto id = it->first; - ++it; - if (now - peer.lastMessage > IDLED) - { - JLOG(journal_.trace()) - << "deleteIdlePeer: " << Slice(validator) << " " << id - << " idled " - << duration_cast(now - peer.lastMessage).count() - << " selected " << (peer.state == PeerState::Selected); - deletePeer(validator, id, false); - } - } -} - -template -void -Slot::update( - PublicKey const& validator, - id_t id, - ignored_squelch_callback callback) -{ - using namespace std::chrono; - auto now = clock_type::now(); - auto it = peers_.find(id); - // First message from this peer - if (it == peers_.end()) - { - JLOG(journal_.trace()) - << "update: adding peer " << Slice(validator) << " " << id; - peers_.emplace(std::make_pair( - id, - PeerInfo{ - .state = PeerState::Counting, - .count = 0, - .expire = now, - .lastMessage = now, - .timesSelected = 0})); - initCounting(); - return; - } - // Message from a peer with expired squelch - if (it->second.state == PeerState::Squelched && now > it->second.expire) - { - JLOG(journal_.trace()) - << "update: squelch expired " << Slice(validator) << " " << id; - it->second.state = PeerState::Counting; - it->second.lastMessage = now; - initCounting(); - return; - } - - auto& peer = it->second; - - JLOG(journal_.trace()) - << "update: existing peer " << Slice(validator) << " " << id - << " slot state " << static_cast(state_) << " peer state " - << static_cast(peer.state) << " count " << peer.count << " last " - << duration_cast(now - peer.lastMessage).count() - << " pool " << considered_.size() << " threshold " << reachedThreshold_; - - peer.lastMessage = now; - - // report if we received a message from a squelched peer - if (peer.state == PeerState::Squelched) - callback(); - - if (state_ != SlotState::Counting || peer.state == PeerState::Squelched) - return; - - if (++peer.count > MIN_MESSAGE_THRESHOLD) - considered_.insert(id); - if (peer.count == (MAX_MESSAGE_THRESHOLD + 1)) - ++reachedThreshold_; - - if (now - lastSelected_ > 2 * MAX_UNSQUELCH_EXPIRE_DEFAULT) - { - JLOG(journal_.trace()) - << "update: resetting due to inactivity " << Slice(validator) << " " - << id << " " << duration_cast(now - lastSelected_).count(); - initCounting(); - return; - } - - if (reachedThreshold_ == maxSelectedPeers_) - { - // Randomly select maxSelectedPeers_ peers from considered. - // Exclude peers that have been idling > IDLED - - // it's possible that deleteIdlePeer() has not been called yet. - // If number of remaining peers != maxSelectedPeers_ - // then reset the Counting state and let deleteIdlePeer() handle - // idled peers. - std::unordered_set selected; - auto const consideredPoolSize = considered_.size(); - while (selected.size() != maxSelectedPeers_ && considered_.size() != 0) - { - auto i = - considered_.size() == 1 ? 0 : rand_int(considered_.size() - 1); - auto it = std::next(considered_.begin(), i); - auto id = *it; - considered_.erase(it); - auto const& itpeers = peers_.find(id); - if (itpeers == peers_.end()) - { - JLOG(journal_.error()) << "update: peer not found " - << Slice(validator) << " " << id; - continue; - } - if (now - itpeers->second.lastMessage < IDLED) - selected.insert(id); - } - - if (selected.size() != maxSelectedPeers_) - { - JLOG(journal_.trace()) - << "update: selection failed " << Slice(validator) << " " << id; - initCounting(); - return; - } - - lastSelected_ = now; - - auto s = selected.begin(); - JLOG(journal_.trace()) - << "update: " << Slice(validator) << " " << id << " pool size " - << consideredPoolSize << " selected " << *s << " " - << *std::next(s, 1) << " " << *std::next(s, 2); - - XRPL_ASSERT( - peers_.size() >= maxSelectedPeers_, - "ripple::reduce_relay::Slot::update : minimum peers"); - - // squelch peers which are not selected and - // not already squelched - std::stringstream str; - for (auto& [k, v] : peers_) - { - v.count = 0; - - if (selected.find(k) != selected.end()) - { - v.state = PeerState::Selected; - ++v.timesSelected; - } - - else if (v.state != PeerState::Squelched) - { - if (journal_.trace()) - str << k << " "; - v.state = PeerState::Squelched; - std::chrono::seconds duration = - getSquelchDuration(peers_.size() - maxSelectedPeers_); - v.expire = now + duration; - handler_.squelch(validator, k, duration.count()); - } - } - JLOG(journal_.trace()) << "update: squelching " << Slice(validator) - << " " << id << " " << str.str(); - considered_.clear(); - reachedThreshold_ = 0; - state_ = SlotState::Selected; - } -} - -template -std::chrono::seconds -Slot::getSquelchDuration(std::size_t npeers) -{ - using namespace std::chrono; - auto m = std::max( - MAX_UNSQUELCH_EXPIRE_DEFAULT, seconds{SQUELCH_PER_PEER * npeers}); - if (m > MAX_UNSQUELCH_EXPIRE_PEERS) - { - m = MAX_UNSQUELCH_EXPIRE_PEERS; - JLOG(journal_.warn()) - << "getSquelchDuration: unexpected squelch duration " << npeers; - } - return seconds{ripple::rand_int(MIN_UNSQUELCH_EXPIRE / 1s, m / 1s)}; -} - -template -void -Slot::deletePeer(PublicKey const& validator, id_t id, bool erase) -{ - auto it = peers_.find(id); - if (it != peers_.end()) - { - JLOG(journal_.trace()) - << "deletePeer: " << Slice(validator) << " " << id << " selected " - << (it->second.state == PeerState::Selected) << " considered " - << (considered_.find(id) != considered_.end()) << " erase " - << erase; - auto now = clock_type::now(); - if (it->second.state == PeerState::Selected) - { - for (auto& [k, v] : peers_) - { - if (v.state == PeerState::Squelched) - handler_.unsquelch(validator, k); - v.state = PeerState::Counting; - v.count = 0; - v.expire = now; - } - - considered_.clear(); - reachedThreshold_ = 0; - state_ = SlotState::Counting; - } - else if (considered_.find(id) != considered_.end()) - { - if (it->second.count > MAX_MESSAGE_THRESHOLD) - --reachedThreshold_; - considered_.erase(id); - } - - it->second.lastMessage = now; - it->second.count = 0; - - if (erase) - peers_.erase(it); - } -} - -template -void -Slot::onWrite(beast::PropertyStream::Map& stream) const -{ - auto const now = clock_type::now(); - stream["state"] = to_string(state_); - stream["reachedThreshold"] = reachedThreshold_; - stream["considered"] = considered_.size(); - stream["lastSelected"] = - duration_cast(now - lastSelected_).count(); - stream["isTrusted"] = isTrusted_; - - beast::PropertyStream::Set peers("peers", stream); - - for (auto const& [id, info] : peers_) - { - beast::PropertyStream::Map item(peers); - item["id"] = id; - item["count"] = info.count; - item["expire"] = - duration_cast(info.expire - now).count(); - item["lastMessage"] = - duration_cast(now - info.lastMessage).count(); - item["timesSelected"] = info.timesSelected; - item["state"] = to_string(info.state); - } -} -template -void -Slot::resetCounts() -{ - for (auto& [_, peer] : peers_) - { - (void)_; - peer.count = 0; - } -} - -template -void -Slot::initCounting() -{ - state_ = SlotState::Counting; - considered_.clear(); - reachedThreshold_ = 0; - resetCounts(); -} + clock_type& clock_; +}; /** Slots is a container for validator's Slot and handles Slot update * when a message is received from a validator. It also handles Slot aging * and checks for peers which are disconnected or stopped relaying the * messages. */ -template class Slots final { - friend class test::enhanced_squelch_test; + friend test::enhanced_squelch_test; friend class test::base_squelch_test; friend class test::OverlaySim; - using time_point = typename clock_type::time_point; - using id_t = typename Peer::id_t; using messages = beast::aged_unordered_map< uint256, std::unordered_set, - clock_type, + clock_type::clock_type, hardened_hash>; using validators = beast::aged_unordered_map< PublicKey, std::unordered_set, - clock_type, + clock_type::clock_type, hardened_hash>; - using slots_map = hash_map>; + using slots_map = hash_map; public: /** @@ -597,7 +310,11 @@ class Slots final * @param handler Squelch/unsquelch implementation * @param config reference to the global config */ - Slots(Logs& logs, SquelchHandler& handler, Config const& config) + Slots( + Logs& logs, + SquelchHandler& handler, + Config const& config, + clock_type& clock) : handler_(handler) , logs_(logs) , journal_(logs.journal("Slots")) @@ -605,6 +322,9 @@ class Slots final , maxSelectedPeers_(config.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS) , enhancedSquelchEnabled_( config.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE) + , clock_(clock) + , peersWithMessage_(clock) + , peersWithValidators_(clock) { } @@ -626,15 +346,7 @@ class Slots final /** Check if reduce_relay::WAIT_ON_BOOTUP time passed since startup */ bool - reduceRelayReady() - { - if (!reduceRelayReady_) - reduceRelayReady_ = - reduce_relay::epoch(clock_type::now()) > - reduce_relay::WAIT_ON_BOOTUP; - - return reduceRelayReady_; - } + reduceRelayReady(); /** Updates untrusted validator slot. Do not call for trusted * validators. The caller must ensure passed messages are unique. @@ -643,7 +355,10 @@ class Slots final * @param id The ID of the peer that sent the message */ void - updateValidatorSlot(uint256 const& key, PublicKey const& validator, id_t id) + updateValidatorSlot( + uint256 const& key, + PublicKey const& validator, + Peer::id_t id) { updateValidatorSlot(key, validator, id, []() {}); } @@ -659,8 +374,8 @@ class Slots final updateValidatorSlot( uint256 const& key, PublicKey const& validator, - id_t id, - typename Slot::ignored_squelch_callback callback); + Peer::id_t id, + typename Slot::ignored_squelch_callback callback); /** Calls Slot::update of Slot associated with the validator, with a * noop callback. @@ -674,7 +389,7 @@ class Slots final updateSlotAndSquelch( uint256 const& key, PublicKey const& validator, - id_t id, + Peer::id_t id, bool isTrusted) { updateSlotAndSquelch(key, validator, id, []() {}, isTrusted); @@ -692,8 +407,8 @@ class Slots final updateSlotAndSquelch( uint256 const& key, PublicKey const& validator, - id_t id, - typename Slot::ignored_squelch_callback callback, + Peer::id_t id, + typename Slot::ignored_squelch_callback callback, bool isTrusted); /** Check if peers stopped relaying messages @@ -708,7 +423,7 @@ class Slots final * @param erase If true then erase the peer */ void - deletePeer(id_t id, bool erase); + deletePeer(Peer::id_t id, bool erase); /** Called to register that a given validator was squelched for a given * peer. It is expected that this method is called by SquelchHandler. @@ -717,16 +432,7 @@ class Slots final * @param peerID peer ID */ void - squelchValidator(PublicKey const& validatorKey, id_t peerID) - { - auto it = peersWithValidators_.find(validatorKey); - if (it == peersWithValidators_.end()) - peersWithValidators_.emplace( - validatorKey, std::unordered_set{peerID}); - - else if (it->second.find(peerID) == it->second.end()) - it->second.insert(peerID); - } + squelchValidator(PublicKey const& validatorKey, Peer::id_t peerID); void onWrite(beast::PropertyStream::Map& stream) const; @@ -736,7 +442,7 @@ class Slots final * from the peer. A message is aged after IDLED seconds. * Return true if added */ bool - addPeerMessage(uint256 const& key, id_t id); + addPeerMessage(uint256 const& key, Peer::id_t id); /** * Updates the last message sent from a validator. @@ -759,14 +465,7 @@ class Slots final * @return true if a given validator was squelched */ bool - validatorSquelched(PublicKey const& validatorKey) const - { - beast::expire( - peersWithValidators_, reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT); - - return peersWithValidators_.find(validatorKey) != - peersWithValidators_.end(); - } + validatorSquelched(PublicKey const& validatorKey); /** Checks whether a given peer was recently sent a squelch message for * a given validator. @@ -775,20 +474,7 @@ class Slots final * @return true if a given validator was squelched for a given peer */ bool - peerSquelched(PublicKey const& validatorKey, id_t peerID) const - { - beast::expire( - peersWithValidators_, reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT); - - auto const it = peersWithValidators_.find(validatorKey); - - // if validator was not squelched, the peer was also not squelched - if (it == peersWithValidators_.end()) - return false; - - // if a peer is found the squelch for it has not expired - return it->second.find(peerID) != it->second.end(); - } + peerSquelched(PublicKey const& validatorKey, Peer::id_t peerID); std::atomic_bool reduceRelayReady_{false}; @@ -803,18 +489,18 @@ class Slots final uint16_t const maxSelectedPeers_; bool const enhancedSquelchEnabled_; + clock_type& clock_; + // Maintain aged container of message/peers. This is required // to discard duplicate message from the same peer. A message // is aged after IDLED seconds. A message received IDLED seconds // after it was relayed is ignored by PeerImp. - inline static messages peersWithMessage_{ - beast::get_abstract_clock()}; + messages peersWithMessage_; // Maintain aged container of validator/peers. This is used to track // which validator/peer were squelced. A peer that whose squelch // has expired is removed. - inline static validators peersWithValidators_{ - beast::get_abstract_clock()}; + validators peersWithValidators_; struct ValidatorInfo { @@ -827,300 +513,6 @@ class Slots final hash_map considered_validators_; }; -template -bool -Slots::addPeerMessage(uint256 const& key, id_t id) -{ - beast::expire(peersWithMessage_, reduce_relay::IDLED); - - if (key.isNonZero()) - { - auto it = peersWithMessage_.find(key); - if (it == peersWithMessage_.end()) - { - JLOG(journal_.trace()) - << "addPeerMessage: new " << to_string(key) << " " << id; - peersWithMessage_.emplace(key, std::unordered_set{id}); - return true; - } - - if (it->second.find(id) != it->second.end()) - { - JLOG(journal_.trace()) << "addPeerMessage: duplicate message " - << to_string(key) << " " << id; - return false; - } - - JLOG(journal_.trace()) - << "addPeerMessage: added " << to_string(key) << " " << id; - - it->second.insert(id); - } - - return true; -} - -template -std::optional -Slots::updateConsideredValidator( - PublicKey const& validator, - Peer::id_t peer) -{ - auto const now = clock_type::now(); - - auto it = considered_validators_.find(validator); - if (it == considered_validators_.end()) - { - considered_validators_.emplace(std::make_pair( - validator, - ValidatorInfo{ - .count = 1, - .lastMessage = now, - .peers = {peer}, - })); - - return {}; - } - - // the validator idled. Don't update it, it will be cleaned later - if (now - it->second.lastMessage > IDLED) - return {}; - - it->second.peers.insert(peer); - - it->second.lastMessage = now; - ++it->second.count; - - if (it->second.count < MAX_MESSAGE_THRESHOLD || - it->second.peers.size() < reduce_relay::MAX_SELECTED_PEERS) - return {}; - - auto const key = it->first; - considered_validators_.erase(it); - - return key; -} - -template -std::vector -Slots::cleanConsideredValidators() -{ - auto const now = clock_type::now(); - - std::vector keys; - for (auto it = considered_validators_.begin(); - it != considered_validators_.end();) - { - if (now - it->second.lastMessage > IDLED) - { - keys.push_back(it->first); - it = considered_validators_.erase(it); - } - else - ++it; - } - - return keys; -} - -template -void -Slots::updateSlotAndSquelch( - uint256 const& key, - PublicKey const& validator, - id_t id, - typename Slot::ignored_squelch_callback callback, - bool isTrusted) -{ - if (!addPeerMessage(key, id)) - return; - - // If we receive a message from a trusted validator either update an - // existing slot or insert a new one. If we are not running enhanced - // squelching also deduplicate untrusted validator messages - if (isTrusted || !enhancedSquelchEnabled_) - { - JLOG(journal_.trace()) - << "updateSlotAndSquelch: new slot " << Slice(validator); - auto it = slots_ - .emplace(std::make_pair( - validator, - Slot( - handler_, - logs_.journal("Slot"), - maxSelectedPeers_, - isTrusted))) - .first; - it->second.update(validator, id, callback); - } - else - { - auto it = untrusted_slots_.find(validator); - // If we received a message from a validator that is not - // selected, and is not squelched, there is nothing to do. It - // will be squelched later when `updateValidatorSlot` is called. - if (it == untrusted_slots_.end()) - return; - - it->second.update(validator, id, callback); - } -} - -template -void -Slots::updateValidatorSlot( - uint256 const& key, - PublicKey const& validator, - id_t id, - typename Slot::ignored_squelch_callback callback) -{ - // We received a message from an already selected validator - // we can ignore this message - if (untrusted_slots_.find(validator) != untrusted_slots_.end()) - return; - - // We received a message from an already squelched validator. - // This could happen in few cases: - // 1. It happened so that the squelch for a particular peer expired - // before our local squelch. - // 2. We receive a message from a new peer that did not receive the - // squelch request. - // 3. The peer is ignoring our squelch request and we have not sent - // the controll message in a while. - // In all of these cases we can only send them a squelch request again. - if (validatorSquelched(validator)) - { - if (!peerSquelched(validator, id)) - { - squelchValidator(validator, id); - handler_.squelch( - validator, id, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); - } - return; - } - - // Do we have any available slots for additional untrusted validators? - // This could happen in few cases: - // 1. We received a message from a new untrusted validator, but we - // are at capacity. - // 2. We received a message from a previously squelched validator. - // In all of these cases we send a squelch message to all peers. - // The validator may still be considered by the selector. However, it - // will be eventually cleaned and squelched - if (untrusted_slots_.size() == MAX_UNTRUSTED_SLOTS) - { - handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); - return; - } - - if (auto const v = updateConsideredValidator(validator, id)) - untrusted_slots_.emplace(std::make_pair( - *v, - Slot( - handler_, logs_.journal("Slot"), maxSelectedPeers_, false))); - // When we reach MAX_UNTRUSTED_SLOTS, don't explicitly clean them. - // Since we stop updating their counters, they will idle, and will be - // removed and squelched. -} - -template -void -Slots::deletePeer(id_t id, bool erase) -{ - auto deletePeer = [&](slots_map& slots) { - for (auto& [validator, slot] : slots) - slot.deletePeer(validator, id, erase); - }; - - deletePeer(slots_); - deletePeer(untrusted_slots_); -} - -template -void -Slots::deleteIdlePeers() -{ - auto deleteSlots = [&](slots_map& slots) { - auto const now = clock_type::now(); - - for (auto it = slots.begin(); it != slots.end();) - { - it->second.deleteIdlePeer(it->first); - if (now - it->second.getLastSelected() > - MAX_UNSQUELCH_EXPIRE_DEFAULT) - { - JLOG(journal_.trace()) << "deleteIdlePeers: deleting idle slot " - << Slice(it->first); - - // if an untrusted validator slot idled - peers stopped - // sending messages for this validator squelch it - if (!it->second.isTrusted_) - handler_.squelchAll( - it->first, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); - - it = slots.erase(it); - } - else - ++it; - } - }; - - deleteSlots(slots_); - deleteSlots(untrusted_slots_); - - // remove and squelch all validators that the selector deemed unsuitable - // there might be some good validators in this set that "lapsed". - // However, since these are untrusted validators we're not concerned - for (auto const& validator : cleanConsideredValidators()) - handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); -} - -template -void -Slots::onWrite(beast::PropertyStream::Map& stream) const -{ - auto const writeSlot = - [](beast::PropertyStream::Set& set, - hash_map> const& slots) { - for (auto const& [validator, slot] : slots) - { - beast::PropertyStream::Map item(set); - item["validator"] = toBase58(TokenType::NodePublic, validator); - slot.onWrite(item); - } - }; - - beast::PropertyStream::Map slots("slots", stream); - - { - beast::PropertyStream::Set set("trusted", slots); - writeSlot(set, slots_); - } - - { - beast::PropertyStream::Set set("untrusted", slots); - writeSlot(set, untrusted_slots_); - } - - { - beast::PropertyStream::Set set("considered", slots); - - auto const now = clock_type::now(); - - for (auto const& [validator, info] : considered_validators_) - { - beast::PropertyStream::Map item(set); - item["validator"] = toBase58(TokenType::NodePublic, validator); - item["lastMessage"] = - std::chrono::duration_cast( - now - info.lastMessage) - .count(); - item["messageCount"] = info.count; - item["peers"] = info.peers.size(); - } - } -} - } // namespace reduce_relay } // namespace ripple diff --git a/src/xrpld/overlay/Squelch.h b/src/xrpld/overlay/Squelch.h index 49d86d67fad..25985451af6 100644 --- a/src/xrpld/overlay/Squelch.h +++ b/src/xrpld/overlay/Squelch.h @@ -20,8 +20,6 @@ #ifndef RIPPLE_OVERLAY_SQUELCH_H_INCLUDED #define RIPPLE_OVERLAY_SQUELCH_H_INCLUDED -#include - #include #include #include @@ -33,13 +31,14 @@ namespace ripple { namespace reduce_relay { /** Maintains squelching of relaying messages from validators */ -template class Squelch { + using clock_type = beast::abstract_clock; using time_point = typename clock_type::time_point; public: - explicit Squelch(beast::Journal journal) : journal_(journal) + explicit Squelch(beast::Journal journal, clock_type& clock) + : journal_(journal), clock_(clock) { } virtual ~Squelch() = default; @@ -72,55 +71,9 @@ class Squelch * Expiration time is included in the TMSquelch message. */ hash_map squelched_; beast::Journal const journal_; + clock_type& clock_; }; -template -bool -Squelch::addSquelch( - PublicKey const& validator, - std::chrono::seconds const& squelchDuration) -{ - if (squelchDuration >= MIN_UNSQUELCH_EXPIRE && - squelchDuration <= MAX_UNSQUELCH_EXPIRE_PEERS) - { - squelched_[validator] = clock_type::now() + squelchDuration; - return true; - } - - JLOG(journal_.error()) << "squelch: invalid squelch duration " - << squelchDuration.count(); - - // unsquelch if invalid duration - removeSquelch(validator); - - return false; -} - -template -void -Squelch::removeSquelch(PublicKey const& validator) -{ - squelched_.erase(validator); -} - -template -bool -Squelch::expireSquelch(PublicKey const& validator) -{ - auto const now = clock_type::now(); - - auto const& it = squelched_.find(validator); - if (it == squelched_.end()) - return true; - else if (it->second > now) - return false; - - // squelch expired - squelched_.erase(it); - - return true; -} - } // namespace reduce_relay } // namespace ripple diff --git a/src/xrpld/overlay/detail/OverlayImpl.cpp b/src/xrpld/overlay/detail/OverlayImpl.cpp index d037a26742a..7b8b4f5ba38 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.cpp +++ b/src/xrpld/overlay/detail/OverlayImpl.cpp @@ -142,7 +142,7 @@ OverlayImpl::OverlayImpl( , m_resolver(resolver) , next_id_(1) , timer_count_(0) - , slots_(app.logs(), *this, app.config()) + , slots_(app.logs(), *this, app.config(), stopwatch()) , m_stats( std::bind(&OverlayImpl::collect_metrics, this), collector, @@ -1421,8 +1421,8 @@ void OverlayImpl::squelchAll(PublicKey const& validator, uint32_t squelchDuration) { for_each([&](std::shared_ptr&& p) { - slots_.squelchValidator(validator, p->id()); p->send(makeSquelchMessage(validator, true, squelchDuration)); + slots_.squelchValidator(validator, p->id()); }); } diff --git a/src/xrpld/overlay/detail/OverlayImpl.h b/src/xrpld/overlay/detail/OverlayImpl.h index 6135881cf57..3911d1b63bc 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.h +++ b/src/xrpld/overlay/detail/OverlayImpl.h @@ -122,7 +122,7 @@ class OverlayImpl : public Overlay, public reduce_relay::SquelchHandler std::atomic peerDisconnects_{0}; std::atomic peerDisconnectsCharges_{0}; - reduce_relay::Slots slots_; + reduce_relay::Slots slots_; // Transaction reduce-relay metrics metrics::TxMetrics txMetrics_; diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index dcb913afb8f..d0ef011ba66 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -95,7 +96,7 @@ PeerImp::PeerImp( , publicKey_(publicKey) , lastPingTime_(clock_type::now()) , creationTime_(clock_type::now()) - , squelch_(app_.journal("Squelch")) + , squelch_(app_.journal("Squelch"), stopwatch()) , usage_(consumer) , fee_{Resource::feeTrivialPeer, ""} , slot_(slot) diff --git a/src/xrpld/overlay/detail/PeerImp.h b/src/xrpld/overlay/detail/PeerImp.h index ecd3fc7f63f..acb07be11d9 100644 --- a/src/xrpld/overlay/detail/PeerImp.h +++ b/src/xrpld/overlay/detail/PeerImp.h @@ -115,7 +115,7 @@ class PeerImp : public Peer, clock_type::time_point lastPingTime_; clock_type::time_point const creationTime_; - reduce_relay::Squelch squelch_; + reduce_relay::Squelch squelch_; // Notes on thread locking: // @@ -679,7 +679,7 @@ PeerImp::PeerImp( , publicKey_(publicKey) , lastPingTime_(clock_type::now()) , creationTime_(clock_type::now()) - , squelch_(app_.journal("Squelch")) + , squelch_(app_.journal("Squelch"), stopwatch()) , usage_(usage) , fee_{Resource::feeTrivialPeer} , slot_(std::move(slot)) diff --git a/src/xrpld/overlay/detail/Slot.cpp b/src/xrpld/overlay/detail/Slot.cpp new file mode 100644 index 00000000000..08a8d7dff62 --- /dev/null +++ b/src/xrpld/overlay/detail/Slot.cpp @@ -0,0 +1,652 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace ripple { +namespace reduce_relay { + +void +Slot::deleteIdlePeer(PublicKey const& validator) +{ + using namespace std::chrono; + auto now = clock_.now(); + for (auto it = peers_.begin(); it != peers_.end();) + { + auto& peer = it->second; + auto id = it->first; + ++it; + if (now - peer.lastMessage > IDLED) + { + JLOG(journal_.trace()) + << "deleteIdlePeer: " << Slice(validator) << " " << id + << " idled " + << duration_cast(now - peer.lastMessage).count() + << " selected " << (peer.state == PeerState::Selected); + deletePeer(validator, id, false); + } + } +} + +void +Slot::update( + PublicKey const& validator, + Peer::id_t id, + ignored_squelch_callback callback) +{ + using namespace std::chrono; + auto const now = clock_.now(); + auto it = peers_.find(id); + // First message from this peer + if (it == peers_.end()) + { + JLOG(journal_.trace()) + << "update: adding peer " << Slice(validator) << " " << id; + peers_.emplace(std::make_pair( + id, + PeerInfo{ + .state = PeerState::Counting, + .count = 0, + .expire = now, + .lastMessage = now, + .timesSelected = 0})); + initCounting(); + return; + } + // Message from a peer with expired squelch + if (it->second.state == PeerState::Squelched && now > it->second.expire) + { + JLOG(journal_.trace()) + << "update: squelch expired " << Slice(validator) << " " << id; + it->second.state = PeerState::Counting; + it->second.lastMessage = now; + initCounting(); + return; + } + + auto& peer = it->second; + + JLOG(journal_.trace()) + << "update: existing peer " << Slice(validator) << " " << id + << " slot state " << static_cast(state_) << " peer state " + << static_cast(peer.state) << " count " << peer.count << " last " + << duration_cast(now - peer.lastMessage).count() + << " pool " << considered_.size() << " threshold " << reachedThreshold_; + + peer.lastMessage = now; + + // report if we received a message from a squelched peer + if (peer.state == PeerState::Squelched) + callback(); + + if (state_ != SlotState::Counting || peer.state == PeerState::Squelched) + return; + + if (++peer.count > MIN_MESSAGE_THRESHOLD) + considered_.insert(id); + if (peer.count == (MAX_MESSAGE_THRESHOLD + 1)) + ++reachedThreshold_; + + if (now - lastSelected_ > 2 * MAX_UNSQUELCH_EXPIRE_DEFAULT) + { + JLOG(journal_.trace()) + << "update: resetting due to inactivity " << Slice(validator) << " " + << id << " " << duration_cast(now - lastSelected_).count(); + initCounting(); + return; + } + + if (reachedThreshold_ == maxSelectedPeers_) + { + // Randomly select maxSelectedPeers_ peers from considered. + // Exclude peers that have been idling > IDLED - + // it's possible that deleteIdlePeer() has not been called yet. + // If number of remaining peers != maxSelectedPeers_ + // then reset the Counting state and let deleteIdlePeer() handle + // idled peers. + std::unordered_set selected; + auto const consideredPoolSize = considered_.size(); + while (selected.size() != maxSelectedPeers_ && considered_.size() != 0) + { + auto i = + considered_.size() == 1 ? 0 : rand_int(considered_.size() - 1); + auto it = std::next(considered_.begin(), i); + auto id = *it; + considered_.erase(it); + auto const& itpeers = peers_.find(id); + if (itpeers == peers_.end()) + { + JLOG(journal_.error()) << "update: peer not found " + << Slice(validator) << " " << id; + continue; + } + if (now - itpeers->second.lastMessage < IDLED) + selected.insert(id); + } + + if (selected.size() != maxSelectedPeers_) + { + JLOG(journal_.trace()) + << "update: selection failed " << Slice(validator) << " " << id; + initCounting(); + return; + } + + lastSelected_ = now; + + auto s = selected.begin(); + JLOG(journal_.trace()) + << "update: " << Slice(validator) << " " << id << " pool size " + << consideredPoolSize << " selected " << *s << " " + << *std::next(s, 1) << " " << *std::next(s, 2); + + XRPL_ASSERT( + peers_.size() >= maxSelectedPeers_, + "ripple::reduce_relay::Slot::update : minimum peers"); + + // squelch peers which are not selected and + // not already squelched + std::stringstream str; + for (auto& [k, v] : peers_) + { + v.count = 0; + + if (selected.find(k) != selected.end()) + { + v.state = PeerState::Selected; + ++v.timesSelected; + } + + else if (v.state != PeerState::Squelched) + { + if (journal_.trace()) + str << k << " "; + v.state = PeerState::Squelched; + std::chrono::seconds duration = + getSquelchDuration(peers_.size() - maxSelectedPeers_); + v.expire = now + duration; + handler_.squelch(validator, k, duration.count()); + } + } + JLOG(journal_.trace()) << "update: squelching " << Slice(validator) + << " " << id << " " << str.str(); + considered_.clear(); + reachedThreshold_ = 0; + state_ = SlotState::Selected; + } +} + +std::chrono::seconds +Slot::getSquelchDuration(std::size_t npeers) +{ + using namespace std::chrono; + auto m = std::max( + MAX_UNSQUELCH_EXPIRE_DEFAULT, seconds{SQUELCH_PER_PEER * npeers}); + if (m > MAX_UNSQUELCH_EXPIRE_PEERS) + { + m = MAX_UNSQUELCH_EXPIRE_PEERS; + JLOG(journal_.warn()) + << "getSquelchDuration: unexpected squelch duration " << npeers; + } + return seconds{ripple::rand_int(MIN_UNSQUELCH_EXPIRE / 1s, m / 1s)}; +} + +void +Slot::deletePeer(PublicKey const& validator, Peer::id_t id, bool erase) +{ + auto it = peers_.find(id); + if (it != peers_.end()) + { + JLOG(journal_.trace()) + << "deletePeer: " << Slice(validator) << " " << id << " selected " + << (it->second.state == PeerState::Selected) << " considered " + << (considered_.find(id) != considered_.end()) << " erase " + << erase; + auto now = clock_.now(); + if (it->second.state == PeerState::Selected) + { + for (auto& [k, v] : peers_) + { + if (v.state == PeerState::Squelched) + handler_.unsquelch(validator, k); + v.state = PeerState::Counting; + v.count = 0; + v.expire = now; + } + + considered_.clear(); + reachedThreshold_ = 0; + state_ = SlotState::Counting; + } + else if (considered_.find(id) != considered_.end()) + { + if (it->second.count > MAX_MESSAGE_THRESHOLD) + --reachedThreshold_; + considered_.erase(id); + } + + it->second.lastMessage = now; + it->second.count = 0; + + if (erase) + peers_.erase(it); + } +} + +void +Slot::onWrite(beast::PropertyStream::Map& stream) const +{ + auto const now = clock_.now(); + stream["state"] = to_string(state_); + stream["reachedThreshold"] = reachedThreshold_; + stream["considered"] = considered_.size(); + stream["lastSelected"] = + duration_cast(now - lastSelected_).count(); + stream["isTrusted"] = isTrusted_; + + beast::PropertyStream::Set peers("peers", stream); + + for (auto const& [id, info] : peers_) + { + beast::PropertyStream::Map item(peers); + item["id"] = id; + item["count"] = info.count; + item["expire"] = + duration_cast(info.expire - now).count(); + item["lastMessage"] = + duration_cast(now - info.lastMessage).count(); + item["timesSelected"] = info.timesSelected; + item["state"] = to_string(info.state); + } +} + +void +Slot::resetCounts() +{ + for (auto& [_, peer] : peers_) + { + (void)_; + peer.count = 0; + } +} + +void +Slot::initCounting() +{ + state_ = SlotState::Counting; + considered_.clear(); + reachedThreshold_ = 0; + resetCounts(); +} + +// --------------------------------- Slots --------------------------------- // + +bool +Slots::reduceRelayReady() +{ + if (!reduceRelayReady_) + reduceRelayReady_ = reduce_relay::epoch( + clock_.now()) > reduce_relay::WAIT_ON_BOOTUP; + + return reduceRelayReady_; +} + +void +Slots::squelchValidator(PublicKey const& validatorKey, Peer::id_t peerID) +{ + auto it = peersWithValidators_.find(validatorKey); + if (it == peersWithValidators_.end()) + peersWithValidators_.emplace( + validatorKey, std::unordered_set{peerID}); + + else if (it->second.find(peerID) == it->second.end()) + it->second.insert(peerID); +} + +bool +Slots::validatorSquelched(PublicKey const& validatorKey) +{ + beast::expire( + peersWithValidators_, reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT); + + return peersWithValidators_.find(validatorKey) != + peersWithValidators_.end(); +} + +bool +Slots::peerSquelched(PublicKey const& validatorKey, Peer::id_t peerID) +{ + beast::expire( + peersWithValidators_, reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT); + + auto const it = peersWithValidators_.find(validatorKey); + + // if validator was not squelched, the peer was also not squelched + if (it == peersWithValidators_.end()) + return false; + + // if a peer is found the squelch for it has not expired + return it->second.find(peerID) != it->second.end(); +} + +bool +Slots::addPeerMessage(uint256 const& key, Peer::id_t id) +{ + beast::expire(peersWithMessage_, reduce_relay::IDLED); + + if (key.isNonZero()) + { + auto it = peersWithMessage_.find(key); + if (it == peersWithMessage_.end()) + { + JLOG(journal_.trace()) + << "addPeerMessage: new " << to_string(key) << " " << id; + peersWithMessage_.emplace(key, std::unordered_set{id}); + return true; + } + + if (it->second.find(id) != it->second.end()) + { + JLOG(journal_.trace()) << "addPeerMessage: duplicate message " + << to_string(key) << " " << id; + return false; + } + + JLOG(journal_.trace()) + << "addPeerMessage: added " << to_string(key) << " " << id; + + it->second.insert(id); + } + + return true; +} + +std::optional +Slots::updateConsideredValidator(PublicKey const& validator, Peer::id_t peer) +{ + auto const now = clock_.now(); + + auto it = considered_validators_.find(validator); + if (it == considered_validators_.end()) + { + considered_validators_.emplace(std::make_pair( + validator, + ValidatorInfo{ + .count = 1, + .lastMessage = now, + .peers = {peer}, + })); + + return {}; + } + + // the validator idled. Don't update it, it will be cleaned later + if (now - it->second.lastMessage > IDLED) + return {}; + + it->second.peers.insert(peer); + + it->second.lastMessage = now; + ++it->second.count; + + if (it->second.count < MAX_MESSAGE_THRESHOLD || + it->second.peers.size() < reduce_relay::MAX_SELECTED_PEERS) + return {}; + + auto const key = it->first; + considered_validators_.erase(it); + + return key; +} + +std::vector +Slots::cleanConsideredValidators() +{ + auto const now = clock_.now(); + + std::vector keys; + for (auto it = considered_validators_.begin(); + it != considered_validators_.end();) + { + if (now - it->second.lastMessage > IDLED) + { + keys.push_back(it->first); + it = considered_validators_.erase(it); + } + else + ++it; + } + + return keys; +} + +void +Slots::updateSlotAndSquelch( + uint256 const& key, + PublicKey const& validator, + Peer::id_t id, + typename Slot::ignored_squelch_callback callback, + bool isTrusted) +{ + if (!addPeerMessage(key, id)) + return; + + // If we receive a message from a trusted validator either update an + // existing slot or insert a new one. If we are not running enhanced + // squelching also deduplicate untrusted validator messages + if (isTrusted || !enhancedSquelchEnabled_) + { + JLOG(journal_.trace()) + << "updateSlotAndSquelch: new slot " << Slice(validator); + auto it = slots_ + .emplace(std::make_pair( + validator, + Slot( + handler_, + logs_.journal("Slot"), + maxSelectedPeers_, + isTrusted, + clock_))) + .first; + it->second.update(validator, id, callback); + } + else + { + auto it = untrusted_slots_.find(validator); + // If we received a message from a validator that is not + // selected, and is not squelched, there is nothing to do. It + // will be squelched later when `updateValidatorSlot` is called. + if (it == untrusted_slots_.end()) + return; + + it->second.update(validator, id, callback); + } +} + +void +Slots::updateValidatorSlot( + uint256 const& key, + PublicKey const& validator, + Peer::id_t id, + typename Slot::ignored_squelch_callback callback) +{ + // We received a message from an already selected validator + // we can ignore this message + if (untrusted_slots_.find(validator) != untrusted_slots_.end()) + return; + + // We received a message from an already squelched validator. + // This could happen in few cases: + // 1. It happened so that the squelch for a particular peer expired + // before our local squelch. + // 2. We receive a message from a new peer that did not receive the + // squelch request. + // 3. The peer is ignoring our squelch request and we have not sent + // the controll message in a while. + // In all of these cases we can only send them a squelch request again. + if (validatorSquelched(validator)) + { + if (!peerSquelched(validator, id)) + { + squelchValidator(validator, id); + handler_.squelch( + validator, id, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); + } + return; + } + + // Do we have any available slots for additional untrusted validators? + // This could happen in few cases: + // 1. We received a message from a new untrusted validator, but we + // are at capacity. + // 2. We received a message from a previously squelched validator. + // In all of these cases we send a squelch message to all peers. + // The validator may still be considered by the selector. However, it + // will be eventually cleaned and squelched + if (untrusted_slots_.size() == MAX_UNTRUSTED_SLOTS) + { + handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); + return; + } + + if (auto const v = updateConsideredValidator(validator, id)) + untrusted_slots_.emplace(std::make_pair( + *v, + Slot( + handler_, + logs_.journal("Slot"), + maxSelectedPeers_, + false, + clock_))); + // When we reach MAX_UNTRUSTED_SLOTS, don't explicitly clean them. + // Since we stop updating their counters, they will idle, and will be + // removed and squelched. +} + +void +Slots::deletePeer(id_t id, bool erase) +{ + auto deletePeer = [&](slots_map& slots) { + for (auto& [validator, slot] : slots) + slot.deletePeer(validator, id, erase); + }; + + deletePeer(slots_); + deletePeer(untrusted_slots_); +} + +void +Slots::deleteIdlePeers() +{ + auto deleteSlots = [&](slots_map& slots) { + auto const now = clock_.now(); + + for (auto it = slots.begin(); it != slots.end();) + { + it->second.deleteIdlePeer(it->first); + if (now - it->second.getLastSelected() > + MAX_UNSQUELCH_EXPIRE_DEFAULT) + { + JLOG(journal_.trace()) << "deleteIdlePeers: deleting idle slot " + << Slice(it->first); + + // if an untrusted validator slot idled - peers stopped + // sending messages for this validator squelch it + if (!it->second.isTrusted_) + handler_.squelchAll( + it->first, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); + + it = slots.erase(it); + } + else + ++it; + } + }; + + deleteSlots(slots_); + deleteSlots(untrusted_slots_); + + // remove and squelch all validators that the selector deemed unsuitable + // there might be some good validators in this set that "lapsed". + // However, since these are untrusted validators we're not concerned + for (auto const& validator : cleanConsideredValidators()) + handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); +} + +void +Slots::onWrite(beast::PropertyStream::Map& stream) const +{ + auto const writeSlot = [](beast::PropertyStream::Set& set, + hash_map const& slots) { + for (auto const& [validator, slot] : slots) + { + beast::PropertyStream::Map item(set); + item["validator"] = toBase58(TokenType::NodePublic, validator); + slot.onWrite(item); + } + }; + + beast::PropertyStream::Map slots("slots", stream); + + { + beast::PropertyStream::Set set("trusted", slots); + writeSlot(set, slots_); + } + + { + beast::PropertyStream::Set set("untrusted", slots); + writeSlot(set, untrusted_slots_); + } + + { + beast::PropertyStream::Set set("considered", slots); + + auto const now = clock_.now(); + + for (auto const& [validator, info] : considered_validators_) + { + beast::PropertyStream::Map item(set); + item["validator"] = toBase58(TokenType::NodePublic, validator); + item["lastMessage"] = + std::chrono::duration_cast( + now - info.lastMessage) + .count(); + item["messageCount"] = info.count; + item["peers"] = info.peers.size(); + } + } +} + +} // namespace reduce_relay + +} // namespace ripple diff --git a/src/xrpld/overlay/detail/Squelch.cpp b/src/xrpld/overlay/detail/Squelch.cpp new file mode 100644 index 00000000000..a72421e03af --- /dev/null +++ b/src/xrpld/overlay/detail/Squelch.cpp @@ -0,0 +1,79 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include + +#include +#include +#include + +#include + +namespace ripple { + +namespace reduce_relay { + +bool +Squelch::addSquelch( + PublicKey const& validator, + std::chrono::seconds const& squelchDuration) +{ + if (squelchDuration >= MIN_UNSQUELCH_EXPIRE && + squelchDuration <= MAX_UNSQUELCH_EXPIRE_PEERS) + { + squelched_[validator] = clock_.now() + squelchDuration; + return true; + } + + JLOG(journal_.error()) << "squelch: invalid squelch duration " + << squelchDuration.count(); + + // unsquelch if invalid duration + removeSquelch(validator); + + return false; +} + +void +Squelch::removeSquelch(PublicKey const& validator) +{ + squelched_.erase(validator); +} + +bool +Squelch::expireSquelch(PublicKey const& validator) +{ + auto const now = clock_.now(); + + auto const& it = squelched_.find(validator); + if (it == squelched_.end()) + return true; + else if (it->second > now) + return false; + + // squelch expired + squelched_.erase(it); + + return true; +} + +} // namespace reduce_relay + +} // namespace ripple From 8767282ecc5d8c23bea795fb39974f18d9c7ac58 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Tue, 24 Jun 2025 14:49:30 +0200 Subject: [PATCH 14/27] improves code readabiliy --- src/test/overlay/base_squelch_test.cpp | 14 ++++++++++---- src/xrpld/overlay/Slot.h | 7 ------- src/xrpld/overlay/detail/Slot.cpp | 5 +++-- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/test/overlay/base_squelch_test.cpp b/src/test/overlay/base_squelch_test.cpp index a35941840e6..56f71fbc426 100644 --- a/src/test/overlay/base_squelch_test.cpp +++ b/src/test/overlay/base_squelch_test.cpp @@ -35,6 +35,7 @@ #include #include +#include #include #include #include @@ -999,7 +1000,8 @@ class base_squelch_test : public beast::unit_test::suite str << s << " "; if (log) std::cout - << (double)reduce_relay::epoch(now) + << (double)std::chrono::duration_cast( + now.time_since_epoch()) .count() / 1000. << " random, squelched, validator: " << validator.id() @@ -1091,9 +1093,13 @@ class base_squelch_test : public beast::unit_test::suite event.isSelected_ = network_.overlay().isSelected(*event.key_, event.peer_); auto peers = network_.overlay().getPeers(*event.key_); - auto d = reduce_relay::epoch(now).count() - - reduce_relay::epoch( - peers[event.peer_].lastMessage) + + auto d = + std::chrono::duration_cast( + now.time_since_epoch()) + .count() - + std::chrono::duration_cast( + peers[event.peer_].lastMessage.time_since_epoch()) .count(); mustHandle = event.isSelected_ && diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index f827dac6f10..7e21d8cdd25 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -90,13 +90,6 @@ to_string(SlotState state) } } -template -Unit -epoch(TP const& t) -{ - return std::chrono::duration_cast(t.time_since_epoch()); -} - /** Abstract class. Declares squelch and unsquelch handlers. * OverlayImpl inherits from this class. Motivation is * for easier unit tests to facilitate on the fly diff --git a/src/xrpld/overlay/detail/Slot.cpp b/src/xrpld/overlay/detail/Slot.cpp index 08a8d7dff62..a93f7321050 100644 --- a/src/xrpld/overlay/detail/Slot.cpp +++ b/src/xrpld/overlay/detail/Slot.cpp @@ -315,8 +315,9 @@ bool Slots::reduceRelayReady() { if (!reduceRelayReady_) - reduceRelayReady_ = reduce_relay::epoch( - clock_.now()) > reduce_relay::WAIT_ON_BOOTUP; + reduceRelayReady_ = + std::chrono::duration_cast( + clock_.now().time_since_epoch()) > reduce_relay::WAIT_ON_BOOTUP; return reduceRelayReady_; } From ad32b2fb8f71f3a373f6e483f9186832f9e79a3b Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Tue, 24 Jun 2025 14:55:19 +0200 Subject: [PATCH 15/27] removes unused clock --- src/test/overlay/clock.h | 83 ---------------------------------------- 1 file changed, 83 deletions(-) delete mode 100644 src/test/overlay/clock.h diff --git a/src/test/overlay/clock.h b/src/test/overlay/clock.h deleted file mode 100644 index 25dc69baceb..00000000000 --- a/src/test/overlay/clock.h +++ /dev/null @@ -1,83 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2025 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_TEST_OVERLAY_CLOCK_H_INCLUDED -#define RIPPLE_TEST_OVERLAY_CLOCK_H_INCLUDED - -#include - -#include -#include -#include - -namespace ripple { - -namespace test { - -using namespace std::chrono; - -/** Manually advanced clock. */ -class ManualClock -{ -public: - typedef uint64_t rep; - typedef std::milli period; - typedef std::chrono::duration duration; - typedef std::chrono::time_point time_point; - inline static bool const is_steady = false; - - static void - advance(duration d) noexcept - { - now_ += d; - } - - static void - randAdvance(milliseconds min, milliseconds max) - { - now_ += randDuration(min, max); - } - - static void - reset() noexcept - { - now_ = time_point(seconds(0)); - } - - static time_point - now() noexcept - { - return now_; - } - - static duration - randDuration(milliseconds min, milliseconds max) - { - return duration(milliseconds(rand_int(min.count(), max.count()))); - } - - explicit ManualClock() = default; - -private: - inline static time_point now_ = time_point(seconds(0)); -}; -} // namespace test -} // namespace ripple - -#endif \ No newline at end of file From 1fc8057a9d715f41cfa0616f23d74fdbf5dc7fe9 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Tue, 24 Jun 2025 15:46:57 +0200 Subject: [PATCH 16/27] removes unused imports --- src/test/overlay/base_squelch_test.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/overlay/base_squelch_test.cpp b/src/test/overlay/base_squelch_test.cpp index 56f71fbc426..9ec1afadbbe 100644 --- a/src/test/overlay/base_squelch_test.cpp +++ b/src/test/overlay/base_squelch_test.cpp @@ -18,7 +18,6 @@ //============================================================================== #include -#include #include #include From ebbce1c7b2a3992f1eb9b21f250c1fbdd5f8d408 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Tue, 24 Jun 2025 17:24:37 +0200 Subject: [PATCH 17/27] removes unused test parameters --- src/test/overlay/base_squelch_test.cpp | 65 +++++++++----------------- 1 file changed, 22 insertions(+), 43 deletions(-) diff --git a/src/test/overlay/base_squelch_test.cpp b/src/test/overlay/base_squelch_test.cpp index 9ec1afadbbe..70ef31cc818 100644 --- a/src/test/overlay/base_squelch_test.cpp +++ b/src/test/overlay/base_squelch_test.cpp @@ -29,9 +29,6 @@ #include #include #include -#include - -#include #include #include @@ -52,12 +49,6 @@ class extended_manual_clock : public beast::manual_clock using typename beast::manual_clock::duration; using typename beast::manual_clock::time_point; - void - reset() - { - this->set(time_point(duration(0))); - } - void randAdvance(std::chrono::milliseconds min, std::chrono::milliseconds max) { @@ -238,12 +229,6 @@ class Overlay return clock_; } - void - resetClock() - { - clock_ = TestStopwatch{}; - } - protected: TestStopwatch clock_; }; @@ -845,12 +830,8 @@ class Network LinkIterCB link, std::uint16_t nValidators = MAX_VALIDATORS, std::uint32_t nMessages = MAX_MESSAGES, - bool purge = true, - bool resetClock = true) + bool purge = true) { - if (resetClock) - overlay_.resetClock(); - if (purge) { purgePeers(); @@ -1185,7 +1166,7 @@ class base_squelch_test : public beast::unit_test::suite testPeerUnsquelchedTooSoon(bool log) { doTest("Peer Unsquelched Too Soon", log, [this](bool log) { - BEAST_EXPECT(propagateNoSquelch(log, 1, false, false, false)); + BEAST_EXPECT(propagateNoSquelch(log, 1, false, false)); }); } @@ -1197,13 +1178,13 @@ class base_squelch_test : public beast::unit_test::suite { network_.overlay().clock().advance(seconds(601)); doTest("Peer Unsquelched", log, [this](bool log) { - BEAST_EXPECT(propagateNoSquelch(log, 2, true, true, false)); + BEAST_EXPECT(propagateNoSquelch(log, 2, true, true)); }); } /** Propagate enough messages to generate one squelch event */ bool - propagateAndSquelch(bool log, bool purge = true, bool resetClock = true) + propagateAndSquelch(bool log, bool purge = true) { int n = 0; network_.propagate( @@ -1230,8 +1211,7 @@ class base_squelch_test : public beast::unit_test::suite }, 1, reduce_relay::MAX_MESSAGE_THRESHOLD + 2, - purge, - resetClock); + purge); auto selected = network_.overlay().getSelected(network_.validator(0)); BEAST_EXPECT( selected.size() == @@ -1248,8 +1228,7 @@ class base_squelch_test : public beast::unit_test::suite bool log, std::uint16_t nMessages, bool countingState, - bool purge = true, - bool resetClock = true) + bool purge = true) { bool squelched = false; network_.propagate( @@ -1265,8 +1244,7 @@ class base_squelch_test : public beast::unit_test::suite }, 1, nMessages, - purge, - resetClock); + purge); auto res = checkCounting(network_.validator(0), countingState); return !squelched && res; } @@ -1278,9 +1256,9 @@ class base_squelch_test : public beast::unit_test::suite testNewPeer(bool log) { doTest("New Peer", log, [this](bool log) { - BEAST_EXPECT(propagateAndSquelch(log, true, false)); + BEAST_EXPECT(propagateAndSquelch(log, true)); network_.addPeer(); - BEAST_EXPECT(propagateNoSquelch(log, 1, true, false, false)); + BEAST_EXPECT(propagateNoSquelch(log, 1, true, false)); }); } @@ -1291,7 +1269,7 @@ class base_squelch_test : public beast::unit_test::suite { doTest("Selected Peer Disconnects", log, [this](bool log) { network_.overlay().clock().advance(seconds(601)); - BEAST_EXPECT(propagateAndSquelch(log, true, false)); + BEAST_EXPECT(propagateAndSquelch(log, true)); auto id = network_.overlay().getSelectedPeer(network_.validator(0)); std::uint16_t unsquelched = 0; network_.overlay().deletePeer( @@ -1315,7 +1293,7 @@ class base_squelch_test : public beast::unit_test::suite { doTest("Selected Peer Stops Relaying", log, [this](bool log) { network_.overlay().clock().advance(seconds(601)); - BEAST_EXPECT(propagateAndSquelch(log, true, false)); + BEAST_EXPECT(propagateAndSquelch(log, true)); network_.overlay().clock().advance( reduce_relay::IDLED + seconds(1)); std::uint16_t unsquelched = 0; @@ -1341,7 +1319,7 @@ class base_squelch_test : public beast::unit_test::suite { doTest("Squelched Peer Disconnects", log, [this](bool log) { network_.overlay().clock().advance(seconds(601)); - BEAST_EXPECT(propagateAndSquelch(log, true, false)); + BEAST_EXPECT(propagateAndSquelch(log, true)); auto peers = network_.overlay().getPeers(network_.validator(0)); auto it = std::find_if(peers.begin(), peers.end(), [&](auto it) { return it.second.state == reduce_relay::PeerState::Squelched; @@ -1495,33 +1473,34 @@ vp_base_squelch_max_selected_peers=2 testBaseSquelchReady(bool log) { doTest("BaseSquelchReady", log, [&](bool log) { - network_.overlay().resetClock(); auto createSlots = - [&](bool baseSquelchEnabled) -> reduce_relay::Slots { + [&](bool baseSquelchEnabled, + TestStopwatch stopwatch) -> reduce_relay::Slots { env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = baseSquelchEnabled; return reduce_relay::Slots( env_.app().logs(), network_.overlay(), env_.app().config(), - network_.overlay().clock()); + stopwatch); }; + + TestStopwatch stopwatch; // base squelching must not be ready if squelching is disabled - BEAST_EXPECT(!createSlots(false).baseSquelchReady()); + BEAST_EXPECT(!createSlots(false, stopwatch).baseSquelchReady()); // base squelch must not be ready as not enough time passed from // bootup - BEAST_EXPECT(!createSlots(true).baseSquelchReady()); + BEAST_EXPECT(!createSlots(true, stopwatch).baseSquelchReady()); - network_.overlay().clock().advance( - reduce_relay::WAIT_ON_BOOTUP + minutes{1}); + stopwatch.advance(reduce_relay::WAIT_ON_BOOTUP + minutes{1}); // base squelch enabled and bootup time passed - BEAST_EXPECT(createSlots(true).baseSquelchReady()); + BEAST_EXPECT(createSlots(true, stopwatch).baseSquelchReady()); // even if time passed, base squelching must not be ready if turned // off in the config - BEAST_EXPECT(!createSlots(false).baseSquelchReady()); + BEAST_EXPECT(!createSlots(false, stopwatch).baseSquelchReady()); }); } From a703fcaaeca04ade96a1483ad759610c0b807403 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Wed, 25 Jun 2025 17:57:11 +0200 Subject: [PATCH 18/27] improves code readability --- src/xrpld/overlay/Slot.h | 20 +++++++++------ src/xrpld/overlay/detail/Slot.cpp | 38 +++++++++++++++------------- src/xrpld/overlay/detail/Squelch.cpp | 2 +- 3 files changed, 33 insertions(+), 27 deletions(-) diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index 7e21d8cdd25..86f9997338b 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -149,6 +149,8 @@ class Slot final * @param handler Squelch/Unsquelch implementation * @param maxSelectedPeers the maximum number of peers to be selected as * validator message source + * @param istrusted indicate if the slot is for a trusted validator + * @param clock a reference to a steady clock */ Slot( SquelchHandler const& handler, @@ -225,7 +227,7 @@ class Slot final * @param npeers number of peers that can be squelched in the Slot */ std::chrono::seconds - getSquelchDuration(std::size_t npeers); + getSquelchDuration(std::size_t npeers) const; /** Reset counts of peers in Selected or Counting state */ void @@ -302,6 +304,7 @@ class Slots final * @param logs reference to the logger * @param handler Squelch/unsquelch implementation * @param config reference to the global config + * @param clock a reference to a steady clock */ Slots( Logs& logs, @@ -453,26 +456,27 @@ class Slots final std::vector cleanConsideredValidators(); - /** Checks whether a given validator is squelched. + /** Expires old validators and checks whether a given validator is + * squelched. * @param validatorKey Validator public key * @return true if a given validator was squelched */ bool - validatorSquelched(PublicKey const& validatorKey); + expireAndIsValidatorSquelched(PublicKey const& validatorKey); - /** Checks whether a given peer was recently sent a squelch message for - * a given validator. + /** Expires old validators and checks whether a given peer was recently + * squelched for a given validator. * @param validatorKey Validator public key * @param peerID Peer id * @return true if a given validator was squelched for a given peer */ bool - peerSquelched(PublicKey const& validatorKey, Peer::id_t peerID); + expireAndIsPeerSquelched(PublicKey const& validatorKey, Peer::id_t peerID); std::atomic_bool reduceRelayReady_{false}; slots_map slots_; - slots_map untrusted_slots_; + slots_map untrustedSlots_; SquelchHandler& handler_; // squelch/unsquelch handler Logs& logs_; @@ -503,7 +507,7 @@ class Slots final // message for this validator }; - hash_map considered_validators_; + hash_map consideredValidators_; }; } // namespace reduce_relay diff --git a/src/xrpld/overlay/detail/Slot.cpp b/src/xrpld/overlay/detail/Slot.cpp index a93f7321050..a1b1003b8a6 100644 --- a/src/xrpld/overlay/detail/Slot.cpp +++ b/src/xrpld/overlay/detail/Slot.cpp @@ -1,7 +1,7 @@ //------------------------------------------------------------------------------ /* This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. + Copyright (c) 2025 Ripple Labs Inc. Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above @@ -134,7 +134,7 @@ Slot::update( // If number of remaining peers != maxSelectedPeers_ // then reset the Counting state and let deleteIdlePeer() handle // idled peers. - std::unordered_set selected; + std::unordered_set selected; auto const consideredPoolSize = considered_.size(); while (selected.size() != maxSelectedPeers_ && considered_.size() != 0) { @@ -207,7 +207,7 @@ Slot::update( } std::chrono::seconds -Slot::getSquelchDuration(std::size_t npeers) +Slot::getSquelchDuration(std::size_t npeers) const { using namespace std::chrono; auto m = std::max( @@ -328,14 +328,14 @@ Slots::squelchValidator(PublicKey const& validatorKey, Peer::id_t peerID) auto it = peersWithValidators_.find(validatorKey); if (it == peersWithValidators_.end()) peersWithValidators_.emplace( - validatorKey, std::unordered_set{peerID}); + validatorKey, std::unordered_set{peerID}); else if (it->second.find(peerID) == it->second.end()) it->second.insert(peerID); } bool -Slots::validatorSquelched(PublicKey const& validatorKey) +Slots::expireAndIsValidatorSquelched(PublicKey const& validatorKey) { beast::expire( peersWithValidators_, reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT); @@ -345,7 +345,9 @@ Slots::validatorSquelched(PublicKey const& validatorKey) } bool -Slots::peerSquelched(PublicKey const& validatorKey, Peer::id_t peerID) +Slots::expireAndIsPeerSquelched( + PublicKey const& validatorKey, + Peer::id_t peerID) { beast::expire( peersWithValidators_, reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT); @@ -372,7 +374,7 @@ Slots::addPeerMessage(uint256 const& key, Peer::id_t id) { JLOG(journal_.trace()) << "addPeerMessage: new " << to_string(key) << " " << id; - peersWithMessage_.emplace(key, std::unordered_set{id}); + peersWithMessage_.emplace(key, std::unordered_set{id}); return true; } @@ -483,11 +485,11 @@ Slots::updateSlotAndSquelch( } else { - auto it = untrusted_slots_.find(validator); + auto it = untrustedSlots_.find(validator); // If we received a message from a validator that is not // selected, and is not squelched, there is nothing to do. It // will be squelched later when `updateValidatorSlot` is called. - if (it == untrusted_slots_.end()) + if (it == untrustedSlots_.end()) return; it->second.update(validator, id, callback); @@ -503,7 +505,7 @@ Slots::updateValidatorSlot( { // We received a message from an already selected validator // we can ignore this message - if (untrusted_slots_.find(validator) != untrusted_slots_.end()) + if (untrustedSlots_.find(validator) != untrustedSlots_.end()) return; // We received a message from an already squelched validator. @@ -515,9 +517,9 @@ Slots::updateValidatorSlot( // 3. The peer is ignoring our squelch request and we have not sent // the controll message in a while. // In all of these cases we can only send them a squelch request again. - if (validatorSquelched(validator)) + if (expireAndIsValidatorSquelched(validator)) { - if (!peerSquelched(validator, id)) + if (!expireAndIsPeerSquelched(validator, id)) { squelchValidator(validator, id); handler_.squelch( @@ -534,14 +536,14 @@ Slots::updateValidatorSlot( // In all of these cases we send a squelch message to all peers. // The validator may still be considered by the selector. However, it // will be eventually cleaned and squelched - if (untrusted_slots_.size() == MAX_UNTRUSTED_SLOTS) + if (untrustedSlots_.size() == MAX_UNTRUSTED_SLOTS) { handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); return; } if (auto const v = updateConsideredValidator(validator, id)) - untrusted_slots_.emplace(std::make_pair( + untrustedSlots_.emplace(std::make_pair( *v, Slot( handler_, @@ -563,7 +565,7 @@ Slots::deletePeer(id_t id, bool erase) }; deletePeer(slots_); - deletePeer(untrusted_slots_); + deletePeer(untrustedSlots_); } void @@ -595,7 +597,7 @@ Slots::deleteIdlePeers() }; deleteSlots(slots_); - deleteSlots(untrusted_slots_); + deleteSlots(untrustedSlots_); // remove and squelch all validators that the selector deemed unsuitable // there might be some good validators in this set that "lapsed". @@ -626,7 +628,7 @@ Slots::onWrite(beast::PropertyStream::Map& stream) const { beast::PropertyStream::Set set("untrusted", slots); - writeSlot(set, untrusted_slots_); + writeSlot(set, untrustedSlots_); } { @@ -634,7 +636,7 @@ Slots::onWrite(beast::PropertyStream::Map& stream) const auto const now = clock_.now(); - for (auto const& [validator, info] : considered_validators_) + for (auto const& [validator, info] : consideredValidators_) { beast::PropertyStream::Map item(set); item["validator"] = toBase58(TokenType::NodePublic, validator); diff --git a/src/xrpld/overlay/detail/Squelch.cpp b/src/xrpld/overlay/detail/Squelch.cpp index a72421e03af..ca562f7dba0 100644 --- a/src/xrpld/overlay/detail/Squelch.cpp +++ b/src/xrpld/overlay/detail/Squelch.cpp @@ -1,7 +1,7 @@ //------------------------------------------------------------------------------ /* This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2020 Ripple Labs Inc. + Copyright (c) 2025 Ripple Labs Inc. Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above From 69446cb66949d72c203811d1ee5e1c2a6e5c75ec Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Wed, 25 Jun 2025 17:57:34 +0200 Subject: [PATCH 19/27] decouples tests from squelching implementation --- src/test/overlay/base_squelch_test.cpp | 49 +++++-- src/test/overlay/enhanced_squelch_test.cpp | 161 ++++++++++++++------- src/xrpld/overlay/Slot.h | 65 +++++---- src/xrpld/overlay/detail/Slot.cpp | 118 +++++++-------- 4 files changed, 239 insertions(+), 154 deletions(-) diff --git a/src/test/overlay/base_squelch_test.cpp b/src/test/overlay/base_squelch_test.cpp index 70ef31cc818..33e6af654ee 100644 --- a/src/test/overlay/base_squelch_test.cpp +++ b/src/test/overlay/base_squelch_test.cpp @@ -433,6 +433,27 @@ class Validator std::uint16_t id_ = 0; }; +class BaseSquelchingTestSlots : public reduce_relay::Slots +{ + using Slots = reduce_relay::Slots; + +public: + BaseSquelchingTestSlots( + Logs& logs, + reduce_relay::SquelchHandler& handler, + Config const& config, + reduce_relay::Slots::clock_type& clock) + : Slots(logs, handler, config, clock) + { + } + + Slots::slots_map const& + getSlots() const + { + return slots_; + } +}; + class PeerSim : public PeerPartial, public std::enable_shared_from_this { public: @@ -514,11 +535,11 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler std::uint16_t inState(PublicKey const& validator, reduce_relay::PeerState state) { - auto const& it = slots_.slots_.find(validator); - if (it != slots_.slots_.end()) + auto const& it = slots_.getSlots().find(validator); + if (it != slots_.getSlots().end()) return std::count_if( - it->second.peers_.begin(), - it->second.peers_.end(), + it->second.getPeers().begin(), + it->second.getPeers().end(), [&](auto const& it) { return (it.second.state == state); }); return 0; @@ -613,9 +634,9 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler bool isCountingState(PublicKey const& validator) { - auto const& it = slots_.slots_.find(validator); - if (it != slots_.slots_.end()) - return it->second.state_ == reduce_relay::SlotState::Counting; + auto const& it = slots_.getSlots().find(validator); + if (it != slots_.getSlots().end()) + return it->second.getState() == reduce_relay::SlotState::Counting; return false; } @@ -623,12 +644,12 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler std::set getSelected(PublicKey const& validator) { - auto const& it = slots_.slots_.find(validator); - if (it == slots_.slots_.end()) + auto const& it = slots_.getSlots().find(validator); + if (it == slots_.getSlots().end()) return {}; std::set r; - for (auto const& [id, info] : it->second.peers_) + for (auto const& [id, info] : it->second.getPeers()) if (info.state == reduce_relay::PeerState::Selected) r.insert(id); @@ -653,12 +674,12 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler std::unordered_map getPeers(PublicKey const& validator) { - auto const& it = slots_.slots_.find(validator); - if (it == slots_.slots_.end()) + auto const& it = slots_.getSlots().find(validator); + if (it == slots_.getSlots().end()) return {}; auto r = std::unordered_map(); - for (auto const& [id, info] : it->second.peers_) + for (auto const& [id, info] : it->second.getPeers()) r.emplace(std::make_pair(id, info)); return r; @@ -698,7 +719,7 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler UnsquelchCB unsquelch_; Peers peers_; Peers peersCache_; - reduce_relay::Slots slots_; + BaseSquelchingTestSlots slots_; Logs& logs_; }; diff --git a/src/test/overlay/enhanced_squelch_test.cpp b/src/test/overlay/enhanced_squelch_test.cpp index 06eba92712d..0dab8596680 100644 --- a/src/test/overlay/enhanced_squelch_test.cpp +++ b/src/test/overlay/enhanced_squelch_test.cpp @@ -86,6 +86,60 @@ class TestHandler : public reduce_relay::SquelchHandler } }; +class EnhancedSquelchingTestSlots : public reduce_relay::Slots +{ + using Slots = reduce_relay::Slots; + +public: + EnhancedSquelchingTestSlots( + Logs& logs, + reduce_relay::SquelchHandler& handler, + Config const& config, + reduce_relay::Slots::clock_type& clock) + : Slots(logs, handler, config, clock) + { + } + + Slots::slots_map const& + getSlots(bool trusted) const + { + if (trusted) + return slots_; + + return untrustedSlots_; + } + + hash_map const& + getConsideredValidators() + { + return consideredValidators_; + } + + std::optional + updateConsideredValidator(PublicKey const& validator, Peer::id_t peerID) + { + return Slots::updateConsideredValidator(validator, peerID); + } + + void + squelchValidator(PublicKey const& validatorKey, Peer::id_t peerID) + { + Slots::squelchValidator(validatorKey, peerID); + } + + bool + validatorSquelched(PublicKey const& validatorKey) + { + return Slots::expireAndIsValidatorSquelched(validatorKey); + } + + bool + peerSquelched(PublicKey const& validatorKey, Peer::id_t peerID) + { + return Slots::expireAndIsPeerSquelched(validatorKey, peerID); + } +}; + class enhanced_squelch_test : public beast::unit_test::suite { public: @@ -156,7 +210,7 @@ vp_enhanced_squelch_enable=0 Peer::id_t squelchedPeerID = 0; Peer::id_t newPeerID = 1; TestStopwatch stopwatch; - reduce_relay::Slots slots( + EnhancedSquelchingTestSlots slots( env_.app().logs(), noop_handler, env_.app().config(), stopwatch); auto const publicKey = randomKeyPair(KeyType::ed25519).first; @@ -200,7 +254,7 @@ vp_enhanced_squelch_enable=0 { testcase("updateValidatorSlot_newValidator"); TestStopwatch stopwatch; - reduce_relay::Slots slots( + EnhancedSquelchingTestSlots slots( env_.app().logs(), noop_handler, env_.app().config(), stopwatch); Peer::id_t const peerID = 1; @@ -210,15 +264,16 @@ vp_enhanced_squelch_enable=0 slots.updateValidatorSlot(message, validator, peerID); // adding untrusted slot does not effect trusted slots - BEAST_EXPECTS(slots.slots_.size() == 0, "trusted slots changed"); + BEAST_EXPECTS( + slots.getSlots(true).size() == 0, "trusted slots changed"); // we expect that the validator was not added to untrusted slots BEAST_EXPECTS( - slots.untrusted_slots_.size() == 0, "untrusted slot changed"); + slots.getSlots(false).size() == 0, "untrusted slot changed"); // we expect that the validator was added to th consideration list BEAST_EXPECTS( - slots.considered_validators_.contains(validator), + slots.getConsideredValidators().contains(validator), "new validator was not considered"); } @@ -244,7 +299,7 @@ vp_enhanced_squelch_enable=0 TestHandler handler{squelch_f, noop_squelchAll, noop_unsquelch}; TestStopwatch stopwatch; - reduce_relay::Slots slots( + EnhancedSquelchingTestSlots slots( env_.app().logs(), handler, env_.app().config(), stopwatch); slots.squelchValidator(validator, squelchedPeerID); @@ -267,7 +322,7 @@ vp_enhanced_squelch_enable=0 // a squelched validator must not be considered BEAST_EXPECTS( - !slots.considered_validators_.contains(validator), + !slots.getConsideredValidators().contains(validator), "squelched validator was added for consideration"); } @@ -282,14 +337,15 @@ vp_enhanced_squelch_enable=0 TestHandler handler{noop_handler}; TestStopwatch stopwatch; - reduce_relay::Slots slots( + EnhancedSquelchingTestSlots slots( env_.app().logs(), handler, env_.app().config(), stopwatch); // saturate validator slots auto const validators = fillUntrustedSlots(slots); // adding untrusted slot does not effect trusted slots - BEAST_EXPECTS(slots.slots_.size() == 0, "trusted slots changed"); + BEAST_EXPECTS( + slots.getSlots(true).size() == 0, "trusted slots changed"); // simulate additional messages from already selected validators for (auto const& validator : validators) @@ -301,7 +357,7 @@ vp_enhanced_squelch_enable=0 // an untrusted slot was added for each validator BEAST_EXPECT( - slots.untrusted_slots_.size() == + slots.getSlots(false).size() == env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS); for (auto const& validator : validators) @@ -338,7 +394,7 @@ vp_enhanced_squelch_enable=0 TestHandler handler{noop_handler}; TestStopwatch stopwatch; - reduce_relay::Slots slots( + EnhancedSquelchingTestSlots slots( env_.app().logs(), handler, env_.app().config(), stopwatch); auto keys = fillUntrustedSlots(slots); @@ -357,7 +413,7 @@ vp_enhanced_squelch_enable=0 }; BEAST_EXPECTS( - slots.untrusted_slots_.size() == + slots.getSlots(false).size() == env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS, "unexpected number of untrusted slots"); @@ -369,7 +425,7 @@ vp_enhanced_squelch_enable=0 slots.deleteIdlePeers(); BEAST_EXPECTS( - slots.untrusted_slots_.size() == 0, + slots.getSlots(false).size() == 0, "unexpected number of untrusted slots"); BEAST_EXPECTS(keys.empty(), "not all validators were squelched"); @@ -383,14 +439,14 @@ vp_enhanced_squelch_enable=0 Peer::id_t const peerID2 = 2; TestStopwatch stopwatch; - reduce_relay::Slots slots( + EnhancedSquelchingTestSlots slots( env_.app().logs(), noop_handler, env_.app().config(), stopwatch); // fill one untrustd validator slot auto const validator = fillUntrustedSlots(slots, 1)[0]; BEAST_EXPECTS( - slots.untrusted_slots_.size() == 1, + slots.getSlots(false).size() == 1, "unexpected number of untrusted slots"); slots.updateSlotAndSquelch( @@ -432,7 +488,7 @@ vp_enhanced_squelch_enable=0 handler.squelch_f_ = [](PublicKey const&, Peer::id_t, std::uint32_t) {}; TestStopwatch stopwatch; - reduce_relay::Slots slots( + EnhancedSquelchingTestSlots slots( env_.app().logs(), handler, env_.app().config(), stopwatch); // peers that will be source of validator messages @@ -514,7 +570,7 @@ vp_enhanced_squelch_enable=0 { testcase("testUpdateConsideredValidator_newValidator"); TestStopwatch stopwatch; - reduce_relay::Slots slots( + EnhancedSquelchingTestSlots slots( env_.app().logs(), noop_handler, env_.app().config(), stopwatch); // insert some random validator key @@ -527,7 +583,7 @@ vp_enhanced_squelch_enable=0 "validator was selected with insufficient number of peers"); BEAST_EXPECTS( - slots.considered_validators_.contains(validator), + slots.getConsideredValidators().contains(validator), "new validator was not added for consideration"); BEAST_EXPECTS( @@ -536,7 +592,7 @@ vp_enhanced_squelch_enable=0 // expect that a peer will be registered once as a message source BEAST_EXPECTS( - slots.considered_validators_.at(validator).peers.size() == 1, + slots.getConsideredValidators().at(validator).peers.size() == 1, "duplicate peer was registered"); BEAST_EXPECTS( @@ -545,7 +601,7 @@ vp_enhanced_squelch_enable=0 // expect that each distinct peer will be registered BEAST_EXPECTS( - slots.considered_validators_.at(validator).peers.size() == 2, + slots.getConsideredValidators().at(validator).peers.size() == 2, "distinct peers were not registered"); } @@ -554,7 +610,7 @@ vp_enhanced_squelch_enable=0 { testcase("testUpdateConsideredValidator_idleValidator"); TestStopwatch stopwatch; - reduce_relay::Slots slots( + EnhancedSquelchingTestSlots slots( env_.app().logs(), noop_handler, env_.app().config(), stopwatch); // insert some random validator key @@ -566,10 +622,10 @@ vp_enhanced_squelch_enable=0 "validator was selected with insufficient number of peers"); BEAST_EXPECTS( - slots.considered_validators_.contains(validator), + slots.getConsideredValidators().contains(validator), "new validator was not added for consideration"); - auto const state = slots.considered_validators_.at(validator); + auto const state = slots.getConsideredValidators().at(validator); // simulate a validator sending a new message before the idle timer stopwatch.advance(reduce_relay::IDLED - std::chrono::seconds(1)); @@ -577,7 +633,7 @@ vp_enhanced_squelch_enable=0 BEAST_EXPECTS( !slots.updateConsideredValidator(validator, peerID), "validator was selected with insufficient number of peers"); - auto const newState = slots.considered_validators_.at(validator); + auto const newState = slots.getConsideredValidators().at(validator); BEAST_EXPECTS( state.count + 1 == newState.count, @@ -590,7 +646,7 @@ vp_enhanced_squelch_enable=0 !slots.updateConsideredValidator(validator, peerID), "validator was selected with insufficient number of peers"); - auto const idleState = slots.considered_validators_.at(validator); + auto const idleState = slots.getConsideredValidators().at(validator); // we expect that an idling validator will not be updated BEAST_EXPECTS( newState.count == idleState.count, "idling validator was updated"); @@ -602,7 +658,7 @@ vp_enhanced_squelch_enable=0 testcase("testUpdateConsideredValidator_selectQualifyingValidator"); TestStopwatch stopwatch; - reduce_relay::Slots slots( + EnhancedSquelchingTestSlots slots( env_.app().logs(), noop_handler, env_.app().config(), stopwatch); // insert some random validator key @@ -649,11 +705,11 @@ vp_enhanced_squelch_enable=0 // expect that selected peer was removed BEAST_EXPECTS( - !slots.considered_validators_.contains(validator), + !slots.getConsideredValidators().contains(validator), "selected validator was not removed from considered list"); BEAST_EXPECTS( - slots.considered_validators_.contains(validator2), + slots.getConsideredValidators().contains(validator2), "unqualified validator was removed from considered list"); } @@ -661,22 +717,32 @@ vp_enhanced_squelch_enable=0 testCleanConsideredValidators_deleteIdleValidator() { testcase("cleanConsideredValidators_deleteIdleValidator"); - - TestStopwatch stopwatch; - reduce_relay::Slots slots( - env_.app().logs(), noop_handler, env_.app().config(), stopwatch); - // insert some random validator key - auto const lateValidator = randomKeyPair(KeyType::ed25519).first; + auto const idleValidator = randomKeyPair(KeyType::ed25519).first; auto const validator = randomKeyPair(KeyType::ed25519).first; Peer::id_t peerID = 0; + TestHandler handler{noop_handler}; + + // verify that squelchAll is called for idle validator + handler.squelchAll_f_ = [&](PublicKey const& actualKey, + std::uint32_t duration) { + BEAST_EXPECTS( + actualKey == idleValidator, + "unexpected key passed to squelchAll"); + }; + + TestStopwatch stopwatch; + + EnhancedSquelchingTestSlots slots( + env_.app().logs(), handler, env_.app().config(), stopwatch); + BEAST_EXPECTS( - !slots.updateConsideredValidator(lateValidator, peerID), + !slots.updateConsideredValidator(idleValidator, peerID), "validator was selected with insufficient number of peers"); BEAST_EXPECTS( - slots.considered_validators_.contains(lateValidator), + slots.getConsideredValidators().contains(idleValidator), "new validator was not added for consideration"); // simulate a validator idling @@ -685,18 +751,13 @@ vp_enhanced_squelch_enable=0 !slots.updateConsideredValidator(validator, peerID), "validator was selected with insufficient number of peers"); - auto const invalidValidators = slots.cleanConsideredValidators(); - BEAST_EXPECTS( - invalidValidators.size() == 1, - "unexpected number of invalid validators"); - BEAST_EXPECTS( - invalidValidators[0] == lateValidator, "removed invalid validator"); + slots.deleteIdlePeers(); BEAST_EXPECTS( - !slots.considered_validators_.contains(lateValidator), + !slots.getConsideredValidators().contains(idleValidator), "late validator was not removed"); BEAST_EXPECTS( - slots.considered_validators_.contains(validator), + slots.getConsideredValidators().contains(validator), "timely validator was removed"); } @@ -705,7 +766,7 @@ vp_enhanced_squelch_enable=0 * with random validator messages*/ std::vector fillUntrustedSlots( - reduce_relay::Slots& slots, + EnhancedSquelchingTestSlots& slots, int64_t maxSlots = reduce_relay::MAX_UNTRUSTED_SLOTS) { std::vector keys; @@ -728,17 +789,15 @@ vp_enhanced_squelch_enable=0 } std::unordered_map - getUntrustedSlotPeers( - PublicKey const& validator, - reduce_relay::Slots const& slots) + getUntrustedSlotPeers(PublicKey const& validator, EnhancedSquelchingTestSlots const& slots) { - auto const& it = slots.untrusted_slots_.find(validator); - if (it == slots.untrusted_slots_.end()) + auto const& it = slots.getSlots(false).find(validator); + if (it == slots.getSlots(false).end()) return {}; auto r = std::unordered_map(); - for (auto const& [id, info] : it->second.peers_) + for (auto const& [id, info] : it->second.getPeers()) r.emplace(std::make_pair(id, info)); return r; diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index 86f9997338b..49ec145a007 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -34,20 +34,8 @@ #include namespace ripple { -// used to make private members of Slots class accessible for testing -namespace test { -class enhanced_squelch_test; -class base_squelch_test; -class OverlaySim; -} // namespace test - namespace reduce_relay { -using clock_type = beast::abstract_clock; -using time_point = clock_type::time_point; - -class Slots; - /** Peer's State */ enum class PeerState : uint8_t { Counting, // counting messages @@ -138,12 +126,40 @@ class SquelchHandler class Slot final { friend class Slots; - friend class test::enhanced_squelch_test; - friend class test::OverlaySim; // a callback to report ignored squelches using ignored_squelch_callback = std::function; + using clock_type = beast::abstract_clock; + using time_point = clock_type::time_point; + +public: + /** Data maintained for each peer */ + struct PeerInfo + { + PeerState state; // peer's state + std::size_t count; // message count + time_point expire; // squelch expiration time + time_point lastMessage; // time last message received + std::size_t timesSelected; // number of times the peer was selected + }; + + /** Get all peers of the slot. This methos is only to be used in + * unit-tests. + */ + std::unordered_map const& + getPeers() const + { + return peers_; + } + /** Get the slots state. */ + SlotState + getState() const + { + return state_; + } + +private: /** Constructor * @param journal Journal for logging * @param handler Squelch/Unsquelch implementation @@ -240,16 +256,6 @@ class Slot final void onWrite(beast::PropertyStream::Map& stream) const; - /** Data maintained for each peer */ - struct PeerInfo - { - PeerState state; // peer's state - std::size_t count; // message count - time_point expire; // squelch expiration time - time_point lastMessage; // time last message received - std::size_t timesSelected; // number of times the peer was selected - }; - std::unordered_map peers_; // peer's data // pool of peers considered as the source of messages @@ -281,11 +287,11 @@ class Slot final * and checks for peers which are disconnected or stopped relaying the * messages. */ -class Slots final +class Slots { - friend test::enhanced_squelch_test; - friend class test::base_squelch_test; - friend class test::OverlaySim; +public: + using clock_type = beast::abstract_clock; + using time_point = clock_type::time_point; using messages = beast::aged_unordered_map< uint256, @@ -299,7 +305,6 @@ class Slots final hardened_hash>; using slots_map = hash_map; -public: /** * @param logs reference to the logger * @param handler Squelch/unsquelch implementation @@ -433,7 +438,7 @@ class Slots final void onWrite(beast::PropertyStream::Map& stream) const; -private: +protected: /** Add message/peer if have not seen this message * from the peer. A message is aged after IDLED seconds. * Return true if added */ diff --git a/src/xrpld/overlay/detail/Slot.cpp b/src/xrpld/overlay/detail/Slot.cpp index a1b1003b8a6..f6a8127a116 100644 --- a/src/xrpld/overlay/detail/Slot.cpp +++ b/src/xrpld/overlay/detail/Slot.cpp @@ -394,65 +394,6 @@ Slots::addPeerMessage(uint256 const& key, Peer::id_t id) return true; } -std::optional -Slots::updateConsideredValidator(PublicKey const& validator, Peer::id_t peer) -{ - auto const now = clock_.now(); - - auto it = considered_validators_.find(validator); - if (it == considered_validators_.end()) - { - considered_validators_.emplace(std::make_pair( - validator, - ValidatorInfo{ - .count = 1, - .lastMessage = now, - .peers = {peer}, - })); - - return {}; - } - - // the validator idled. Don't update it, it will be cleaned later - if (now - it->second.lastMessage > IDLED) - return {}; - - it->second.peers.insert(peer); - - it->second.lastMessage = now; - ++it->second.count; - - if (it->second.count < MAX_MESSAGE_THRESHOLD || - it->second.peers.size() < reduce_relay::MAX_SELECTED_PEERS) - return {}; - - auto const key = it->first; - considered_validators_.erase(it); - - return key; -} - -std::vector -Slots::cleanConsideredValidators() -{ - auto const now = clock_.now(); - - std::vector keys; - for (auto it = considered_validators_.begin(); - it != considered_validators_.end();) - { - if (now - it->second.lastMessage > IDLED) - { - keys.push_back(it->first); - it = considered_validators_.erase(it); - } - else - ++it; - } - - return keys; -} - void Slots::updateSlotAndSquelch( uint256 const& key, @@ -556,6 +497,44 @@ Slots::updateValidatorSlot( // removed and squelched. } +std::optional +Slots::updateConsideredValidator(PublicKey const& validator, Peer::id_t peer) +{ + auto const now = clock_.now(); + + auto it = consideredValidators_.find(validator); + if (it == consideredValidators_.end()) + { + consideredValidators_.emplace(std::make_pair( + validator, + ValidatorInfo{ + .count = 1, + .lastMessage = now, + .peers = {peer}, + })); + + return {}; + } + + // the validator idled. Don't update it, it will be cleaned later + if (now - it->second.lastMessage > IDLED) + return {}; + + it->second.peers.insert(peer); + + it->second.lastMessage = now; + ++it->second.count; + + if (it->second.count < MAX_MESSAGE_THRESHOLD || + it->second.peers.size() < reduce_relay::MAX_SELECTED_PEERS) + return {}; + + auto const key = it->first; + consideredValidators_.erase(it); + + return key; +} + void Slots::deletePeer(id_t id, bool erase) { @@ -606,6 +585,27 @@ Slots::deleteIdlePeers() handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); } +std::vector +Slots::cleanConsideredValidators() +{ + auto const now = clock_.now(); + + std::vector keys; + for (auto it = consideredValidators_.begin(); + it != consideredValidators_.end();) + { + if (now - it->second.lastMessage > IDLED) + { + keys.push_back(it->first); + it = consideredValidators_.erase(it); + } + else + ++it; + } + + return keys; +} + void Slots::onWrite(beast::PropertyStream::Map& stream) const { From 5ca4f5533ddca1c4f91599c30e51ceb52de1930d Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Thu, 26 Jun 2025 10:00:07 +0200 Subject: [PATCH 20/27] fixes code formatting --- src/test/overlay/enhanced_squelch_test.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/test/overlay/enhanced_squelch_test.cpp b/src/test/overlay/enhanced_squelch_test.cpp index 0dab8596680..bef6ecf2d14 100644 --- a/src/test/overlay/enhanced_squelch_test.cpp +++ b/src/test/overlay/enhanced_squelch_test.cpp @@ -789,7 +789,9 @@ vp_enhanced_squelch_enable=0 } std::unordered_map - getUntrustedSlotPeers(PublicKey const& validator, EnhancedSquelchingTestSlots const& slots) + getUntrustedSlotPeers( + PublicKey const& validator, + EnhancedSquelchingTestSlots const& slots) { auto const& it = slots.getSlots(false).find(validator); if (it == slots.getSlots(false).end()) From 9030b28b7aca361bbcf0e7e8e2f491ad966789d2 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Thu, 26 Jun 2025 12:10:47 +0200 Subject: [PATCH 21/27] fixes windows tests --- src/test/overlay/base_squelch_test.cpp | 21 +++++++++------------ src/xrpld/overlay/detail/Slot.cpp | 14 +++++++------- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/src/test/overlay/base_squelch_test.cpp b/src/test/overlay/base_squelch_test.cpp index 33e6af654ee..f18ea4deafb 100644 --- a/src/test/overlay/base_squelch_test.cpp +++ b/src/test/overlay/base_squelch_test.cpp @@ -457,7 +457,6 @@ class BaseSquelchingTestSlots : public reduce_relay::Slots class PeerSim : public PeerPartial, public std::enable_shared_from_this { public: - using id_t = Peer::id_t; PeerSim(Overlay& overlay, beast::Journal journal) : overlay_(overlay), squelch_(journal, overlay_.clock()) { @@ -466,7 +465,7 @@ class PeerSim : public PeerPartial, public std::enable_shared_from_this ~PeerSim() = default; - id_t + Peer::id_t id() const override { return id_; @@ -504,8 +503,8 @@ class PeerSim : public PeerPartial, public std::enable_shared_from_this } private: - inline static id_t sid_ = 0; - id_t id_; + inline static Peer::id_t sid_ = 0; + Peer::id_t id_; Overlay& overlay_; reduce_relay::Squelch squelch_; }; @@ -515,7 +514,6 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler using Peers = std::unordered_map; public: - using id_t = Peer::id_t; using clock_type = TestStopwatch; OverlaySim(Application& app) : slots_(app.logs(), *this, app.config(), clock_), logs_(app.logs()) @@ -557,7 +555,7 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler } void - deletePeer(id_t id, UnsquelchCB f) override + deletePeer(Peer::id_t id, UnsquelchCB f) override { unsquelch_ = f; slots_.deletePeer(id, true); @@ -641,14 +639,14 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler return false; } - std::set + std::set getSelected(PublicKey const& validator) { auto const& it = slots_.getSlots().find(validator); if (it == slots_.getSlots().end()) return {}; - std::set r; + std::set r; for (auto const& [id, info] : it->second.getPeers()) if (info.state == reduce_relay::PeerState::Selected) r.insert(id); @@ -663,7 +661,7 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler return selected.find(peer) != selected.end(); } - id_t + Peer::id_t getSelectedPeer(PublicKey const& validator) { auto selected = getSelected(validator); @@ -671,14 +669,14 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler return *selected.begin(); } - std::unordered_map + std::unordered_map getPeers(PublicKey const& validator) { auto const& it = slots_.getSlots().find(validator); if (it == slots_.getSlots().end()) return {}; - auto r = std::unordered_map(); + auto r = std::unordered_map(); for (auto const& [id, info] : it->second.getPeers()) r.emplace(std::make_pair(id, info)); @@ -911,7 +909,6 @@ class Network class base_squelch_test : public beast::unit_test::suite { using Slot = reduce_relay::Slot; - using id_t = Peer::id_t; protected: void diff --git a/src/xrpld/overlay/detail/Slot.cpp b/src/xrpld/overlay/detail/Slot.cpp index f6a8127a116..14c016698b1 100644 --- a/src/xrpld/overlay/detail/Slot.cpp +++ b/src/xrpld/overlay/detail/Slot.cpp @@ -536,21 +536,21 @@ Slots::updateConsideredValidator(PublicKey const& validator, Peer::id_t peer) } void -Slots::deletePeer(id_t id, bool erase) +Slots::deletePeer(Peer::id_t id, bool erase) { - auto deletePeer = [&](slots_map& slots) { + auto const f = [&](slots_map& slots) { for (auto& [validator, slot] : slots) slot.deletePeer(validator, id, erase); }; - deletePeer(slots_); - deletePeer(untrustedSlots_); + f(slots_); + f(untrustedSlots_); } void Slots::deleteIdlePeers() { - auto deleteSlots = [&](slots_map& slots) { + auto const f = [&](slots_map& slots) { auto const now = clock_.now(); for (auto it = slots.begin(); it != slots.end();) @@ -575,8 +575,8 @@ Slots::deleteIdlePeers() } }; - deleteSlots(slots_); - deleteSlots(untrustedSlots_); + f(slots_); + f(untrustedSlots_); // remove and squelch all validators that the selector deemed unsuitable // there might be some good validators in this set that "lapsed". From 521ec6f96417e70c3ad702bc536eb52ebc0202f7 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Fri, 27 Jun 2025 11:56:14 +0200 Subject: [PATCH 22/27] improves code readability --- src/test/overlay/enhanced_squelch_test.cpp | 15 ++++++----- src/xrpld/overlay/Slot.h | 15 ++++++----- src/xrpld/overlay/detail/OverlayImpl.cpp | 4 +-- src/xrpld/overlay/detail/PeerImp.cpp | 1 + src/xrpld/overlay/detail/Slot.cpp | 30 ++++++++++------------ 5 files changed, 34 insertions(+), 31 deletions(-) diff --git a/src/test/overlay/enhanced_squelch_test.cpp b/src/test/overlay/enhanced_squelch_test.cpp index bef6ecf2d14..600832286fa 100644 --- a/src/test/overlay/enhanced_squelch_test.cpp +++ b/src/test/overlay/enhanced_squelch_test.cpp @@ -124,7 +124,7 @@ class EnhancedSquelchingTestSlots : public reduce_relay::Slots void squelchValidator(PublicKey const& validatorKey, Peer::id_t peerID) { - Slots::squelchValidator(validatorKey, peerID); + Slots::registerSquelchedValidator(validatorKey, peerID); } bool @@ -261,7 +261,7 @@ vp_enhanced_squelch_enable=0 auto const validator = randomKeyPair(KeyType::ed25519).first; uint256 message{0}; - slots.updateValidatorSlot(message, validator, peerID); + slots.updateUntrustedValidatorSlot(message, validator, peerID); // adding untrusted slot does not effect trusted slots BEAST_EXPECTS( @@ -305,10 +305,11 @@ vp_enhanced_squelch_enable=0 slots.squelchValidator(validator, squelchedPeerID); // this should not trigger squelch assertions, the peer is squelched - slots.updateValidatorSlot( + slots.updateUntrustedValidatorSlot( sha512Half(validator), validator, squelchedPeerID); - slots.updateValidatorSlot(sha512Half(validator), validator, newPeerID); + slots.updateUntrustedValidatorSlot( + sha512Half(validator), validator, newPeerID); // the squelched peer remained squelched BEAST_EXPECTS( @@ -350,7 +351,7 @@ vp_enhanced_squelch_enable=0 // simulate additional messages from already selected validators for (auto const& validator : validators) for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD; ++i) - slots.updateValidatorSlot( + slots.updateUntrustedValidatorSlot( sha512Half(validator) + static_cast(i), validator, peerID); @@ -374,7 +375,7 @@ vp_enhanced_squelch_enable=0 slots.squelchValidator(key, peerID); }; - slots.updateValidatorSlot( + slots.updateUntrustedValidatorSlot( sha512Half(newValidator), newValidator, peerID); // Once the slots are saturated every other validator is squelched @@ -779,7 +780,7 @@ vp_enhanced_squelch_enable=0 ++j) // send enough messages so that a validator slot is selected for (int k = 0; k < reduce_relay::MAX_MESSAGE_THRESHOLD; ++k) - slots.updateValidatorSlot( + slots.updateUntrustedValidatorSlot( sha512Half(validator) + static_cast(k), validator, j); diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index 49ec145a007..a1e864ef208 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -325,7 +325,7 @@ class Slots config.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE) , clock_(clock) , peersWithMessage_(clock) - , peersWithValidators_(clock) + , peersWithSquelchedValidators_(clock) { } @@ -356,12 +356,12 @@ class Slots * @param id The ID of the peer that sent the message */ void - updateValidatorSlot( + updateUntrustedValidatorSlot( uint256 const& key, PublicKey const& validator, Peer::id_t id) { - updateValidatorSlot(key, validator, id, []() {}); + updateUntrustedValidatorSlot(key, validator, id, []() {}); } /** Updates untrusted validator slot. Do not call for trusted @@ -372,7 +372,7 @@ class Slots * @param callback A callback to report ignored validations */ void - updateValidatorSlot( + updateUntrustedValidatorSlot( uint256 const& key, PublicKey const& validator, Peer::id_t id, @@ -433,7 +433,9 @@ class Slots * @param peerID peer ID */ void - squelchValidator(PublicKey const& validatorKey, Peer::id_t peerID); + registerSquelchedValidator( + PublicKey const& validatorKey, + Peer::id_t peerID); void onWrite(beast::PropertyStream::Map& stream) const; @@ -502,7 +504,7 @@ class Slots // Maintain aged container of validator/peers. This is used to track // which validator/peer were squelced. A peer that whose squelch // has expired is removed. - validators peersWithValidators_; + validators peersWithSquelchedValidators_; struct ValidatorInfo { @@ -512,6 +514,7 @@ class Slots // message for this validator }; + // Untrusted validators considered for open untrusted slots hash_map consideredValidators_; }; diff --git a/src/xrpld/overlay/detail/OverlayImpl.cpp b/src/xrpld/overlay/detail/OverlayImpl.cpp index 7b8b4f5ba38..33f6ffbaf13 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.cpp +++ b/src/xrpld/overlay/detail/OverlayImpl.cpp @@ -1422,7 +1422,7 @@ OverlayImpl::squelchAll(PublicKey const& validator, uint32_t squelchDuration) { for_each([&](std::shared_ptr&& p) { p->send(makeSquelchMessage(validator, true, squelchDuration)); - slots_.squelchValidator(validator, p->id()); + slots_.registerSquelchedValidator(validator, p->id()); }); } @@ -1494,7 +1494,7 @@ OverlayImpl::updateValidatorSlot( updateValidatorSlot(key, validator, peer); }); - slots_.updateValidatorSlot(key, validator, peer, [&]() { + slots_.updateUntrustedValidatorSlot(key, validator, peer, [&]() { reportInboundTraffic(TrafficCount::squelch_ignored, 0); }); } diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index d0ef011ba66..f9a9a33c9dd 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -2704,6 +2704,7 @@ PeerImp::onMessage(std::shared_ptr const& m) fee_.update(Resource::feeInvalidData, "squelch no pubkey"); return; } + auto validator = m->validatorpubkey(); auto const slice{makeSlice(validator)}; if (!publicKeyType(slice)) diff --git a/src/xrpld/overlay/detail/Slot.cpp b/src/xrpld/overlay/detail/Slot.cpp index 14c016698b1..4fb83d781f0 100644 --- a/src/xrpld/overlay/detail/Slot.cpp +++ b/src/xrpld/overlay/detail/Slot.cpp @@ -323,25 +323,22 @@ Slots::reduceRelayReady() } void -Slots::squelchValidator(PublicKey const& validatorKey, Peer::id_t peerID) +Slots::registerSquelchedValidator( + PublicKey const& validatorKey, + Peer::id_t peerID) { - auto it = peersWithValidators_.find(validatorKey); - if (it == peersWithValidators_.end()) - peersWithValidators_.emplace( - validatorKey, std::unordered_set{peerID}); - - else if (it->second.find(peerID) == it->second.end()) - it->second.insert(peerID); + peersWithSquelchedValidators_[validatorKey].insert(peerID); } bool Slots::expireAndIsValidatorSquelched(PublicKey const& validatorKey) { beast::expire( - peersWithValidators_, reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT); + peersWithSquelchedValidators_, + reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT); - return peersWithValidators_.find(validatorKey) != - peersWithValidators_.end(); + return peersWithSquelchedValidators_.find(validatorKey) != + peersWithSquelchedValidators_.end(); } bool @@ -350,12 +347,13 @@ Slots::expireAndIsPeerSquelched( Peer::id_t peerID) { beast::expire( - peersWithValidators_, reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT); + peersWithSquelchedValidators_, + reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT); - auto const it = peersWithValidators_.find(validatorKey); + auto const it = peersWithSquelchedValidators_.find(validatorKey); // if validator was not squelched, the peer was also not squelched - if (it == peersWithValidators_.end()) + if (it == peersWithSquelchedValidators_.end()) return false; // if a peer is found the squelch for it has not expired @@ -438,7 +436,7 @@ Slots::updateSlotAndSquelch( } void -Slots::updateValidatorSlot( +Slots::updateUntrustedValidatorSlot( uint256 const& key, PublicKey const& validator, Peer::id_t id, @@ -462,7 +460,7 @@ Slots::updateValidatorSlot( { if (!expireAndIsPeerSquelched(validator, id)) { - squelchValidator(validator, id); + registerSquelchedValidator(validator, id); handler_.squelch( validator, id, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); } From 1a768d73774246ae60d1ba6bc75f37c0ea33958d Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Mon, 30 Jun 2025 14:26:53 +0200 Subject: [PATCH 23/27] further improves member function and attribute names --- src/test/overlay/base_squelch_test.cpp | 2 +- src/test/overlay/enhanced_squelch_test.cpp | 2 +- src/xrpld/overlay/Slot.h | 9 ++++++++- src/xrpld/overlay/detail/OverlayImpl.cpp | 4 ++-- src/xrpld/overlay/detail/OverlayImpl.h | 2 +- src/xrpld/overlay/detail/PeerImp.cpp | 3 ++- src/xrpld/overlay/detail/Slot.cpp | 22 ++++++++++++++-------- 7 files changed, 29 insertions(+), 15 deletions(-) diff --git a/src/test/overlay/base_squelch_test.cpp b/src/test/overlay/base_squelch_test.cpp index f18ea4deafb..b69bfb83247 100644 --- a/src/test/overlay/base_squelch_test.cpp +++ b/src/test/overlay/base_squelch_test.cpp @@ -450,7 +450,7 @@ class BaseSquelchingTestSlots : public reduce_relay::Slots Slots::slots_map const& getSlots() const { - return slots_; + return trustedSlots_; } }; diff --git a/src/test/overlay/enhanced_squelch_test.cpp b/src/test/overlay/enhanced_squelch_test.cpp index 600832286fa..5c6636103c1 100644 --- a/src/test/overlay/enhanced_squelch_test.cpp +++ b/src/test/overlay/enhanced_squelch_test.cpp @@ -104,7 +104,7 @@ class EnhancedSquelchingTestSlots : public reduce_relay::Slots getSlots(bool trusted) const { if (trusted) - return slots_; + return trustedSlots_; return untrustedSlots_; } diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index a1e864ef208..6262923173e 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -482,7 +482,14 @@ class Slots std::atomic_bool reduceRelayReady_{false}; - slots_map slots_; + // Maintain an open number of slots for trusted validators to reduce + // duplicate traffic from trusted validators. + slots_map trustedSlots_; + + // Maintain slots for untrusted validators to reduce duplicate traffic from + // untrusted validators. If enhanced squelching is enabled, the number of + // untrustedSlots_ is capped at reduce_relay::MAX_UNTRUSTED_SLOTS. + // Otherwise, there is no limit. slots_map untrustedSlots_; SquelchHandler& handler_; // squelch/unsquelch handler diff --git a/src/xrpld/overlay/detail/OverlayImpl.cpp b/src/xrpld/overlay/detail/OverlayImpl.cpp index 33f6ffbaf13..05bfaedcd81 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.cpp +++ b/src/xrpld/overlay/detail/OverlayImpl.cpp @@ -1481,7 +1481,7 @@ OverlayImpl::updateSlotAndSquelch( } void -OverlayImpl::updateValidatorSlot( +OverlayImpl::updateUntrustedValidatorSlot( uint256 const& key, PublicKey const& validator, Peer::id_t peer) @@ -1491,7 +1491,7 @@ OverlayImpl::updateValidatorSlot( if (!strand_.running_in_this_thread()) return post(strand_, [this, key, validator, peer]() { - updateValidatorSlot(key, validator, peer); + updateUntrustedValidatorSlot(key, validator, peer); }); slots_.updateUntrustedValidatorSlot(key, validator, peer, [&]() { diff --git a/src/xrpld/overlay/detail/OverlayImpl.h b/src/xrpld/overlay/detail/OverlayImpl.h index 3911d1b63bc..863451585e2 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.h +++ b/src/xrpld/overlay/detail/OverlayImpl.h @@ -427,7 +427,7 @@ class OverlayImpl : public Overlay, public reduce_relay::SquelchHandler * @param peer Peer's id to update the slot for */ void - updateValidatorSlot( + updateUntrustedValidatorSlot( uint256 const& key, PublicKey const& validator, Peer::id_t peer); diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index f9a9a33c9dd..777c702d7d8 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -2386,7 +2386,8 @@ PeerImp::onMessage(std::shared_ptr const& m) TrafficCount::category::validation_untrusted, Message::messageSize(*m)); - overlay_.updateValidatorSlot(key, val->getSignerPublic(), id_); + overlay_.updateUntrustedValidatorSlot( + key, val->getSignerPublic(), id_); // If the operator has specified that untrusted validations be // dropped then this happens here I.e. before further wasting CPU diff --git a/src/xrpld/overlay/detail/Slot.cpp b/src/xrpld/overlay/detail/Slot.cpp index 4fb83d781f0..09ead75fc5a 100644 --- a/src/xrpld/overlay/detail/Slot.cpp +++ b/src/xrpld/overlay/detail/Slot.cpp @@ -410,7 +410,10 @@ Slots::updateSlotAndSquelch( { JLOG(journal_.trace()) << "updateSlotAndSquelch: new slot " << Slice(validator); - auto it = slots_ + + // if enhanced squelching is disabled, keep untrusted validator slots + // separately from trusted ones + auto it = (isTrusted ? trustedSlots_ : untrustedSlots_) .emplace(std::make_pair( validator, Slot( @@ -420,6 +423,7 @@ Slots::updateSlotAndSquelch( isTrusted, clock_))) .first; + it->second.update(validator, id, callback); } else @@ -511,21 +515,23 @@ Slots::updateConsideredValidator(PublicKey const& validator, Peer::id_t peer) .peers = {peer}, })); - return {}; + return std::nullopt; } // the validator idled. Don't update it, it will be cleaned later if (now - it->second.lastMessage > IDLED) - return {}; + return std::nullopt; it->second.peers.insert(peer); - it->second.lastMessage = now; ++it->second.count; + // if the validator has not met selection criteria yet if (it->second.count < MAX_MESSAGE_THRESHOLD || it->second.peers.size() < reduce_relay::MAX_SELECTED_PEERS) - return {}; + { + return std::nullopt; + } auto const key = it->first; consideredValidators_.erase(it); @@ -541,7 +547,7 @@ Slots::deletePeer(Peer::id_t id, bool erase) slot.deletePeer(validator, id, erase); }; - f(slots_); + f(trustedSlots_); f(untrustedSlots_); } @@ -573,7 +579,7 @@ Slots::deleteIdlePeers() } }; - f(slots_); + f(trustedSlots_); f(untrustedSlots_); // remove and squelch all validators that the selector deemed unsuitable @@ -621,7 +627,7 @@ Slots::onWrite(beast::PropertyStream::Map& stream) const { beast::PropertyStream::Set set("trusted", slots); - writeSlot(set, slots_); + writeSlot(set, trustedSlots_); } { From 820f1e11c445079e1bc9c68041fbb1064b068f0a Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Mon, 30 Jun 2025 15:07:57 +0200 Subject: [PATCH 24/27] adds callback to squelchAll instead of calling slots directly --- src/test/overlay/base_squelch_test.cpp | 13 +++++++-- src/test/overlay/enhanced_squelch_test.cpp | 32 ++++++++++++++-------- src/xrpld/overlay/Slot.h | 29 ++++++++++++-------- src/xrpld/overlay/detail/OverlayImpl.cpp | 9 ++++-- src/xrpld/overlay/detail/OverlayImpl.h | 7 +++-- src/xrpld/overlay/detail/Slot.cpp | 16 +++++++++-- 6 files changed, 73 insertions(+), 33 deletions(-) diff --git a/src/test/overlay/base_squelch_test.cpp b/src/test/overlay/base_squelch_test.cpp index b69bfb83247..17e58b11a63 100644 --- a/src/test/overlay/base_squelch_test.cpp +++ b/src/test/overlay/base_squelch_test.cpp @@ -701,10 +701,16 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler } void - squelchAll(PublicKey const& validator, std::uint32_t duration) override + squelchAll( + PublicKey const& validator, + std::uint32_t duration, + std::function callback) override { for (auto const& [id, peer] : peers_) + { squelch_(validator, peer, duration); + callback(id); + } } void unsquelch(PublicKey const& validator, Peer::id_t id) const override @@ -1580,7 +1586,10 @@ vp_base_squelch_max_selected_peers=2 } void - squelchAll(PublicKey const&, std::uint32_t) override + squelchAll( + PublicKey const&, + std::uint32_t, + std::function) override { } diff --git a/src/test/overlay/enhanced_squelch_test.cpp b/src/test/overlay/enhanced_squelch_test.cpp index 5c6636103c1..da69937c844 100644 --- a/src/test/overlay/enhanced_squelch_test.cpp +++ b/src/test/overlay/enhanced_squelch_test.cpp @@ -41,8 +41,8 @@ class TestHandler : public reduce_relay::SquelchHandler public: using squelch_method = std::function; - using squelchAll_method = - std::function; + using squelchAll_method = std::function< + void(PublicKey const&, std::uint32_t, std::function)>; using unsquelch_method = std::function; squelch_method squelch_f_; @@ -74,9 +74,12 @@ class TestHandler : public reduce_relay::SquelchHandler } void - squelchAll(PublicKey const& validator, std::uint32_t duration) override + squelchAll( + PublicKey const& validator, + std::uint32_t duration, + std::function callback) override { - squelchAll_f_(validator, duration); + squelchAll_f_(validator, duration, callback); } void @@ -148,10 +151,10 @@ class enhanced_squelch_test : public beast::unit_test::suite BEAST_EXPECTS(false, "unexpected call to squelch handler"); }; - TestHandler::squelchAll_method noop_squelchAll = [&](PublicKey const&, - std::uint32_t) { - BEAST_EXPECTS(false, "unexpected call to squelchAll handler"); - }; + TestHandler::squelchAll_method noop_squelchAll = + [&](PublicKey const&, std::uint32_t, std::function) { + BEAST_EXPECTS(false, "unexpected call to squelchAll handler"); + }; TestHandler::unsquelch_method noop_unsquelch = [&](PublicKey const&, Peer::id_t) { @@ -369,10 +372,12 @@ vp_enhanced_squelch_enable=0 auto const newValidator = randomKeyPair(KeyType::ed25519).first; // once slots are full squelchAll must be called for new peer/validator - handler.squelchAll_f_ = [&](PublicKey const& key, std::uint32_t) { + handler.squelchAll_f_ = [&](PublicKey const& key, + std::uint32_t, + std::function callback) { BEAST_EXPECTS( key == newValidator, "unexpected validator squelched"); - slots.squelchValidator(key, peerID); + callback(peerID); }; slots.updateUntrustedValidatorSlot( @@ -401,7 +406,8 @@ vp_enhanced_squelch_enable=0 // verify that squelchAll is called for each idled slot validator handler.squelchAll_f_ = [&](PublicKey const& actualKey, - std::uint32_t duration) { + std::uint32_t duration, + std::function callback) { for (auto it = keys.begin(); it != keys.end(); ++it) { if (*it == actualKey) @@ -727,10 +733,12 @@ vp_enhanced_squelch_enable=0 // verify that squelchAll is called for idle validator handler.squelchAll_f_ = [&](PublicKey const& actualKey, - std::uint32_t duration) { + std::uint32_t duration, + std::function callback) { BEAST_EXPECTS( actualKey == idleValidator, "unexpected key passed to squelchAll"); + callback(peerID); }; TestStopwatch stopwatch; diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index 6262923173e..40315e8c3fc 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -31,6 +31,7 @@ #include #include +#include #include namespace ripple { @@ -101,9 +102,13 @@ class SquelchHandler * to register that a (validator,peer) was squelched * @param validator Public key of the source validator * @param duration Squelch duration in seconds + * @param callback a callback to register that a validator was squelched */ virtual void - squelchAll(PublicKey const& validator, std::uint32_t duration) = 0; + squelchAll( + PublicKey const& validator, + std::uint32_t duration, + std::function callback) = 0; /** Unsquelch handler * @param validator Public key of the source validator @@ -426,17 +431,6 @@ class Slots void deletePeer(Peer::id_t id, bool erase); - /** Called to register that a given validator was squelched for a given - * peer. It is expected that this method is called by SquelchHandler. - * - * @param validatorKey Validator public key - * @param peerID peer ID - */ - void - registerSquelchedValidator( - PublicKey const& validatorKey, - Peer::id_t peerID); - void onWrite(beast::PropertyStream::Map& stream) const; @@ -480,6 +474,17 @@ class Slots bool expireAndIsPeerSquelched(PublicKey const& validatorKey, Peer::id_t peerID); + /** Called to register that a given validator was squelched for a given + * peer. It is expected that this method is called by SquelchHandler. + * + * @param validatorKey Validator public key + * @param peerID peer ID + */ + void + registerSquelchedValidator( + PublicKey const& validatorKey, + Peer::id_t peerID); + std::atomic_bool reduceRelayReady_{false}; // Maintain an open number of slots for trusted validators to reduce diff --git a/src/xrpld/overlay/detail/OverlayImpl.cpp b/src/xrpld/overlay/detail/OverlayImpl.cpp index 05bfaedcd81..f9631ff83d5 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.cpp +++ b/src/xrpld/overlay/detail/OverlayImpl.cpp @@ -43,6 +43,8 @@ #include "xrpld/overlay/detail/TrafficCount.h" +#include + namespace ripple { namespace CrawlOptions { @@ -1418,11 +1420,14 @@ OverlayImpl::squelch( } void -OverlayImpl::squelchAll(PublicKey const& validator, uint32_t squelchDuration) +OverlayImpl::squelchAll( + PublicKey const& validator, + uint32_t squelchDuration, + std::function callback) { for_each([&](std::shared_ptr&& p) { p->send(makeSquelchMessage(validator, true, squelchDuration)); - slots_.registerSquelchedValidator(validator, p->id()); + callback(p->id()); }); } diff --git a/src/xrpld/overlay/detail/OverlayImpl.h b/src/xrpld/overlay/detail/OverlayImpl.h index 863451585e2..8bcb45be4a6 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.h +++ b/src/xrpld/overlay/detail/OverlayImpl.h @@ -48,6 +48,7 @@ #include #include #include +#include #include #include #include @@ -467,8 +468,10 @@ class OverlayImpl : public Overlay, public reduce_relay::SquelchHandler std::uint32_t squelchDuration) const override; void - squelchAll(PublicKey const& validator, std::uint32_t squelchDuration) - override; + squelchAll( + PublicKey const& validator, + std::uint32_t squelchDuration, + std::function) override; void unsquelch(PublicKey const& validator, Peer::id_t id) const override; diff --git a/src/xrpld/overlay/detail/Slot.cpp b/src/xrpld/overlay/detail/Slot.cpp index 09ead75fc5a..4fe7f62e8b0 100644 --- a/src/xrpld/overlay/detail/Slot.cpp +++ b/src/xrpld/overlay/detail/Slot.cpp @@ -481,7 +481,10 @@ Slots::updateUntrustedValidatorSlot( // will be eventually cleaned and squelched if (untrustedSlots_.size() == MAX_UNTRUSTED_SLOTS) { - handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); + handler_.squelchAll( + validator, + MAX_UNSQUELCH_EXPIRE_DEFAULT.count(), + [&](Peer::id_t id) { registerSquelchedValidator(validator, id); }); return; } @@ -570,7 +573,11 @@ Slots::deleteIdlePeers() // sending messages for this validator squelch it if (!it->second.isTrusted_) handler_.squelchAll( - it->first, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); + it->first, + MAX_UNSQUELCH_EXPIRE_DEFAULT.count(), + [&](Peer::id_t id) { + registerSquelchedValidator(it->first, id); + }); it = slots.erase(it); } @@ -586,7 +593,10 @@ Slots::deleteIdlePeers() // there might be some good validators in this set that "lapsed". // However, since these are untrusted validators we're not concerned for (auto const& validator : cleanConsideredValidators()) - handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); + handler_.squelchAll( + validator, + MAX_UNSQUELCH_EXPIRE_DEFAULT.count(), + [&](Peer::id_t id) { registerSquelchedValidator(validator, id); }); } std::vector From a7f257bb6d38e5a635f8d15fb62b0dd6b245a800 Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Mon, 30 Jun 2025 15:42:33 +0200 Subject: [PATCH 25/27] removes redundant scope --- src/xrpld/overlay/detail/OverlayImpl.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/xrpld/overlay/detail/OverlayImpl.cpp b/src/xrpld/overlay/detail/OverlayImpl.cpp index f9631ff83d5..cd9f5ea2adf 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.cpp +++ b/src/xrpld/overlay/detail/OverlayImpl.cpp @@ -595,9 +595,7 @@ OverlayImpl::onWrite(beast::PropertyStream::Map& stream) } } - { - slots_.onWrite(stream); - } + slots_.onWrite(stream); } //------------------------------------------------------------------------------ From 79a561073e3298e883ca54af285629b13bab383e Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Tue, 1 Jul 2025 15:01:45 +0200 Subject: [PATCH 26/27] adds logic to reset validator progress and better deletion safeguards If the validator was idle for a short period, reset it's progress. However, if the validator was idle for a long time, delete and squelch it. Similarly, if the validator sent a lot of unique messages, but failed to reach peering constraints, squelch it. --- src/test/overlay/base_squelch_test.cpp | 8 +- src/test/overlay/enhanced_squelch_test.cpp | 145 +++++++++++++++++---- src/xrpld/overlay/ReduceRelayCommon.h | 13 +- src/xrpld/overlay/Slot.h | 20 ++- src/xrpld/overlay/detail/PeerImp.cpp | 6 +- src/xrpld/overlay/detail/Slot.cpp | 52 ++++++-- 6 files changed, 198 insertions(+), 46 deletions(-) diff --git a/src/test/overlay/base_squelch_test.cpp b/src/test/overlay/base_squelch_test.cpp index 17e58b11a63..67ff24ebd27 100644 --- a/src/test/overlay/base_squelch_test.cpp +++ b/src/test/overlay/base_squelch_test.cpp @@ -1107,7 +1107,7 @@ class base_squelch_test : public beast::unit_test::suite .count(); mustHandle = event.isSelected_ && - d > milliseconds(reduce_relay::IDLED).count() && + d > milliseconds(reduce_relay::PEER_IDLED).count() && network_.overlay().inState( *event.key_, reduce_relay::PeerState::Squelched) > 0 && @@ -1129,7 +1129,7 @@ class base_squelch_test : public beast::unit_test::suite } if (event.state_ == State::WaitReset || (event.state_ == State::On && - (now - event.time_ > (reduce_relay::IDLED + seconds(2))))) + (now - event.time_ > (reduce_relay::PEER_IDLED + seconds(2))))) { bool handled = event.state_ == State::WaitReset || !event.handled_; @@ -1319,7 +1319,7 @@ class base_squelch_test : public beast::unit_test::suite network_.overlay().clock().advance(seconds(601)); BEAST_EXPECT(propagateAndSquelch(log, true)); network_.overlay().clock().advance( - reduce_relay::IDLED + seconds(1)); + reduce_relay::PEER_IDLED + seconds(1)); std::uint16_t unsquelched = 0; network_.overlay().deleteIdlePeers( [&](PublicKey const& key, PeerWPtr const& peer) { @@ -1560,7 +1560,7 @@ vp_base_squelch_max_selected_peers=2 BEAST_EXPECT(peers[0].count == (nMessages - 1)); // advance the clock network_.overlay().clock().advance( - reduce_relay::IDLED + seconds(1)); + reduce_relay::PEER_IDLED + seconds(1)); network_.overlay().updateSlotAndSquelch( key, network_.validator(0), diff --git a/src/test/overlay/enhanced_squelch_test.cpp b/src/test/overlay/enhanced_squelch_test.cpp index da69937c844..8cd16122cab 100644 --- a/src/test/overlay/enhanced_squelch_test.cpp +++ b/src/test/overlay/enhanced_squelch_test.cpp @@ -573,9 +573,9 @@ vp_enhanced_squelch_enable=0 } void - testUpdateConsideredValidator_newValidator() + testUpdateConsideredValidator_new() { - testcase("testUpdateConsideredValidator_newValidator"); + testcase("testUpdateConsideredValidator_new"); TestStopwatch stopwatch; EnhancedSquelchingTestSlots slots( env_.app().logs(), noop_handler, env_.app().config(), stopwatch); @@ -613,9 +613,9 @@ vp_enhanced_squelch_enable=0 } void - testUpdateConsideredValidator_idleValidator() + testUpdateConsideredValidator_idle() { - testcase("testUpdateConsideredValidator_idleValidator"); + testcase("testUpdateConsideredValidator_idle"); TestStopwatch stopwatch; EnhancedSquelchingTestSlots slots( env_.app().logs(), noop_handler, env_.app().config(), stopwatch); @@ -635,7 +635,7 @@ vp_enhanced_squelch_enable=0 auto const state = slots.getConsideredValidators().at(validator); // simulate a validator sending a new message before the idle timer - stopwatch.advance(reduce_relay::IDLED - std::chrono::seconds(1)); + stopwatch.advance(reduce_relay::PEER_IDLED - std::chrono::seconds(1)); BEAST_EXPECTS( !slots.updateConsideredValidator(validator, peerID), @@ -647,22 +647,17 @@ vp_enhanced_squelch_enable=0 "non-idling validator was updated"); // simulate a validator idling - stopwatch.advance(reduce_relay::IDLED + std::chrono::seconds(1)); + stopwatch.advance(reduce_relay::PEER_IDLED + std::chrono::seconds(1)); BEAST_EXPECTS( !slots.updateConsideredValidator(validator, peerID), "validator was selected with insufficient number of peers"); - - auto const idleState = slots.getConsideredValidators().at(validator); - // we expect that an idling validator will not be updated - BEAST_EXPECTS( - newState.count == idleState.count, "idling validator was updated"); } void - testUpdateConsideredValidator_selectQualifyingValidator() + testUpdateConsideredValidator_selectQualifying() { - testcase("testUpdateConsideredValidator_selectQualifyingValidator"); + testcase("testUpdateConsideredValidator_selectQualifying"); TestStopwatch stopwatch; EnhancedSquelchingTestSlots slots( @@ -686,7 +681,8 @@ vp_enhanced_squelch_enable=0 !slots.updateConsideredValidator(validator2, peerID), "validator was selected before reaching message threshold"); - stopwatch.advance(reduce_relay::IDLED - std::chrono::seconds(1)); + stopwatch.advance( + reduce_relay::PEER_IDLED - std::chrono::seconds(1)); } // as long as the peer criteria is not met, the validator most not be // selected @@ -701,7 +697,8 @@ vp_enhanced_squelch_enable=0 !slots.updateConsideredValidator(validator2, i), "validator was selected before reaching enough peers"); - stopwatch.advance(reduce_relay::IDLED - std::chrono::seconds(1)); + stopwatch.advance( + reduce_relay::PEER_IDLED - std::chrono::seconds(1)); } auto const consideredValidator = @@ -721,9 +718,109 @@ vp_enhanced_squelch_enable=0 } void - testCleanConsideredValidators_deleteIdleValidator() + testCleanConsideredValidators_resetIdle() { - testcase("cleanConsideredValidators_deleteIdleValidator"); + testcase("testCleanConsideredValidators_resetIdle"); + auto const validator = randomKeyPair(KeyType::ed25519).first; + + TestStopwatch stopwatch; + + EnhancedSquelchingTestSlots slots( + env_.app().logs(), noop_handler, env_.app().config(), stopwatch); + + // send enough messages for a slot to meet peer requirements + for (int i = 0; + i < env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS; + ++i) + slots.updateUntrustedValidatorSlot( + sha512Half(validator) + static_cast(i), validator, i); + + // send enough messages from some peer to be one message away from + // meeting the selection criteria + for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD - + (env_.app() + .config() + .VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS + + 1); + ++i) + slots.updateUntrustedValidatorSlot( + sha512Half(validator) + static_cast(i), validator, 0); + + BEAST_EXPECTS( + slots.getConsideredValidators().at(validator).count == + reduce_relay::MAX_MESSAGE_THRESHOLD - 1, + "considered validator information is in an invalid state"); + + BEAST_EXPECTS( + slots.getConsideredValidators().at(validator).peers.size() == + env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS, + "considered validator information is in an invalid state"); + + stopwatch.advance(reduce_relay::PEER_IDLED + std::chrono::seconds{1}); + + // deleteIdlePeers must reset the progress of a validator that idled + slots.deleteIdlePeers(); + + slots.updateUntrustedValidatorSlot( + sha512Half(validator) + static_cast(1), validator, 0); + + // we expect that the validator was not selected + BEAST_EXPECTS( + slots.getSlots(false).size() == 0, "untrusted slot was created"); + + BEAST_EXPECTS( + slots.getConsideredValidators().at(validator).count == 1, + "considered validator information is in an invalid state"); + + BEAST_EXPECTS( + slots.getConsideredValidators().at(validator).peers.size() == 1, + "considered validator information is in an invalid state"); + } + + void + testCleanConsideredValidators_deletePoorlyConnected() + { + testcase("cleanConsideredValidators_deletePoorlyConnected"); + auto const validator = randomKeyPair(KeyType::ed25519).first; + Peer::id_t peerID = 0; + TestHandler handler{noop_handler}; + + // verify that squelchAll is called for poorly connected validator + handler.squelchAll_f_ = [&](PublicKey const& actualKey, + std::uint32_t duration, + std::function callback) { + BEAST_EXPECTS( + actualKey == validator, "unexpected key passed to squelchAll"); + callback(peerID); + }; + + TestStopwatch stopwatch; + + EnhancedSquelchingTestSlots slots( + env_.app().logs(), handler, env_.app().config(), stopwatch); + + // send enough messages from a single peer + for (int i = 0; i < 2 * reduce_relay::MAX_MESSAGE_THRESHOLD + 1; ++i) + slots.updateUntrustedValidatorSlot( + sha512Half(validator) + static_cast(i), + validator, + peerID); + + stopwatch.advance(reduce_relay::PEER_IDLED + std::chrono::seconds{1}); + + // deleteIdlePeers must squelch the validator as it failed to reach + // peering requirements + slots.deleteIdlePeers(); + + BEAST_EXPECTS( + slots.getConsideredValidators().size() == 0, + "poorly connected validator was not deleted"); + } + + void + testCleanConsideredValidators_deleteSilent() + { + testcase("cleanConsideredValidators_deleteSilent"); // insert some random validator key auto const idleValidator = randomKeyPair(KeyType::ed25519).first; auto const validator = randomKeyPair(KeyType::ed25519).first; @@ -755,7 +852,9 @@ vp_enhanced_squelch_enable=0 "new validator was not added for consideration"); // simulate a validator idling - stopwatch.advance(reduce_relay::IDLED + std::chrono::seconds(1)); + stopwatch.advance( + reduce_relay::MAX_UNTRUSTED_VALIDATOR_IDLE + + std::chrono::seconds(1)); BEAST_EXPECTS( !slots.updateConsideredValidator(validator, peerID), "validator was selected with insufficient number of peers"); @@ -825,10 +924,12 @@ vp_enhanced_squelch_enable=0 testDeleteIdlePeers_deleteIdleSlots(); testDeleteIdlePeers_deleteIdleUntrustedPeer(); testUpdateSlotAndSquelch_untrustedValidator(); - testUpdateConsideredValidator_newValidator(); - testUpdateConsideredValidator_idleValidator(); - testUpdateConsideredValidator_selectQualifyingValidator(); - testCleanConsideredValidators_deleteIdleValidator(); + testUpdateConsideredValidator_new(); + testUpdateConsideredValidator_idle(); + testUpdateConsideredValidator_selectQualifying(); + testCleanConsideredValidators_deleteSilent(); + testCleanConsideredValidators_resetIdle(); + testCleanConsideredValidators_deletePoorlyConnected(); } }; diff --git a/src/xrpld/overlay/ReduceRelayCommon.h b/src/xrpld/overlay/ReduceRelayCommon.h index efcb0f96901..7bfdcca900b 100644 --- a/src/xrpld/overlay/ReduceRelayCommon.h +++ b/src/xrpld/overlay/ReduceRelayCommon.h @@ -21,6 +21,7 @@ #define RIPPLE_OVERLAY_REDUCERELAYCOMMON_H_INCLUDED #include +#include namespace ripple { @@ -39,21 +40,31 @@ static constexpr auto MIN_UNSQUELCH_EXPIRE = std::chrono::seconds{300}; static constexpr auto MAX_UNSQUELCH_EXPIRE_DEFAULT = std::chrono::seconds{600}; static constexpr auto SQUELCH_PER_PEER = std::chrono::seconds(10); static constexpr auto MAX_UNSQUELCH_EXPIRE_PEERS = std::chrono::seconds{3600}; + // No message received threshold before identifying a peer as idled -static constexpr auto IDLED = std::chrono::seconds{8}; +static constexpr auto PEER_IDLED = std::chrono::seconds{8}; + // Message count threshold to start selecting peers as the source // of messages from the validator. We add peers who reach // MIN_MESSAGE_THRESHOLD to considered pool once MAX_SELECTED_PEERS // reach MAX_MESSAGE_THRESHOLD. static constexpr uint16_t MIN_MESSAGE_THRESHOLD = 19; static constexpr uint16_t MAX_MESSAGE_THRESHOLD = 20; + // Max selected peers to choose as the source of messages from validator static constexpr uint16_t MAX_SELECTED_PEERS = 5; + // Max number of untrusted slots the server will maintain static constexpr uint16_t MAX_UNTRUSTED_SLOTS = 5; + +// The maximum of seconds an untrusted validator can go without sending a +// validation message. After this, a validator may be squelched +static constexpr auto MAX_UNTRUSTED_VALIDATOR_IDLE = std::chrono::seconds{30}; + // Wait before reduce-relay feature is enabled on boot up to let // the server establish peer connections static constexpr auto WAIT_ON_BOOTUP = std::chrono::minutes{10}; + // Maximum size of the aggregated transaction hashes per peer. // Once we get to high tps throughput, this cap will prevent // TMTransactions from exceeding the current protocol message diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index 40315e8c3fc..0bce52699f3 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -148,8 +148,7 @@ class Slot final std::size_t timesSelected; // number of times the peer was selected }; - /** Get all peers of the slot. This methos is only to be used in - * unit-tests. + /** Get all peers of the slot. */ std::unordered_map const& getPeers() const @@ -520,10 +519,19 @@ class Slots struct ValidatorInfo { - size_t count; // the number of messages sent from this validator - time_point lastMessage; // timestamp of the last message - std::unordered_set peers; // a list of peer IDs that sent a - // message for this validator + // the number of messages sent from this validator + size_t count; + // timestamp of the last message + time_point lastMessage; + // a list of peer IDs that sent a message for this validator + std::unordered_set peers; + + void + reset() + { + count = 0; + peers.clear(); + } }; // Untrusted validators considered for open untrusted slots diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 777c702d7d8..43286c6f6ef 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -1722,7 +1722,8 @@ PeerImp::onMessage(std::shared_ptr const& m) { // Count unique messages (Slots has it's own 'HashRouter'), which a peer // receives within IDLED seconds since the message has been relayed. - if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED) + if (relayed && + (stopwatch().now() - *relayed) < reduce_relay::PEER_IDLED) overlay_.updateSlotAndSquelch( suppression, publicKey, id_, isTrusted); @@ -2366,7 +2367,8 @@ PeerImp::onMessage(std::shared_ptr const& m) // Count unique messages (Slots has it's own 'HashRouter'), which a // peer receives within IDLED seconds since the message has been // relayed. - if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED) + if (relayed && + (stopwatch().now() - *relayed) < reduce_relay::PEER_IDLED) overlay_.updateSlotAndSquelch( key, val->getSignerPublic(), id_, isTrusted); diff --git a/src/xrpld/overlay/detail/Slot.cpp b/src/xrpld/overlay/detail/Slot.cpp index 4fe7f62e8b0..2f6d338b5f8 100644 --- a/src/xrpld/overlay/detail/Slot.cpp +++ b/src/xrpld/overlay/detail/Slot.cpp @@ -521,10 +521,6 @@ Slots::updateConsideredValidator(PublicKey const& validator, Peer::id_t peer) return std::nullopt; } - // the validator idled. Don't update it, it will be cleaned later - if (now - it->second.lastMessage > IDLED) - return std::nullopt; - it->second.peers.insert(peer); it->second.lastMessage = now; ++it->second.count; @@ -562,19 +558,32 @@ Slots::deleteIdlePeers() for (auto it = slots.begin(); it != slots.end();) { - it->second.deleteIdlePeer(it->first); - if (now - it->second.getLastSelected() > - MAX_UNSQUELCH_EXPIRE_DEFAULT) + auto const& validator = it->first; + auto& slot = it->second; + slot.deleteIdlePeer(validator); + + // delete the slot if the untrusted slot no longer meets the + // selection critera or it has not been selected for a while + if ((!slot.isTrusted_ && + slot.getPeers().size() < maxSelectedPeers_) || + now - it->second.getLastSelected() > + reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT) { - JLOG(journal_.trace()) << "deleteIdlePeers: deleting idle slot " - << Slice(it->first); + JLOG(journal_.trace()) + << "deleteIdlePeers: deleting " + << (slot.isTrusted_ ? "trusted" : "untrusted") << " slot " + << Slice(it->first) << " reason: " + << (now - it->second.getLastSelected() > + reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT + ? " inactive " + : " insufficient peers"); // if an untrusted validator slot idled - peers stopped // sending messages for this validator squelch it if (!it->second.isTrusted_) handler_.squelchAll( it->first, - MAX_UNSQUELCH_EXPIRE_DEFAULT.count(), + reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT.count(), [&](Peer::id_t id) { registerSquelchedValidator(it->first, id); }); @@ -608,11 +617,32 @@ Slots::cleanConsideredValidators() for (auto it = consideredValidators_.begin(); it != consideredValidators_.end();) { - if (now - it->second.lastMessage > IDLED) + // this is a safety check for validators that have + // sent a lot of validations via limited number of peers + if (it->second.count > 2 * reduce_relay::MAX_MESSAGE_THRESHOLD && + it->second.peers.size() < maxSelectedPeers_) + { + JLOG(journal_.warn()) + << "cleanConsideredValidators: removing " + "validator " + << Slice(it->first) << " with insufficient peers"; + + keys.push_back(it->first); + it = consideredValidators_.erase(it); + } + else if ( + now - it->second.lastMessage > + reduce_relay::MAX_UNTRUSTED_VALIDATOR_IDLE) { keys.push_back(it->first); it = consideredValidators_.erase(it); } + // Due to some reason the validator idled, reset their progress + else if (now - it->second.lastMessage > reduce_relay::PEER_IDLED) + { + it->second.reset(); + ++it; + } else ++it; } From 14ba721538f02470c5718f15445156b1a5e187bd Mon Sep 17 00:00:00 2001 From: Vito <5780819+Tapanito@users.noreply.github.com> Date: Tue, 1 Jul 2025 15:04:19 +0200 Subject: [PATCH 27/27] improves code readability --- src/xrpld/overlay/detail/Slot.cpp | 36 ++++++++++++++++--------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/src/xrpld/overlay/detail/Slot.cpp b/src/xrpld/overlay/detail/Slot.cpp index 2f6d338b5f8..0cfb8e6a284 100644 --- a/src/xrpld/overlay/detail/Slot.cpp +++ b/src/xrpld/overlay/detail/Slot.cpp @@ -46,7 +46,7 @@ Slot::deleteIdlePeer(PublicKey const& validator) auto& peer = it->second; auto id = it->first; ++it; - if (now - peer.lastMessage > IDLED) + if (now - peer.lastMessage > reduce_relay::PEER_IDLED) { JLOG(journal_.trace()) << "deleteIdlePeer: " << Slice(validator) << " " << id @@ -112,12 +112,12 @@ Slot::update( if (state_ != SlotState::Counting || peer.state == PeerState::Squelched) return; - if (++peer.count > MIN_MESSAGE_THRESHOLD) + if (++peer.count > reduce_relay::MIN_MESSAGE_THRESHOLD) considered_.insert(id); - if (peer.count == (MAX_MESSAGE_THRESHOLD + 1)) + if (peer.count == (reduce_relay::MAX_MESSAGE_THRESHOLD + 1)) ++reachedThreshold_; - if (now - lastSelected_ > 2 * MAX_UNSQUELCH_EXPIRE_DEFAULT) + if (now - lastSelected_ > 2 * reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT) { JLOG(journal_.trace()) << "update: resetting due to inactivity " << Slice(validator) << " " @@ -150,7 +150,7 @@ Slot::update( << Slice(validator) << " " << id; continue; } - if (now - itpeers->second.lastMessage < IDLED) + if (now - itpeers->second.lastMessage < reduce_relay::PEER_IDLED) selected.insert(id); } @@ -211,14 +211,16 @@ Slot::getSquelchDuration(std::size_t npeers) const { using namespace std::chrono; auto m = std::max( - MAX_UNSQUELCH_EXPIRE_DEFAULT, seconds{SQUELCH_PER_PEER * npeers}); - if (m > MAX_UNSQUELCH_EXPIRE_PEERS) + reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT, + seconds{reduce_relay::SQUELCH_PER_PEER * npeers}); + if (m > reduce_relay::MAX_UNSQUELCH_EXPIRE_PEERS) { - m = MAX_UNSQUELCH_EXPIRE_PEERS; + m = reduce_relay::MAX_UNSQUELCH_EXPIRE_PEERS; JLOG(journal_.warn()) << "getSquelchDuration: unexpected squelch duration " << npeers; } - return seconds{ripple::rand_int(MIN_UNSQUELCH_EXPIRE / 1s, m / 1s)}; + return seconds{ + ripple::rand_int(reduce_relay::MIN_UNSQUELCH_EXPIRE / 1s, m / 1s)}; } void @@ -250,7 +252,7 @@ Slot::deletePeer(PublicKey const& validator, Peer::id_t id, bool erase) } else if (considered_.find(id) != considered_.end()) { - if (it->second.count > MAX_MESSAGE_THRESHOLD) + if (it->second.count > reduce_relay::MAX_MESSAGE_THRESHOLD) --reachedThreshold_; considered_.erase(id); } @@ -363,7 +365,7 @@ Slots::expireAndIsPeerSquelched( bool Slots::addPeerMessage(uint256 const& key, Peer::id_t id) { - beast::expire(peersWithMessage_, reduce_relay::IDLED); + beast::expire(peersWithMessage_, reduce_relay::PEER_IDLED); if (key.isNonZero()) { @@ -466,7 +468,9 @@ Slots::updateUntrustedValidatorSlot( { registerSquelchedValidator(validator, id); handler_.squelch( - validator, id, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); + validator, + id, + reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); } return; } @@ -479,7 +483,7 @@ Slots::updateUntrustedValidatorSlot( // In all of these cases we send a squelch message to all peers. // The validator may still be considered by the selector. However, it // will be eventually cleaned and squelched - if (untrustedSlots_.size() == MAX_UNTRUSTED_SLOTS) + if (untrustedSlots_.size() == reduce_relay::MAX_UNTRUSTED_SLOTS) { handler_.squelchAll( validator, @@ -526,11 +530,9 @@ Slots::updateConsideredValidator(PublicKey const& validator, Peer::id_t peer) ++it->second.count; // if the validator has not met selection criteria yet - if (it->second.count < MAX_MESSAGE_THRESHOLD || + if (it->second.count < reduce_relay::MAX_MESSAGE_THRESHOLD || it->second.peers.size() < reduce_relay::MAX_SELECTED_PEERS) - { return std::nullopt; - } auto const key = it->first; consideredValidators_.erase(it); @@ -604,7 +606,7 @@ Slots::deleteIdlePeers() for (auto const& validator : cleanConsideredValidators()) handler_.squelchAll( validator, - MAX_UNSQUELCH_EXPIRE_DEFAULT.count(), + reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT.count(), [&](Peer::id_t id) { registerSquelchedValidator(validator, id); }); }