From 0d4e566fc03447ed94988886141f45ee3ba51d6d Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 13 May 2021 09:56:58 +1000 Subject: [PATCH] Extract request strategy stuff into a separate module --- client.go | 3 +- client_test.go | 2 + misc.go | 30 ++-- piece.go | 29 +--- request-strategy.go | 289 ++++++--------------------------- request-strategy/order.go | 226 ++++++++++++++++++++++++++ request-strategy/order_test.go | 1 + request-strategy/peer.go | 29 ++++ request-strategy/piece.go | 15 ++ types/types.go | 52 ++++++ 10 files changed, 392 insertions(+), 284 deletions(-) create mode 100644 request-strategy/order.go create mode 100644 request-strategy/order_test.go create mode 100644 request-strategy/peer.go create mode 100644 request-strategy/piece.go create mode 100644 types/types.go diff --git a/client.go b/client.go index e5915c1f..8544d4e3 100644 --- a/client.go +++ b/client.go @@ -25,6 +25,7 @@ import ( "github.com/anacrolix/missinggo/v2/pproffd" "github.com/anacrolix/sync" "github.com/anacrolix/torrent/internal/limiter" + request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/anacrolix/torrent/tracker" "github.com/anacrolix/torrent/webtorrent" "github.com/davecgh/go-spew/spew" @@ -82,7 +83,7 @@ type Client struct { activeAnnounceLimiter limiter.Instance - pieceRequestOrder clientPieceRequestOrder + pieceRequestOrder request_strategy.ClientPieceOrder } type ipStr string diff --git a/client_test.go b/client_test.go index adcd0c72..e8f652eb 100644 --- a/client_test.go +++ b/client_test.go @@ -554,6 +554,8 @@ func TestPeerInvalidHave(t *testing.T) { t: tt, }} cn.peerImpl = cn + cl.lock() + defer cl.unlock() assert.NoError(t, cn.peerSentHave(0)) assert.Error(t, cn.peerSentHave(1)) } diff --git a/misc.go b/misc.go index 0cd85a3f..508f0a63 100644 --- a/misc.go +++ b/misc.go @@ -5,29 +5,27 @@ import ( "net" "github.com/anacrolix/missinggo/v2" + "github.com/anacrolix/torrent/types" "golang.org/x/time/rate" "github.com/anacrolix/torrent/metainfo" pp "github.com/anacrolix/torrent/peer_protocol" ) -type ChunkSpec struct { - Begin, Length pp.Integer -} +type ( + Request = types.Request + ChunkSpec = types.ChunkSpec + piecePriority = types.PiecePriority +) -type Request struct { - Index pp.Integer - ChunkSpec -} - -func (r Request) ToMsg(mt pp.MessageType) pp.Message { - return pp.Message{ - Type: mt, - Index: r.Index, - Begin: r.Begin, - Length: r.Length, - } -} +const ( + PiecePriorityNormal = types.PiecePriorityNormal + PiecePriorityNone = types.PiecePriorityNone + PiecePriorityNow = types.PiecePriorityNow + PiecePriorityReadahead = types.PiecePriorityReadahead + PiecePriorityNext = types.PiecePriorityNext + PiecePriorityHigh = types.PiecePriorityHigh +) func newRequest(index, begin, length pp.Integer) Request { return Request{index, ChunkSpec{begin, length}} diff --git a/piece.go b/piece.go index af9607c4..3ee648d3 100644 --- a/piece.go +++ b/piece.go @@ -11,33 +11,6 @@ import ( "github.com/anacrolix/torrent/storage" ) -// Describes the importance of obtaining a particular piece. -type piecePriority byte - -func (pp *piecePriority) Raise(maybe piecePriority) bool { - if maybe > *pp { - *pp = maybe - return true - } - return false -} - -// Priority for use in PriorityBitmap -func (me piecePriority) BitmapPriority() int { - return -int(me) -} - -const ( - PiecePriorityNone piecePriority = iota // Not wanted. Must be the zero value. - PiecePriorityNormal // Wanted. - PiecePriorityHigh // Wanted a lot. - PiecePriorityReadahead // May be required soon. - // Succeeds a piece where a read occurred. Currently the same as Now, - // apparently due to issues with caching. - PiecePriorityNext - PiecePriorityNow // A Reader is reading in this piece. Highest urgency. -) - type Piece struct { // The completed piece SHA1 hash, from the metainfo "pieces" field. hash *metainfo.Hash @@ -272,7 +245,7 @@ func (p *Piece) State() PieceState { return p.t.PieceState(p.index) } -func (p *Piece) iterUndirtiedChunks(f func(ChunkSpec) bool) bool { +func (p *Piece) iterUndirtiedChunks(f func(cs ChunkSpec) bool) bool { for i := pp.Integer(0); i < p.numChunks(); i++ { if p.chunkIndexDirty(i) { continue diff --git a/request-strategy.go b/request-strategy.go index 4682720a..a1221ad6 100644 --- a/request-strategy.go +++ b/request-strategy.go @@ -1,46 +1,14 @@ package torrent import ( - "sort" + "log" "time" "unsafe" - "github.com/anacrolix/multiless" - pp "github.com/anacrolix/torrent/peer_protocol" - "github.com/bradfitz/iter" + request_strategy "github.com/anacrolix/torrent/request-strategy" + "github.com/anacrolix/torrent/types" ) -type clientPieceRequestOrder struct { - pieces []pieceRequestOrderPiece -} - -type pieceRequestOrderPiece struct { - t *Torrent - index pieceIndex - prio piecePriority - partial bool - availability int64 - request bool -} - -func (me *clientPieceRequestOrder) Len() int { - return len(me.pieces) -} - -func (me clientPieceRequestOrder) sort() { - sort.Slice(me.pieces, me.less) -} - -func (me clientPieceRequestOrder) less(_i, _j int) bool { - i := me.pieces[_i] - j := me.pieces[_j] - return multiless.New().Int( - int(j.prio), int(i.prio), - ).Bool( - j.partial, i.partial, - ).Int64(i.availability, j.availability).Int(i.index, j.index).Less() -} - func (cl *Client) requester() { for { func() { @@ -56,229 +24,72 @@ func (cl *Client) requester() { } } -type requestsPeer struct { - cur *Peer - nextRequests map[Request]struct{} - nextInterest bool - requestablePiecesRemaining int -} - -func (rp *requestsPeer) canRequestPiece(p pieceIndex) bool { - return rp.hasPiece(p) && (!rp.choking() || rp.pieceAllowedFast(p)) -} - -func (rp *requestsPeer) hasPiece(i pieceIndex) bool { - return rp.cur.peerHasPiece(i) -} - -func (rp *requestsPeer) pieceAllowedFast(p pieceIndex) bool { - return rp.cur.peerAllowedFast.Contains(p) -} - -func (rp *requestsPeer) choking() bool { - return rp.cur.peerChoking -} - -func (rp *requestsPeer) hasExistingRequest(r Request) bool { - _, ok := rp.cur.requests[r] - return ok -} - -func (rp *requestsPeer) canFitRequest() bool { - return len(rp.nextRequests) < rp.cur.nominalMaxRequests() -} - -// Returns true if it is added and wasn't there before. -func (rp *requestsPeer) addNextRequest(r Request) bool { - _, ok := rp.nextRequests[r] - if ok { - return false - } - rp.nextRequests[r] = struct{}{} - return true -} - -type peersForPieceRequests struct { - requestsInPiece int - *requestsPeer -} - -func (me *peersForPieceRequests) addNextRequest(r Request) { - if me.requestsPeer.addNextRequest(r) { - return - me.requestsInPiece++ - } -} - func (cl *Client) doRequests() { - requestOrder := &cl.pieceRequestOrder - requestOrder.pieces = requestOrder.pieces[:0] - allPeers := make(map[*Torrent][]*requestsPeer) - // Storage capacity left for this run, keyed by the storage capacity pointer on the storage - // TorrentImpl. - storageLeft := make(map[*func() *int64]*int64) + ts := make([]*request_strategy.Torrent, 0, len(cl.torrents)) for _, t := range cl.torrents { - // TODO: We could do metainfo requests here. - if !t.haveInfo() { - continue + rst := &request_strategy.Torrent{} + if t.storage != nil { + rst.Capacity = t.storage.Capacity } - key := t.storage.Capacity - if key != nil { - if _, ok := storageLeft[key]; !ok { - storageLeft[key] = (*key)() - } + for i := range t.pieces { + p := &t.pieces[i] + rst.Pieces = append(rst.Pieces, request_strategy.Piece{ + Request: !t.ignorePieceForRequests(i), + Priority: p.purePriority(), + Partial: t.piecePartiallyDownloaded(i), + Availability: p.availability, + Length: int64(p.length()), + NumPendingChunks: int(t.pieceNumPendingChunks(i)), + IterPendingChunks: func(f func(types.ChunkSpec)) { + p.iterUndirtiedChunks(func(cs ChunkSpec) bool { + f(cs) + return true + }) + }, + }) } - var peers []*requestsPeer t.iterPeers(func(p *Peer) { - if !p.closed.IsSet() { - peers = append(peers, &requestsPeer{ - cur: p, - nextRequests: make(map[Request]struct{}), - }) + if p.closed.IsSet() { + return } + rst.Peers = append(rst.Peers, &request_strategy.Peer{ + HasPiece: p.peerHasPiece, + MaxRequests: p.nominalMaxRequests, + HasExistingRequest: func(r request_strategy.Request) bool { + _, ok := p.requests[r] + return ok + }, + Choking: p.peerChoking, + PieceAllowedFast: func(i pieceIndex) bool { + return p.peerAllowedFast.Contains(i) + }, + DownloadRate: p.downloadRate(), + Age: time.Since(p.completedHandshake), + Id: unsafe.Pointer(p), + }) }) - for i := range iter.N(t.numPieces()) { - tp := t.piece(i) - pp := tp.purePriority() - request := !t.ignorePieceForRequests(i) - requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{ - t: t, - index: i, - prio: pp, - partial: t.piecePartiallyDownloaded(i), - availability: tp.availability, - request: request, - }) - if request { - for _, p := range peers { - if p.canRequestPiece(i) { - p.requestablePiecesRemaining++ - } - } - } - } - allPeers[t] = peers + ts = append(ts, rst) } - requestOrder.sort() - for _, p := range requestOrder.pieces { - torrentPiece := p.t.piece(p.index) - if left := storageLeft[p.t.storage.Capacity]; left != nil { - if *left < int64(torrentPiece.length()) { - continue - } - *left -= int64(torrentPiece.length()) - } - if !p.request { - continue - } - peersForPiece := make([]*peersForPieceRequests, 0, len(allPeers[p.t])) - for _, peer := range allPeers[p.t] { - peersForPiece = append(peersForPiece, &peersForPieceRequests{ - requestsInPiece: 0, - requestsPeer: peer, - }) - } - sortPeersForPiece := func() { - sort.Slice(peersForPiece, func(i, j int) bool { - return multiless.New().Int( - peersForPiece[i].requestsInPiece, - peersForPiece[j].requestsInPiece, - ).Int( - peersForPiece[i].requestablePiecesRemaining, - peersForPiece[j].requestablePiecesRemaining, - ).Float64( - peersForPiece[j].cur.downloadRate(), - peersForPiece[i].cur.downloadRate(), - ).EagerSameLess( - peersForPiece[i].cur.completedHandshake.Equal(peersForPiece[j].cur.completedHandshake), - peersForPiece[i].cur.completedHandshake.Before(peersForPiece[j].cur.completedHandshake), - // TODO: Probably peer priority can come next - ).Uintptr( - uintptr(unsafe.Pointer(peersForPiece[j].cur)), - uintptr(unsafe.Pointer(peersForPiece[i].cur)), - ).Less() - }) - } - pendingChunksRemaining := int(p.t.pieceNumPendingChunks(p.index)) - torrentPiece.iterUndirtiedChunks(func(chunk ChunkSpec) bool { - req := Request{pp.Integer(p.index), chunk} - pendingChunksRemaining-- - sortPeersForPiece() - skipped := 0 - // Try up to the number of peers that could legitimately receive the request equal to - // the number of chunks left. This should ensure that only the best peers serve the last - // few chunks in a piece. - for _, peer := range peersForPiece { - if !peer.canFitRequest() || !peer.hasPiece(p.index) || (!peer.pieceAllowedFast(p.index) && peer.choking()) { - continue - } - if skipped > pendingChunksRemaining { - break - } - if !peer.hasExistingRequest(req) { - skipped++ - continue - } - if !peer.pieceAllowedFast(p.index) { - // We must stay interested for this. - peer.nextInterest = true - } - peer.addNextRequest(req) - return true - } - for _, peer := range peersForPiece { - if !peer.canFitRequest() { - continue - } - if !peer.hasPiece(p.index) { - continue - } - if !peer.pieceAllowedFast(p.index) { - // TODO: Verify that's okay to stay uninterested if we request allowed fast - // pieces. - peer.nextInterest = true - if peer.choking() { - continue - } - } - peer.addNextRequest(req) - return true - } - return true - }) - if pendingChunksRemaining != 0 { - panic(pendingChunksRemaining) - } - for _, peer := range peersForPiece { - if peer.canRequestPiece(p.index) { - peer.requestablePiecesRemaining-- - } - } - } - for _, peers := range allPeers { - for _, rp := range peers { - if rp.requestablePiecesRemaining != 0 { - panic(rp.requestablePiecesRemaining) - } - applyPeerNextRequests(rp) - } + nextPeerStates := cl.pieceRequestOrder.DoRequests(ts) + for p, state := range nextPeerStates { + applyPeerNextRequestState(p, state) } } -func applyPeerNextRequests(rp *requestsPeer) { - p := rp.cur - p.setInterested(rp.nextInterest) +func applyPeerNextRequestState(_p request_strategy.PeerPointer, rp request_strategy.PeerNextRequestState) { + p := (*Peer)(_p) + p.setInterested(rp.Interested) for req := range p.requests { - if _, ok := rp.nextRequests[req]; !ok { + if _, ok := rp.Requests[req]; !ok { p.cancel(req) } } - for req := range rp.nextRequests { + for req := range rp.Requests { err := p.request(req) if err != nil { panic(err) } else { - //log.Print(req) + log.Print(req) } } } diff --git a/request-strategy/order.go b/request-strategy/order.go new file mode 100644 index 00000000..3c7d82ab --- /dev/null +++ b/request-strategy/order.go @@ -0,0 +1,226 @@ +package request_strategy + +import ( + "sort" + + "github.com/anacrolix/multiless" + pp "github.com/anacrolix/torrent/peer_protocol" + "github.com/anacrolix/torrent/types" +) + +type ( + Request = types.Request + pieceIndex = types.PieceIndex + piecePriority = types.PiecePriority +) + +type ClientPieceOrder struct { + pieces []pieceRequestOrderPiece +} + +type pieceRequestOrderPiece struct { + t *Torrent + index pieceIndex + Piece +} + +func (me *ClientPieceOrder) Len() int { + return len(me.pieces) +} + +func (me ClientPieceOrder) sort() { + sort.Slice(me.pieces, me.less) +} + +func (me ClientPieceOrder) less(_i, _j int) bool { + i := me.pieces[_i] + j := me.pieces[_j] + return multiless.New().Int( + int(j.Priority), int(i.Priority), + ).Bool( + j.Partial, i.Partial, + ).Int64(i.Availability, j.Availability).Int(i.index, j.index).Less() +} + +type requestsPeer struct { + *Peer + nextState PeerNextRequestState + requestablePiecesRemaining int +} + +func (rp *requestsPeer) canFitRequest() bool { + return len(rp.nextState.Requests) < rp.MaxRequests() +} + +// Returns true if it is added and wasn't there before. +func (rp *requestsPeer) addNextRequest(r Request) bool { + _, ok := rp.nextState.Requests[r] + if ok { + return false + } + rp.nextState.Requests[r] = struct{}{} + return true +} + +type peersForPieceRequests struct { + requestsInPiece int + *requestsPeer +} + +func (me *peersForPieceRequests) addNextRequest(r Request) { + if me.requestsPeer.addNextRequest(r) { + return + me.requestsInPiece++ + } +} + +type Torrent struct { + Pieces []Piece + Capacity *func() *int64 + Peers []*Peer // not closed. +} + +func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerPointer]PeerNextRequestState { + requestOrder.pieces = requestOrder.pieces[:0] + allPeers := make(map[*Torrent][]*requestsPeer) + // Storage capacity left for this run, keyed by the storage capacity pointer on the storage + // TorrentImpl. + storageLeft := make(map[*func() *int64]*int64) + for _, t := range torrents { + // TODO: We could do metainfo requests here. + key := t.Capacity + if key != nil { + if _, ok := storageLeft[key]; !ok { + storageLeft[key] = (*key)() + } + } + var peers []*requestsPeer + for _, p := range t.Peers { + peers = append(peers, &requestsPeer{ + Peer: p, + nextState: PeerNextRequestState{ + Requests: make(map[Request]struct{}), + }, + }) + } + for i, tp := range t.Pieces { + requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{ + t: t, + index: i, + Piece: tp, + }) + if tp.Request { + for _, p := range peers { + if p.canRequestPiece(i) { + p.requestablePiecesRemaining++ + } + } + } + } + allPeers[t] = peers + } + requestOrder.sort() + for _, p := range requestOrder.pieces { + torrentPiece := p + if left := storageLeft[p.t.Capacity]; left != nil { + if *left < int64(torrentPiece.Length) { + continue + } + *left -= int64(torrentPiece.Length) + } + if !p.Request { + continue + } + peersForPiece := make([]*peersForPieceRequests, 0, len(allPeers[p.t])) + for _, peer := range allPeers[p.t] { + peersForPiece = append(peersForPiece, &peersForPieceRequests{ + requestsInPiece: 0, + requestsPeer: peer, + }) + } + sortPeersForPiece := func() { + sort.Slice(peersForPiece, func(i, j int) bool { + return multiless.New().Int( + peersForPiece[i].requestsInPiece, + peersForPiece[j].requestsInPiece, + ).Int( + peersForPiece[i].requestablePiecesRemaining, + peersForPiece[j].requestablePiecesRemaining, + ).Float64( + peersForPiece[j].DownloadRate, + peersForPiece[i].DownloadRate, + ).Int64( + int64(peersForPiece[j].Age), int64(peersForPiece[i].Age), + // TODO: Probably peer priority can come next + ).Uintptr( + uintptr(peersForPiece[j].Id), + uintptr(peersForPiece[i].Id), + ).MustLess() + }) + } + pendingChunksRemaining := int(p.NumPendingChunks) + torrentPiece.IterPendingChunks(func(chunk types.ChunkSpec) { + req := Request{pp.Integer(p.index), chunk} + pendingChunksRemaining-- + sortPeersForPiece() + skipped := 0 + // Try up to the number of peers that could legitimately receive the request equal to + // the number of chunks left. This should ensure that only the best peers serve the last + // few chunks in a piece. + for _, peer := range peersForPiece { + if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.PieceAllowedFast(p.index) && peer.Choking) { + continue + } + if skipped > pendingChunksRemaining { + break + } + if !peer.HasExistingRequest(req) { + skipped++ + continue + } + if !peer.PieceAllowedFast(p.index) { + // We must stay interested for this. + peer.nextState.Interested = true + } + peer.addNextRequest(req) + return + } + for _, peer := range peersForPiece { + if !peer.canFitRequest() { + continue + } + if !peer.HasPiece(p.index) { + continue + } + if !peer.PieceAllowedFast(p.index) { + // TODO: Verify that's okay to stay uninterested if we request allowed fast + // pieces. + peer.nextState.Interested = true + if peer.Choking { + continue + } + } + peer.addNextRequest(req) + return + } + }) + if pendingChunksRemaining != 0 { + panic(pendingChunksRemaining) + } + for _, peer := range peersForPiece { + if peer.canRequestPiece(p.index) { + peer.requestablePiecesRemaining-- + } + } + } + ret := make(map[PeerPointer]PeerNextRequestState) + for _, peers := range allPeers { + for _, rp := range peers { + if rp.requestablePiecesRemaining != 0 { + panic(rp.requestablePiecesRemaining) + } + ret[rp.Id] = rp.nextState + } + } + return ret +} diff --git a/request-strategy/order_test.go b/request-strategy/order_test.go new file mode 100644 index 00000000..e014624e --- /dev/null +++ b/request-strategy/order_test.go @@ -0,0 +1 @@ +package request_strategy diff --git a/request-strategy/peer.go b/request-strategy/peer.go new file mode 100644 index 00000000..4a3d0689 --- /dev/null +++ b/request-strategy/peer.go @@ -0,0 +1,29 @@ +package request_strategy + +import ( + "time" + "unsafe" +) + +type PeerNextRequestState struct { + Interested bool + Requests map[Request]struct{} +} + +type PeerPointer = unsafe.Pointer + +type Peer struct { + HasPiece func(pieceIndex) bool + MaxRequests func() int + HasExistingRequest func(Request) bool + Choking bool + PieceAllowedFast func(pieceIndex) bool + DownloadRate float64 + Age time.Duration + Id PeerPointer +} + +// TODO: This might be used in more places I think. +func (p *Peer) canRequestPiece(i pieceIndex) bool { + return p.HasPiece(i) && (!p.Choking || p.PieceAllowedFast(i)) +} diff --git a/request-strategy/piece.go b/request-strategy/piece.go new file mode 100644 index 00000000..ec778a8c --- /dev/null +++ b/request-strategy/piece.go @@ -0,0 +1,15 @@ +package request_strategy + +import ( + "github.com/anacrolix/torrent/types" +) + +type Piece struct { + Request bool + Priority piecePriority + Partial bool + Availability int64 + Length int64 + NumPendingChunks int + IterPendingChunks func(func(types.ChunkSpec)) +} diff --git a/types/types.go b/types/types.go new file mode 100644 index 00000000..a06f7e6a --- /dev/null +++ b/types/types.go @@ -0,0 +1,52 @@ +package types + +import ( + pp "github.com/anacrolix/torrent/peer_protocol" +) + +type PieceIndex = int + +type ChunkSpec struct { + Begin, Length pp.Integer +} + +type Request struct { + Index pp.Integer + ChunkSpec +} + +func (r Request) ToMsg(mt pp.MessageType) pp.Message { + return pp.Message{ + Type: mt, + Index: r.Index, + Begin: r.Begin, + Length: r.Length, + } +} + +// Describes the importance of obtaining a particular piece. +type PiecePriority byte + +func (pp *PiecePriority) Raise(maybe PiecePriority) bool { + if maybe > *pp { + *pp = maybe + return true + } + return false +} + +// Priority for use in PriorityBitmap +func (me PiecePriority) BitmapPriority() int { + return -int(me) +} + +const ( + PiecePriorityNone PiecePriority = iota // Not wanted. Must be the zero value. + PiecePriorityNormal // Wanted. + PiecePriorityHigh // Wanted a lot. + PiecePriorityReadahead // May be required soon. + // Succeeds a piece where a read occurred. Currently the same as Now, + // apparently due to issues with caching. + PiecePriorityNext + PiecePriorityNow // A Reader is reading in this piece. Highest urgency. +)