Add connection trust flag, and more tests with small caches

Thanks to observations and feedback from @ccampbell.
This commit is contained in:
Matt Joiner 2019-12-18 13:52:00 +11:00
parent 9e1804f52e
commit 5f1d937b62
6 changed files with 62 additions and 23 deletions

View File

@ -8,6 +8,7 @@ import (
"github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/peer_protocol"
) )
// Peer connection info, handed about publicly.
type Peer struct { type Peer struct {
Id [20]byte Id [20]byte
IP net.IP IP net.IP
@ -16,6 +17,8 @@ type Peer struct {
// Peer is known to support encryption. // Peer is known to support encryption.
SupportsEncryption bool SupportsEncryption bool
peer_protocol.PexPeerFlags peer_protocol.PexPeerFlags
// Whether we can ignore poor or bad behaviour from the peer.
Trusted bool
} }
func (me *Peer) FromPex(na krpc.NodeAddr, fs peer_protocol.PexPeerFlags) { func (me *Peer) FromPex(na krpc.NodeAddr, fs peer_protocol.PexPeerFlags) {

View File

@ -19,12 +19,12 @@ import (
"github.com/anacrolix/dht/v2" "github.com/anacrolix/dht/v2"
"github.com/anacrolix/dht/v2/krpc" "github.com/anacrolix/dht/v2/krpc"
"github.com/anacrolix/log" "github.com/anacrolix/log"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/bitmap" "github.com/anacrolix/missinggo/bitmap"
"github.com/anacrolix/missinggo/perf" "github.com/anacrolix/missinggo/perf"
"github.com/anacrolix/missinggo/pproffd" "github.com/anacrolix/missinggo/pproffd"
"github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/missinggo/pubsub"
"github.com/anacrolix/missinggo/slices" "github.com/anacrolix/missinggo/slices"
"github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/missinggo/v2/conntrack" "github.com/anacrolix/missinggo/v2/conntrack"
"github.com/anacrolix/sync" "github.com/anacrolix/sync"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
@ -690,7 +690,7 @@ func (cl *Client) establishOutgoingConn(t *Torrent, addr IpPort) (c *connection,
// Called to dial out and run a connection. The addr we're given is already // Called to dial out and run a connection. The addr we're given is already
// considered half-open. // considered half-open.
func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource) { func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource, trusted bool) {
cl.dialRateLimiter.Wait(context.Background()) cl.dialRateLimiter.Wait(context.Background())
c, err := cl.establishOutgoingConn(t, addr) c, err := cl.establishOutgoingConn(t, addr)
cl.lock() cl.lock()
@ -706,6 +706,7 @@ func (cl *Client) outgoingConnection(t *Torrent, addr IpPort, ps peerSource) {
} }
defer c.Close() defer c.Close()
c.Discovery = ps c.Discovery = ps
c.trusted = trusted
cl.runHandshookConn(c, t) cl.runHandshookConn(c, t)
} }
@ -1212,6 +1213,7 @@ func (cl *Client) AddDHTNodes(nodes []string) {
} }
func (cl *Client) banPeerIP(ip net.IP) { func (cl *Client) banPeerIP(ip net.IP) {
cl.logger.Printf("banning ip %v", ip)
if cl.badPeerIPs == nil { if cl.badPeerIPs == nil {
cl.badPeerIPs = make(map[string]struct{}) cl.badPeerIPs = make(map[string]struct{})
} }

View File

@ -228,7 +228,7 @@ func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
return storage.NewResourcePieces(fc.AsResourceProvider()) return storage.NewResourcePieces(fc.AsResourceProvider())
} }
func TestClientTransferSmallCache(t *testing.T) { func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
testClientTransfer(t, testClientTransferParams{ testClientTransfer(t, testClientTransferParams{
LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{ LeecherStorage: NewFileCacheClientStorageFactory(FileCacheClientStorageFactoryParams{
SetCapacity: true, SetCapacity: true,
@ -237,14 +237,26 @@ func TestClientTransferSmallCache(t *testing.T) {
Capacity: 5, Capacity: 5,
Wrapper: fileCachePieceResourceStorage, Wrapper: fileCachePieceResourceStorage,
}), }),
SetReadahead: true, SetReadahead: setReadahead,
// Can't readahead too far or the cache will thrash and drop data we // Can't readahead too far or the cache will thrash and drop data we
// thought we had. // thought we had.
Readahead: 0, Readahead: readahead,
ExportClientStatus: true, ExportClientStatus: true,
}) })
} }
func TestClientTransferSmallCachePieceSizedReadahead(t *testing.T) {
testClientTransferSmallCache(t, true, 5)
}
func TestClientTransferSmallCacheLargeReadahead(t *testing.T) {
testClientTransferSmallCache(t, true, 15)
}
func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
testClientTransferSmallCache(t, false, -1)
}
func TestClientTransferVarious(t *testing.T) { func TestClientTransferVarious(t *testing.T) {
// Leecher storage // Leecher storage
for _, ls := range []storageFactory{ for _, ls := range []storageFactory{
@ -342,6 +354,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
} }
cfg.Seed = false cfg.Seed = false
//cfg.Debug = true
leecher, err := NewClient(cfg) leecher, err := NewClient(cfg)
require.NoError(t, err) require.NoError(t, err)
defer leecher.Close() defer leecher.Close()

View File

@ -55,6 +55,7 @@ type connection struct {
headerEncrypted bool headerEncrypted bool
cryptoMethod mse.CryptoMethod cryptoMethod mse.CryptoMethod
Discovery peerSource Discovery peerSource
trusted bool
closed missinggo.Event closed missinggo.Event
// Set true after we've added our ConnStats generated during handshake to // Set true after we've added our ConnStats generated during handshake to
// other ConnStat instances as determined when the *Torrent became known. // other ConnStat instances as determined when the *Torrent became known.
@ -1568,3 +1569,19 @@ func (c *connection) remoteIpPort() IpPort {
func (c *connection) String() string { func (c *connection) String() string {
return fmt.Sprintf("connection %p", c) return fmt.Sprintf("connection %p", c)
} }
func (c *connection) trust() connectionTrust {
return connectionTrust{c.trusted, c.netGoodPiecesDirtied()}
}
type connectionTrust struct {
Implicit bool
NetGoodPiecesDirted int64
}
func (l connectionTrust) Less(r connectionTrust) bool {
var ml missinggo.MultiLess
ml.NextBool(!l.Implicit, !r.Implicit)
ml.StrictNext(l.NetGoodPiecesDirted == r.NetGoodPiecesDirted, l.NetGoodPiecesDirted < r.NetGoodPiecesDirted)
return ml.Less()
}

View File

@ -4,7 +4,7 @@ import (
"errors" "errors"
"net" "net"
"github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo/v2"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/metainfo"
@ -107,7 +107,7 @@ func chunkIndexSpec(index pp.Integer, pieceLength, chunkSize pp.Integer) chunkSp
} }
func connLessTrusted(l, r *connection) bool { func connLessTrusted(l, r *connection) bool {
return l.netGoodPiecesDirtied() < r.netGoodPiecesDirtied() return l.trust().Less(r.trust())
} }
func connIsIpv6(nc interface { func connIsIpv6(nc interface {

View File

@ -1548,8 +1548,7 @@ func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) {
} }
} else { } else {
if len(touchers) != 0 { if len(touchers) != 0 {
// Don't increment stats above connection-level for every involved // Don't increment stats above connection-level for every involved connection.
// connection.
t.allStats((*ConnStats).incrementPiecesDirtiedBad) t.allStats((*ConnStats).incrementPiecesDirtiedBad)
for _, c := range touchers { for _, c := range touchers {
// Y u do dis peer?! // Y u do dis peer?!
@ -1557,16 +1556,20 @@ func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) {
} }
slices.Sort(touchers, connLessTrusted) slices.Sort(touchers, connLessTrusted)
if t.cl.config.Debug { if t.cl.config.Debug {
t.logger.Printf("dropping first corresponding conn from trust: %v", func() (ret []int64) { t.logger.Printf("conns by trust for piece %d: %v",
for _, c := range touchers { piece,
ret = append(ret, c.netGoodPiecesDirtied()) func() (ret []connectionTrust) {
} for _, c := range touchers {
return ret = append(ret, c.trust())
}()) }
return
}())
} }
c := touchers[0] c := touchers[0]
t.cl.banPeerIP(c.remoteAddr.IP) if !c.trust().Implicit {
c.Drop() t.cl.banPeerIP(c.remoteAddr.IP)
c.Drop()
}
} }
t.onIncompletePiece(piece) t.onIncompletePiece(piece)
p.Storage().MarkNotComplete() p.Storage().MarkNotComplete()
@ -1702,13 +1705,12 @@ func (t *Torrent) VerifyData() {
} }
} }
// Start the process of connecting to the given peer for the given torrent if // Start the process of connecting to the given peer for the given torrent if appropriate.
// appropriate.
func (t *Torrent) initiateConn(peer Peer) { func (t *Torrent) initiateConn(peer Peer) {
if peer.Id == t.cl.peerID { if peer.Id == t.cl.peerID {
return return
} }
if t.cl.badPeerIPPort(peer.IP, peer.Port) { if t.cl.badPeerIPPort(peer.IP, peer.Port) && !peer.Trusted {
return return
} }
addr := IpPort{peer.IP, uint16(peer.Port)} addr := IpPort{peer.IP, uint16(peer.Port)}
@ -1716,15 +1718,17 @@ func (t *Torrent) initiateConn(peer Peer) {
return return
} }
t.halfOpen[addr.String()] = peer t.halfOpen[addr.String()] = peer
go t.cl.outgoingConnection(t, addr, peer.Source) go t.cl.outgoingConnection(t, addr, peer.Source, peer.Trusted)
} }
// Adds each a trusted, pending peer for each of the Client's addresses.
func (t *Torrent) AddClientPeer(cl *Client) { func (t *Torrent) AddClientPeer(cl *Client) {
t.AddPeers(func() (ps []Peer) { t.AddPeers(func() (ps []Peer) {
for _, la := range cl.ListenAddrs() { for _, la := range cl.ListenAddrs() {
ps = append(ps, Peer{ ps = append(ps, Peer{
IP: missinggo.AddrIP(la), IP: missinggo.AddrIP(la),
Port: missinggo.AddrPort(la), Port: missinggo.AddrPort(la),
Trusted: true,
}) })
} }
return return