Merge branch 'request-strategy-rewrite'

This commit is contained in:
Matt Joiner 2021-06-21 12:04:06 +10:00
commit ebd19af795
44 changed files with 1808 additions and 974 deletions

View File

@ -15,11 +15,9 @@ type badStorage struct{}
var _ storage.ClientImpl = badStorage{} var _ storage.ClientImpl = badStorage{}
func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) { func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
return bs, nil return storage.TorrentImpl{
} Piece: bs.Piece,
}, nil
func (bs badStorage) Close() error {
return nil
} }
func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl { func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {

View File

@ -205,68 +205,70 @@ func (d *Decoder) parseString(v reflect.Value) error {
// Info for parsing a dict value. // Info for parsing a dict value.
type dictField struct { type dictField struct {
Value reflect.Value // Storage for the parsed value. Type reflect.Type
// True if field value should be parsed into Value. If false, the value Get func(value reflect.Value) func(reflect.Value)
// should be parsed and discarded. Tags tag
Ok bool
Set func() // Call this after parsing into Value.
IgnoreUnmarshalTypeError bool
} }
// Returns specifics for parsing a dict field value. // Returns specifics for parsing a dict field value.
func getDictField(dict reflect.Value, key string) dictField { func getDictField(dict reflect.Type, key string) dictField {
// get valuev as a map value or as a struct field // get valuev as a map value or as a struct field
switch dict.Kind() { switch dict.Kind() {
case reflect.Map: case reflect.Map:
value := reflect.New(dict.Type().Elem()).Elem()
return dictField{ return dictField{
Value: value, Type: dict.Elem(),
Ok: true, Get: func(mapValue reflect.Value) func(reflect.Value) {
Set: func() { return func(value reflect.Value) {
if dict.IsNil() { if mapValue.IsNil() {
dict.Set(reflect.MakeMap(dict.Type())) mapValue.Set(reflect.MakeMap(dict))
} }
// Assigns the value into the map. // Assigns the value into the map.
dict.SetMapIndex(reflect.ValueOf(key).Convert(dict.Type().Key()), value) //log.Printf("map type: %v", mapValue.Type())
mapValue.SetMapIndex(reflect.ValueOf(key).Convert(dict.Key()), value)
}
}, },
} }
case reflect.Struct: case reflect.Struct:
sf, ok := getStructFieldForKey(dict.Type(), key) return getStructFieldForKey(dict, key)
if !ok { //if sf.r.PkgPath != "" {
return dictField{} // panic(&UnmarshalFieldError{
} // Key: key,
if sf.r.PkgPath != "" { // Type: dict.Type(),
panic(&UnmarshalFieldError{ // Field: sf.r,
Key: key, // })
Type: dict.Type(), //}
Field: sf.r,
})
}
return dictField{
Value: dict.FieldByIndex(sf.r.Index),
Ok: true,
Set: func() {},
IgnoreUnmarshalTypeError: sf.tag.IgnoreUnmarshalTypeError(),
}
default: default:
panic("unimplemented")
return dictField{} return dictField{}
} }
} }
type structField struct {
r reflect.StructField
tag tag
}
var ( var (
structFieldsMu sync.Mutex structFieldsMu sync.Mutex
structFields = map[reflect.Type]map[string]structField{} structFields = map[reflect.Type]map[string]dictField{}
) )
func parseStructFields(struct_ reflect.Type, each func(string, structField)) { func parseStructFields(struct_ reflect.Type, each func(key string, df dictField)) {
for i, n := 0, struct_.NumField(); i < n; i++ { for _i, n := 0, struct_.NumField(); _i < n; _i++ {
i := _i
f := struct_.Field(i) f := struct_.Field(i)
if f.Anonymous { if f.Anonymous {
t := f.Type
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
parseStructFields(t, func(key string, df dictField) {
innerGet := df.Get
df.Get = func(value reflect.Value) func(reflect.Value) {
anonPtr := value.Field(i)
if anonPtr.Kind() == reflect.Ptr && anonPtr.IsNil() {
anonPtr.Set(reflect.New(f.Type.Elem()))
anonPtr = anonPtr.Elem()
}
return innerGet(anonPtr)
}
each(key, df)
})
continue continue
} }
tagStr := f.Tag.Get("bencode") tagStr := f.Tag.Get("bencode")
@ -278,25 +280,35 @@ func parseStructFields(struct_ reflect.Type, each func(string, structField)) {
if key == "" { if key == "" {
key = f.Name key = f.Name
} }
each(key, structField{f, tag}) each(key, dictField{f.Type, func(value reflect.Value) func(reflect.Value) {
return value.Field(i).Set
}, tag})
} }
} }
func saveStructFields(struct_ reflect.Type) { func saveStructFields(struct_ reflect.Type) {
m := make(map[string]structField) m := make(map[string]dictField)
parseStructFields(struct_, func(key string, sf structField) { parseStructFields(struct_, func(key string, sf dictField) {
m[key] = sf m[key] = sf
}) })
structFields[struct_] = m structFields[struct_] = m
} }
func getStructFieldForKey(struct_ reflect.Type, key string) (f structField, ok bool) { func getStructFieldForKey(struct_ reflect.Type, key string) (f dictField) {
structFieldsMu.Lock() structFieldsMu.Lock()
if _, ok := structFields[struct_]; !ok { if _, ok := structFields[struct_]; !ok {
saveStructFields(struct_) saveStructFields(struct_)
} }
f, ok = structFields[struct_][key] f, ok := structFields[struct_][key]
structFieldsMu.Unlock() structFieldsMu.Unlock()
if !ok {
var discard interface{}
return dictField{
Type: reflect.TypeOf(discard),
Get: func(reflect.Value) func(reflect.Value) { return func(reflect.Value) {} },
Tags: nil,
}
}
return return
} }
@ -314,31 +326,33 @@ func (d *Decoder) parseDict(v reflect.Value) error {
return nil return nil
} }
df := getDictField(v, keyStr) df := getDictField(v.Type(), keyStr)
// now we need to actually parse it // now we need to actually parse it
if df.Ok { if df.Type == nil {
// log.Printf("parsing ok struct field for key %q", keyStr)
ok, err = d.parseValue(df.Value)
} else {
// Discard the value, there's nowhere to put it. // Discard the value, there's nowhere to put it.
var if_ interface{} var if_ interface{}
if_, ok = d.parseValueInterface() if_, ok = d.parseValueInterface()
if if_ == nil { if if_ == nil {
err = fmt.Errorf("error parsing value for key %q", keyStr) return fmt.Errorf("error parsing value for key %q", keyStr)
} }
if !ok {
return fmt.Errorf("missing value for key %q", keyStr)
} }
continue
}
setValue := reflect.New(df.Type).Elem()
//log.Printf("parsing into %v", setValue.Type())
ok, err = d.parseValue(setValue)
if err != nil { if err != nil {
if _, ok := err.(*UnmarshalTypeError); !ok || !df.IgnoreUnmarshalTypeError { if _, ok := err.(*UnmarshalTypeError); !ok || !df.Tags.IgnoreUnmarshalTypeError() {
return fmt.Errorf("parsing value for key %q: %s", keyStr, err) return fmt.Errorf("parsing value for key %q: %s", keyStr, err)
} }
} }
if !ok { if !ok {
return fmt.Errorf("missing value for key %q", keyStr) return fmt.Errorf("missing value for key %q", keyStr)
} }
if df.Ok { df.Get(v)(setValue)
df.Set()
}
} }
} }

View File

@ -7,6 +7,7 @@ import (
"reflect" "reflect"
"testing" "testing"
qt "github.com/frankban/quicktest"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -144,7 +145,7 @@ func TestIgnoreUnmarshalTypeError(t *testing.T) {
}{} }{}
require.Error(t, Unmarshal([]byte("d6:Normal5:helloe"), &s)) require.Error(t, Unmarshal([]byte("d6:Normal5:helloe"), &s))
assert.NoError(t, Unmarshal([]byte("d6:Ignore5:helloe"), &s)) assert.NoError(t, Unmarshal([]byte("d6:Ignore5:helloe"), &s))
require.Nil(t, Unmarshal([]byte("d6:Ignorei42ee"), &s)) qt.Assert(t, Unmarshal([]byte("d6:Ignorei42ee"), &s), qt.IsNil)
assert.EqualValues(t, 42, s.Ignore) assert.EqualValues(t, 42, s.Ignore)
} }

View File

@ -133,13 +133,16 @@ func (e *Encoder) reflectValue(v reflect.Value) {
e.reflectString(v.String()) e.reflectString(v.String())
case reflect.Struct: case reflect.Struct:
e.writeString("d") e.writeString("d")
for _, ef := range encodeFields(v.Type()) { for _, ef := range getEncodeFields(v.Type()) {
field_value := v.Field(ef.i) fieldValue := ef.i(v)
if ef.omit_empty && isEmptyValue(field_value) { if !fieldValue.IsValid() {
continue
}
if ef.omitEmpty && isEmptyValue(fieldValue) {
continue continue
} }
e.reflectString(ef.tag) e.reflectString(ef.tag)
e.reflectValue(field_value) e.reflectValue(fieldValue)
} }
e.writeString("e") e.writeString("e")
case reflect.Map: case reflect.Map:
@ -190,9 +193,9 @@ func (e *Encoder) reflectValue(v reflect.Value) {
} }
type encodeField struct { type encodeField struct {
i int i func(v reflect.Value) reflect.Value
tag string tag string
omit_empty bool omitEmpty bool
} }
type encodeFieldsSortType []encodeField type encodeFieldsSortType []encodeField
@ -206,31 +209,55 @@ var (
encodeFieldsCache = make(map[reflect.Type][]encodeField) encodeFieldsCache = make(map[reflect.Type][]encodeField)
) )
func encodeFields(t reflect.Type) []encodeField { func getEncodeFields(t reflect.Type) []encodeField {
typeCacheLock.RLock() typeCacheLock.RLock()
fs, ok := encodeFieldsCache[t] fs, ok := encodeFieldsCache[t]
typeCacheLock.RUnlock() typeCacheLock.RUnlock()
if ok { if ok {
return fs return fs
} }
fs = makeEncodeFields(t)
typeCacheLock.Lock() typeCacheLock.Lock()
defer typeCacheLock.Unlock() defer typeCacheLock.Unlock()
fs, ok = encodeFieldsCache[t] encodeFieldsCache[t] = fs
if ok {
return fs return fs
} }
for i, n := 0, t.NumField(); i < n; i++ { func makeEncodeFields(t reflect.Type) (fs []encodeField) {
for _i, n := 0, t.NumField(); _i < n; _i++ {
i := _i
f := t.Field(i) f := t.Field(i)
if f.PkgPath != "" { if f.PkgPath != "" {
continue continue
} }
if f.Anonymous { if f.Anonymous {
t := f.Type
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
anonEFs := makeEncodeFields(t)
for aefi := range anonEFs {
anonEF := anonEFs[aefi]
bottomField := anonEF
bottomField.i = func(v reflect.Value) reflect.Value {
v = v.Field(i)
if v.Kind() == reflect.Ptr {
if v.IsNil() {
// This will skip serializing this value.
return reflect.Value{}
}
v = v.Elem()
}
return anonEF.i(v)
}
fs = append(fs, bottomField)
}
continue continue
} }
var ef encodeField var ef encodeField
ef.i = i ef.i = func(v reflect.Value) reflect.Value {
return v.Field(i)
}
ef.tag = f.Name ef.tag = f.Name
tv := getTag(f.Tag) tv := getTag(f.Tag)
@ -240,11 +267,10 @@ func encodeFields(t reflect.Type) []encodeField {
if tv.Key() != "" { if tv.Key() != "" {
ef.tag = tv.Key() ef.tag = tv.Key()
} }
ef.omit_empty = tv.OmitEmpty() ef.omitEmpty = tv.OmitEmpty()
fs = append(fs, ef) fs = append(fs, ef)
} }
fss := encodeFieldsSortType(fs) fss := encodeFieldsSortType(fs)
sort.Sort(fss) sort.Sort(fss)
encodeFieldsCache[t] = fs
return fs return fs
} }

View File

@ -24,6 +24,9 @@ func (me tag) Key() string {
} }
func (me tag) HasOpt(opt string) bool { func (me tag) HasOpt(opt string) bool {
if len(me) < 1 {
return false
}
for _, s := range me[1:] { for _, s := range me[1:] {
if s == opt { if s == opt {
return true return true

View File

@ -2,7 +2,6 @@ package torrent
import ( import (
"bufio" "bufio"
"bytes"
"context" "context"
"crypto/rand" "crypto/rand"
"encoding/binary" "encoding/binary"
@ -18,15 +17,14 @@ import (
"github.com/anacrolix/dht/v2" "github.com/anacrolix/dht/v2"
"github.com/anacrolix/dht/v2/krpc" "github.com/anacrolix/dht/v2/krpc"
"github.com/anacrolix/log" "github.com/anacrolix/log"
"github.com/anacrolix/missinggo/bitmap"
"github.com/anacrolix/missinggo/perf" "github.com/anacrolix/missinggo/perf"
"github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/missinggo/pubsub"
"github.com/anacrolix/missinggo/slices" "github.com/anacrolix/missinggo/slices"
"github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/conntrack"
"github.com/anacrolix/missinggo/v2/pproffd" "github.com/anacrolix/missinggo/v2/pproffd"
"github.com/anacrolix/sync" "github.com/anacrolix/sync"
"github.com/anacrolix/torrent/internal/limiter"
"github.com/anacrolix/torrent/tracker"
"github.com/anacrolix/torrent/webtorrent"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/dustin/go-humanize" "github.com/dustin/go-humanize"
"github.com/google/btree" "github.com/google/btree"
@ -34,15 +32,17 @@ import (
"golang.org/x/time/rate" "golang.org/x/time/rate"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/anacrolix/missinggo/v2" "github.com/anacrolix/chansync"
"github.com/anacrolix/missinggo/v2/conntrack"
"github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/internal/limiter"
"github.com/anacrolix/torrent/iplist" "github.com/anacrolix/torrent/iplist"
"github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mse" "github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol" pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/tracker"
"github.com/anacrolix/torrent/webtorrent"
) )
// Clients contain zero or more Torrents. A Client manages a blocklist, the // Clients contain zero or more Torrents. A Client manages a blocklist, the
@ -81,6 +81,8 @@ type Client struct {
websocketTrackers websocketTrackers websocketTrackers websocketTrackers
activeAnnounceLimiter limiter.Instance activeAnnounceLimiter limiter.Instance
updateRequests chansync.BroadcastCond
} }
type ipStr string type ipStr string
@ -259,7 +261,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
cl.dhtServers = append(cl.dhtServers, anacrolixDhtServerWrapper{ds}) cl.dhtServers = append(cl.dhtServers, AnacrolixDhtServerWrapper{ds})
cl.onClose = append(cl.onClose, func() { ds.Close() }) cl.onClose = append(cl.onClose, func() { ds.Close() })
} }
} }
@ -293,6 +295,8 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
}, },
} }
go cl.requester()
return return
} }
@ -311,6 +315,10 @@ func (cl *Client) AddDialer(d Dialer) {
} }
} }
func (cl *Client) Listeners() []Listener {
return cl.listeners
}
// Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way // Registers a Listener, and starts Accepting on it. You must Close Listeners provided this way
// yourself. // yourself.
func (cl *Client) AddListener(l Listener) { func (cl *Client) AddListener(l Listener) {
@ -955,7 +963,7 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
return fmt.Errorf("adding connection: %w", err) return fmt.Errorf("adding connection: %w", err)
} }
defer t.dropConnection(c) defer t.dropConnection(c)
go c.writer(time.Minute) c.startWriter()
cl.sendInitialMessages(c, t) cl.sendInitialMessages(c, t)
err := c.mainReadLoop() err := c.mainReadLoop()
if err != nil { if err != nil {
@ -964,10 +972,15 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
return nil return nil
} }
// Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
// instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
// (1<<19) cached for sending, for 16KiB (1<<14) chunks.
const localClientReqq = 1 << 5
// See the order given in Transmission's tr_peerMsgsNew. // See the order given in Transmission's tr_peerMsgsNew.
func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) { func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() { if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
conn.post(pp.Message{ conn.write(pp.Message{
Type: pp.Extended, Type: pp.Extended,
ExtendedID: pp.HandshakeExtendedID, ExtendedID: pp.HandshakeExtendedID,
ExtendedPayload: func() []byte { ExtendedPayload: func() []byte {
@ -976,10 +989,7 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
pp.ExtensionNameMetadata: metadataExtendedId, pp.ExtensionNameMetadata: metadataExtendedId,
}, },
V: cl.config.ExtendedHandshakeClientVersion, V: cl.config.ExtendedHandshakeClientVersion,
// If peer requests are buffered on read, this instructs the amount of memory Reqq: localClientReqq,
// that might be used to cache pending writes. Assuming 512KiB cached for
// sending, for 16KiB chunks.
Reqq: 1 << 5,
YourIp: pp.CompactIp(conn.remoteIp()), YourIp: pp.CompactIp(conn.remoteIp()),
Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred, Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
Port: cl.incomingPeerPort(), Port: cl.incomingPeerPort(),
@ -999,11 +1009,11 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
func() { func() {
if conn.fastEnabled() { if conn.fastEnabled() {
if torrent.haveAllPieces() { if torrent.haveAllPieces() {
conn.post(pp.Message{Type: pp.HaveAll}) conn.write(pp.Message{Type: pp.HaveAll})
conn.sentHaves.AddRange(0, bitmap.BitIndex(conn.t.NumPieces())) conn.sentHaves.AddRange(0, bitmap.BitRange(conn.t.NumPieces()))
return return
} else if !torrent.haveAnyPieces() { } else if !torrent.haveAnyPieces() {
conn.post(pp.Message{Type: pp.HaveNone}) conn.write(pp.Message{Type: pp.HaveNone})
conn.sentHaves.Clear() conn.sentHaves.Clear()
return return
} }
@ -1011,7 +1021,7 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
conn.postBitfield() conn.postBitfield()
}() }()
if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() { if conn.PeerExtensionBytes.SupportsDHT() && cl.config.Extensions.SupportsDHT() && cl.haveDhtServer() {
conn.post(pp.Message{ conn.write(pp.Message{
Type: pp.Port, Type: pp.Port,
Port: cl.dhtPort(), Port: cl.dhtPort(),
}) })
@ -1069,12 +1079,12 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *PeerCon
return err return err
case pp.RequestMetadataExtensionMsgType: case pp.RequestMetadataExtensionMsgType:
if !t.haveMetadataPiece(piece) { if !t.haveMetadataPiece(piece) {
c.post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil)) c.write(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
return nil return nil
} }
start := (1 << 14) * piece start := (1 << 14) * piece
c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece) c.logger.WithDefaultLevel(log.Debug).Printf("sending metadata piece %d", piece)
c.post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)])) c.write(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
return nil return nil
case pp.RejectMetadataExtensionMsgType: case pp.RejectMetadataExtensionMsgType:
return nil return nil
@ -1139,7 +1149,6 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
webSeeds: make(map[string]*Peer), webSeeds: make(map[string]*Peer),
} }
t._pendingPieces.NewSet = priorityBitmapStableNewSet t._pendingPieces.NewSet = priorityBitmapStableNewSet
t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
t.logger = cl.logger.WithContextValue(t) t.logger = cl.logger.WithContextValue(t)
t.setChunkSize(defaultChunkSize) t.setChunkSize(defaultChunkSize)
return return
@ -1405,11 +1414,9 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemot
}, },
connString: connString, connString: connString,
conn: nc, conn: nc,
writeBuffer: new(bytes.Buffer),
} }
c.peerImpl = c c.peerImpl = c
c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c) c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
c.writerCond.L = cl.locker()
c.setRW(connStatsReadWriter{nc, c}) c.setRW(connStatsReadWriter{nc, c})
c.r = &rateLimitedReader{ c.r = &rateLimitedReader{
l: cl.config.DownloadRateLimiter, l: cl.config.DownloadRateLimiter,

View File

@ -322,7 +322,7 @@ func TestDhtInheritBlocklist(t *testing.T) {
numServers := 0 numServers := 0
cl.eachDhtServer(func(s DhtServer) { cl.eachDhtServer(func(s DhtServer) {
t.Log(s) t.Log(s)
assert.Equal(t, ipl, s.(anacrolixDhtServerWrapper).Server.IPBlocklist()) assert.Equal(t, ipl, s.(AnacrolixDhtServerWrapper).Server.IPBlocklist())
numServers++ numServers++
}) })
assert.EqualValues(t, 2, numServers) assert.EqualValues(t, 2, numServers)
@ -554,6 +554,8 @@ func TestPeerInvalidHave(t *testing.T) {
t: tt, t: tt,
}} }}
cn.peerImpl = cn cn.peerImpl = cn
cl.lock()
defer cl.unlock()
assert.NoError(t, cn.peerSentHave(0)) assert.NoError(t, cn.peerSentHave(0))
assert.Error(t, cn.peerSentHave(1)) assert.Error(t, cn.peerSentHave(1))
} }

View File

@ -173,6 +173,7 @@ type DownloadCmd struct {
TestPeer []string `help:"addresses of some starting peers"` TestPeer []string `help:"addresses of some starting peers"`
Seed bool `help:"seed after download is complete"` Seed bool `help:"seed after download is complete"`
Addr string `help:"network listen addr"` Addr string `help:"network listen addr"`
MaxUnverifiedBytes tagflag.Bytes `help:"maximum number bytes to have pending verification"`
UploadRate *tagflag.Bytes `help:"max piece bytes to send per second"` UploadRate *tagflag.Bytes `help:"max piece bytes to send per second"`
DownloadRate *tagflag.Bytes `help:"max bytes per second down from peers"` DownloadRate *tagflag.Bytes `help:"max bytes per second down from peers"`
PackedBlocklist string PackedBlocklist string
@ -311,6 +312,7 @@ func downloadErr() error {
if flags.Quiet { if flags.Quiet {
clientConfig.Logger = log.Discard clientConfig.Logger = log.Discard
} }
clientConfig.MaxUnverifiedBytes = flags.MaxUnverifiedBytes.Int64()
var stop missinggo.SynchronizedEvent var stop missinggo.SynchronizedEvent
defer func() { defer func() {

View File

@ -59,6 +59,8 @@ type ClientConfig struct {
// (~4096), and the requested chunk size (~16KiB, see // (~4096), and the requested chunk size (~16KiB, see
// TorrentSpec.ChunkSize). // TorrentSpec.ChunkSize).
DownloadRateLimiter *rate.Limiter DownloadRateLimiter *rate.Limiter
// Maximum unverified bytes across all torrents. Not used if zero.
MaxUnverifiedBytes int64
// User-provided Client peer ID. If not present, one is generated automatically. // User-provided Client peer ID. If not present, one is generated automatically.
PeerID string PeerID string
@ -137,8 +139,6 @@ type ClientConfig struct {
// OnQuery hook func // OnQuery hook func
DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool) DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool)
DefaultRequestStrategy requestStrategyMaker
Extensions PeerExtensionBits Extensions PeerExtensionBits
DisableWebtorrent bool DisableWebtorrent bool
@ -185,9 +185,6 @@ func NewDefaultClientConfig() *ClientConfig {
CryptoSelector: mse.DefaultCryptoSelector, CryptoSelector: mse.DefaultCryptoSelector,
CryptoProvides: mse.AllSupportedCrypto, CryptoProvides: mse.AllSupportedCrypto,
ListenPort: 42069, ListenPort: 42069,
DefaultRequestStrategy: RequestStrategyDuplicateRequestTimeout(5 * time.Second),
Extensions: defaultPeerExtensionBytes(), Extensions: defaultPeerExtensionBytes(),
} }
//cc.ConnTracker.SetNoMaxEntries() //cc.ConnTracker.SetNoMaxEntries()

View File

@ -23,6 +23,7 @@ type ConnStats struct {
BytesRead Count BytesRead Count
BytesReadData Count BytesReadData Count
BytesReadUsefulData Count BytesReadUsefulData Count
BytesReadUsefulIntendedData Count
ChunksWritten Count ChunksWritten Count
@ -78,14 +79,9 @@ func (cs *ConnStats) wroteMsg(msg *pp.Message) {
} }
} }
func (cs *ConnStats) readMsg(msg *pp.Message) { func (cs *ConnStats) receivedChunk(size int64) {
// We want to also handle extended metadata pieces here, but we wouldn't
// have decoded the extended payload yet.
switch msg.Type {
case pp.Piece:
cs.ChunksRead.Add(1) cs.ChunksRead.Add(1)
cs.BytesReadData.Add(int64(len(msg.Piece))) cs.BytesReadData.Add(size)
}
} }
func (cs *ConnStats) incrementPiecesDirtiedGood() { func (cs *ConnStats) incrementPiecesDirtiedGood() {

10
dht.go
View File

@ -29,11 +29,11 @@ type DhtAnnounce interface {
Peers() <-chan dht.PeersValues Peers() <-chan dht.PeersValues
} }
type anacrolixDhtServerWrapper struct { type AnacrolixDhtServerWrapper struct {
*dht.Server *dht.Server
} }
func (me anacrolixDhtServerWrapper) Stats() interface{} { func (me AnacrolixDhtServerWrapper) Stats() interface{} {
return me.Server.Stats() return me.Server.Stats()
} }
@ -45,13 +45,13 @@ func (me anacrolixDhtAnnounceWrapper) Peers() <-chan dht.PeersValues {
return me.Announce.Peers return me.Announce.Peers
} }
func (me anacrolixDhtServerWrapper) Announce(hash [20]byte, port int, impliedPort bool) (DhtAnnounce, error) { func (me AnacrolixDhtServerWrapper) Announce(hash [20]byte, port int, impliedPort bool) (DhtAnnounce, error) {
ann, err := me.Server.Announce(hash, port, impliedPort) ann, err := me.Server.Announce(hash, port, impliedPort)
return anacrolixDhtAnnounceWrapper{ann}, err return anacrolixDhtAnnounceWrapper{ann}, err
} }
func (me anacrolixDhtServerWrapper) Ping(addr *net.UDPAddr) { func (me AnacrolixDhtServerWrapper) Ping(addr *net.UDPAddr) {
me.Server.Ping(addr) me.Server.Ping(addr)
} }
var _ DhtServer = anacrolixDhtServerWrapper{} var _ DhtServer = AnacrolixDhtServerWrapper{}

12
file.go
View File

@ -66,20 +66,20 @@ func fileBytesLeft(
switch numPiecesSpanned { switch numPiecesSpanned {
case 0: case 0:
case 1: case 1:
if !torrentCompletedPieces.Get(fileFirstPieceIndex) { if !torrentCompletedPieces.Get(bitmap.BitIndex(fileFirstPieceIndex)) {
left += fileLength left += fileLength
} }
default: default:
if !torrentCompletedPieces.Get(fileFirstPieceIndex) { if !torrentCompletedPieces.Get(bitmap.BitIndex(fileFirstPieceIndex)) {
left += torrentUsualPieceSize - (fileTorrentOffset % torrentUsualPieceSize) left += torrentUsualPieceSize - (fileTorrentOffset % torrentUsualPieceSize)
} }
if !torrentCompletedPieces.Get(fileEndPieceIndex - 1) { if !torrentCompletedPieces.Get(bitmap.BitIndex(fileEndPieceIndex - 1)) {
left += fileTorrentOffset + fileLength - int64(fileEndPieceIndex-1)*torrentUsualPieceSize left += fileTorrentOffset + fileLength - int64(fileEndPieceIndex-1)*torrentUsualPieceSize
} }
completedMiddlePieces := torrentCompletedPieces.Copy() completedMiddlePieces := torrentCompletedPieces.Copy()
completedMiddlePieces.RemoveRange(0, fileFirstPieceIndex+1) completedMiddlePieces.RemoveRange(0, bitmap.BitRange(fileFirstPieceIndex+1))
completedMiddlePieces.RemoveRange(fileEndPieceIndex-1, bitmap.ToEnd) completedMiddlePieces.RemoveRange(bitmap.BitRange(fileEndPieceIndex-1), bitmap.ToEnd)
left += int64(numPiecesSpanned-2-completedMiddlePieces.Len()) * torrentUsualPieceSize left += int64(numPiecesSpanned-2-pieceIndex(completedMiddlePieces.Len())) * torrentUsualPieceSize
} }
return return
} }

View File

@ -9,7 +9,6 @@ import (
const ( const (
pieceHash = crypto.SHA1 pieceHash = crypto.SHA1
maxRequests = 250 // Maximum pending requests we allow peers to send us.
defaultChunkSize = 0x4000 // 16KiB defaultChunkSize = 0x4000 // 16KiB
) )

7
go.mod
View File

@ -5,6 +5,7 @@ require (
crawshaw.io/sqlite v0.3.3-0.20210127221821-98b1f83c5508 crawshaw.io/sqlite v0.3.3-0.20210127221821-98b1f83c5508
github.com/RoaringBitmap/roaring v0.6.0 // indirect github.com/RoaringBitmap/roaring v0.6.0 // indirect
github.com/alexflint/go-arg v1.3.0 github.com/alexflint/go-arg v1.3.0
github.com/anacrolix/chansync v0.0.0-20210524073341-a336ebc2de92 // indirect
github.com/anacrolix/confluence v1.7.1-0.20210311004351-d642adb8546c // indirect github.com/anacrolix/confluence v1.7.1-0.20210311004351-d642adb8546c // indirect
github.com/anacrolix/dht/v2 v2.9.1 github.com/anacrolix/dht/v2 v2.9.1
github.com/anacrolix/envpprof v1.1.1 github.com/anacrolix/envpprof v1.1.1
@ -12,9 +13,9 @@ require (
github.com/anacrolix/log v0.9.0 github.com/anacrolix/log v0.9.0
github.com/anacrolix/missinggo v1.2.1 github.com/anacrolix/missinggo v1.2.1
github.com/anacrolix/missinggo/perf v1.0.0 github.com/anacrolix/missinggo/perf v1.0.0
github.com/anacrolix/missinggo/v2 v2.5.0 github.com/anacrolix/missinggo/v2 v2.5.1-0.20210520011502-b3d95d6b1d02
github.com/anacrolix/multiless v0.1.0 github.com/anacrolix/multiless v0.1.1-0.20210520040635-10ee7b5f3cff
github.com/anacrolix/sync v0.2.0 github.com/anacrolix/sync v0.3.0
github.com/anacrolix/tagflag v1.3.0 github.com/anacrolix/tagflag v1.3.0
github.com/anacrolix/upnp v0.1.2-0.20200416075019-5e9378ed1425 github.com/anacrolix/upnp v0.1.2-0.20200416075019-5e9378ed1425
github.com/anacrolix/utp v0.1.0 github.com/anacrolix/utp v0.1.0

19
go.sum
View File

@ -8,6 +8,7 @@ cloud.google.com/go v0.37.0 h1:69FNAINiZfsEuwH3fKq8QrAAnHz+2m4XL4kVYi5BX0Q=
cloud.google.com/go v0.37.0/go.mod h1:TS1dMSSfndXH133OKGwekG838Om/cQT0BUHV3HcBgoo= cloud.google.com/go v0.37.0/go.mod h1:TS1dMSSfndXH133OKGwekG838Om/cQT0BUHV3HcBgoo=
crawshaw.io/iox v0.0.0-20181124134642-c51c3df30797 h1:yDf7ARQc637HoxDho7xjqdvO5ZA2Yb+xzv/fOnnvZzw= crawshaw.io/iox v0.0.0-20181124134642-c51c3df30797 h1:yDf7ARQc637HoxDho7xjqdvO5ZA2Yb+xzv/fOnnvZzw=
crawshaw.io/iox v0.0.0-20181124134642-c51c3df30797/go.mod h1:sXBiorCo8c46JlQV3oXPKINnZ8mcqnye1EkVkqsectk= crawshaw.io/iox v0.0.0-20181124134642-c51c3df30797/go.mod h1:sXBiorCo8c46JlQV3oXPKINnZ8mcqnye1EkVkqsectk=
crawshaw.io/sqlite v0.3.2/go.mod h1:igAO5JulrQ1DbdZdtVq48mnZUBAPOeFzer7VhDWNtW4=
crawshaw.io/sqlite v0.3.3-0.20210127221821-98b1f83c5508 h1:fILCBBFnjnrQ0whVJlGhfv1E/QiaFDNtGFBObEVRnYg= crawshaw.io/sqlite v0.3.3-0.20210127221821-98b1f83c5508 h1:fILCBBFnjnrQ0whVJlGhfv1E/QiaFDNtGFBObEVRnYg=
crawshaw.io/sqlite v0.3.3-0.20210127221821-98b1f83c5508/go.mod h1:igAO5JulrQ1DbdZdtVq48mnZUBAPOeFzer7VhDWNtW4= crawshaw.io/sqlite v0.3.3-0.20210127221821-98b1f83c5508/go.mod h1:igAO5JulrQ1DbdZdtVq48mnZUBAPOeFzer7VhDWNtW4=
dmitri.shuralyov.com/app/changes v0.0.0-20180602232624-0a106ad413e3 h1:hJiie5Bf3QucGRa4ymsAUOxyhYwGEz1xrsVk0P8erlw= dmitri.shuralyov.com/app/changes v0.0.0-20180602232624-0a106ad413e3 h1:hJiie5Bf3QucGRa4ymsAUOxyhYwGEz1xrsVk0P8erlw=
@ -55,6 +56,8 @@ github.com/alexflint/go-arg v1.3.0 h1:UfldqSdFWeLtoOuVRosqofU4nmhI1pYEbT4ZFS34Bd
github.com/alexflint/go-arg v1.3.0/go.mod h1:9iRbDxne7LcR/GSvEr7ma++GLpdIU1zrghf2y2768kM= github.com/alexflint/go-arg v1.3.0/go.mod h1:9iRbDxne7LcR/GSvEr7ma++GLpdIU1zrghf2y2768kM=
github.com/alexflint/go-scalar v1.0.0 h1:NGupf1XV/Xb04wXskDFzS0KWOLH632W/EO4fAFi+A70= github.com/alexflint/go-scalar v1.0.0 h1:NGupf1XV/Xb04wXskDFzS0KWOLH632W/EO4fAFi+A70=
github.com/alexflint/go-scalar v1.0.0/go.mod h1:GpHzbCOZXEKMEcygYQ5n/aa4Aq84zbxjy3MxYW0gjYw= github.com/alexflint/go-scalar v1.0.0/go.mod h1:GpHzbCOZXEKMEcygYQ5n/aa4Aq84zbxjy3MxYW0gjYw=
github.com/anacrolix/chansync v0.0.0-20210524073341-a336ebc2de92 h1:WGk37RyXPWcIALJxTkTNrXN3yLQp7hSFa3x5GkrK/Rs=
github.com/anacrolix/chansync v0.0.0-20210524073341-a336ebc2de92/go.mod h1:DZsatdsdXxD0WiwcGl0nJVwyjCKMDv+knl1q2iBjA2k=
github.com/anacrolix/confluence v1.7.1-0.20210221224747-9cb14aa2c53a/go.mod h1:T0JHvSaf9UfoiUdCtCOUuRroHm/tauUJTbLc6/vd5YA= github.com/anacrolix/confluence v1.7.1-0.20210221224747-9cb14aa2c53a/go.mod h1:T0JHvSaf9UfoiUdCtCOUuRroHm/tauUJTbLc6/vd5YA=
github.com/anacrolix/confluence v1.7.1-0.20210221225853-90405640e928/go.mod h1:NoLcfoRet+kYttjLXJRmh4qBVrylJsfIItik5GGj21A= github.com/anacrolix/confluence v1.7.1-0.20210221225853-90405640e928/go.mod h1:NoLcfoRet+kYttjLXJRmh4qBVrylJsfIItik5GGj21A=
github.com/anacrolix/confluence v1.7.1-0.20210311004351-d642adb8546c h1:HfbeiZS/0hwdotwtQhllrd3PagmuLgCN9O8CHJgzPGQ= github.com/anacrolix/confluence v1.7.1-0.20210311004351-d642adb8546c h1:HfbeiZS/0hwdotwtQhllrd3PagmuLgCN9O8CHJgzPGQ=
@ -108,14 +111,18 @@ github.com/anacrolix/missinggo/v2 v2.3.1/go.mod h1:3XNH0OEmyMUZuvXmYdl+FDfXd0vvS
github.com/anacrolix/missinggo/v2 v2.4.1-0.20200227072623-f02f6484f997/go.mod h1:KY+ij+mWvwGuqSuecLjjPv5LFw5ICUc1UvRems3VAZE= github.com/anacrolix/missinggo/v2 v2.4.1-0.20200227072623-f02f6484f997/go.mod h1:KY+ij+mWvwGuqSuecLjjPv5LFw5ICUc1UvRems3VAZE=
github.com/anacrolix/missinggo/v2 v2.5.0 h1:75aciOVrzVV1bTH9rl8tYLbXO9A7HXFtHexTChawe/U= github.com/anacrolix/missinggo/v2 v2.5.0 h1:75aciOVrzVV1bTH9rl8tYLbXO9A7HXFtHexTChawe/U=
github.com/anacrolix/missinggo/v2 v2.5.0/go.mod h1:HYuCbwvJXY3XbcmcIcTgZXHleoDXawxPWx/YiPzFzV0= github.com/anacrolix/missinggo/v2 v2.5.0/go.mod h1:HYuCbwvJXY3XbcmcIcTgZXHleoDXawxPWx/YiPzFzV0=
github.com/anacrolix/missinggo/v2 v2.5.1-0.20210520011502-b3d95d6b1d02 h1:wf3HKUunewks4FdGJqkViby+vr3n5/IFpPsyEMokxYE=
github.com/anacrolix/missinggo/v2 v2.5.1-0.20210520011502-b3d95d6b1d02/go.mod h1:WEjqh2rmKECd0t1VhQkLGTdIWXO6f6NLjp5GlMZ+6FA=
github.com/anacrolix/mmsg v0.0.0-20180515031531-a4a3ba1fc8bb/go.mod h1:x2/ErsYUmT77kezS63+wzZp8E3byYB0gzirM/WMBLfw= github.com/anacrolix/mmsg v0.0.0-20180515031531-a4a3ba1fc8bb/go.mod h1:x2/ErsYUmT77kezS63+wzZp8E3byYB0gzirM/WMBLfw=
github.com/anacrolix/mmsg v1.0.0 h1:btC7YLjOn29aTUAExJiVUhQOuf/8rhm+/nWCMAnL3Hg= github.com/anacrolix/mmsg v1.0.0 h1:btC7YLjOn29aTUAExJiVUhQOuf/8rhm+/nWCMAnL3Hg=
github.com/anacrolix/mmsg v1.0.0/go.mod h1:x8kRaJY/dCrY9Al0PEcj1mb/uFHwP6GCJ9fLl4thEPc= github.com/anacrolix/mmsg v1.0.0/go.mod h1:x8kRaJY/dCrY9Al0PEcj1mb/uFHwP6GCJ9fLl4thEPc=
github.com/anacrolix/multiless v0.0.0-20191223025854-070b7994e841/go.mod h1:TrCLEZfIDbMVfLoQt5tOoiBS/uq4y8+ojuEVVvTNPX4= github.com/anacrolix/multiless v0.0.0-20191223025854-070b7994e841/go.mod h1:TrCLEZfIDbMVfLoQt5tOoiBS/uq4y8+ojuEVVvTNPX4=
github.com/anacrolix/multiless v0.0.0-20200413040533-acfd16f65d5d/go.mod h1:TrCLEZfIDbMVfLoQt5tOoiBS/uq4y8+ojuEVVvTNPX4= github.com/anacrolix/multiless v0.0.0-20200413040533-acfd16f65d5d/go.mod h1:TrCLEZfIDbMVfLoQt5tOoiBS/uq4y8+ojuEVVvTNPX4=
github.com/anacrolix/multiless v0.0.0-20210222022749-ef43011a77ec/go.mod h1:TrCLEZfIDbMVfLoQt5tOoiBS/uq4y8+ojuEVVvTNPX4= github.com/anacrolix/multiless v0.0.0-20210222022749-ef43011a77ec/go.mod h1:TrCLEZfIDbMVfLoQt5tOoiBS/uq4y8+ojuEVVvTNPX4=
github.com/anacrolix/multiless v0.1.0 h1:gjR3SdJ+E0avnmEoAV/7K7n2kILZhVu/M6aQEtz8H3s= github.com/anacrolix/multiless v0.1.1-0.20210510014912-3f17cb19bda9 h1:fmNDxh5ysBPinRq249xYYYOLQ/h95DoyGE9e9Gp9xqo=
github.com/anacrolix/multiless v0.1.0/go.mod h1:TrCLEZfIDbMVfLoQt5tOoiBS/uq4y8+ojuEVVvTNPX4= github.com/anacrolix/multiless v0.1.1-0.20210510014912-3f17cb19bda9/go.mod h1:TrCLEZfIDbMVfLoQt5tOoiBS/uq4y8+ojuEVVvTNPX4=
github.com/anacrolix/multiless v0.1.1-0.20210520040635-10ee7b5f3cff h1:ve99yq2FjiN3OANMjRz2rjCX4f2PSKMf3NeDFnmcs8s=
github.com/anacrolix/multiless v0.1.1-0.20210520040635-10ee7b5f3cff/go.mod h1:TrCLEZfIDbMVfLoQt5tOoiBS/uq4y8+ojuEVVvTNPX4=
github.com/anacrolix/stm v0.1.0/go.mod h1:ZKz7e7ERWvP0KgL7WXfRjBXHNRhlVRlbBQecqFtPq+A= github.com/anacrolix/stm v0.1.0/go.mod h1:ZKz7e7ERWvP0KgL7WXfRjBXHNRhlVRlbBQecqFtPq+A=
github.com/anacrolix/stm v0.1.1-0.20191106051447-e749ba3531cf/go.mod h1:zoVQRvSiGjGoTmbM0vSLIiaKjWtNPeTvXUSdJQA4hsg= github.com/anacrolix/stm v0.1.1-0.20191106051447-e749ba3531cf/go.mod h1:zoVQRvSiGjGoTmbM0vSLIiaKjWtNPeTvXUSdJQA4hsg=
github.com/anacrolix/stm v0.2.0/go.mod h1:zoVQRvSiGjGoTmbM0vSLIiaKjWtNPeTvXUSdJQA4hsg= github.com/anacrolix/stm v0.2.0/go.mod h1:zoVQRvSiGjGoTmbM0vSLIiaKjWtNPeTvXUSdJQA4hsg=
@ -128,6 +135,10 @@ github.com/anacrolix/sync v0.0.0-20180611022320-3c4cb11f5a01/go.mod h1:+u91KiUuf
github.com/anacrolix/sync v0.0.0-20180808010631-44578de4e778/go.mod h1:s735Etp3joe/voe2sdaXLcqDdJSay1O0OPnM0ystjqk= github.com/anacrolix/sync v0.0.0-20180808010631-44578de4e778/go.mod h1:s735Etp3joe/voe2sdaXLcqDdJSay1O0OPnM0ystjqk=
github.com/anacrolix/sync v0.2.0 h1:oRe22/ZB+v7v/5Mbc4d2zE0AXEZy0trKyKLjqYOt6tY= github.com/anacrolix/sync v0.2.0 h1:oRe22/ZB+v7v/5Mbc4d2zE0AXEZy0trKyKLjqYOt6tY=
github.com/anacrolix/sync v0.2.0/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g= github.com/anacrolix/sync v0.2.0/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g=
github.com/anacrolix/sync v0.2.1-0.20210520084835-26aa6614542f h1:7KqmZoEOIXa0UbR2WQ/YPF4H+MPV6rhWk4E4tcv5eDg=
github.com/anacrolix/sync v0.2.1-0.20210520084835-26aa6614542f/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g=
github.com/anacrolix/sync v0.3.0 h1:ZPjTrkqQWEfnYVGTQHh5qNjokWaXnjsyXTJSMsKY0TA=
github.com/anacrolix/sync v0.3.0/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g=
github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
github.com/anacrolix/tagflag v0.0.0-20180605133421-f477c8c2f14c/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v0.0.0-20180605133421-f477c8c2f14c/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
github.com/anacrolix/tagflag v0.0.0-20180803105420-3a8ff5428f76/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v0.0.0-20180803105420-3a8ff5428f76/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
@ -135,7 +146,6 @@ github.com/anacrolix/tagflag v1.0.0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pm
github.com/anacrolix/tagflag v1.0.1/go.mod h1:gb0fiMQ02qU1djCSqaxGmruMvZGrMwSReidMB0zjdxo= github.com/anacrolix/tagflag v1.0.1/go.mod h1:gb0fiMQ02qU1djCSqaxGmruMvZGrMwSReidMB0zjdxo=
github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8= github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8=
github.com/anacrolix/tagflag v1.1.1-0.20200411025953-9bb5209d56c2/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8= github.com/anacrolix/tagflag v1.1.1-0.20200411025953-9bb5209d56c2/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8=
github.com/anacrolix/tagflag v1.2.0 h1:WdSv10SpxOI97++f5FUKnKPFkVGMiPlpYm52XPaMkp4=
github.com/anacrolix/tagflag v1.2.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8= github.com/anacrolix/tagflag v1.2.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8=
github.com/anacrolix/tagflag v1.3.0 h1:5NI+9CniDnEH0BWA4UcQbERyFPjKJqZnVkItGVIDy/s= github.com/anacrolix/tagflag v1.3.0 h1:5NI+9CniDnEH0BWA4UcQbERyFPjKJqZnVkItGVIDy/s=
github.com/anacrolix/tagflag v1.3.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8= github.com/anacrolix/tagflag v1.3.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8=
@ -268,8 +278,6 @@ github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/getlantern/sqlite v0.3.3-0.20210215090556-4f83cf7731f0 h1:zvFSvII5rTbMZ3idAqSUjUCDgZFbWMKzxQot3/Y7nzA=
github.com/getlantern/sqlite v0.3.3-0.20210215090556-4f83cf7731f0/go.mod h1:igAO5JulrQ1DbdZdtVq48mnZUBAPOeFzer7VhDWNtW4=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gliderlabs/ssh v0.1.1 h1:j3L6gSLQalDETeEg/Jg0mGY0/y/N6zI2xX1978P0Uqw= github.com/gliderlabs/ssh v0.1.1 h1:j3L6gSLQalDETeEg/Jg0mGY0/y/N6zI2xX1978P0Uqw=
@ -424,6 +432,7 @@ github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09
github.com/hashicorp/go.net v0.0.1 h1:sNCoNyDEvN1xa+X0baata4RdcpKwcMS6DH+xwfqPgjw= github.com/hashicorp/go.net v0.0.1 h1:sNCoNyDEvN1xa+X0baata4RdcpKwcMS6DH+xwfqPgjw=
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=

40
misc.go
View File

@ -5,29 +5,27 @@ import (
"net" "net"
"github.com/anacrolix/missinggo/v2" "github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/torrent/types"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol" pp "github.com/anacrolix/torrent/peer_protocol"
) )
type ChunkSpec struct { type (
Begin, Length pp.Integer Request = types.Request
} ChunkSpec = types.ChunkSpec
piecePriority = types.PiecePriority
)
type Request struct { const (
Index pp.Integer PiecePriorityNormal = types.PiecePriorityNormal
ChunkSpec PiecePriorityNone = types.PiecePriorityNone
} PiecePriorityNow = types.PiecePriorityNow
PiecePriorityReadahead = types.PiecePriorityReadahead
func (r Request) ToMsg(mt pp.MessageType) pp.Message { PiecePriorityNext = types.PiecePriorityNext
return pp.Message{ PiecePriorityHigh = types.PiecePriorityHigh
Type: mt, )
Index: r.Index,
Begin: r.Begin,
Length: r.Length,
}
}
func newRequest(index, begin, length pp.Integer) Request { func newRequest(index, begin, length pp.Integer) Request {
return Request{index, ChunkSpec{begin, length}} return Request{index, ChunkSpec{begin, length}}
@ -151,6 +149,16 @@ func min(as ...int64) int64 {
return ret return ret
} }
func minInt(as ...int) int {
ret := as[0]
for _, a := range as[1:] {
if a < ret {
ret = a
}
}
return ret
}
var unlimited = rate.NewLimiter(rate.Inf, 0) var unlimited = rate.NewLimiter(rate.Inf, 0)
type ( type (

View File

@ -35,7 +35,7 @@ func BenchmarkIterBitmapsDistinct(t *testing.B) {
output := iter.ToSlice(iterBitmapsDistinct(&skipCopy, first, second)) output := iter.ToSlice(iterBitmapsDistinct(&skipCopy, first, second))
t.StopTimer() t.StopTimer()
assert.Equal(t, []interface{}{0, 3, 2}, output) assert.Equal(t, []interface{}{0, 3, 2}, output)
assert.Equal(t, []int{1}, skip.ToSortedSlice()) assert.Equal(t, []bitmap.BitIndex{1}, skip.ToSortedSlice())
} }
} }

133
peer-conn-msg-writer.go Normal file
View File

@ -0,0 +1,133 @@
package torrent
import (
"bytes"
"io"
"time"
"github.com/anacrolix/chansync"
"github.com/anacrolix/log"
"github.com/anacrolix/sync"
pp "github.com/anacrolix/torrent/peer_protocol"
)
func (pc *PeerConn) startWriter() {
w := &pc.messageWriter
*w = peerConnMsgWriter{
fillWriteBuffer: func() {
pc.locker().Lock()
defer pc.locker().Unlock()
if pc.closed.IsSet() {
return
}
pc.fillWriteBuffer()
},
closed: &pc.closed,
logger: pc.logger,
w: pc.w,
keepAlive: func() bool {
pc.locker().Lock()
defer pc.locker().Unlock()
return pc.useful()
},
writeBuffer: new(bytes.Buffer),
}
go func() {
defer pc.locker().Unlock()
defer pc.close()
defer pc.locker().Lock()
pc.messageWriter.run(time.Minute)
}()
}
type peerConnMsgWriter struct {
// Must not be called with the local mutex held, as it will call back into the write method.
fillWriteBuffer func()
closed *chansync.SetOnce
logger log.Logger
w io.Writer
keepAlive func() bool
mu sync.Mutex
writeCond chansync.BroadcastCond
// Pointer so we can swap with the "front buffer".
writeBuffer *bytes.Buffer
}
// Routine that writes to the peer. Some of what to write is buffered by
// activity elsewhere in the Client, and some is determined locally when the
// connection is writable.
func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
var (
lastWrite time.Time = time.Now()
keepAliveTimer *time.Timer
)
keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
cn.mu.Lock()
defer cn.mu.Unlock()
if time.Since(lastWrite) >= keepAliveTimeout {
cn.writeCond.Broadcast()
}
keepAliveTimer.Reset(keepAliveTimeout)
})
cn.mu.Lock()
defer cn.mu.Unlock()
defer keepAliveTimer.Stop()
frontBuf := new(bytes.Buffer)
for {
if cn.closed.IsSet() {
return
}
if cn.writeBuffer.Len() == 0 {
func() {
cn.mu.Unlock()
defer cn.mu.Lock()
cn.fillWriteBuffer()
}()
}
if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && cn.keepAlive() {
cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
torrent.Add("written keepalives", 1)
}
if cn.writeBuffer.Len() == 0 {
writeCond := cn.writeCond.Signaled()
cn.mu.Unlock()
select {
case <-cn.closed.Done():
case <-writeCond:
}
cn.mu.Lock()
continue
}
// Flip the buffers.
frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
cn.mu.Unlock()
n, err := cn.w.Write(frontBuf.Bytes())
cn.mu.Lock()
if n != 0 {
lastWrite = time.Now()
keepAliveTimer.Reset(keepAliveTimeout)
}
if err != nil {
cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
return
}
if n != frontBuf.Len() {
panic("short write")
}
frontBuf.Reset()
}
}
func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
cn.mu.Lock()
defer cn.mu.Unlock()
cn.writeBuffer.Write(msg.MustMarshalBinary())
cn.writeCond.Broadcast()
return !cn.writeBufferFull()
}
func (cn *peerConnMsgWriter) writeBufferFull() bool {
return cn.writeBuffer.Len() >= writeBufferHighWaterLen
}

View File

@ -8,16 +8,20 @@ import (
// BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with // BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with
// legacy PeerConn methods. // legacy PeerConn methods.
type peerImpl interface { type peerImpl interface {
onNextRequestStateChanged()
updateRequests() updateRequests()
writeInterested(interested bool) bool writeInterested(interested bool) bool
cancel(Request) bool
// Return true if there's room for more activity. // Neither of these return buffer room anymore, because they're currently both posted. There's
request(Request) bool // also PeerConn.writeBufferFull for when/where it matters.
_cancel(Request) bool
_request(Request) bool
connectionFlags() string connectionFlags() string
onClose() onClose()
_postCancel(Request)
onGotInfo(*metainfo.Info) onGotInfo(*metainfo.Info)
drop() drop()
String() string String() string
connStatusString() string connStatusString() string
writeBufferFull() bool
} }

File diff suppressed because it is too large Load Diff

View File

@ -5,7 +5,6 @@ import (
"net" "net"
"sync" "sync"
"testing" "testing"
"time"
"github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/missinggo/pubsub"
"github.com/bradfitz/iter" "github.com/bradfitz/iter"
@ -32,7 +31,7 @@ func TestSendBitfieldThenHave(t *testing.T) {
r, w := io.Pipe() r, w := io.Pipe()
//c.r = r //c.r = r
c.w = w c.w = w
go c.writer(time.Minute) c.startWriter()
c.locker().Lock() c.locker().Lock()
c.t._completedPieces.Add(1) c.t._completedPieces.Add(1)
c.postBitfield( /*[]bool{false, true, false}*/ ) c.postBitfield( /*[]bool{false, true, false}*/ )
@ -98,7 +97,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
ts := &torrentStorage{} ts := &torrentStorage{}
t := &Torrent{ t := &Torrent{
cl: cl, cl: cl,
storage: &storage.Torrent{TorrentImpl: ts}, storage: &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}},
pieceStateChanges: pubsub.NewPubSub(), pieceStateChanges: pubsub.NewPubSub(),
} }
require.NoError(b, t.setInfo(&metainfo.Info{ require.NoError(b, t.setInfo(&metainfo.Info{

View File

@ -21,7 +21,7 @@ func TestPexConnState(t *testing.T) {
c := cl.newConnection(nil, false, addr, addr.Network(), "") c := cl.newConnection(nil, false, addr, addr.Network(), "")
c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber) c.PeerExtensionIDs = make(map[pp.ExtensionName]pp.ExtensionNumber)
c.PeerExtensionIDs[pp.ExtensionNamePex] = pexExtendedId c.PeerExtensionIDs[pp.ExtensionNamePex] = pexExtendedId
c.writerCond.L.Lock() c.messageWriter.mu.Lock()
c.setTorrent(torrent) c.setTorrent(torrent)
torrent.addPeerConn(c) torrent.addPeerConn(c)
@ -36,7 +36,7 @@ func TestPexConnState(t *testing.T) {
out = m out = m
return true return true
} }
c.writerCond.Wait() <-c.messageWriter.writeCond.Signaled()
c.pex.Share(testWriter) c.pex.Share(testWriter)
require.True(t, writerCalled) require.True(t, writerCalled)
require.EqualValues(t, pp.Extended, out.Type) require.EqualValues(t, pp.Extended, out.Type)

View File

@ -11,33 +11,6 @@ import (
"github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/storage"
) )
// Describes the importance of obtaining a particular piece.
type piecePriority byte
func (pp *piecePriority) Raise(maybe piecePriority) bool {
if maybe > *pp {
*pp = maybe
return true
}
return false
}
// Priority for use in PriorityBitmap
func (me piecePriority) BitmapPriority() int {
return -int(me)
}
const (
PiecePriorityNone piecePriority = iota // Not wanted. Must be the zero value.
PiecePriorityNormal // Wanted.
PiecePriorityHigh // Wanted a lot.
PiecePriorityReadahead // May be required soon.
// Succeeds a piece where a read occurred. Currently the same as Now,
// apparently due to issues with caching.
PiecePriorityNext
PiecePriorityNow // A Reader is reading in this piece. Highest urgency.
)
type Piece struct { type Piece struct {
// The completed piece SHA1 hash, from the metainfo "pieces" field. // The completed piece SHA1 hash, from the metainfo "pieces" field.
hash *metainfo.Hash hash *metainfo.Hash
@ -55,6 +28,7 @@ type Piece struct {
publicPieceState PieceState publicPieceState PieceState
priority piecePriority priority piecePriority
availability int64
// This can be locked when the Client lock is taken, but probably not vice versa. // This can be locked when the Client lock is taken, but probably not vice versa.
pendingWritesMutex sync.Mutex pendingWritesMutex sync.Mutex
@ -79,7 +53,7 @@ func (p *Piece) Storage() storage.Piece {
} }
func (p *Piece) pendingChunkIndex(chunkIndex int) bool { func (p *Piece) pendingChunkIndex(chunkIndex int) bool {
return !p._dirtyChunks.Contains(chunkIndex) return !p._dirtyChunks.Contains(bitmap.BitIndex(chunkIndex))
} }
func (p *Piece) pendingChunk(cs ChunkSpec, chunkSize pp.Integer) bool { func (p *Piece) pendingChunk(cs ChunkSpec, chunkSize pp.Integer) bool {
@ -95,12 +69,12 @@ func (p *Piece) numDirtyChunks() pp.Integer {
} }
func (p *Piece) unpendChunkIndex(i int) { func (p *Piece) unpendChunkIndex(i int) {
p._dirtyChunks.Add(i) p._dirtyChunks.Add(bitmap.BitIndex(i))
p.t.tickleReaders() p.t.tickleReaders()
} }
func (p *Piece) pendChunkIndex(i int) { func (p *Piece) pendChunkIndex(i int) {
p._dirtyChunks.Remove(i) p._dirtyChunks.Remove(bitmap.BitIndex(i))
} }
func (p *Piece) numChunks() pp.Integer { func (p *Piece) numChunks() pp.Integer {
@ -144,7 +118,7 @@ func (p *Piece) chunkIndexSpec(chunk pp.Integer) ChunkSpec {
func (p *Piece) chunkIndexRequest(chunkIndex pp.Integer) Request { func (p *Piece) chunkIndexRequest(chunkIndex pp.Integer) Request {
return Request{ return Request{
pp.Integer(p.index), pp.Integer(p.index),
chunkIndexSpec(chunkIndex, p.length(), p.chunkSize()), p.chunkIndexSpec(chunkIndex),
} }
} }
@ -221,14 +195,11 @@ func (p *Piece) SetPriority(prio piecePriority) {
p.t.updatePiecePriority(p.index) p.t.updatePiecePriority(p.index)
} }
func (p *Piece) uncachedPriority() (ret piecePriority) { func (p *Piece) purePriority() (ret piecePriority) {
if p.t.pieceComplete(p.index) || p.t.pieceQueuedForHash(p.index) || p.t.hashingPiece(p.index) {
return PiecePriorityNone
}
for _, f := range p.files { for _, f := range p.files {
ret.Raise(f.prio) ret.Raise(f.prio)
} }
if p.t.readerNowPieces().Contains(int(p.index)) { if p.t.readerNowPieces().Contains(bitmap.BitIndex(p.index)) {
ret.Raise(PiecePriorityNow) ret.Raise(PiecePriorityNow)
} }
// if t._readerNowPieces.Contains(piece - 1) { // if t._readerNowPieces.Contains(piece - 1) {
@ -241,6 +212,13 @@ func (p *Piece) uncachedPriority() (ret piecePriority) {
return return
} }
func (p *Piece) uncachedPriority() (ret piecePriority) {
if p.t.pieceComplete(p.index) || p.t.pieceQueuedForHash(p.index) || p.t.hashingPiece(p.index) {
return PiecePriorityNone
}
return p.purePriority()
}
// Tells the Client to refetch the completion status from storage, updating priority etc. if // Tells the Client to refetch the completion status from storage, updating priority etc. if
// necessary. Might be useful if you know the state of the piece data has changed externally. // necessary. Might be useful if you know the state of the piece data has changed externally.
func (p *Piece) UpdateCompletion() { func (p *Piece) UpdateCompletion() {
@ -256,11 +234,7 @@ func (p *Piece) completion() (ret storage.Completion) {
} }
func (p *Piece) allChunksDirty() bool { func (p *Piece) allChunksDirty() bool {
return p._dirtyChunks.Len() == int(p.numChunks()) return p._dirtyChunks.Len() == bitmap.BitRange(p.numChunks())
}
func (p *Piece) requestStrategyPiece() requestStrategyPiece {
return p
} }
func (p *Piece) dirtyChunks() bitmap.Bitmap { func (p *Piece) dirtyChunks() bitmap.Bitmap {
@ -270,3 +244,15 @@ func (p *Piece) dirtyChunks() bitmap.Bitmap {
func (p *Piece) State() PieceState { func (p *Piece) State() PieceState {
return p.t.PieceState(p.index) return p.t.PieceState(p.index)
} }
func (p *Piece) iterUndirtiedChunks(f func(cs ChunkSpec) bool) bool {
for i := pp.Integer(0); i < p.numChunks(); i++ {
if p.chunkIndexDirty(i) {
continue
}
if !f(p.chunkIndexSpec(i)) {
return false
}
}
return true
}

View File

@ -1,45 +0,0 @@
package torrent
import (
"github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/v2/bitmap"
pp "github.com/anacrolix/torrent/peer_protocol"
)
// Provides default implementations for requestStrategy methods. Could be embedded, or delegated to.
type requestStrategyDefaults struct{}
func (requestStrategyDefaults) hooks() requestStrategyHooks {
return requestStrategyHooks{
sentRequest: func(Request) {},
deletedRequest: func(Request) {},
}
}
func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
chunkIndices := p.dirtyChunks().Copy()
chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks()))
return iter.ForPerm(chunkIndices.Len(), func(i int) bool {
ci, err := chunkIndices.RB.Select(uint32(i))
if err != nil {
panic(err)
}
return f(p.chunkIndexRequest(pp.Integer(ci)).ChunkSpec)
})
}
func (requestStrategyDefaults) nominalMaxRequests(cn requestStrategyConnection) int {
return int(
max(
64,
cn.stats().ChunksReadUseful.Int64()-
(cn.stats().ChunksRead.Int64()-cn.stats().ChunksReadUseful.Int64())))
}
func (requestStrategyDefaults) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
return prio
}
func (requestStrategyDefaults) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
return false
}

View File

@ -1,223 +0,0 @@
package torrent
import (
"math"
"sync"
"time"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/prioritybitmap"
pp "github.com/anacrolix/torrent/peer_protocol"
)
type requestStrategyPiece interface {
numChunks() pp.Integer
dirtyChunks() bitmap.Bitmap
chunkIndexRequest(i pp.Integer) Request
}
type requestStrategyTorrent interface {
numConns() int
numReaders() int
numPieces() int
readerPiecePriorities() (now, readahead bitmap.Bitmap)
ignorePieces() bitmap.Bitmap
pendingPieces() *prioritybitmap.PriorityBitmap
}
type requestStrategyConnection interface {
torrent() requestStrategyTorrent
peerPieces() bitmap.Bitmap
pieceRequestOrder() *prioritybitmap.PriorityBitmap
fastest() bool
stats() *ConnStats
totalExpectingTime() time.Duration
peerMaxRequests() int
chunksReceivedWhileExpecting() int64
}
type requestStrategy interface {
iterPendingPieces(requestStrategyConnection, func(pieceIndex) bool) bool
iterUndirtiedChunks(requestStrategyPiece, func(ChunkSpec) bool) bool
nominalMaxRequests(requestStrategyConnection) int
shouldRequestWithoutBias(requestStrategyConnection) bool
piecePriority(requestStrategyConnection, pieceIndex, piecePriority, int) int
hooks() requestStrategyHooks
}
type requestStrategyHooks struct {
sentRequest func(Request)
deletedRequest func(Request)
}
type requestStrategyCallbacks interface {
requestTimedOut(Request)
}
type requestStrategyFuzzing struct {
requestStrategyDefaults
}
type requestStrategyFastest struct {
requestStrategyDefaults
}
func newRequestStrategyMaker(rs requestStrategy) requestStrategyMaker {
return func(requestStrategyCallbacks, sync.Locker) requestStrategy {
return rs
}
}
// The fastest connection downloads strictly in order of priority, while all others adhere to their
// piece inclinations.
func RequestStrategyFastest() requestStrategyMaker {
return newRequestStrategyMaker(requestStrategyFastest{})
}
// Favour higher priority pieces with some fuzzing to reduce overlaps and wastage across
// connections.
func RequestStrategyFuzzing() requestStrategyMaker {
return newRequestStrategyMaker(requestStrategyFuzzing{})
}
func (requestStrategyFastest) shouldRequestWithoutBias(cn requestStrategyConnection) bool {
if cn.torrent().numReaders() == 0 {
return false
}
if cn.torrent().numConns() == 1 {
return true
}
if cn.fastest() {
return true
}
return false
}
type requestStrategyDuplicateRequestTimeout struct {
requestStrategyDefaults
// How long to avoid duplicating a pending request.
duplicateRequestTimeout time.Duration
callbacks requestStrategyCallbacks
// The last time we requested a chunk. Deleting the request from any connection will clear this
// value.
lastRequested map[Request]*time.Timer
// The lock to take when running a request timeout handler.
timeoutLocker sync.Locker
}
// Generates a request strategy instance for a given torrent. callbacks are probably specific to the torrent.
type requestStrategyMaker func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy
// Requests are strictly by piece priority, and not duplicated until duplicateRequestTimeout is
// reached.
func RequestStrategyDuplicateRequestTimeout(duplicateRequestTimeout time.Duration) requestStrategyMaker {
return func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy {
return requestStrategyDuplicateRequestTimeout{
duplicateRequestTimeout: duplicateRequestTimeout,
callbacks: callbacks,
lastRequested: make(map[Request]*time.Timer),
timeoutLocker: clientLocker,
}
}
}
func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks {
return requestStrategyHooks{
deletedRequest: func(r Request) {
if t, ok := rs.lastRequested[r]; ok {
t.Stop()
delete(rs.lastRequested, r)
}
},
sentRequest: rs.onSentRequest,
}
}
func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool {
for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ {
if p.dirtyChunks().Get(bitmap.BitIndex(i)) {
continue
}
r := p.chunkIndexRequest(i)
if rs.wouldDuplicateRecent(r) {
continue
}
if !f(r.ChunkSpec) {
return false
}
}
return true
}
func (requestStrategyFuzzing) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int {
switch tpp {
case PiecePriorityNormal:
case PiecePriorityReadahead:
prio -= int(cn.torrent().numPieces())
case PiecePriorityNext, PiecePriorityNow:
prio -= 2 * int(cn.torrent().numPieces())
default:
panic(tpp)
}
prio += int(piece / 3)
return prio
}
func (requestStrategyDuplicateRequestTimeout) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool {
return iterUnbiasedPieceRequestOrder(cn, f)
}
func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection, f func(pieceIndex) bool) bool {
if rs.shouldRequestWithoutBias(cn) {
return iterUnbiasedPieceRequestOrder(cn, f)
} else {
return cn.pieceRequestOrder().IterTyped(func(i int) bool {
return f(pieceIndex(i))
})
}
}
func (rs requestStrategyFuzzing) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
return defaultIterPendingPieces(rs, cn, cb)
}
func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool {
return defaultIterPendingPieces(rs, cn, cb)
}
func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r Request) {
rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() {
rs.timeoutLocker.Lock()
delete(rs.lastRequested, r)
rs.timeoutLocker.Unlock()
rs.callbacks.requestTimedOut(r)
})
}
// The actual value to use as the maximum outbound requests.
func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
expectingTime := int64(cn.totalExpectingTime())
if expectingTime == 0 {
expectingTime = math.MaxInt64
} else {
expectingTime *= 2
}
return int(clamp(
1,
int64(cn.peerMaxRequests()),
max(
// It makes sense to always pipeline at least one connection, since latency must be
// non-zero.
2,
// Request only as many as we expect to receive in the duplicateRequestTimeout
// window. We are trying to avoid having to duplicate requests.
cn.chunksReceivedWhileExpecting()*int64(rs.duplicateRequestTimeout)/expectingTime,
),
))
}
func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r Request) bool {
// This piece has been requested on another connection, and the duplicate request timer is still
// running.
_, ok := rs.lastRequested[r]
return ok
}

349
request-strategy/order.go Normal file
View File

@ -0,0 +1,349 @@
package request_strategy
import (
"fmt"
"sort"
"github.com/anacrolix/multiless"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/types"
)
type (
Request = types.Request
pieceIndex = types.PieceIndex
piecePriority = types.PiecePriority
// This can be made into a type-param later, will be great for testing.
ChunkSpec = types.ChunkSpec
)
type ClientPieceOrder struct{}
type filterTorrent struct {
Torrent
unverifiedBytes int64
// Potentially shared with other torrents.
storageLeft *int64
}
func sortFilterPieces(pieces []filterPiece) {
sort.Slice(pieces, func(_i, _j int) bool {
i := pieces[_i]
j := pieces[_j]
return multiless.New().Int(
int(j.Priority), int(i.Priority),
).Bool(
j.Partial, i.Partial,
).Int64(
i.Availability, j.Availability,
).Int(
i.index, j.index,
).Uintptr(
i.t.StableId, j.t.StableId,
).MustLess()
})
}
type requestsPeer struct {
Peer
nextState PeerNextRequestState
requestablePiecesRemaining int
}
func (rp *requestsPeer) canFitRequest() bool {
return len(rp.nextState.Requests) < rp.MaxRequests
}
func (rp *requestsPeer) addNextRequest(r Request) {
_, ok := rp.nextState.Requests[r]
if ok {
panic("should only add once")
}
rp.nextState.Requests[r] = struct{}{}
}
type peersForPieceRequests struct {
requestsInPiece int
*requestsPeer
}
func (me *peersForPieceRequests) addNextRequest(r Request) {
me.requestsPeer.addNextRequest(r)
me.requestsInPiece++
}
type requestablePiece struct {
index pieceIndex
t Torrent
alwaysReallocate bool
NumPendingChunks int
IterPendingChunks ChunksIter
}
type filterPiece struct {
t *filterTorrent
index pieceIndex
Piece
}
func getRequestablePieces(input Input) (ret []requestablePiece) {
// Storage capacity left for this run, keyed by the storage capacity pointer on the storage
// TorrentImpl.
storageLeft := make(map[*func() *int64]*int64)
var pieces []filterPiece
for _, _t := range input.Torrents {
// TODO: We could do metainfo requests here.
t := &filterTorrent{
Torrent: _t,
unverifiedBytes: 0,
}
key := t.Capacity
if key != nil {
if _, ok := storageLeft[key]; !ok {
storageLeft[key] = (*key)()
}
t.storageLeft = storageLeft[key]
}
for i, tp := range t.Pieces {
pieces = append(pieces, filterPiece{
t: t,
index: i,
Piece: tp,
})
}
}
sortFilterPieces(pieces)
var allTorrentsUnverifiedBytes int64
for _, piece := range pieces {
if left := piece.t.storageLeft; left != nil {
if *left < int64(piece.Length) {
continue
}
*left -= int64(piece.Length)
}
if !piece.Request || piece.NumPendingChunks == 0 {
// TODO: Clarify exactly what is verified. Stuff that's being hashed should be
// considered unverified and hold up further requests.
continue
}
if piece.t.MaxUnverifiedBytes != 0 && piece.t.unverifiedBytes+piece.Length > piece.t.MaxUnverifiedBytes {
continue
}
if input.MaxUnverifiedBytes != 0 && allTorrentsUnverifiedBytes+piece.Length > input.MaxUnverifiedBytes {
continue
}
piece.t.unverifiedBytes += piece.Length
allTorrentsUnverifiedBytes += piece.Length
ret = append(ret, requestablePiece{
index: piece.index,
t: piece.t.Torrent,
NumPendingChunks: piece.NumPendingChunks,
IterPendingChunks: piece.iterPendingChunksWrapper,
alwaysReallocate: piece.Priority >= types.PiecePriorityNext,
})
}
return
}
type Input struct {
Torrents []Torrent
MaxUnverifiedBytes int64
}
// TODO: We could do metainfo requests here.
func Run(input Input) map[PeerId]PeerNextRequestState {
requestPieces := getRequestablePieces(input)
torrents := input.Torrents
allPeers := make(map[uintptr][]*requestsPeer, len(torrents))
for _, t := range torrents {
peers := make([]*requestsPeer, 0, len(t.Peers))
for _, p := range t.Peers {
peers = append(peers, &requestsPeer{
Peer: p,
nextState: PeerNextRequestState{
Requests: make(map[Request]struct{}, p.MaxRequests),
},
})
}
allPeers[t.StableId] = peers
}
for _, piece := range requestPieces {
for _, peer := range allPeers[piece.t.StableId] {
if peer.canRequestPiece(piece.index) {
peer.requestablePiecesRemaining++
}
}
}
for _, piece := range requestPieces {
allocatePendingChunks(piece, allPeers[piece.t.StableId])
}
ret := make(map[PeerId]PeerNextRequestState)
for _, peers := range allPeers {
for _, rp := range peers {
if rp.requestablePiecesRemaining != 0 {
panic(rp.requestablePiecesRemaining)
}
if _, ok := ret[rp.Id]; ok {
panic(fmt.Sprintf("duplicate peer id: %v", rp.Id))
}
ret[rp.Id] = rp.nextState
}
}
return ret
}
// Checks that a sorted peersForPiece slice makes sense.
func ensureValidSortedPeersForPieceRequests(peers []*peersForPieceRequests, sortLess func(_, _ int) bool) {
if !sort.SliceIsSorted(peers, sortLess) {
panic("not sorted")
}
peerMap := make(map[*peersForPieceRequests]struct{}, len(peers))
for _, p := range peers {
if _, ok := peerMap[p]; ok {
panic(p)
}
peerMap[p] = struct{}{}
}
}
func allocatePendingChunks(p requestablePiece, peers []*requestsPeer) {
peersForPiece := make([]*peersForPieceRequests, 0, len(peers))
for _, peer := range peers {
peersForPiece = append(peersForPiece, &peersForPieceRequests{
requestsInPiece: 0,
requestsPeer: peer,
})
}
defer func() {
for _, peer := range peersForPiece {
if peer.canRequestPiece(p.index) {
peer.requestablePiecesRemaining--
}
}
}()
sortPeersForPiece := func(req *Request) {
less := func(i, j int) bool {
byHasRequest := func() multiless.Computation {
ml := multiless.New()
if req != nil {
_, iHas := peersForPiece[i].nextState.Requests[*req]
_, jHas := peersForPiece[j].nextState.Requests[*req]
ml = ml.Bool(jHas, iHas)
}
return ml
}()
ml := multiless.New()
// We always "reallocate", that is force even striping amongst peers that are either on
// the last piece they can contribute too, or for pieces marked for this behaviour.
// Striping prevents starving peers of requests, and will always re-balance to the
// fastest known peers.
if !p.alwaysReallocate {
ml = ml.Bool(
peersForPiece[j].requestablePiecesRemaining == 1,
peersForPiece[i].requestablePiecesRemaining == 1)
}
if p.alwaysReallocate || peersForPiece[j].requestablePiecesRemaining == 1 {
ml = ml.Int(
peersForPiece[i].requestsInPiece,
peersForPiece[j].requestsInPiece)
} else {
ml = ml.AndThen(byHasRequest)
}
ml = ml.Int(
peersForPiece[i].requestablePiecesRemaining,
peersForPiece[j].requestablePiecesRemaining,
).Float64(
peersForPiece[j].DownloadRate,
peersForPiece[i].DownloadRate,
)
ml = ml.AndThen(byHasRequest)
return ml.Int64(
int64(peersForPiece[j].Age), int64(peersForPiece[i].Age),
// TODO: Probably peer priority can come next
).Uintptr(
peersForPiece[i].Id.Uintptr(),
peersForPiece[j].Id.Uintptr(),
).MustLess()
}
sort.Slice(peersForPiece, less)
ensureValidSortedPeersForPieceRequests(peersForPiece, less)
}
// Chunks can be preassigned several times, if peers haven't been able to update their "actual"
// with "next" request state before another request strategy run occurs.
preallocated := make(map[ChunkSpec][]*peersForPieceRequests, p.NumPendingChunks)
p.IterPendingChunks(func(spec ChunkSpec) {
req := Request{pp.Integer(p.index), spec}
for _, peer := range peersForPiece {
if h := peer.HasExistingRequest; h == nil || !h(req) {
continue
}
if !peer.canFitRequest() {
continue
}
if !peer.canRequestPiece(p.index) {
continue
}
preallocated[spec] = append(preallocated[spec], peer)
peer.addNextRequest(req)
}
})
pendingChunksRemaining := int(p.NumPendingChunks)
p.IterPendingChunks(func(chunk types.ChunkSpec) {
if _, ok := preallocated[chunk]; ok {
return
}
req := Request{pp.Integer(p.index), chunk}
defer func() { pendingChunksRemaining-- }()
sortPeersForPiece(nil)
for _, peer := range peersForPiece {
if !peer.canFitRequest() {
continue
}
if !peer.HasPiece(p.index) {
continue
}
if !peer.pieceAllowedFastOrDefault(p.index) {
// TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
peer.nextState.Interested = true
if peer.Choking {
continue
}
}
peer.addNextRequest(req)
break
}
})
chunk:
for chunk, prePeers := range preallocated {
pendingChunksRemaining--
req := Request{pp.Integer(p.index), chunk}
for _, pp := range prePeers {
pp.requestsInPiece--
}
sortPeersForPiece(&req)
for _, pp := range prePeers {
delete(pp.nextState.Requests, req)
}
for _, peer := range peersForPiece {
if !peer.canFitRequest() {
continue
}
if !peer.HasPiece(p.index) {
continue
}
if !peer.pieceAllowedFastOrDefault(p.index) {
// TODO: Verify that's okay to stay uninterested if we request allowed fast pieces.
peer.nextState.Interested = true
if peer.Choking {
continue
}
}
peer.addNextRequest(req)
continue chunk
}
}
if pendingChunksRemaining != 0 {
panic(pendingChunksRemaining)
}
}

View File

@ -0,0 +1,297 @@
package request_strategy
import (
"math"
"testing"
"github.com/bradfitz/iter"
qt "github.com/frankban/quicktest"
pp "github.com/anacrolix/torrent/peer_protocol"
)
func r(i pieceIndex, begin int) Request {
return Request{pp.Integer(i), ChunkSpec{pp.Integer(begin), 1}}
}
func chunkIterRange(end int) func(func(ChunkSpec)) {
return func(f func(ChunkSpec)) {
for offset := range iter.N(end) {
f(ChunkSpec{pp.Integer(offset), 1})
}
}
}
func chunkIter(offsets ...int) func(func(ChunkSpec)) {
return func(f func(ChunkSpec)) {
for _, offset := range offsets {
f(ChunkSpec{pp.Integer(offset), 1})
}
}
}
func requestSetFromSlice(rs ...Request) (ret map[Request]struct{}) {
ret = make(map[Request]struct{}, len(rs))
for _, r := range rs {
ret[r] = struct{}{}
}
return
}
type intPeerId int
func (i intPeerId) Uintptr() uintptr {
return uintptr(i)
}
func TestStealingFromSlowerPeer(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
HasPiece: func(i pieceIndex) bool {
return true
},
MaxRequests: math.MaxInt16,
DownloadRate: 2,
}
// Slower than the stealers, but has all requests already.
stealee := basePeer
stealee.DownloadRate = 1
stealee.HasExistingRequest = func(r Request) bool {
return true
}
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
results := Run(Input{Torrents: []Torrent{{
Pieces: []Piece{{
Request: true,
NumPendingChunks: 5,
IterPendingChunks: chunkIterRange(5),
}},
Peers: []Peer{
stealee,
firstStealer,
secondStealer,
},
}}})
c.Assert(results, qt.HasLen, 3)
check := func(p PeerId, l int) {
c.Check(results[p].Requests, qt.HasLen, l)
c.Check(results[p].Interested, qt.Equals, l > 0)
}
check(stealee.Id, 1)
check(firstStealer.Id, 2)
check(secondStealer.Id, 2)
}
func checkNumRequestsAndInterest(c *qt.C, next PeerNextRequestState, num int, interest bool) {
c.Check(next.Requests, qt.HasLen, num)
c.Check(next.Interested, qt.Equals, interest)
}
func TestStealingFromSlowerPeersBasic(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
HasPiece: func(i pieceIndex) bool {
return true
},
MaxRequests: math.MaxInt16,
DownloadRate: 2,
}
stealee := basePeer
stealee.DownloadRate = 1
stealee.HasExistingRequest = func(r Request) bool {
return true
}
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
results := Run(Input{Torrents: []Torrent{{
Pieces: []Piece{{
Request: true,
NumPendingChunks: 2,
IterPendingChunks: chunkIter(0, 1),
}},
Peers: []Peer{
stealee,
firstStealer,
secondStealer,
},
}}})
checkNumRequestsAndInterest(c, results[firstStealer.Id], 1, true)
checkNumRequestsAndInterest(c, results[secondStealer.Id], 1, true)
checkNumRequestsAndInterest(c, results[stealee.Id], 0, false)
}
func TestPeerKeepsExistingIfReasonable(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
HasPiece: func(i pieceIndex) bool {
return true
},
MaxRequests: math.MaxInt16,
DownloadRate: 2,
}
// Slower than the stealers, but has all requests already.
stealee := basePeer
stealee.DownloadRate = 1
keepReq := r(0, 0)
stealee.HasExistingRequest = func(r Request) bool {
return r == keepReq
}
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
results := Run(Input{Torrents: []Torrent{{
Pieces: []Piece{{
Request: true,
NumPendingChunks: 4,
IterPendingChunks: chunkIter(0, 1, 3, 4),
}},
Peers: []Peer{
stealee,
firstStealer,
secondStealer,
},
}}})
c.Assert(results, qt.HasLen, 3)
check := func(p PeerId, l int) {
c.Check(results[p].Requests, qt.HasLen, l)
c.Check(results[p].Interested, qt.Equals, l > 0)
}
check(firstStealer.Id, 2)
check(secondStealer.Id, 1)
c.Check(results[stealee.Id], qt.ContentEquals, PeerNextRequestState{
Interested: true,
Requests: requestSetFromSlice(keepReq),
})
}
func TestDontStealUnnecessarily(t *testing.T) {
c := qt.New(t)
basePeer := Peer{
HasPiece: func(i pieceIndex) bool {
return true
},
MaxRequests: math.MaxInt16,
DownloadRate: 2,
}
// Slower than the stealers, but has all requests already.
stealee := basePeer
stealee.DownloadRate = 1
keepReqs := requestSetFromSlice(
r(3, 2), r(3, 4), r(3, 6), r(3, 8),
r(4, 0), r(4, 1), r(4, 7), r(4, 8))
stealee.HasExistingRequest = func(r Request) bool {
_, ok := keepReqs[r]
return ok
}
stealee.Id = intPeerId(1)
firstStealer := basePeer
firstStealer.Id = intPeerId(2)
secondStealer := basePeer
secondStealer.Id = intPeerId(3)
secondStealer.HasPiece = func(i pieceIndex) bool {
switch i {
case 1, 3:
return true
default:
return false
}
}
results := Run(Input{Torrents: []Torrent{{
Pieces: []Piece{
{
Request: true,
NumPendingChunks: 0,
IterPendingChunks: chunkIterRange(9),
},
{
Request: true,
NumPendingChunks: 7,
IterPendingChunks: chunkIterRange(7),
},
{
Request: true,
NumPendingChunks: 0,
IterPendingChunks: chunkIterRange(0),
},
{
Request: true,
NumPendingChunks: 9,
IterPendingChunks: chunkIterRange(9),
},
{
Request: true,
NumPendingChunks: 9,
IterPendingChunks: chunkIterRange(9),
}},
Peers: []Peer{
firstStealer,
stealee,
secondStealer,
},
}}})
c.Assert(results, qt.HasLen, 3)
check := func(p PeerId, l int) {
c.Check(results[p].Requests, qt.HasLen, l)
c.Check(results[p].Interested, qt.Equals, l > 0)
}
check(firstStealer.Id, 5)
check(secondStealer.Id, 7+9)
c.Check(results[stealee.Id], qt.ContentEquals, PeerNextRequestState{
Interested: true,
Requests: requestSetFromSlice(r(4, 0), r(4, 1), r(4, 7), r(4, 8)),
})
}
// This tests a situation where multiple peers had the same existing request, due to "actual" and
// "next" request states being out of sync. This reasonable occurs when a peer hasn't fully updated
// its actual request state since the last request strategy run.
func TestDuplicatePreallocations(t *testing.T) {
peer := func(id int, downloadRate float64) Peer {
return Peer{
HasExistingRequest: func(r Request) bool {
return true
},
MaxRequests: 2,
HasPiece: func(i pieceIndex) bool {
return true
},
Id: intPeerId(id),
DownloadRate: downloadRate,
}
}
results := Run(Input{
Torrents: []Torrent{{
Pieces: []Piece{{
Request: true,
NumPendingChunks: 1,
IterPendingChunks: chunkIterRange(1),
}, {
Request: true,
NumPendingChunks: 1,
IterPendingChunks: chunkIterRange(1),
}},
Peers: []Peer{
// The second peer was be marked as the preallocation, clobbering the first. The
// first peer is preferred, and the piece isn't striped, so it gets preallocated a
// request, and then gets reallocated from the peer the same request.
peer(1, 2),
peer(2, 1),
},
}},
})
c := qt.New(t)
c.Assert(2, qt.Equals, len(results[intPeerId(1)].Requests)+len(results[intPeerId(2)].Requests))
}

38
request-strategy/peer.go Normal file
View File

@ -0,0 +1,38 @@
package request_strategy
import (
"time"
)
type PeerNextRequestState struct {
Interested bool
Requests map[Request]struct{}
}
type PeerId interface {
Uintptr() uintptr
}
type Peer struct {
HasPiece func(i pieceIndex) bool
MaxRequests int
HasExistingRequest func(r Request) bool
Choking bool
PieceAllowedFast func(pieceIndex) bool
DownloadRate float64
Age time.Duration
// This is passed back out at the end, so must support equality. Could be a type-param later.
Id PeerId
}
func (p *Peer) pieceAllowedFastOrDefault(i pieceIndex) bool {
if f := p.PieceAllowedFast; f != nil {
return f(i)
}
return false
}
// TODO: This might be used in more places I think.
func (p *Peer) canRequestPiece(i pieceIndex) bool {
return p.HasPiece(i) && (!p.Choking || (p.PieceAllowedFast != nil && p.PieceAllowedFast(i)))
}

24
request-strategy/piece.go Normal file
View File

@ -0,0 +1,24 @@
package request_strategy
import (
"github.com/anacrolix/torrent/types"
)
type ChunksIter func(func(types.ChunkSpec))
type Piece struct {
Request bool
Priority piecePriority
Partial bool
Availability int64
Length int64
NumPendingChunks int
IterPendingChunks ChunksIter
}
func (p Piece) iterPendingChunksWrapper(f func(ChunkSpec)) {
i := p.IterPendingChunks
if i != nil {
i(f)
}
}

View File

@ -0,0 +1,11 @@
package request_strategy
type Torrent struct {
Pieces []Piece
Capacity *func() *int64
Peers []Peer // not closed.
// Some value that's unique and stable between runs. Could even use the infohash?
StableId uintptr
MaxUnverifiedBytes int64
}

136
requesting.go Normal file
View File

@ -0,0 +1,136 @@
package torrent
import (
"time"
"unsafe"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/chansync"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/types"
)
func (cl *Client) requester() {
for {
update := func() chansync.Signaled {
cl.lock()
defer cl.unlock()
cl.doRequests()
return cl.updateRequests.Signaled()
}()
// We can probably tune how often to heed this signal. TODO: Currently disabled to retain
// existing behaviour, while the signalling is worked out.
update = nil
select {
case <-cl.closed.LockedChan(cl.locker()):
return
case <-update:
case <-time.After(100 * time.Millisecond):
}
}
}
func (cl *Client) tickleRequester() {
cl.updateRequests.Broadcast()
}
func (cl *Client) doRequests() {
ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
for _, t := range cl.torrents {
rst := request_strategy.Torrent{
StableId: uintptr(unsafe.Pointer(t)),
}
if t.storage != nil {
rst.Capacity = t.storage.Capacity
}
for i := range t.pieces {
p := &t.pieces[i]
rst.Pieces = append(rst.Pieces, request_strategy.Piece{
Request: !t.ignorePieceForRequests(i),
Priority: p.purePriority(),
Partial: t.piecePartiallyDownloaded(i),
Availability: p.availability,
Length: int64(p.length()),
NumPendingChunks: int(t.pieceNumPendingChunks(i)),
IterPendingChunks: func(f func(types.ChunkSpec)) {
p.iterUndirtiedChunks(func(cs ChunkSpec) bool {
f(cs)
return true
})
},
})
}
t.iterPeers(func(p *Peer) {
if p.closed.IsSet() {
return
}
if p.piecesReceivedSinceLastRequestUpdate > p.maxPiecesReceivedBetweenRequestUpdates {
p.maxPiecesReceivedBetweenRequestUpdates = p.piecesReceivedSinceLastRequestUpdate
}
p.piecesReceivedSinceLastRequestUpdate = 0
rst.Peers = append(rst.Peers, request_strategy.Peer{
HasPiece: p.peerHasPiece,
MaxRequests: p.nominalMaxRequests(),
HasExistingRequest: func(r request_strategy.Request) bool {
_, ok := p.actualRequestState.Requests[r]
return ok
},
Choking: p.peerChoking,
PieceAllowedFast: func(i pieceIndex) bool {
return p.peerAllowedFast.Contains(bitmap.BitIndex(i))
},
DownloadRate: p.downloadRate(),
Age: time.Since(p.completedHandshake),
Id: (*peerId)(p),
})
})
ts = append(ts, rst)
}
nextPeerStates := request_strategy.Run(request_strategy.Input{
Torrents: ts,
MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
})
for p, state := range nextPeerStates {
setPeerNextRequestState(p, state)
}
}
type peerId Peer
func (p *peerId) Uintptr() uintptr {
return uintptr(unsafe.Pointer(p))
}
func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
p := (*Peer)(_p.(*peerId))
p.nextRequestState = rp
p.onNextRequestStateChanged()
}
func (p *Peer) applyNextRequestState() bool {
next := p.nextRequestState
current := p.actualRequestState
if !p.setInterested(next.Interested) {
return false
}
for req := range current.Requests {
if _, ok := next.Requests[req]; !ok {
if !p.cancel(req) {
return false
}
}
}
for req := range next.Requests {
more, err := p.request(req)
if err != nil {
panic(err)
} else {
//log.Print(req)
}
if !more {
return false
}
}
return true
}

View File

@ -43,7 +43,11 @@ func (me *boltClient) Close() error {
} }
func (me *boltClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) { func (me *boltClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
return &boltTorrent{me, infoHash}, nil t := &boltTorrent{me, infoHash}
return TorrentImpl{
Piece: t.Piece,
Close: t.Close,
}, nil
} }
func (me *boltTorrent) Piece(p metainfo.Piece) PieceImpl { func (me *boltTorrent) Piece(p metainfo.Piece) PieceImpl {

View File

@ -0,0 +1,58 @@
package disabled
import (
"errors"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
)
type Client struct{}
var capacity int64
func (c Client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
capFunc := func() *int64 {
return &capacity
}
return storage.TorrentImpl{
Piece: func(piece metainfo.Piece) storage.PieceImpl {
return Piece{}
},
Close: func() error {
return nil
},
Capacity: &capFunc,
}, nil
}
func (c Client) capacity() *int64 {
return &capacity
}
type Piece struct{}
func (Piece) ReadAt(p []byte, off int64) (n int, err error) {
err = errors.New("disabled")
return
}
func (Piece) WriteAt(p []byte, off int64) (n int, err error) {
err = errors.New("disabled")
return
}
func (Piece) MarkComplete() error {
return errors.New("disabled")
}
func (Piece) MarkNotComplete() error {
return errors.New("disabled")
}
func (Piece) Completion() storage.Completion {
return storage.Completion{
Complete: false,
Ok: true,
}
}

View File

@ -67,14 +67,16 @@ func (me *fileClientImpl) Close() error {
return me.pc.Close() return me.pc.Close()
} }
func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) { func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (_ TorrentImpl, err error) {
dir := fs.pathMaker(fs.baseDir, info, infoHash) dir := fs.pathMaker(fs.baseDir, info, infoHash)
upvertedFiles := info.UpvertedFiles() upvertedFiles := info.UpvertedFiles()
files := make([]file, 0, len(upvertedFiles)) files := make([]file, 0, len(upvertedFiles))
for i, fileInfo := range upvertedFiles { for i, fileInfo := range upvertedFiles {
s, err := ToSafeFilePath(append([]string{info.Name}, fileInfo.Path...)...) var s string
s, err = ToSafeFilePath(append([]string{info.Name}, fileInfo.Path...)...)
if err != nil { if err != nil {
return nil, fmt.Errorf("file %v has unsafe path %q: %w", i, fileInfo.Path, err) err = fmt.Errorf("file %v has unsafe path %q: %w", i, fileInfo.Path, err)
return
} }
f := file{ f := file{
path: filepath.Join(dir, s), path: filepath.Join(dir, s),
@ -83,16 +85,21 @@ func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Has
if f.length == 0 { if f.length == 0 {
err = CreateNativeZeroLengthFile(f.path) err = CreateNativeZeroLengthFile(f.path)
if err != nil { if err != nil {
return nil, fmt.Errorf("creating zero length file: %w", err) err = fmt.Errorf("creating zero length file: %w", err)
return
} }
} }
files = append(files, f) files = append(files, f)
} }
return &fileTorrentImpl{ t := &fileTorrentImpl{
files, files,
segments.NewIndex(common.LengthIterFromUpvertedFiles(upvertedFiles)), segments.NewIndex(common.LengthIterFromUpvertedFiles(upvertedFiles)),
infoHash, infoHash,
fs.pc, fs.pc,
}
return TorrentImpl{
Piece: t.Piece,
Close: t.Close,
}, nil }, nil
} }

View File

@ -17,9 +17,11 @@ type ClientImpl interface {
} }
// Data storage bound to a torrent. // Data storage bound to a torrent.
type TorrentImpl interface { type TorrentImpl struct {
Piece(metainfo.Piece) PieceImpl Piece func(p metainfo.Piece) PieceImpl
Close() error Close func() error
// Storages that share the same value, will provide a pointer to the same function.
Capacity *func() *int64
} }
// Interacts with torrent piece data. Optional interfaces to implement include: // Interacts with torrent piece data. Optional interfaces to implement include:

View File

@ -30,14 +30,14 @@ func NewMMapWithCompletion(baseDir string, completion PieceCompletion) *mmapClie
} }
} }
func (s *mmapClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (t TorrentImpl, err error) { func (s *mmapClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (_ TorrentImpl, err error) {
span, err := mMapTorrent(info, s.baseDir) span, err := mMapTorrent(info, s.baseDir)
t = &mmapTorrentStorage{ t := &mmapTorrentStorage{
infoHash: infoHash, infoHash: infoHash,
span: span, span: span,
pc: s.pc, pc: s.pc,
} }
return return TorrentImpl{Piece: t.Piece, Close: t.Close}, err
} }
func (s *mmapClientImpl) Close() error { func (s *mmapClientImpl) Close() error {

View File

@ -26,6 +26,7 @@ type ResourcePiecesOpts struct {
// Sized puts require being able to stream from a statement executed on another connection. // Sized puts require being able to stream from a statement executed on another connection.
// Without them, we buffer the entire read and then put that. // Without them, we buffer the entire read and then put that.
NoSizedPuts bool NoSizedPuts bool
Capacity *int64
} }
func NewResourcePieces(p PieceProvider) ClientImpl { func NewResourcePieces(p PieceProvider) ClientImpl {
@ -49,10 +50,11 @@ func (piecePerResourceTorrentImpl) Close() error {
} }
func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) { func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
return piecePerResourceTorrentImpl{ t := piecePerResourceTorrentImpl{
s, s,
make([]sync.RWMutex, info.NumPieces()), make([]sync.RWMutex, info.NumPieces()),
}, nil }
return TorrentImpl{Piece: t.Piece, Close: t.Close}, nil
} }
func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece) PieceImpl { func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece) PieceImpl {

View File

@ -61,9 +61,24 @@ func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, er
if opts.BlobFlushInterval != 0 { if opts.BlobFlushInterval != 0 {
cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc) cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
} }
cl.capacity = cl.getCapacity
return cl, nil return cl, nil
} }
func (cl *client) getCapacity() (ret *int64) {
cl.l.Lock()
defer cl.l.Unlock()
err := sqlitex.Exec(cl.conn, "select value from setting where name='capacity'", func(stmt *sqlite.Stmt) error {
ret = new(int64)
*ret = stmt.ColumnInt64(0)
return nil
})
if err != nil {
panic(err)
}
return
}
type client struct { type client struct {
l sync.Mutex l sync.Mutex
conn conn conn conn
@ -71,6 +86,7 @@ type client struct {
blobFlusher *time.Timer blobFlusher *time.Timer
opts NewDirectStorageOpts opts NewDirectStorageOpts
closed bool closed bool
capacity func() *int64
} }
func (c *client) blobFlusherFunc() { func (c *client) blobFlusherFunc() {
@ -91,7 +107,8 @@ func (c *client) flushBlobs() {
} }
func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) { func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
return torrent{c}, nil t := torrent{c}
return storage.TorrentImpl{Piece: t.Piece, Close: t.Close, Capacity: &c.capacity}, nil
} }
func (c *client) Close() error { func (c *client) Close() error {

View File

@ -124,7 +124,7 @@ func (me *diskFullStorage) Close() error {
} }
func (d *diskFullStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) { func (d *diskFullStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
return d, nil return storage.TorrentImpl{Piece: d.Piece, Close: d.Close}, nil
} }
type pieceImpl struct { type pieceImpl struct {

View File

@ -12,6 +12,7 @@ import (
"testing/iotest" "testing/iotest"
"time" "time"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/filecache" "github.com/anacrolix/missinggo/v2/filecache"
"github.com/anacrolix/torrent" "github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/internal/testutil" "github.com/anacrolix/torrent/internal/testutil"
@ -168,7 +169,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
for _, pc := range leecherPeerConns { for _, pc := range leecherPeerConns {
completed := pc.PeerPieces().Len() completed := pc.PeerPieces().Len()
t.Logf("peer conn %v has %v completed pieces", pc, completed) t.Logf("peer conn %v has %v completed pieces", pc, completed)
if completed == leecherTorrent.Info().NumPieces() { if completed == bitmap.BitRange(leecherTorrent.Info().NumPieces()) {
foundSeeder = true foundSeeder = true
} }
} }
@ -192,7 +193,6 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
type fileCacheClientStorageFactoryParams struct { type fileCacheClientStorageFactoryParams struct {
Capacity int64 Capacity int64
SetCapacity bool SetCapacity bool
Wrapper func(*filecache.Cache) storage.ClientImplCloser
} }
func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory { func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
@ -201,10 +201,22 @@ func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) st
if err != nil { if err != nil {
panic(err) panic(err)
} }
var sharedCapacity *int64
if ps.SetCapacity { if ps.SetCapacity {
sharedCapacity = &ps.Capacity
fc.SetCapacity(ps.Capacity) fc.SetCapacity(ps.Capacity)
} }
return ps.Wrapper(fc) return struct {
storage.ClientImpl
io.Closer
}{
storage.NewResourcePiecesOpts(
fc.AsResourceProvider(),
storage.ResourcePiecesOpts{
Capacity: sharedCapacity,
}),
ioutil.NopCloser(nil),
}
} }
} }
@ -212,17 +224,13 @@ type storageFactory func(string) storage.ClientImplCloser
func TestClientTransferDefault(t *testing.T) { func TestClientTransferDefault(t *testing.T) {
testClientTransfer(t, testClientTransferParams{ testClientTransfer(t, testClientTransferParams{
LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{ LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}),
Wrapper: fileCachePieceResourceStorage,
}),
}) })
} }
func TestClientTransferDefaultNoMetadata(t *testing.T) { func TestClientTransferDefaultNoMetadata(t *testing.T) {
testClientTransfer(t, testClientTransferParams{ testClientTransfer(t, testClientTransferParams{
LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{ LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}),
Wrapper: fileCachePieceResourceStorage,
}),
LeecherStartsWithoutMetadata: true, LeecherStartsWithoutMetadata: true,
}) })
} }
@ -244,16 +252,6 @@ func TestClientTransferRateLimitedDownload(t *testing.T) {
}) })
} }
func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImplCloser {
return struct {
storage.ClientImpl
io.Closer
}{
storage.NewResourcePieces(fc.AsResourceProvider()),
ioutil.NopCloser(nil),
}
}
func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) { func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
testClientTransfer(t, testClientTransferParams{ testClientTransfer(t, testClientTransferParams{
LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{ LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
@ -261,7 +259,6 @@ func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int
// Going below the piece length means it can't complete a piece so // Going below the piece length means it can't complete a piece so
// that it can be hashed. // that it can be hashed.
Capacity: 5, Capacity: 5,
Wrapper: fileCachePieceResourceStorage,
}), }),
SetReadahead: setReadahead, SetReadahead: setReadahead,
// Can't readahead too far or the cache will thrash and drop data we // Can't readahead too far or the cache will thrash and drop data we
@ -324,9 +321,7 @@ func sqliteLeecherStorageTestCase(numConns int) leecherStorageTestCase {
func TestClientTransferVarious(t *testing.T) { func TestClientTransferVarious(t *testing.T) {
// Leecher storage // Leecher storage
for _, ls := range []leecherStorageTestCase{ for _, ls := range []leecherStorageTestCase{
{"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{ {"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}), 0},
Wrapper: fileCachePieceResourceStorage,
}), 0},
{"Boltdb", storage.NewBoltDB, 0}, {"Boltdb", storage.NewBoltDB, 0},
{"SqliteDirect", func(s string) storage.ClientImplCloser { {"SqliteDirect", func(s string) storage.ClientImplCloser {
path := filepath.Join(s, "sqlite3.db") path := filepath.Join(s, "sqlite3.db")

View File

@ -12,17 +12,12 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"sort" "sort"
"strings"
"sync" "sync"
"text/tabwriter" "text/tabwriter"
"time" "time"
"unsafe" "unsafe"
"github.com/anacrolix/torrent/common"
"github.com/anacrolix/torrent/segments"
"github.com/anacrolix/torrent/webseed"
"github.com/davecgh/go-spew/spew"
"github.com/pion/datachannel"
"github.com/anacrolix/dht/v2" "github.com/anacrolix/dht/v2"
"github.com/anacrolix/log" "github.com/anacrolix/log"
"github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo"
@ -32,12 +27,18 @@ import (
"github.com/anacrolix/missinggo/slices" "github.com/anacrolix/missinggo/slices"
"github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/prioritybitmap" "github.com/anacrolix/missinggo/v2/prioritybitmap"
"github.com/anacrolix/multiless"
"github.com/davecgh/go-spew/spew"
"github.com/pion/datachannel"
"github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/common"
"github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol" pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/segments"
"github.com/anacrolix/torrent/storage" "github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/tracker" "github.com/anacrolix/torrent/tracker"
"github.com/anacrolix/torrent/webseed"
"github.com/anacrolix/torrent/webtorrent" "github.com/anacrolix/torrent/webtorrent"
) )
@ -55,9 +56,6 @@ type Torrent struct {
dataUploadDisallowed bool dataUploadDisallowed bool
userOnWriteChunkErr func(error) userOnWriteChunkErr func(error)
// Determines what chunks to request from peers.
requestStrategy requestStrategy
closed missinggo.Event closed missinggo.Event
infoHash metainfo.Hash infoHash metainfo.Hash
pieces []Piece pieces []Piece
@ -95,7 +93,6 @@ type Torrent struct {
// Set of addrs to which we're attempting to connect. Connections are // Set of addrs to which we're attempting to connect. Connections are
// half-open until all handshakes are completed. // half-open until all handshakes are completed.
halfOpen map[string]PeerInfo halfOpen map[string]PeerInfo
fastestPeer *Peer
// Reserve of peers to connect to. A peer can be both here and in the // Reserve of peers to connect to. A peer can be both here and in the
// active connections if were told about the peer after connecting with // active connections if were told about the peer after connecting with
@ -150,6 +147,34 @@ type Torrent struct {
pex pexState pex pexState
} }
func (t *Torrent) pieceAvailabilityFromPeers(i pieceIndex) (count int) {
t.iterPeers(func(peer *Peer) {
if peer.peerHasPiece(i) {
count++
}
})
return
}
func (t *Torrent) decPieceAvailability(i pieceIndex) {
if !t.haveInfo() {
return
}
p := t.piece(i)
if p.availability <= 0 {
panic(p.availability)
}
p.availability--
}
func (t *Torrent) incPieceAvailability(i pieceIndex) {
// If we don't the info, this should be reconciled when we do.
if t.haveInfo() {
p := t.piece(i)
p.availability++
}
}
func (t *Torrent) numConns() int { func (t *Torrent) numConns() int {
return len(t.conns) return len(t.conns)
} }
@ -166,15 +191,8 @@ func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap {
return t._readerReadaheadPieces return t._readerReadaheadPieces
} }
func (t *Torrent) ignorePieces() bitmap.Bitmap { func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool {
ret := t._completedPieces.Copy() return !t.wantPieceIndex(i)
ret.Union(t.piecesQueuedForHash)
for i := 0; i < t.numPieces(); i++ {
if t.piece(i).hashing {
ret.Set(i, true)
}
}
return ret
} }
func (t *Torrent) pendingPieces() *prioritybitmap.PriorityBitmap { func (t *Torrent) pendingPieces() *prioritybitmap.PriorityBitmap {
@ -417,8 +435,14 @@ func (t *Torrent) onSetInfo() {
p.onGotInfo(t.info) p.onGotInfo(t.info)
}) })
for i := range t.pieces { for i := range t.pieces {
t.updatePieceCompletion(pieceIndex(i))
p := &t.pieces[i] p := &t.pieces[i]
// Need to add availability before updating piece completion, as that may result in conns
// being dropped.
if p.availability != 0 {
panic(p.availability)
}
p.availability = int64(t.pieceAvailabilityFromPeers(i))
t.updatePieceCompletion(pieceIndex(i))
if !p.storageCompletionOk { if !p.storageCompletionOk {
// t.logger.Printf("piece %s completion unknown, queueing check", p) // t.logger.Printf("piece %s completion unknown, queueing check", p)
t.queuePieceCheck(pieceIndex(i)) t.queuePieceCheck(pieceIndex(i))
@ -536,6 +560,26 @@ func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType int, piece in
} }
} }
type pieceAvailabilityRun struct {
count pieceIndex
availability int64
}
func (me pieceAvailabilityRun) String() string {
return fmt.Sprintf("%v(%v)", me.count, me.availability)
}
func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
ret = append(ret, pieceAvailabilityRun{availability: el.(int64), count: int(count)})
})
for i := range t.pieces {
rle.Append(t.pieces[i].availability, 1)
}
rle.Flush()
return
}
func (t *Torrent) pieceStateRuns() (ret PieceStateRuns) { func (t *Torrent) pieceStateRuns() (ret PieceStateRuns) {
rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) { rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
ret = append(ret, PieceStateRun{ ret = append(ret, PieceStateRun{
@ -606,17 +650,26 @@ func (t *Torrent) writeStatus(w io.Writer) {
} }
fmt.Fprintln(w) fmt.Fprintln(w)
} }
fmt.Fprintf(w, "Piece length: %s\n", func() string { fmt.Fprintf(w, "Piece length: %s\n",
func() string {
if t.haveInfo() { if t.haveInfo() {
return fmt.Sprint(t.usualPieceSize()) return fmt.Sprintf("%v (%v chunks)",
t.usualPieceSize(),
float64(t.usualPieceSize())/float64(t.chunkSize))
} else { } else {
return "?" return "no info"
} }
}()) }(),
)
if t.info != nil { if t.info != nil {
fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted()) fmt.Fprintf(w, "Num Pieces: %d (%d completed)\n", t.numPieces(), t.numPiecesCompleted())
fmt.Fprintf(w, "Piece States: %s", t.pieceStateRuns()) fmt.Fprintf(w, "Piece States: %s\n", t.pieceStateRuns())
fmt.Fprintln(w) fmt.Fprintf(w, "Piece availability: %v\n", strings.Join(func() (ret []string) {
for _, run := range t.pieceAvailabilityRuns() {
ret = append(ret, run.String())
}
return
}(), " "))
} }
fmt.Fprintf(w, "Reader Pieces:") fmt.Fprintf(w, "Reader Pieces:")
t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) { t.forReaderOffsetPieces(func(begin, end pieceIndex) (again bool) {
@ -651,8 +704,15 @@ func (t *Torrent) writeStatus(w io.Writer) {
spew.Fdump(w, t.statsLocked()) spew.Fdump(w, t.statsLocked())
peers := t.peersAsSlice() peers := t.peersAsSlice()
sort.Slice(peers, func(i, j int) bool { sort.Slice(peers, func(_i, _j int) bool {
return worseConn(peers[i], peers[j]) i := peers[_i]
j := peers[_j]
if less, ok := multiless.New().EagerSameLess(
i.downloadRate() == j.downloadRate(), i.downloadRate() < j.downloadRate(),
).LessOk(); ok {
return less
}
return worseConn(i, j)
}) })
for i, c := range peers { for i, c := range peers {
fmt.Fprintf(w, "%2d. ", i+1) fmt.Fprintf(w, "%2d. ", i+1)
@ -700,7 +760,7 @@ func (t *Torrent) bytesMissingLocked() int64 {
} }
func (t *Torrent) bytesLeft() (left int64) { func (t *Torrent) bytesLeft() (left int64) {
bitmap.Flip(t._completedPieces, 0, bitmap.BitIndex(t.numPieces())).IterTyped(func(piece int) bool { bitmap.Flip(t._completedPieces, 0, bitmap.BitRange(t.numPieces())).IterTyped(func(piece int) bool {
p := &t.pieces[piece] p := &t.pieces[piece]
left += int64(p.length() - p.numDirtyBytes()) left += int64(p.length() - p.numDirtyBytes())
return true return true
@ -735,8 +795,8 @@ func (t *Torrent) numPieces() pieceIndex {
return pieceIndex(t.info.NumPieces()) return pieceIndex(t.info.NumPieces())
} }
func (t *Torrent) numPiecesCompleted() (num int) { func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
return t._completedPieces.Len() return pieceIndex(t._completedPieces.Len())
} }
func (t *Torrent) close() (err error) { func (t *Torrent) close() (err error) {
@ -746,7 +806,9 @@ func (t *Torrent) close() (err error) {
func() { func() {
t.storageLock.Lock() t.storageLock.Lock()
defer t.storageLock.Unlock() defer t.storageLock.Unlock()
t.storage.Close() if f := t.storage.Close; f != nil {
f()
}
}() }()
} }
t.iterPeers(func(p *Peer) { t.iterPeers(func(p *Peer) {
@ -844,7 +906,7 @@ func (t *Torrent) haveAllPieces() bool {
if !t.haveInfo() { if !t.haveInfo() {
return false return false
} }
return t._completedPieces.Len() == bitmap.BitIndex(t.numPieces()) return t._completedPieces.Len() == bitmap.BitRange(t.numPieces())
} }
func (t *Torrent) havePiece(index pieceIndex) bool { func (t *Torrent) havePiece(index pieceIndex) bool {
@ -890,12 +952,15 @@ func chunkIndex(cs ChunkSpec, chunkSize pp.Integer) int {
} }
func (t *Torrent) wantPieceIndex(index pieceIndex) bool { func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
// TODO: Are these overly conservative, should we be guarding this here?
{
if !t.haveInfo() { if !t.haveInfo() {
return false return false
} }
if index < 0 || index >= t.numPieces() { if index < 0 || index >= t.numPieces() {
return false return false
} }
}
p := &t.pieces[index] p := &t.pieces[index]
if p.queuedForHash() { if p.queuedForHash() {
return false return false
@ -906,7 +971,7 @@ func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
if t.pieceComplete(index) { if t.pieceComplete(index) {
return false return false
} }
if t._pendingPieces.Contains(bitmap.BitIndex(index)) { if t._pendingPieces.Contains(int(index)) {
return true return true
} }
// t.logger.Printf("piece %d not pending", index) // t.logger.Printf("piece %d not pending", index)
@ -965,7 +1030,7 @@ func (t *Torrent) pieceNumPendingChunks(piece pieceIndex) pp.Integer {
} }
func (t *Torrent) pieceAllDirty(piece pieceIndex) bool { func (t *Torrent) pieceAllDirty(piece pieceIndex) bool {
return t.pieces[piece]._dirtyChunks.Len() == int(t.pieceNumChunks(piece)) return t.pieces[piece]._dirtyChunks.Len() == bitmap.BitRange(t.pieceNumChunks(piece))
} }
func (t *Torrent) readersChanged() { func (t *Torrent) readersChanged() {
@ -1024,11 +1089,11 @@ func (t *Torrent) updatePiecePriority(piece pieceIndex) {
newPrio := p.uncachedPriority() newPrio := p.uncachedPriority()
// t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio) // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
if newPrio == PiecePriorityNone { if newPrio == PiecePriorityNone {
if !t._pendingPieces.Remove(bitmap.BitIndex(piece)) { if !t._pendingPieces.Remove(int(piece)) {
return return
} }
} else { } else {
if !t._pendingPieces.Set(bitmap.BitIndex(piece), newPrio.BitmapPriority()) { if !t._pendingPieces.Set(int(piece), newPrio.BitmapPriority()) {
return return
} }
} }
@ -1084,7 +1149,7 @@ func (t *Torrent) forReaderOffsetPieces(f func(begin, end pieceIndex) (more bool
} }
func (t *Torrent) piecePriority(piece pieceIndex) piecePriority { func (t *Torrent) piecePriority(piece pieceIndex) piecePriority {
prio, ok := t._pendingPieces.GetPriority(bitmap.BitIndex(piece)) prio, ok := t._pendingPieces.GetPriority(piece)
if !ok { if !ok {
return PiecePriorityNone return PiecePriorityNone
} }
@ -1232,7 +1297,7 @@ func (t *Torrent) readerPiecePriorities() (now, readahead bitmap.Bitmap) {
t.forReaderOffsetPieces(func(begin, end pieceIndex) bool { t.forReaderOffsetPieces(func(begin, end pieceIndex) bool {
if end > begin { if end > begin {
now.Add(bitmap.BitIndex(begin)) now.Add(bitmap.BitIndex(begin))
readahead.AddRange(bitmap.BitIndex(begin)+1, bitmap.BitIndex(end)) readahead.AddRange(bitmap.BitRange(begin)+1, bitmap.BitRange(end))
} }
return true return true
}) })
@ -1319,6 +1384,16 @@ func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
return return
} }
func (t *Torrent) decPeerPieceAvailability(p *Peer) {
if !t.haveInfo() {
return
}
p.newPeerPieces().IterTyped(func(i int) bool {
p.t.decPieceAvailability(i)
return true
})
}
func (t *Torrent) numActivePeers() (num int) { func (t *Torrent) numActivePeers() (num int) {
t.iterPeers(func(*Peer) { t.iterPeers(func(*Peer) {
num++ num++
@ -1905,7 +1980,7 @@ func (t *Torrent) tryCreatePieceHasher() bool {
return false return false
} }
p := t.piece(pi) p := t.piece(pi)
t.piecesQueuedForHash.Remove(pi) t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi))
p.hashing = true p.hashing = true
t.publishPieceChange(pi) t.publishPieceChange(pi)
t.updatePiecePriority(pi) t.updatePiecePriority(pi)
@ -2036,30 +2111,6 @@ func (t *Torrent) piece(i int) *Piece {
return &t.pieces[i] return &t.pieces[i]
} }
func (t *Torrent) requestStrategyTorrent() requestStrategyTorrent {
return t
}
type torrentRequestStrategyCallbacks struct {
t *Torrent
}
func (cb torrentRequestStrategyCallbacks) requestTimedOut(r Request) {
torrent.Add("Request timeouts", 1)
cb.t.cl.lock()
defer cb.t.cl.unlock()
cb.t.iterPeers(func(cn *Peer) {
if cn.peerHasPiece(pieceIndex(r.Index)) {
cn.updateRequests()
}
})
}
func (t *Torrent) requestStrategyCallbacks() requestStrategyCallbacks {
return torrentRequestStrategyCallbacks{t}
}
func (t *Torrent) onWriteChunkErr(err error) { func (t *Torrent) onWriteChunkErr(err error) {
if t.userOnWriteChunkErr != nil { if t.userOnWriteChunkErr != nil {
go t.userOnWriteChunkErr(err) go t.userOnWriteChunkErr(err)
@ -2121,7 +2172,7 @@ func (t *Torrent) SetOnWriteChunkError(f func(error)) {
t.userOnWriteChunkErr = f t.userOnWriteChunkErr = f
} }
func (t *Torrent) iterPeers(f func(*Peer)) { func (t *Torrent) iterPeers(f func(p *Peer)) {
for pc := range t.conns { for pc := range t.conns {
f(&pc.Peer) f(&pc.Peer)
} }
@ -2154,7 +2205,6 @@ func (t *Torrent) addWebSeed(url string) {
outgoing: true, outgoing: true,
Network: "http", Network: "http",
reconciledHandshakeStats: true, reconciledHandshakeStats: true,
peerSentHaveAll: true,
// TODO: Raise this limit, and instead limit concurrent fetches. // TODO: Raise this limit, and instead limit concurrent fetches.
PeerMaxRequests: 32, PeerMaxRequests: 32,
RemoteAddr: remoteAddrFromUrl(url), RemoteAddr: remoteAddrFromUrl(url),
@ -2180,6 +2230,7 @@ func (t *Torrent) addWebSeed(url string) {
ws.onGotInfo(t.info) ws.onGotInfo(t.info)
} }
t.webSeeds[url] = &ws.peer t.webSeeds[url] = &ws.peer
ws.peer.onPeerHasAllPieces()
} }
func (t *Torrent) peerIsActive(p *Peer) (active bool) { func (t *Torrent) peerIsActive(p *Peer) (active bool) {

View File

@ -9,6 +9,7 @@ import (
"testing" "testing"
"github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/bradfitz/iter" "github.com/bradfitz/iter"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -96,8 +97,8 @@ func BenchmarkUpdatePiecePriorities(b *testing.B) {
r.Seek(3500000, io.SeekStart) r.Seek(3500000, io.SeekStart)
} }
assert.Len(b, t.readers, 7) assert.Len(b, t.readers, 7)
for i := 0; i < int(t.numPieces()); i += 3 { for i := 0; i < t.numPieces(); i += 3 {
t._completedPieces.Set(i, true) t._completedPieces.Set(bitmap.BitIndex(i), true)
} }
t.DownloadPieces(0, t.numPieces()) t.DownloadPieces(0, t.numPieces())
for range iter.N(b.N) { for range iter.N(b.N) {

52
types/types.go Normal file
View File

@ -0,0 +1,52 @@
package types
import (
pp "github.com/anacrolix/torrent/peer_protocol"
)
type PieceIndex = int
type ChunkSpec struct {
Begin, Length pp.Integer
}
type Request struct {
Index pp.Integer
ChunkSpec
}
func (r Request) ToMsg(mt pp.MessageType) pp.Message {
return pp.Message{
Type: mt,
Index: r.Index,
Begin: r.Begin,
Length: r.Length,
}
}
// Describes the importance of obtaining a particular piece.
type PiecePriority byte
func (pp *PiecePriority) Raise(maybe PiecePriority) bool {
if maybe > *pp {
*pp = maybe
return true
}
return false
}
// Priority for use in PriorityBitmap
func (me PiecePriority) BitmapPriority() int {
return -int(me)
}
const (
PiecePriorityNone PiecePriority = iota // Not wanted. Must be the zero value.
PiecePriorityNormal // Wanted.
PiecePriorityHigh // Wanted a lot.
PiecePriorityReadahead // May be required soon.
// Succeeds a piece where a read occurred. Currently the same as Now,
// apparently due to issues with caching.
PiecePriorityNext
PiecePriorityNow // A Reader is reading in this piece. Highest urgency.
)

View File

@ -24,6 +24,10 @@ type webseedPeer struct {
var _ peerImpl = (*webseedPeer)(nil) var _ peerImpl = (*webseedPeer)(nil)
func (me *webseedPeer) writeBufferFull() bool {
return false
}
func (me *webseedPeer) connStatusString() string { func (me *webseedPeer) connStatusString() string {
return me.client.Url return me.client.Url
} }
@ -37,20 +41,15 @@ func (ws *webseedPeer) onGotInfo(info *metainfo.Info) {
ws.client.Info = info ws.client.Info = info
} }
func (ws *webseedPeer) _postCancel(r Request) {
ws.cancel(r)
}
func (ws *webseedPeer) writeInterested(interested bool) bool { func (ws *webseedPeer) writeInterested(interested bool) bool {
return true return true
} }
func (ws *webseedPeer) cancel(r Request) bool { func (ws *webseedPeer) _cancel(r Request) bool {
active, ok := ws.activeRequests[r] active, ok := ws.activeRequests[r]
if !ok { if ok {
return false
}
active.Cancel() active.Cancel()
}
return true return true
} }
@ -58,7 +57,7 @@ func (ws *webseedPeer) intoSpec(r Request) webseed.RequestSpec {
return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)} return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
} }
func (ws *webseedPeer) request(r Request) bool { func (ws *webseedPeer) _request(r Request) bool {
ws.requesterCond.Signal() ws.requesterCond.Signal()
return true return true
} }
@ -79,7 +78,7 @@ func (ws *webseedPeer) requester() {
defer ws.requesterCond.L.Unlock() defer ws.requesterCond.L.Unlock()
start: start:
for !ws.peer.closed.IsSet() { for !ws.peer.closed.IsSet() {
for r := range ws.peer.requests { for r := range ws.peer.actualRequestState.Requests {
if _, ok := ws.activeRequests[r]; ok { if _, ok := ws.activeRequests[r]; ok {
continue continue
} }
@ -99,7 +98,6 @@ func (ws *webseedPeer) connectionFlags() string {
func (ws *webseedPeer) drop() {} func (ws *webseedPeer) drop() {}
func (ws *webseedPeer) updateRequests() { func (ws *webseedPeer) updateRequests() {
ws.peer.doRequestState()
} }
func (ws *webseedPeer) onClose() { func (ws *webseedPeer) onClose() {
@ -112,6 +110,10 @@ func (ws *webseedPeer) onClose() {
func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) { func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Request) {
result := <-webseedRequest.Result result := <-webseedRequest.Result
// We do this here rather than inside receiveChunk, since we want to count errors too. I'm not
// sure if we can divine which errors indicate cancellation on our end without hitting the
// network though.
ws.peer.doChunkReadStats(int64(len(result.Bytes)))
ws.peer.t.cl.lock() ws.peer.t.cl.lock()
defer ws.peer.t.cl.unlock() defer ws.peer.t.cl.unlock()
if result.Err != nil { if result.Err != nil {
@ -146,3 +148,7 @@ func (ws *webseedPeer) requestResultHandler(r Request, webseedRequest webseed.Re
} }
} }
} }
func (me *webseedPeer) onNextRequestStateChanged() {
me.peer.applyNextRequestState()
}