Improve the data waiter system

This commit is contained in:
Matt Joiner 2014-09-14 03:50:15 +10:00
parent 4bcfdc8fc9
commit 97ae59dfaf
2 changed files with 69 additions and 18 deletions

View File

@ -29,6 +29,7 @@ import (
mathRand "math/rand" mathRand "math/rand"
"net" "net"
"os" "os"
"strings"
"sync" "sync"
"syscall" "syscall"
"time" "time"
@ -77,6 +78,14 @@ func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
go cl.verifyPiece(t, pieceIndex) go cl.verifyPiece(t, pieceIndex)
} }
func (cl *Client) queueFirstHash(t *torrent, piece int) {
p := t.Pieces[piece]
if p.EverHashed || p.Hashing || p.QueuedForHash {
return
}
cl.queuePieceCheck(t, pp.Integer(piece))
}
// Queues the torrent data for the given region for download. The beginning of // Queues the torrent data for the given region for download. The beginning of
// the region is given highest priority to allow a subsequent read at the same // the region is given highest priority to allow a subsequent read at the same
// offset to return data ASAP. // offset to return data ASAP.
@ -97,9 +106,9 @@ func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
return nil return nil
} }
type dataSpec struct { type dataWait struct {
InfoHash offset int64
request ready chan struct{}
} }
type Client struct { type Client struct {
@ -120,8 +129,7 @@ type Client struct {
handshaking int handshaking int
torrents map[InfoHash]*torrent torrents map[InfoHash]*torrent
dataWaiterMutex sync.Mutex dataWaits map[*torrent][]dataWait
dataWaiter chan struct{}
} }
func (me *Client) ListenAddr() net.Addr { func (me *Client) ListenAddr() net.Addr {
@ -153,6 +161,11 @@ func (cl *Client) WriteStatus(w io.Writer) {
return 100 * (1 - float32(t.BytesLeft())/float32(t.Length())) return 100 * (1 - float32(t.BytesLeft())/float32(t.Length()))
} }
}()) }())
fmt.Fprint(w, "Blocked reads:")
for _, dw := range cl.dataWaits[t] {
fmt.Fprintf(w, " %d", dw.offset)
}
fmt.Fprintln(w)
t.WriteStatus(w) t.WriteStatus(w)
fmt.Fprintln(w) fmt.Fprintln(w)
} }
@ -223,6 +236,8 @@ func NewClient(cfg *Config) (cl *Client, err error) {
quit: make(chan struct{}), quit: make(chan struct{}),
torrents: make(map[InfoHash]*torrent), torrents: make(map[InfoHash]*torrent),
dataWaits: make(map[*torrent][]dataWait),
} }
cl.event.L = &cl.mu cl.event.L = &cl.mu
cl.mu.Init(2) cl.mu.Init(2)
@ -1128,6 +1143,10 @@ func (me *Client) DropTorrent(infoHash InfoHash) (err error) {
} }
delete(me.torrents, infoHash) delete(me.torrents, infoHash)
me.downloadStrategy.TorrentStopped(t) me.downloadStrategy.TorrentStopped(t)
for _, dw := range me.dataWaits[t] {
close(dw.ready)
}
delete(me.dataWaits, t)
return return
} }
@ -1369,7 +1388,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
// Record that we have the chunk. // Record that we have the chunk.
delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec) delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
me.dataReady(dataSpec{t.InfoHash, req}) me.dataReady(t, req)
if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 { if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
me.queuePieceCheck(t, req.Index) me.queuePieceCheck(t, req.Index)
} }
@ -1390,24 +1409,43 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
return nil return nil
} }
func (cl *Client) dataReady(ds dataSpec) { func (cl *Client) dataReady(t *torrent, r request) {
cl.dataWaiterMutex.Lock() dws := cl.dataWaits[t]
if cl.dataWaiter != nil { begin := t.requestOffset(r)
close(cl.dataWaiter) end := begin + int64(r.Length)
for i := 0; i < len(dws); {
dw := dws[i]
if begin <= dw.offset && dw.offset < end {
close(dw.ready)
dws[i] = dws[len(dws)-1]
dws = dws[:len(dws)-1]
} else {
i++
}
} }
cl.dataWaiter = nil cl.dataWaits[t] = dws
cl.dataWaiterMutex.Unlock()
} }
// Returns a channel that is closed when new data has become available in the // Returns a channel that is closed when new data has become available in the
// client. // client.
func (me *Client) DataWaiter() (ret <-chan struct{}) { func (me *Client) DataWaiter(ih InfoHash, off int64) (ret <-chan struct{}) {
me.dataWaiterMutex.Lock() me.mu.Lock()
if me.dataWaiter == nil { defer me.mu.Unlock()
me.dataWaiter = make(chan struct{}) ch := make(chan struct{})
ret = ch
t := me.torrents[ih]
if t == nil {
close(ch)
return
} }
ret = me.dataWaiter if r, ok := t.offsetRequest(off); !ok || t.haveChunk(r) {
me.dataWaiterMutex.Unlock() close(ch)
return
}
me.dataWaits[t] = append(me.dataWaits[t], dataWait{
offset: off,
ready: ch,
})
return return
} }

View File

@ -533,6 +533,19 @@ func (me *torrent) haveAnyPieces() bool {
return false return false
} }
func (t *torrent) havePiece(index int) bool {
return t.haveInfo() && t.Pieces[index].Complete()
}
func (t *torrent) haveChunk(r request) bool {
p := t.Pieces[r.Index]
if !p.EverHashed {
return false
}
_, ok := p.PendingChunkSpecs[r.chunkSpec]
return !ok
}
func (t *torrent) wantChunk(r request) bool { func (t *torrent) wantChunk(r request) bool {
if !t.wantPiece(int(r.Index)) { if !t.wantPiece(int(r.Index)) {
return false return false