Improve uploading/seeding

This commit is contained in:
Matt Joiner 2015-06-16 16:57:47 +10:00
parent 764f5db512
commit ced5733c88
7 changed files with 116 additions and 78 deletions

126
client.go
View File

@ -232,7 +232,7 @@ func (cl *Client) WriteStatus(_w io.Writer) {
w.WriteString("<missing metainfo>")
}
fmt.Fprint(w, "\n")
t.writeStatus(w)
t.writeStatus(w, cl)
fmt.Fprintln(w)
}
}
@ -342,7 +342,6 @@ func (t *torrent) connPendPiece(c *connection, piece int) {
func (cl *Client) raisePiecePriority(t *torrent, piece int, priority piecePriority) {
if t.Pieces[piece].Priority < priority {
cl.prioritizePiece(t, piece, priority)
cl.event.Broadcast()
}
}
@ -1404,6 +1403,54 @@ func (cl *Client) peerHasAll(t *torrent, cn *connection) {
}
}
func (me *Client) upload(t *torrent, c *connection) {
if me.config.NoUpload {
return
}
if !c.PeerInterested {
return
}
if !me.seeding(t) && !t.connHasWantedPieces(c) {
return
}
another:
for c.chunksSent < c.UsefulChunksReceived+6 {
c.Unchoke()
for r := range c.PeerRequests {
err := me.sendChunk(t, c, r)
if err != nil {
log.Printf("error sending chunk to peer: %s", err)
}
delete(c.PeerRequests, r)
goto another
}
return
}
c.Choke()
}
func (me *Client) sendChunk(t *torrent, c *connection, r request) error {
b := make([]byte, r.Length)
p := t.Info.Piece(int(r.Index))
n, err := dataReadAt(t.data, b, p.Offset()+int64(r.Begin))
if err != nil {
return err
}
if n != len(b) {
log.Fatal(b)
}
c.Post(pp.Message{
Type: pp.Piece,
Index: r.Index,
Begin: r.Begin,
Piece: b,
})
uploadChunksPosted.Add(1)
c.chunksSent++
c.lastChunkSent = time.Now()
return nil
}
// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit.
func (me *Client) connectionLoop(t *torrent, c *connection) error {
@ -1448,11 +1495,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
me.peerUnchoked(t, c)
case pp.Interested:
c.PeerInterested = true
// TODO: This should be done from a dedicated unchoking routine.
if me.config.NoUpload {
break
}
c.Unchoke()
me.upload(t, c)
case pp.NotInterested:
c.PeerInterested = false
c.Choke()
@ -1462,30 +1505,15 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
if c.Choked {
break
}
request := newRequest(msg.Index, msg.Begin, msg.Length)
// TODO: Requests should be satisfied from a dedicated upload
// routine.
// c.PeerRequests[request] = struct{}{}
// if c.PeerRequests == nil {
// c.PeerRequests = make(map[request]struct{}, maxRequests)
// }
p := make([]byte, msg.Length)
n, err := dataReadAt(t.data, p, int64(t.pieceLength(0))*int64(msg.Index)+int64(msg.Begin))
// TODO: Failing to read for a request should not be fatal to the connection.
if err != nil {
return fmt.Errorf("reading t data to serve request %q: %s", request, err)
if !c.PeerInterested {
err = errors.New("peer sent request but isn't interested")
break
}
if n != int(msg.Length) {
return fmt.Errorf("bad request: %v", msg)
if c.PeerRequests == nil {
c.PeerRequests = make(map[request]struct{}, maxRequests)
}
c.Post(pp.Message{
Type: pp.Piece,
Index: msg.Index,
Begin: msg.Begin,
Piece: p,
})
uploadChunksPosted.Add(1)
c.chunksSent++
c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
me.upload(t, c)
case pp.Cancel:
req := newRequest(msg.Index, msg.Begin, msg.Length)
if !c.PeerCancel(req) {
@ -1699,7 +1727,7 @@ func (me *Client) addConnection(t *torrent, c *connection) bool {
// TODO: This should probably be done by a routine that kills off bad
// connections, and extra connections killed here instead.
if len(t.Conns) > socketsPerTorrent {
wcs := t.worstConnsHeap()
wcs := t.worstConnsHeap(me)
heap.Pop(wcs).(*connection).Close()
}
return true
@ -1717,26 +1745,29 @@ func (t *torrent) needData() bool {
return false
}
// TODO: I'm sure there's something here to do with seeding.
func (t *torrent) badConn(c *connection) bool {
func (cl *Client) usefulConn(t *torrent, c *connection) bool {
// A 30 second grace for initial messages to go through.
if time.Since(c.completedHandshake) < 30*time.Second {
return false
return true
}
if !t.haveInfo() {
if !c.supportsExtension("ut_metadata") {
return false
}
if time.Since(c.completedHandshake) < 2*time.Minute {
return true
}
if time.Since(c.completedHandshake) > 2*time.Minute {
return true
}
return false
}
return !t.connHasWantedPieces(c)
if cl.seeding(t) {
return c.PeerInterested
}
return t.connHasWantedPieces(c)
}
func (t *torrent) numGoodConns() (num int) {
func (t *torrent) numGoodConns(cl *Client) (num int) {
for _, c := range t.Conns {
if !t.badConn(c) {
if cl.usefulConn(t, c) {
num++
}
}
@ -1744,10 +1775,10 @@ func (t *torrent) numGoodConns() (num int) {
}
func (me *Client) wantConns(t *torrent) bool {
if me.config.NoUpload && !t.needData() {
if !me.seeding(t) && !t.needData() {
return false
}
if t.numGoodConns() >= socketsPerTorrent {
if t.numGoodConns(me) >= socketsPerTorrent {
return false
}
return true
@ -2228,7 +2259,16 @@ func (cl *Client) waitWantPeers(t *torrent) bool {
// Returns whether the client should make effort to seed the torrent.
func (cl *Client) seeding(t *torrent) bool {
return cl.config.Seed && !cl.config.NoUpload
if cl.config.NoUpload {
return false
}
if !cl.config.Seed {
return false
}
if t.needData() {
return false
}
return true
}
func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
@ -2515,6 +2555,8 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
c.UsefulChunksReceived++
c.lastUsefulChunkReceived = time.Now()
me.upload(t, c)
// Write the chunk out.
err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
if err != nil {

View File

@ -18,8 +18,8 @@ import (
"gopkg.in/check.v1"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/data/blob"
"github.com/anacrolix/torrent/data"
"github.com/anacrolix/torrent/data/blob"
"github.com/anacrolix/torrent/internal/testutil"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/util"
@ -251,6 +251,7 @@ func TestClientTransfer(t *testing.T) {
greetingTempDir, mi := testutil.GreetingTestTorrent()
defer os.RemoveAll(greetingTempDir)
cfg := TestingConfig
cfg.Seed = true
cfg.DataDir = greetingTempDir
seeder, err := NewClient(&cfg)
if err != nil {

View File

@ -52,6 +52,7 @@ type connection struct {
lastMessageReceived time.Time
completedHandshake time.Time
lastUsefulChunkReceived time.Time
lastChunkSent time.Time
// Stuff controlled by the local peer.
Interested bool

View File

@ -170,6 +170,7 @@ func TestDownloadOnDemand(t *testing.T) {
DisableTrackers: true,
NoDHT: true,
ListenAddr: ":0",
Seed: true,
NoDefaultBlocklist: true,
// Ensure that the metainfo is obtained over the wire, since we added

View File

@ -20,7 +20,8 @@ const (
)
type piece struct {
Hash pieceSum // The completed piece SHA1 hash, from the metainfo "pieces" field.
// The completed piece SHA1 hash, from the metainfo "pieces" field.
Hash pieceSum
// Chunks we don't have. The offset and length can be determined by the
// request chunkSize in use.
PendingChunkSpecs []bool

View File

@ -131,10 +131,11 @@ func (t *torrent) addrActive(addr string) bool {
return false
}
func (t *torrent) worstConnsHeap() (wcs *worstConns) {
func (t *torrent) worstConnsHeap(cl *Client) (wcs *worstConns) {
wcs = &worstConns{
c: append([]*connection{}, t.Conns...),
t: t,
c: append([]*connection{}, t.Conns...),
t: t,
cl: cl,
}
heap.Init(wcs)
return
@ -376,7 +377,7 @@ func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
return
}
func (t *torrent) writeStatus(w io.Writer) {
func (t *torrent) writeStatus(w io.Writer, cl *Client) {
fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
fmt.Fprintf(w, "Metadata have: ")
@ -421,8 +422,9 @@ func (t *torrent) writeStatus(w io.Writer) {
fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
sort.Sort(&worstConns{
c: t.Conns,
t: t,
c: t.Conns,
t: t,
cl: cl,
})
for _, c := range t.Conns {
c.WriteStatus(w, t)
@ -685,8 +687,9 @@ func (t *torrent) wantChunk(r request) bool {
}
func (t *torrent) urgentChunkInPiece(piece int) bool {
p := pp.Integer(piece)
for req := range t.urgent {
if int(req.Index) == piece {
if req.Index == p {
return true
}
}

View File

@ -6,12 +6,13 @@ import (
// Implements heap functions such that [0] is the worst connection.
type worstConns struct {
c []*connection
t *torrent
c []*connection
t *torrent
cl *Client
}
func (me worstConns) Len() int { return len(me.c) }
func (me worstConns) Swap(i, j int) { me.c[i], me.c[j] = me.c[j], me.c[i] }
func (me *worstConns) Len() int { return len(me.c) }
func (me *worstConns) Swap(i, j int) { me.c[i], me.c[j] = me.c[j], me.c[i] }
func (me *worstConns) Pop() (ret interface{}) {
old := me.c
@ -26,37 +27,25 @@ func (me *worstConns) Push(x interface{}) {
}
type worstConnsSortKey struct {
// Peer has something we want.
useless bool
// A fabricated duration since peer was last helpful.
age time.Duration
useful bool
lastHelpful time.Time
}
func (me worstConnsSortKey) Less(other worstConnsSortKey) bool {
if me.useless != other.useless {
return me.useless
if me.useful != other.useful {
return !me.useful
}
return me.age > other.age
return me.lastHelpful.Before(other.lastHelpful)
}
func (me worstConns) key(i int) (key worstConnsSortKey) {
func (me *worstConns) key(i int) (key worstConnsSortKey) {
c := me.c[i]
// Peer has had time to declare what they have.
if time.Now().Sub(c.completedHandshake) >= 30*time.Second {
if !me.t.haveInfo() {
key.useless = !c.supportsExtension("ut_metadata")
} else {
if !me.t.connHasWantedPieces(c) {
key.useless = true
}
}
key.useful = me.cl.usefulConn(me.t, c)
if me.cl.seeding(me.t) {
key.lastHelpful = c.lastChunkSent
} else {
key.lastHelpful = c.lastUsefulChunkReceived
}
key.age = time.Duration(1+3*c.UnwantedChunksReceived) * time.Now().Sub(func() time.Time {
if !c.lastUsefulChunkReceived.IsZero() {
return c.lastUsefulChunkReceived
}
return c.completedHandshake.Add(-time.Minute)
}()) / time.Duration(1+c.UsefulChunksReceived)
return
}