8000 DRAFT: Reduce duplicate peer traffic for ledger data by ximinez · Pull Request #5301 · XRPLF/rippled · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

DRAFT: Reduce duplicate peer traffic for ledger data #5301

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10000
134 changes: 134 additions & 0 deletions include/xrpl/basics/CanProcess.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.

Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED

#include <functional>
#include <mutex>
#include <set>

/** RAII class to check if an Item is already being processed on another thread,
 * as indicated by its presence in a Collection.
 *
 * If the Item is not in the Collection, it will be added under lock in the
 * ctor, and removed under lock in the dtor. The object will be considered
 * "usable" and evaluate to `true`.
 *
 * If the Item is in the Collection, no changes will be made to the collection,
 * and the CanProcess object will be considered "unusable".
 *
 * It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
 * Process or skip a block of code, or set a flag.)
 *
 * The current use is to avoid lock contention that would be involved in
 * processing something associated with the Item.
 *
 * Lifetime requirements: the mutex and the collection are held by reference
 * and must outlive the CanProcess object. The item itself may be a temporary;
 * whatever is needed to remove it again is stored by value.
 *
 * A CanProcess object is neither copyable nor movable: exactly one object is
 * responsible for removing the item it inserted.
 *
 * Examples:
 *
 *   void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
 *   {
 *       if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
 *       {
 *           acquire(hash, ...);
 *       }
 *   }
 *
 *   bool
 *   NetworkOPsImp::recvValidation(
 *       std::shared_ptr<STValidation> const& val,
 *       std::string const& source)
 *   {
 *       CanProcess check(
 *           validationsMutex_, pendingValidations_, val->getLedgerHash());
 *       BypassAccept bypassAccept =
 *           check ? BypassAccept::no : BypassAccept::yes;
 *       handleNewValidation(app_, val, source, bypassAccept, m_journal);
 *   }
 *
 */
class CanProcess
{
public:
    /** Try to claim `item` by inserting it into `collection` under `mtx`.
     *
     * @param mtx        Mutex guarding `collection`; locked for the insert
     *                   and again for the removal in the destructor.
     * @param collection Set-like container whose `insert(item)` returns a
     *                   pair of (iterator, inserted).
     * @param item       The value to claim.
     */
    template <class Mutex, class Collection, class Item>
    CanProcess(Mutex& mtx, Collection& collection, Item const& item)
        : cleanup_(insert(mtx, collection, item))
    {
    }

    /** Removes the item again, but only if this object inserted it. */
    ~CanProcess()
    {
        if (cleanup_)
            cleanup_();
    }

    // A copy would run the cleanup a second time (erasing twice, or erasing
    // through an already-invalidated iterator). Deleting the copy operations
    // also suppresses the implicit move operations.
    CanProcess(CanProcess const&) = delete;
    CanProcess&
    operator=(CanProcess const&) = delete;

    /** `true` if this object inserted the item, i.e. it is "usable". */
    explicit
    operator bool() const
    {
        return static_cast<bool>(cleanup_);
    }

private:
    /** Insert `item` under lock and build the matching cleanup callback.
     *
     * @tparam useIterator If true, the cleanup erases through the iterator
     *         returned by insert(); otherwise it erases by value.
     * @return An empty function if the item was already present, otherwise a
     *         callable that locks `mtx` and erases the inserted element.
     */
    template <bool useIterator, class Mutex, class Collection, class Item>
    static std::function<void()>
    doInsert(Mutex& mtx, Collection& collection, Item const& item)
    {
        std::lock_guard<Mutex> lock(mtx);
        // TODO: Use structured binding once LLVM 16 is the minimum supported
        // version. See also: https://github.com/llvm/llvm-project/issues/48582
        // https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c
        auto const insertResult = collection.insert(item);
        if (!insertResult.second)
            return {};
        if constexpr (useIterator)
        {
            return [&mtx, &collection, it = insertResult.first]() {
                std::lock_guard<Mutex> lock(mtx);
                collection.erase(it);
            };
        }
        else
        {
            // Capture the item by value: the caller may have passed a
            // temporary, which is gone by the time the destructor runs.
            return [&mtx, &collection, item]() {
                std::lock_guard<Mutex> lock(mtx);
                collection.erase(item);
            };
        }
    }

    // Generic insert() function doesn't use iterators because they may get
    // invalidated
    template <class Mutex, class Collection, class Item>
    static std::function<void()>
    insert(Mutex& mtx, Collection& collection, Item const& item)
    {
        return doInsert<false>(mtx, collection, item);
    }

    // Specialize insert() for std::set, which does not invalidate iterators for
    // insert and erase
    template <class Mutex, class Item>
    static std::function<void()>
    insert(Mutex& mtx, std::set<Item>& collection, Item const& item)
    {
        return doInsert<true>(mtx, collection, item);
    }

    // If set, then the item is "usable"
    std::function<void()> cleanup_;
};

#endif
7 changes: 7 additions & 0 deletions include/xrpl/basics/base_uint.h
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,13 @@ to_string(base_uint<Bits, Tag> const& a)
return strHex(a.cbegin(), a.cend());
}

/** Abbreviated hex rendering of a base_uint.
 *
 * Takes the hex string produced by strHex() over the whole value, keeps at
 * most its first 8 characters, and appends "...".
 */
template <std::size_t Bits, class Tag>
inline std::string
to_short_string(base_uint<Bits, Tag> const& a)
{
    auto const fullHex = strHex(a.cbegin(), a.cend());
    auto shortened = fullHex.substr(0, 8);
    shortened += "...";
    return shortened;
}

template <std::size_t Bits, class Tag>
inline std::ostream&
operator<<(std::ostream& out, base_uint<Bits, Tag> const& u)
Expand Down
10 changes: 10 additions & 0 deletions include/xrpl/proto/ripple.proto
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,18 @@ message TMLedgerData
required uint32 ledgerSeq = 2;
required TMLedgerInfoType type = 3;
repeated TMLedgerNode nodes = 4;
// If the peer supports "responseCookies", this field will
// never be populated.
optional uint32 requestCookie = 5;
optional TMReplyError error = 6;
// The old field is called "requestCookie", but this is
// a response, so this name makes more sense
repeated uint32 responseCookies = 7;
// If a TMGetLedger request was received without a "requestCookie",
// and the peer supports it, this flag will be set to true to
// indicate that the receiver should process the result in addition
// to forwarding it to its "responseCookies" peers.
optional bool directResponse = 8;
}

message TMPing
Expand Down
2 changes: 2 additions & 0 deletions include/xrpl/protocol/LedgerHeader.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ struct LedgerHeader

// If validated is false, it means "not yet validated."
// Once validated is true, it will never be set false at a later time.
// NOTE: If you are accessing this directly, you are probably doing it
// wrong. Use LedgerMaster::isValidated().
// VFALCO TODO Make this not mutable
bool mutable validated = false;
bool accepted = false;
Expand Down
27 changes: 27 additions & 0 deletions src/test/app/HashRouter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,32 @@ class HashRouter_test : public beast::unit_test::suite
}
}

// Exercises HashRouter::shouldProcessForPeer() for one key and two peers,
// using a manually advanced stopwatch and a 2s per-peer timeout.
//
// NOTE(review): the expectations below are consistent with each
// `++stopwatch` advancing the clock by one second - confirm against
// TestStopwatch.
void
testProcessPeer()
{
    using namespace std::chrono_literals;
    TestStopwatch stopwatch;
    HashRouter router(stopwatch, 5s);
    uint256 const key(1);
    HashRouter::PeerShortID peer1 = 1;
    HashRouter::PeerShortID peer2 = 2;
    auto const timeout = 2s;

    // First request from peer1 is accepted; an immediate repeat is not.
    BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
    BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
    ++stopwatch;
    // peer1 is still rejected, but peer2 is tracked separately and gets
    // its own first acceptance.
    BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
    BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
    BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
    ++stopwatch;
    // peer1 is accepted again; peer2 (accepted one tick later) is not yet.
    BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
    BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
    ++stopwatch;
    // Now peer2 is accepted again.
    BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
    ++stopwatch;
    // peer1 is accepted once more; peer2 was accepted on the previous tick
    // and is rejected.
    BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
    BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
}

public:
void
run() override
Expand All @@ -375,6 +401,7 @@ class HashRouter_test : public beast::unit_test::suite
testRelay();
testProcess();
testSetup();
testProcessPeer();
}
};

Expand Down
5 changes: 5 additions & 0 deletions src/test/app/LedgerReplay_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,11 @@ class TestPeer : public Peer
{
return false;
}
// Test stub: always reports an empty set of cookies, whatever the hash.
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override
{
    std::set<std::optional<uint64_t>> noCookies;
    return noCookies;
}

bool ledgerReplayEnabled_;
PublicKey nodePublicKey_;
Expand Down
5 changes: 5 additions & 0 deletions src/test/basics/base_uint_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ struct base_uint_test : beast::unit_test::suite
uset.insert(u);
BEAST_EXPECT(raw.size() == u.size());
BEAST_EXPECT(to_string(u) == "0102030405060708090A0B0C");
BEAST_EXPECT(to_short_string(u) == "01020304...");
BEAST_EXPECT(*u.data() == 1);
BEAST_EXPECT(u.signum() == 1);
BEAST_EXPECT(!!u);
Expand All @@ -174,6 +175,7 @@ struct base_uint_test : beast::unit_test::suite
test96 v{~u};
uset.insert(v);
BEAST_EXPECT(to_string(v) == "FEFDFCFBFAF9F8F7F6F5F4F3");
BEAST_EXPECT(to_short_string(v) == "FEFDFCFB...");
BEAST_EXPECT(*v.data() == 0xfe);
BEAST_EXPECT(v.signum() == 1);
BEAST_EXPECT(!!v);
Expand All @@ -194,6 +196,7 @@ struct base_uint_test : beast::unit_test::suite
test96 z{beast::zero};
uset.insert(z);
BEAST_EXPECT(to_string(z) == "000000000000000000000000");
BEAST_EXPECT(to_short_string(z) == "00000000...");
BEAST_EXPECT(*z.data() == 0);
BEAST_EXPECT(*z.begin() == 0);
BEAST_EXPECT(*std::prev(z.end(), 1) == 0);
Expand All @@ -214,6 +217,7 @@ struct base_uint_test : beast::unit_test::suite
BEAST_EXPECT(n == z);
n--;
BEAST_EXPECT(to_string(n) == "FFFFFFFFFFFFFFFFFFFFFFFF");
BEAST_EXPECT(to_short_string(n) == "FFFFFFFF...");
n = beast::zero;
BEAST_EXPECT(n == z);

Expand All @@ -224,6 +228,7 @@ struct base_uint_test : beast::unit_test::suite
test96 x{zm1 ^ zp1};
uset.insert(x);
BEAST_EXPECTS(to_string(x) == "FFFFFFFFFFFFFFFFFFFFFFFE", to_string(x));
BEAST_EXPECTS(to_short_string(x) == "FFFFFFFF...", to_short_string(x));

BEAST_EXPECT(uset.size() == 4);

Expand Down
4 changes: 2 additions & 2 deletions src/test/overlay/ProtocolVersion_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ class ProtocolVersion_test : public beast::unit_test::suite
negotiateProtocolVersion("XRPL/2.2") == make_protocol(2, 2));
BEAST_EXPECT(
negotiateProtocolVersion(
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") ==
make_protocol(2, 2));
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/2.4, XRPL/999.999") ==
make_protocol(2, 3));
BEAST_EXPECT(
negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") ==
std::nullopt);
Expand Down
5 changes: 5 additions & 0 deletions src/test/overlay/reduce_relay_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ class PeerPartial : public Peer
removeTxQueue(uint256 const&) override
{
}
// Test stub: always reports an empty set of cookies, whatever the hash.
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override
{
    std::set<std::optional<uint64_t>> noCookies;
    return noCookies;
}
};

/** Manually advanced clock. */
Expand Down
3 changes: 2 additions & 1 deletion src/xrpld/app/consensus/RCLConsensus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1082,7 +1082,8 @@ void
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
{
    // When there are no positions while the node reports isFull(), switch
    // the operating mode to CONNECTED and record why. The mode is changed
    // only inside this condition, and only once.
    if (positions == 0 && app_.getOPs().isFull())
    {
        app_.getOPs().setMode(
            OperatingMode::CONNECTED, "updateOperatingMode: no positions");
    }
}

void
Expand Down
19 changes: 19 additions & 0 deletions src/xrpld/app/ledger/InboundLedger.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,25 @@ class InboundLedger final : public TimeoutCounter,
std::unique_ptr<PeerSet> mPeerSet;
};

/** Returns the enumerator name of an InboundLedger::Reason.
 *
 * Values other than HISTORY, GENERIC and CONSENSUS trigger UNREACHABLE and
 * yield "unknown".
 */
inline std::string
to_string(InboundLedger::Reason reason)
{
    using Reason = InboundLedger::Reason;

    if (reason == Reason::HISTORY)
        return "HISTORY";
    if (reason == Reason::GENERIC)
        return "GENERIC";
    if (reason == Reason::CONSENSUS)
        return "CONSENSUS";

    UNREACHABLE("ripple::to_string(InboundLedger::Reason) : unknown value");
    return "unknown";
}

} // namespace ripple

#endif
9 changes: 8 additions & 1 deletion src/xrpld/app/ledger/detail/InboundLedger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,14 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)

if (!wasProgress)
{
checkLocal();
if (checkLocal())
{
// Done. Something else (probably consensus) built the ledger
// locally while waiting for data (or possibly before requesting)
XRPL_ASSERT(isDone(), "ripple::InboundLedger::onTimer : done");
JLOG(journal_.info()) << "Finished while waiting " << hash_;
return;
}

mByHash = true;

Expand Down
Loading
0