Skip to content

Commit

Permalink
guard TxRequest and rejection caches with new mutex
Browse files Browse the repository at this point in the history
We need to synchronize between various tx download structures.
TxRequest does not inherently need cs_main for synchronization, and it's
not appropriate to lock all of the tx download logic under cs_main.
  • Loading branch information
glozow committed Jul 16, 2024
1 parent 35dddbc commit 3eb1307
Showing 1 changed file with 57 additions and 32 deletions.
89 changes: 57 additions & 32 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,9 +490,9 @@ class PeerManagerImpl final : public PeerManager

/** Overridden from CValidationInterface. */
void BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) override
EXCLUSIVE_LOCKS_REQUIRED(!m_recent_confirmed_transactions_mutex);
EXCLUSIVE_LOCKS_REQUIRED(!m_recent_confirmed_transactions_mutex, !m_tx_download_mutex);
void BlockDisconnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex* pindex) override
EXCLUSIVE_LOCKS_REQUIRED(!m_recent_confirmed_transactions_mutex);
EXCLUSIVE_LOCKS_REQUIRED(!m_recent_confirmed_transactions_mutex, !m_tx_download_mutex);
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void BlockChecked(const CBlock& block, const BlockValidationState& state) override
Expand All @@ -501,13 +501,13 @@ class PeerManagerImpl final : public PeerManager
EXCLUSIVE_LOCKS_REQUIRED(!m_most_recent_block_mutex);

/** Implement NetEventsInterface */
void InitializeNode(const CNode& node, ServiceFlags our_services) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex);
void InitializeNode(const CNode& node, ServiceFlags our_services) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_tx_download_mutex);
void FinalizeNode(const CNode& node) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_headers_presync_mutex, !m_tx_download_mutex);
bool HasAllDesirableServiceFlags(ServiceFlags services) const override;
bool ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt) override
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex, !m_tx_download_mutex);
bool SendMessages(CNode* pto) override
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, g_msgproc_mutex);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, g_msgproc_mutex, !m_tx_download_mutex);

/** Implement PeerManager */
void StartScheduledTasks(CScheduler& scheduler) override;
Expand All @@ -526,7 +526,7 @@ class PeerManagerImpl final : public PeerManager
void UnitTestMisbehaving(NodeId peer_id) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex) { Misbehaving(*Assert(GetPeerRef(peer_id)), ""); };
void ProcessMessage(CNode& pfrom, const std::string& msg_type, DataStream& vRecv,
const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) override
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex, !m_tx_download_mutex);
void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) override;
ServiceFlags GetDesirableServiceFlags(ServiceFlags services) const override;

Expand Down Expand Up @@ -585,12 +585,12 @@ class PeerManagerImpl final : public PeerManager
* Updates m_txrequest, m_recent_rejects, m_recent_rejects_reconsiderable, m_orphanage, and vExtraTxnForCompact. */
void ProcessInvalidTx(NodeId nodeid, const CTransactionRef& tx, const TxValidationState& result,
bool maybe_add_extra_compact_tx)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, cs_main);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, m_tx_download_mutex);

/** Handle a transaction whose result was MempoolAcceptResult::ResultType::VALID.
* Updates m_txrequest, m_orphanage, and vExtraTxnForCompact. Also queues the tx for relay. */
void ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, const std::list<CTransactionRef>& replaced_transactions)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, cs_main);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, m_tx_download_mutex);

struct PackageToValidate {
const Package m_txns;
Expand Down Expand Up @@ -620,13 +620,13 @@ class PeerManagerImpl final : public PeerManager
* individual transactions, and caches rejection for the package as a group.
*/
void ProcessPackageResult(const PackageToValidate& package_to_validate, const PackageMempoolAcceptResult& package_result)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, cs_main);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, m_tx_download_mutex);

/** Look for a child of this transaction in the orphanage to form a 1-parent-1-child package,
* skipping any combinations that have already been tried. Return the resulting package along with
* the senders of its respective transactions, or std::nullopt if no package is found. */
std::optional<PackageToValidate> Find1P1CPackage(const CTransactionRef& ptx, NodeId nodeid)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, cs_main);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, m_tx_download_mutex);

/**
* Reconsider orphan transactions after a parent has been accepted to the mempool.
Expand All @@ -640,7 +640,7 @@ class PeerManagerImpl final : public PeerManager
* will be empty.
*/
bool ProcessOrphanTx(Peer& peer)
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex);
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, g_msgproc_mutex, !m_tx_download_mutex);

/** Process a single headers message from a peer.
*
Expand Down Expand Up @@ -722,7 +722,7 @@ class PeerManagerImpl final : public PeerManager
* peer. The announcement parameters are decided in PeerManager and then
* passed to TxRequestTracker. */
void AddTxAnnouncement(const CNode& node, const GenTxid& gtxid, std::chrono::microseconds current_time)
EXCLUSIVE_LOCKS_REQUIRED(::cs_main);
EXCLUSIVE_LOCKS_REQUIRED(::cs_main, m_tx_download_mutex);

/** Send a message to a peer */
void PushMessage(CNode& node, CSerializedNetMsg&& msg) const { m_connman.PushMessage(&node, std::move(msg)); }
Expand Down Expand Up @@ -770,7 +770,19 @@ class PeerManagerImpl final : public PeerManager
BanMan* const m_banman;
ChainstateManager& m_chainman;
CTxMemPool& m_mempool;
TxRequestTracker m_txrequest GUARDED_BY(::cs_main);

/** Synchronizes tx download including TxRequestTracker, rejection filters, and TxOrphanage.
* Lock invariants:
* - A txhash (txid or wtxid) in m_txrequest is not also in m_orphanage.
* - A txhash (txid or wtxid) in m_txrequest is not also in m_recent_rejects.
* - A txhash (txid or wtxid) in m_txrequest is not also in m_recent_rejects_reconsiderable.
* - A txhash (txid or wtxid) in m_txrequest is not also in m_recent_confirmed_transactions.
* - Each data structure's limits hold (m_orphanage max size, m_txrequest per-peer limits, etc).
*
* m_tx_download_mutex must be acquired before mempool.cs
*/
Mutex m_tx_download_mutex;
TxRequestTracker m_txrequest GUARDED_BY(m_tx_download_mutex);
std::unique_ptr<TxReconciliationTracker> m_txreconciliation;

/** The height of the best chain */
Expand Down Expand Up @@ -851,7 +863,7 @@ class PeerManagerImpl final : public PeerManager
* chain tip has changed.
* */
bool AlreadyHaveTx(const GenTxid& gtxid, bool include_reconsiderable)
EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_recent_confirmed_transactions_mutex);
EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_recent_confirmed_transactions_mutex, m_tx_download_mutex);

/**
* Filter for transactions that were recently rejected by the mempool.
Expand Down Expand Up @@ -887,10 +899,10 @@ class PeerManagerImpl final : public PeerManager
*
* Memory used: 1.3 MB
*/
CRollingBloomFilter m_recent_rejects GUARDED_BY(::cs_main){120'000, 0.000'001};
CRollingBloomFilter m_recent_rejects GUARDED_BY(m_tx_download_mutex){120'000, 0.000'001};
/** Block hash of chain tip the last time we reset m_recent_rejects and
* m_recent_rejects_reconsiderable. */
uint256 hashRecentRejectsChainTip GUARDED_BY(cs_main);
uint256 hashRecentRejectsChainTip GUARDED_BY(m_tx_download_mutex);

/**
* Filter for:
Expand All @@ -912,7 +924,7 @@ class PeerManagerImpl final : public PeerManager
*
* Parameters are picked to be the same as m_recent_rejects, with the same rationale.
*/
CRollingBloomFilter m_recent_rejects_reconsiderable GUARDED_BY(::cs_main){120'000, 0.000'001};
CRollingBloomFilter m_recent_rejects_reconsiderable GUARDED_BY(m_tx_download_mutex){120'000, 0.000'001};

/*
* Filter for transactions that have been recently confirmed.
Expand Down Expand Up @@ -1067,7 +1079,7 @@ class PeerManagerImpl final : public PeerManager
int m_peers_downloading_from GUARDED_BY(cs_main) = 0;

/** Storage for orphan information */
TxOrphanage m_orphanage;
TxOrphanage m_orphanage GUARDED_BY(m_tx_download_mutex);

void AddToCompactExtraTransactions(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex);

Expand Down Expand Up @@ -1630,7 +1642,8 @@ void PeerManagerImpl::PushNodeVersion(CNode& pnode, const Peer& peer)

void PeerManagerImpl::AddTxAnnouncement(const CNode& node, const GenTxid& gtxid, std::chrono::microseconds current_time)
{
AssertLockHeld(::cs_main); // For m_txrequest
AssertLockHeld(::cs_main); // for State
AssertLockHeld(m_tx_download_mutex); // For m_txrequest
NodeId nodeid = node.GetId();
if (!node.HasPermission(NetPermissionFlags::Relay) && m_txrequest.Count(nodeid) >= MAX_PEER_TX_ANNOUNCEMENTS) {
// Too many queued announcements from this peer
Expand Down Expand Up @@ -1666,8 +1679,11 @@ void PeerManagerImpl::InitializeNode(const CNode& node, ServiceFlags our_service
{
NodeId nodeid = node.GetId();
{
LOCK(cs_main);
LOCK(cs_main); // For m_node_states
m_node_states.emplace_hint(m_node_states.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(node.IsInboundConn()));
}
{
LOCK(m_tx_download_mutex);
assert(m_txrequest.Count(nodeid) == 0);
}

Expand Down Expand Up @@ -1735,8 +1751,11 @@ void PeerManagerImpl::FinalizeNode(const CNode& node)
}
}
}
m_orphanage.EraseForPeer(nodeid);
m_txrequest.DisconnectedPeer(nodeid);
{
LOCK(m_tx_download_mutex);
m_orphanage.EraseForPeer(nodeid);
m_txrequest.DisconnectedPeer(nodeid);
}
if (m_txreconciliation) m_txreconciliation->ForgetPeer(nodeid);
m_num_preferred_download_peers -= state->fPreferredDownload;
m_peers_downloading_from -= (!state->vBlocksInFlight.empty());
Expand All @@ -1753,6 +1772,7 @@ void PeerManagerImpl::FinalizeNode(const CNode& node)
assert(m_peers_downloading_from == 0);
assert(m_outbound_peers_with_protect_from_disconnect == 0);
assert(m_wtxid_relay_peers == 0);
LOCK(m_tx_download_mutex);
assert(m_txrequest.Size() == 0);
assert(m_orphanage.Size() == 0);
}
Expand Down Expand Up @@ -2084,6 +2104,7 @@ void PeerManagerImpl::BlockConnected(
if (role == ChainstateRole::BACKGROUND) {
return;
}
LOCK(m_tx_download_mutex);
m_orphanage.EraseForBlock(*pblock);

{
Expand All @@ -2096,7 +2117,6 @@ void PeerManagerImpl::BlockConnected(
}
}
{
LOCK(cs_main);
for (const auto& ptx : pblock->vtx) {
m_txrequest.ForgetTxHash(ptx->GetHash());
m_txrequest.ForgetTxHash(ptx->GetWitnessHash());
Expand Down Expand Up @@ -2254,6 +2274,9 @@ void PeerManagerImpl::BlockChecked(const CBlock& block, const BlockValidationSta

bool PeerManagerImpl::AlreadyHaveTx(const GenTxid& gtxid, bool include_reconsiderable)
{
AssertLockHeld(::cs_main);
AssertLockHeld(m_tx_download_mutex);

if (m_chainman.ActiveChain().Tip()->GetBlockHash() != hashRecentRejectsChainTip) {
// If the chain tip has changed previously rejected transactions
// might be now valid, e.g. due to a nLockTime'd tx becoming valid,
Expand Down Expand Up @@ -3154,7 +3177,7 @@ void PeerManagerImpl::ProcessInvalidTx(NodeId nodeid, const CTransactionRef& ptx
{
AssertLockNotHeld(m_peer_mutex);
AssertLockHeld(g_msgproc_mutex);
AssertLockHeld(cs_main);
AssertLockHeld(m_tx_download_mutex);

LogDebug(BCLog::MEMPOOLREJ, "%s (wtxid=%s) from peer=%d was not accepted: %s\n",
ptx->GetHash().ToString(),
Expand Down Expand Up @@ -3219,7 +3242,7 @@ void PeerManagerImpl::ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, c
{
AssertLockNotHeld(m_peer_mutex);
AssertLockHeld(g_msgproc_mutex);
AssertLockHeld(cs_main);
AssertLockHeld(m_tx_download_mutex);

// As this version of the transaction was acceptable, we can forget about any requests for it.
// No-op if the tx is not in txrequest.
Expand Down Expand Up @@ -3247,7 +3270,7 @@ void PeerManagerImpl::ProcessPackageResult(const PackageToValidate& package_to_v
{
AssertLockNotHeld(m_peer_mutex);
AssertLockHeld(g_msgproc_mutex);
AssertLockHeld(cs_main);
AssertLockHeld(m_tx_download_mutex);

const auto& package = package_to_validate.m_txns;
const auto& senders = package_to_validate.m_senders;
Expand Down Expand Up @@ -3303,7 +3326,7 @@ std::optional<PeerManagerImpl::PackageToValidate> PeerManagerImpl::Find1P1CPacka
{
AssertLockNotHeld(m_peer_mutex);
AssertLockHeld(g_msgproc_mutex);
AssertLockHeld(cs_main);
AssertLockHeld(m_tx_download_mutex);

const auto& parent_wtxid{ptx->GetWitnessHash()};

Expand Down Expand Up @@ -3356,7 +3379,7 @@ std::optional<PeerManagerImpl::PackageToValidate> PeerManagerImpl::Find1P1CPacka
bool PeerManagerImpl::ProcessOrphanTx(Peer& peer)
{
AssertLockHeld(g_msgproc_mutex);
LOCK(cs_main);
LOCK2(::cs_main, m_tx_download_mutex);

CTransactionRef porphanTx = nullptr;

Expand Down Expand Up @@ -4173,7 +4196,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,

const bool reject_tx_invs{RejectIncomingTxs(pfrom)};

LOCK(cs_main);
LOCK2(cs_main, m_tx_download_mutex);

const auto current_time{GetTime<std::chrono::microseconds>()};
uint256* best_block{nullptr};
Expand Down Expand Up @@ -4506,7 +4529,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
const uint256& hash = peer->m_wtxid_relay ? wtxid : txid;
AddKnownTx(*peer, hash);

LOCK(cs_main);
LOCK2(cs_main, m_tx_download_mutex);

m_txrequest.ReceivedResponse(pfrom.GetId(), txid);
if (tx.HasWitness()) m_txrequest.ReceivedResponse(pfrom.GetId(), wtxid);
Expand Down Expand Up @@ -5263,7 +5286,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
std::vector<CInv> vInv;
vRecv >> vInv;
if (vInv.size() <= MAX_PEER_TX_ANNOUNCEMENTS + MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
LOCK(::cs_main);
LOCK(m_tx_download_mutex);
for (CInv &inv : vInv) {
if (inv.IsGenTxMsg()) {
// If we receive a NOTFOUND message for a tx we requested, mark the announcement for it as
Expand Down Expand Up @@ -5388,6 +5411,7 @@ bool PeerManagerImpl::ProcessMessages(CNode* pfrom, std::atomic<bool>& interrupt
// by another peer that was already processed; in that case,
// the extra work may not be noticed, possibly resulting in an
// unnecessary 100ms delay)
LOCK(m_tx_download_mutex);
if (m_orphanage.HaveTxToReconsider(peer->m_id)) fMoreWork = true;
} catch (const std::exception& e) {
LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", __func__, SanitizeString(msg.m_type), msg.m_message_size, e.what(), typeid(e).name());
Expand Down Expand Up @@ -6281,6 +6305,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
//
// Message: getdata (transactions)
//
LOCK(m_tx_download_mutex);
std::vector<std::pair<NodeId, GenTxid>> expired;
auto requestable = m_txrequest.GetRequestable(pto->GetId(), current_time, &expired);
for (const auto& entry : expired) {
Expand Down

0 comments on commit 3eb1307

Please sign in to comment.