Expose more callbacks and Request and ChunkSpec

Matt Joiner 2021-01-28 14:23:22 +11:00
parent f409daa93f
commit 456a2f7c5d
15 changed files with 127 additions and 97 deletions


@@ -20,9 +20,21 @@ type Callbacks struct {
 	ReceiveEncryptedHandshakeSkeys mse.SecretKeyIter
 	ReceivedUsefulData             []func(ReceivedUsefulDataEvent)
+	ReceivedRequested              []func(PeerMessageEvent)
+	DeletedRequest                 []func(PeerRequestEvent)
+	SentRequest                    []func(PeerRequestEvent)
+	PeerClosed                     []func(*Peer)
+	NewPeer                        []func(*Peer)
 }

-type ReceivedUsefulDataEvent struct {
+type ReceivedUsefulDataEvent = PeerMessageEvent
+
+type PeerMessageEvent struct {
 	Peer    *Peer
 	Message *pp.Message
 }
+
+type PeerRequestEvent struct {
+	Peer *Peer
+	Request
+}
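The new callback slices are consumed through the client configuration (the diff below reads them from cl.config.Callbacks). A minimal sketch of wiring them up, assuming the package's usual NewDefaultClientConfig and NewClient entry points; the log lines are illustrative, not part of the library:

package main

import (
	"log"

	"github.com/anacrolix/torrent"
)

func main() {
	cfg := torrent.NewDefaultClientConfig()
	// NewPeer and PeerClosed bracket a peer's lifecycle.
	cfg.Callbacks.NewPeer = append(cfg.Callbacks.NewPeer, func(p *torrent.Peer) {
		log.Printf("peer added: %v", p)
	})
	cfg.Callbacks.PeerClosed = append(cfg.Callbacks.PeerClosed, func(p *torrent.Peer) {
		log.Printf("peer closed: %v", p)
	})
	// PeerRequestEvent embeds Request, which embeds ChunkSpec, so Index,
	// Begin and Length are all promoted fields.
	cfg.Callbacks.SentRequest = append(cfg.Callbacks.SentRequest, func(e torrent.PeerRequestEvent) {
		log.Printf("sent request: piece=%d begin=%d length=%d", e.Index, e.Begin, e.Length)
	})
	cl, err := torrent.NewClient(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}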


@@ -1380,11 +1380,11 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemot
 			RemoteAddr: remoteAddr,
 			Network:    network,
+			callbacks:  &cl.config.Callbacks,
 		},
 		connString:  connString,
 		conn:        nc,
 		writeBuffer: new(bytes.Buffer),
-		callbacks:   &cl.config.Callbacks,
 	}
 	c.peerImpl = c
 	c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
@@ -1395,6 +1395,9 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemot
 		r: c.r,
 	}
 	c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
+	for _, f := range cl.config.Callbacks.NewPeer {
+		f(&c.Peer)
+	}
 	return
 }


@@ -108,7 +108,7 @@ func TestTorrentInitialState(t *testing.T) {
 	tor.cl.lock()
 	assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
 	tor.cl.unlock()
-	assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
+	assert.EqualValues(t, ChunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
 }

 func TestReducedDialTimeout(t *testing.T) {

misc.go

@@ -11,16 +11,16 @@ import (
 	pp "github.com/anacrolix/torrent/peer_protocol"
 )

-type chunkSpec struct {
+type ChunkSpec struct {
 	Begin, Length pp.Integer
 }

-type request struct {
+type Request struct {
 	Index pp.Integer
-	chunkSpec
+	ChunkSpec
 }

-func (r request) ToMsg(mt pp.MessageType) pp.Message {
+func (r Request) ToMsg(mt pp.MessageType) pp.Message {
 	return pp.Message{
 		Type:  mt,
 		Index: r.Index,
@@ -29,11 +29,11 @@ func (r request) ToMsg(mt pp.MessageType) pp.Message {
 	}
 }

-func newRequest(index, begin, length pp.Integer) request {
-	return request{index, chunkSpec{begin, length}}
+func newRequest(index, begin, length pp.Integer) Request {
+	return Request{index, ChunkSpec{begin, length}}
 }

-func newRequestFromMessage(msg *pp.Message) request {
+func newRequestFromMessage(msg *pp.Message) Request {
 	switch msg.Type {
 	case pp.Request, pp.Cancel, pp.Reject:
 		return newRequest(msg.Index, msg.Begin, msg.Length)
@@ -55,7 +55,7 @@ func metadataPieceSize(totalSize int, piece int) int {
 // Return the request that would include the given offset into the torrent data.
 func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) (
-	r request, ok bool) {
+	r Request, ok bool) {
 	if offset < 0 || offset >= torrentLength {
 		return
 	}
@@ -74,10 +74,10 @@ func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) (
 	return
 }

-func torrentRequestOffset(torrentLength, pieceSize int64, r request) (off int64) {
+func torrentRequestOffset(torrentLength, pieceSize int64, r Request) (off int64) {
 	off = int64(r.Index)*pieceSize + int64(r.Begin)
 	if off < 0 || off >= torrentLength {
-		panic("invalid request")
+		panic("invalid Request")
 	}
 	return
 }
@@ -98,8 +98,8 @@ func validateInfo(info *metainfo.Info) error {
 	return nil
 }

-func chunkIndexSpec(index pp.Integer, pieceLength, chunkSize pp.Integer) chunkSpec {
-	ret := chunkSpec{pp.Integer(index) * chunkSize, chunkSize}
+func chunkIndexSpec(index pp.Integer, pieceLength, chunkSize pp.Integer) ChunkSpec {
+	ret := ChunkSpec{pp.Integer(index) * chunkSize, chunkSize}
 	if ret.Begin+ret.Length > pieceLength {
 		ret.Length = pieceLength - ret.Begin
 	}
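The only subtlety in chunkIndexSpec is truncating the final chunk of a piece. A standalone sketch of that arithmetic, using plain int64 instead of pp.Integer to stay self-contained; the concrete numbers assume the test torrent above uses piece length 5 and chunk size 2, which is consistent with the three pending chunks it asserts:

package main

import "fmt"

// chunkSpecFor mirrors the truncation chunkIndexSpec performs: every chunk is
// chunkSize long except possibly the last one in its piece.
func chunkSpecFor(index, pieceLength, chunkSize int64) (begin, length int64) {
	begin = index * chunkSize
	length = chunkSize
	if begin+length > pieceLength {
		length = pieceLength - begin
	}
	return
}

func main() {
	// Matches the ChunkSpec{4, 1} assertion in the client test: chunk index 2
	// of a 5-byte piece with 2-byte chunks begins at 4 and holds 1 byte.
	fmt.Println(chunkSpecFor(2, 5, 2)) // 4 1
}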


@@ -12,7 +12,7 @@ import (
 )

 func TestTorrentOffsetRequest(t *testing.T) {
-	check := func(tl, ps, off int64, expected request, ok bool) {
+	check := func(tl, ps, off int64, expected Request, ok bool) {
 		req, _ok := torrentOffsetRequest(tl, ps, defaultChunkSize, off)
 		assert.Equal(t, _ok, ok)
 		assert.Equal(t, req, expected)
@@ -20,7 +20,7 @@ func TestTorrentOffsetRequest(t *testing.T) {
 	check(13, 5, 0, newRequest(0, 0, 5), true)
 	check(13, 5, 3, newRequest(0, 0, 5), true)
 	check(13, 5, 11, newRequest(2, 0, 3), true)
-	check(13, 5, 13, request{}, false)
+	check(13, 5, 13, Request{}, false)
 }

 func TestIterBitmapsDistinct(t *testing.T) {
func TestIterBitmapsDistinct(t *testing.T) { func TestIterBitmapsDistinct(t *testing.T) {


@@ -10,12 +10,12 @@ import (
 type peerImpl interface {
 	updateRequests()
 	writeInterested(interested bool) bool
-	cancel(request) bool
+	cancel(Request) bool
 	// Return true if there's room for more activity.
-	request(request) bool
+	request(Request) bool
 	connectionFlags() string
 	onClose()
-	_postCancel(request)
+	_postCancel(Request)
 	onGotInfo(*metainfo.Info)
 	drop()
 	String() string


@@ -53,6 +53,7 @@ type Peer struct {
 	t *Torrent

 	peerImpl
+	callbacks *Callbacks

 	outgoing bool
 	Network  string
@@ -82,12 +83,12 @@ type Peer struct {
 	_chunksReceivedWhileExpecting int64

 	choking          bool
-	requests         map[request]struct{}
+	requests         map[Request]struct{}
 	requestsLowWater int
 	// Chunks that we might reasonably expect to receive from the peer. Due to
 	// latency, buffering, and implementation differences, we may receive
 	// chunks that are no longer in the set of requests actually want.
-	validReceiveChunks map[request]int
+	validReceiveChunks map[Request]int
 	// Indexed by metadata piece, set to true if posted and pending a
 	// response.
 	metadataRequests []bool
@@ -96,7 +97,7 @@ type Peer struct {
 	// Stuff controlled by the remote peer.
 	peerInterested        bool
 	peerChoking           bool
-	peerRequests          map[request]*peerRequestState
+	peerRequests          map[Request]*peerRequestState
 	PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake
 	PeerListenPort        int
 	// The pieces the peer has claimed to have.
@@ -146,8 +147,6 @@ type PeerConn struct {
 	writerCond sync.Cond

 	pex pexConnState
-
-	callbacks *Callbacks
 }

 func (cn *PeerConn) connStatusString() string {
@@ -354,6 +353,9 @@ func (cn *Peer) close() {
 	cn.discardPieceInclination()
 	cn._pieceRequestOrder.Clear()
 	cn.peerImpl.onClose()
+	for _, f := range cn.callbacks.PeerClosed {
+		f(cn)
+	}
 }

 func (cn *PeerConn) onClose() {
@@ -451,7 +453,7 @@ func (cn *Peer) totalExpectingTime() (ret time.Duration) {
 }

-func (cn *PeerConn) onPeerSentCancel(r request) {
+func (cn *PeerConn) onPeerSentCancel(r Request) {
 	if _, ok := cn.peerRequests[r]; !ok {
 		torrent.Add("unexpected cancels received", 1)
 		return
@@ -523,7 +525,7 @@ func (pc *PeerConn) writeInterested(interested bool) bool {
 // are okay.
 type messageWriter func(pp.Message) bool

-func (cn *Peer) request(r request) bool {
+func (cn *Peer) request(r Request) bool {
 	if _, ok := cn.requests[r]; ok {
 		panic("chunk already requested")
 	}
@@ -550,20 +552,23 @@ func (cn *Peer) request(r request) bool {
 		panic("piece is queued for hash")
 	}
 	if cn.requests == nil {
-		cn.requests = make(map[request]struct{})
+		cn.requests = make(map[Request]struct{})
 	}
 	cn.requests[r] = struct{}{}
 	if cn.validReceiveChunks == nil {
-		cn.validReceiveChunks = make(map[request]int)
+		cn.validReceiveChunks = make(map[Request]int)
 	}
 	cn.validReceiveChunks[r]++
 	cn.t.pendingRequests[r]++
 	cn.t.requestStrategy.hooks().sentRequest(r)
 	cn.updateExpectingChunks()
+	for _, f := range cn.callbacks.SentRequest {
+		f(PeerRequestEvent{cn, r})
+	}
 	return cn.peerImpl.request(r)
 }

-func (me *PeerConn) request(r request) bool {
+func (me *PeerConn) request(r Request) bool {
 	return me.write(pp.Message{
 		Type:  pp.Request,
 		Index: r.Index,
@@ -572,7 +577,7 @@ func (me *PeerConn) request(r request) bool {
 	})
 }

-func (me *PeerConn) cancel(r request) bool {
+func (me *PeerConn) cancel(r Request) bool {
 	return me.write(makeCancelMessage(r))
 }
@@ -593,7 +598,7 @@ func (cn *Peer) doRequestState() bool {
 	} else if len(cn.requests) <= cn.requestsLowWater {
 		filledBuffer := false
 		cn.iterPendingPieces(func(pieceIndex pieceIndex) bool {
-			cn.iterPendingRequests(pieceIndex, func(r request) bool {
+			cn.iterPendingRequests(pieceIndex, func(r Request) bool {
 				if !cn.setInterested(true) {
 					filledBuffer = true
 					return false
@@ -789,11 +794,11 @@ func (cn *Peer) iterPendingPiecesUntyped(f iter.Callback) {
 	cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) })
 }

-func (cn *Peer) iterPendingRequests(piece pieceIndex, f func(request) bool) bool {
+func (cn *Peer) iterPendingRequests(piece pieceIndex, f func(Request) bool) bool {
 	return cn.t.requestStrategy.iterUndirtiedChunks(
 		cn.t.piece(piece).requestStrategyPiece(),
-		func(cs chunkSpec) bool {
-			return f(request{pp.Integer(piece), cs})
+		func(cs ChunkSpec) bool {
+			return f(Request{pp.Integer(piece), cs})
 		},
 	)
 }
@@ -1003,7 +1008,7 @@ func (c *PeerConn) fastEnabled() bool {
 	return c.PeerExtensionBytes.SupportsFast() && c.t.cl.config.Extensions.SupportsFast()
 }

-func (c *PeerConn) reject(r request) {
+func (c *PeerConn) reject(r Request) {
 	if !c.fastEnabled() {
 		panic("fast not enabled")
 	}
@@ -1011,7 +1016,7 @@ func (c *PeerConn) reject(r request) {
 	delete(c.peerRequests, r)
 }

-func (c *PeerConn) onReadRequest(r request) error {
+func (c *PeerConn) onReadRequest(r Request) error {
 	requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
 	if _, ok := c.peerRequests[r]; ok {
 		torrent.Add("duplicate requests received", 1)
@@ -1043,10 +1048,10 @@ func (c *PeerConn) onReadRequest(r request) error {
 	// Check this after we know we have the piece, so that the piece length will be known.
 	if r.Begin+r.Length > c.t.pieceLength(pieceIndex(r.Index)) {
 		torrent.Add("bad requests received", 1)
-		return errors.New("bad request")
+		return errors.New("bad Request")
 	}
 	if c.peerRequests == nil {
-		c.peerRequests = make(map[request]*peerRequestState, maxRequests)
+		c.peerRequests = make(map[Request]*peerRequestState, maxRequests)
 	}
 	value := &peerRequestState{}
 	c.peerRequests[r] = value
@@ -1055,7 +1060,7 @@ func (c *PeerConn) onReadRequest(r request) error {
 	return nil
 }

-func (c *PeerConn) peerRequestDataReader(r request, prs *peerRequestState) {
+func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
 	b, err := readPeerRequestData(r, c)
 	c.locker().Lock()
 	defer c.locker().Unlock()
@@ -1072,8 +1077,8 @@ func (c *PeerConn) peerRequestDataReader(r request, prs *peerRequestState) {
 // If this is maintained correctly, we might be able to support optional synchronous reading for
 // chunk sending, the way it used to work.
-func (c *PeerConn) peerRequestDataReadFailed(err error, r request) {
-	c.logger.WithDefaultLevel(log.Warning).Printf("error reading chunk for peer request %v: %v", r, err)
+func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) {
+	c.logger.WithDefaultLevel(log.Warning).Printf("error reading chunk for peer Request %v: %v", r, err)
 	i := pieceIndex(r.Index)
 	if c.t.pieceComplete(i) {
 		// There used to be more code here that just duplicated the following break. Piece
@@ -1092,7 +1097,7 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r request) {
 	c.choke(c.post)
 }

-func readPeerRequestData(r request, c *PeerConn) ([]byte, error) {
+func readPeerRequestData(r Request, c *PeerConn) ([]byte, error) {
 	b := make([]byte, r.Length)
 	p := c.t.info.Piece(int(r.Index))
 	n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
@@ -1241,13 +1246,13 @@ func (c *PeerConn) mainReadLoop() (err error) {
 	}
 }

-func (c *Peer) remoteRejectedRequest(r request) {
+func (c *Peer) remoteRejectedRequest(r Request) {
 	if c.deleteRequest(r) {
 		c.decExpectedChunkReceive(r)
 	}
 }

-func (c *Peer) decExpectedChunkReceive(r request) {
+func (c *Peer) decExpectedChunkReceive(r Request) {
 	count := c.validReceiveChunks[r]
 	if count == 1 {
 		delete(c.validReceiveChunks, r)
@@ -1396,7 +1401,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
 	piece.incrementPendingWrites()
 	// Record that we have the chunk, so we aren't trying to download it while
 	// waiting for it to be written to storage.
-	piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
+	piece.unpendChunkIndex(chunkIndex(req.ChunkSpec, t.chunkSize))

 	// Cancel pending requests for this chunk.
 	for c := range t.conns {
@@ -1537,11 +1542,14 @@ func (c *Peer) numLocalRequests() int {
 	return len(c.requests)
 }

-func (c *Peer) deleteRequest(r request) bool {
+func (c *Peer) deleteRequest(r Request) bool {
 	if _, ok := c.requests[r]; !ok {
 		return false
 	}
 	delete(c.requests, r)
+	for _, f := range c.callbacks.DeletedRequest {
+		f(PeerRequestEvent{c, r})
+	}
 	c.updateExpectingChunks()
 	c.t.requestStrategy.hooks().deletedRequest(r)
 	pr := c.t.pendingRequests
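SentRequest fires in Peer.request and DeletedRequest fires in Peer.deleteRequest, so the pair brackets a request's outstanding lifetime. A hedged sketch of pairing them to time requests, continuing the cfg from the earlier sketch; the sentAt map and mutex are illustrative additions, and Request is a comparable struct, so it works as a map key (note that the same Request can be outstanding on several peers at once, which this simple keying conflates):

var (
	mu     sync.Mutex
	sentAt = make(map[torrent.Request]time.Time)
)
cfg.Callbacks.SentRequest = append(cfg.Callbacks.SentRequest, func(e torrent.PeerRequestEvent) {
	mu.Lock()
	sentAt[e.Request] = time.Now()
	mu.Unlock()
})
cfg.Callbacks.DeletedRequest = append(cfg.Callbacks.DeletedRequest, func(e torrent.PeerRequestEvent) {
	mu.Lock()
	if t0, ok := sentAt[e.Request]; ok {
		log.Printf("request %v was outstanding for %v", e.Request, time.Since(t0))
		delete(sentAt, e.Request)
	}
	mu.Unlock()
})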
@@ -1594,7 +1602,7 @@ func (c *PeerConn) tickleWriter() {
 	c.writerCond.Broadcast()
 }

-func (c *Peer) postCancel(r request) bool {
+func (c *Peer) postCancel(r Request) bool {
 	if !c.deleteRequest(r) {
 		return false
 	}
@@ -1602,11 +1610,11 @@ func (c *Peer) postCancel(r request) bool {
 	return true
 }

-func (c *PeerConn) _postCancel(r request) {
+func (c *PeerConn) _postCancel(r Request) {
 	c.post(makeCancelMessage(r))
 }

-func (c *PeerConn) sendChunk(r request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
+func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
 	c.lastChunkSent = time.Now()
 	return msg(pp.Message{
 		Type:  pp.Piece,


@@ -134,7 +134,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
 			// The chunk must be written to storage everytime, to ensure the
 			// writeSem is unlocked.
 			t.pieces[0]._dirtyChunks.Clear()
-			cn.validReceiveChunks = map[request]int{newRequestFromMessage(&msg): 1}
+			cn.validReceiveChunks = map[Request]int{newRequestFromMessage(&msg): 1}
 			cl.unlock()
 			n, err := w.Write(wb)
 			require.NoError(b, err)


@@ -82,7 +82,7 @@ func (p *Piece) pendingChunkIndex(chunkIndex int) bool {
 	return !p._dirtyChunks.Contains(chunkIndex)
 }

-func (p *Piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool {
+func (p *Piece) pendingChunk(cs ChunkSpec, chunkSize pp.Integer) bool {
 	return p.pendingChunkIndex(chunkIndex(cs, chunkSize))
 }
@@ -137,12 +137,12 @@ func (p *Piece) chunkIndexDirty(chunk pp.Integer) bool {
 	return p._dirtyChunks.Contains(bitmap.BitIndex(chunk))
 }

-func (p *Piece) chunkIndexSpec(chunk pp.Integer) chunkSpec {
+func (p *Piece) chunkIndexSpec(chunk pp.Integer) ChunkSpec {
 	return chunkIndexSpec(chunk, p.length(), p.chunkSize())
 }

-func (p *Piece) chunkIndexRequest(chunkIndex pp.Integer) request {
-	return request{
+func (p *Piece) chunkIndexRequest(chunkIndex pp.Integer) Request {
+	return Request{
 		pp.Integer(p.index),
 		chunkIndexSpec(chunkIndex, p.length(), p.chunkSize()),
 	}


@@ -4,6 +4,6 @@ import (
 	pp "github.com/anacrolix/torrent/peer_protocol"
 )

-func makeCancelMessage(r request) pp.Message {
+func makeCancelMessage(r Request) pp.Message {
 	return pp.MakeCancelMessage(r.Index, r.Begin, r.Length)
 }


@@ -11,12 +11,12 @@ type requestStrategyDefaults struct{}

 func (requestStrategyDefaults) hooks() requestStrategyHooks {
 	return requestStrategyHooks{
-		sentRequest:    func(request) {},
-		deletedRequest: func(request) {},
+		sentRequest:    func(Request) {},
+		deletedRequest: func(Request) {},
 	}
 }

-func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
+func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
 	chunkIndices := p.dirtyChunks().Copy()
 	chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
 	return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
@@ -24,7 +24,7 @@ func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f fun
 		if err != nil {
 			panic(err)
 		}
-		return f(p.chunkIndexRequest(pp.Integer(ci)).chunkSpec)
+		return f(p.chunkIndexRequest(pp.Integer(ci)).ChunkSpec)
 	})
 }


@@ -14,7 +14,7 @@ import (
 type requestStrategyPiece interface {
 	numChunks() pp.Integer
 	dirtyChunks() bitmap.Bitmap
-	chunkIndexRequest(i pp.Integer) request
+	chunkIndexRequest(i pp.Integer) Request
 }

 type requestStrategyTorrent interface {
@@ -39,7 +39,7 @@ type requestStrategyConnection interface {
 type requestStrategy interface {
 	iterPendingPieces(requestStrategyConnection, func(pieceIndex) bool) bool
-	iterUndirtiedChunks(requestStrategyPiece, func(chunkSpec) bool) bool
+	iterUndirtiedChunks(requestStrategyPiece, func(ChunkSpec) bool) bool
 	nominalMaxRequests(requestStrategyConnection) int
 	shouldRequestWithoutBias(requestStrategyConnection) bool
 	piecePriority(requestStrategyConnection, pieceIndex, piecePriority, int) int
@@ -47,12 +47,12 @@ type requestStrategy interface {
 }

 type requestStrategyHooks struct {
-	sentRequest    func(request)
-	deletedRequest func(request)
+	sentRequest    func(Request)
+	deletedRequest func(Request)
 }

 type requestStrategyCallbacks interface {
-	requestTimedOut(request)
+	requestTimedOut(Request)
 }

 type requestStrategyFuzzing struct {
@@ -103,7 +103,7 @@ type requestStrategyDuplicateRequestTimeout struct {
 	// The last time we requested a chunk. Deleting the request from any connection will clear this
 	// value.
-	lastRequested map[request]*time.Timer
+	lastRequested map[Request]*time.Timer
 	// The lock to take when running a request timeout handler.
 	timeoutLocker sync.Locker
 }
@@ -118,7 +118,7 @@ func RequestStrategyDuplicateRequestTimeout(duplicateRequestTimeout time.Duratio
 	return requestStrategyDuplicateRequestTimeout{
 		duplicateRequestTimeout: duplicateRequestTimeout,
 		callbacks:               callbacks,
-		lastRequested:           make(map[request]*time.Timer),
+		lastRequested:           make(map[Request]*time.Timer),
 		timeoutLocker:           clientLocker,
 	}
 }
@@ -126,7 +126,7 @@ func RequestStrategyDuplicateRequestTimeout(duplicateRequestTimeout time.Duratio
 func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks {
 	return requestStrategyHooks{
-		deletedRequest: func(r request) {
+		deletedRequest: func(r Request) {
 			if t, ok := rs.lastRequested[r]; ok {
 				t.Stop()
 				delete(rs.lastRequested, r)
@@ -136,7 +136,7 @@ func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks {
 	}
 }

-func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
+func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
 	for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ {
 		if p.dirtyChunks().Get(bitmap.BitIndex(i)) {
 			continue
@@ -145,7 +145,7 @@ func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestSt
 		if rs.wouldDuplicateRecent(r) {
 			continue
 		}
-		if !f(r.chunkSpec) {
+		if !f(r.ChunkSpec) {
 			return false
 		}
 	}
@@ -185,7 +185,7 @@ func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection,
 	return defaultIterPendingPieces(rs, cn, cb)
 }

-func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r request) {
+func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r Request) {
 	rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() {
 		rs.timeoutLocker.Lock()
 		delete(rs.lastRequested, r)
@@ -215,7 +215,7 @@ func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestSt
 		),
 	))
 }
-func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r request) bool {
+func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r Request) bool {
 	// This piece has been requested on another connection, and the duplicate request timer is still
 	// running.
 	_, ok := rs.lastRequested[r]


@@ -144,7 +144,7 @@ type Torrent struct {
 	connPieceInclinationPool sync.Pool

 	// Count of each request across active connections.
-	pendingRequests map[request]int
+	pendingRequests map[Request]int

 	pex pexState
 }
@@ -424,7 +424,7 @@ func (t *Torrent) onSetInfo() {
 	t.cl.event.Broadcast()
 	t.gotMetainfo.Set()
 	t.updateWantPeersEvent()
-	t.pendingRequests = make(map[request]int)
+	t.pendingRequests = make(map[Request]int)
 	t.tryCreateMorePieceHashers()
 }
@@ -744,13 +744,13 @@ func (t *Torrent) close() (err error) {
 	return
 }

-func (t *Torrent) requestOffset(r request) int64 {
+func (t *Torrent) requestOffset(r Request) int64 {
 	return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
 }

 // Return the request that would include the given offset into the torrent data. Returns !ok if
 // there is no such request.
-func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
+func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
 	return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
 }
@@ -846,7 +846,7 @@ func (t *Torrent) maybeDropMutuallyCompletePeer(
 	p.drop()
 }

-func (t *Torrent) haveChunk(r request) (ret bool) {
+func (t *Torrent) haveChunk(r Request) (ret bool) {
 	// defer func() {
 	// 	log.Println("have chunk", r, ret)
 	// }()
@@ -857,10 +857,10 @@ func (t *Torrent) haveChunk(r request) (ret bool) {
 		return true
 	}
 	p := &t.pieces[r.Index]
-	return !p.pendingChunk(r.chunkSpec, t.chunkSize)
+	return !p.pendingChunk(r.ChunkSpec, t.chunkSize)
 }

-func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
+func chunkIndex(cs ChunkSpec, chunkSize pp.Integer) int {
 	return int(cs.Begin / chunkSize)
 }
@@ -1073,8 +1073,8 @@ func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
 	return ret
 }

-func (t *Torrent) pendRequest(req request) {
-	ci := chunkIndex(req.chunkSpec, t.chunkSize)
+func (t *Torrent) pendRequest(req Request) {
+	ci := chunkIndex(req.ChunkSpec, t.chunkSize)
 	t.pieces[req.Index].pendChunkIndex(ci)
 }
@@ -2012,8 +2012,8 @@ type torrentRequestStrategyCallbacks struct {
 	t *Torrent
 }

-func (cb torrentRequestStrategyCallbacks) requestTimedOut(r request) {
-	torrent.Add("request timeouts", 1)
+func (cb torrentRequestStrategyCallbacks) requestTimedOut(r Request) {
+	torrent.Add("Request timeouts", 1)
 	cb.t.cl.lock()
 	defer cb.t.cl.unlock()
 	cb.t.iterPeers(func(cn *Peer) {
@@ -2098,6 +2098,10 @@ func (t *Torrent) iterPeers(f func(*Peer)) {
 	}
 }

+func (t *Torrent) callbacks() *Callbacks {
+	return &t.cl.config.Callbacks
+}
+
 func (t *Torrent) addWebSeed(url string) {
 	if t.cl.config.DisableWebseeds {
 		return
@@ -2120,7 +2124,10 @@ func (t *Torrent) addWebSeed(url string) {
 			HttpClient: http.DefaultClient,
 			Url:        url,
 		},
-		requests: make(map[request]webseed.Request, maxRequests),
+		requests: make(map[Request]webseed.Request, maxRequests),
 	}
+	for _, f := range t.callbacks().NewPeer {
+		f(&ws.peer)
+	}
 	ws.peer.logger = t.logger.WithContextValue(&ws)
 	ws.peer.peerImpl = &ws


@@ -19,8 +19,8 @@ import (
 	"github.com/anacrolix/torrent/storage"
 )

-func r(i, b, l pp.Integer) request {
-	return request{i, chunkSpec{b, l}}
+func r(i, b, l pp.Integer) Request {
+	return Request{i, ChunkSpec{b, l}}
 }

 // Check the given request is correct for various torrent offsets.
@@ -28,15 +28,15 @@ func TestTorrentRequest(t *testing.T) {
 	const s = 472183431 // Length of torrent.
 	for _, _case := range []struct {
 		off int64   // An offset into the torrent.
-		req request // The expected request. The zero value means !ok.
+		req Request // The expected request. The zero value means !ok.
 	}{
 		// Invalid offset.
-		{-1, request{}},
+		{-1, Request{}},
 		{0, r(0, 0, 16384)},
 		// One before the end of a piece.
 		{1<<18 - 1, r(0, 1<<18-16384, 16384)},
 		// Offset beyond torrent length.
-		{472 * 1 << 20, request{}},
+		{472 * 1 << 20, Request{}},
 		// One before the end of the torrent. Complicates the chunk length.
 		{s - 1, r((s-1)/(1<<18), (s-1)%(1<<18)/(16384)*(16384), 12935)},
 		{1, r(0, 0, 16384)},
@@ -46,7 +46,7 @@ func TestTorrentRequest(t *testing.T) {
 		{16384, r(0, 16384, 16384)},
 	} {
 		req, ok := torrentOffsetRequest(472183431, 1<<18, 16384, _case.off)
-		if (_case.req == request{}) == ok {
+		if (_case.req == Request{}) == ok {
 			t.Fatalf("expected %v, got %v", _case.req, req)
 		}
 		if req != _case.req {
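The {s - 1, ...} case is the one the comment flags as complicating the chunk length, and its expected values follow from the test's own constants: with piece size 1<<18 = 262144 and chunk size 16384, offset s-1 = 472183430 lands in piece 472183430/262144 = 1801 at piece-relative offset 62086, so the chunk begins at 62086/16384*16384 = 49152; the final piece holds only 472183431 - 1801*262144 = 62087 bytes, leaving 62087 - 49152 = 12935 bytes for the chunk, hence r(1801, 49152, 12935).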


@@ -14,7 +14,7 @@ import (
 type webseedPeer struct {
 	client   webseed.Client
-	requests map[request]webseed.Request
+	requests map[Request]webseed.Request
 	peer     Peer
 }
@@ -33,7 +33,7 @@ func (ws *webseedPeer) onGotInfo(info *metainfo.Info) {
 	ws.client.Info = info
 }

-func (ws *webseedPeer) _postCancel(r request) {
+func (ws *webseedPeer) _postCancel(r Request) {
 	ws.cancel(r)
 }
@@ -41,16 +41,16 @@ func (ws *webseedPeer) writeInterested(interested bool) bool {
 	return true
 }

-func (ws *webseedPeer) cancel(r request) bool {
+func (ws *webseedPeer) cancel(r Request) bool {
 	ws.requests[r].Cancel()
 	return true
 }

-func (ws *webseedPeer) intoSpec(r request) webseed.RequestSpec {
+func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
 	return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
 }

-func (ws *webseedPeer) request(r request) bool {
+func (ws *webseedPeer) request(r Request) bool {
 	webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
 	ws.requests[r] = webseedRequest
 	go ws.requestResultHandler(r, webseedRequest)
@@ -71,12 +71,12 @@ func (ws *webseedPeer) updateRequests() {

 func (ws *webseedPeer) onClose() {}

-func (ws *webseedPeer) requestResultHandler(r request, webseedRequest webseed.Request) {
+func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) {
 	result := <-webseedRequest.Result
 	ws.peer.t.cl.lock()
 	defer ws.peer.t.cl.unlock()
 	if result.Err != nil {
-		ws.peer.logger.Printf("request %v rejected: %v", r, result.Err)
+		ws.peer.logger.Printf("Request %v rejected: %v", r, result.Err)
 		// Always close for now. We need to filter out temporary errors, but this is a nightmare in
 		// Go. Currently a bad webseed URL can starve out the good ones due to the chunk selection
 		// algorithm.