From 62b1e1b749c7d527bdf0bced9ec07b04867af058 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 6 Nov 2017 14:53:00 +1100 Subject: [PATCH] Unbiased request ordering was requesting chunks the peer doesn't have --- connection.go | 50 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 16 deletions(-) diff --git a/connection.go b/connection.go index 3667cf7a..54ebf59b 100644 --- a/connection.go +++ b/connection.go @@ -335,6 +335,29 @@ func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) }) } +// The function takes a message to be sent, and returns true if more messages +// are okay. +type messageWriter func(pp.Message) bool + +func (cn *connection) request(r request, mw messageWriter) bool { + if cn.requests == nil { + cn.requests = make(map[request]struct{}, cn.nominalMaxRequests()) + } + if _, ok := cn.requests[r]; ok { + panic("chunk already requested") + } + if !cn.PeerHasPiece(r.Index.Int()) { + panic("requesting piece peer doesn't have") + } + cn.requests[r] = struct{}{} + return mw(pp.Message{ + Type: pp.Request, + Index: r.Index, + Begin: r.Begin, + Length: r.Length, + }) +} + func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) { numFillBuffers.Add(1) cancel, new, i := cn.desiredRequestState() @@ -354,23 +377,13 @@ func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) { if len(new) != 0 { fillBufferSentRequests.Add(1) for _, r := range new { - if cn.requests == nil { - cn.requests = make(map[request]struct{}, cn.nominalMaxRequests()) - } - cn.requests[r] = struct{}{} - // log.Printf("%p: requesting %v", cn, r) - if !msg(pp.Message{ - Type: pp.Request, - Index: r.Index, - Begin: r.Begin, - Length: r.Length, - }) { + if !cn.request(r, msg) { + // If we didn't completely top up the requests, we shouldn't + // mark the low water, since we'll want to top up the requests + // as soon as we have more write buffer space. return } } - // If we didn't completely top up the requests, we shouldn't mark the - // low water, since we'll want to top up the requests as soon as we - // have more write buffer space. cn.requestsLowWater = len(cn.requests) / 2 } cn.upload(msg) @@ -503,6 +516,9 @@ func (cn *connection) updateRequests() { cn.tickleWriter() } +// Emits the indices in the Bitmaps bms in order, never repeating any index. +// skip is mutated during execution, and its initial values will never be +// emitted. func iterBitmapsDistinct(skip bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func { return func(cb iter.Callback) { for _, bm := range bms { @@ -518,7 +534,9 @@ func iterBitmapsDistinct(skip bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func { func (cn *connection) unbiasedPieceRequestOrder() iter.Func { now, readahead := cn.t.readerPiecePriorities() - return iterBitmapsDistinct(cn.t.completedPieces.Copy(), now, readahead, cn.t.pendingPieces) + skip := bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces()) + skip.Union(cn.t.completedPieces) + return iterBitmapsDistinct(skip, now, readahead, cn.t.pendingPieces) } // The connection should download highest priority pieces first, without any @@ -831,7 +849,7 @@ func (c *connection) mainReadLoop() error { // from our storage, and can't communicate this to peers // except by reconnecting. requestsReceivedForMissingPieces.Add(1) - err = errors.New("peer requested piece we don't have") + err = fmt.Errorf("peer requested piece we don't have: %v", msg.Index.Int()) break } if c.PeerRequests == nil {