Move fillRequests and replenishConnRequests into connection.go

This commit is contained in:
Matt Joiner 2016-01-24 15:21:17 +11:00
parent cafac95de4
commit 0ad4dda9fc
3 changed files with 57 additions and 53 deletions

View File

@ -1185,13 +1185,13 @@ func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) error {
c.PeerPieces[piece] = true c.PeerPieces[piece] = true
} }
if t.wantPiece(piece) { if t.wantPiece(piece) {
me.replenishConnRequests(t, c) c.updateRequests()
} }
return nil return nil
} }
func (me *Client) peerUnchoked(torrent *torrent, conn *connection) { func (me *Client) peerUnchoked(torrent *torrent, conn *connection) {
me.replenishConnRequests(torrent, conn) conn.updateRequests()
} }
func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) { func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) {
@ -1416,10 +1416,10 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
me.connDeleteRequest(t, c, r) me.connDeleteRequest(t, c, r)
} }
// We can then reset our interest. // We can then reset our interest.
me.replenishConnRequests(t, c) c.updateRequests()
case pp.Reject: case pp.Reject:
me.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length)) me.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
me.replenishConnRequests(t, c) c.updateRequests()
case pp.Unchoke: case pp.Unchoke:
c.PeerChoked = false c.PeerChoked = false
me.peerUnchoked(t, c) me.peerUnchoked(t, c)
@ -1671,6 +1671,7 @@ func (me *Client) addConnection(t *torrent, c *connection) bool {
panic(len(t.Conns)) panic(len(t.Conns))
} }
t.Conns = append(t.Conns, c) t.Conns = append(t.Conns, c)
c.t = t
return true return true
} }
@ -2362,20 +2363,6 @@ func (me *Client) WaitAll() bool {
return true return true
} }
func (me *Client) replenishConnRequests(t *torrent, c *connection) {
if !t.haveInfo() {
return
}
t.fillRequests(c)
if len(c.Requests) == 0 && !c.PeerChoked {
// So we're not choked, but we don't want anything right now. We may
// have completed readahead, and the readahead window has not rolled
// over to the next piece. Better to stay interested in case we're
// going to want data in the near future.
c.SetInterested(!t.haveAllPieces())
}
}
// Handle a received chunk from a peer. // Handle a received chunk from a peer.
func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) error { func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) error {
chunksReceived.Add(1) chunksReceived.Add(1)
@ -2384,7 +2371,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
// Request has been satisfied. // Request has been satisfied.
if me.connDeleteRequest(t, c, req) { if me.connDeleteRequest(t, c, req) {
defer me.replenishConnRequests(t, c) defer c.updateRequests()
} else { } else {
unexpectedChunksReceived.Add(1) unexpectedChunksReceived.Add(1)
} }
@ -2447,7 +2434,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
// Cancel pending requests for this chunk. // Cancel pending requests for this chunk.
for _, c := range t.Conns { for _, c := range t.Conns {
if me.connCancel(t, c, req) { if me.connCancel(t, c, req) {
me.replenishConnRequests(t, c) c.updateRequests()
} }
} }
@ -2520,7 +2507,7 @@ func (me *Client) onFailedPiece(t *torrent, piece int) {
me.openNewConns(t) me.openNewConns(t)
for _, conn := range t.Conns { for _, conn := range t.Conns {
if conn.PeerHasPiece(piece) { if conn.PeerHasPiece(piece) {
me.replenishConnRequests(t, conn) conn.updateRequests()
} }
} }
} }

View File

@ -30,6 +30,7 @@ const (
// Maintains the state of a connection with a peer. // Maintains the state of a connection with a peer.
type connection struct { type connection struct {
t *torrent
conn net.Conn conn net.Conn
rw io.ReadWriter // The real slim shady rw io.ReadWriter // The real slim shady
encrypted bool encrypted bool
@ -534,3 +535,46 @@ func (cn *connection) Bitfield(haves []bool) {
}) })
cn.sentHaves = haves cn.sentHaves = haves
} }
func (c *connection) updateRequests() {
if !c.t.haveInfo() {
return
}
if c.Interested {
if c.PeerChoked {
return
}
if len(c.Requests) > c.requestsLowWater {
return
}
}
c.fillRequests()
if len(c.Requests) == 0 && !c.PeerChoked {
// So we're not choked, but we don't want anything right now. We may
// have completed readahead, and the readahead window has not rolled
// over to the next piece. Better to stay interested in case we're
// going to want data in the near future.
c.SetInterested(!c.t.haveAllPieces())
}
}
func (c *connection) fillRequests() {
if !c.t.forUrgentPieces(func(piece int) (again bool) {
return c.t.connRequestPiecePendingChunks(c, piece)
}) {
return
}
c.t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
for i := begin + 1; i < end; i++ {
if !c.t.connRequestPiecePendingChunks(c, i) {
return false
}
}
return true
})
for i := range c.t.pendingPieces {
if !c.t.connRequestPiecePendingChunks(c, i) {
return
}
}
}

View File

@ -458,7 +458,7 @@ func (t *torrent) String() string {
} }
func (t *torrent) haveInfo() bool { func (t *torrent) haveInfo() bool {
return t != nil && t.Info != nil return t.Info != nil
} }
// TODO: Include URIs that weren't converted to tracker clients. // TODO: Include URIs that weren't converted to tracker clients.
@ -814,7 +814,7 @@ func (t *torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
func (t *torrent) readersChanged(cl *Client) { func (t *torrent) readersChanged(cl *Client) {
for _, c := range t.Conns { for _, c := range t.Conns {
cl.replenishConnRequests(t, c) c.updateRequests()
} }
cl.openNewConns(t) cl.openNewConns(t)
} }
@ -897,8 +897,10 @@ func (t *torrent) pendPiece(piece int, cl *Client) {
if !c.PeerHasPiece(piece) { if !c.PeerHasPiece(piece) {
continue continue
} }
c.updateRequests()
} }
cl.openNewConns(t)
cl.pieceChanged(t, piece)
} }
func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) { func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
@ -913,32 +915,3 @@ func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more
} }
return true return true
} }
func (t *torrent) fillRequests(c *connection) {
if c.Interested {
if c.PeerChoked {
return
}
if len(c.Requests) > c.requestsLowWater {
return
}
}
if !t.forUrgentPieces(func(piece int) (again bool) {
return t.connRequestPiecePendingChunks(c, piece)
}) {
return
}
t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
for i := begin + 1; i < end; i++ {
if !t.connRequestPiecePendingChunks(c, i) {
return false
}
}
return true
})
for i := range t.pendingPieces {
if !t.connRequestPiecePendingChunks(c, i) {
return
}
}
}