Use interfaces to lazily expose the bare minimum inputs to GetRequestablePieces

This commit is contained in:
Matt Joiner 2021-12-01 19:21:25 +11:00
parent 4088e847f3
commit 135f21fb64
5 changed files with 114 additions and 449 deletions

74
request-strategy-impls.go Normal file
View File

@ -0,0 +1,74 @@
package torrent
import (
"github.com/anacrolix/torrent/metainfo"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/storage"
)
type requestStrategyInput struct {
cl *Client
capFunc storage.TorrentCapacity
}
func (r requestStrategyInput) Torrent(ih metainfo.Hash) request_strategy.Torrent {
return requestStrategyTorrent{r.cl.torrents[ih]}
}
func (r requestStrategyInput) Capacity() (int64, bool) {
if r.capFunc == nil {
return 0, false
}
return (*r.capFunc)()
}
func (r requestStrategyInput) MaxUnverifiedBytes() int64 {
return r.cl.config.MaxUnverifiedBytes
}
var _ request_strategy.Input = requestStrategyInput{}
// Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent.
func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) {
return requestStrategyInput{
cl: cl,
capFunc: primaryTorrent.storage.Capacity,
}
}
func (t *Torrent) getRequestStrategyInput() request_strategy.Input {
return t.cl.getRequestStrategyInput(t)
}
type requestStrategyTorrent struct {
t *Torrent
}
func (r requestStrategyTorrent) Piece(i int) request_strategy.Piece {
return requestStrategyPiece{r.t, i}
}
func (r requestStrategyTorrent) ChunksPerPiece() uint32 {
return r.t.chunksPerRegularPiece()
}
func (r requestStrategyTorrent) PieceLength() int64 {
return r.t.info.PieceLength
}
var _ request_strategy.Torrent = requestStrategyTorrent{}
type requestStrategyPiece struct {
t *Torrent
i pieceIndex
}
func (r requestStrategyPiece) Request() bool {
return !r.t.ignorePieceForRequests(r.i)
}
func (r requestStrategyPiece) NumPendingChunks() int {
return int(r.t.pieceNumPendingChunks(r.i))
}
var _ request_strategy.Piece = requestStrategyPiece{}

View File

@ -3,9 +3,6 @@ package request_strategy
import (
"bytes"
"expvar"
"runtime"
"sort"
"sync"
"github.com/anacrolix/multiless"
"github.com/anacrolix/torrent/metainfo"
@ -24,46 +21,6 @@ type (
ChunkSpec = types.ChunkSpec
)
type ClientPieceOrder struct{}
func equalFilterPieces(l, r []filterPiece) bool {
if len(l) != len(r) {
return false
}
for i := range l {
lp := &l[i]
rp := &r[i]
if lp.Priority != rp.Priority ||
lp.Partial != rp.Partial ||
lp.Availability != rp.Availability ||
lp.index != rp.index ||
lp.t.InfoHash != rp.t.InfoHash {
return false
}
}
return true
}
type pieceSorter struct {
swap func(i, j int)
get func(i int) *filterPiece
len int
}
func (me pieceSorter) Len() int {
return me.len
}
func (me pieceSorter) Swap(i, j int) {
me.swap(i, j)
}
func (me pieceSorter) Less(_i, _j int) bool {
i := me.get(_i)
j := me.get(_j)
return pieceOrderLess(i.toPieceOrderInput(), j.toPieceOrderInput()).MustLess()
}
type pieceOrderInput struct {
PieceRequestOrderState
PieceRequestOrderKey
@ -112,339 +69,49 @@ func (me *peersForPieceRequests) addNextRequest(r RequestIndex) {
me.requestsInPiece++
}
type requestablePiece struct {
index pieceIndex
t *Torrent
alwaysReallocate bool
NumPendingChunks int
IterPendingChunks ChunksIterFunc
}
func (p *requestablePiece) chunkIndexToRequestIndex(c ChunkIndex) RequestIndex {
return p.t.ChunksPerPiece*uint32(p.index) + c
}
type filterPiece struct {
t *Torrent
index pieceIndex
*Piece
}
func (fp *filterPiece) toPieceOrderInput() (ret pieceOrderInput) {
ret.Partial = fp.Partial
ret.InfoHash = fp.t.InfoHash
ret.Availability = fp.Availability
ret.Priority = fp.Priority
ret.Index = fp.index
return
}
var (
sortsMu sync.Mutex
sorts = map[*[]filterPiece][]int{}
)
func reorderedFilterPieces(pieces []filterPiece, indices []int) (ret []filterPiece) {
ret = make([]filterPiece, len(indices))
for i, j := range indices {
ret[i] = pieces[j]
}
return
}
var packageExpvarMap = expvar.NewMap("request-strategy")
func getSortedFilterPieces(unsorted []filterPiece) []filterPiece {
const cachePieceSorts = false
if !cachePieceSorts {
sort.Sort(pieceSorter{
len: len(unsorted),
swap: func(i, j int) {
unsorted[i], unsorted[j] = unsorted[j], unsorted[i]
},
get: func(i int) *filterPiece {
return &unsorted[i]
},
})
return unsorted
}
sortsMu.Lock()
defer sortsMu.Unlock()
for key, order := range sorts {
if equalFilterPieces(*key, unsorted) {
packageExpvarMap.Add("reused filter piece ordering", 1)
return reorderedFilterPieces(unsorted, order)
}
}
indices := make([]int, len(unsorted))
for i := 0; i < len(indices); i++ {
indices[i] = i
}
sort.Sort(pieceSorter{
len: len(unsorted),
swap: func(i, j int) {
indices[i], indices[j] = indices[j], indices[i]
},
get: func(i int) *filterPiece {
return &unsorted[indices[i]]
},
})
packageExpvarMap.Add("added filter piece ordering", 1)
sorts[&unsorted] = indices
runtime.SetFinalizer(&pieceOrderingFinalizer{unsorted: &unsorted}, func(me *pieceOrderingFinalizer) {
packageExpvarMap.Add("finalized filter piece ordering", 1)
sortsMu.Lock()
defer sortsMu.Unlock()
delete(sorts, me.unsorted)
})
return reorderedFilterPieces(unsorted, indices)
}
type pieceOrderingFinalizer struct {
unsorted *[]filterPiece
}
// Calls f with requestable pieces in order.
func GetRequestablePieces(input Input, pro *PieceRequestOrder, f func(t *Torrent, p *Piece, pieceIndex int)) {
func GetRequestablePieces(input Input, pro *PieceRequestOrder, f func(ih metainfo.Hash, pieceIndex int)) {
// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
// TorrentImpl. A nil value means no capacity limit.
var storageLeft *int64
if input.Capacity != nil {
storageLeft = new(int64)
*storageLeft = *input.Capacity
if cap, ok := input.Capacity(); ok {
storageLeft = &cap
}
var allTorrentsUnverifiedBytes int64
torrentUnverifiedBytes := map[metainfo.Hash]int64{}
pro.tree.Ascend(func(i btree.Item) bool {
_i := i.(pieceRequestOrderItem)
var t Torrent = input.Torrents[_i.key.InfoHash]
var piece *Piece = &t.Pieces[_i.key.Index]
if left := storageLeft; left != nil {
if *left < piece.Length {
ih := _i.key.InfoHash
var t Torrent = input.Torrent(ih)
var piece Piece = t.Piece(_i.key.Index)
pieceLength := t.PieceLength()
if storageLeft != nil {
if *storageLeft < pieceLength {
return true
}
*left -= piece.Length
*storageLeft -= pieceLength
}
if !piece.Request || piece.NumPendingChunks == 0 {
if !piece.Request() || piece.NumPendingChunks() == 0 {
// TODO: Clarify exactly what is verified. Stuff that's being hashed should be
// considered unverified and hold up further requests.
return true
}
if t.MaxUnverifiedBytes != 0 && torrentUnverifiedBytes[t.InfoHash]+piece.Length > t.MaxUnverifiedBytes {
if input.MaxUnverifiedBytes() != 0 && allTorrentsUnverifiedBytes+pieceLength > input.MaxUnverifiedBytes() {
return true
}
if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
return true
}
torrentUnverifiedBytes[t.InfoHash] += piece.Length
allTorrentsUnverifiedBytes += piece.Length
f(&t, piece, _i.key.Index)
allTorrentsUnverifiedBytes += pieceLength
f(ih, _i.key.Index)
return true
})
return
}
type Input struct {
// This is all torrents that share the same capacity below (or likely a single torrent if there
// is infinite capacity, since you could just run it separately for each Torrent if that's the
// case).
Torrents map[metainfo.Hash]Torrent
// Must not be modified. Non-nil if capacity is not infinite, meaning that pieces of torrents
// that share the same capacity key must be incorporated in piece ordering.
Capacity *int64
type Input interface {
Torrent(metainfo.Hash) Torrent
// Storage capacity, shared among all Torrents with the same storage.TorrentCapacity pointer in
// their storage.Torrent references.
Capacity() (cap int64, capped bool)
// Across all the Torrents. This might be partitioned by storage capacity key now.
MaxUnverifiedBytes int64
}
// Checks that a sorted peersForPiece slice makes sense.
func ensureValidSortedPeersForPieceRequests(peers *peersForPieceSorter) {
if !sort.IsSorted(peers) {
panic("not sorted")
}
peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
for _, p := range peers.peersForPiece {
if _, ok := peerMap[p]; ok {
panic(p)
}
peerMap[p] = struct{}{}
}
}
var peersForPiecesPool sync.Pool
func makePeersForPiece(cap int) []*peersForPieceRequests {
got := peersForPiecesPool.Get()
if got == nil {
return make([]*peersForPieceRequests, 0, cap)
}
return got.([]*peersForPieceRequests)[:0]
}
type peersForPieceSorter struct {
peersForPiece []*peersForPieceRequests
req *RequestIndex
p requestablePiece
}
func (me *peersForPieceSorter) Len() int {
return len(me.peersForPiece)
}
func (me *peersForPieceSorter) Swap(i, j int) {
me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
}
func (me *peersForPieceSorter) Less(_i, _j int) bool {
i := me.peersForPiece[_i]
j := me.peersForPiece[_j]
req := me.req
p := &me.p
byHasRequest := func() multiless.Computation {
ml := multiless.New()
if req != nil {
iHas := i.nextState.Requests.Contains(*req)
jHas := j.nextState.Requests.Contains(*req)
ml = ml.Bool(jHas, iHas)
}
return ml
}()
ml := multiless.New()
// We always "reallocate", that is force even striping amongst peers that are either on
// the last piece they can contribute too, or for pieces marked for this behaviour.
// Striping prevents starving peers of requests, and will always re-balance to the
// fastest known peers.
if !p.alwaysReallocate {
ml = ml.Bool(
j.requestablePiecesRemaining == 1,
i.requestablePiecesRemaining == 1)
}
if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
ml = ml.Int(
i.requestsInPiece,
j.requestsInPiece)
} else {
ml = ml.AndThen(byHasRequest)
}
ml = ml.Int(
i.requestablePiecesRemaining,
j.requestablePiecesRemaining,
).Float64(
j.DownloadRate,
i.DownloadRate,
)
if ml.Ok() {
return ml.Less()
}
ml = ml.AndThen(byHasRequest)
return ml.Int64(
int64(j.Age), int64(i.Age),
// TODO: Probably peer priority can come next
).Uintptr(
i.Id.Uintptr(),
j.Id.Uintptr(),
).MustLess()
}
func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
peersForPiece := makePeersForPiece(len(peers))
for _, peer := range peers {
if !peer.canRequestPiece(p.index) {
continue
}
if !peer.canFitRequest() {
peer.requestablePiecesRemaining--
continue
}
peersForPiece = append(peersForPiece, &peersForPieceRequests{
requestsInPiece: 0,
requestsPeer: peer,
})
}
defer func() {
for _, peer := range peersForPiece {
peer.requestablePiecesRemaining--
}
peersForPiecesPool.Put(peersForPiece)
}()
peersForPieceSorter := peersForPieceSorter{
peersForPiece: peersForPiece,
p: p,
}
sortPeersForPiece := func(req *RequestIndex) {
peersForPieceSorter.req = req
sort.Sort(&peersForPieceSorter)
// ensureValidSortedPeersForPieceRequests(&peersForPieceSorter)
}
// Chunks can be preassigned several times, if peers haven't been able to update their "actual"
// with "next" request state before another request strategy run occurs.
preallocated := make([][]*peersForPieceRequests, p.t.ChunksPerPiece)
p.IterPendingChunks(func(spec ChunkIndex) {
req := p.chunkIndexToRequestIndex(spec)
for _, peer := range peersForPiece {
if !peer.ExistingRequests.Contains(req) {
continue
}
if !peer.canFitRequest() {
continue
}
preallocated[spec] = append(preallocated[spec], peer)
peer.addNextRequest(req)
}
})
pendingChunksRemaining := int(p.NumPendingChunks)
p.IterPendingChunks(func(chunk ChunkIndex) {
if len(preallocated[chunk]) != 0 {
return
}
req := p.chunkIndexToRequestIndex(chunk)
defer func() { pendingChunksRemaining-- }()
sortPeersForPiece(nil)
for _, peer := range peersForPiece {
if !peer.canFitRequest() {
continue
}
if !peer.PieceAllowedFast.ContainsInt(p.index) {
// TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
peer.nextState.Interested = true
if peer.Choking {
continue
}
}
peer.addNextRequest(req)
break
}
})
chunk:
for chunk, prePeers := range preallocated {
if len(prePeers) == 0 {
continue
}
pendingChunksRemaining--
req := p.chunkIndexToRequestIndex(ChunkIndex(chunk))
for _, pp := range prePeers {
pp.requestsInPiece--
}
sortPeersForPiece(&req)
for _, pp := range prePeers {
pp.nextState.Requests.Remove(req)
}
for _, peer := range peersForPiece {
if !peer.canFitRequest() {
continue
}
if !peer.PieceAllowedFast.ContainsInt(p.index) {
// TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
peer.nextState.Interested = true
if peer.Choking {
continue
}
}
peer.addNextRequest(req)
continue chunk
}
}
if pendingChunksRemaining != 0 {
panic(pendingChunksRemaining)
}
MaxUnverifiedBytes() int64
}

View File

@ -6,19 +6,7 @@ type ChunksIter interface {
Iter(func(ci ChunkIndex))
}
type Piece struct {
Request bool
Priority piecePriority
Partial bool
Availability int64
Length int64
NumPendingChunks int
IterPendingChunks ChunksIter
}
func (p Piece) iterPendingChunksWrapper(f func(ChunkIndex)) {
i := p.IterPendingChunks
if i != nil {
i.Iter(f)
}
type Piece interface {
Request() bool
NumPendingChunks() int
}

View File

@ -1,14 +1,7 @@
package request_strategy
import (
"github.com/anacrolix/torrent/metainfo"
)
type Torrent struct {
Pieces []Piece
// Some value that's unique and stable between runs.
InfoHash metainfo.Hash
ChunksPerPiece uint32
// TODO: This isn't actually configurable anywhere yet.
MaxUnverifiedBytes int64
type Torrent interface {
Piece(int) Piece
ChunksPerPiece() uint32
PieceLength() int64
}

View File

@ -12,54 +12,10 @@ import (
"github.com/anacrolix/log"
"github.com/anacrolix/multiless"
"github.com/anacrolix/torrent/metainfo"
request_strategy "github.com/anacrolix/torrent/request-strategy"
)
// Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent.
func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) {
input.MaxUnverifiedBytes = cl.config.MaxUnverifiedBytes
if !primaryTorrent.haveInfo() {
return
}
if capFunc := primaryTorrent.storage.Capacity; capFunc != nil {
if cap, ok := (*capFunc)(); ok {
input.Capacity = &cap
}
}
input.Torrents = make(map[metainfo.Hash]request_strategy.Torrent, len(cl.torrents))
for _, t := range cl.torrents {
if !t.haveInfo() {
// This would be removed if metadata is handled here. Determining chunks per piece
// requires the info. If we have no info, we have no pieces too, so the end result is
// the same.
continue
}
if t.storage.Capacity != primaryTorrent.storage.Capacity {
continue
}
input.Torrents[t.infoHash] = t.requestStrategyTorrentInput()
}
return
}
func (t *Torrent) getRequestStrategyInput() request_strategy.Input {
return t.cl.getRequestStrategyInput(t)
}
func (t *Torrent) requestStrategyTorrentInput() request_strategy.Torrent {
rst := request_strategy.Torrent{
InfoHash: t.infoHash,
ChunksPerPiece: t.chunksPerRegularPiece(),
}
rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
for i := range t.pieces {
rst.Pieces = append(rst.Pieces, t.makeRequestStrategyPiece(i))
}
return rst
}
func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRequestOrderState {
return request_strategy.PieceRequestOrderState{
Priority: t.piece(i).purePriority(),
@ -68,19 +24,6 @@ func (t *Torrent) requestStrategyPieceOrderState(i int) request_strategy.PieceRe
}
}
func (t *Torrent) makeRequestStrategyPiece(i int) request_strategy.Piece {
p := &t.pieces[i]
return request_strategy.Piece{
Request: !t.ignorePieceForRequests(i),
Priority: p.purePriority(),
Partial: t.piecePartiallyDownloaded(i),
Availability: p.availability,
Length: int64(p.length()),
NumPendingChunks: int(t.pieceNumPendingChunks(i)),
IterPendingChunks: &p.undirtiedChunksIter,
}
}
func init() {
gob.Register(peerId{})
}
@ -125,9 +68,8 @@ type (
)
type peerRequests struct {
requestIndexes []RequestIndex
peer *Peer
torrentStrategyInput request_strategy.Torrent
requestIndexes []RequestIndex
peer *Peer
}
func (p *peerRequests) Len() int {
@ -138,8 +80,8 @@ func (p *peerRequests) Less(i, j int) bool {
leftRequest := p.requestIndexes[i]
rightRequest := p.requestIndexes[j]
t := p.peer.t
leftPieceIndex := leftRequest / p.torrentStrategyInput.ChunksPerPiece
rightPieceIndex := rightRequest / p.torrentStrategyInput.ChunksPerPiece
leftPieceIndex := leftRequest / t.chunksPerRegularPiece()
rightPieceIndex := rightRequest / t.chunksPerRegularPiece()
leftCurrent := p.peer.actualRequestState.Requests.Contains(leftRequest)
rightCurrent := p.peer.actualRequestState.Requests.Contains(rightRequest)
pending := func(index RequestIndex, current bool) int {
@ -168,13 +110,15 @@ func (p *peerRequests) Less(i, j int) bool {
pending(leftRequest, leftCurrent),
pending(rightRequest, rightCurrent))
ml = ml.Bool(!leftCurrent, !rightCurrent)
leftPiece := t.piece(int(leftPieceIndex))
rightPiece := t.piece(int(rightPieceIndex))
ml = ml.Int(
-int(p.torrentStrategyInput.Pieces[leftPieceIndex].Priority),
-int(p.torrentStrategyInput.Pieces[rightPieceIndex].Priority),
-int(leftPiece.purePriority()),
-int(rightPiece.purePriority()),
)
ml = ml.Int(
int(p.torrentStrategyInput.Pieces[leftPieceIndex].Availability),
int(p.torrentStrategyInput.Pieces[rightPieceIndex].Availability))
int(leftPiece.availability),
int(rightPiece.availability))
ml = ml.Uint32(leftPieceIndex, rightPieceIndex)
ml = ml.Uint32(leftRequest, rightRequest)
return ml.MustLess()
@ -205,19 +149,18 @@ func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
requestHeap := peerRequests{
peer: p,
}
requestHeap.torrentStrategyInput = input.Torrents[p.t.infoHash]
request_strategy.GetRequestablePieces(
input,
p.t.cl.pieceRequestOrder[p.t.storage.Capacity],
func(t *request_strategy.Torrent, rsp *request_strategy.Piece, pieceIndex int) {
if t.InfoHash != p.t.infoHash {
func(ih InfoHash, pieceIndex int) {
if ih != p.t.infoHash {
return
}
if !p.peerHasPiece(pieceIndex) {
return
}
allowedFast := p.peerAllowedFast.ContainsInt(pieceIndex)
rsp.IterPendingChunks.Iter(func(ci request_strategy.ChunkIndex) {
p.t.piece(pieceIndex).undirtiedChunksIter.Iter(func(ci request_strategy.ChunkIndex) {
r := p.t.pieceRequestIndexOffset(pieceIndex) + ci
// if p.t.pendingRequests.Get(r) != 0 && !p.actualRequestState.Requests.Contains(r) {
// return