Add requestStrategy 3, which duplicates requests only after a timeout

Possible solution for #253.
Matt Joiner 2018-06-24 20:04:31 +10:00
parent d2602c7935
commit 195695042d
4 changed files with 87 additions and 18 deletions
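
The core of the change is a per-torrent lastRequested map recording when each chunk was last asked for, plus a duplicateRequestTimeout (defaulted to 15 seconds in newTorrent) that gates how soon the same chunk may be offered to another connection. A minimal standalone sketch of that gate follows; the request fields and the duplicateGate type are hypothetical stand-ins for illustration, not the library's API.

package main

import (
    "fmt"
    "time"
)

// request identifies a chunk, mirroring the piece/offset pair used in the diff.
// The concrete fields are illustrative only.
type request struct {
    piece  int
    offset int
}

// duplicateGate is a hypothetical stand-in for Torrent.lastRequested plus
// duplicateRequestTimeout: a request already pending on some connection is
// only handed out again once the timeout has elapsed.
type duplicateGate struct {
    timeout       time.Duration
    lastRequested map[request]time.Time
}

// shouldOffer reports whether r may be requested now, and records the time of
// a request it allows, like connection.request does via t.lastRequested.
func (g *duplicateGate) shouldOffer(r request, now time.Time) bool {
    if last, ok := g.lastRequested[r]; ok && now.Sub(last) < g.timeout {
        // Still inside the window: skip, another connection is on it.
        return false
    }
    g.lastRequested[r] = now
    return true
}

// forget clears the record when the request is deleted from a connection,
// as deleteRequest does in the diff.
func (g *duplicateGate) forget(r request) {
    delete(g.lastRequested, r)
}

func main() {
    g := &duplicateGate{
        timeout:       15 * time.Second, // the default set in newTorrent
        lastRequested: make(map[request]time.Time),
    }
    r := request{piece: 3, offset: 16384}
    now := time.Now()
    fmt.Println(g.shouldOffer(r, now))                     // true: first request
    fmt.Println(g.shouldOffer(r, now.Add(5*time.Second)))  // false: within 15s
    fmt.Println(g.shouldOffer(r, now.Add(20*time.Second))) // true: timed out, duplicate allowed
}

In the actual diff this logic is spread across connection.request, iterPendingRequests and deleteRequest, shown below.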

View File

@@ -975,6 +975,8 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
metadataChanged: sync.Cond{
L: &cl.mu,
},
lastRequested: make(map[request]time.Time),
duplicateRequestTimeout: 15 * time.Second,
}
t.logger = cl.logger.Clone().AddValue(t)
t.setChunkSize(defaultChunkSize)

View File

@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"math"
"math/rand"
"net"
"strconv"
@@ -68,6 +69,10 @@ type connection struct {
lastBecameInterested time.Time
priorInterest time.Duration
lastStartedExpectingToReceiveChunks time.Time
cumulativeExpectedToReceiveChunks time.Duration
chunksReceivedWhileExpecting int64
Choked bool
requests map[request]struct{}
requestsLowWater int
@@ -107,6 +112,23 @@ type connection struct {
writerCond sync.Cond
}
func (cn *connection) updateExpectingChunks() {
if cn.expectingChunks() {
if cn.lastStartedExpectingToReceiveChunks.IsZero() {
cn.lastStartedExpectingToReceiveChunks = time.Now()
}
} else {
if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
cn.cumulativeExpectedToReceiveChunks += time.Since(cn.lastStartedExpectingToReceiveChunks)
cn.lastStartedExpectingToReceiveChunks = time.Time{}
}
}
}
func (cn *connection) expectingChunks() bool {
return cn.Interested && !cn.PeerChoked
}
// Returns true if the connection is over IPv6.
func (cn *connection) ipv6() bool {
ip := missinggo.AddrIP(cn.remoteAddr())
@@ -256,11 +278,12 @@ func (cn *connection) downloadRate() float64 {
func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
// \t isn't preserved in <pre> blocks?
fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr())
fmt.Fprintf(w, " last msg: %s, connected: %s, last helpful: %s, itime: %s\n",
fmt.Fprintf(w, " last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
eventAgeString(cn.lastMessageReceived),
eventAgeString(cn.completedHandshake),
eventAgeString(cn.lastHelpful()),
cn.cumInterest(),
cn.totalExpectingTime(),
)
fmt.Fprintf(w,
" %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
@@ -351,9 +374,37 @@ func (cn *connection) requestedMetadataPiece(index int) bool {
// The actual value to use as the maximum outbound requests.
func (cn *connection) nominalMaxRequests() (ret int) {
if cn.t.requestStrategy == 3 {
expectingTime := int64(cn.totalExpectingTime())
if expectingTime == 0 {
expectingTime = math.MaxInt64
}
return int(clamp(
1,
int64(cn.PeerMaxRequests),
max(
// It makes sense to always pipeline at least one request,
// since latency must be non-zero.
2,
// Request only as many as we expect to receive in the
// duplicateRequestTimeout window. We are trying to avoid having to
// duplicate requests.
cn.chunksReceivedWhileExpecting*int64(cn.t.duplicateRequestTimeout)/expectingTime,
),
))
}
return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful.Int64()-(cn.stats.ChunksRead.Int64()-cn.stats.ChunksReadUseful.Int64()))))
}
func (cn *connection) totalExpectingTime() (ret time.Duration) {
ret = cn.cumulativeExpectedToReceiveChunks
if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
ret += time.Since(cn.lastStartedExpectingToReceiveChunks)
}
return
}
func (cn *connection) onPeerSentCancel(r request) {
if _, ok := cn.PeerRequests[r]; !ok {
torrent.Add("unexpected cancels received", 1)
@@ -405,6 +456,7 @@ func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool)
} else if !cn.lastBecameInterested.IsZero() {
cn.priorInterest += time.Since(cn.lastBecameInterested)
}
cn.updateExpectingChunks()
// log.Printf("%p: setting interest: %v", cn, interested)
return msg(pp.Message{
Type: func() pp.MessageType {
@@ -447,6 +499,8 @@ func (cn *connection) request(r request, mw messageWriter) bool {
}
cn.requests[r] = struct{}{}
cn.t.pendingRequests[r]++
cn.t.lastRequested[r] = time.Now()
cn.updateExpectingChunks()
return mw(pp.Message{
Type: pp.Request,
Index: r.Index,
@@ -689,6 +743,9 @@ func (cn *connection) shouldRequestWithoutBias() bool {
}
func (cn *connection) pieceRequestOrderIter() iter.Func {
if cn.t.requestStrategy == 3 {
return cn.unbiasedPieceRequestOrder()
}
if cn.shouldRequestWithoutBias() {
return cn.unbiasedPieceRequestOrder()
} else {
@@ -701,10 +758,16 @@ func (cn *connection) iterPendingRequests(f func(request) bool) {
piece := _piece.(int)
return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
r := request{pp.Integer(piece), cs}
// log.Println(r, cn.t.pendingRequests[r], cn.requests)
// if _, ok := cn.requests[r]; !ok && cn.t.pendingRequests[r] != 0 {
// return true
// }
if cn.t.requestStrategy == 3 {
lr := cn.t.lastRequested[r]
if !lr.IsZero() {
if time.Since(lr) < cn.t.duplicateRequestTimeout {
return true
} else {
torrent.Add("requests duplicated due to timeout", 1)
}
}
}
return f(r)
})
})
@@ -1033,11 +1096,13 @@ func (c *connection) mainReadLoop() (err error) {
c.deleteAllRequests()
// We can then reset our interest.
c.updateRequests()
c.updateExpectingChunks()
case pp.Reject:
c.deleteRequest(newRequestFromMessage(&msg))
case pp.Unchoke:
c.PeerChoked = false
c.tickleWriter()
c.updateExpectingChunks()
case pp.Interested:
c.PeerInterested = true
c.tickleWriter()
@@ -1215,6 +1280,9 @@ func (c *connection) receiveChunk(msg *pp.Message) {
// Request has been satisfied.
if c.deleteRequest(req) {
if c.expectingChunks() {
c.chunksReceivedWhileExpecting++
}
c.updateRequests()
} else {
torrent.Add("chunks received unexpected", 1)
@@ -1400,6 +1468,8 @@ func (c *connection) deleteRequest(r request) bool {
return false
}
delete(c.requests, r)
c.updateExpectingChunks()
delete(c.t.lastRequested, r)
pr := c.t.pendingRequests
pr[r]--
n := pr[r]

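For requestStrategy == 3, nominalMaxRequests above sizes the request pipeline from the observed delivery rate: keep only as many requests outstanding as the peer is expected to deliver within one duplicateRequestTimeout, floored at 2 and capped at PeerMaxRequests. Below is a small worked sketch of the same arithmetic with hypothetical numbers; clamp and max64 are local stand-ins for the package's helpers.

package main

import (
    "fmt"
    "math"
    "time"
)

// clamp and max64 mirror the small integer helpers used in the diff.
func clamp(min, value, max int64) int64 {
    if value < min {
        return min
    }
    if value > max {
        return max
    }
    return value
}

func max64(a, b int64) int64 {
    if a > b {
        return a
    }
    return b
}

// strategy3MaxRequests reproduces the arithmetic of nominalMaxRequests for
// requestStrategy 3 (illustrative, not the package's function), using plain
// parameters instead of connection state.
func strategy3MaxRequests(peerMaxRequests int, chunksReceivedWhileExpecting int64, expectingTime, duplicateRequestTimeout time.Duration) int {
    et := int64(expectingTime)
    if et == 0 {
        // No history yet: treat the rate as effectively zero so the floor of 2 wins.
        et = math.MaxInt64
    }
    return int(clamp(
        1,
        int64(peerMaxRequests),
        max64(
            2, // always keep at least one request pipelined
            chunksReceivedWhileExpecting*int64(duplicateRequestTimeout)/et,
        ),
    ))
}

func main() {
    // Hypothetical peer: 120 chunks received over 60s of "expecting" time,
    // i.e. 2 chunks/s. With a 15s duplicate-request timeout the target is
    // 120 * 15s / 60s = 30 outstanding requests.
    fmt.Println(strategy3MaxRequests(250, 120, 60*time.Second, 15*time.Second)) // 30

    // A brand new connection has no history, so it starts at the floor of 2.
    fmt.Println(strategy3MaxRequests(250, 0, 0, 15*time.Second)) // 2
}

Keeping the pipeline to roughly one timeout's worth of chunks is what keeps duplication rare in the first place, which is the point of the strategy.
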
View File

@@ -9,7 +9,6 @@ import (
"github.com/anacrolix/missinggo/pubsub"
"github.com/bradfitz/iter"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/anacrolix/torrent/metainfo"
@@ -139,14 +138,3 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
require.NoError(b, <-mrlErr)
require.EqualValues(b, b.N, cn.stats.ChunksReadUseful.Int64())
}
func TestConnectionReceiveBadChunkIndex(t *testing.T) {
cn := connection{
t: &Torrent{},
}
require.False(t, cn.t.haveInfo())
assert.NotPanics(t, func() { cn.receiveChunk(&pp.Message{Type: pp.Piece}) })
cn.t.info = &metainfo.Info{}
require.True(t, cn.t.haveInfo())
assert.NotPanics(t, func() { cn.receiveChunk(&pp.Message{Type: pp.Piece}) })
}

View File

@@ -52,11 +52,16 @@ type Torrent struct {
logger *log.Logger
networkingEnabled bool
// Determines what chunks to request from peers. 1: Favour higher priority
// pieces with some fuzzing to reduce overlaps and wastage across
// connections. 2: The fastest connection downloads strictly in order of
// priority, while all others adhere to their piece inclinations.
// priority, while all others adhere to their piece inclinations. 3:
// Requests are strictly by piece priority, and not duplicated until
// duplicateRequestTimeout is reached.
requestStrategy int
// How long to avoid duplicating a pending request.
duplicateRequestTimeout time.Duration
closed missinggo.Event
infoHash metainfo.Hash
@@ -140,6 +145,9 @@ type Torrent struct {
// Count of each request across active connections.
pendingRequests map[request]int
// The last time we requested a chunk. Deleting the request from any
// connection will clear this value.
lastRequested map[request]time.Time
}
func (t *Torrent) tickleReaders() {
@@ -399,6 +407,7 @@ func (t *Torrent) onSetInfo() {
t.gotMetainfo.Set()
t.updateWantPeersEvent()
t.pendingRequests = make(map[request]int)
t.lastRequested = make(map[request]time.Time)
}
// Called when metadata for a torrent becomes available.
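
The rate estimate used for strategy 3 rests on the new per-connection bookkeeping: cumulativeExpectedToReceiveChunks accumulates time spent interested and unchoked, and chunksReceivedWhileExpecting counts chunks that arrive in that state. A reduced, self-contained sketch of that stopwatch, with illustrative names rather than the connection struct itself:

package main

import (
    "fmt"
    "time"
)

// expectingClock is an illustrative reduction of the bookkeeping the diff adds
// to connection: it accumulates the time spent interested-and-unchoked and
// counts chunks received while in that state.
type expectingClock struct {
    interested, peerChoked bool

    started    time.Time     // lastStartedExpectingToReceiveChunks
    cumulative time.Duration // cumulativeExpectedToReceiveChunks
    chunks     int64         // chunksReceivedWhileExpecting
}

func (c *expectingClock) expecting() bool { return c.interested && !c.peerChoked }

// update mirrors updateExpectingChunks: start or stop the stopwatch whenever
// interest or choke state changes.
func (c *expectingClock) update(now time.Time) {
    if c.expecting() {
        if c.started.IsZero() {
            c.started = now
        }
    } else if !c.started.IsZero() {
        c.cumulative += now.Sub(c.started)
        c.started = time.Time{}
    }
}

// total mirrors totalExpectingTime: accumulated time plus any open interval.
func (c *expectingClock) total(now time.Time) time.Duration {
    if c.started.IsZero() {
        return c.cumulative
    }
    return c.cumulative + now.Sub(c.started)
}

func main() {
    var c expectingClock
    t0 := time.Now()

    c.interested = true // we became interested; peer is not choking us
    c.update(t0)

    c.chunks++ // a chunk arrived while expecting

    c.peerChoked = true // peer choked us 10s in
    c.update(t0.Add(10 * time.Second))

    fmt.Println(c.total(t0.Add(30 * time.Second))) // 10s: choked time does not count
    fmt.Println(c.chunks)                          // 1
}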