diff --git a/peer-impl.go b/peer-impl.go index f4ad12a2..880b8f35 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -10,12 +10,14 @@ import ( type peerImpl interface { updateRequests() writeInterested(interested bool) bool - cancel(Request) bool - // Return true if there's room for more activity. - request(Request) bool + + // Neither of these return buffer room anymore, because they're currently both posted. There's + // also PeerConn.writeBufferFull for when/where it matters. + _cancel(Request) + _request(Request) + connectionFlags() string onClose() - _postCancel(Request) onGotInfo(*metainfo.Info) drop() String() string diff --git a/peerconn.go b/peerconn.go index 1ce06994..aa31eec8 100644 --- a/peerconn.go +++ b/peerconn.go @@ -529,15 +529,15 @@ func (pc *PeerConn) writeInterested(interested bool) bool { // are okay. type messageWriter func(pp.Message) bool -func (cn *Peer) request(r Request) (more bool, err error) { +func (cn *Peer) request(r Request) error { if _, ok := cn.requests[r]; ok { - return true, nil + return nil } if cn.numLocalRequests() >= cn.nominalMaxRequests() { - return true, errors.New("too many outstanding requests") + return errors.New("too many outstanding requests") } if !cn.peerHasPiece(pieceIndex(r.Index)) { - return true, errors.New("requesting piece peer doesn't have") + return errors.New("requesting piece peer doesn't have") } if !cn.t.peerIsActive(cn) { panic("requesting but not in active conns") @@ -545,19 +545,22 @@ func (cn *Peer) request(r Request) (more bool, err error) { if cn.closed.IsSet() { panic("requesting when connection is closed") } - if cn.peerChoking { - if cn.peerAllowedFast.Get(int(r.Index)) { - torrent.Add("allowed fast requests sent", 1) - } else { - return cn.setInterested(true), errors.New("requesting while choked and not allowed fast") - } - } if cn.t.hashingPiece(pieceIndex(r.Index)) { panic("piece is being hashed") } if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) { panic("piece is queued for hash") } + if !cn.setInterested(true) { + return errors.New("write buffer full after expressing interest") + } + if cn.peerChoking { + if cn.peerAllowedFast.Get(int(r.Index)) { + torrent.Add("allowed fast requests sent", 1) + } else { + errors.New("peer choking and piece not in allowed fast set") + } + } if cn.requests == nil { cn.requests = make(map[Request]struct{}) } @@ -571,11 +574,12 @@ func (cn *Peer) request(r Request) (more bool, err error) { for _, f := range cn.callbacks.SentRequest { f(PeerRequestEvent{cn, r}) } - return cn.peerImpl.request(r), nil + cn.peerImpl._request(r) + return nil } -func (me *PeerConn) request(r Request) bool { - return me.write(pp.Message{ +func (me *PeerConn) _request(r Request) { + me.write(pp.Message{ Type: pp.Request, Index: r.Index, Begin: r.Begin, @@ -583,8 +587,14 @@ func (me *PeerConn) request(r Request) bool { }) } -func (me *PeerConn) cancel(r Request) bool { - return me.write(makeCancelMessage(r)) +func (me *Peer) cancel(r Request) { + if me.deleteRequest(r) { + me.peerImpl._cancel(r) + } +} + +func (me *PeerConn) _cancel(r Request) { + me.write(makeCancelMessage(r)) } func (cn *PeerConn) fillWriteBuffer() { @@ -1317,7 +1327,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { if p == c { return } - p.postCancel(req) + p.cancel(req) }) err := func() error { @@ -1472,26 +1482,6 @@ func (c *Peer) deleteRequest(r Request) bool { if n < 0 { panic(n) } - // If a request fails, updating the requests for the current peer first may miss the opportunity - // to try other peers for that request instead, depending on the request strategy. This might - // only affect webseed peers though, since they synchronously issue new requests: PeerConns do - // it in the writer routine. - const updateCurrentConnRequestsFirst = false - if updateCurrentConnRequestsFirst { - c.updateRequests() - } - // Give other conns a chance to pick up the request. - c.t.iterPeers(func(_c *Peer) { - // We previously checked that the peer wasn't interested to to only wake connections that - // were unable to issue requests due to starvation by the request strategy. There could be - // performance ramifications. - if _c != c && c.peerHasPiece(pieceIndex(r.Index)) { - _c.updateRequests() - } - }) - if !updateCurrentConnRequestsFirst { - c.updateRequests() - } return true } @@ -1513,18 +1503,6 @@ func (c *PeerConn) tickleWriter() { c.writerCond.Broadcast() } -func (c *Peer) postCancel(r Request) bool { - if !c.deleteRequest(r) { - return false - } - c.peerImpl._postCancel(r) - return true -} - -func (c *PeerConn) _postCancel(r Request) { - c.post(makeCancelMessage(r)) -} - func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) { c.lastChunkSent = time.Now() return msg(pp.Message{ diff --git a/webseed-peer.go b/webseed-peer.go index 9fa77a28..5f2980c3 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -41,30 +41,23 @@ func (ws *webseedPeer) onGotInfo(info *metainfo.Info) { ws.client.Info = info } -func (ws *webseedPeer) _postCancel(r Request) { - ws.cancel(r) -} - func (ws *webseedPeer) writeInterested(interested bool) bool { return true } -func (ws *webseedPeer) cancel(r Request) bool { +func (ws *webseedPeer) _cancel(r Request) { active, ok := ws.activeRequests[r] - if !ok { - return false + if ok { + active.Cancel() } - active.Cancel() - return true } func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec { return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)} } -func (ws *webseedPeer) request(r Request) bool { +func (ws *webseedPeer) _request(r Request) { ws.requesterCond.Signal() - return true } func (ws *webseedPeer) doRequest(r Request) {