Prepare to implement max unverified bytes

This commit is contained in:
Matt Joiner 2021-05-14 11:50:41 +10:00
parent 8e9cb9f2be
commit 307d6d178f
3 changed files with 55 additions and 22 deletions

View File

@ -26,7 +26,10 @@ func (cl *Client) requester() {
func (cl *Client) doRequests() {
ts := make([]*request_strategy.Torrent, 0, len(cl.torrents))
for _, t := range cl.torrents {
rst := &request_strategy.Torrent{}
rst := &request_strategy.Torrent{
StableId: uintptr(unsafe.Pointer(t)),
//MaxUnverifiedBytes: 1 << 20,
}
if t.storage != nil {
rst.Capacity = t.storage.Capacity
}

View File

@ -20,8 +20,16 @@ type ClientPieceOrder struct {
pieces []pieceRequestOrderPiece
}
type orderTorrent struct {
*Torrent
unverifiedBytes int64
// Potentially shared with other torrents.
storageLeft *int64
peers []*requestsPeer
}
type pieceRequestOrderPiece struct {
t *Torrent
t *orderTorrent
index pieceIndex
Piece
}
@ -41,7 +49,13 @@ func (me ClientPieceOrder) less(_i, _j int) bool {
int(j.Priority), int(i.Priority),
).Bool(
j.Partial, i.Partial,
).Int64(i.Availability, j.Availability).Int(i.index, j.index).Less()
).Int64(
i.Availability, j.Availability,
).Int(
i.index, j.index,
).Uintptr(
i.t.StableId, j.t.StableId,
).MustLess()
}
type requestsPeer struct {
@ -72,25 +86,24 @@ func (me *peersForPieceRequests) addNextRequest(r Request) {
me.requestsInPiece++
}
type Torrent struct {
Pieces []Piece
Capacity *func() *int64
Peers []Peer // not closed.
}
func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId]PeerNextRequestState {
requestOrder.pieces = requestOrder.pieces[:0]
allPeers := make(map[*Torrent][]*requestsPeer)
// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
// TorrentImpl.
storageLeft := make(map[*func() *int64]*int64)
for _, t := range torrents {
orderTorrents := make([]*orderTorrent, 0, len(torrents))
for _, _t := range torrents {
// TODO: We could do metainfo requests here.
t := &orderTorrent{
Torrent: _t,
unverifiedBytes: 0,
}
key := t.Capacity
if key != nil {
if _, ok := storageLeft[key]; !ok {
storageLeft[key] = (*key)()
}
t.storageLeft = storageLeft[key]
}
var peers []*requestsPeer
for _, p := range t.Peers {
@ -107,7 +120,7 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId
index: i,
Piece: tp,
})
if tp.Request {
if tp.Request && tp.NumPendingChunks != 0 {
for _, p := range peers {
if p.canRequestPiece(i) {
p.requestablePiecesRemaining++
@ -115,25 +128,31 @@ func (requestOrder *ClientPieceOrder) DoRequests(torrents []*Torrent) map[PeerId
}
}
}
allPeers[t] = peers
t.peers = peers
orderTorrents = append(orderTorrents, t)
}
requestOrder.sort()
for _, p := range requestOrder.pieces {
torrentPiece := p
if left := storageLeft[p.t.Capacity]; left != nil {
if *left < int64(torrentPiece.Length) {
for _, piece := range requestOrder.pieces {
if left := piece.t.storageLeft; left != nil {
if *left < int64(piece.Length) {
continue
}
*left -= int64(torrentPiece.Length)
*left -= int64(piece.Length)
}
if !p.Request {
if !piece.Request || piece.NumPendingChunks == 0 {
continue
}
allocatePendingChunks(p, allPeers[p.t])
if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
//log.Print("skipping piece")
continue
}
allocatePendingChunks(piece, piece.t.peers)
piece.t.unverifiedBytes += piece.Length
//log.Print(piece.t.unverifiedBytes)
}
ret := make(map[PeerId]PeerNextRequestState)
for _, peers := range allPeers {
for _, rp := range peers {
for _, ots := range orderTorrents {
for _, rp := range ots.peers {
if rp.requestablePiecesRemaining != 0 {
panic(rp.requestablePiecesRemaining)
}

View File

@ -0,0 +1,11 @@
package request_strategy
type Torrent struct {
Pieces []Piece
Capacity *func() *int64
Peers []Peer // not closed.
// Some value that's unique and stable between runs. Could even use the infohash?
StableId uintptr
MaxUnverifiedBytes int64
}