Move uploading to the connection writer

This commit is contained in:
Matt Joiner 2017-09-18 12:09:08 +10:00
parent ed0fa62340
commit bb53c97d38
3 changed files with 42 additions and 46 deletions

View File

@ -1142,7 +1142,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connect
}
}
func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
func (cl *Client) sendChunk(t *Torrent, c *connection, r request, msg func(pp.Message) bool) (more bool, err error) {
// Count the chunk being sent, even if it isn't.
b := make([]byte, r.Length)
p := t.info.Piece(int(r.Index))
@ -1151,9 +1151,9 @@ func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
if err == nil {
panic("expected error")
}
return err
return
}
c.Post(pp.Message{
more = msg(pp.Message{
Type: pp.Piece,
Index: r.Index,
Begin: r.Begin,
@ -1162,7 +1162,7 @@ func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
c.chunksSent++
uploadChunksPosted.Add(1)
c.lastChunkSent = time.Now()
return nil
return
}
func (cl *Client) openNewConns(t *Torrent) {

View File

@ -99,6 +99,7 @@ type connection struct {
pieceRequestOrder prioritybitmap.PriorityBitmap
postedBuffer bytes.Buffer
uploadTimer *time.Timer
writerCond sync.Cond
}
@ -297,25 +298,25 @@ func (cn *connection) PeerCancel(r request) bool {
return true
}
func (cn *connection) Choke() {
func (cn *connection) Choke(msg func(pp.Message) bool) bool {
if cn.Choked {
return
return true
}
cn.Post(pp.Message{
Type: pp.Choke,
})
cn.PeerRequests = nil
cn.Choked = true
return msg(pp.Message{
Type: pp.Choke,
})
}
func (cn *connection) Unchoke() {
func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
if !cn.Choked {
return
return true
}
cn.Post(pp.Message{
cn.Choked = false
return msg(pp.Message{
Type: pp.Unchoke,
})
cn.Choked = false
}
func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
@ -378,6 +379,7 @@ func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
// have more write buffer space.
cn.requestsLowWater = len(cn.requests) / 2
}
cn.upload(msg)
}
// Writes buffers to the socket from the write channel.
@ -757,33 +759,24 @@ func (c *connection) mainReadLoop() error {
c.tickleWriter()
case pp.Interested:
c.PeerInterested = true
c.upload()
c.tickleWriter()
case pp.NotInterested:
c.PeerInterested = false
c.Choke()
c.PeerRequests = nil
case pp.Have:
err = c.peerSentHave(int(msg.Index))
case pp.Request:
if c.Choked {
break
}
if !c.PeerInterested {
err = errors.New("peer sent request but isn't interested")
break
}
if !t.havePiece(msg.Index.Int()) {
// This isn't necessarily them screwing up. We can drop pieces
// from our storage, and can't communicate this to peers
// except by reconnecting.
requestsReceivedForMissingPieces.Add(1)
err = errors.New("peer requested piece we don't have")
if len(c.PeerRequests) >= maxRequests {
break
}
if c.PeerRequests == nil {
c.PeerRequests = make(map[request]struct{}, maxRequests)
}
c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
c.upload()
c.tickleWriter()
case pp.Cancel:
req := newRequest(msg.Index, msg.Begin, msg.Length)
if !c.PeerCancel(req) {
@ -971,8 +964,6 @@ func (c *connection) receiveChunk(msg *pp.Message) {
c.UsefulChunksReceived++
c.lastUsefulChunkReceived = time.Now()
c.upload()
// Need to record that it hasn't been written yet, before we attempt to do
// anything with it.
piece.incrementPendingWrites()
@ -1016,40 +1007,45 @@ func (c *connection) receiveChunk(msg *pp.Message) {
}
// Also handles choking and unchoking of the remote peer.
func (c *connection) upload() {
func (c *connection) upload(msg func(pp.Message) bool) bool {
t := c.t
cl := t.cl
if cl.config.NoUpload {
return
return true
}
if !c.PeerInterested {
return
return true
}
seeding := t.seeding()
if !seeding && !c.peerHasWantedPieces() {
// There's no reason to upload to this peer.
return
return true
}
// Breaking or completing this loop means we don't want to upload to the
// peer anymore, and we choke them.
another:
for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
// We want to upload to the peer.
c.Unchoke()
if !c.Unchoke(msg) {
return false
}
for r := range c.PeerRequests {
res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
if !res.OK() {
panic(res)
}
delay := res.Delay()
if delay > 0 {
res.Cancel()
go func() {
time.Sleep(delay)
cl.mu.Lock()
defer cl.mu.Unlock()
c.upload()
}()
return
if c.uploadTimer == nil {
c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
} else {
c.uploadTimer.Reset(delay)
}
// Hard to say what to return here.
return true
}
err := cl.sendChunk(t, c, r)
more, err := cl.sendChunk(t, c, r, msg)
if err != nil {
i := int(r.Index)
if t.pieceComplete(i) {
@ -1068,11 +1064,14 @@ another:
break another
}
delete(c.PeerRequests, r)
if !more {
return false
}
goto another
}
return
return true
}
c.Choke()
return c.Choke(msg)
}
func (cn *connection) Drop() {

View File

@ -1481,9 +1481,6 @@ func (t *Torrent) onPieceCompleted(piece int) {
t.cancelRequestsForPiece(piece)
for conn := range t.conns {
conn.Have(piece)
// Could check here if peer doesn't have piece, but due to caching
// some peers may have said they have a piece but they don't.
conn.upload()
}
}