diff --git a/src/Makefile.am b/src/Makefile.am index a0eca537b48d..52ffad3efff0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -282,6 +282,9 @@ BITCOIN_CORE_H = \ llmq/dkgsessionhandler.h \ llmq/dkgsessionmgr.h \ llmq/ehf_signals.h \ + llmq/net_dkg.h \ + llmq/net_quorum.h \ + llmq/net_signing.h \ llmq/observer.h \ llmq/options.h \ llmq/params.h \ @@ -289,8 +292,6 @@ BITCOIN_CORE_H = \ llmq/quorumsman.h \ llmq/signhash.h \ llmq/signing.h \ - llmq/net_quorum.h \ - llmq/net_signing.h \ llmq/signing_shares.h \ llmq/snapshot.h \ llmq/types.h \ @@ -553,6 +554,7 @@ libbitcoin_node_a_SOURCES = \ llmq/dkgsessionhandler.cpp \ llmq/dkgsessionmgr.cpp \ llmq/ehf_signals.cpp \ + llmq/net_dkg.cpp \ llmq/net_quorum.cpp \ llmq/net_signing.cpp \ llmq/observer.cpp \ diff --git a/src/active/context.cpp b/src/active/context.cpp index c89162df6a7e..5a05df672e22 100644 --- a/src/active/context.cpp +++ b/src/active/context.cpp @@ -4,7 +4,6 @@ #include -#include #include #include #include @@ -26,19 +25,18 @@ #include ActiveContext::ActiveContext(CBLSWorker& bls_worker, ChainstateManager& chainman, CConnman& connman, - CDeterministicMNManager& dmnman, CGovernanceManager& govman, CMasternodeMetaMan& mn_metaman, - CSporkManager& sporkman, const chainlock::Chainlocks& chainlocks, CTxMemPool& mempool, + CDeterministicMNManager& dmnman, CGovernanceManager& govman, CSporkManager& sporkman, + const chainlock::Chainlocks& chainlocks, CTxMemPool& mempool, chainlock::ChainlockHandler& clhandler, llmq::CInstantSendManager& isman, - llmq::CQuorumBlockProcessor& qblockman, llmq::CQuorumManager& qman, - llmq::CQuorumSnapshotManager& qsnapman, llmq::CSigningManager& sigman, - const CMasternodeSync& mn_sync, const CBLSSecretKey& operator_sk, - const util::DbWrapperParams& db_params, bool quorums_watch) : + llmq::CQuorumManager& qman, llmq::CQuorumSnapshotManager& qsnapman, + llmq::CSigningManager& sigman, const CMasternodeSync& mn_sync, + const CBLSSecretKey& operator_sk, const util::DbWrapperParams& db_params, bool quorums_watch) : llmq::QuorumRole{qman}, m_bls_worker{bls_worker}, m_quorums_watch{quorums_watch}, nodeman{std::make_unique(connman, dmnman, operator_sk)}, dkgdbgman{std::make_unique(dmnman, qsnapman, chainman)}, - qdkgsman{std::make_unique(dmnman, qsnapman, chainman, sporkman, db_params, quorums_watch)}, + qdkgsman{std::make_unique(dmnman, qsnapman, chainman, sporkman, db_params)}, shareman{std::make_unique(connman, chainman, sigman, *nodeman, qman, sporkman)}, gov_signer{std::make_unique(connman, dmnman, govman, *nodeman, chainman, mn_sync)}, ehf_sighandler{std::make_unique(chainman, sigman, *shareman, qman)}, @@ -47,23 +45,12 @@ ActiveContext::ActiveContext(CBLSWorker& bls_worker, ChainstateManager& chainman is_signer{std::make_unique(chainman.ActiveChainstate(), chainlocks, isman, sigman, *shareman, qman, sporkman, mempool, mn_sync)} { - qdkgsman->InitializeHandlers([&](const Consensus::LLMQParams& llmq_params, - int quorum_idx) -> std::unique_ptr { - return std::make_unique(bls_worker, dmnman, mn_metaman, *dkgdbgman, *qdkgsman, - qblockman, qsnapman, *nodeman, chainman, sporkman, - llmq_params, quorums_watch, quorum_idx); - }); - m_qman.ConnectManagers(this, qdkgsman.get()); } -ActiveContext::~ActiveContext() -{ - m_qman.DisconnectManagers(); -} +ActiveContext::~ActiveContext() = default; -void ActiveContext::Start(CConnman& connman, PeerManager& peerman) +void ActiveContext::Start() { - qdkgsman->StartThreads(connman, peerman); cl_signer->Start(); cl_signer->RegisterRecoveryInterface(); is_signer->RegisterRecoveryInterface(); @@ -80,7 +67,6 @@ void ActiveContext::Stop() is_signer->UnregisterRecoveryInterface(); cl_signer->UnregisterRecoveryInterface(); cl_signer->Stop(); - qdkgsman->StopThreads(); } CCoinJoinServer& ActiveContext::GetCJServer() const diff --git a/src/active/context.h b/src/active/context.h index 44b71abc8af7..a7d55dc87e66 100644 --- a/src/active/context.h +++ b/src/active/context.h @@ -19,13 +19,11 @@ class CBLSWorker; class CCoinJoinServer; class CConnman; class CGovernanceManager; -class CMasternodeMetaMan; class CMasternodeSync; class CMNHFManager; class CSporkManager; class CTxMemPool; class GovernanceSigner; -class PeerManager; namespace chainlock { class Chainlocks; class ChainlockHandler; @@ -55,16 +53,15 @@ struct ActiveContext final : public llmq::QuorumRole, public CValidationInterfac ActiveContext(const ActiveContext&) = delete; ActiveContext& operator=(const ActiveContext&) = delete; explicit ActiveContext(CBLSWorker& bls_worker, ChainstateManager& chainman, CConnman& connman, - CDeterministicMNManager& dmnman, CGovernanceManager& govman, CMasternodeMetaMan& mn_metaman, - CSporkManager& sporkman, const chainlock::Chainlocks& chainlocks, CTxMemPool& mempool, + CDeterministicMNManager& dmnman, CGovernanceManager& govman, CSporkManager& sporkman, + const chainlock::Chainlocks& chainlocks, CTxMemPool& mempool, chainlock::ChainlockHandler& clhandler, llmq::CInstantSendManager& isman, - llmq::CQuorumBlockProcessor& qblockman, llmq::CQuorumManager& qman, - llmq::CQuorumSnapshotManager& qsnapman, llmq::CSigningManager& sigman, - const CMasternodeSync& mn_sync, const CBLSSecretKey& operator_sk, - const util::DbWrapperParams& db_params, bool quorums_watch); + llmq::CQuorumManager& qman, llmq::CQuorumSnapshotManager& qsnapman, + llmq::CSigningManager& sigman, const CMasternodeSync& mn_sync, + const CBLSSecretKey& operator_sk, const util::DbWrapperParams& db_params, bool quorums_watch); ~ActiveContext(); - void Start(CConnman& connman, PeerManager& peerman); + void Start(); void Stop(); CCoinJoinServer& GetCJServer() const; diff --git a/src/active/dkgsession.cpp b/src/active/dkgsession.cpp index 00f037f8088b..8a66f74766bd 100644 --- a/src/active/dkgsession.cpp +++ b/src/active/dkgsession.cpp @@ -36,10 +36,10 @@ ActiveDKGSession::ActiveDKGSession(CBLSWorker& bls_worker, CDeterministicMNManag ActiveDKGSession::~ActiveDKGSession() = default; -void ActiveDKGSession::Contribute(CDKGPendingMessages& pendingMessages, PeerManager& peerman) +std::optional ActiveDKGSession::Contribute() { if (!AreWeMember()) { - return; + return std::nullopt; } assert(params.threshold > 1); // we should not get there with single-node-quorums @@ -51,15 +51,15 @@ void ActiveDKGSession::Contribute(CDKGPendingMessages& pendingMessages, PeerMana if (!blsWorker.GenerateContributions(params.threshold, memberIds, vvecContribution, m_sk_contributions)) { // this should never happen actually logger.Batch("GenerateContributions failed"); - return; + return std::nullopt; } logger.Batch("generated contributions. time=%d", t1.count()); logger.Flush(); - SendContributions(pendingMessages, peerman); + return SendContributions(); } -void ActiveDKGSession::SendContributions(CDKGPendingMessages& pendingMessages, PeerManager& peerman) +std::optional ActiveDKGSession::SendContributions() { CDKGLogger logger(*this, __func__, __LINE__); @@ -69,7 +69,7 @@ void ActiveDKGSession::SendContributions(CDKGPendingMessages& pendingMessages, P if (ShouldSimulateError(DKGError::type::CONTRIBUTION_OMIT)) { logger.Batch("omitting"); - return; + return std::nullopt; } CDKGContribution qc; @@ -93,7 +93,7 @@ void ActiveDKGSession::SendContributions(CDKGPendingMessages& pendingMessages, P if (!qc.contributions->Encrypt(i, m->dmn->pdmnState->pubKeyOperator.Get(), skContrib, PROTOCOL_VERSION)) { logger.Batch("failed to encrypt contribution for %s", m->dmn->proTxHash.ToString()); - return; + return std::nullopt; } } @@ -108,7 +108,7 @@ void ActiveDKGSession::SendContributions(CDKGPendingMessages& pendingMessages, P return true; }); - pendingMessages.PushPendingMessage(-1, qc, peerman); + return qc; } // Verifies all pending secret key contributions in one batch @@ -170,10 +170,10 @@ void ActiveDKGSession::VerifyPendingContributions() pendingContributionVerifications.clear(); } -void ActiveDKGSession::VerifyAndComplain(CConnman& connman, CDKGPendingMessages& pendingMessages, PeerManager& peerman) +std::optional ActiveDKGSession::VerifyAndComplain(CConnman& connman) { if (!AreWeMember()) { - return; + return std::nullopt; } { @@ -208,7 +208,7 @@ void ActiveDKGSession::VerifyAndComplain(CConnman& connman, CDKGPendingMessages& VerifyConnectionAndMinProtoVersions(connman); - SendComplaint(pendingMessages, peerman); + return SendComplaint(); } void ActiveDKGSession::VerifyConnectionAndMinProtoVersions(CConnman& connman) const @@ -255,7 +255,7 @@ void ActiveDKGSession::VerifyConnectionAndMinProtoVersions(CConnman& connman) co } } -void ActiveDKGSession::SendComplaint(CDKGPendingMessages& pendingMessages, PeerManager& peerman) +std::optional ActiveDKGSession::SendComplaint() { CDKGLogger logger(*this, __func__, __LINE__); @@ -280,7 +280,7 @@ void ActiveDKGSession::SendComplaint(CDKGPendingMessages& pendingMessages, PeerM } if (badCount == 0 && complaintCount == 0) { - return; + return std::nullopt; } logger.Batch("sending complaint. badCount=%d, complaintCount=%d", badCount, complaintCount); @@ -294,13 +294,13 @@ void ActiveDKGSession::SendComplaint(CDKGPendingMessages& pendingMessages, PeerM return true; }); - pendingMessages.PushPendingMessage(-1, qc, peerman); + return qc; } -void ActiveDKGSession::VerifyAndJustify(CDKGPendingMessages& pendingMessages, PeerManager& peerman) +std::optional ActiveDKGSession::VerifyAndJustify() { if (!AreWeMember()) { - return; + return std::nullopt; } CDKGLogger logger(*this, __func__, __LINE__); @@ -333,13 +333,13 @@ void ActiveDKGSession::VerifyAndJustify(CDKGPendingMessages& pendingMessages, Pe } logger.Flush(); - if (!justifyFor.empty()) { - SendJustification(pendingMessages, peerman, justifyFor); + if (justifyFor.empty()) { + return std::nullopt; } + return SendJustification(justifyFor); } -void ActiveDKGSession::SendJustification(CDKGPendingMessages& pendingMessages, PeerManager& peerman, - const Uint256HashSet& forMembers) +std::optional ActiveDKGSession::SendJustification(const Uint256HashSet& forMembers) { CDKGLogger logger(*this, __func__, __LINE__); @@ -372,7 +372,7 @@ void ActiveDKGSession::SendJustification(CDKGPendingMessages& pendingMessages, P if (ShouldSimulateError(DKGError::type::JUSTIFY_OMIT)) { logger.Batch("omitting"); - return; + return std::nullopt; } qj.sig = m_mn_activeman.Sign(qj.GetSignHash(), m_use_legacy_bls); @@ -384,13 +384,13 @@ void ActiveDKGSession::SendJustification(CDKGPendingMessages& pendingMessages, P return true; }); - pendingMessages.PushPendingMessage(-1, qj, peerman); + return qj; } -void ActiveDKGSession::VerifyAndCommit(CDKGPendingMessages& pendingMessages, PeerManager& peerman) +std::optional ActiveDKGSession::VerifyAndCommit() { if (!AreWeMember()) { - return; + return std::nullopt; } CDKGLogger logger(*this, __func__, __LINE__); @@ -429,10 +429,10 @@ void ActiveDKGSession::VerifyAndCommit(CDKGPendingMessages& pendingMessages, Pee logger.Flush(); - SendCommitment(pendingMessages, peerman); + return SendCommitment(); } -void ActiveDKGSession::SendCommitment(CDKGPendingMessages& pendingMessages, PeerManager& peerman) +std::optional ActiveDKGSession::SendCommitment() { CDKGLogger logger(*this, __func__, __LINE__); @@ -454,12 +454,12 @@ void ActiveDKGSession::SendCommitment(CDKGPendingMessages& pendingMessages, Peer if (qc.CountValidMembers() < params.minSize) { logger.Batch("not enough valid members. not sending commitment"); - return; + return std::nullopt; } if (ShouldSimulateError(DKGError::type::COMMIT_OMIT)) { logger.Batch("omitting"); - return; + return std::nullopt; } cxxtimer::Timer timerTotal(true); @@ -470,13 +470,13 @@ void ActiveDKGSession::SendCommitment(CDKGPendingMessages& pendingMessages, Peer std::vector skContributions; if (!dkgManager.GetVerifiedContributions(params.type, m_quorum_base_block_index, qc.validMembers, memberIndexes, vvecs, skContributions)) { logger.Batch("failed to get valid contributions"); - return; + return std::nullopt; } BLSVerificationVectorPtr vvec = cache.BuildQuorumVerificationVector(::SerializeHash(memberIndexes), vvecs); if (vvec == nullptr) { logger.Batch("failed to build quorum verification vector"); - return; + return std::nullopt; } t1.stop(); @@ -484,7 +484,7 @@ void ActiveDKGSession::SendCommitment(CDKGPendingMessages& pendingMessages, Peer CBLSSecretKey skShare = cache.AggregateSecretKeys(::SerializeHash(memberIndexes), skContributions); if (!skShare.IsValid()) { logger.Batch("failed to build own secret share"); - return; + return std::nullopt; } t2.stop(); @@ -541,7 +541,7 @@ void ActiveDKGSession::SendCommitment(CDKGPendingMessages& pendingMessages, Peer return true; }); - pendingMessages.PushPendingMessage(-1, qc, peerman); + return qc; } std::vector ActiveDKGSession::FinalizeCommitments() diff --git a/src/active/dkgsession.h b/src/active/dkgsession.h index 3350a06c6b2d..48ca59dd7429 100644 --- a/src/active/dkgsession.h +++ b/src/active/dkgsession.h @@ -7,6 +7,12 @@ #include +#include + +class CActiveMasternodeManager; +class CSporkManager; +class CMasternodeMetaMan; + namespace llmq { class ActiveDKGSession final : public llmq::CDKGSession { @@ -29,25 +35,22 @@ class ActiveDKGSession final : public llmq::CDKGSession public: // Phase 1: contribution - void Contribute(CDKGPendingMessages& pendingMessages, PeerManager& peerman) override; - void SendContributions(CDKGPendingMessages& pendingMessages, PeerManager& peerman) override; + std::optional Contribute() override; + std::optional SendContributions() override; void VerifyPendingContributions() override EXCLUSIVE_LOCKS_REQUIRED(cs_pending); // Phase 2: complaint - void VerifyAndComplain(CConnman& connman, CDKGPendingMessages& pendingMessages, PeerManager& peerman) override - EXCLUSIVE_LOCKS_REQUIRED(!cs_pending); + std::optional VerifyAndComplain(CConnman& connman) override EXCLUSIVE_LOCKS_REQUIRED(!cs_pending); void VerifyConnectionAndMinProtoVersions(CConnman& connman) const override; - void SendComplaint(CDKGPendingMessages& pendingMessages, PeerManager& peerman) override; + std::optional SendComplaint() override; // Phase 3: justification - void VerifyAndJustify(CDKGPendingMessages& pendingMessages, PeerManager& peerman) override - EXCLUSIVE_LOCKS_REQUIRED(!invCs); - void SendJustification(CDKGPendingMessages& pendingMessages, PeerManager& peerman, - const Uint256HashSet& forMembers) override; + std::optional VerifyAndJustify() override EXCLUSIVE_LOCKS_REQUIRED(!invCs); + std::optional SendJustification(const Uint256HashSet& forMembers) override; // Phase 4: commit - void VerifyAndCommit(CDKGPendingMessages& pendingMessages, PeerManager& peerman) override; - void SendCommitment(CDKGPendingMessages& pendingMessages, PeerManager& peerman) override; + std::optional VerifyAndCommit() override; + std::optional SendCommitment() override; // Phase 5: aggregate/finalize std::vector FinalizeCommitments() EXCLUSIVE_LOCKS_REQUIRED(!invCs) override; diff --git a/src/active/dkgsessionhandler.cpp b/src/active/dkgsessionhandler.cpp index 9b796d796516..b1227527a900 100644 --- a/src/active/dkgsessionhandler.cpp +++ b/src/active/dkgsessionhandler.cpp @@ -6,17 +6,14 @@ #include #include -#include -#include #include #include #include -#include +#include #include #include -#include -#include +#include namespace llmq { ActiveDKGSessionHandler::ActiveDKGSessionHandler( @@ -69,24 +66,7 @@ void ActiveDKGSessionHandler::UpdatedBlockTip(const CBlockIndex* pindexNew) params.name, quorumIndex, currentHeight, pQuorumBaseBlockIndex->nHeight, std23::to_underlying(oldPhase), std23::to_underlying(phase)); } -void ActiveDKGSessionHandler::StartThread(CConnman& connman, PeerManager& peerman) -{ - if (phaseHandlerThread.joinable()) { - throw std::runtime_error("Tried to start an already started ActiveDKGSessionHandler thread."); - } - - m_thread_name = strprintf("llmq-%d-%d", std23::to_underlying(params.type), quorumIndex); - phaseHandlerThread = std::thread(&util::TraceThread, m_thread_name.c_str(), - [this, &connman, &peerman] { PhaseHandlerThread(connman, peerman); }); -} - -void ActiveDKGSessionHandler::StopThread() -{ - stopRequested = true; - if (phaseHandlerThread.joinable()) { - phaseHandlerThread.join(); - } -} +uint256 ActiveDKGSessionHandler::GetCurrentQuorumHash() const { return WITH_LOCK(cs_phase_qhash, return quorumHash); } std::pair ActiveDKGSessionHandler::GetPhaseAndQuorumHash() const { @@ -114,9 +94,6 @@ bool ActiveDKGSessionHandler::InitNewQuorum(gsl::not_null pQ return true; } -class AbortPhaseException : public std::exception { -}; - void ActiveDKGSessionHandler::WaitForNextPhase(std::optional curPhase, QuorumPhase nextPhase, const uint256& expectedQuorumHash, const WhileWaitFunc& shouldNotWait) const { @@ -249,282 +226,6 @@ void ActiveDKGSessionHandler::HandlePhase(QuorumPhase curPhase, QuorumPhase next LogPrint(BCLog::LLMQ_DKG, "ActiveDKGSessionHandler::%s -- %s qi[%d] - done, curPhase=%d, nextPhase=%d\n", __func__, params.name, quorumIndex, std23::to_underlying(curPhase), std23::to_underlying(nextPhase)); } -// returns a set of NodeIds which sent invalid messages -template -std::unordered_set BatchVerifyMessageSigs(CDKGSession& session, const std::vector>>& messages) -{ - if (messages.empty()) { - return {}; - } - - std::unordered_set ret; - bool revertToSingleVerification = false; - - CBLSSignature aggSig; - std::vector pubKeys; - std::vector messageHashes; - Uint256HashSet messageHashesSet; - pubKeys.reserve(messages.size()); - messageHashes.reserve(messages.size()); - bool first = true; - for (const auto& [nodeId, msg] : messages) { - auto member = session.GetMember(msg->proTxHash); - if (!member) { - // should not happen as it was verified before - ret.emplace(nodeId); - continue; - } - - if (first) { - aggSig = msg->sig; - } else { - aggSig.AggregateInsecure(msg->sig); - } - first = false; - - auto msgHash = msg->GetSignHash(); - if (!messageHashesSet.emplace(msgHash).second) { - // can only happen in 2 cases: - // 1. Someone sent us the same message twice but with differing signature, meaning that at least one of them - // must be invalid. In this case, we'd have to revert to single message verification nevertheless - // 2. Someone managed to find a way to create two different binary representations of a message that deserializes - // to the same object representation. This would be some form of malleability. However, this shouldn't be - // possible as only deterministic/unique BLS signatures and very simple data types are involved - revertToSingleVerification = true; - break; - } - - pubKeys.emplace_back(member->dmn->pdmnState->pubKeyOperator.Get()); - messageHashes.emplace_back(msgHash); - } - if (!revertToSingleVerification) { - if (aggSig.VerifyInsecureAggregated(pubKeys, messageHashes)) { - // all good - return ret; - } - - // are all messages from the same node? - bool nodeIdsAllSame = std::adjacent_find( messages.begin(), messages.end(), [](const auto& first, const auto& second){ - return first.first != second.first; - }) == messages.end(); - - // if yes, take a short path and return a set with only him - if (nodeIdsAllSame) { - ret.emplace(messages[0].first); - return ret; - } - // different nodes, let's figure out who are the bad ones - } - - for (const auto& [nodeId, msg] : messages) { - if (ret.count(nodeId)) { - continue; - } - - auto member = session.GetMember(msg->proTxHash); - bool valid = msg->sig.VerifyInsecure(member->dmn->pdmnState->pubKeyOperator.Get(), msg->GetSignHash()); - if (!valid) { - ret.emplace(nodeId); - } - } - return ret; -} - -static void RelayInvToParticipants(const CDKGSession& session, const CConnman& connman, PeerManager& peerman, - const CInv& inv) -{ - CDKGLogger logger(session, __func__, __LINE__); - std::stringstream ss; - const auto& relayMembers = session.RelayMembers(); - for (const auto& r : relayMembers) { - ss << r.ToString().substr(0, 4) << " | "; - } - logger.Batch("RelayInvToParticipants inv[%s] relayMembers[%d] GetNodeCount[%d] GetNetworkActive[%d] " - "HasMasternodeQuorumNodes[%d] for quorumHash[%s] forMember[%s] relayMembers[%s]", - inv.ToString(), relayMembers.size(), connman.GetNodeCount(ConnectionDirection::Both), - connman.GetNetworkActive(), - connman.HasMasternodeQuorumNodes(session.GetType(), session.BlockIndex()->GetBlockHash()), - session.BlockIndex()->GetBlockHash().ToString(), session.ProTx().ToString().substr(0, 4), ss.str()); - - std::stringstream ss2; - connman.ForEachNode([&](const CNode* pnode) { - if (pnode->qwatch || - (!pnode->GetVerifiedProRegTxHash().IsNull() && (relayMembers.count(pnode->GetVerifiedProRegTxHash()) != 0))) { - peerman.PushInventory(pnode->GetId(), inv); - } - - if (pnode->GetVerifiedProRegTxHash().IsNull()) { - logger.Batch("node[%d:%s] not mn", pnode->GetId(), pnode->m_addr_name); - } else if (relayMembers.count(pnode->GetVerifiedProRegTxHash()) == 0) { - ss2 << pnode->GetVerifiedProRegTxHash().ToString().substr(0, 4) << " | "; - } - }); - logger.Batch("forMember[%s] NOTrelayMembers[%s]", session.ProTx().ToString().substr(0, 4), ss2.str()); - logger.Flush(); -} - -template -bool ProcessPendingMessageBatch(const CConnman& connman, CDKGSession& session, CDKGPendingMessages& pendingMessages, - PeerManager& peerman, size_t maxCount) -{ - auto msgs = pendingMessages.PopAndDeserializeMessages(maxCount); - if (msgs.empty()) { - return false; - } - - std::vector>> preverifiedMessages; - preverifiedMessages.reserve(msgs.size()); - - for (const auto& p : msgs) { - const NodeId &nodeId = p.first; - if (!p.second) { - LogPrint(BCLog::LLMQ_DKG, "%s -- failed to deserialize message, peer=%d\n", __func__, nodeId); - { - pendingMessages.Misbehaving(nodeId, 100, peerman); - } - continue; - } - bool ban = false; - if (!session.PreVerifyMessage(*p.second, ban)) { - if (ban) { - LogPrint(BCLog::LLMQ_DKG, "%s -- banning node due to failed preverification, peer=%d\n", __func__, nodeId); - { - pendingMessages.Misbehaving(nodeId, 100, peerman); - } - } - LogPrint(BCLog::LLMQ_DKG, "%s -- skipping message due to failed preverification, peer=%d\n", __func__, nodeId); - continue; - } - preverifiedMessages.emplace_back(p); - } - if (preverifiedMessages.empty()) { - return true; - } - - auto badNodes = BatchVerifyMessageSigs(session, preverifiedMessages); - if (!badNodes.empty()) { - for (auto nodeId : badNodes) { - LogPrint(BCLog::LLMQ_DKG, "%s -- failed to verify signature, peer=%d\n", __func__, nodeId); - pendingMessages.Misbehaving(nodeId, 100, peerman); - } - } - - for (const auto& p : preverifiedMessages) { - const NodeId &nodeId = p.first; - if (badNodes.count(nodeId)) { - continue; - } - const std::optional inv = session.ReceiveMessage(*p.second); - if (inv) { - RelayInvToParticipants(session, connman, peerman, *inv); - } - } - - return true; -} - -void ActiveDKGSessionHandler::HandleDKGRound(CConnman& connman, PeerManager& peerman) -{ - WaitForNextPhase(std::nullopt, QuorumPhase::Initialized); - - pendingContributions.Clear(); - pendingComplaints.Clear(); - pendingJustifications.Clear(); - pendingPrematureCommitments.Clear(); - uint256 curQuorumHash = WITH_LOCK(cs_phase_qhash, return quorumHash); - - const CBlockIndex* pQuorumBaseBlockIndex = WITH_LOCK(::cs_main, - return m_chainman.m_blockman.LookupBlockIndex(curQuorumHash)); - - if (!pQuorumBaseBlockIndex || !InitNewQuorum(pQuorumBaseBlockIndex)) { - // should actually never happen - WaitForNewQuorum(curQuorumHash); - throw AbortPhaseException(); - } - - m_dkgdbgman.UpdateLocalSessionStatus(params.type, quorumIndex, [&](CDKGDebugSessionStatus& status) { - bool changed = status.phase != QuorumPhase::Initialized; - status.phase = QuorumPhase::Initialized; - return changed; - }); - - if (params.is_single_member()) { - auto finalCommitment = curSession->FinalizeSingleCommitment(); - if (!finalCommitment.IsNull()) { // it can be null only if we are not member - if (auto inv_opt = m_qblockman.AddMineableCommitment(finalCommitment); inv_opt.has_value()) { - peerman.RelayInv(inv_opt.value()); - } - } - WaitForNextPhase(QuorumPhase::Initialized, QuorumPhase::Contribute, curQuorumHash); - return; - } - - const auto tip_mn_list = m_dmnman.GetListAtChainTip(); - utils::EnsureQuorumConnections(params, connman, m_sporkman, {m_dmnman, m_qsnapman, m_chainman, pQuorumBaseBlockIndex}, - tip_mn_list, curSession->ProTx(), /*is_masternode=*/true, m_quorums_watch); - if (curSession->AreWeMember()) { - utils::AddQuorumProbeConnections(params, connman, m_mn_metaman, m_sporkman, - {m_dmnman, m_qsnapman, m_chainman, pQuorumBaseBlockIndex}, tip_mn_list, - curSession->ProTx()); - } - - WaitForNextPhase(QuorumPhase::Initialized, QuorumPhase::Contribute, curQuorumHash); - - // Contribute - auto fContributeStart = [this, &peerman]() { curSession->Contribute(pendingContributions, peerman); }; - auto fContributeWait = [this, &connman, &peerman] { - return ProcessPendingMessageBatch(connman, *curSession, pendingContributions, peerman, 8); - }; - HandlePhase(QuorumPhase::Contribute, QuorumPhase::Complain, curQuorumHash, 0.05, fContributeStart, fContributeWait); - - // Complain - auto fComplainStart = [this, &connman, &peerman]() { - curSession->VerifyAndComplain(connman, pendingComplaints, peerman); - }; - auto fComplainWait = [this, &connman, &peerman] { - return ProcessPendingMessageBatch(connman, *curSession, pendingComplaints, peerman, 8); - }; - HandlePhase(QuorumPhase::Complain, QuorumPhase::Justify, curQuorumHash, 0.05, fComplainStart, fComplainWait); - - // Justify - auto fJustifyStart = [this, &peerman]() { curSession->VerifyAndJustify(pendingJustifications, peerman); }; - auto fJustifyWait = [this, &connman, &peerman] { - return ProcessPendingMessageBatch(connman, *curSession, pendingJustifications, peerman, 8); - }; - HandlePhase(QuorumPhase::Justify, QuorumPhase::Commit, curQuorumHash, 0.05, fJustifyStart, fJustifyWait); - - // Commit - auto fCommitStart = [this, &peerman]() { curSession->VerifyAndCommit(pendingPrematureCommitments, peerman); }; - auto fCommitWait = [this, &connman, &peerman] { - return ProcessPendingMessageBatch(connman, *curSession, pendingPrematureCommitments, - peerman, 8); - }; - HandlePhase(QuorumPhase::Commit, QuorumPhase::Finalize, curQuorumHash, 0.1, fCommitStart, fCommitWait); - - auto finalCommitments = curSession->FinalizeCommitments(); - for (const auto& fqc : finalCommitments) { - if (auto inv_opt = m_qblockman.AddMineableCommitment(fqc); inv_opt.has_value()) { - peerman.RelayInv(inv_opt.value()); - } - } -} - -void ActiveDKGSessionHandler::PhaseHandlerThread(CConnman& connman, PeerManager& peerman) -{ - while (!stopRequested) { - try { - LogPrint(BCLog::LLMQ_DKG, "ActiveDKGSessionHandler::%s -- %s qi[%d] - starting HandleDKGRound\n", __func__, params.name, quorumIndex); - HandleDKGRound(connman, peerman); - } catch (AbortPhaseException& e) { - m_dkgdbgman.UpdateLocalSessionStatus(params.type, quorumIndex, [&](CDKGDebugSessionStatus& status) { - status.statusBits.aborted = true; - return true; - }); - LogPrint(BCLog::LLMQ_DKG, "ActiveDKGSessionHandler::%s -- %s qi[%d] - aborted current DKG session\n", __func__, params.name, quorumIndex); - } - } -} - bool ActiveDKGSessionHandler::GetContribution(const uint256& hash, CDKGContribution& ret) const { return curSession && curSession->GetContribution(hash, ret); diff --git a/src/active/dkgsessionhandler.h b/src/active/dkgsessionhandler.h index 845a50d74461..349cab171e35 100644 --- a/src/active/dkgsessionhandler.h +++ b/src/active/dkgsessionhandler.h @@ -10,21 +10,18 @@ #include #include +#include #include #include #include -#include -#include class CActiveMasternodeManager; class CBLSWorker; class CBlockIndex; -class CConnman; class ChainstateManager; class CDeterministicMNManager; class CMasternodeMetaMan; class CSporkManager; -class PeerManager; namespace Consensus { struct LLMQParams; } // namespace Consensus @@ -37,6 +34,11 @@ class CQuorumSnapshotManager; } // namespace llmq namespace llmq { +//! Thrown by Wait*, Sleep* and HandlePhase to bail out of the current DKG round. +class AbortPhaseException : public std::exception +{ +}; + class ActiveDKGSessionHandler final : public llmq::CDKGSessionHandler { using StartPhaseFunc = std::function; @@ -59,8 +61,6 @@ class ActiveDKGSessionHandler final : public llmq::CDKGSessionHandler private: std::atomic stopRequested{false}; std::atomic currentHeight{-1}; - std::string m_thread_name; - std::thread phaseHandlerThread; std::unique_ptr curSession{nullptr}; mutable Mutex cs_phase_qhash; @@ -86,12 +86,20 @@ class ActiveDKGSessionHandler final : public llmq::CDKGSessionHandler bool GetJustification(const uint256& hash, CDKGJustification& ret) const override; bool GetPrematureCommitment(const uint256& hash, CDKGPrematureCommitment& ret) const override; QuorumPhase GetPhase() const override EXCLUSIVE_LOCKS_REQUIRED(!cs_phase_qhash); - void StartThread(CConnman& connman, PeerManager& peerman) override; - void StopThread() override; void UpdatedBlockTip(const CBlockIndex* pindexNew) override EXCLUSIVE_LOCKS_REQUIRED(!cs_phase_qhash); -private: - std::pair GetPhaseAndQuorumHash() const EXCLUSIVE_LOCKS_REQUIRED(!cs_phase_qhash); + /* + * Phase-thread interface, called by NetDKG. Wait/Sleep/HandlePhase throw + * AbortPhaseException when stopRequested is set or when an unexpected + * phase change is observed. + */ + int QuorumIndex() const { return quorumIndex; } + bool QuorumsWatch() const { return m_quorums_watch; } + uint256 GetCurrentQuorumHash() const EXCLUSIVE_LOCKS_REQUIRED(!cs_phase_qhash); + CDKGSession* GetCurSession() { return curSession.get(); } + + void RequestStop() { stopRequested = true; } + bool IsStopRequested() const { return stopRequested; } bool InitNewQuorum(gsl::not_null pQuorumBaseBlockIndex); @@ -110,8 +118,9 @@ class ActiveDKGSessionHandler final : public llmq::CDKGSessionHandler void HandlePhase(QuorumPhase curPhase, QuorumPhase nextPhase, const uint256& expectedQuorumHash, double randomSleepFactor, const StartPhaseFunc& startPhaseFunc, const WhileWaitFunc& runWhileWaiting) EXCLUSIVE_LOCKS_REQUIRED(!cs_phase_qhash); - void HandleDKGRound(CConnman& connman, PeerManager& peerman) EXCLUSIVE_LOCKS_REQUIRED(!cs_phase_qhash); - void PhaseHandlerThread(CConnman& connman, PeerManager& peerman) EXCLUSIVE_LOCKS_REQUIRED(!cs_phase_qhash); + +private: + std::pair GetPhaseAndQuorumHash() const EXCLUSIVE_LOCKS_REQUIRED(!cs_phase_qhash); }; } // namespace llmq diff --git a/src/init.cpp b/src/init.cpp index 113bd4c9f025..b31dfb9c426a 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -98,6 +98,7 @@ #include #include #include +#include #include #include #include @@ -2173,18 +2174,6 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) node.clhandler = std::make_unique(*node.chainlocks, chainman, *node.mempool, *node.mn_sync); RegisterValidationInterface(node.clhandler.get()); - assert(!node.peerman); - node.peerman = PeerManager::make(chainparams, *node.connman, *node.addrman, node.banman.get(), *node.dstxman, - chainman, *node.mempool, *node.mn_metaman, *node.mn_sync, - *node.govman, *node.sporkman, *node.chainlocks, *node.clhandler, node.active_ctx, node.dmnman, - node.cj_walletman, node.llmq_ctx, node.observer_ctx, ignores_incoming_txs); - RegisterValidationInterface(node.peerman.get()); - - g_ds_notification_interface = std::make_unique( - *node.connman, *node.dstxman, *node.mn_sync, *node.govman, chainman, node.dmnman // todo: replace unique_ptr for dmnman to reference - ); - RegisterValidationInterface(g_ds_notification_interface.get()); - // ********************************************************* Step 7c: Setup masternode mode or watch-only mode assert(!node.active_ctx); assert(!node.observer_ctx); @@ -2199,18 +2188,30 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) return InitError(_("Invalid masternodeblsprivkey. Please see documentation.")); } // Will init later in ThreadImport - node.active_ctx = std::make_unique(*node.llmq_ctx->bls_worker, chainman, *node.connman, *node.dmnman, *node.govman, *node.mn_metaman, + node.active_ctx = std::make_unique(*node.llmq_ctx->bls_worker, chainman, *node.connman, *node.dmnman, *node.govman, *node.sporkman, *node.chainlocks, *node.mempool, *node.clhandler, *node.llmq_ctx->isman, - *node.llmq_ctx->quorum_block_processor, *node.llmq_ctx->qman, *node.llmq_ctx->qsnapman, *node.llmq_ctx->sigman, + *node.llmq_ctx->qman, *node.llmq_ctx->qsnapman, *node.llmq_ctx->sigman, *node.mn_sync, operator_sk, dash_db_params, quorums_watch); RegisterValidationInterface(node.active_ctx.get()); } else if (quorums_watch) { - node.observer_ctx = std::make_unique(*node.llmq_ctx->bls_worker, *node.dmnman, *node.mn_metaman, - *node.llmq_ctx->quorum_block_processor, *node.llmq_ctx->qman, *node.llmq_ctx->qsnapman, + node.observer_ctx = std::make_unique(*node.dmnman, *node.llmq_ctx->qman, *node.llmq_ctx->qsnapman, chainman, *node.sporkman, dash_db_params); RegisterValidationInterface(node.observer_ctx.get()); } + assert(!node.peerman); + node.peerman = PeerManager::make(chainparams, *node.connman, *node.addrman, node.banman.get(), *node.dstxman, + chainman, *node.mempool, *node.mn_metaman, *node.mn_sync, + *node.govman, *node.sporkman, *node.chainlocks, *node.clhandler, + node.active_ctx ? node.active_ctx->nodeman.get() : nullptr, + node.dmnman, node.cj_walletman, node.llmq_ctx, ignores_incoming_txs); + RegisterValidationInterface(node.peerman.get()); + + g_ds_notification_interface = std::make_unique( + *node.connman, *node.dstxman, *node.mn_sync, *node.govman, chainman, node.dmnman // todo: replace unique_ptr for dmnman to reference + ); + RegisterValidationInterface(g_ds_notification_interface.get()); + // ********************************************************* Step 7d: Setup other Dash services node.peerman->AddExtraHandler(std::make_unique(node.peerman.get(), *node.llmq_ctx->isman, node.active_ctx ? node.active_ctx->is_signer.get() : nullptr, *node.llmq_ctx->sigman, *node.llmq_ctx->qman, *node.chainlocks, chainman.ActiveChainstate(), *node.mempool, *node.mn_sync)); @@ -2228,6 +2229,21 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) node.peerman->AddExtraHandler(std::move(net_quorum)); } + if (node.active_ctx) { + node.peerman->AddExtraHandler(std::make_unique( + node.peerman.get(), *node.sporkman, *node.active_ctx->qdkgsman, chainman, quorums_watch, + *node.llmq_ctx->qman, *node.active_ctx, + *node.llmq_ctx->bls_worker, *node.dmnman, *node.mn_metaman, + *node.active_ctx->dkgdbgman, *node.llmq_ctx->quorum_block_processor, *node.llmq_ctx->qsnapman, + *node.active_ctx->nodeman, *node.connman)); + } else if (node.observer_ctx) { + node.peerman->AddExtraHandler(std::make_unique( + node.peerman.get(), *node.sporkman, *node.observer_ctx->qdkgsman, chainman, /*quorums_watch=*/true, + *node.llmq_ctx->qman, *node.observer_ctx)); + } else { + node.peerman->AddExtraHandler(std::make_unique(node.peerman.get())); + } + if (node.active_ctx) { auto cj_server = std::make_unique(node.peerman.get(), chainman, *node.connman, *node.dmnman, *node.dstxman, *node.mn_metaman, *node.mempool, *node.active_ctx->nodeman, *node.mn_sync, *node.llmq_ctx->isman); @@ -2346,7 +2362,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) node.peerman->ScheduleHandlers(*node.scheduler); if (node.active_ctx) { - node.active_ctx->Start(*node.connman, *node.peerman); + node.active_ctx->Start(); node.scheduler->scheduleEvery(std::bind(&llmq::CDKGSessionManager::CleanupOldContributions, std::ref(*node.active_ctx->qdkgsman)), std::chrono::hours{1}); } diff --git a/src/llmq/debug.cpp b/src/llmq/debug.cpp index cafa9b3e699c..d3678d61a6df 100644 --- a/src/llmq/debug.cpp +++ b/src/llmq/debug.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -209,4 +210,21 @@ void CDKGDebugManager::UpdateLocalMemberStatus(Consensus::LLMQType llmqType, int } } +void CDKGDebugManager::MarkPhaseAdvanced(Consensus::LLMQType llmqType, int quorumIndex, QuorumPhase newPhase) +{ + UpdateLocalSessionStatus(llmqType, quorumIndex, [&](CDKGDebugSessionStatus& status) { + bool changed = status.phase != newPhase; + status.phase = newPhase; + return changed; + }); +} + +void CDKGDebugManager::MarkAborted(Consensus::LLMQType llmqType, int quorumIndex) +{ + UpdateLocalSessionStatus(llmqType, quorumIndex, [&](CDKGDebugSessionStatus& status) { + status.statusBits.aborted = true; + return true; + }); +} + } // namespace llmq diff --git a/src/llmq/debug.h b/src/llmq/debug.h index 5fef407fa1b1..7ca0e566d313 100644 --- a/src/llmq/debug.h +++ b/src/llmq/debug.h @@ -116,6 +116,12 @@ class CDKGDebugManager std::function&& func) EXCLUSIVE_LOCKS_REQUIRED(!cs_lockStatus); + //! Set the locally tracked phase to @p newPhase if different. + void MarkPhaseAdvanced(Consensus::LLMQType llmqType, int quorumIndex, QuorumPhase newPhase) + EXCLUSIVE_LOCKS_REQUIRED(!cs_lockStatus); + //! Mark the local session as aborted. + void MarkAborted(Consensus::LLMQType llmqType, int quorumIndex) EXCLUSIVE_LOCKS_REQUIRED(!cs_lockStatus); + size_t GetSessionCount() const EXCLUSIVE_LOCKS_REQUIRED(!cs_lockStatus); [[nodiscard]] static RPCResult GetJsonHelp(const std::string& key, bool optional, bool inner_optional = false); diff --git a/src/llmq/dkgsession.h b/src/llmq/dkgsession.h index abe65f6b5e84..1e790e12b165 100644 --- a/src/llmq/dkgsession.h +++ b/src/llmq/dkgsession.h @@ -22,17 +22,9 @@ #include #include -class CActiveMasternodeManager; class CConnman; -class CDeterministicMN; -class CMasternodeMetaMan; -class CSporkManager; -class PeerManager; namespace llmq { -class ActiveDKGSession; -class ActiveDKGSessionHandler; class CDKGDebugManager; -class CDKGPendingMessages; class CDKGSession; class CDKGSessionManager; class CFinalCommitment; @@ -361,29 +353,37 @@ class CDKGSession */ // Phase 1: contribution - virtual void Contribute(CDKGPendingMessages& pendingMessages, PeerManager& peerman) {} - virtual void SendContributions(CDKGPendingMessages& pendingMessages, PeerManager& peerman) {} + virtual std::optional Contribute() { return std::nullopt; } + virtual std::optional SendContributions() { return std::nullopt; } bool PreVerifyMessage(const CDKGContribution& qc, bool& retBan) const; std::optional ReceiveMessage(const CDKGContribution& qc) EXCLUSIVE_LOCKS_REQUIRED(!invCs, !cs_pending); virtual void VerifyPendingContributions() EXCLUSIVE_LOCKS_REQUIRED(cs_pending) {} // Phase 2: complaint - virtual void VerifyAndComplain(CConnman& connman, CDKGPendingMessages& pendingMessages, PeerManager& peerman) - EXCLUSIVE_LOCKS_REQUIRED(!cs_pending) {} + virtual std::optional VerifyAndComplain(CConnman& connman) EXCLUSIVE_LOCKS_REQUIRED(!cs_pending) + { + return std::nullopt; + } virtual void VerifyConnectionAndMinProtoVersions(CConnman& connman) const {} - virtual void SendComplaint(CDKGPendingMessages& pendingMessages, PeerManager& peerman) {} + virtual std::optional SendComplaint() { return std::nullopt; } bool PreVerifyMessage(const CDKGComplaint& qc, bool& retBan) const; std::optional ReceiveMessage(const CDKGComplaint& qc) EXCLUSIVE_LOCKS_REQUIRED(!invCs); // Phase 3: justification - virtual void VerifyAndJustify(CDKGPendingMessages& pendingMessages, PeerManager& peerman) EXCLUSIVE_LOCKS_REQUIRED(!invCs) {} - virtual void SendJustification(CDKGPendingMessages& pendingMessages, PeerManager& peerman, const Uint256HashSet& forMembers) {} + virtual std::optional VerifyAndJustify() EXCLUSIVE_LOCKS_REQUIRED(!invCs) + { + return std::nullopt; + } + virtual std::optional SendJustification(const Uint256HashSet& forMembers) + { + return std::nullopt; + } bool PreVerifyMessage(const CDKGJustification& qj, bool& retBan) const; std::optional ReceiveMessage(const CDKGJustification& qj) EXCLUSIVE_LOCKS_REQUIRED(!invCs); // Phase 4: commit - virtual void VerifyAndCommit(CDKGPendingMessages& pendingMessages, PeerManager& peerman) {} - virtual void SendCommitment(CDKGPendingMessages& pendingMessages, PeerManager& peerman) {} + virtual std::optional VerifyAndCommit() { return std::nullopt; } + virtual std::optional SendCommitment() { return std::nullopt; } bool PreVerifyMessage(const CDKGPrematureCommitment& qc, bool& retBan) const; std::optional ReceiveMessage(const CDKGPrematureCommitment& qc) EXCLUSIVE_LOCKS_REQUIRED(!invCs); diff --git a/src/llmq/dkgsessionhandler.cpp b/src/llmq/dkgsessionhandler.cpp index 2c83b07266e5..289324f371f0 100644 --- a/src/llmq/dkgsessionhandler.cpp +++ b/src/llmq/dkgsessionhandler.cpp @@ -26,36 +26,23 @@ CDKGSessionHandler::CDKGSessionHandler(const Consensus::LLMQParams& _params) : CDKGSessionHandler::~CDKGSessionHandler() = default; -MessageProcessingResult CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv) +void CDKGPendingMessages::PushPendingMessage(NodeId from, std::shared_ptr pm, const uint256& hash) { - // this will also consume the data, even if we bail out early - auto pm = std::make_shared(std::move(vRecv)); - - CHashWriter hw(SER_GETHASH, 0); - hw.write(AsWritableBytes(Span{*pm})); - uint256 hash = hw.GetHash(); - - MessageProcessingResult ret{}; - if (from != -1) { - ret.m_to_erase = CInv{invType, hash}; - } - LOCK(cs_messages); if (messagesPerNode[from] >= maxMessagesPerNode) { // TODO ban? LogPrint(BCLog::LLMQ_DKG, "CDKGPendingMessages::%s -- too many messages, peer=%d\n", __func__, from); - return ret; + return; } messagesPerNode[from]++; if (!seenMessages.emplace(hash).second) { LogPrint(BCLog::LLMQ_DKG, "CDKGPendingMessages::%s -- already seen %s, peer=%d\n", __func__, hash.ToString(), from); - return ret; + return; } pendingMessages.emplace_back(std::make_pair(from, std::move(pm))); - return ret; } std::list CDKGPendingMessages::PopPendingMessages(size_t maxCount) @@ -77,12 +64,6 @@ bool CDKGPendingMessages::HasSeen(const uint256& hash) const return seenMessages.count(hash) != 0; } -void CDKGPendingMessages::Misbehaving(const NodeId from, const int score, PeerManager& peerman) -{ - if (from == -1) return; - peerman.Misbehaving(from, score); -} - void CDKGPendingMessages::Clear() { LOCK(cs_messages); @@ -91,20 +72,11 @@ void CDKGPendingMessages::Clear() seenMessages.clear(); } -////// - -MessageProcessingResult CDKGSessionHandler::ProcessMessage(NodeId from, std::string_view msg_type, CDataStream& vRecv) +void CDKGSessionHandler::ClearPendingMessages() { - // We don't handle messages in the calling thread as deserialization/processing of these would block everything - if (msg_type == NetMsgType::QCONTRIB) { - return pendingContributions.PushPendingMessage(from, vRecv); - } else if (msg_type == NetMsgType::QCOMPLAINT) { - return pendingComplaints.PushPendingMessage(from, vRecv); - } else if (msg_type == NetMsgType::QJUSTIFICATION) { - return pendingJustifications.PushPendingMessage(from, vRecv); - } else if (msg_type == NetMsgType::QPCOMMITMENT) { - return pendingPrematureCommitments.PushPendingMessage(from, vRecv); - } - return {}; + pendingContributions.Clear(); + pendingComplaints.Clear(); + pendingJustifications.Clear(); + pendingPrematureCommitments.Clear(); } } // namespace llmq diff --git a/src/llmq/dkgsessionhandler.h b/src/llmq/dkgsessionhandler.h index 98b1cf7ac45e..ed3d749913ec 100644 --- a/src/llmq/dkgsessionhandler.h +++ b/src/llmq/dkgsessionhandler.h @@ -5,10 +5,7 @@ #ifndef BITCOIN_LLMQ_DKGSESSIONHANDLER_H #define BITCOIN_LLMQ_DKGSESSIONHANDLER_H -#include - #include // for NodeId -#include #include #include #include @@ -18,13 +15,12 @@ #include #include #include +#include #include #include #include class CBlockIndex; -class CConnman; -class PeerManager; namespace Consensus { struct LLMQParams; @@ -73,21 +69,19 @@ class CDKGPendingMessages explicit CDKGPendingMessages(size_t _maxMessagesPerNode, uint32_t _invType) : invType(_invType), maxMessagesPerNode(_maxMessagesPerNode) {}; - [[nodiscard]] MessageProcessingResult PushPendingMessage(NodeId from, CDataStream& vRecv) + /** + * Enqueue a serialized DKG message under @p from with content hash @p hash. + * Caller is responsible for hashing the payload and (for real peers) + * routing the erase-request to PeerManager. Drops the message silently on + * per-node capacity overflow or duplicate hash. + */ + void PushPendingMessage(NodeId from, std::shared_ptr pm, const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(!cs_messages); + std::list PopPendingMessages(size_t maxCount) EXCLUSIVE_LOCKS_REQUIRED(!cs_messages); bool HasSeen(const uint256& hash) const EXCLUSIVE_LOCKS_REQUIRED(!cs_messages); - void Misbehaving(NodeId from, int score, PeerManager& peerman); void Clear() EXCLUSIVE_LOCKS_REQUIRED(!cs_messages); - template - void PushPendingMessage(NodeId from, Message& msg, PeerManager& peerman) EXCLUSIVE_LOCKS_REQUIRED(!cs_messages) - { - CDataStream ds(SER_NETWORK, PROTOCOL_VERSION); - ds << msg; - peerman.PostProcessMessage(PushPendingMessage(from, ds), from); - } - // Might return nullptr messages, which indicates that deserialization failed for some reason template std::vector>> PopAndDeserializeMessages(size_t maxCount) @@ -116,16 +110,10 @@ class CDKGPendingMessages /** * Handles multiple sequential sessions of one specific LLMQ type. There is one instance of this class per LLMQ type. - * - * It internally starts the phase handler thread, which constantly loops and sequentially processes one session at a - * time and waiting for the next phase if necessary. */ class CDKGSessionHandler { -private: - friend class CDKGSessionManager; - -protected: +public: const Consensus::LLMQParams& params; // Do not guard these, they protect their internals themselves @@ -138,7 +126,7 @@ class CDKGSessionHandler explicit CDKGSessionHandler(const Consensus::LLMQParams& _params); virtual ~CDKGSessionHandler(); - [[nodiscard]] MessageProcessingResult ProcessMessage(NodeId from, std::string_view msg_type, CDataStream& vRecv); + void ClearPendingMessages(); public: virtual bool GetContribution(const uint256& hash, CDKGContribution& ret) const { return false; } @@ -146,8 +134,6 @@ class CDKGSessionHandler virtual bool GetJustification(const uint256& hash, CDKGJustification& ret) const { return false; } virtual bool GetPrematureCommitment(const uint256& hash, CDKGPrematureCommitment& ret) const { return false; } virtual QuorumPhase GetPhase() const { return QuorumPhase::Idle; } - virtual void StartThread(CConnman& connman, PeerManager& peerman) {} - virtual void StopThread() {} virtual void UpdatedBlockTip(const CBlockIndex* pindexNew) {} }; } // namespace llmq diff --git a/src/llmq/dkgsessionmgr.cpp b/src/llmq/dkgsessionmgr.cpp index 76384c8e73ed..eb0486b91d4c 100644 --- a/src/llmq/dkgsessionmgr.cpp +++ b/src/llmq/dkgsessionmgr.cpp @@ -6,8 +6,8 @@ #include #include +#include #include -#include #include #include #include @@ -19,11 +19,6 @@ #include #include -static bool IsQuorumDKGEnabled(const CSporkManager& sporkman) -{ - return sporkman.IsSporkActive(SPORK_17_QUORUM_DKG_ENABLED); -} - namespace llmq { static const std::string DB_VVEC = "qdkg_V"; @@ -32,32 +27,17 @@ static const std::string DB_ENC_CONTRIB = "qdkg_E"; CDKGSessionManager::CDKGSessionManager(CDeterministicMNManager& dmnman, CQuorumSnapshotManager& qsnapman, const ChainstateManager& chainman, const CSporkManager& sporkman, - const util::DbWrapperParams& db_params, bool quorums_watch) : + const util::DbWrapperParams& db_params) : m_dmnman{dmnman}, m_qsnapman{qsnapman}, m_chainman{chainman}, m_sporkman{sporkman}, - m_quorums_watch{quorums_watch}, db{util::MakeDbWrapper({db_params.path / "llmq" / "dkgdb", db_params.memory, db_params.wipe, /*cache_size=*/1 << 20})} { } CDKGSessionManager::~CDKGSessionManager() = default; -void CDKGSessionManager::StartThreads(CConnman& connman, PeerManager& peerman) -{ - for (auto& [_, dkgType] : dkgSessionHandlers) { - Assert(dkgType)->StartThread(connman, peerman); - } -} - -void CDKGSessionManager::StopThreads() -{ - for (auto& [_, dkgType] : dkgSessionHandlers) { - Assert(dkgType)->StopThread(); - } -} - void CDKGSessionManager::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fInitialDownload) { CleanupCache(); @@ -74,117 +54,6 @@ void CDKGSessionManager::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fIni } } -MessageProcessingResult CDKGSessionManager::ProcessMessage(CNode& pfrom, bool is_masternode, std::string_view msg_type, - CDataStream& vRecv) -{ - static Mutex cs_indexedQuorumsCache; - static std::map> indexedQuorumsCache GUARDED_BY(cs_indexedQuorumsCache); - - if (!IsQuorumDKGEnabled(m_sporkman)) - return {}; - - if (msg_type != NetMsgType::QCONTRIB - && msg_type != NetMsgType::QCOMPLAINT - && msg_type != NetMsgType::QJUSTIFICATION - && msg_type != NetMsgType::QPCOMMITMENT - && msg_type != NetMsgType::QWATCH) { - return {}; - } - - if (msg_type == NetMsgType::QWATCH) { - if (!is_masternode) { - // non-masternodes should never receive this - return MisbehavingError{10}; - } - pfrom.qwatch = true; - return {}; - } - - if (!is_masternode && !m_quorums_watch) { - // regular non-watching nodes should never receive any of these - return MisbehavingError{10}; - } - - if (vRecv.empty()) { - return MisbehavingError{100}; - } - - Consensus::LLMQType llmqType; - uint256 quorumHash; - vRecv >> llmqType; - vRecv >> quorumHash; - vRecv.Rewind(sizeof(uint256)); - vRecv.Rewind(sizeof(uint8_t)); - - const auto& llmq_params_opt = Params().GetLLMQ(llmqType); - if (!llmq_params_opt.has_value()) { - LogPrintf("CDKGSessionManager -- invalid llmqType [%d]\n", std23::to_underlying(llmqType)); - return MisbehavingError{100}; - } - const auto& llmq_params = llmq_params_opt.value(); - - int quorumIndex{-1}; - - // First check cache - { - LOCK(cs_indexedQuorumsCache); - if (indexedQuorumsCache.empty()) { - utils::InitQuorumsCache(indexedQuorumsCache, m_chainman.GetConsensus()); - } - indexedQuorumsCache[llmqType].get(quorumHash, quorumIndex); - } - - // No luck, try to compute - if (quorumIndex == -1) { - const CBlockIndex* pQuorumBaseBlockIndex = WITH_LOCK(::cs_main, - return m_chainman.m_blockman.LookupBlockIndex(quorumHash)); - if (pQuorumBaseBlockIndex == nullptr) { - LogPrintf("CDKGSessionManager -- unknown quorumHash %s\n", quorumHash.ToString()); - // NOTE: do not insta-ban for this, we might be lagging behind - return MisbehavingError{10}; - } - - if (!m_chainman.IsQuorumTypeEnabled(llmqType, pQuorumBaseBlockIndex->pprev)) { - LogPrintf("CDKGSessionManager -- llmqType [%d] quorums aren't active\n", std23::to_underlying(llmqType)); - return MisbehavingError{100}; - } - - quorumIndex = pQuorumBaseBlockIndex->nHeight % llmq_params.dkgInterval; - int quorumIndexMax = IsQuorumRotationEnabled(llmq_params, pQuorumBaseBlockIndex) ? - llmq_params.signingActiveQuorumCount - 1 : 0; - - if (quorumIndex > quorumIndexMax) { - LogPrintf("CDKGSessionManager -- invalid quorumHash %s\n", quorumHash.ToString()); - return MisbehavingError{100}; - } - - if (!dkgSessionHandlers.count({llmqType, quorumIndex})) { - LogPrintf("CDKGSessionManager -- no session handlers for quorumIndex [%d]\n", quorumIndex); - return MisbehavingError{100}; - } - } - - assert(quorumIndex != -1); - WITH_LOCK(cs_indexedQuorumsCache, indexedQuorumsCache[llmqType].insert(quorumHash, quorumIndex)); - return Assert(dkgSessionHandlers.at({llmqType, quorumIndex}))->ProcessMessage(pfrom.GetId(), msg_type, vRecv); -} - -bool CDKGSessionManager::AlreadyHave(const CInv& inv) const -{ - if (!IsQuorumDKGEnabled(m_sporkman)) - return false; - - for (const auto& [_, dkgType] : dkgSessionHandlers) { - if (Assert(dkgType)->pendingContributions.HasSeen(inv.hash) - || dkgType->pendingComplaints.HasSeen(inv.hash) - || dkgType->pendingJustifications.HasSeen(inv.hash) - || dkgType->pendingPrematureCommitments.HasSeen(inv.hash)) { - return true; - } - } - return false; -} - bool CDKGSessionManager::GetContribution(const uint256& hash, CDKGContribution& ret) const { if (!IsQuorumDKGEnabled(m_sporkman)) diff --git a/src/llmq/dkgsessionmgr.h b/src/llmq/dkgsessionmgr.h index 23cf14491ddd..759b1c9b67e6 100644 --- a/src/llmq/dkgsessionmgr.h +++ b/src/llmq/dkgsessionmgr.h @@ -6,44 +6,38 @@ #define BITCOIN_LLMQ_DKGSESSIONMGR_H #include -#include -#include -#include +#include +#include +#include +#include #include -#include - #include #include -#include template class CBLSIESMultiRecipientObjects; template class CBLSIESEncryptedObject; -class CActiveMasternodeManager; class CBlockIndex; class CDBWrapper; class CDeterministicMNManager; -class CDKGDebugManager; class ChainstateManager; -class CMasternodeMetaMan; +class CNode; class CSporkManager; -class PeerManager; -class CDKGContribution; -class CDKGComplaint; -class CDKGJustification; -class CDKGPrematureCommitment; namespace util { struct DbWrapperParams; } // namespace util -class UniValue; - namespace llmq { class CQuorumSnapshotManager; +class CDKGSessionHandler; +class CDKGContribution; +class CDKGComplaint; +class CDKGJustification; +class CDKGPrematureCommitment; class CDKGSessionManager { @@ -64,7 +58,6 @@ class CDKGSessionManager CQuorumSnapshotManager& m_qsnapman; const ChainstateManager& m_chainman; const CSporkManager& m_sporkman; - const bool m_quorums_watch{false}; private: std::unique_ptr db{nullptr}; @@ -96,7 +89,7 @@ class CDKGSessionManager CDKGSessionManager& operator=(const CDKGSessionManager&) = delete; explicit CDKGSessionManager(CDeterministicMNManager& dmnman, CQuorumSnapshotManager& qsnapman, const ChainstateManager& chainman, const CSporkManager& sporkman, - const util::DbWrapperParams& db_params, bool quorums_watch); + const util::DbWrapperParams& db_params); ~CDKGSessionManager(); template @@ -111,15 +104,36 @@ class CDKGSessionManager } } - void StartThreads(CConnman& connman, PeerManager& peerman); - void StopThreads(); + /** + * Visit every registered handler with @p fn(CDKGSessionHandler&). Used by + * the DKG NetHandler to drive per-handler phase threads. + */ + template + void ForEachHandler(HandlerFn&& fn) + { + for (auto& [_, handler] : dkgSessionHandlers) { + fn(*Assert(handler)); + } + } + + /** + * Invoke @p fn(CDKGSessionHandler&) for the handler registered under @p key, + * if one exists. Returns true iff the handler was found (and the callback ran). + * Returning the handler by reference inside the callback prevents callers from + * keeping a dangling pointer outside the handler's lifetime. + */ + template + bool DoForHandler(const SessionHandlerKey& key, HandlerFn&& fn) + { + auto it = dkgSessionHandlers.find(key); + if (it == dkgSessionHandlers.end()) return false; + fn(*Assert(it->second)); + return true; + } void UpdatedBlockTip(const CBlockIndex* pindexNew, bool fInitialDownload) EXCLUSIVE_LOCKS_REQUIRED(!contributionsCacheCs); - [[nodiscard]] MessageProcessingResult ProcessMessage(CNode& pfrom, bool is_masternode, std::string_view msg_type, - CDataStream& vRecv); - bool AlreadyHave(const CInv& inv) const; bool GetContribution(const uint256& hash, CDKGContribution& ret) const; bool GetComplaint(const uint256& hash, CDKGComplaint& ret) const; bool GetJustification(const uint256& hash, CDKGJustification& ret) const; diff --git a/src/llmq/net_dkg.cpp b/src/llmq/net_dkg.cpp new file mode 100644 index 000000000000..0f186379f37a --- /dev/null +++ b/src/llmq/net_dkg.cpp @@ -0,0 +1,609 @@ +// Copyright (c) 2018-2025 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace llmq { + +namespace { +// returns a set of NodeIds which sent invalid messages +template +std::unordered_set BatchVerifyMessageSigs(CDKGSession& session, + const std::vector>>& messages) +{ + if (messages.empty()) { + return {}; + } + + std::unordered_set ret; + bool revertToSingleVerification = false; + + CBLSSignature aggSig; + std::vector pubKeys; + std::vector messageHashes; + Uint256HashSet messageHashesSet; + pubKeys.reserve(messages.size()); + messageHashes.reserve(messages.size()); + bool first = true; + for (const auto& [nodeId, msg] : messages) { + auto member = session.GetMember(msg->proTxHash); + if (!member) { + // should not happen as it was verified before + ret.emplace(nodeId); + continue; + } + + if (first) { + aggSig = msg->sig; + } else { + aggSig.AggregateInsecure(msg->sig); + } + first = false; + + auto msgHash = msg->GetSignHash(); + if (!messageHashesSet.emplace(msgHash).second) { + // can only happen in 2 cases: + // 1. Someone sent us the same message twice but with differing signature, meaning that at least one of them + // must be invalid. In this case, we'd have to revert to single message verification nevertheless + // 2. Someone managed to find a way to create two different binary representations of a message that deserializes + // to the same object representation. This would be some form of malleability. However, this shouldn't be + // possible as only deterministic/unique BLS signatures and very simple data types are involved + revertToSingleVerification = true; + break; + } + + pubKeys.emplace_back(member->dmn->pdmnState->pubKeyOperator.Get()); + messageHashes.emplace_back(msgHash); + } + if (!revertToSingleVerification) { + if (aggSig.VerifyInsecureAggregated(pubKeys, messageHashes)) { + // all good + return ret; + } + + // are all messages from the same node? + bool nodeIdsAllSame = std::adjacent_find(messages.begin(), messages.end(), + [](const auto& first, const auto& second) { + return first.first != second.first; + }) == messages.end(); + + // if yes, take a short path and return a set with only him + if (nodeIdsAllSame) { + ret.emplace(messages[0].first); + return ret; + } + // different nodes, let's figure out who are the bad ones + } + + for (const auto& [nodeId, msg] : messages) { + if (ret.count(nodeId)) { + continue; + } + + auto member = session.GetMember(msg->proTxHash); + bool valid = msg->sig.VerifyInsecure(member->dmn->pdmnState->pubKeyOperator.Get(), msg->GetSignHash()); + if (!valid) { + ret.emplace(nodeId); + } + } + return ret; +} + +void RelayInvToParticipants(const CDKGSession& session, const CConnman& connman, PeerManagerInternal& peerman, + const CInv& inv) +{ + CDKGLogger logger(session, __func__, __LINE__); + std::stringstream ss; + const auto& relayMembers = session.RelayMembers(); + for (const auto& r : relayMembers) { + ss << r.ToString().substr(0, 4) << " | "; + } + logger.Batch("RelayInvToParticipants inv[%s] relayMembers[%d] GetNodeCount[%d] GetNetworkActive[%d] " + "HasMasternodeQuorumNodes[%d] for quorumHash[%s] forMember[%s] relayMembers[%s]", + inv.ToString(), relayMembers.size(), connman.GetNodeCount(ConnectionDirection::Both), + connman.GetNetworkActive(), + connman.HasMasternodeQuorumNodes(session.GetType(), session.BlockIndex()->GetBlockHash()), + session.BlockIndex()->GetBlockHash().ToString(), session.ProTx().ToString().substr(0, 4), ss.str()); + + std::stringstream ss2; + connman.ForEachNode([&](const CNode* pnode) { + if (pnode->qwatch || + (!pnode->GetVerifiedProRegTxHash().IsNull() && (relayMembers.count(pnode->GetVerifiedProRegTxHash()) != 0))) { + peerman.PeerPushInventory(pnode->GetId(), inv); + } + + if (pnode->GetVerifiedProRegTxHash().IsNull()) { + logger.Batch("node[%d:%s] not mn", pnode->GetId(), pnode->m_addr_name); + } else if (relayMembers.count(pnode->GetVerifiedProRegTxHash()) == 0) { + ss2 << pnode->GetVerifiedProRegTxHash().ToString().substr(0, 4) << " | "; + } + }); + logger.Batch("forMember[%s] NOTrelayMembers[%s]", session.ProTx().ToString().substr(0, 4), ss2.str()); + logger.Flush(); +} + +template +void EnqueueOwn(CDKGPendingMessages& pending, const Message& msg) +{ + CDataStream ds(SER_NETWORK, PROTOCOL_VERSION); + ds << msg; + auto pm = std::make_shared(std::move(ds)); + CHashWriter hw(SER_GETHASH, 0); + hw.write(AsWritableBytes(Span{*pm})); + pending.PushPendingMessage(/*from=*/-1, std::move(pm), hw.GetHash()); +} + +template +bool ProcessPendingMessageBatch(const CConnman& connman, CDKGSession& session, CDKGPendingMessages& pendingMessages, + PeerManagerInternal& peerman, size_t maxCount) +{ + auto msgs = pendingMessages.PopAndDeserializeMessages(maxCount); + if (msgs.empty()) { + return false; + } + + std::vector>> preverifiedMessages; + preverifiedMessages.reserve(msgs.size()); + + for (const auto& p : msgs) { + const NodeId& nodeId = p.first; + if (!p.second) { + LogPrint(BCLog::LLMQ_DKG, "%s -- failed to deserialize message, peer=%d\n", __func__, nodeId); + peerman.PeerMisbehaving(nodeId, 100); + continue; + } + bool ban = false; + if (!session.PreVerifyMessage(*p.second, ban)) { + if (ban) { + LogPrint(BCLog::LLMQ_DKG, "%s -- banning node due to failed preverification, peer=%d\n", __func__, nodeId); + peerman.PeerMisbehaving(nodeId, 100); + } + LogPrint(BCLog::LLMQ_DKG, "%s -- skipping message due to failed preverification, peer=%d\n", __func__, nodeId); + continue; + } + preverifiedMessages.emplace_back(p); + } + if (preverifiedMessages.empty()) { + return true; + } + + auto badNodes = BatchVerifyMessageSigs(session, preverifiedMessages); + if (!badNodes.empty()) { + for (auto nodeId : badNodes) { + LogPrint(BCLog::LLMQ_DKG, "%s -- failed to verify signature, peer=%d\n", __func__, nodeId); + peerman.PeerMisbehaving(nodeId, 100); + } + } + + for (const auto& p : preverifiedMessages) { + const NodeId& nodeId = p.first; + if (badNodes.count(nodeId)) { + continue; + } + const std::optional inv = session.ReceiveMessage(*p.second); + if (inv) { + RelayInvToParticipants(session, connman, peerman, *inv); + } + } + + return true; +} +} // namespace + + +NetDKG::NetDKG(PeerManagerInternal* peer_manager, const CSporkManager& sporkman, CDKGSessionManager& qdkgsman, + const ChainstateManager& chainman, bool quorums_watch, CQuorumManager& qman, QuorumRole& role) : + NetHandler(peer_manager), + m_qdkgsman{qdkgsman}, + m_qman{qman}, + m_sporkman{sporkman}, + m_chainman{chainman}, + m_quorums_watch{quorums_watch}, + m_active{nullptr} +{ + m_qdkgsman.InitializeHandlers([](const Consensus::LLMQParams& llmq_params, + [[maybe_unused]] int quorum_idx) -> std::unique_ptr { + return std::make_unique(llmq_params); + }); + m_qman.ConnectManagers(&role, &m_qdkgsman); +} + +NetDKG::NetDKG(PeerManagerInternal* peer_manager, const CSporkManager& sporkman, CDKGSessionManager& qdkgsman, + const ChainstateManager& chainman, bool quorums_watch, CQuorumManager& qman, QuorumRole& role, + CBLSWorker& bls_worker, CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_metaman, + CDKGDebugManager& dkgdbgman, CQuorumBlockProcessor& qblockman, CQuorumSnapshotManager& qsnapman, + const CActiveMasternodeManager& mn_activeman, CConnman& connman) : + NetHandler(peer_manager), + m_qdkgsman{qdkgsman}, + m_qman{qman}, + m_sporkman{sporkman}, + m_chainman{chainman}, + m_quorums_watch{quorums_watch}, + m_active{std::make_unique(ActiveDKG{dmnman, mn_metaman, dkgdbgman, qblockman, qsnapman, connman})} +{ + m_qdkgsman.InitializeHandlers( + [&](const Consensus::LLMQParams& llmq_params, int quorum_idx) -> std::unique_ptr { + return std::make_unique(bls_worker, dmnman, mn_metaman, dkgdbgman, qdkgsman, + qblockman, qsnapman, mn_activeman, chainman, sporkman, + llmq_params, quorums_watch, quorum_idx); + }); + m_qman.ConnectManagers(&role, &m_qdkgsman); +} + +NetDKG::~NetDKG() { m_qman.DisconnectManagers(); } + +void NetDKG::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) +{ + if (!IsQuorumDKGEnabled(m_sporkman)) return; + + if (msg_type != NetMsgType::QCONTRIB && msg_type != NetMsgType::QCOMPLAINT && msg_type != NetMsgType::QJUSTIFICATION && + msg_type != NetMsgType::QPCOMMITMENT && msg_type != NetMsgType::QWATCH) { + return; + } + + const bool is_masternode = m_active != nullptr; + + if (msg_type == NetMsgType::QWATCH) { + if (!is_masternode) { + // non-masternodes should never receive this + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10); + return; + } + pfrom.qwatch = true; + return; + } + + if (!is_masternode && !m_quorums_watch) { + // regular non-watching nodes should never receive any of these + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10); + return; + } + + if (vRecv.empty()) { + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 100); + return; + } + + Consensus::LLMQType llmqType; + uint256 quorumHash; + vRecv >> llmqType; + vRecv >> quorumHash; + vRecv.Rewind(sizeof(uint256)); + vRecv.Rewind(sizeof(uint8_t)); + + const auto& llmq_params_opt = Params().GetLLMQ(llmqType); + if (!llmq_params_opt.has_value()) { + LogPrintf("NetDKG -- invalid llmqType [%d]\n", std23::to_underlying(llmqType)); + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 100); + return; + } + const auto& llmq_params = llmq_params_opt.value(); + + int quorumIndex{-1}; + { + LOCK(cs_indexed_quorums_cache); + if (indexed_quorums_cache.empty()) { + utils::InitQuorumsCache(indexed_quorums_cache, m_chainman.GetConsensus()); + } + indexed_quorums_cache[llmqType].get(quorumHash, quorumIndex); + } + + if (quorumIndex == -1) { + const CBlockIndex* pQuorumBaseBlockIndex = WITH_LOCK(::cs_main, + return m_chainman.m_blockman.LookupBlockIndex(quorumHash)); + if (pQuorumBaseBlockIndex == nullptr) { + LogPrintf("NetDKG -- unknown quorumHash %s\n", quorumHash.ToString()); + // NOTE: do not insta-ban for this, we might be lagging behind + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10); + return; + } + if (!m_chainman.IsQuorumTypeEnabled(llmqType, pQuorumBaseBlockIndex->pprev)) { + LogPrintf("NetDKG -- llmqType [%d] quorums aren't active\n", std23::to_underlying(llmqType)); + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 100); + return; + } + quorumIndex = pQuorumBaseBlockIndex->nHeight % llmq_params.dkgInterval; + const int quorumIndexMax = IsQuorumRotationEnabled(llmq_params, pQuorumBaseBlockIndex) + ? llmq_params.signingActiveQuorumCount - 1 + : 0; + if (quorumIndex > quorumIndexMax) { + LogPrintf("NetDKG -- invalid quorumHash %s\n", quorumHash.ToString()); + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 100); + return; + } + } + + int inv_type = 0; + if (msg_type == NetMsgType::QCONTRIB) + inv_type = MSG_QUORUM_CONTRIB; + else if (msg_type == NetMsgType::QCOMPLAINT) + inv_type = MSG_QUORUM_COMPLAINT; + else if (msg_type == NetMsgType::QJUSTIFICATION) + inv_type = MSG_QUORUM_JUSTIFICATION; + else if (msg_type == NetMsgType::QPCOMMITMENT) + inv_type = MSG_QUORUM_PREMATURE_COMMITMENT; + Assume(inv_type != 0); // guarded by the early-return above + + auto pm = std::make_shared(std::move(vRecv)); + CHashWriter hw(SER_GETHASH, 0); + hw.write(AsWritableBytes(Span{*pm})); + const uint256 hash = hw.GetHash(); + + const NodeId from = pfrom.GetId(); + const bool dispatched = m_qdkgsman.DoForHandler({llmqType, quorumIndex}, [&](CDKGSessionHandler& handler) { + CDKGPendingMessages* pending = nullptr; + switch (inv_type) { + case MSG_QUORUM_CONTRIB: + pending = &handler.pendingContributions; + break; + case MSG_QUORUM_COMPLAINT: + pending = &handler.pendingComplaints; + break; + case MSG_QUORUM_JUSTIFICATION: + pending = &handler.pendingJustifications; + break; + case MSG_QUORUM_PREMATURE_COMMITMENT: + pending = &handler.pendingPrematureCommitments; + break; + } + Assume(pending != nullptr); + WITH_LOCK(::cs_main, m_peer_manager->PeerEraseObjectRequest(from, CInv{static_cast(inv_type), hash})); + pending->PushPendingMessage(from, std::move(pm), hash); + }); + if (!dispatched) { + LogPrintf("NetDKG -- no session handlers for quorumIndex [%d]\n", quorumIndex); + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 100); + return; + } + + WITH_LOCK(cs_indexed_quorums_cache, indexed_quorums_cache[llmqType].insert(quorumHash, quorumIndex)); +} + +bool NetDKG::AlreadyHave(const CInv& inv) +{ + switch (inv.type) { + case MSG_QUORUM_CONTRIB: + case MSG_QUORUM_COMPLAINT: + case MSG_QUORUM_JUSTIFICATION: + case MSG_QUORUM_PREMATURE_COMMITMENT: { + if (!IsQuorumDKGEnabled(m_sporkman)) return false; + bool seen = false; + m_qdkgsman.ForEachHandler([&](CDKGSessionHandler& h) { + if (seen) return; + if (h.pendingContributions.HasSeen(inv.hash) || h.pendingComplaints.HasSeen(inv.hash) || + h.pendingJustifications.HasSeen(inv.hash) || h.pendingPrematureCommitments.HasSeen(inv.hash)) { + seen = true; + } + }); + return seen; + } + } + return false; +} + +bool NetDKG::ProcessGetData(CNode& pfrom, const CInv& inv, CConnman& connman, const CNetMsgMaker& msgMaker) +{ + // Default implementations of GetContribution and the other virtual methods + // return false in observer mode; m_active is only an early exit and does + // not affect logic. + if (m_active == nullptr) return false; + + switch (inv.type) { + case MSG_QUORUM_CONTRIB: { + CDKGContribution o; + if (m_qdkgsman.GetContribution(inv.hash, o)) { + connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::QCONTRIB, o)); + return true; + } + return false; + } + case MSG_QUORUM_COMPLAINT: { + CDKGComplaint o; + if (m_qdkgsman.GetComplaint(inv.hash, o)) { + connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::QCOMPLAINT, o)); + return true; + } + return false; + } + case MSG_QUORUM_JUSTIFICATION: { + CDKGJustification o; + if (m_qdkgsman.GetJustification(inv.hash, o)) { + connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::QJUSTIFICATION, o)); + return true; + } + return false; + } + case MSG_QUORUM_PREMATURE_COMMITMENT: { + CDKGPrematureCommitment o; + if (m_qdkgsman.GetPrematureCommitment(inv.hash, o)) { + connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::QPCOMMITMENT, o)); + return true; + } + return false; + } + } + return false; +} + +void NetDKG::Start() +{ + if (m_active == nullptr) return; + if (!m_phase_threads.empty()) { + throw std::runtime_error("Tried to start PhaseHandlerThreads again."); + } + + m_qdkgsman.ForEachHandler([this](CDKGSessionHandler& base) { + auto& handler = dynamic_cast(base); + std::string thread_name = strprintf("llmq-%d-%d", std23::to_underlying(handler.params.type), handler.QuorumIndex()); + m_phase_threads.emplace_back([this, name = std::move(thread_name), &handler] { + util::TraceThread(name.c_str(), [this, &handler] { PhaseHandlerThread(handler); }); + }); + }); +} + +void NetDKG::Stop() +{ + Interrupt(); + for (auto& t : m_phase_threads) { + if (t.joinable()) t.join(); + } + m_phase_threads.clear(); +} + +void NetDKG::Interrupt() +{ + if (m_active == nullptr) return; + m_qdkgsman.ForEachHandler([](CDKGSessionHandler& base) { + if (auto* handler = dynamic_cast(&base)) { + handler->RequestStop(); + } + }); +} + +void NetDKG::PhaseHandlerThread(ActiveDKGSessionHandler& handler) +{ + while (!handler.IsStopRequested()) { + try { + LogPrint(BCLog::LLMQ_DKG, "NetDKG::%s -- %s qi[%d] - starting HandleDKGRound\n", __func__, + handler.params.name, handler.QuorumIndex()); + HandleDKGRound(handler); + } catch (AbortPhaseException& e) { + m_active->dkgdbgman.MarkAborted(handler.params.type, handler.QuorumIndex()); + LogPrint(BCLog::LLMQ_DKG, "NetDKG::%s -- %s qi[%d] - aborted current DKG session\n", __func__, + handler.params.name, handler.QuorumIndex()); + } + } +} + +void NetDKG::HandleDKGRound(ActiveDKGSessionHandler& handler) +{ + auto& active = *Assert(m_active); + + handler.WaitForNextPhase(std::nullopt, QuorumPhase::Initialized); + + handler.ClearPendingMessages(); + uint256 curQuorumHash = handler.GetCurrentQuorumHash(); + + const CBlockIndex* pQuorumBaseBlockIndex = WITH_LOCK(::cs_main, + return m_chainman.m_blockman.LookupBlockIndex(curQuorumHash)); + + if (!pQuorumBaseBlockIndex || !handler.InitNewQuorum(pQuorumBaseBlockIndex)) { + // should actually never happen + handler.WaitForNewQuorum(curQuorumHash); + throw AbortPhaseException(); + } + + active.dkgdbgman.MarkPhaseAdvanced(handler.params.type, handler.QuorumIndex(), QuorumPhase::Initialized); + + auto* curSession = handler.GetCurSession(); + if (handler.params.is_single_member()) { + auto finalCommitment = curSession->FinalizeSingleCommitment(); + if (!finalCommitment.IsNull()) { // it can be null only if we are not member + if (auto inv_opt = active.qblockman.AddMineableCommitment(finalCommitment); inv_opt.has_value()) { + m_peer_manager->PeerRelayInv(inv_opt.value()); + } + } + handler.WaitForNextPhase(QuorumPhase::Initialized, QuorumPhase::Contribute, curQuorumHash); + return; + } + + const auto tip_mn_list = active.dmnman.GetListAtChainTip(); + utils::EnsureQuorumConnections(handler.params, active.connman, m_sporkman, + {active.dmnman, active.qsnapman, m_chainman, pQuorumBaseBlockIndex}, tip_mn_list, + curSession->ProTx(), /*is_masternode=*/true, handler.QuorumsWatch()); + if (curSession->AreWeMember()) { + utils::AddQuorumProbeConnections(handler.params, active.connman, active.mn_metaman, m_sporkman, + {active.dmnman, active.qsnapman, m_chainman, pQuorumBaseBlockIndex}, + tip_mn_list, curSession->ProTx()); + } + + handler.WaitForNextPhase(QuorumPhase::Initialized, QuorumPhase::Contribute, curQuorumHash); + + // Contribute + auto fContributeStart = [curSession, &handler]() { + if (auto qc = curSession->Contribute(); qc) { + EnqueueOwn(handler.pendingContributions, *qc); + } + }; + auto fContributeWait = [this, curSession, &handler, &active] { + return ProcessPendingMessageBatch(active.connman, *curSession, handler.pendingContributions, + *m_peer_manager, 8); + }; + handler.HandlePhase(QuorumPhase::Contribute, QuorumPhase::Complain, curQuorumHash, 0.05, fContributeStart, + fContributeWait); + + // Complain + auto fComplainStart = [curSession, &handler, &active]() { + if (auto qc = curSession->VerifyAndComplain(active.connman); qc) { + EnqueueOwn(handler.pendingComplaints, *qc); + } + }; + auto fComplainWait = [this, curSession, &handler, &active] { + return ProcessPendingMessageBatch(active.connman, *curSession, handler.pendingComplaints, + *m_peer_manager, 8); + }; + handler.HandlePhase(QuorumPhase::Complain, QuorumPhase::Justify, curQuorumHash, 0.05, fComplainStart, fComplainWait); + + // Justify + auto fJustifyStart = [curSession, &handler]() { + if (auto qj = curSession->VerifyAndJustify(); qj) { + EnqueueOwn(handler.pendingJustifications, *qj); + } + }; + auto fJustifyWait = [this, curSession, &handler, &active] { + return ProcessPendingMessageBatch(active.connman, *curSession, handler.pendingJustifications, + *m_peer_manager, 8); + }; + handler.HandlePhase(QuorumPhase::Justify, QuorumPhase::Commit, curQuorumHash, 0.05, fJustifyStart, fJustifyWait); + + // Commit + auto fCommitStart = [curSession, &handler]() { + if (auto qc = curSession->VerifyAndCommit(); qc) { + EnqueueOwn(handler.pendingPrematureCommitments, *qc); + } + }; + auto fCommitWait = [this, curSession, &handler, &active] { + return ProcessPendingMessageBatch(active.connman, *curSession, + handler.pendingPrematureCommitments, *m_peer_manager, + 8); + }; + handler.HandlePhase(QuorumPhase::Commit, QuorumPhase::Finalize, curQuorumHash, 0.1, fCommitStart, fCommitWait); + + auto finalCommitments = curSession->FinalizeCommitments(); + for (const auto& fqc : finalCommitments) { + if (auto inv_opt = active.qblockman.AddMineableCommitment(fqc); inv_opt.has_value()) { + m_peer_manager->PeerRelayInv(inv_opt.value()); + } + } +} + +void NetDKGStub::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) +{ + if (msg_type == NetMsgType::QCONTRIB || msg_type == NetMsgType::QCOMPLAINT || msg_type == NetMsgType::QJUSTIFICATION || + msg_type == NetMsgType::QPCOMMITMENT || msg_type == NetMsgType::QWATCH) { + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10); + } +} + +} // namespace llmq diff --git a/src/llmq/net_dkg.h b/src/llmq/net_dkg.h new file mode 100644 index 000000000000..cf5d2d41af26 --- /dev/null +++ b/src/llmq/net_dkg.h @@ -0,0 +1,125 @@ +// Copyright (c) 2018-2025 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_LLMQ_NET_DKG_H +#define BITCOIN_LLMQ_NET_DKG_H + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +class CActiveMasternodeManager; +class CBLSWorker; +class CConnman; +class CDeterministicMNManager; +class ChainstateManager; +class CMasternodeMetaMan; +class CSporkManager; +namespace llmq { +class ActiveDKGSessionHandler; +class CDKGDebugManager; +class CDKGSessionManager; +class CQuorumBlockProcessor; +class CQuorumManager; +class CQuorumSnapshotManager; +class QuorumRole; +} // namespace llmq + +namespace llmq { +/** + * NetHandler responsible for DKG networking: + * - QCONTRIB / QCOMPLAINT / QJUSTIFICATION / QPCOMMITMENT / QWATCH ProcessMessage + * routing into CDKGSessionManager. The resulting MessageProcessingResult is + * consumed locally via PeerManagerInternal and never propagated up. + * - AlreadyHave for the four MSG_QUORUM_* DKG inv types. + * - ProcessGetData for the four MSG_QUORUM_* DKG inv types (active mode only; + * in observer mode the underlying Get* calls return false by construction). + * + * Active-mode-only deps live in @ref ActiveDKG; @ref m_active is null in + * observer mode and non-null in active mode (all-or-none). + * + * On nodes that run neither active nor observer mode, register @ref NetDKGStub + * instead. + */ +class NetDKG final : public NetHandler +{ +public: + //! Observer-mode constructor. + NetDKG(PeerManagerInternal* peer_manager, const CSporkManager& sporkman, CDKGSessionManager& qdkgsman, + const ChainstateManager& chainman, bool quorums_watch, CQuorumManager& qman, QuorumRole& role); + + //! Active-mode constructor: takes the masternode-only dep bundle as required references. + NetDKG(PeerManagerInternal* peer_manager, const CSporkManager& sporkman, CDKGSessionManager& qdkgsman, + const ChainstateManager& chainman, bool quorums_watch, CQuorumManager& qman, QuorumRole& role, + CBLSWorker& bls_worker, CDeterministicMNManager& dmnman, CMasternodeMetaMan& mn_metaman, + CDKGDebugManager& dkgdbgman, CQuorumBlockProcessor& qblockman, CQuorumSnapshotManager& qsnapman, + const CActiveMasternodeManager& mn_activeman, CConnman& connman); + + ~NetDKG(); + + // NetHandler + void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) override; + bool AlreadyHave(const CInv& inv) override; + bool ProcessGetData(CNode& pfrom, const CInv& inv, CConnman& connman, const CNetMsgMaker& msgMaker) override; + /** + * Drives one phase-handler thread per ActiveDKGSessionHandler in active mode; + * no-op in observer mode (no curSession to drive). + */ + void Start() override; + void Stop() override; + void Interrupt() override; + +private: + //! Bundle of refs that exist only in active masternode mode. + struct ActiveDKG { + CDeterministicMNManager& dmnman; + CMasternodeMetaMan& mn_metaman; + CDKGDebugManager& dkgdbgman; + CQuorumBlockProcessor& qblockman; + CQuorumSnapshotManager& qsnapman; + CConnman& connman; + }; + + void PhaseHandlerThread(ActiveDKGSessionHandler& handler); + void HandleDKGRound(ActiveDKGSessionHandler& handler); + + CDKGSessionManager& m_qdkgsman; + CQuorumManager& m_qman; + const CSporkManager& m_sporkman; + const ChainstateManager& m_chainman; + const bool m_quorums_watch; + const std::unique_ptr m_active; //!< null in observer mode, non-null in active mode + + /** Cache: quorum hash → quorum index, populated lazily by ProcessMessage. */ + mutable Mutex cs_indexed_quorums_cache; + mutable std::map> indexed_quorums_cache GUARDED_BY(cs_indexed_quorums_cache); + + std::vector m_phase_threads; +}; + +/** + * Minimal NetHandler installed on nodes that run neither active nor observer + * DKG mode. Just punishes peers that push DKG messages we cannot serve. + */ +class NetDKGStub final : public NetHandler +{ +public: + explicit NetDKGStub(PeerManagerInternal* peer_manager) : + NetHandler(peer_manager) + { + } + + // NetHandler + void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) override; +}; +} // namespace llmq + +#endif // BITCOIN_LLMQ_NET_DKG_H diff --git a/src/llmq/observer.cpp b/src/llmq/observer.cpp index b9ce11d31bfe..a4121b563efd 100644 --- a/src/llmq/observer.cpp +++ b/src/llmq/observer.cpp @@ -7,31 +7,17 @@ #include #include -#include -#include - namespace llmq { -ObserverContext::ObserverContext(CBLSWorker& bls_worker, CDeterministicMNManager& dmnman, - CMasternodeMetaMan& mn_metaman, - llmq::CQuorumBlockProcessor& qblockman, llmq::CQuorumManager& qman, +ObserverContext::ObserverContext(CDeterministicMNManager& dmnman, llmq::CQuorumManager& qman, llmq::CQuorumSnapshotManager& qsnapman, const ChainstateManager& chainman, const CSporkManager& sporkman, const util::DbWrapperParams& db_params) : QuorumRole{qman}, dkgdbgman{std::make_unique(dmnman, qsnapman, chainman)}, - qdkgsman{std::make_unique(dmnman, qsnapman, chainman, sporkman, db_params, - /*quorums_watch=*/true)} + qdkgsman{std::make_unique(dmnman, qsnapman, chainman, sporkman, db_params)} { - qdkgsman->InitializeHandlers([&](const Consensus::LLMQParams& llmq_params, - [[maybe_unused]] int quorum_idx) -> std::unique_ptr { - return std::make_unique(llmq_params); - }); - m_qman.ConnectManagers(this, qdkgsman.get()); } -ObserverContext::~ObserverContext() -{ - m_qman.DisconnectManagers(); -} +ObserverContext::~ObserverContext() = default; void ObserverContext::InitializeCurrentBlockTip(const CBlockIndex* tip, bool ibd) { diff --git a/src/llmq/observer.h b/src/llmq/observer.h index 52a820327aaf..d0fb60b2554e 100644 --- a/src/llmq/observer.h +++ b/src/llmq/observer.h @@ -9,22 +9,17 @@ #include -#include #include #include -class CBLSWorker; class CBlockIndex; class CDeterministicMNManager; -class CMasternodeMetaMan; class CSporkManager; namespace llmq { class CDKGDebugManager; class CDKGSessionManager; class CQuorum; -class CQuorumBlockProcessor; -class CQuorumDataRequest; class CQuorumSnapshotManager; } // namespace llmq namespace util { @@ -37,10 +32,9 @@ struct ObserverContext final : public QuorumRole, public CValidationInterface { ObserverContext() = delete; ObserverContext(const ObserverContext&) = delete; ObserverContext& operator=(const ObserverContext&) = delete; - ObserverContext(CBLSWorker& bls_worker, CDeterministicMNManager& dmnman, - CMasternodeMetaMan& mn_metaman, llmq::CQuorumBlockProcessor& qblockman, - llmq::CQuorumManager& qman, llmq::CQuorumSnapshotManager& qsnapman, const ChainstateManager& chainman, - const CSporkManager& sporkman, const util::DbWrapperParams& db_params); + ObserverContext(CDeterministicMNManager& dmnman, llmq::CQuorumManager& qman, llmq::CQuorumSnapshotManager& qsnapman, + const ChainstateManager& chainman, const CSporkManager& sporkman, + const util::DbWrapperParams& db_params); ~ObserverContext(); // QuorumRole diff --git a/src/llmq/options.cpp b/src/llmq/options.cpp index f6cad7966ba8..dc6330d087f0 100644 --- a/src/llmq/options.cpp +++ b/src/llmq/options.cpp @@ -39,6 +39,8 @@ bool IsAllMembersConnectedEnabled(const Consensus::LLMQType llmqType, const CSpo return EvalSpork(llmqType, sporkman.GetSporkValue(SPORK_21_QUORUM_ALL_CONNECTED)); } +bool IsQuorumDKGEnabled(const CSporkManager& sporkman) { return sporkman.IsSporkActive(SPORK_17_QUORUM_DKG_ENABLED); } + bool IsQuorumPoseEnabled(const Consensus::LLMQType llmqType, const CSporkManager& sporkman) { return EvalSpork(llmqType, sporkman.GetSporkValue(SPORK_23_QUORUM_POSE)); diff --git a/src/llmq/options.h b/src/llmq/options.h index e546190301d2..9973472d0e3b 100644 --- a/src/llmq/options.h +++ b/src/llmq/options.h @@ -39,6 +39,7 @@ extern int16_t DEFAULT_WORKER_COUNT; static constexpr int8_t MAX_BLSCHECK_THREADS{33}; bool IsAllMembersConnectedEnabled(const Consensus::LLMQType llmqType, const CSporkManager& sporkman); +bool IsQuorumDKGEnabled(const CSporkManager& sporkman); bool IsQuorumPoseEnabled(const Consensus::LLMQType llmqType, const CSporkManager& sporkman); bool IsQuorumRotationEnabled(const Consensus::LLMQParams& llmqParams, gsl::not_null pindex); diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 3f8ba2e11b48..5fe43196b6f3 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -40,7 +40,6 @@ #include #include -#include #include #include #include @@ -54,9 +53,6 @@ #include #include #include -#include -#include -#include #include #include #include @@ -589,11 +585,10 @@ class PeerManagerImpl final : public PeerManager CMasternodeMetaMan& mn_metaman, CMasternodeSync& mn_sync, CGovernanceManager& govman, CSporkManager& sporkman, const chainlock::Chainlocks& chainlocks, chainlock::ChainlockHandler& clhandler, - const std::unique_ptr& active_ctx, + CActiveMasternodeManager* nodeman, const std::unique_ptr& dmnman, const std::unique_ptr& cj_walletman, - const std::unique_ptr& llmq_ctx, - const std::unique_ptr& observer_ctx, bool ignore_incoming_txs); + const std::unique_ptr& llmq_ctx, bool ignore_incoming_txs); ~PeerManagerImpl() { @@ -655,6 +650,7 @@ class PeerManagerImpl final : public PeerManager void PeerMisbehaving(const NodeId pnode, const int howmuch, const std::string& message = "") override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); bool PeerIsBanned(const NodeId node_id) override EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_peer_mutex); void PeerEraseObjectRequest(const NodeId nodeid, const CInv& inv) override EXCLUSIVE_LOCKS_REQUIRED(::cs_main); + void PeerPushInventory(NodeId nodeid, const CInv& inv) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void PeerRelayInv(const CInv& inv) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void PeerRelayInvFiltered(const CInv& inv, const CTransaction& relatedTx) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void PeerRelayInvFiltered(const CInv& inv, const uint256& relatedTxHash) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); @@ -815,11 +811,10 @@ class PeerManagerImpl final : public PeerManager ChainstateManager& m_chainman; CTxMemPool& m_mempool; std::unique_ptr m_txreconciliation; - const std::unique_ptr& m_active_ctx; + CActiveMasternodeManager* const m_nodeman; //!< null if non-masternode mode; non-null implies masternode mode const std::unique_ptr& m_dmnman; const std::unique_ptr& m_cj_walletman; const std::unique_ptr& m_llmq_ctx; - const std::unique_ptr& m_observer_ctx; CMasternodeMetaMan& m_mn_metaman; CMasternodeSync& m_mn_sync; CGovernanceManager& m_govman; @@ -971,9 +966,6 @@ class PeerManagerImpl final : public PeerManager bool AlreadyHave(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main, !m_recent_confirmed_transactions_mutex); - bool DKGSessionAlreadyHave(const CInv& inv); - MessageProcessingResult DKGSessionProcessMessage(CNode& pfrom, bool is_masternode, std::string_view msg_type, CDataStream& vRecv); - /** * Filter for transactions that were recently rejected by the mempool. * These are not rerequested until the chain tip changes, at which point @@ -1654,7 +1646,7 @@ void PeerManagerImpl::RequestObject(NodeId nodeid, const CInv& inv, std::chrono: // Calculate the time to try requesting this transaction. Use // fPreferredDownload as a proxy for outbound peers. - std::chrono::microseconds process_time = CalculateObjectGetDataTime(inv, current_time, /*is_masternode=*/m_active_ctx != nullptr, + std::chrono::microseconds process_time = CalculateObjectGetDataTime(inv, current_time, /*is_masternode=*/m_nodeman != nullptr, !state->fPreferredDownload); peer_download_state.m_object_process_time.emplace(process_time, inv); @@ -2066,13 +2058,12 @@ std::unique_ptr PeerManager::make(const CChainParams& chainparams, CMasternodeSync& mn_sync, CGovernanceManager& govman, CSporkManager& sporkman, const chainlock::Chainlocks& chainlocks, chainlock::ChainlockHandler& clhandler, - const std::unique_ptr& active_ctx, + CActiveMasternodeManager* nodeman, const std::unique_ptr& dmnman, const std::unique_ptr& cj_walletman, - const std::unique_ptr& llmq_ctx, - const std::unique_ptr& observer_ctx, bool ignore_incoming_txs) + const std::unique_ptr& llmq_ctx, bool ignore_incoming_txs) { - return std::make_unique(chainparams, connman, addrman, banman, dstxman, chainman, pool, mn_metaman, mn_sync, govman, sporkman, chainlocks, clhandler, active_ctx, dmnman, cj_walletman, llmq_ctx, observer_ctx, ignore_incoming_txs); + return std::make_unique(chainparams, connman, addrman, banman, dstxman, chainman, pool, mn_metaman, mn_sync, govman, sporkman, chainlocks, clhandler, nodeman, dmnman, cj_walletman, llmq_ctx, ignore_incoming_txs); } PeerManagerImpl::PeerManagerImpl(const CChainParams& chainparams, CConnman& connman, AddrMan& addrman, BanMan* banman, @@ -2081,11 +2072,10 @@ PeerManagerImpl::PeerManagerImpl(const CChainParams& chainparams, CConnman& conn CSporkManager& sporkman, const chainlock::Chainlocks& chainlocks, chainlock::ChainlockHandler& clhandler, - const std::unique_ptr& active_ctx, + CActiveMasternodeManager* nodeman, const std::unique_ptr& dmnman, const std::unique_ptr& cj_walletman, - const std::unique_ptr& llmq_ctx, - const std::unique_ptr& observer_ctx, bool ignore_incoming_txs) + const std::unique_ptr& llmq_ctx, bool ignore_incoming_txs) : m_chainparams(chainparams), m_connman(connman), m_addrman(addrman), @@ -2093,11 +2083,10 @@ PeerManagerImpl::PeerManagerImpl(const CChainParams& chainparams, CConnman& conn m_dstxman(dstxman), m_chainman(chainman), m_mempool(pool), - m_active_ctx(active_ctx), + m_nodeman(nodeman), m_dmnman(dmnman), m_cj_walletman(cj_walletman), m_llmq_ctx(llmq_ctx), - m_observer_ctx(observer_ctx), m_mn_metaman(mn_metaman), m_mn_sync(mn_sync), m_govman(govman), @@ -2374,11 +2363,6 @@ bool PeerManagerImpl::AlreadyHave(const CInv& inv) case MSG_QUORUM_FINAL_COMMITMENT: return m_llmq_ctx->quorum_block_processor->HasMineableCommitment(inv.hash); - case MSG_QUORUM_CONTRIB: - case MSG_QUORUM_COMPLAINT: - case MSG_QUORUM_JUSTIFICATION: - case MSG_QUORUM_PREMATURE_COMMITMENT: - return DKGSessionAlreadyHave(inv); case MSG_QUORUM_RECOVERED_SIG: // TODO: move it to NetSigning return m_llmq_ctx->sigman->AlreadyHave(inv); @@ -2391,6 +2375,10 @@ bool PeerManagerImpl::AlreadyHave(const CInv& inv) return m_mn_metaman.AlreadyHavePlatformBan(inv.hash); // At the end inventories that are handled by NetHandler + case MSG_QUORUM_CONTRIB: + case MSG_QUORUM_COMPLAINT: + case MSG_QUORUM_JUSTIFICATION: + case MSG_QUORUM_PREMATURE_COMMITMENT: case MSG_DSQ: if (m_cj_walletman && m_cj_walletman->hasQueue(inv.hash)) return true; for (const auto& handler : m_handlers) { @@ -2403,16 +2391,6 @@ bool PeerManagerImpl::AlreadyHave(const CInv& inv) return true; } -bool PeerManagerImpl::DKGSessionAlreadyHave(const CInv& inv) -{ - if (m_observer_ctx) { - return m_observer_ctx->qdkgsman->AlreadyHave(inv); - } else if (m_active_ctx) { - return m_active_ctx->qdkgsman->AlreadyHave(inv); - } - return false; -} - bool PeerManagerImpl::AlreadyHaveBlock(const uint256& block_hash) { return m_chainman.m_blockman.LookupBlockIndex(block_hash) != nullptr; @@ -2976,38 +2954,6 @@ void PeerManagerImpl::ProcessGetData(CNode& pfrom, Peer& peer, const std::atomic } } - if (!push && (inv.type == MSG_QUORUM_CONTRIB)) { - llmq::CDKGContribution o; - if (m_active_ctx && m_active_ctx->qdkgsman->GetContribution(inv.hash, o)) { - m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::QCONTRIB, o)); - push = true; - } - } - - if (!push && (inv.type == MSG_QUORUM_COMPLAINT)) { - llmq::CDKGComplaint o; - if (m_active_ctx && m_active_ctx->qdkgsman->GetComplaint(inv.hash, o)) { - m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::QCOMPLAINT, o)); - push = true; - } - } - - if (!push && (inv.type == MSG_QUORUM_JUSTIFICATION)) { - llmq::CDKGJustification o; - if (m_active_ctx && m_active_ctx->qdkgsman->GetJustification(inv.hash, o)) { - m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::QJUSTIFICATION, o)); - push = true; - } - } - - if (!push && (inv.type == MSG_QUORUM_PREMATURE_COMMITMENT)) { - llmq::CDKGPrematureCommitment o; - if (m_active_ctx && m_active_ctx->qdkgsman->GetPrematureCommitment(inv.hash, o)) { - m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::QPCOMMITMENT, o)); - push = true; - } - } - if (!push && (inv.type == MSG_QUORUM_RECOVERED_SIG)) { llmq::CRecoveredSig o; if (m_llmq_ctx->sigman->GetRecoveredSigForGetData(inv.hash, o)) { @@ -3759,7 +3705,7 @@ void PeerManagerImpl::ProcessMessage( LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(msg_type), vRecv.size(), pfrom.GetId()); ::g_stats_client->inc("message.received." + SanitizeString(msg_type), 1.0f); - const bool is_masternode = m_active_ctx != nullptr; + const bool is_masternode = m_nodeman != nullptr; PeerRef peer = GetPeerRef(pfrom.GetId()); if (peer == nullptr) return; @@ -4033,7 +3979,7 @@ void PeerManagerImpl::ProcessMessage( } if (is_masternode && !pfrom.m_masternode_probe_connection) { - CMNAuth::PushMNAUTH(pfrom, m_connman, *m_active_ctx->nodeman); + CMNAuth::PushMNAUTH(pfrom, m_connman, *Assert(m_nodeman)); } // Tell our peer we prefer to receive headers rather than inv's @@ -5507,9 +5453,8 @@ void PeerManagerImpl::ProcessMessage( if (m_cj_walletman) { PostProcessMessage(m_cj_walletman->processMessage(pfrom, m_chainman.ActiveChainstate(), m_connman, m_mempool, msg_type, vRecv), pfrom.GetId()); } - PostProcessMessage(DKGSessionProcessMessage(pfrom, is_masternode, msg_type, vRecv), pfrom.GetId()); PostProcessMessage(m_sporkman.ProcessMessage(pfrom, m_connman, msg_type, vRecv), pfrom.GetId()); - PostProcessMessage(CMNAuth::ProcessMessage(pfrom, peer->m_their_services, m_connman, m_mn_metaman, (m_active_ctx ? m_active_ctx->nodeman.get() : nullptr), m_mn_sync, m_dmnman->GetListAtChainTip(), msg_type, vRecv), pfrom.GetId()); + PostProcessMessage(CMNAuth::ProcessMessage(pfrom, peer->m_their_services, m_connman, m_mn_metaman, m_nodeman, m_mn_sync, m_dmnman->GetListAtChainTip(), msg_type, vRecv), pfrom.GetId()); PostProcessMessage(m_llmq_ctx->quorum_block_processor->ProcessMessage(pfrom, msg_type, vRecv), pfrom.GetId()); PostProcessMessage(ProcessPlatformBanMessage(pfrom.GetId(), msg_type, vRecv), pfrom.GetId()); @@ -5536,26 +5481,6 @@ void PeerManagerImpl::ProcessMessage( return; } -MessageProcessingResult PeerManagerImpl::DKGSessionProcessMessage(CNode& pfrom, bool is_masternode, std::string_view msg_type, CDataStream& vRecv) -{ - if (m_active_ctx) { - assert(is_masternode); - return m_active_ctx->qdkgsman->ProcessMessage(pfrom, is_masternode, msg_type, vRecv); - } else if (m_observer_ctx) { - assert(!is_masternode); - return m_observer_ctx->qdkgsman->ProcessMessage(pfrom, is_masternode, msg_type, vRecv); - } - assert(!is_masternode); - if (msg_type == NetMsgType::QCONTRIB - || msg_type == NetMsgType::QCOMPLAINT - || msg_type == NetMsgType::QJUSTIFICATION - || msg_type == NetMsgType::QPCOMMITMENT - || msg_type == NetMsgType::QWATCH) { - return MisbehavingError{10}; - } - return {}; -} - bool PeerManagerImpl::MaybeDiscourageAndDisconnect(CNode& pnode, Peer& peer) { { @@ -6027,7 +5952,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto) assert(m_llmq_ctx); - const bool is_masternode = m_active_ctx != nullptr; + const bool is_masternode = m_nodeman != nullptr; PeerRef peer = GetPeerRef(pto->GetId()); if (!peer) return false; @@ -6641,6 +6566,11 @@ void PeerManagerImpl::PeerEraseObjectRequest(const NodeId nodeid, const CInv& in EraseObjectRequest(nodeid, inv); } +void PeerManagerImpl::PeerPushInventory(NodeId nodeid, const CInv& inv) +{ + PushInventory(nodeid, inv); +} + void PeerManagerImpl::PeerRelayInv(const CInv& inv) { RelayInv(inv); diff --git a/src/net_processing.h b/src/net_processing.h index 4967444fe346..b205a81b4a50 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -29,11 +29,7 @@ class CNetMsgMaker; class CSporkManager; class CTransaction; class CTxMemPool; -struct ActiveContext; struct LLMQContext; -namespace llmq { -struct ObserverContext; -} // namespace llmq namespace chainlock { class Chainlocks; class ChainlockHandler; @@ -68,6 +64,7 @@ class PeerManagerInternal virtual void PeerMisbehaving(const NodeId pnode, const int howmuch, const std::string& message = "") = 0; virtual bool PeerIsBanned(const NodeId node_id) = 0; virtual void PeerEraseObjectRequest(const NodeId nodeid, const CInv& inv) = 0; + virtual void PeerPushInventory(NodeId nodeid, const CInv& inv) = 0; virtual void PeerRelayInv(const CInv& inv) = 0; virtual void PeerRelayInvFiltered(const CInv& inv, const CTransaction& relatedTx) = 0; virtual void PeerRelayInvFiltered(const CInv& inv, const uint256& relatedTxHash) = 0; @@ -108,17 +105,20 @@ class NetHandler class PeerManager : public CValidationInterface, public NetEventsInterface, public PeerManagerInternal { public: + /** + * @param nodeman Non-null iff masternode mode is on; null otherwise. Used both + * as the masternode-mode indicator and for direct access. + */ static std::unique_ptr make(const CChainParams& chainparams, CConnman& connman, AddrMan& addrman, BanMan* banman, CDSTXManager& dstxman, ChainstateManager& chainman, CTxMemPool& pool, CMasternodeMetaMan& mn_metaman, CMasternodeSync& mn_sync, CGovernanceManager& govman, CSporkManager& sporkman, const chainlock::Chainlocks& chainlocks, chainlock::ChainlockHandler& clhandler, - const std::unique_ptr& active_ctx, + CActiveMasternodeManager* nodeman, const std::unique_ptr& dmnman, const std::unique_ptr& cj_walletman, - const std::unique_ptr& llmq_ctx, - const std::unique_ptr& observer_ctx, bool ignore_incoming_txs); + const std::unique_ptr& llmq_ctx, bool ignore_incoming_txs); virtual ~PeerManager() { } /** diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 8a96cb0c9ba4..5cbf4265c7c6 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -52,7 +52,6 @@ #include #include -#include #include #include #include @@ -134,8 +133,9 @@ std::unique_ptr MakePeerManager(CConnman& connman, bool ignore_incoming_txs) { return PeerManager::make(chainparams, connman, *node.addrman, banman, *node.dstxman, *node.chainman, *node.mempool, *node.mn_metaman, - *node.mn_sync, *node.govman, *node.sporkman, *node.chainlocks, *node.clhandler, node.active_ctx, node.dmnman, node.cj_walletman, - node.llmq_ctx, node.observer_ctx, ignore_incoming_txs); + *node.mn_sync, *node.govman, *node.sporkman, *node.chainlocks, *node.clhandler, + /*nodeman=*/nullptr, + node.dmnman, node.cj_walletman, node.llmq_ctx, ignore_incoming_txs); } void DashChainstateSetup(ChainstateManager& chainman, diff --git a/test/lint/lint-circular-dependencies.py b/test/lint/lint-circular-dependencies.py index 96ce0c51fa02..d8b01a20b90e 100755 --- a/test/lint/lint-circular-dependencies.py +++ b/test/lint/lint-circular-dependencies.py @@ -21,7 +21,6 @@ "wallet/wallet -> wallet/walletdb -> wallet/wallet", "kernel/coinstats -> validation -> kernel/coinstats", # Dash - "active/context -> active/dkgsessionhandler -> llmq/dkgsessionhandler -> net_processing -> active/context", "banman -> common/bloom -> evo/assetlocktx -> llmq/quorumsman -> llmq/blockprocessor -> net -> banman", "chainlock/chainlock -> spork -> net -> evo/deterministicmns -> evo/providertx -> validation -> chainlock/chainlock", "coinjoin/client -> coinjoin/util -> wallet/wallet -> psbt -> node/transaction -> net_processing -> coinjoin/walletman -> coinjoin/client", @@ -43,7 +42,6 @@ "instantsend/instantsend -> node/blockstorage -> validation -> txmempool -> instantsend/instantsend", "llmq/blockprocessor -> llmq/utils -> llmq/snapshot -> llmq/blockprocessor", "llmq/commitment -> llmq/utils -> llmq/snapshot -> llmq/commitment", - "llmq/dkgsessionhandler -> net_processing -> llmq/dkgsessionmgr -> llmq/dkgsessionhandler", "masternode/payments -> validation -> masternode/payments", "net -> netmessagemaker -> net", "netaddress -> netbase -> netaddress",