Optimize peersForPieces allocations for make and sorting

Matt Joiner 2021-09-10 23:06:23 +10:00
parent c4445fd201
commit 6c9a7fec39
1 changed file with 86 additions and 52 deletions
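
In short, the commit replaces the per-piece make of the peersForPiece slice with a sync.Pool-backed helper that reuses slices across calls, and moves the sort.Slice comparator closure onto a type implementing sort.Interface so sorting no longer rebuilds the comparator each time. A minimal, self-contained sketch of the same two patterns, with illustrative names (item, itemsPool, itemSorter) that are not from the repository:

package main

import (
	"fmt"
	"sort"
	"sync"
)

type item struct{ priority int }

// itemsPool reuses []*item backing arrays between calls, mirroring the
// peersForPiecesPool idea in the diff below.
var itemsPool sync.Pool

// makeItems returns a zero-length slice, reusing a pooled backing array when
// one is available; otherwise it allocates with the requested capacity.
func makeItems(capacity int) []*item {
	if got := itemsPool.Get(); got != nil {
		return got.([]*item)[:0]
	}
	return make([]*item, 0, capacity)
}

// itemSorter satisfies sort.Interface so sort.Sort can be used instead of
// sort.Slice, avoiding a fresh comparator closure for every sort.
type itemSorter struct{ items []*item }

func (s itemSorter) Len() int           { return len(s.items) }
func (s itemSorter) Swap(i, j int)      { s.items[i], s.items[j] = s.items[j], s.items[i] }
func (s itemSorter) Less(i, j int) bool { return s.items[i].priority < s.items[j].priority }

func main() {
	items := makeItems(3)
	// Return the slice for reuse once we're done with it, as the commit does
	// in a deferred function.
	defer func() { itemsPool.Put(items) }()
	for _, p := range []int{3, 1, 2} {
		items = append(items, &item{priority: p})
	}
	sort.Sort(itemSorter{items: items})
	for _, it := range items {
		fmt.Println(it.priority) // prints 1, 2, 3
	}
}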


@@ -3,6 +3,7 @@ package request_strategy
 import (
 	"fmt"
 	"sort"
+	"sync"

 	"github.com/anacrolix/multiless"
@@ -199,12 +200,12 @@ func Run(input Input) map[PeerId]PeerNextRequestState {
 }

 // Checks that a sorted peersForPiece slice makes sense.
-func ensureValidSortedPeersForPieceRequests(peers []*peersForPieceRequests, sortLess func(_, _ int) bool) {
-	if !sort.SliceIsSorted(peers, sortLess) {
+func ensureValidSortedPeersForPieceRequests(peers peersForPieceSorter) {
+	if !sort.IsSorted(peers) {
 		panic("not sorted")
 	}
-	peerMap := make(map[*peersForPieceRequests]struct{}, len(peers))
-	for _, p := range peers {
+	peerMap := make(map[*peersForPieceRequests]struct{}, peers.Len())
+	for _, p := range peers.peersForPiece {
 		if _, ok := peerMap[p]; ok {
 			panic(p)
 		}
@@ -212,8 +213,80 @@ func ensureValidSortedPeersForPieceRequests(peers []*peersForPieceRequests, sort
 	}
 }

+var peersForPiecesPool sync.Pool
+
+func makePeersForPiece(cap int) []*peersForPieceRequests {
+	got := peersForPiecesPool.Get()
+	if got == nil {
+		return make([]*peersForPieceRequests, 0, cap)
+	}
+	return got.([]*peersForPieceRequests)[:0]
+}
+
+type peersForPieceSorter struct {
+	peersForPiece []*peersForPieceRequests
+	req           *Request
+	p             requestablePiece
+}
+
+func (me peersForPieceSorter) Len() int {
+	return len(me.peersForPiece)
+}
+
+func (me peersForPieceSorter) Swap(i, j int) {
+	me.peersForPiece[i], me.peersForPiece[j] = me.peersForPiece[j], me.peersForPiece[i]
+}
+
+func (me peersForPieceSorter) Less(_i, _j int) bool {
+	i := me.peersForPiece[_i]
+	j := me.peersForPiece[_j]
+	req := me.req
+	p := me.p
+	byHasRequest := func() multiless.Computation {
+		ml := multiless.New()
+		if req != nil {
+			_, iHas := i.nextState.Requests[*req]
+			_, jHas := j.nextState.Requests[*req]
+			ml = ml.Bool(jHas, iHas)
+		}
+		return ml
+	}()
+	ml := multiless.New()
+	// We always "reallocate", that is force even striping amongst peers that are either on
+	// the last piece they can contribute too, or for pieces marked for this behaviour.
+	// Striping prevents starving peers of requests, and will always re-balance to the
+	// fastest known peers.
+	if !p.alwaysReallocate {
+		ml = ml.Bool(
+			j.requestablePiecesRemaining == 1,
+			i.requestablePiecesRemaining == 1)
+	}
+	if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
+		ml = ml.Int(
+			i.requestsInPiece,
+			j.requestsInPiece)
+	} else {
+		ml = ml.AndThen(byHasRequest)
+	}
+	ml = ml.Int(
+		i.requestablePiecesRemaining,
+		j.requestablePiecesRemaining,
+	).Float64(
+		j.DownloadRate,
+		i.DownloadRate,
+	)
+	ml = ml.AndThen(byHasRequest)
+	return ml.Int64(
+		int64(j.Age), int64(i.Age),
+		// TODO: Probably peer priority can come next
+	).Uintptr(
+		i.Id.Uintptr(),
+		j.Id.Uintptr(),
+	).MustLess()
+}
+
 func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
-	peersForPiece := make([]*peersForPieceRequests, 0, len(peers))
+	peersForPiece := makePeersForPiece(len(peers))
 	for _, peer := range peers {
 		peersForPiece = append(peersForPiece, &peersForPieceRequests{
 			requestsInPiece: 0,
@@ -226,55 +299,16 @@ func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
 				peer.requestablePiecesRemaining--
 			}
 		}
+		peersForPiecesPool.Put(peersForPiece)
 	}()
+	peersForPieceSorter := peersForPieceSorter{
+		peersForPiece: peersForPiece,
+		p:             p,
+	}
 	sortPeersForPiece := func(req *Request) {
-		less := func(_i, _j int) bool {
-			i := peersForPiece[_i]
-			j := peersForPiece[_j]
-			byHasRequest := func() multiless.Computation {
-				ml := multiless.New()
-				if req != nil {
-					_, iHas := i.nextState.Requests[*req]
-					_, jHas := j.nextState.Requests[*req]
-					ml = ml.Bool(jHas, iHas)
-				}
-				return ml
-			}()
-			ml := multiless.New()
-			// We always "reallocate", that is force even striping amongst peers that are either on
-			// the last piece they can contribute too, or for pieces marked for this behaviour.
-			// Striping prevents starving peers of requests, and will always re-balance to the
-			// fastest known peers.
-			if !p.alwaysReallocate {
-				ml = ml.Bool(
-					j.requestablePiecesRemaining == 1,
-					i.requestablePiecesRemaining == 1)
-			}
-			if p.alwaysReallocate || j.requestablePiecesRemaining == 1 {
-				ml = ml.Int(
-					i.requestsInPiece,
-					j.requestsInPiece)
-			} else {
-				ml = ml.AndThen(byHasRequest)
-			}
-			ml = ml.Int(
-				i.requestablePiecesRemaining,
-				j.requestablePiecesRemaining,
-			).Float64(
-				j.DownloadRate,
-				i.DownloadRate,
-			)
-			ml = ml.AndThen(byHasRequest)
-			return ml.Int64(
-				int64(j.Age), int64(i.Age),
-				// TODO: Probably peer priority can come next
-			).Uintptr(
-				i.Id.Uintptr(),
-				j.Id.Uintptr(),
-			).MustLess()
-		}
-		sort.Slice(peersForPiece, less)
-		//ensureValidSortedPeersForPieceRequests(peersForPiece, less)
+		peersForPieceSorter.req = req
+		sort.Sort(&peersForPieceSorter)
+		//ensureValidSortedPeersForPieceRequests(peersForPieceSorter)
 	}
 	// Chunks can be preassigned several times, if peers haven't been able to update their "actual"
 	// with "next" request state before another request strategy run occurs.
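
Both halves of the change target per-call allocations in allocatePendingChunks: the pooled slice avoids a fresh make for every requestable piece, and moving the comparator from a sort.Slice closure onto the peersForPieceSorter type means the large less closure (which captured peersForPiece, p, and req) is no longer allocated on every sortPeersForPiece call; presumably sort.Slice's internal reflection-based swapper also goes away, since sort.Sort works directly against the sort.Interface methods.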