8000 draft: btcd: test pr 2389 by GustavoStingelin · Pull Request #2390 · btcsuite/btcd · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

draft: btcd: test pr 2389 #2390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 130 additions & 77 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ type server struct {
// agentWhitelist is a list of whitelisted user agent substrings, no
// whitelisting will be applied if the list is empty or nil.
agentWhitelist []string

// pushInventoryFunc allows injection for testing.
pushInventoryFunc func(sp *serverPeer, iv *wire.InvVect, doneChan chan<- struct{}) error
}

// serverPeer extends the peer to maintain state shared by the server and
Expand Down Expand Up @@ -690,82 +693,144 @@ func (sp *serverPeer) OnHeaders(_ *peer.Peer, msg *wire.MsgHeaders) {
// handleGetData is invoked when a peer receives a getdata bitcoin message and
// is used to deliver block and transaction information.
func (sp *serverPeer) OnGetData(_ *peer.Peer, msg *wire.MsgGetData) {
numAdded := 0
notFound := wire.NewMsgNotFound()
// failedMsg is an inventory that stores all the failed msgs - either
// the msg is an unknown type, or there's an error processing it.
failedMsg := wire.NewMsgNotFound()

length := len(msg.InvList)
// A decaying ban score increase is applied to prevent exhausting resources
// with unusually large inventory queries.
// Requesting more than the maximum inventory vector length within a short
// period of time yields a score above the default ban threshold. Sustained
// bursts of small requests are not penalized as that would potentially ban
// peers performing IBD.

// A decaying ban score increase is applied to prevent exhausting
// resources with unusually large inventory queries.
//
// Requesting more than the maximum inventory vector length within a
// short period of time yields a score above the default ban threshold.
// Sustained bursts of small requests are not penalized as that would
// potentially ban peers performing IBD.
//
// This incremental score decays each minute to half of its value.
if sp.addBanScore(0, uint32(length)*99/wire.MaxInvPerMsg, "getdata") {
return
}

// We wait on this wait channel periodically to prevent queuing
// far more data than we can send in a reasonable time, wasting memory.
// The waiting occurs after the database fetch for the next one to
// provide a little pipelining.
var waitChan chan struct{}
doneChan := make(chan struct{}, 1)
// We wait on this wait channel periodically to prevent queuing far
// more data than we can send in a reasonable time, wasting memory. The
// waiting occurs after the database fetch for the next one to provide
// a little pipelining.

// We now create a doneChans with a size of 5, which essentially
// behaves like a semaphore that allows 5 goroutines to be running at
// the same time.
const numBuffered = 5
doneChans := make([]chan struct{}, 0, numBuffered)

pushInventory := sp.server.pushInventory
if sp.server.pushInventoryFunc != nil {
pushInventory = sp.server.pushInventoryFunc
}

for i, iv := range msg.InvList {
var c chan struct{}
// If this will be the last message we send.
if i == length-1 && len(notFound.InvList) == 0 {
c = doneChan
} else if (i+1)%3 == 0 {
// Buffered so as to not make the send goroutine block.
c = make(chan struct{}, 1)
// doneChan behaves like a semaphore - every time a msg is
// processed, either succeeded or failed, a signal is sent to
// this doneChan.
doneChan := make(chan struct{}, 1)

// Add this doneChan for tracking.
doneChans = append(doneChans, doneChan)

err := pushInventory(sp, iv, doneChan)
if err != nil {
failedMsg.AddInvVect(iv)
}
var err error
switch iv.Type {
case wire.InvTypeWitnessTx:
err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding)
case wire.InvTypeTx:
err = sp.server.pushTxMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding)
case wire.InvTypeWitnessBlock:
err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding)
case wire.InvTypeBlock:
err = sp.server.pushBlockMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding)
case wire.InvTypeFilteredWitnessBlock:
err = sp.server.pushMerkleBlockMsg(sp, &iv.Hash, c, waitChan, wire.WitnessEncoding)
case wire.InvTypeFilteredBlock:
err = sp.server.pushMerkleBlockMsg(sp, &iv.Hash, c, waitChan, wire.BaseEncoding)
default:
peerLog.Warnf("Unknown type in inventory request %d",
iv.Type)

// Move to the next item if we haven't processed 5 times yet.
if (i+1)%numBuffered != 0 {
continue
}
if err != nil {
notFound.AddInvVect(iv)

// When there is a failure fetching the final entry
// and the done channel was sent in due to there
// being no outstanding not found inventory, consume
// it here because there is now not found inventory
// that will use the channel momentarily.
if i == len(msg.InvList)-1 && c != nil {
<-c

// Empty all the slots.
for _, dc := range doneChans {
select {
// NOTE: We always expect am empty struct to be sent to
// this doneChan, even when `pushInventory` failed.
case <-dc:

// Exit if the server is shutting down.
case <-sp.quit:
peerLog.Debug("Server shutting down in " +
"OnGetData")

return
}
}
numAdded++
waitChan = c

// Re-initialize the done chans.
doneChans = make([]chan struct{}, numBuffered)
}
if len(notFound.InvList) != 0 {
sp.QueueMessage(notFound, doneChan)

if len(failedMsg.InvList) != 0 {
doneChan := make(chan struct{}, 1)

// Add this doneChan for tracking.
doneChans = append(doneChans, doneChan)

// Send the failed msgs.
sp.QueueMessage(failedMsg, doneChan)
}

// Wait for messages to be sent. We can send quite a lot of data at this
// point and this will keep the peer busy for a decent amount of time.
// We don't process anything else by them in this time so that we
// Wait for messages to be sent. We can send quite a lot of data at
// this point and this will keep the peer busy for a decent amount of
// time. We don't process anything else by them in this time so that we
// have an idea of when we should hear back from them - else the idle
// timeout could fire when we were only half done sending the blocks.
if numAdded > 0 {
<-doneChan
for _, dc := range doneChans {
select {
case <-dc:

// Exit if the server is shutting down.
case <-sp.quit:
peerLog.Debug("Server shutting down in OnGetData")
return
}
}
}

// pushInventory sends the requested inventory to the given peer.
func (s *server) pushInventory(sp *serverPeer, iv *wire.InvVect,
doneChan chan<- struct{}) error {

switch iv.Type {
case wire.InvTypeWitnessTx:
return s.pushTxMsg(sp, &iv.Hash, doneChan, wire.WitnessEncoding)

case wire.InvTypeTx:
return s.pushTxMsg(sp, &iv.Hash, doneChan, wire.BaseEncoding)

case wire.InvTypeWitnessBlock:
return s.pushBlockMsg(
sp, &iv.Hash, doneChan, wire.WitnessEncoding,
)

case wire.InvTypeBlock:
return s.pushBlockMsg(sp, &iv.Hash, doneChan, wire.BaseEncoding)

case wire.InvTypeFilteredWitnessBlock:
return s.pushMerkleBlockMsg(
sp, &iv.Hash, doneChan, wire.WitnessEncoding,
)

case wire.InvTypeFilteredBlock:
return s.pushMerkleBlockMsg(
sp, &iv.Hash, doneChan, wire.BaseEncoding,
)

default:
peerLog.Warnf("Unknown type in inventory request %d", iv.Type)

if doneChan != nil {
doneChan <- struct{}{}
}

return errors.New("unknown inventory type")
}
}

Expand Down Expand Up @@ -1536,8 +1601,8 @@ func (s *server) TransactionConfirmed(tx *btcutil.Tx) {

// pushTxMsg sends a tx message for the provided transaction hash to the
// connected peer. An error is returned if the transaction hash is not known.
func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{},
waitChan <-chan struct{}, encoding wire.MessageEncoding) error {
func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash,
doneChan chan<- struct{}, encoding wire.MessageEncoding) error {

// Attempt to fetch the requested transaction from the pool. A
// call could be made to check for existence first, but simply trying
Expand All @@ -1553,20 +1618,15 @@ func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<-
return err
}

// Once we have fetched data wait for any previous operation to finish.
if waitChan != nil {
<-waitChan
}

sp.QueueMessageWithEncoding(tx.MsgTx(), doneChan, encoding)

return nil
}

// pushBlockMsg sends a block message for the provided block hash to the
// connected peer. An error is returned if the block hash is not known.
func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan chan<- struct{},
waitChan <-chan struct{}, encoding wire.MessageEncoding) error {
func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash,
doneChan chan<- struct{}, encoding wire.MessageEncoding) error {

// Fetch the raw block bytes from the database.
var blockBytes []byte
Expand Down Expand Up @@ -1598,11 +1658,6 @@ func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan cha
return err
}

// Once we have fetched data wait for any previous operation to finish.
if waitChan != nil {
<-waitChan
}

// We only send the channel for this message if we aren't sending
// an inv straight after.
var dc chan<- struct{}
Expand Down Expand Up @@ -1634,7 +1689,7 @@ func (s *server) pushBlockMsg(sp *serverPeer, hash *chainhash.Hash, doneChan cha
// loaded, this call will simply be ignored if there is no filter loaded. An
// error is returned if the block hash is not known.
func (s *server) pushMerkleBlockMsg(sp *serverPeer, hash *chainhash.Hash,
doneChan chan<- struct{}, waitChan <-chan struct{}, encoding wire.MessageEncoding) error {
doneChan chan<- struct{}, encoding wire.MessageEncoding) error {

// Do not send a response if the peer doesn't have a filter loaded.
if !sp.filter.IsLoaded() {
Expand All @@ -1660,11 +1715,6 @@ func (s *server) pushMerkleBlockMsg(sp *serverPeer, hash *chainhash.Hash,
// to the filter for the peer.
merkle, matchedTxIndices := bloom.NewMerkleBlock(blk, sp.filter)

// Once we have fetched data wait for any previous operation to finish.
if waitChan != nil {
<-waitChan
}

// Send the merkleblock. Only send the done channel with this message
// if no transactions will be sent afterwards.
var dc chan<- struct{}
Expand Down Expand Up @@ -2831,6 +2881,9 @@ func newServer(listenAddrs, agentBlacklist, agentWhitelist []string,
agentWhitelist: agentWhitelist,
}

// Default pushInventoryFunc to real pushInventory if not set
s.pushInventoryFunc = s.pushInventory

// Create the transaction and address indexes if needed.
//
// CAUTION: the txindex needs to be first in the indexes array because
Expand Down
82 changes: 82 additions & 0 deletions server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package main

import (
"fmt"
"sync"
"testing"
"time"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
)

func TestOnGetData_FinalBatchDrained_Table(t *testing.T) {
cfg = &config{DisableBanning: true}

for _, invSize := range []int{0, 1, 2, 3, 4, 5, 6, 7, 10, 11, 15, 150, 1500, 5000} {
t.Run(fmt.Sprintf("invSize_%d", invSize), func(t *testing.T) {
t.Parallel()

msg := wire.NewMsgGetData()
doneChans := make([]chan struct{}, invSize)
for i := range doneChans {
h := chainhash.Hash{byte(i)}
msg.AddInvVect(wire.NewInvVect(wire.InvTypeTx, &h))
doneChans[i] = make(chan struct{}, 1)
}

s := &server{}
sp := &serverPeer{
server: s,
quit: make(chan struct{}),
}

closed := make([]bool, invSize)
var mu sync.Mutex

s.pushInventoryFunc = func(_ *serverPeer, _ *wire.InvVect, doneChan chan<- struct{}) error {
mu.Lock()
idx := -1
for i, wasClosed := range closed {
if !wasClosed {
closed[i] = true
idx = i
break
}
}
mu.Unlock()

if idx == -1 {
t.Fatal("No available index to mark closed")
}

go func(i int) {
time.Sleep(time.Microsecond)
doneChans[i] <- struct{}{}
doneChan <- struct{}{}
}(idx)
return nil
}

done := make(chan struct{})
go func() {
sp.OnGetData(nil, msg)
close(done)
}()

select {
case <-done:
case <-time.After(time.Millisecond*time.Duration(invSize) + time.Second):
t.Fatal("OnGetData did not return, possible deadlock")
}

for i, ch := range doneChans {
select {
case <-ch:
case <-time.After(100 * time.Millisecond):
t.Fatalf("doneChan %d not closed", i)
}
}
})
}
}
Loading
0