Move priority management entirely into the download strategies

This commit is contained in:
Matt Joiner 2014-07-24 13:42:31 +10:00
parent 1ceb470bfc
commit ef7c4f4120
3 changed files with 141 additions and 79 deletions

View File

@ -19,7 +19,6 @@ import (
"bitbucket.org/anacrolix/go.torrent/dht"
"bitbucket.org/anacrolix/go.torrent/util"
"bufio"
"container/list"
"crypto/rand"
"crypto/sha1"
"errors"
@ -64,30 +63,7 @@ func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
if !t.haveInfo() {
return errors.New("missing metadata")
}
newPriorities := make([]request, 0, (len_+chunkSize-1)/chunkSize)
for len_ > 0 {
req, ok := t.offsetRequest(off)
if !ok {
return errors.New("bad offset")
}
reqOff := t.requestOffset(req)
// Gain the alignment adjustment.
len_ += off - reqOff
// Lose the length of this block.
len_ -= int64(req.Length)
off = reqOff + int64(req.Length)
if !t.wantPiece(int(req.Index)) {
continue
}
newPriorities = append(newPriorities, req)
}
if len(newPriorities) == 0 {
return nil
}
t.Priorities.PushFront(newPriorities[0])
for _, req := range newPriorities[1:] {
t.Priorities.PushBack(req)
}
me.DownloadStrategy.TorrentPrioritize(t, off, len_)
for _, cn := range t.Conns {
me.replenishConnRequests(t, cn)
}
@ -158,7 +134,6 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er
err = io.EOF
return
}
t.lastReadPiece = int(index)
piece := t.Pieces[index]
pieceOff := pp.Integer(off % int64(t.PieceLength(0)))
high := int(t.PieceLength(index) - pieceOff)
@ -1142,13 +1117,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
}
// Unprioritize the chunk.
var next *list.Element
for e := t.Priorities.Front(); e != nil; e = next {
next = e.Next()
if e.Value.(request) == req {
t.Priorities.Remove(e)
}
}
me.DownloadStrategy.TorrentGotChunk(t, req)
// Cancel pending requests for this chunk.
cancelled := false
@ -1189,14 +1158,7 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
p.EverHashed = true
if correct {
p.PendingChunkSpecs = nil
// log.Printf("%s: got piece %d, (%d/%d)", t, piece, t.NumPiecesCompleted(), t.NumPieces())
var next *list.Element
for e := t.Priorities.Front(); e != nil; e = next {
next = e.Next()
if e.Value.(request).Index == piece {
t.Priorities.Remove(e)
}
}
me.DownloadStrategy.TorrentGotPiece(t, int(piece))
me.dataReady(dataSpec{
t.InfoHash,
request{

View File

@ -2,6 +2,7 @@ package torrent
import (
pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
"container/list"
)
type DownloadStrategy interface {
@ -9,6 +10,9 @@ type DownloadStrategy interface {
TorrentStarted(t *torrent)
TorrentStopped(t *torrent)
DeleteRequest(t *torrent, r request)
TorrentPrioritize(t *torrent, off, _len int64)
TorrentGotChunk(t *torrent, r request)
TorrentGotPiece(t *torrent, piece int)
}
type DefaultDownloadStrategy struct {
@ -45,11 +49,11 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
return
}
// First request prioritized chunks.
for e := t.Priorities.Front(); e != nil; e = e.Next() {
if !addRequest(e.Value.(request)) {
return
}
}
// for e := t.Priorities.Front(); e != nil; e = e.Next() {
// if !addRequest(e.Value.(request)) {
// return
// }
// }
// Then finish off incomplete pieces in order of bytes remaining.
for _, heatThreshold := range []int{1, 4, 15, 60} {
for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
@ -93,31 +97,68 @@ func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
m[r]--
}
type ResponsiveDownloadStrategy struct {
// How many bytes to preemptively download starting at the beginning of
// the last piece read for a given torrent.
Readahead int
func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {}
func (me *DefaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {}
func (*DefaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {}
func NewResponsiveDownloadStrategy(readahead int) *responsiveDownloadStrategy {
return &responsiveDownloadStrategy{
Readahead: readahead,
lastReadPiece: make(map[*torrent]int),
priorities: make(map[*torrent]*list.List),
}
}
func (ResponsiveDownloadStrategy) TorrentStarted(*torrent) {}
func (ResponsiveDownloadStrategy) TorrentStopped(*torrent) {}
func (ResponsiveDownloadStrategy) DeleteRequest(*torrent, request) {}
type responsiveDownloadStrategy struct {
// How many bytes to preemptively download starting at the beginning of
// the last piece read for a given torrent.
Readahead int
lastReadPiece map[*torrent]int
priorities map[*torrent]*list.List
}
func (me *ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
for e := t.Priorities.Front(); e != nil; e = e.Next() {
func (me *responsiveDownloadStrategy) TorrentStarted(t *torrent) {
me.priorities[t] = list.New()
}
func (me *responsiveDownloadStrategy) TorrentStopped(t *torrent) {
delete(me.lastReadPiece, t)
delete(me.priorities, t)
}
func (responsiveDownloadStrategy) DeleteRequest(*torrent, request) {}
func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
if len(c.Requests) >= (c.PeerMaxRequests+1)/2 || len(c.Requests) >= 64 {
return
}
// Short circuit request fills at a level that might reduce receiving of
// unnecessary chunks.
requestWrapper := func(r request) bool {
if len(c.Requests) >= 64 {
return false
}
return c.Request(r)
}
prios := me.priorities[t]
for e := prios.Front(); e != nil; e = e.Next() {
req := e.Value.(request)
if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
panic(req)
}
if !c.Request(e.Value.(request)) {
if !requestWrapper(e.Value.(request)) {
return
}
}
readaheadPieces := (me.Readahead + t.UsualPieceSize() - 1) / t.UsualPieceSize()
for i := t.lastReadPiece; i < t.lastReadPiece+readaheadPieces && i < t.NumPieces(); i++ {
for _, cs := range t.Pieces[i].shuffledPendingChunkSpecs() {
if !c.Request(request{pp.Integer(i), cs}) {
return
if lastReadPiece, ok := me.lastReadPiece[t]; ok {
readaheadPieces := (me.Readahead + t.UsualPieceSize() - 1) / t.UsualPieceSize()
for i := lastReadPiece; i < lastReadPiece+readaheadPieces && i < t.NumPieces(); i++ {
for _, cs := range t.Pieces[i].shuffledPendingChunkSpecs() {
if !requestWrapper(request{pp.Integer(i), cs}) {
return
}
}
}
}
@ -131,9 +172,63 @@ func (me *ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
// Request chunks in random order to reduce overlap with other
// connections.
for _, cs := range t.Pieces[index].shuffledPendingChunkSpecs() {
if !c.Request(request{pp.Integer(index), cs}) {
if !requestWrapper(request{pp.Integer(index), cs}) {
return
}
}
}
}
func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) {
prios := me.priorities[t]
var next *list.Element
for e := prios.Front(); e != nil; e = next {
next = e.Next()
if e.Value.(request) == req {
prios.Remove(e)
}
}
}
func (me *responsiveDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {
var next *list.Element
prios := me.priorities[t]
for e := prios.Front(); e != nil; e = next {
next = e.Next()
if int(e.Value.(request).Index) == piece {
prios.Remove(e)
}
}
}
func (s *responsiveDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {
newPriorities := make([]request, 0, (_len+chunkSize-1)/chunkSize)
for _len > 0 {
req, ok := t.offsetRequest(off)
if !ok {
panic("bad offset")
}
reqOff := t.requestOffset(req)
// Gain the alignment adjustment.
_len += off - reqOff
// Lose the length of this block.
_len -= int64(req.Length)
off = reqOff + int64(req.Length)
if !t.wantPiece(int(req.Index)) {
continue
}
newPriorities = append(newPriorities, req)
}
if len(newPriorities) == 0 {
return
}
s.lastReadPiece[t] = int(newPriorities[0].Index)
if t.wantChunk(newPriorities[0]) {
s.priorities[t].PushFront(newPriorities[0])
}
for _, req := range newPriorities[1:] {
if t.wantChunk(req) {
s.priorities[t].PushBack(req)
}
}
}

View File

@ -42,18 +42,16 @@ type torrent struct {
PiecesByBytesLeft *OrderedList
Data mmap_span.MMapSpan
// Prevent mutations to Data memory maps while in use as they're not safe.
dataLock sync.RWMutex
Info *metainfo.Info
Conns []*connection
Peers []Peer
Priorities *list.List
dataLock sync.RWMutex
Info *metainfo.Info
Conns []*connection
Peers []Peer
// BEP 12 Multitracker Metadata Extension. The tracker.Client instances
// mirror their respective URLs from the announce-list key.
Trackers [][]tracker.Client
lastReadPiece int
DisplayName string
MetaData []byte
metadataHave []bool
Trackers [][]tracker.Client
DisplayName string
MetaData []byte
metadataHave []bool
}
func (t *torrent) InvalidateMetadata() {
@ -124,7 +122,6 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte
piece.bytesLeftElement = t.PiecesByBytesLeft.Insert(index)
t.pendAllChunkSpecs(pp.Integer(index))
}
t.Priorities = list.New()
for _, conn := range t.Conns {
if err := conn.setNumPieces(t.NumPieces()); err != nil {
log.Printf("closing connection: %s", err)
@ -211,12 +208,12 @@ func (t *torrent) WriteStatus(w io.Writer) {
fmt.Fprintf(w, "%c", t.pieceStatusChar(index))
}
fmt.Fprintln(w)
fmt.Fprintln(w, "Priorities: ")
if t.Priorities != nil {
for e := t.Priorities.Front(); e != nil; e = e.Next() {
fmt.Fprintf(w, "\t%v\n", e.Value)
}
}
// fmt.Fprintln(w, "Priorities: ")
// if t.Priorities != nil {
// for e := t.Priorities.Front(); e != nil; e = e.Next() {
// fmt.Fprintf(w, "\t%v\n", e.Value)
// }
// }
fmt.Fprintf(w, "Pending peers: %d\n", len(t.Peers))
for _, c := range t.Conns {
c.WriteStatus(w)
@ -419,6 +416,14 @@ func (me *torrent) haveAnyPieces() bool {
return false
}
func (t *torrent) wantChunk(r request) bool {
if !t.wantPiece(int(r.Index)) {
return false
}
_, ok := t.Pieces[r.Index].PendingChunkSpecs[r.chunkSpec]
return ok
}
func (t *torrent) wantPiece(index int) bool {
if !t.haveInfo() {
return false