Partition piece request strategy by storage capacity key

This commit is contained in:
Matt Joiner 2021-11-29 13:07:18 +11:00
parent ad082bc644
commit 67ed5d0032
3 changed files with 66 additions and 53 deletions

View File

@ -6,7 +6,6 @@ import (
"sync"
"github.com/anacrolix/multiless"
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/types"
)
@ -26,8 +25,6 @@ type ClientPieceOrder struct{}
type filterTorrent struct {
*Torrent
unverifiedBytes int64
// Potentially shared with other torrents.
storageLeft *int64
}
func sortFilterPieces(pieces []filterPiece) {
@ -104,25 +101,17 @@ func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex i
pieces := make([]filterPiece, 0, maxPieces)
// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
// TorrentImpl. A nil value means no capacity limit.
storageLeft := make(map[storage.TorrentCapacity]*int64)
var storageLeft *int64
if input.Capacity != nil {
storageLeft = new(int64)
*storageLeft = *input.Capacity
}
for _t := range input.Torrents {
// TODO: We could do metainfo requests here.
t := &filterTorrent{
Torrent: &input.Torrents[_t],
unverifiedBytes: 0,
}
key := t.Capacity
if key != nil {
if _, ok := storageLeft[key]; !ok {
capacity, ok := (*key)()
if ok {
storageLeft[key] = &capacity
} else {
storageLeft[key] = nil
}
}
t.storageLeft = storageLeft[key]
}
for i := range t.Pieces {
pieces = append(pieces, filterPiece{
t: t,
@ -134,11 +123,11 @@ func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex i
sortFilterPieces(pieces)
var allTorrentsUnverifiedBytes int64
for _, piece := range pieces {
if left := piece.t.storageLeft; left != nil {
if *left < int64(piece.Length) {
if left := storageLeft; left != nil {
if *left < piece.Length {
continue
}
*left -= int64(piece.Length)
*left -= piece.Length
}
if !piece.Request || piece.NumPendingChunks == 0 {
// TODO: Clarify exactly what is verified. Stuff that's being hashed should be
@ -159,7 +148,14 @@ func GetRequestablePieces(input Input, f func(t *Torrent, p *Piece, pieceIndex i
}
type Input struct {
// This is all torrents that share the same capacity below (or likely a single torrent if there
// is infinite capacity, since you could just run it separately for each Torrent if that's the
// case).
Torrents []Torrent
// Must not be modified. Non-nil if capacity is not infinite, meaning that pieces of torrents
// that share the same capacity key must be incorporated in piece ordering.
Capacity *int64
// Across all the Torrents. This might be partitioned by storage capacity key now.
MaxUnverifiedBytes int64
}

View File

@ -2,15 +2,13 @@ package request_strategy
import (
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
)
type Torrent struct {
Pieces []Piece
Capacity storage.TorrentCapacity
// Some value that's unique and stable between runs.
InfoHash metainfo.Hash
ChunksPerPiece uint32
// TODO: This isn't actually configurable anywhere yet.
MaxUnverifiedBytes int64
}

View File

@ -16,22 +16,46 @@ import (
request_strategy "github.com/anacrolix/torrent/request-strategy"
)
func (cl *Client) getRequestStrategyInput() request_strategy.Input {
ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
// Returns what is necessary to run request_strategy.GetRequestablePieces for primaryTorrent.
func (cl *Client) getRequestStrategyInput(primaryTorrent *Torrent) (input request_strategy.Input) {
input.MaxUnverifiedBytes = cl.config.MaxUnverifiedBytes
if !primaryTorrent.haveInfo() {
return
}
if capFunc := primaryTorrent.storage.Capacity; capFunc != nil {
if cap, ok := (*capFunc)(); ok {
input.Capacity = &cap
}
}
if input.Capacity == nil {
input.Torrents = []request_strategy.Torrent{primaryTorrent.requestStrategyTorrentInput()}
return
}
input.Torrents = make([]request_strategy.Torrent, 0, len(cl.torrents))
for _, t := range cl.torrents {
if !t.haveInfo() {
// This would be removed if metadata is handled here. We have to guard against not
// knowing the piece size. If we have no info, we have no pieces too, so the end result
// is the same.
// This would be removed if metadata is handled here. Determining chunks per piece
// requires the info. If we have no info, we have no pieces too, so the end result is
// the same.
continue
}
if t.storage.Capacity != primaryTorrent.storage.Capacity {
continue
}
input.Torrents = append(input.Torrents, t.requestStrategyTorrentInput())
}
return
}
func (t *Torrent) getRequestStrategyInput() request_strategy.Input {
return t.cl.getRequestStrategyInput(t)
}
func (t *Torrent) requestStrategyTorrentInput() request_strategy.Torrent {
rst := request_strategy.Torrent{
InfoHash: t.infoHash,
ChunksPerPiece: t.chunksPerRegularPiece(),
}
if t.storage != nil {
rst.Capacity = t.storage.Capacity
}
rst.Pieces = make([]request_strategy.Piece, 0, len(t.pieces))
for i := range t.pieces {
p := &t.pieces[i]
@ -45,12 +69,7 @@ func (cl *Client) getRequestStrategyInput() request_strategy.Input {
IterPendingChunks: &p.undirtiedChunksIter,
})
}
ts = append(ts, rst)
}
return request_strategy.Input{
Torrents: ts,
MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
}
return rst
}
func init() {
@ -173,7 +192,7 @@ type desiredRequestState struct {
}
func (p *Peer) getDesiredRequestState() (desired desiredRequestState) {
input := p.t.cl.getRequestStrategyInput()
input := p.t.getRequestStrategyInput()
requestHeap := peerRequests{
peer: p,
}