Extract request strategy stuff into a separate module

This commit is contained in:
Matt Joiner 2021-05-13 09:56:58 +10:00
parent 233135493f
commit 0d4e566fc0
10 changed files with 392 additions and 284 deletions

View File

@ -25,6 +25,7 @@ import (
"github.com/anacrolix/missinggo/v2/pproffd" "github.com/anacrolix/missinggo/v2/pproffd"
"github.com/anacrolix/sync" "github.com/anacrolix/sync"
"github.com/anacrolix/torrent/internal/limiter" "github.com/anacrolix/torrent/internal/limiter"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/tracker" "github.com/anacrolix/torrent/tracker"
"github.com/anacrolix/torrent/webtorrent" "github.com/anacrolix/torrent/webtorrent"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
@ -82,7 +83,7 @@ type Client struct {
activeAnnounceLimiter limiter.Instance activeAnnounceLimiter limiter.Instance
pieceRequestOrder clientPieceRequestOrder pieceRequestOrder request_strategy.ClientPieceOrder
} }
type ipStr string type ipStr string

View File

@ -554,6 +554,8 @@ func TestPeerInvalidHave(t *testing.T) {
t: tt, t: tt,
}} }}
cn.peerImpl = cn cn.peerImpl = cn
cl.lock()
defer cl.unlock()
assert.NoError(t, cn.peerSentHave(0)) assert.NoError(t, cn.peerSentHave(0))
assert.Error(t, cn.peerSentHave(1)) assert.Error(t, cn.peerSentHave(1))
} }

30
misc.go
View File

@ -5,29 +5,27 @@ import (
"net" "net"
"github.com/anacrolix/missinggo/v2" "github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/torrent/types"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol" pp "github.com/anacrolix/torrent/peer_protocol"
) )
type ChunkSpec struct { type (
Begin, Length pp.Integer Request = types.Request
} ChunkSpec = types.ChunkSpec
piecePriority = types.PiecePriority
)
type Request struct { const (
Index pp.Integer PiecePriorityNormal = types.PiecePriorityNormal
ChunkSpec PiecePriorityNone = types.PiecePriorityNone
} PiecePriorityNow = types.PiecePriorityNow
PiecePriorityReadahead = types.PiecePriorityReadahead
func (r Request) ToMsg(mt pp.MessageType) pp.Message { PiecePriorityNext = types.PiecePriorityNext
return pp.Message{ PiecePriorityHigh = types.PiecePriorityHigh
Type: mt, )
Index: r.Index,
Begin: r.Begin,
Length: r.Length,
}
}
func newRequest(index, begin, length pp.Integer) Request { func newRequest(index, begin, length pp.Integer) Request {
return Request{index, ChunkSpec{begin, length}} return Request{index, ChunkSpec{begin, length}}

View File

@ -11,33 +11,6 @@ import (
"github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/storage"
) )
// Describes the importance of obtaining a particular piece.
type piecePriority byte
func (pp *piecePriority) Raise(maybe piecePriority) bool {
if maybe > *pp {
*pp = maybe
return true
}
return false
}
// Priority for use in PriorityBitmap
func (me piecePriority) BitmapPriority() int {
return -int(me)
}
const (
PiecePriorityNone piecePriority = iota // Not wanted. Must be the zero value.
PiecePriorityNormal // Wanted.
PiecePriorityHigh // Wanted a lot.
PiecePriorityReadahead // May be required soon.
// Succeeds a piece where a read occurred. Currently the same as Now,
// apparently due to issues with caching.
PiecePriorityNext
PiecePriorityNow // A Reader is reading in this piece. Highest urgency.
)
type Piece struct { type Piece struct {
// The completed piece SHA1 hash, from the metainfo "pieces" field. // The completed piece SHA1 hash, from the metainfo "pieces" field.
hash *metainfo.Hash hash *metainfo.Hash
@ -272,7 +245,7 @@ func (p *Piece) State() PieceState {
return p.t.PieceState(p.index) return p.t.PieceState(p.index)
} }
func (p *Piece) iterUndirtiedChunks(f func(ChunkSpec) bool) bool { func (p *Piece) iterUndirtiedChunks(f func(cs ChunkSpec) bool) bool {
for i := pp.Integer(0); i < p.numChunks(); i++ { for i := pp.Integer(0); i < p.numChunks(); i++ {
if p.chunkIndexDirty(i) { if p.chunkIndexDirty(i) {
continue continue

View File

@ -1,46 +1,14 @@
package torrent package torrent
import ( import (
"sort" "log"
"time" "time"
"unsafe" "unsafe"
"github.com/anacrolix/multiless" request_strategy "github.com/anacrolix/torrent/request-strategy"
pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/types"
"github.com/bradfitz/iter"
) )
type clientPieceRequestOrder struct {
pieces []pieceRequestOrderPiece
}
type pieceRequestOrderPiece struct {
t *Torrent
index pieceIndex
prio piecePriority
partial bool
availability int64
request bool
}
func (me *clientPieceRequestOrder) Len() int {
return len(me.pieces)
}
func (me clientPieceRequestOrder) sort() {
sort.Slice(me.pieces, me.less)
}
func (me clientPieceRequestOrder) less(_i, _j int) bool {
i := me.pieces[_i]
j := me.pieces[_j]
return multiless.New().Int(
int(j.prio), int(i.prio),
).Bool(
j.partial, i.partial,
).Int64(i.availability, j.availability).Int(i.index, j.index).Less()
}
func (cl *Client) requester() { func (cl *Client) requester() {
for { for {
func() { func() {
@ -56,229 +24,72 @@ func (cl *Client) requester() {
} }
} }
type requestsPeer struct {
cur *Peer
nextRequests map[Request]struct{}
nextInterest bool
requestablePiecesRemaining int
}
func (rp *requestsPeer) canRequestPiece(p pieceIndex) bool {
return rp.hasPiece(p) && (!rp.choking() || rp.pieceAllowedFast(p))
}
func (rp *requestsPeer) hasPiece(i pieceIndex) bool {
return rp.cur.peerHasPiece(i)
}
func (rp *requestsPeer) pieceAllowedFast(p pieceIndex) bool {
return rp.cur.peerAllowedFast.Contains(p)
}
func (rp *requestsPeer) choking() bool {
return rp.cur.peerChoking
}
func (rp *requestsPeer) hasExistingRequest(r Request) bool {
_, ok := rp.cur.requests[r]
return ok
}
func (rp *requestsPeer) canFitRequest() bool {
return len(rp.nextRequests) < rp.cur.nominalMaxRequests()
}
// Returns true if it is added and wasn't there before.
func (rp *requestsPeer) addNextRequest(r Request) bool {
_, ok := rp.nextRequests[r]
if ok {
return false
}
rp.nextRequests[r] = struct{}{}
return true
}
type peersForPieceRequests struct {
requestsInPiece int
*requestsPeer
}
func (me *peersForPieceRequests) addNextRequest(r Request) {
if me.requestsPeer.addNextRequest(r) {
return
me.requestsInPiece++
}
}
func (cl *Client) doRequests() { func (cl *Client) doRequests() {
requestOrder := &cl.pieceRequestOrder ts := make([]*request_strategy.Torrent, 0, len(cl.torrents))
requestOrder.pieces = requestOrder.pieces[:0]
allPeers := make(map[*Torrent][]*requestsPeer)
// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
// TorrentImpl.
storageLeft := make(map[*func() *int64]*int64)
for _, t := range cl.torrents { for _, t := range cl.torrents {
// TODO: We could do metainfo requests here. rst := &request_strategy.Torrent{}
if !t.haveInfo() { if t.storage != nil {
continue rst.Capacity = t.storage.Capacity
} }
key := t.storage.Capacity for i := range t.pieces {
if key != nil { p := &t.pieces[i]
if _, ok := storageLeft[key]; !ok { rst.Pieces = append(rst.Pieces, request_strategy.Piece{
storageLeft[key] = (*key)() Request: !t.ignorePieceForRequests(i),
Priority: p.purePriority(),
Partial: t.piecePartiallyDownloaded(i),
Availability: p.availability,
Length: int64(p.length()),
NumPendingChunks: int(t.pieceNumPendingChunks(i)),
IterPendingChunks: func(f func(types.ChunkSpec)) {
p.iterUndirtiedChunks(func(cs ChunkSpec) bool {
f(cs)
return true
})
},
})
} }
}
var peers []*requestsPeer
t.iterPeers(func(p *Peer) { t.iterPeers(func(p *Peer) {
if !p.closed.IsSet() { if p.closed.IsSet() {
peers = append(peers, &requestsPeer{ return
cur: p, }
nextRequests: make(map[Request]struct{}), rst.Peers = append(rst.Peers, &request_strategy.Peer{
HasPiece: p.peerHasPiece,
MaxRequests: p.nominalMaxRequests,
HasExistingRequest: func(r request_strategy.Request) bool {
_, ok := p.requests[r]
return ok
},
Choking: p.peerChoking,
PieceAllowedFast: func(i pieceIndex) bool {
return p.peerAllowedFast.Contains(i)
},
DownloadRate: p.downloadRate(),
Age: time.Since(p.completedHandshake),
Id: unsafe.Pointer(p),
}) })
}
}) })
for i := range iter.N(t.numPieces()) { ts = append(ts, rst)
tp := t.piece(i)
pp := tp.purePriority()
request := !t.ignorePieceForRequests(i)
requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{
t: t,
index: i,
prio: pp,
partial: t.piecePartiallyDownloaded(i),
availability: tp.availability,
request: request,
})
if request {
for _, p := range peers {
if p.canRequestPiece(i) {
p.requestablePiecesRemaining++
}
}
}
}
allPeers[t] = peers
}
requestOrder.sort()
for _, p := range requestOrder.pieces {
torrentPiece := p.t.piece(p.index)
if left := storageLeft[p.t.storage.Capacity]; left != nil {
if *left < int64(torrentPiece.length()) {
continue
}
*left -= int64(torrentPiece.length())
}
if !p.request {
continue
}
peersForPiece := make([]*peersForPieceRequests, 0, len(allPeers[p.t]))
for _, peer := range allPeers[p.t] {
peersForPiece = append(peersForPiece, &peersForPieceRequests{
requestsInPiece: 0,
requestsPeer: peer,
})
}
sortPeersForPiece := func() {
sort.Slice(peersForPiece, func(i, j int) bool {
return multiless.New().Int(
peersForPiece[i].requestsInPiece,
peersForPiece[j].requestsInPiece,
).Int(
peersForPiece[i].requestablePiecesRemaining,
peersForPiece[j].requestablePiecesRemaining,
).Float64(
peersForPiece[j].cur.downloadRate(),
peersForPiece[i].cur.downloadRate(),
).EagerSameLess(
peersForPiece[i].cur.completedHandshake.Equal(peersForPiece[j].cur.completedHandshake),
peersForPiece[i].cur.completedHandshake.Before(peersForPiece[j].cur.completedHandshake),
// TODO: Probably peer priority can come next
).Uintptr(
uintptr(unsafe.Pointer(peersForPiece[j].cur)),
uintptr(unsafe.Pointer(peersForPiece[i].cur)),
).Less()
})
}
pendingChunksRemaining := int(p.t.pieceNumPendingChunks(p.index))
torrentPiece.iterUndirtiedChunks(func(chunk ChunkSpec) bool {
req := Request{pp.Integer(p.index), chunk}
pendingChunksRemaining--
sortPeersForPiece()
skipped := 0
// Try up to the number of peers that could legitimately receive the request equal to
// the number of chunks left. This should ensure that only the best peers serve the last
// few chunks in a piece.
for _, peer := range peersForPiece {
if !peer.canFitRequest() || !peer.hasPiece(p.index) || (!peer.pieceAllowedFast(p.index) && peer.choking()) {
continue
}
if skipped > pendingChunksRemaining {
break
}
if !peer.hasExistingRequest(req) {
skipped++
continue
}
if !peer.pieceAllowedFast(p.index) {
// We must stay interested for this.
peer.nextInterest = true
}
peer.addNextRequest(req)
return true
}
for _, peer := range peersForPiece {
if !peer.canFitRequest() {
continue
}
if !peer.hasPiece(p.index) {
continue
}
if !peer.pieceAllowedFast(p.index) {
// TODO: Verify that's okay to stay uninterested if we request allowed fast
// pieces.
peer.nextInterest = true
if peer.choking() {
continue
}
}
peer.addNextRequest(req)
return true
}
return true
})
if pendingChunksRemaining != 0 {
panic(pendingChunksRemaining)
}
for _, peer := range peersForPiece {
if peer.canRequestPiece(p.index) {
peer.requestablePiecesRemaining--
}
}
}
for _, peers := range allPeers {
for _, rp := range peers {
if rp.requestablePiecesRemaining != 0 {
panic(rp.requestablePiecesRemaining)
}
applyPeerNextRequests(rp)
} }
nextPeerStates := cl.pieceRequestOrder.DoRequests(ts)
for p, state := range nextPeerStates {
applyPeerNextRequestState(p, state)
} }
} }
func applyPeerNextRequests(rp *requestsPeer) { func applyPeerNextRequestState(_p request_strategy.PeerPointer, rp request_strategy.PeerNextRequestState) {
p := rp.cur p := (*Peer)(_p)
p.setInterested(rp.nextInterest) p.setInterested(rp.Interested)
for req := range p.requests { for req := range p.requests {
if _, ok := rp.nextRequests[req]; !ok { if _, ok := rp.Requests[req]; !ok {
p.cancel(req) p.cancel(req)
} }
} }
for req := range rp.nextRequests { for req := range rp.Requests {
err := p.request(req) err := p.request(req)
if err != nil { if err != nil {
panic(err) panic(err)
} else { } else {
//log.Print(req) log.Print(req)
} }
} }
} }

226
request-strategy/order.go Normal file
View File

@ -0,0 +1,226 @@
package request_strategy
import (
"sort"
"github.com/anacrolix/multiless"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/types"
)
type (
Request = types.Request
pieceIndex = types.PieceIndex
piecePriority = types.PiecePriority
)
type ClientPieceOrder struct {
pieces []pieceRequestOrderPiece
}
type pieceRequestOrderPiece struct {
t *Torrent
index pieceIndex
Piece
}
func (me *ClientPieceOrder) Len() int {
return len(me.pieces)
}
func (me ClientPieceOrder) sort() {
sort.Slice(me.pieces, me.less)
}
func (me ClientPieceOrder) less(_i, _j int) bool {
i := me.pieces[_i]
j := me.pieces[_j]
return multiless.New().Int(
int(j.Priority), int(i.Priority),
).Bool(
j.Partial, i.Partial,
).Int64(i.Availability, j.Availability).Int(i.index, j.index).Less()
}
type requestsPeer struct {
*Peer
nextState PeerNextRequestState
requestablePiecesRemaining int
}
func (rp *requestsPeer) canFitRequest() bool {
return len(rp.nextState.Requests) < rp.MaxRequests()
}
// Returns true if it is added and wasn't there before.
func (rp *requestsPeer) addNextRequest(r Request) bool {
_, ok := rp.nextState.Requests[r]
if ok {
return false
}
rp.nextState.Requests[r] = struct{}{}
return true
}
type peersForPieceRequests struct {
requestsInPiece int
*requestsPeer
}
func (me *peersForPieceRequests) addNextRequest(r Request) {
if me.requestsPeer.addNextRequest(r) {
return
me.requestsInPiece++
}
}
type Torrent struct {
Pieces []Piece
Capacity *func() *int64
Peers []*Peer // not closed.
}
func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerPointer]PeerNextRequestState {
requestOrder.pieces = requestOrder.pieces[:0]
allPeers := make(map[*Torrent][]*requestsPeer)
// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
// TorrentImpl.
storageLeft := make(map[*func() *int64]*int64)
for _, t := range torrents {
// TODO: We could do metainfo requests here.
key := t.Capacity
if key != nil {
if _, ok := storageLeft[key]; !ok {
storageLeft[key] = (*key)()
}
}
var peers []*requestsPeer
for _, p := range t.Peers {
peers = append(peers, &requestsPeer{
Peer: p,
nextState: PeerNextRequestState{
Requests: make(map[Request]struct{}),
},
})
}
for i, tp := range t.Pieces {
requestOrder.pieces = append(requestOrder.pieces, pieceRequestOrderPiece{
t: t,
index: i,
Piece: tp,
})
if tp.Request {
for _, p := range peers {
if p.canRequestPiece(i) {
p.requestablePiecesRemaining++
}
}
}
}
allPeers[t] = peers
}
requestOrder.sort()
for _, p := range requestOrder.pieces {
torrentPiece := p
if left := storageLeft[p.t.Capacity]; left != nil {
if *left < int64(torrentPiece.Length) {
continue
}
*left -= int64(torrentPiece.Length)
}
if !p.Request {
continue
}
peersForPiece := make([]*peersForPieceRequests, 0, len(allPeers[p.t]))
for _, peer := range allPeers[p.t] {
peersForPiece = append(peersForPiece, &peersForPieceRequests{
requestsInPiece: 0,
requestsPeer: peer,
})
}
sortPeersForPiece := func() {
sort.Slice(peersForPiece, func(i, j int) bool {
return multiless.New().Int(
peersForPiece[i].requestsInPiece,
peersForPiece[j].requestsInPiece,
).Int(
peersForPiece[i].requestablePiecesRemaining,
peersForPiece[j].requestablePiecesRemaining,
).Float64(
peersForPiece[j].DownloadRate,
peersForPiece[i].DownloadRate,
).Int64(
int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
// TODO: Probably peer priority can come next
).Uintptr(
uintptr(peersForPiece[j].Id),
uintptr(peersForPiece[i].Id),
).MustLess()
})
}
pendingChunksRemaining := int(p.NumPendingChunks)
torrentPiece.IterPendingChunks(func(chunk types.ChunkSpec) {
req := Request{pp.Integer(p.index), chunk}
pendingChunksRemaining--
sortPeersForPiece()
skipped := 0
// Try up to the number of peers that could legitimately receive the request equal to
// the number of chunks left. This should ensure that only the best peers serve the last
// few chunks in a piece.
for _, peer := range peersForPiece {
if !peer.canFitRequest() || !peer.HasPiece(p.index) || (!peer.PieceAllowedFast(p.index) && peer.Choking) {
continue
}
if skipped > pendingChunksRemaining {
break
}
if !peer.HasExistingRequest(req) {
skipped++
continue
}
if !peer.PieceAllowedFast(p.index) {
// We must stay interested for this.
peer.nextState.Interested = true
}
peer.addNextRequest(req)
return
}
for _, peer := range peersForPiece {
if !peer.canFitRequest() {
continue
}
if !peer.HasPiece(p.index) {
continue
}
if !peer.PieceAllowedFast(p.index) {
// TODO: Verify that's okay to stay uninterested if we request allowed fast
// pieces.
peer.nextState.Interested = true
if peer.Choking {
continue
}
}
peer.addNextRequest(req)
return
}
})
if pendingChunksRemaining != 0 {
panic(pendingChunksRemaining)
}
for _, peer := range peersForPiece {
if peer.canRequestPiece(p.index) {
peer.requestablePiecesRemaining--
}
}
}
ret := make(map[PeerPointer]PeerNextRequestState)
for _, peers := range allPeers {
for _, rp := range peers {
if rp.requestablePiecesRemaining != 0 {
panic(rp.requestablePiecesRemaining)
}
ret[rp.Id] = rp.nextState
}
}
return ret
}

View File

@ -0,0 +1 @@
package request_strategy

29
request-strategy/peer.go Normal file
View File

@ -0,0 +1,29 @@
package request_strategy
import (
"time"
"unsafe"
)
type PeerNextRequestState struct {
Interested bool
Requests map[Request]struct{}
}
type PeerPointer = unsafe.Pointer
type Peer struct {
HasPiece func(pieceIndex) bool
MaxRequests func() int
HasExistingRequest func(Request) bool
Choking bool
PieceAllowedFast func(pieceIndex) bool
DownloadRate float64
Age time.Duration
Id PeerPointer
}
// TODO: This might be used in more places I think.
func (p *Peer) canRequestPiece(i pieceIndex) bool {
return p.HasPiece(i) && (!p.Choking || p.PieceAllowedFast(i))
}

15
request-strategy/piece.go Normal file
View File

@ -0,0 +1,15 @@
package request_strategy
import (
"github.com/anacrolix/torrent/types"
)
type Piece struct {
Request bool
Priority piecePriority
Partial bool
Availability int64
Length int64
NumPendingChunks int
IterPendingChunks func(func(types.ChunkSpec))
}

52
types/types.go Normal file
View File

@ -0,0 +1,52 @@
package types
import (
pp "github.com/anacrolix/torrent/peer_protocol"
)
type PieceIndex = int
type ChunkSpec struct {
Begin, Length pp.Integer
}
type Request struct {
Index pp.Integer
ChunkSpec
}
func (r Request) ToMsg(mt pp.MessageType) pp.Message {
return pp.Message{
Type: mt,
Index: r.Index,
Begin: r.Begin,
Length: r.Length,
}
}
// Describes the importance of obtaining a particular piece.
type PiecePriority byte
func (pp *PiecePriority) Raise(maybe PiecePriority) bool {
if maybe > *pp {
*pp = maybe
return true
}
return false
}
// Priority for use in PriorityBitmap
func (me PiecePriority) BitmapPriority() int {
return -int(me)
}
const (
PiecePriorityNone PiecePriority = iota // Not wanted. Must be the zero value.
PiecePriorityNormal // Wanted.
PiecePriorityHigh // Wanted a lot.
PiecePriorityReadahead // May be required soon.
// Succeeds a piece where a read occurred. Currently the same as Now,
// apparently due to issues with caching.
PiecePriorityNext
PiecePriorityNow // A Reader is reading in this piece. Highest urgency.
)