4
0
mirror of https://github.com/QuasarApp/QuasarAppCoin.git synced 2025-05-07 06:59:37 +00:00

Merge : 0.18: rc3 backports

95faffed264cf54a3b3041db2471c10f5011aabe qa: Check unconfirmed balance after loadwallet (João Barbosa)
59716ec395daaf914924fe5c1a4fbeb5d5031907 wallet: Update transactions with current mempool after load (João Barbosa)
ed0498af2827ccf033c9a7c4f46b82424e411083 interfaces: Add Chain::requestMempoolTransactions (João Barbosa)
ebf65666c26b7e2dff1b35b17d8fc466c3f347a6 wallet: Move CWallet::ReacceptWalletTransactions locks to callers (João Barbosa)
a90db2f175f86b78d8edc5c03b7bb351c8f43e5e [tests] Add test for wallet rebroadcasts (John Newbery)
50c56f2fcf00385dbe8f91588af3ee1a89a9d2d0 Interrupt orphan processing after every transaction (Pieter Wuille)
bb60121da1eb3484ecf20c5d1130d9e2f6f8f8c8 [MOVEONLY] Move processing of orphan queue to ProcessOrphanTx (Pieter Wuille)
6355214fd70ce7b44739acb8d546aaaf243f90b3 Simplify orphan processing in preparation for interruptibility (Pieter Wuille)

Pull request description:

  Remaining backports for rc3

ACKs for commit 95faff:
  promag:
    ACK 95faffe, well done in ed0498af2827ccf033c9a7c4f46b82424e411083 - verified all cherry picks.

Tree-SHA512: 597ee45493ac04e8c3149628a357c3238169d4b4f78158f22a3531e5f66289c228ecd6582f99026698883216f1ee7934d6315d19c99fc5f4f33dd1bed4300186
This commit is contained in:
Wladimir J. van der Laan 2019-04-02 13:25:21 +02:00
commit 32ec900850
No known key found for this signature in database
GPG Key ID: 1E4AED62986CD25D
9 changed files with 190 additions and 84 deletions

@ -8,6 +8,7 @@
#include <chainparams.h>
#include <primitives/block.h>
#include <sync.h>
#include <txmempool.h>
#include <uint256.h>
#include <util/system.h>
#include <validation.h>
@ -177,6 +178,13 @@ public:
LOCK(cs_main);
return GuessVerificationProgress(Params().TxData(), LookupBlockIndex(block_hash));
}
void requestMempoolTransactions(std::function<void(const CTransactionRef&)> fn) override
{
LOCK2(::cs_main, ::mempool.cs);
for (const CTxMemPoolEntry& entry : ::mempool.mapTx) {
fn(entry.GetSharedTx());
}
}
};
} // namespace

@ -16,6 +16,9 @@ class CBlock;
class CScheduler;
class uint256;
struct CBlockLocator;
class CTransaction;
using CTransactionRef = std::shared_ptr<const CTransaction>;
namespace interfaces {
@ -127,6 +130,16 @@ public:
//! Estimate fraction of total transactions verified if blocks up to
//! the specified block hash are verified.
virtual double guessVerificationProgress(const uint256& block_hash) = 0;
//! Synchronously send TransactionAddedToMempool notifications about all
//! current mempool transactions to the specified handler and return after
//! the last one is sent. These notifications aren't coordinated with async
//! notifications sent by handleNotifications, so out of date async
//! notifications from handleNotifications can arrive during and after
//! synchronous notifications from requestMempoolTransactions. Clients need
//! to be prepared to handle this by ignoring notifications about unknown
//! removed transactions and already added new transactions.
virtual void requestMempoolTransactions(std::function<void(const CTransactionRef&)> fn) = 0;
};
//! Interface to let node manage chain clients (wallets, or maybe tools for

@ -739,6 +739,8 @@ public:
CAmount lastSentFeeFilter{0};
int64_t nextSendTimeFeeFilter{0};
std::set<uint256> orphan_work_set;
CNode(NodeId id, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress &addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const CAddress &addrBindIn, const std::string &addrNameIn = "", bool fInboundIn = false);
~CNode();
CNode(const CNode&) = delete;

@ -1713,6 +1713,67 @@ bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::ve
return true;
}
void static ProcessOrphanTx(CConnman* connman, std::set<uint256>& orphan_work_set, std::list<CTransactionRef>& removed_txn) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans)
{
AssertLockHeld(cs_main);
AssertLockHeld(g_cs_orphans);
std::set<NodeId> setMisbehaving;
bool done = false;
while (!done && !orphan_work_set.empty()) {
const uint256 orphanHash = *orphan_work_set.begin();
orphan_work_set.erase(orphan_work_set.begin());
auto orphan_it = mapOrphanTransactions.find(orphanHash);
if (orphan_it == mapOrphanTransactions.end()) continue;
const CTransactionRef porphanTx = orphan_it->second.tx;
const CTransaction& orphanTx = *porphanTx;
NodeId fromPeer = orphan_it->second.fromPeer;
bool fMissingInputs2 = false;
// Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan
// resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get
// anyone relaying LegitTxX banned)
CValidationState stateDummy;
if (setMisbehaving.count(fromPeer)) continue;
if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, &fMissingInputs2, &removed_txn, false /* bypass_limits */, 0 /* nAbsurdFee */)) {
LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString());
RelayTransaction(orphanTx, connman);
for (unsigned int i = 0; i < orphanTx.vout.size(); i++) {
auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(orphanHash, i));
if (it_by_prev != mapOrphanTransactionsByPrev.end()) {
for (const auto& elem : it_by_prev->second) {
orphan_work_set.insert(elem->first);
}
}
}
EraseOrphanTx(orphanHash);
done = true;
} else if (!fMissingInputs2) {
int nDos = 0;
if (stateDummy.IsInvalid(nDos) && nDos > 0) {
// Punish peer that gave us an invalid orphan tx
Misbehaving(fromPeer, nDos);
setMisbehaving.insert(fromPeer);
LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString());
}
// Has inputs but not accepted to mempool
// Probably non-standard or insufficient fee
LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString());
if (!orphanTx.HasWitness() && !stateDummy.CorruptionPossible()) {
// Do not use rejection cache for witness transactions or
// witness-stripped transactions, as they can have been malleated.
// See https://github.com/bitcoin/bitcoin/issues/8279 for details.
assert(recentRejects);
recentRejects->insert(orphanHash);
}
EraseOrphanTx(orphanHash);
done = true;
}
mempool.check(pcoinsTip.get());
}
}
bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, const std::atomic<bool>& interruptMsgProc, bool enable_bip61)
{
LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->GetId());
@ -2342,8 +2403,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
return true;
}
std::deque<COutPoint> vWorkQueue;
std::vector<uint256> vEraseQueue;
CTransactionRef ptx;
vRecv >> ptx;
const CTransaction& tx = *ptx;
@ -2368,7 +2427,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
mempool.check(pcoinsTip.get());
RelayTransaction(tx, connman);
for (unsigned int i = 0; i < tx.vout.size(); i++) {
vWorkQueue.emplace_back(inv.hash, i);
auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(inv.hash, i));
if (it_by_prev != mapOrphanTransactionsByPrev.end()) {
for (const auto& elem : it_by_prev->second) {
pfrom->orphan_work_set.insert(elem->first);
}
}
}
pfrom->nLastTXTime = GetTime();
@ -2379,65 +2443,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
mempool.size(), mempool.DynamicMemoryUsage() / 1000);
// Recursively process any orphan transactions that depended on this one
std::set<NodeId> setMisbehaving;
while (!vWorkQueue.empty()) {
auto itByPrev = mapOrphanTransactionsByPrev.find(vWorkQueue.front());
vWorkQueue.pop_front();
if (itByPrev == mapOrphanTransactionsByPrev.end())
continue;
for (auto mi = itByPrev->second.begin();
mi != itByPrev->second.end();
++mi)
{
const CTransactionRef& porphanTx = (*mi)->second.tx;
const CTransaction& orphanTx = *porphanTx;
const uint256& orphanHash = orphanTx.GetHash();
NodeId fromPeer = (*mi)->second.fromPeer;
bool fMissingInputs2 = false;
// Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan
// resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get
// anyone relaying LegitTxX banned)
CValidationState stateDummy;
if (setMisbehaving.count(fromPeer))
continue;
if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, &fMissingInputs2, &lRemovedTxn, false /* bypass_limits */, 0 /* nAbsurdFee */)) {
LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString());
RelayTransaction(orphanTx, connman);
for (unsigned int i = 0; i < orphanTx.vout.size(); i++) {
vWorkQueue.emplace_back(orphanHash, i);
}
vEraseQueue.push_back(orphanHash);
}
else if (!fMissingInputs2)
{
int nDos = 0;
if (stateDummy.IsInvalid(nDos) && nDos > 0)
{
// Punish peer that gave us an invalid orphan tx
Misbehaving(fromPeer, nDos);
setMisbehaving.insert(fromPeer);
LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString());
}
// Has inputs but not accepted to mempool
// Probably non-standard or insufficient fee
LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString());
vEraseQueue.push_back(orphanHash);
if (!orphanTx.HasWitness() && !stateDummy.CorruptionPossible()) {
// Do not use rejection cache for witness transactions or
// witness-stripped transactions, as they can have been malleated.
// See https://github.com/bitcoin/bitcoin/issues/8279 for details.
assert(recentRejects);
recentRejects->insert(orphanHash);
}
}
mempool.check(pcoinsTip.get());
}
}
for (const uint256& hash : vEraseQueue)
EraseOrphanTx(hash);
ProcessOrphanTx(connman, pfrom->orphan_work_set, lRemovedTxn);
}
else if (fMissingInputs)
{
@ -3145,11 +3151,21 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter
if (!pfrom->vRecvGetData.empty())
ProcessGetData(pfrom, chainparams, connman, interruptMsgProc);
if (!pfrom->orphan_work_set.empty()) {
std::list<CTransactionRef> removed_txn;
LOCK2(cs_main, g_cs_orphans);
ProcessOrphanTx(connman, pfrom->orphan_work_set, removed_txn);
for (const CTransactionRef& removedTx : removed_txn) {
AddToCompactExtraTransactions(removedTx);
}
}
if (pfrom->fDisconnect)
return false;
// this maintains the order of responses
if (!pfrom->vRecvGetData.empty()) return true;
if (!pfrom->orphan_work_set.empty()) return true;
// Don't bother if send buffer is too full to respond anyway
if (pfrom->fPauseSend)

@ -346,7 +346,11 @@ UniValue importaddress(const JSONRPCRequest& request)
if (fRescan)
{
RescanWallet(*pwallet, reserver);
pwallet->ReacceptWalletTransactions();
{
auto locked_chain = pwallet->chain().lock();
LOCK(pwallet->cs_wallet);
pwallet->ReacceptWalletTransactions(*locked_chain);
}
}
return NullUniValue;
@ -529,7 +533,11 @@ UniValue importpubkey(const JSONRPCRequest& request)
if (fRescan)
{
RescanWallet(*pwallet, reserver);
pwallet->ReacceptWalletTransactions();
{
auto locked_chain = pwallet->chain().lock();
LOCK(pwallet->cs_wallet);
pwallet->ReacceptWalletTransactions(*locked_chain);
}
}
return NullUniValue;
@ -1464,7 +1472,11 @@ UniValue importmulti(const JSONRPCRequest& mainRequest)
}
if (fRescan && fRunScan && requests.size()) {
int64_t scannedTime = pwallet->RescanFromTime(nLowestTimestamp, reserver, true /* update */);
pwallet->ReacceptWalletTransactions();
{
auto locked_chain = pwallet->chain().lock();
LOCK(pwallet->cs_wallet);
pwallet->ReacceptWalletTransactions(*locked_chain);
}
if (pwallet->IsAbortingRescan()) {
throw JSONRPCError(RPC_MISC_ERROR, "Rescan aborted by user.");

@ -1859,13 +1859,11 @@ CWallet::ScanResult CWallet::ScanForWalletTransactions(const uint256& start_bloc
return result;
}
void CWallet::ReacceptWalletTransactions()
void CWallet::ReacceptWalletTransactions(interfaces::Chain::Lock& locked_chain)
{
// If transactions aren't being broadcasted, don't let them into local mempool either
if (!fBroadcastTransactions)
return;
auto locked_chain = chain().lock();
LOCK(cs_wallet);
std::map<int64_t, CWalletTx*> mapSorted;
// Sort pending wallet transactions based on their initial wallet insertion order
@ -1875,7 +1873,7 @@ void CWallet::ReacceptWalletTransactions()
CWalletTx& wtx = item.second;
assert(wtx.GetHash() == wtxid);
int nDepth = wtx.GetDepthInMainChain(*locked_chain);
int nDepth = wtx.GetDepthInMainChain(locked_chain);
if (!wtx.IsCoinBase() && (nDepth == 0 && !wtx.isAbandoned())) {
mapSorted.insert(std::make_pair(wtx.nOrderPos, &wtx));
@ -1886,7 +1884,7 @@ void CWallet::ReacceptWalletTransactions()
for (const std::pair<const int64_t, CWalletTx*>& item : mapSorted) {
CWalletTx& wtx = *(item.second);
CValidationState state;
wtx.AcceptToMemoryPool(*locked_chain, maxTxFee, state);
wtx.AcceptToMemoryPool(locked_chain, maxTxFee, state);
}
}
@ -4425,9 +4423,15 @@ std::shared_ptr<CWallet> CWallet::CreateWalletFromFile(interfaces::Chain& chain,
void CWallet::postInitProcess()
{
auto locked_chain = chain().lock();
LOCK(cs_wallet);
// Add wallet transactions that aren't already in a block to mempool
// Do this here as mempool requires genesis block to be loaded
ReacceptWalletTransactions();
ReacceptWalletTransactions(*locked_chain);
// Update wallet transactions with current mempool transactions.
chain().requestMempoolTransactions([this](const CTransactionRef& tx) { TransactionAddedToMempool(tx); });
}
bool CWallet::BackupWallet(const std::string& strDest)

@ -943,7 +943,7 @@ public:
};
ScanResult ScanForWalletTransactions(const uint256& first_block, const uint256& last_block, const WalletRescanReserver& reserver, bool fUpdate);
void TransactionRemovedFromMempool(const CTransactionRef &ptx) override;
void ReacceptWalletTransactions();
void ReacceptWalletTransactions(interfaces::Chain::Lock& locked_chain) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet);
void ResendWalletTransactions(int64_t nBestBlockTime, CConnman* connman) override EXCLUSIVE_LOCKS_REQUIRED(cs_main);
// ResendWalletTransactionsBefore may only be called if fBroadcastTransactions!
std::vector<uint256> ResendWalletTransactionsBefore(interfaces::Chain::Lock& locked_chain, int64_t nTime, CConnman* connman);

@ -129,5 +129,17 @@ class WalletTest(BitcoinTestFramework):
# getbalance with minconf=2 will show the new balance.
assert_equal(self.nodes[1].getbalance(minconf=2), Decimal('0'))
# check mempool transactions count for wallet unconfirmed balance after
# dynamically loading the wallet.
before = self.nodes[1].getunconfirmedbalance()
dst = self.nodes[1].getnewaddress()
self.nodes[1].unloadwallet('')
self.nodes[0].sendtoaddress(dst, 0.1)
self.sync_all()
self.nodes[1].loadwallet('')
after = self.nodes[1].getunconfirmedbalance()
assert_equal(before + Decimal('0.1'), after)
if __name__ == '__main__':
WalletTest().main()

@ -2,31 +2,70 @@
# Copyright (c) 2017-2018 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test resendwallettransactions RPC."""
"""Test that the wallet resends transactions periodically."""
from collections import defaultdict
import time
from test_framework.blocktools import create_block, create_coinbase
from test_framework.messages import ToHex
from test_framework.mininode import P2PInterface, mininode_lock
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, assert_raises_rpc_error
from test_framework.util import assert_equal, wait_until
class P2PStoreTxInvs(P2PInterface):
def __init__(self):
super().__init__()
self.tx_invs_received = defaultdict(int)
def on_inv(self, message):
# Store how many times invs have been received for each tx.
for i in message.inv:
if i.type == 1:
# save txid
self.tx_invs_received[i.hash] += 1
class ResendWalletTransactionsTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
self.extra_args = [['--walletbroadcast=false']]
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def run_test(self):
# Should raise RPC_WALLET_ERROR (-4) if walletbroadcast is disabled.
assert_raises_rpc_error(-4, "Error: Wallet transaction broadcasting is disabled with -walletbroadcast", self.nodes[0].resendwallettransactions)
node = self.nodes[0] # alias
# Should return an empty array if there aren't unconfirmed wallet transactions.
self.stop_node(0)
self.start_node(0, extra_args=[])
assert_equal(self.nodes[0].resendwallettransactions(), [])
node.add_p2p_connection(P2PStoreTxInvs())
# Should return an array with the unconfirmed wallet transaction.
txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1)
assert_equal(self.nodes[0].resendwallettransactions(), [txid])
self.log.info("Create a new transaction and wait until it's broadcast")
txid = int(node.sendtoaddress(node.getnewaddress(), 1), 16)
# Can take a few seconds due to transaction trickling
wait_until(lambda: node.p2p.tx_invs_received[txid] >= 1, lock=mininode_lock)
# Add a second peer since txs aren't rebroadcast to the same peer (see filterInventoryKnown)
node.add_p2p_connection(P2PStoreTxInvs())
self.log.info("Create a block")
# Create and submit a block without the transaction.
# Transactions are only rebroadcast if there has been a block at least five minutes
# after the last time we tried to broadcast. Use mocktime and give an extra minute to be sure.
block_time = int(time.time()) + 6 * 60
node.setmocktime(block_time)
block = create_block(int(node.getbestblockhash(), 16), create_coinbase(node.getblockchaininfo()['blocks']), block_time)
block.nVersion = 3
block.rehash()
block.solve()
node.submitblock(ToHex(block))
# Transaction should not be rebroadcast
node.p2ps[1].sync_with_ping()
assert_equal(node.p2ps[1].tx_invs_received[txid], 0)
self.log.info("Transaction should be rebroadcast after 30 minutes")
# Use mocktime and give an extra 5 minutes to be sure.
rebroadcast_time = int(time.time()) + 41 * 60
node.setmocktime(rebroadcast_time)
wait_until(lambda: node.p2ps[1].tx_invs_received[txid] >= 1, lock=mininode_lock)
if __name__ == '__main__':
ResendWalletTransactionsTest().main()