diff --git a/client.go b/client.go index 1efd9896..72013b99 100644 --- a/client.go +++ b/client.go @@ -957,8 +957,7 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error { defer t.dropConnection(c) c.startWriter() cl.sendInitialMessages(c, t) - c.updateRequestsTimer = time.AfterFunc(math.MaxInt64, c.updateRequestsTimerFunc) - c.updateRequestsTimer.Stop() + c.initUpdateRequestsTimer() err := c.mainReadLoop() if err != nil { return fmt.Errorf("main read loop: %w", err) @@ -966,7 +965,19 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error { return nil } -func (c *PeerConn) updateRequestsTimerFunc() { +const check = false + +func (p *Peer) initUpdateRequestsTimer() { + if check { + if p.updateRequestsTimer != nil { + panic(p.updateRequestsTimer) + } + } + p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc) + p.updateRequestsTimer.Stop() +} + +func (c *Peer) updateRequestsTimerFunc() { c.locker().Lock() defer c.locker().Unlock() if c.needRequestUpdate != "" { diff --git a/peer-impl.go b/peer-impl.go index eb926c74..4dbc6b4c 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -8,7 +8,8 @@ import ( // BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with // legacy PeerConn methods. type peerImpl interface { - updateRequests(reason string) + // Trigger the actual request state to get updated + handleUpdateRequests() writeInterested(interested bool) bool // Neither of these return buffer room anymore, because they're currently both posted. There's diff --git a/peerconn.go b/peerconn.go index 48453f2b..a9d0baad 100644 --- a/peerconn.go +++ b/peerconn.go @@ -243,7 +243,7 @@ func (cn *Peer) peerHasAllPieces() (all bool, known bool) { return roaring.Flip(&cn._peerPieces, 0, bitmap.BitRange(cn.t.numPieces())).IsEmpty(), true } -func (cn *PeerConn) locker() *lockWithDeferreds { +func (cn *Peer) locker() *lockWithDeferreds { return cn.t.cl.locker() } @@ -403,6 +403,9 @@ func (p *Peer) close() { if !p.closed.Set() { return } + if p.updateRequestsTimer != nil { + p.updateRequestsTimer.Stop() + } p.peerImpl.onClose() if p.t != nil { p.t.decPeerPieceAvailability(p) @@ -416,9 +419,6 @@ func (cn *PeerConn) onClose() { if cn.pex.IsEnabled() { cn.pex.Close() } - if cn.updateRequestsTimer != nil { - cn.updateRequestsTimer.Stop() - } cn.tickleWriter() if cn.conn != nil { cn.conn.Close() @@ -638,7 +638,7 @@ func (me *PeerConn) _cancel(r RequestIndex) bool { } func (cn *PeerConn) fillWriteBuffer() { - if !cn.applyNextRequestState() { + if !cn.maybeUpdateActualRequestState() { return } if cn.pex.IsEnabled() { @@ -674,11 +674,17 @@ func (cn *PeerConn) postBitfield() { cn.sentHaves = bitmap.Bitmap{cn.t._completedPieces.Clone()} } -func (cn *PeerConn) updateRequests(reason string) { +// Sets a reason to update requests, and if there wasn't already one, handle it. +func (cn *Peer) updateRequests(reason string) { if cn.needRequestUpdate != "" { return } cn.needRequestUpdate = reason + cn.handleUpdateRequests() +} + +func (cn *PeerConn) handleUpdateRequests() { + // The writer determines the request state as needed when it can write. cn.tickleWriter() } diff --git a/requesting.go b/requesting.go index 97e46e9c..6d707fde 100644 --- a/requesting.go +++ b/requesting.go @@ -237,7 +237,7 @@ func (p *Peer) getDesiredRequestState() (desired requestState) { return } -func (p *Peer) applyNextRequestState() bool { +func (p *Peer) maybeUpdateActualRequestState() bool { if p.needRequestUpdate == "" { return true } @@ -253,6 +253,7 @@ func (p *Peer) applyNextRequestState() bool { return more } +// Transmit/action the request state to the peer. func (p *Peer) applyRequestState(next requestState) bool { current := &p.actualRequestState if !p.setInterested(next.Interested) { @@ -267,6 +268,12 @@ func (p *Peer) applyRequestState(next requestState) bool { if !more { return false } + // We randomize the order in which requests are issued, to reduce the overlap with requests to + // other peers. Note that although it really depends on what order the peer services the + // requests, if we are only able to issue some requests before buffering, or the peer starts + // handling our requests before they've all arrived, then this randomization should reduce + // overlap. Note however that if we received the desired requests in priority order, then + // randomizing would throw away that benefit. for _, x := range rand.Perm(int(next.Requests.GetCardinality())) { req, err := next.Requests.Select(uint32(x)) if err != nil { @@ -277,6 +284,11 @@ func (p *Peer) applyRequestState(next requestState) bool { // requests, so we can skip this one with no additional consideration. continue } + // The cardinality of our desired requests shouldn't exceed the max requests since it's used + // in the calculation of the requests. However if we cancelled requests and they haven't + // been rejected or serviced yet with the fast extension enabled, we can end up with more + // extra outstanding requests. We could subtract the number of outstanding cancels from the + // next request cardinality, but peers might not like that. if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() { //log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]", // next.Requests.GetCardinality(), diff --git a/torrent.go b/torrent.go index 9af6a44d..92277342 100644 --- a/torrent.go +++ b/torrent.go @@ -2226,6 +2226,7 @@ func (t *Torrent) addWebSeed(url string) { }, activeRequests: make(map[Request]webseed.Request, maxRequests), } + ws.peer.initUpdateRequestsTimer() ws.requesterCond.L = t.cl.locker() for i := 0; i < maxRequests; i += 1 { go ws.requester() diff --git a/webseed-peer.go b/webseed-peer.go index 01518f45..68fbc3e7 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -8,6 +8,7 @@ import ( "strings" "sync" + "github.com/anacrolix/log" "github.com/anacrolix/torrent/common" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" @@ -105,11 +106,12 @@ func (ws *webseedPeer) connectionFlags() string { // return bool if this is even possible, and if it isn't, skip to the next drop candidate. func (ws *webseedPeer) drop() {} -func (ws *webseedPeer) updateRequests(reason string) { +func (ws *webseedPeer) handleUpdateRequests() { + ws.peer.maybeUpdateActualRequestState() } func (ws *webseedPeer) onClose() { - ws.peer.logger.Print("closing") + ws.peer.logger.WithLevel(log.Debug).Print("closing") for _, r := range ws.activeRequests { r.Cancel() }