Tidy up request and cancel

This commit is contained in:
Matt Joiner 2021-05-09 23:38:38 +10:00
parent 5f437e6b7f
commit 6e97ce952f
3 changed files with 37 additions and 64 deletions

View File

@ -10,12 +10,14 @@ import (
type peerImpl interface { type peerImpl interface {
updateRequests() updateRequests()
writeInterested(interested bool) bool writeInterested(interested bool) bool
cancel(Request) bool
// Return true if there's room for more activity. // Neither of these return buffer room anymore, because they're currently both posted. There's
request(Request) bool // also PeerConn.writeBufferFull for when/where it matters.
_cancel(Request)
_request(Request)
connectionFlags() string connectionFlags() string
onClose() onClose()
_postCancel(Request)
onGotInfo(*metainfo.Info) onGotInfo(*metainfo.Info)
drop() drop()
String() string String() string

View File

@ -529,15 +529,15 @@ func (pc *PeerConn) writeInterested(interested bool) bool {
// are okay. // are okay.
type messageWriter func(pp.Message) bool type messageWriter func(pp.Message) bool
func (cn *Peer) request(r Request) (more bool, err error) { func (cn *Peer) request(r Request) error {
if _, ok := cn.requests[r]; ok { if _, ok := cn.requests[r]; ok {
return true, nil return nil
} }
if cn.numLocalRequests() >= cn.nominalMaxRequests() { if cn.numLocalRequests() >= cn.nominalMaxRequests() {
return true, errors.New("too many outstanding requests") return errors.New("too many outstanding requests")
} }
if !cn.peerHasPiece(pieceIndex(r.Index)) { if !cn.peerHasPiece(pieceIndex(r.Index)) {
return true, errors.New("requesting piece peer doesn't have") return errors.New("requesting piece peer doesn't have")
} }
if !cn.t.peerIsActive(cn) { if !cn.t.peerIsActive(cn) {
panic("requesting but not in active conns") panic("requesting but not in active conns")
@ -545,19 +545,22 @@ func (cn *Peer) request(r Request) (more bool, err error) {
if cn.closed.IsSet() { if cn.closed.IsSet() {
panic("requesting when connection is closed") panic("requesting when connection is closed")
} }
if cn.peerChoking {
if cn.peerAllowedFast.Get(int(r.Index)) {
torrent.Add("allowed fast requests sent", 1)
} else {
return cn.setInterested(true), errors.New("requesting while choked and not allowed fast")
}
}
if cn.t.hashingPiece(pieceIndex(r.Index)) { if cn.t.hashingPiece(pieceIndex(r.Index)) {
panic("piece is being hashed") panic("piece is being hashed")
} }
if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) { if cn.t.pieceQueuedForHash(pieceIndex(r.Index)) {
panic("piece is queued for hash") panic("piece is queued for hash")
} }
if !cn.setInterested(true) {
return errors.New("write buffer full after expressing interest")
}
if cn.peerChoking {
if cn.peerAllowedFast.Get(int(r.Index)) {
torrent.Add("allowed fast requests sent", 1)
} else {
errors.New("peer choking and piece not in allowed fast set")
}
}
if cn.requests == nil { if cn.requests == nil {
cn.requests = make(map[Request]struct{}) cn.requests = make(map[Request]struct{})
} }
@ -571,11 +574,12 @@ func (cn *Peer) request(r Request) (more bool, err error) {
for _, f := range cn.callbacks.SentRequest { for _, f := range cn.callbacks.SentRequest {
f(PeerRequestEvent{cn, r}) f(PeerRequestEvent{cn, r})
} }
return cn.peerImpl.request(r), nil cn.peerImpl._request(r)
return nil
} }
func (me *PeerConn) request(r Request) bool { func (me *PeerConn) _request(r Request) {
return me.write(pp.Message{ me.write(pp.Message{
Type: pp.Request, Type: pp.Request,
Index: r.Index, Index: r.Index,
Begin: r.Begin, Begin: r.Begin,
@ -583,8 +587,14 @@ func (me *PeerConn) request(r Request) bool {
}) })
} }
func (me *PeerConn) cancel(r Request) bool { func (me *Peer) cancel(r Request) {
return me.write(makeCancelMessage(r)) if me.deleteRequest(r) {
me.peerImpl._cancel(r)
}
}
func (me *PeerConn) _cancel(r Request) {
me.write(makeCancelMessage(r))
} }
func (cn *PeerConn) fillWriteBuffer() { func (cn *PeerConn) fillWriteBuffer() {
@ -1317,7 +1327,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
if p == c { if p == c {
return return
} }
p.postCancel(req) p.cancel(req)
}) })
err := func() error { err := func() error {
@ -1472,26 +1482,6 @@ func (c *Peer) deleteRequest(r Request) bool {
if n < 0 { if n < 0 {
panic(n) panic(n)
} }
// If a request fails, updating the requests for the current peer first may miss the opportunity
// to try other peers for that request instead, depending on the request strategy. This might
// only affect webseed peers though, since they synchronously issue new requests: PeerConns do
// it in the writer routine.
const updateCurrentConnRequestsFirst = false
if updateCurrentConnRequestsFirst {
c.updateRequests()
}
// Give other conns a chance to pick up the request.
c.t.iterPeers(func(_c *Peer) {
// We previously checked that the peer wasn't interested to to only wake connections that
// were unable to issue requests due to starvation by the request strategy. There could be
// performance ramifications.
if _c != c && c.peerHasPiece(pieceIndex(r.Index)) {
_c.updateRequests()
}
})
if !updateCurrentConnRequestsFirst {
c.updateRequests()
}
return true return true
} }
@ -1513,18 +1503,6 @@ func (c *PeerConn) tickleWriter() {
c.writerCond.Broadcast() c.writerCond.Broadcast()
} }
func (c *Peer) postCancel(r Request) bool {
if !c.deleteRequest(r) {
return false
}
c.peerImpl._postCancel(r)
return true
}
func (c *PeerConn) _postCancel(r Request) {
c.post(makeCancelMessage(r))
}
func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) { func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
c.lastChunkSent = time.Now() c.lastChunkSent = time.Now()
return msg(pp.Message{ return msg(pp.Message{

View File

@ -41,30 +41,23 @@ func (ws *webseedPeer) onGotInfo(info *metainfo.Info) {
ws.client.Info = info ws.client.Info = info
} }
func (ws *webseedPeer) _postCancel(r Request) {
ws.cancel(r)
}
func (ws *webseedPeer) writeInterested(interested bool) bool { func (ws *webseedPeer) writeInterested(interested bool) bool {
return true return true
} }
func (ws *webseedPeer) cancel(r Request) bool { func (ws *webseedPeer) _cancel(r Request) {
active, ok := ws.activeRequests[r] active, ok := ws.activeRequests[r]
if !ok { if ok {
return false
}
active.Cancel() active.Cancel()
return true }
} }
func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec { func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)} return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
} }
func (ws *webseedPeer) request(r Request) bool { func (ws *webseedPeer) _request(r Request) {
ws.requesterCond.Signal() ws.requesterCond.Signal()
return true
} }
func (ws *webseedPeer) doRequest(r Request) { func (ws *webseedPeer) doRequest(r Request) {