Update webseeds for peer requesting

This commit is contained in:
Matt Joiner 2021-10-21 10:28:57 +11:00
parent faf385c163
commit 8eec0b665e
6 changed files with 46 additions and 13 deletions

View File

@ -957,8 +957,7 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
defer t.dropConnection(c)
c.startWriter()
cl.sendInitialMessages(c, t)
c.updateRequestsTimer = time.AfterFunc(math.MaxInt64, c.updateRequestsTimerFunc)
c.updateRequestsTimer.Stop()
c.initUpdateRequestsTimer()
err := c.mainReadLoop()
if err != nil {
return fmt.Errorf("main read loop: %w", err)
@ -966,7 +965,19 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
return nil
}
func (c *PeerConn) updateRequestsTimerFunc() {
const check = false
func (p *Peer) initUpdateRequestsTimer() {
if check {
if p.updateRequestsTimer != nil {
panic(p.updateRequestsTimer)
}
}
p.updateRequestsTimer = time.AfterFunc(math.MaxInt64, p.updateRequestsTimerFunc)
p.updateRequestsTimer.Stop()
}
func (c *Peer) updateRequestsTimerFunc() {
c.locker().Lock()
defer c.locker().Unlock()
if c.needRequestUpdate != "" {

View File

@ -8,7 +8,8 @@ import (
// BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with
// legacy PeerConn methods.
type peerImpl interface {
updateRequests(reason string)
// Trigger the actual request state to get updated
handleUpdateRequests()
writeInterested(interested bool) bool
// Neither of these return buffer room anymore, because they're currently both posted. There's

View File

@ -243,7 +243,7 @@ func (cn *Peer) peerHasAllPieces() (all bool, known bool) {
return roaring.Flip(&cn._peerPieces, 0, bitmap.BitRange(cn.t.numPieces())).IsEmpty(), true
}
func (cn *PeerConn) locker() *lockWithDeferreds {
func (cn *Peer) locker() *lockWithDeferreds {
return cn.t.cl.locker()
}
@ -403,6 +403,9 @@ func (p *Peer) close() {
if !p.closed.Set() {
return
}
if p.updateRequestsTimer != nil {
p.updateRequestsTimer.Stop()
}
p.peerImpl.onClose()
if p.t != nil {
p.t.decPeerPieceAvailability(p)
@ -416,9 +419,6 @@ func (cn *PeerConn) onClose() {
if cn.pex.IsEnabled() {
cn.pex.Close()
}
if cn.updateRequestsTimer != nil {
cn.updateRequestsTimer.Stop()
}
cn.tickleWriter()
if cn.conn != nil {
cn.conn.Close()
@ -638,7 +638,7 @@ func (me *PeerConn) _cancel(r RequestIndex) bool {
}
func (cn *PeerConn) fillWriteBuffer() {
if !cn.applyNextRequestState() {
if !cn.maybeUpdateActualRequestState() {
return
}
if cn.pex.IsEnabled() {
@ -674,11 +674,17 @@ func (cn *PeerConn) postBitfield() {
cn.sentHaves = bitmap.Bitmap{cn.t._completedPieces.Clone()}
}
func (cn *PeerConn) updateRequests(reason string) {
// Sets a reason to update requests, and if there wasn't already one, handle it.
func (cn *Peer) updateRequests(reason string) {
if cn.needRequestUpdate != "" {
return
}
cn.needRequestUpdate = reason
cn.handleUpdateRequests()
}
func (cn *PeerConn) handleUpdateRequests() {
// The writer determines the request state as needed when it can write.
cn.tickleWriter()
}

View File

@ -237,7 +237,7 @@ func (p *Peer) getDesiredRequestState() (desired requestState) {
return
}
func (p *Peer) applyNextRequestState() bool {
func (p *Peer) maybeUpdateActualRequestState() bool {
if p.needRequestUpdate == "" {
return true
}
@ -253,6 +253,7 @@ func (p *Peer) applyNextRequestState() bool {
return more
}
// Transmit/action the request state to the peer.
func (p *Peer) applyRequestState(next requestState) bool {
current := &p.actualRequestState
if !p.setInterested(next.Interested) {
@ -267,6 +268,12 @@ func (p *Peer) applyRequestState(next requestState) bool {
if !more {
return false
}
// We randomize the order in which requests are issued, to reduce the overlap with requests to
// other peers. Note that although it really depends on what order the peer services the
// requests, if we are only able to issue some requests before buffering, or the peer starts
// handling our requests before they've all arrived, then this randomization should reduce
// overlap. Note however that if we received the desired requests in priority order, then
// randomizing would throw away that benefit.
for _, x := range rand.Perm(int(next.Requests.GetCardinality())) {
req, err := next.Requests.Select(uint32(x))
if err != nil {
@ -277,6 +284,11 @@ func (p *Peer) applyRequestState(next requestState) bool {
// requests, so we can skip this one with no additional consideration.
continue
}
// The cardinality of our desired requests shouldn't exceed the max requests since it's used
// in the calculation of the requests. However if we cancelled requests and they haven't
// been rejected or serviced yet with the fast extension enabled, we can end up with more
// extra outstanding requests. We could subtract the number of outstanding cancels from the
// next request cardinality, but peers might not like that.
if maxRequests(current.Requests.GetCardinality()) >= p.nominalMaxRequests() {
//log.Printf("not assigning all requests [desired=%v, cancelled=%v, current=%v, max=%v]",
// next.Requests.GetCardinality(),

View File

@ -2226,6 +2226,7 @@ func (t *Torrent) addWebSeed(url string) {
},
activeRequests: make(map[Request]webseed.Request, maxRequests),
}
ws.peer.initUpdateRequestsTimer()
ws.requesterCond.L = t.cl.locker()
for i := 0; i < maxRequests; i += 1 {
go ws.requester()

View File

@ -8,6 +8,7 @@ import (
"strings"
"sync"
"github.com/anacrolix/log"
"github.com/anacrolix/torrent/common"
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
@ -105,11 +106,12 @@ func (ws *webseedPeer) connectionFlags() string {
// return bool if this is even possible, and if it isn't, skip to the next drop candidate.
func (ws *webseedPeer) drop() {}
func (ws *webseedPeer) updateRequests(reason string) {
func (ws *webseedPeer) handleUpdateRequests() {
ws.peer.maybeUpdateActualRequestState()
}
func (ws *webseedPeer) onClose() {
ws.peer.logger.Print("closing")
ws.peer.logger.WithLevel(log.Debug).Print("closing")
for _, r := range ws.activeRequests {
r.Cancel()
}