Sleep webseed peers after unhandled errors

This commit is contained in:
Matt Joiner 2022-12-31 11:27:47 +11:00
parent 61a303cbf7
commit 14cf045b6a
No known key found for this signature in database
GPG Key ID: 6B990B8185E7F782
4 changed files with 38 additions and 20 deletions

View File

@ -26,7 +26,7 @@ type peerImpl interface {
// Rebuke the peer // Rebuke the peer
ban() ban()
String() string String() string
connStatusString() string peerImplStatusLines() []string
// All if the peer should have everything, known if we know that for a fact. For example, we can // All if the peer should have everything, known if we know that for a fact. For example, we can
// guess at how many pieces are in a torrent, and assume they have all pieces based on them // guess at how many pieces are in a torrent, and assume they have all pieces based on them

View File

@ -171,8 +171,8 @@ type PeerConn struct {
peerSentHaveAll bool peerSentHaveAll bool
} }
func (cn *PeerConn) connStatusString() string { func (cn *PeerConn) peerImplStatusLines() []string {
return fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString) return []string{fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)}
} }
func (cn *Peer) updateExpectingChunks() { func (cn *Peer) updateExpectingChunks() {
@ -389,14 +389,14 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
if cn.closed.IsSet() { if cn.closed.IsSet() {
fmt.Fprint(w, "CLOSED: ") fmt.Fprint(w, "CLOSED: ")
} }
fmt.Fprintln(w, cn.connStatusString()) fmt.Fprintln(w, strings.Join(cn.peerImplStatusLines(), "\n"))
prio, err := cn.peerPriority() prio, err := cn.peerPriority()
prioStr := fmt.Sprintf("%08x", prio) prioStr := fmt.Sprintf("%08x", prio)
if err != nil { if err != nil {
prioStr += ": " + err.Error() prioStr += ": " + err.Error()
} }
fmt.Fprintf(w, " bep40-prio: %v\n", prioStr) fmt.Fprintf(w, "bep40-prio: %v\n", prioStr)
fmt.Fprintf(w, " last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n", fmt.Fprintf(w, "last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
eventAgeString(cn.lastMessageReceived), eventAgeString(cn.lastMessageReceived),
eventAgeString(cn.completedHandshake), eventAgeString(cn.completedHandshake),
eventAgeString(cn.lastHelpful()), eventAgeString(cn.lastHelpful()),
@ -404,7 +404,7 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
cn.totalExpectingTime(), cn.totalExpectingTime(),
) )
fmt.Fprintf(w, fmt.Fprintf(w,
" %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n", "%s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n",
cn.completedString(), cn.completedString(),
len(cn.peerTouchedPieces), len(cn.peerTouchedPieces),
&cn._stats.ChunksReadUseful, &cn._stats.ChunksReadUseful,
@ -419,7 +419,7 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
cn.statusFlags(), cn.statusFlags(),
cn.downloadRate()/(1<<10), cn.downloadRate()/(1<<10),
) )
fmt.Fprintf(w, " requested pieces:") fmt.Fprintf(w, "requested pieces:")
cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) { cn.iterContiguousPieceRequests(func(piece pieceIndex, count int) {
fmt.Fprintf(w, " %v(%v)", piece, count) fmt.Fprintf(w, " %v(%v)", piece, count)
}) })

View File

@ -770,9 +770,14 @@ func (t *Torrent) writeStatus(w io.Writer) {
} }
return worseConn(i, j) return worseConn(i, j)
}) })
var buf bytes.Buffer
for i, c := range peers { for i, c := range peers {
fmt.Fprintf(w, "%2d. ", i+1) fmt.Fprintf(w, "%2d. ", i+1)
c.writeStatus(w, t) buf.Reset()
c.writeStatus(&buf, t)
w.Write(bytes.TrimRight(
bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n ")),
" "))
} }
} }
@ -2425,7 +2430,6 @@ func (t *Torrent) addWebSeed(url string, opts ...AddWebSeedsOpt) {
}, },
}, },
activeRequests: make(map[Request]webseed.Request, maxRequests), activeRequests: make(map[Request]webseed.Request, maxRequests),
maxRequests: maxRequests,
} }
ws.peer.initRequestState() ws.peer.initRequestState()
for _, opt := range opts { for _, opt := range opts {

View File

@ -16,20 +16,27 @@ import (
"github.com/anacrolix/torrent/webseed" "github.com/anacrolix/torrent/webseed"
) )
const (
webseedPeerUnhandledErrorSleep = 5 * time.Second
webseedPeerCloseOnUnhandledError = false
)
type webseedPeer struct { type webseedPeer struct {
// First field for stats alignment. // First field for stats alignment.
peer Peer peer Peer
client webseed.Client client webseed.Client
activeRequests map[Request]webseed.Request activeRequests map[Request]webseed.Request
requesterCond sync.Cond requesterCond sync.Cond
// Number of requester routines. lastUnhandledErr time.Time
maxRequests int
} }
var _ peerImpl = (*webseedPeer)(nil) var _ peerImpl = (*webseedPeer)(nil)
func (me *webseedPeer) connStatusString() string { func (me *webseedPeer) peerImplStatusLines() []string {
return me.client.Url return []string{
me.client.Url,
fmt.Sprintf("last unhandled error: %v", eventAgeString(me.lastUnhandledErr)),
}
} }
func (ws *webseedPeer) String() string { func (ws *webseedPeer) String() string {
@ -86,6 +93,7 @@ func (ws *webseedPeer) requester(i int) {
defer ws.requesterCond.L.Unlock() defer ws.requesterCond.L.Unlock()
start: start:
for !ws.peer.closed.IsSet() { for !ws.peer.closed.IsSet() {
// Restart is set if we don't need to wait for the requestCond before trying again.
restart := false restart := false
ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool { ws.peer.requestState.Requests.Iterate(func(x RequestIndex) bool {
r := ws.peer.t.requestIndexToRequest(x) r := ws.peer.t.requestIndexToRequest(x)
@ -101,6 +109,7 @@ start:
if errors.Is(err, webseed.ErrTooFast) { if errors.Is(err, webseed.ErrTooFast) {
time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second)))) time.Sleep(time.Duration(rand.Int63n(int64(10 * time.Second))))
} }
time.Sleep(time.Until(ws.lastUnhandledErr.Add(webseedPeerUnhandledErrorSleep)))
ws.requesterCond.L.Lock() ws.requesterCond.L.Lock()
return false return false
}) })
@ -172,8 +181,13 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
// cfg := spew.NewDefaultConfig() // cfg := spew.NewDefaultConfig()
// cfg.DisableMethods = true // cfg.DisableMethods = true
// cfg.Dump(result.Err) // cfg.Dump(result.Err)
log.Printf("closing %v", ws)
ws.peer.close() if webseedPeerCloseOnUnhandledError {
log.Printf("closing %v", ws)
ws.peer.close()
} else {
ws.lastUnhandledErr = time.Now()
}
} }
if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) { if !ws.peer.remoteRejectedRequest(ws.peer.t.requestIndexFromRequest(r)) {
panic("invalid reject") panic("invalid reject")