Factor out connection.onReadRequest

This commit is contained in:
Matt Joiner 2018-02-02 19:19:14 +11:00
parent fe991128ed
commit 7a95714df2
3 changed files with 42 additions and 26 deletions

View File

@ -823,6 +823,30 @@ func (c *connection) lastHelpful() (ret time.Time) {
return
}
func (c *connection) onReadRequest(r request) error {
requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
if c.Choked {
return nil
}
if len(c.PeerRequests) >= maxRequests {
// TODO: Should we drop them or Choke them instead?
return nil
}
if !c.t.havePiece(r.Index.Int()) {
// This isn't necessarily them screwing up. We can drop pieces
// from our storage, and can't communicate this to peers
// except by reconnecting.
requestsReceivedForMissingPieces.Add(1)
return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
}
if c.PeerRequests == nil {
c.PeerRequests = make(map[request]struct{}, maxRequests)
}
c.PeerRequests[r] = struct{}{}
c.tickleWriter()
return nil
}
// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit. Returning will end the connection.
func (c *connection) mainReadLoop() error {
@ -864,7 +888,7 @@ func (c *connection) mainReadLoop() error {
// We can then reset our interest.
c.updateRequests()
case pp.Reject:
if c.deleteRequest(newRequest(msg.Index, msg.Begin, msg.Length)) {
if c.deleteRequest(newRequestFromMessage(&msg)) {
c.updateRequests()
}
case pp.Unchoke:
@ -879,29 +903,10 @@ func (c *connection) mainReadLoop() error {
case pp.Have:
err = c.peerSentHave(int(msg.Index))
case pp.Request:
requestedChunkLengths.Add(strconv.FormatUint(msg.Length.Uint64(), 10), 1)
if c.Choked {
break
}
if len(c.PeerRequests) >= maxRequests {
// TODO: Should we drop them or Choke them instead?
break
}
if !t.havePiece(msg.Index.Int()) {
// This isn't necessarily them screwing up. We can drop pieces
// from our storage, and can't communicate this to peers
// except by reconnecting.
requestsReceivedForMissingPieces.Add(1)
err = fmt.Errorf("peer requested piece we don't have: %v", msg.Index.Int())
break
}
if c.PeerRequests == nil {
c.PeerRequests = make(map[request]struct{}, maxRequests)
}
c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
c.tickleWriter()
r := newRequestFromMessage(&msg)
err = c.onReadRequest(r)
case pp.Cancel:
req := newRequest(msg.Index, msg.Begin, msg.Length)
req := newRequestFromMessage(&msg)
if !c.PeerCancel(req) {
unexpectedCancels.Add(1)
}
@ -1062,7 +1067,7 @@ func (c *connection) receiveChunk(msg *pp.Message) {
cl := t.cl
chunksReceived.Add(1)
req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
req := newRequestFromMessage(msg)
// Request has been satisfied.
if c.deleteRequest(req) {

View File

@ -138,8 +138,8 @@ func TestConnectionReceiveBadChunkIndex(t *testing.T) {
t: &Torrent{},
}
require.False(t, cn.t.haveInfo())
assert.NotPanics(t, func() { cn.receiveChunk(&pp.Message{}) })
assert.NotPanics(t, func() { cn.receiveChunk(&pp.Message{Type: pp.Piece}) })
cn.t.info = &metainfo.Info{}
require.True(t, cn.t.haveInfo())
assert.NotPanics(t, func() { cn.receiveChunk(&pp.Message{}) })
assert.NotPanics(t, func() { cn.receiveChunk(&pp.Message{Type: pp.Piece}) })
}

11
misc.go
View File

@ -21,6 +21,17 @@ func newRequest(index, begin, length pp.Integer) request {
return request{index, chunkSpec{begin, length}}
}
func newRequestFromMessage(msg *pp.Message) request {
switch msg.Type {
case pp.Request:
return newRequest(msg.Index, msg.Begin, msg.Length)
case pp.Piece:
return newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
default:
panic(msg.Type)
}
}
// The size in bytes of a metadata extension piece.
func metadataPieceSize(totalSize int, piece int) int {
ret := totalSize - piece*(1<<14)