Law of Demeter Client.mu

parent bf5552ae3c
commit 6dd3b9c12c
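The commit applies the Law of Demeter to the Client's mutex: the field `mu sync.RWMutex` becomes the private `_mu`, and every call site, across all the files below, goes through small helpers (`cl.lock`, `cl.unlock`, `cl.rLock`, `cl.rUnlock`). Call sites that need a `sync.Locker` value, such as `sync.Cond.L` and missinggo's `Event.LockedChan`, use `cl.locker()`, which returns the new `clientLocker` adapter. In short (a schematic summary, not part of the diff):

    // before                        // after
    cl.mu.Lock()                     cl.lock()
    defer cl.mu.Unlock()             defer cl.unlock()
    cl.mu.RLock()                    cl.rLock()
    cl.event.L = &cl.mu              cl.event.L = cl.locker()
    t.closed.LockedChan(&cl.mu)      t.closed.LockedChan(cl.locker())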
client.go (127 changed lines)
@@ -42,7 +42,8 @@ type Client struct {
     // An aggregate of stats over all connections. First in struct to ensure
     // 64-bit alignment of fields. See #262.
     stats  ConnStats
-    mu     sync.RWMutex
+
+    _mu    sync.RWMutex
     event  sync.Cond
     closed missinggo.Event

@@ -71,8 +72,8 @@ type Client struct {
 type ipStr string

 func (cl *Client) BadPeerIPs() []string {
-    cl.mu.RLock()
-    defer cl.mu.RUnlock()
+    cl.rLock()
+    defer cl.rUnlock()
     return cl.badPeerIPsLocked()
 }

@@ -117,8 +118,8 @@ func writeDhtServerStatus(w io.Writer, s *dht.Server) {
 // Writes out a human readable status of the client, such as for writing to a
 // HTTP status page.
 func (cl *Client) WriteStatus(_w io.Writer) {
-    cl.mu.RLock()
-    defer cl.mu.RUnlock()
+    cl.rLock()
+    defer cl.rUnlock()
     w := bufio.NewWriter(_w)
     defer w.Flush()
     fmt.Fprintf(w, "Listen port: %d\n", cl.LocalPort())
@@ -193,7 +194,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
         cl.Close()
     }()
     cl.extensionBytes = defaultPeerExtensionBytes()
-    cl.event.L = &cl.mu
+    cl.event.L = cl.locker()
     storageImpl := cfg.DefaultStorage
     if storageImpl == nil {
         // We'd use mmap but HFS+ doesn't support sparse files.
@@ -291,8 +292,8 @@ func firstNonEmptyString(ss ...string) string {
 }

 func (cl *Client) Closed() <-chan struct{} {
-    cl.mu.Lock()
-    defer cl.mu.Unlock()
+    cl.lock()
+    defer cl.unlock()
     return cl.closed.C()
 }

@@ -313,8 +314,8 @@ func (cl *Client) closeSockets() {
 // Stops the client. All connections to peers are closed and all activity will
 // come to a halt.
 func (cl *Client) Close() {
-    cl.mu.Lock()
-    defer cl.mu.Unlock()
+    cl.lock()
+    defer cl.unlock()
     cl.closed.Set()
     cl.eachDhtServer(func(s *dht.Server) { s.Close() })
     cl.closeSockets()
@@ -375,13 +376,13 @@ func (cl *Client) acceptConnections(l net.Listener) {
     for {
         conn, err := l.Accept()
         conn = pproffd.WrapNetConn(conn)
-        cl.mu.RLock()
+        cl.rLock()
         closed := cl.closed.IsSet()
         reject := false
         if conn != nil {
             reject = cl.rejectAccepted(conn)
         }
-        cl.mu.RUnlock()
+        cl.rUnlock()
         if closed {
             if conn != nil {
                 conn.Close()
@@ -418,8 +419,8 @@ func (cl *Client) incomingConnection(nc net.Conn) {

 // Returns a handle to the given torrent, if it's present in the client.
 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
-    cl.mu.Lock()
-    defer cl.mu.Unlock()
+    cl.lock()
+    defer cl.unlock()
     t, ok = cl.torrents[ih]
     return
 }

@@ -513,8 +514,8 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) net.Conn {
         }()
     }
     func() {
-        cl.mu.Lock()
-        defer cl.mu.Unlock()
+        cl.lock()
+        defer cl.unlock()
         cl.eachListener(func(s socket) bool {
             if peerNetworkEnabled(s.Addr().Network(), cl.config) {
                 dial(s.dial)
@@ -594,8 +595,8 @@ func (cl *Client) establishOutgoingConnEx(t *Torrent, addr string, ctx context.C
 // for valid reasons.
 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
     ctx, cancel := context.WithTimeout(context.Background(), func() time.Duration {
-        cl.mu.RLock()
-        defer cl.mu.RUnlock()
+        cl.rLock()
+        defer cl.rUnlock()
         return t.dialTimeout()
     }())
     defer cancel()
@@ -628,8 +629,8 @@ func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection,
 // considered half-open.
 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
     c, err := cl.establishOutgoingConn(t, addr)
-    cl.mu.Lock()
-    defer cl.mu.Unlock()
+    cl.lock()
+    defer cl.unlock()
     // Don't release lock between here and addConnection, unless it's for
     // failure.
     cl.noLongerHalfOpen(t, addr)
@@ -688,8 +689,8 @@ func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err er

 // Calls f with any secret keys.
 func (cl *Client) forSkeys(f func([]byte) bool) {
-    cl.mu.Lock()
-    defer cl.mu.Unlock()
+    cl.lock()
+    defer cl.unlock()
     for ih := range cl.torrents {
         if !f(ih[:]) {
             break
@@ -721,9 +722,9 @@ func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
     if !ok {
         return
     }
-    cl.mu.Lock()
+    cl.lock()
     t = cl.torrents[ih]
-    cl.mu.Unlock()
+    cl.unlock()
     return
 }

@@ -755,21 +756,21 @@ func (cl *Client) runReceivedConn(c *connection) {
             "network", c.remoteAddr().Network(),
         ).Log(cl.logger)
         torrent.Add("error receiving handshake", 1)
-        cl.mu.Lock()
+        cl.lock()
         cl.onBadAccept(c.remoteAddr())
-        cl.mu.Unlock()
+        cl.unlock()
         return
     }
     if t == nil {
         torrent.Add("received handshake for unloaded torrent", 1)
-        cl.mu.Lock()
+        cl.lock()
         cl.onBadAccept(c.remoteAddr())
-        cl.mu.Unlock()
+        cl.unlock()
         return
     }
     torrent.Add("received handshake for loaded torrent", 1)
-    cl.mu.Lock()
-    defer cl.mu.Unlock()
+    cl.lock()
+    defer cl.unlock()
     cl.runHandshookConn(c, t)
 }

@@ -958,7 +959,7 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
         networkingEnabled: true,
         requestStrategy:   3,
         metadataChanged: sync.Cond{
-            L: &cl.mu,
+            L: cl.locker(),
         },
         duplicateRequestTimeout: 1 * time.Second,
     }
@@ -983,8 +984,8 @@ func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bo
 // If the torrent already exists then this Storage is ignored and the
 // existing torrent returned with `new` set to `false`
 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
-    cl.mu.Lock()
-    defer cl.mu.Unlock()
+    cl.lock()
+    defer cl.unlock()
     t, ok := cl.torrents[infoHash]
     if ok {
         return
@@ -1020,8 +1021,8 @@ func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err e
             return
         }
     }
-    cl.mu.Lock()
-    defer cl.mu.Unlock()
+    cl.lock()
+    defer cl.unlock()
     if spec.ChunkSize != 0 {
         t.setChunkSize(pp.Integer(spec.ChunkSize))
     }
@@ -1059,8 +1060,8 @@ func (cl *Client) allTorrentsCompleted() bool {
 // Returns true when all torrents are completely downloaded and false if the
 // client is stopped before that.
 func (cl *Client) WaitAll() bool {
-    cl.mu.Lock()
-    defer cl.mu.Unlock()
+    cl.lock()
+    defer cl.unlock()
     for !cl.allTorrentsCompleted() {
         if cl.closed.IsSet() {
             return false
@@ -1072,8 +1073,8 @@ func (cl *Client) WaitAll() bool {

 // Returns handles to all the torrents loaded in the Client.
 func (cl *Client) Torrents() []*Torrent {
-    cl.mu.Lock()
-    defer cl.mu.Unlock()
+    cl.lock()
+    defer cl.unlock()
     return cl.torrentsAsSlice()
 }

@@ -1149,7 +1150,7 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
         PeerMaxRequests: 250,
         writeBuffer:     new(bytes.Buffer),
     }
-    c.writerCond.L = &cl.mu
+    c.writerCond.L = cl.locker()
     c.setRW(connStatsReadWriter{nc, c})
     c.r = &rateLimitedReader{
         l: cl.config.DownloadRateLimiter,
@@ -1159,8 +1160,8 @@ func (cl *Client) newConnection(nc net.Conn, outgoing bool) (c *connection) {
 }

 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
-    cl.mu.Lock()
-    defer cl.mu.Unlock()
+    cl.lock()
+    defer cl.unlock()
     t := cl.torrent(ih)
     if t == nil {
         return
@@ -1224,8 +1225,8 @@ func (cl *Client) publicAddr(peer net.IP) ipPort {
 }

 func (cl *Client) ListenAddrs() (ret []net.Addr) {
-    cl.mu.Lock()
-    defer cl.mu.Unlock()
+    cl.lock()
+    defer cl.unlock()
     cl.eachListener(func(l socket) bool {
         ret = append(ret, l.Addr())
         return true
@@ -1255,12 +1256,12 @@ func (cl *Client) clearAcceptLimits() {
 func (cl *Client) acceptLimitClearer() {
     for {
         select {
-        case <-cl.closed.LockedChan(&cl.mu):
+        case <-cl.closed.LockedChan(cl.locker()):
             return
         case <-time.After(15 * time.Minute):
-            cl.mu.Lock()
+            cl.lock()
             cl.clearAcceptLimits()
-            cl.mu.Unlock()
+            cl.unlock()
         }
     }
 }
@@ -1271,3 +1272,35 @@ func (cl *Client) rateLimitAccept(ip net.IP) bool {
     }
     return cl.acceptLimiter[ipStr(maskIpForAcceptLimiting(ip).String())] > 0
 }
+
+func (cl *Client) rLock() {
+    cl._mu.RLock()
+}
+
+func (cl *Client) rUnlock() {
+    cl._mu.RUnlock()
+}
+
+func (cl *Client) lock() {
+    cl._mu.Lock()
+}
+
+func (cl *Client) unlock() {
+    cl._mu.Unlock()
+}
+
+func (cl *Client) locker() sync.Locker {
+    return clientLocker{cl}
+}
+
+type clientLocker struct {
+    *Client
+}
+
+func (cl clientLocker) Lock() {
+    cl.lock()
+}
+
+func (cl clientLocker) Unlock() {
+    cl.unlock()
+}
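The helpers added at the end of client.go are the heart of the commit. The locker() method exists because sync.Cond and missinggo's Event want a sync.Locker value, and with the mutex now private there is no field to take the address of. A minimal self-contained sketch of the same embedding trick (the type names here are illustrative; only the shape matches the diff):

    package main

    import (
        "fmt"
        "sync"
    )

    type client struct {
        _mu sync.RWMutex
    }

    func (c *client) lock()   { c._mu.Lock() }
    func (c *client) unlock() { c._mu.Unlock() }

    // clientLocker adapts the private mutex to sync.Locker by embedding
    // the client, so the mutex itself is never exposed outside the struct.
    type clientLocker struct{ *client }

    func (cl clientLocker) Lock()   { cl.lock() }
    func (cl clientLocker) Unlock() { cl.unlock() }

    func (c *client) locker() sync.Locker { return clientLocker{c} }

    func main() {
        c := new(client)
        // The same shape as cl.event.L = cl.locker() in NewClient above.
        cond := sync.NewCond(c.locker())
        cond.L.Lock()
        fmt.Println("holding the client lock through the adapter")
        cond.L.Unlock()
    }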
client_test.go

@@ -112,15 +112,15 @@ func TestTorrentInitialState(t *testing.T) {
         storage.NewFileWithCompletion(tempDir(), storage.NewMapPieceCompletion()),
     )
     tor.setChunkSize(2)
-    tor.cl.mu.Lock()
+    tor.cl.lock()
     err := tor.setInfoBytes(mi.InfoBytes)
-    tor.cl.mu.Unlock()
+    tor.cl.unlock()
     require.NoError(t, err)
     require.Len(t, tor.pieces, 3)
     tor.pendAllChunkSpecs(0)
-    tor.cl.mu.Lock()
+    tor.cl.lock()
     assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
-    tor.cl.mu.Unlock()
+    tor.cl.unlock()
     assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
 }

@@ -782,12 +782,12 @@ func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
     psc := leecherGreeting.SubscribePieceStateChanges()
     defer psc.Close()

-    leecherGreeting.cl.mu.Lock()
+    leecherGreeting.cl.lock()
     leecherGreeting.downloadPiecesLocked(0, leecherGreeting.numPieces())
     if ps.Cancel {
         leecherGreeting.cancelPiecesLocked(0, leecherGreeting.NumPieces())
     }
-    leecherGreeting.cl.mu.Unlock()
+    leecherGreeting.cl.unlock()
     done := make(chan struct{})
     defer close(done)
     go leecherGreeting.AddClientPeer(seeder)
@@ -909,9 +909,9 @@ func TestClientDynamicListenPortNoProtocols(t *testing.T) {

 func totalConns(tts []*Torrent) (ret int) {
     for _, tt := range tts {
-        tt.cl.mu.Lock()
+        tt.cl.lock()
         ret += len(tt.conns)
-        tt.cl.mu.Unlock()
+        tt.cl.unlock()
     }
     return
 }
connection.go

@@ -177,7 +177,7 @@ func (cn *connection) peerHasAllPieces() (all bool, known bool) {
 }

 func (cn *connection) mu() sync.Locker {
-    return &cn.t.cl.mu
+    return cn.t.cl.locker()
 }

 func (cn *connection) remoteAddr() net.Addr {
@@ -1084,8 +1084,8 @@ func (c *connection) mainReadLoop() (err error) {
     for {
         var msg pp.Message
         func() {
-            cl.mu.Unlock()
-            defer cl.mu.Lock()
+            cl.unlock()
+            defer cl.lock()
             err = decoder.Decode(&msg)
         }()
         if t.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
@@ -1324,8 +1324,8 @@ func (c *connection) receiveChunk(msg *pp.Message) error {
     }

     err := func() error {
-        cl.mu.Unlock()
-        defer cl.mu.Lock()
+        cl.unlock()
+        defer cl.lock()
         // Write the chunk out. Note that the upper bound on chunk writing
         // concurrency will be the number of connections. We write inline with
         // receiving the chunk (with this lock dance), because we want to
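Note the inverted pattern in mainReadLoop and receiveChunk: the caller already holds the client lock, so the closure releases it around blocking work (decoding from the network, writing a chunk to storage) and retakes it on the way out. A generic sketch of that shape, with the helper name invented for illustration:

    // withLockDropped releases an already-held lock around a blocking
    // call, mirroring the inline `cl.unlock(); defer cl.lock()` closures
    // in mainReadLoop and receiveChunk above.
    func withLockDropped(mu sync.Locker, blocking func()) {
        mu.Unlock()
        defer mu.Lock()
        blocking()
    }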
connection_test.go

@@ -114,7 +114,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
         Piece: make([]byte, defaultChunkSize),
     }
     go func() {
-        cl.mu.Lock()
+        cl.lock()
         err := cn.mainReadLoop()
         if err != nil {
             mrlErr <- err
@@ -127,12 +127,12 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
     defer w.Close()
     ts.writeSem.Lock()
     for range iter.N(b.N) {
-        cl.mu.Lock()
+        cl.lock()
         // The chunk must be written to storage everytime, to ensure the
         // writeSem is unlocked.
         t.pieces[0].dirtyChunks.Clear()
         cn.validReceiveChunks = map[request]struct{}{newRequestFromMessage(&msg): struct{}{}}
-        cl.mu.Unlock()
+        cl.unlock()
         n, err := w.Write(wb)
         require.NoError(b, err)
         require.EqualValues(b, len(wb), n)
file.go (14 changed lines)

@@ -59,8 +59,8 @@ type FilePieceState struct {

 // Returns the state of pieces in this file.
 func (f *File) State() (ret []FilePieceState) {
-    f.t.cl.mu.RLock()
-    defer f.t.cl.mu.RUnlock()
+    f.t.cl.rLock()
+    defer f.t.cl.rUnlock()
     pieceSize := int64(f.t.usualPieceSize())
     off := f.offset % pieceSize
     remaining := f.length
@@ -102,7 +102,7 @@ func (f *File) Cancel() {

 func (f *File) NewReader() Reader {
     tr := reader{
-        mu:        &f.t.cl.mu,
+        mu:        f.t.cl.locker(),
         t:         f.t,
         readahead: 5 * 1024 * 1024,
         offset:    f.Offset(),
@@ -114,8 +114,8 @@ func (f *File) NewReader() Reader {

 // Sets the minimum priority for pieces in the File.
 func (f *File) SetPriority(prio piecePriority) {
-    f.t.cl.mu.Lock()
-    defer f.t.cl.mu.Unlock()
+    f.t.cl.lock()
+    defer f.t.cl.unlock()
     if prio == f.prio {
         return
     }
@@ -125,8 +125,8 @@ func (f *File) SetPriority(prio piecePriority) {

 // Returns the priority per File.SetPriority.
 func (f *File) Priority() piecePriority {
-    f.t.cl.mu.Lock()
-    defer f.t.cl.mu.Unlock()
+    f.t.cl.lock()
+    defer f.t.cl.unlock()
     return f.prio
 }
piece.go (8 changed lines)

@@ -180,8 +180,8 @@ func (p *Piece) bytesLeft() (ret pp.Integer) {
 }

 func (p *Piece) VerifyData() {
-    p.t.cl.mu.Lock()
-    defer p.t.cl.mu.Unlock()
+    p.t.cl.lock()
+    defer p.t.cl.unlock()
     target := p.numVerifies + 1
     if p.hashing {
         target++
@@ -208,8 +208,8 @@ func (p *Piece) torrentEndOffset() int64 {
 }

 func (p *Piece) SetPriority(prio piecePriority) {
-    p.t.cl.mu.Lock()
-    defer p.t.cl.mu.Unlock()
+    p.t.cl.lock()
+    defer p.t.cl.unlock()
     p.priority = prio
     p.t.updatePiecePriority(p.index)
 }
portfwd.go (12 changed lines)

@@ -20,20 +20,20 @@ func addPortMapping(d upnp.Device, proto upnp.Protocol, internalPort int, debug
 }

 func (cl *Client) forwardPort() {
-    cl.mu.Lock()
-    defer cl.mu.Unlock()
+    cl.lock()
+    defer cl.unlock()
     if cl.config.NoDefaultPortForwarding {
         return
     }
-    cl.mu.Unlock()
+    cl.unlock()
     ds := upnp.Discover(0, 2*time.Second)
-    cl.mu.Lock()
+    cl.lock()
     flog.Default.Handle(flog.Fmsg("discovered %d upnp devices", len(ds)))
     port := cl.incomingPeerPort()
-    cl.mu.Unlock()
+    cl.unlock()
     for _, d := range ds {
         go addPortMapping(d, upnp.TCP, port, cl.config.Debug)
         go addPortMapping(d, upnp.UDP, port, cl.config.Debug)
     }
-    cl.mu.Lock()
+    cl.lock()
 }
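forwardPort is the one place where the renamed helpers must be balanced by hand: the deferred cl.unlock() at the top is only correct because the function ends with a bare cl.lock(), and the lock is explicitly dropped around the blocking upnp.Discover call. A condensed restatement of that shape (the function name here is illustrative, not from the repository):

    // Schematic of forwardPort's locking, not new behavior: the deferred
    // unlock pairs with the final lock(), and the lock is released around
    // blocking work so UPnP discovery doesn't stall the whole client.
    func (cl *Client) lockDanceShape() {
        cl.lock()
        defer cl.unlock() // pairs with the re-lock on the last line
        // ... fast checks under the lock ...
        cl.unlock()
        // ... blocking discovery with the lock released ...
        cl.lock()
        // ... read state under the lock ...
        cl.unlock()
        // ... start goroutines without the lock ...
        cl.lock() // restore the invariant for the deferred unlock
    }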
reader.go (20 changed lines)

@@ -69,8 +69,8 @@ func (r *reader) SetReadahead(readahead int64) {
     r.mu.Lock()
     r.readahead = readahead
     r.mu.Unlock()
-    r.t.cl.mu.Lock()
-    defer r.t.cl.mu.Unlock()
+    r.t.cl.lock()
+    defer r.t.cl.unlock()
     r.posChanged()
 }

@@ -146,10 +146,10 @@ func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
         defer cancel()
         go func() {
             <-ctx.Done()
-            r.t.cl.mu.Lock()
+            r.t.cl.lock()
             ctxErr = ctx.Err()
             r.t.tickleReaders()
-            r.t.cl.mu.Unlock()
+            r.t.cl.unlock()
         }()
     }
     // Hmmm, if a Read gets stuck, this means you can't change position for
@@ -183,8 +183,8 @@ func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
 // Wait until some data should be available to read. Tickles the client if it
 // isn't. Returns how much should be readable without blocking.
 func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
-    r.t.cl.mu.Lock()
-    defer r.t.cl.mu.Unlock()
+    r.t.cl.lock()
+    defer r.t.cl.unlock()
     for !r.readable(pos) && *ctxErr == nil {
         r.waitReadable(pos)
     }
@@ -222,19 +222,19 @@ func (r *reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err erro
             err = nil
             return
         }
-        r.t.cl.mu.Lock()
+        r.t.cl.lock()
         // TODO: Just reset pieces in the readahead window. This might help
         // prevent thrashing with small caches and file and piece priorities.
         log.Printf("error reading torrent %q piece %d offset %d, %d bytes: %s", r.t, pi, po, len(b1), err)
         r.t.updateAllPieceCompletions()
         r.t.updateAllPiecePriorities()
-        r.t.cl.mu.Unlock()
+        r.t.cl.unlock()
     }
 }

 func (r *reader) Close() error {
-    r.t.cl.mu.Lock()
-    defer r.t.cl.mu.Unlock()
+    r.t.cl.lock()
+    defer r.t.cl.unlock()
     r.t.deleteReader(r)
     return nil
 }
t.go (70 changed lines)

@@ -17,15 +17,15 @@ func (t *Torrent) InfoHash() metainfo.Hash {
 // Returns a channel that is closed when the info (.Info()) for the torrent
 // has become available.
 func (t *Torrent) GotInfo() <-chan struct{} {
-    t.cl.mu.Lock()
-    defer t.cl.mu.Unlock()
+    t.cl.lock()
+    defer t.cl.unlock()
     return t.gotMetainfo.C()
 }

 // Returns the metainfo info dictionary, or nil if it's not yet available.
 func (t *Torrent) Info() *metainfo.Info {
-    t.cl.mu.Lock()
-    defer t.cl.mu.Unlock()
+    t.cl.lock()
+    defer t.cl.unlock()
     return t.info
 }

@@ -33,7 +33,7 @@ func (t *Torrent) Info() *metainfo.Info {
 // the data requested is actually available.
 func (t *Torrent) NewReader() Reader {
     r := reader{
-        mu:        &t.cl.mu,
+        mu:        t.cl.locker(),
         t:         t,
         readahead: 5 * 1024 * 1024,
         length:    *t.length,
@@ -46,14 +46,14 @@ func (t *Torrent) NewReader() Reader {
 // same state. The sum of the state run lengths is the number of pieces
 // in the torrent.
 func (t *Torrent) PieceStateRuns() []PieceStateRun {
-    t.cl.mu.Lock()
-    defer t.cl.mu.Unlock()
+    t.cl.lock()
+    defer t.cl.unlock()
     return t.pieceStateRuns()
 }

 func (t *Torrent) PieceState(piece pieceIndex) PieceState {
-    t.cl.mu.Lock()
-    defer t.cl.mu.Unlock()
+    t.cl.lock()
+    defer t.cl.unlock()
     return t.pieceState(piece)
 }

@@ -65,8 +65,8 @@ func (t *Torrent) NumPieces() pieceIndex {

 // Get missing bytes count for specific piece.
 func (t *Torrent) PieceBytesMissing(piece int) int64 {
-    t.cl.mu.Lock()
-    defer t.cl.mu.Unlock()
+    t.cl.lock()
+    defer t.cl.unlock()

     return int64(t.pieces[piece].bytesLeft())
 }
@@ -75,9 +75,9 @@ func (t *Torrent) PieceBytesMissing(piece int) int64 {
 // this. No data corruption can, or should occur to either the torrent's data,
 // or connected peers.
 func (t *Torrent) Drop() {
-    t.cl.mu.Lock()
+    t.cl.lock()
     t.cl.dropTorrent(t.infoHash)
-    t.cl.mu.Unlock()
+    t.cl.unlock()
 }

 // Number of bytes of the entire torrent we have completed. This is the sum of
@@ -85,8 +85,8 @@ func (t *Torrent) Drop() {
 // for download rate, as it can go down when pieces are lost or fail checks.
 // Sample Torrent.Stats.DataBytesRead for actual file data download rate.
 func (t *Torrent) BytesCompleted() int64 {
-    t.cl.mu.RLock()
-    defer t.cl.mu.RUnlock()
+    t.cl.rLock()
+    defer t.cl.rUnlock()
     return t.bytesCompleted()
 }

@@ -99,24 +99,24 @@ func (t *Torrent) SubscribePieceStateChanges() *pubsub.Subscription {
 // Returns true if the torrent is currently being seeded. This occurs when the
 // client is willing to upload without wanting anything in return.
 func (t *Torrent) Seeding() bool {
-    t.cl.mu.Lock()
-    defer t.cl.mu.Unlock()
+    t.cl.lock()
+    defer t.cl.unlock()
     return t.seeding()
 }

 // Clobbers the torrent display name. The display name is used as the torrent
 // name if the metainfo is not available.
 func (t *Torrent) SetDisplayName(dn string) {
-    t.cl.mu.Lock()
-    defer t.cl.mu.Unlock()
+    t.cl.lock()
+    defer t.cl.unlock()
     t.setDisplayName(dn)
 }

 // The current working name for the torrent. Either the name in the info dict,
 // or a display name given such as by the dn value in a magnet link, or "".
 func (t *Torrent) Name() string {
-    t.cl.mu.Lock()
-    defer t.cl.mu.Unlock()
+    t.cl.lock()
+    defer t.cl.unlock()
     return t.name()
 }

@@ -129,14 +129,14 @@ func (t *Torrent) Length() int64 {
 // Returns a run-time generated metainfo for the torrent that includes the
 // info bytes and announce-list as currently known to the client.
 func (t *Torrent) Metainfo() metainfo.MetaInfo {
-    t.cl.mu.Lock()
-    defer t.cl.mu.Unlock()
+    t.cl.lock()
+    defer t.cl.unlock()
     return t.newMetaInfo()
 }

 func (t *Torrent) addReader(r *reader) {
-    t.cl.mu.Lock()
-    defer t.cl.mu.Unlock()
+    t.cl.lock()
+    defer t.cl.unlock()
     if t.readers == nil {
         t.readers = make(map[*reader]struct{})
     }
@@ -153,8 +153,8 @@ func (t *Torrent) deleteReader(r *reader) {
 // priority. Piece indexes are not the same as bytes. Requires that the info
 // has been obtained, see Torrent.Info and Torrent.GotInfo.
 func (t *Torrent) DownloadPieces(begin, end pieceIndex) {
-    t.cl.mu.Lock()
-    defer t.cl.mu.Unlock()
+    t.cl.lock()
+    defer t.cl.unlock()
     t.downloadPiecesLocked(begin, end)
 }

@@ -167,8 +167,8 @@ func (t *Torrent) downloadPiecesLocked(begin, end pieceIndex) {
 }

 func (t *Torrent) CancelPieces(begin, end pieceIndex) {
-    t.cl.mu.Lock()
-    defer t.cl.mu.Unlock()
+    t.cl.lock()
+    defer t.cl.unlock()
     t.cancelPiecesLocked(begin, end)
 }

@@ -208,8 +208,8 @@ func (t *Torrent) Files() []*File {

 func (t *Torrent) AddPeers(pp []Peer) {
     cl := t.cl
-    cl.mu.Lock()
-    defer cl.mu.Unlock()
+    cl.lock()
+    defer cl.unlock()
     t.addPeers(pp)
 }

@@ -228,13 +228,13 @@ func (t *Torrent) String() string {
 }

 func (t *Torrent) AddTrackers(announceList [][]string) {
-    t.cl.mu.Lock()
-    defer t.cl.mu.Unlock()
+    t.cl.lock()
+    defer t.cl.unlock()
     t.addTrackers(announceList)
 }

 func (t *Torrent) Piece(i pieceIndex) *Piece {
-    t.cl.mu.Lock()
-    defer t.cl.mu.Unlock()
+    t.cl.lock()
+    defer t.cl.unlock()
     return &t.pieces[i]
 }
torrent.go (46 changed lines)

@@ -155,7 +155,7 @@ func (t *Torrent) tickleReaders() {

 // Returns a channel that is closed when the Torrent is closed.
 func (t *Torrent) Closed() <-chan struct{} {
-    return t.closed.LockedChan(&t.cl.mu)
+    return t.closed.LockedChan(t.cl.locker())
 }

 // KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
@@ -640,8 +640,8 @@ func (t *Torrent) newMetaInfo() metainfo.MetaInfo {
 }

 func (t *Torrent) BytesMissing() int64 {
-    t.mu().RLock()
-    defer t.mu().RUnlock()
+    t.cl.rLock()
+    defer t.cl.rUnlock()
     return t.bytesMissingLocked()
 }

@@ -1210,8 +1210,8 @@ func (t *Torrent) bytesCompleted() int64 {
 }

 func (t *Torrent) SetInfoBytes(b []byte) (err error) {
-    t.cl.mu.Lock()
-    defer t.cl.mu.Unlock()
+    t.cl.lock()
+    defer t.cl.unlock()
     return t.setInfoBytes(b)
 }

@@ -1388,14 +1388,14 @@ func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
                 }).String()
                 allAddrs[key] = struct{}{}
             }
-            cl.mu.Lock()
+            cl.lock()
             t.addPeers(addPeers)
             numPeers := t.peers.Len()
-            cl.mu.Unlock()
+            cl.unlock()
             if numPeers >= cl.config.TorrentPeersHighWater {
                 return
             }
-        case <-t.closed.LockedChan(&cl.mu):
+        case <-t.closed.LockedChan(cl.locker()):
             return
         }
     }
@@ -1416,14 +1416,14 @@ func (t *Torrent) dhtAnnouncer(s *dht.Server) {
     cl := t.cl
     for {
         select {
-        case <-t.wantPeersEvent.LockedChan(&cl.mu):
-        case <-t.closed.LockedChan(&cl.mu):
+        case <-t.wantPeersEvent.LockedChan(cl.locker()):
+        case <-t.closed.LockedChan(cl.locker()):
             return
         }
         err := t.announceDHT(true, s)
         func() {
-            cl.mu.Lock()
-            defer cl.mu.Unlock()
+            cl.lock()
+            defer cl.unlock()
             if err == nil {
                 t.numDHTAnnounces++
             } else {
@@ -1431,7 +1431,7 @@ func (t *Torrent) dhtAnnouncer(s *dht.Server) {
             }
         }()
         select {
-        case <-t.closed.LockedChan(&cl.mu):
+        case <-t.closed.LockedChan(cl.locker()):
             return
         case <-time.After(5 * time.Minute):
         }
@@ -1445,8 +1445,8 @@ func (t *Torrent) addPeers(peers []Peer) {
 }

 func (t *Torrent) Stats() TorrentStats {
-    t.cl.mu.RLock()
-    defer t.cl.mu.RUnlock()
+    t.cl.rLock()
+    defer t.cl.rUnlock()
     return t.statsLocked()
 }

@@ -1561,8 +1561,8 @@ func (t *Torrent) wantConns() bool {
 }

 func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
-    t.cl.mu.Lock()
-    defer t.cl.mu.Unlock()
+    t.cl.lock()
+    defer t.cl.unlock()
     oldMax = t.maxEstablishedConns
     t.maxEstablishedConns = max
     wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
@@ -1573,10 +1573,6 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
     return oldMax
 }

-func (t *Torrent) mu() missinggo.RWLocker {
-    return &t.cl.mu
-}
-
 func (t *Torrent) pieceHashed(piece pieceIndex, correct bool) {
     log.Fmsg("hashed piece %d", piece).Add("piece", piece).Add("passed", correct).AddValue(debugLogValue).Log(t.logger)
     if t.closed.IsSet() {
@@ -1679,8 +1675,8 @@ func (t *Torrent) onIncompletePiece(piece pieceIndex) {

 func (t *Torrent) verifyPiece(piece pieceIndex) {
     cl := t.cl
-    cl.mu.Lock()
-    defer cl.mu.Unlock()
+    cl.lock()
+    defer cl.unlock()
     p := &t.pieces[piece]
     defer func() {
         p.numVerifies++
@@ -1700,10 +1696,10 @@ func (t *Torrent) verifyPiece(piece pieceIndex) {
     t.publishPieceChange(piece)
     t.updatePiecePriority(piece)
     t.storageLock.RLock()
-    cl.mu.Unlock()
+    cl.unlock()
     sum := t.hashPiece(piece)
     t.storageLock.RUnlock()
-    cl.mu.Lock()
+    cl.lock()
     p.hashing = false
     t.updatePiecePriority(piece)
     t.pieceHashed(piece, sum == p.hash)
torrent_test.go

@@ -154,13 +154,13 @@ func TestPieceHashFailed(t *testing.T) {
     tt := cl.newTorrent(mi.HashInfoBytes(), badStorage{})
     tt.setChunkSize(2)
     require.NoError(t, tt.setInfoBytes(mi.InfoBytes))
-    tt.cl.mu.Lock()
+    tt.cl.lock()
     tt.pieces[1].dirtyChunks.AddRange(0, 3)
     require.True(t, tt.pieceAllDirty(1))
     tt.pieceHashed(1, false)
     // Dirty chunks should be cleared so we can try again.
     require.False(t, tt.pieceAllDirty(1))
-    tt.cl.mu.Unlock()
+    tt.cl.unlock()
 }

 // Check the behaviour of Torrent.Metainfo when metadata is not completed.
@@ -194,8 +194,8 @@ func TestTorrentMetainfoIncompleteMetadata(t *testing.T) {
     assert.EqualValues(t, 0, tt.metadataSize())

     func() {
-        cl.mu.Lock()
-        defer cl.mu.Unlock()
+        cl.lock()
+        defer cl.unlock()
         go func() {
             _, err = nc.Write(pp.Message{
                 Type: pp.Extended,
tracker_scraper.go

@@ -107,9 +107,9 @@ func (me *trackerScraper) announce() (ret trackerAnnounceResult) {
         ret.Err = fmt.Errorf("error getting ip: %s", err)
         return
     }
-    me.t.cl.mu.Lock()
+    me.t.cl.lock()
     req := me.t.announceRequest()
-    me.t.cl.mu.Unlock()
+    me.t.cl.unlock()
     res, err := tracker.Announce{
         HttpClient: me.t.cl.config.TrackerHttpClient,
         UserAgent:  me.t.cl.config.HTTPUserAgent,
@@ -133,24 +133,24 @@ func (me *trackerScraper) announce() (ret trackerAnnounceResult) {
 func (me *trackerScraper) Run() {
     for {
         select {
-        case <-me.t.closed.LockedChan(&me.t.cl.mu):
+        case <-me.t.closed.LockedChan(me.t.cl.locker()):
             return
-        case <-me.stop.LockedChan(&me.t.cl.mu):
+        case <-me.stop.LockedChan(me.t.cl.locker()):
             return
-        case <-me.t.wantPeersEvent.LockedChan(&me.t.cl.mu):
+        case <-me.t.wantPeersEvent.LockedChan(me.t.cl.locker()):
         }

         ar := me.announce()
-        me.t.cl.mu.Lock()
+        me.t.cl.lock()
         me.lastAnnounce = ar
-        me.t.cl.mu.Unlock()
+        me.t.cl.unlock()

         intervalChan := time.After(time.Until(ar.Completed.Add(ar.Interval)))

         select {
-        case <-me.t.closed.LockedChan(&me.t.cl.mu):
+        case <-me.t.closed.LockedChan(me.t.cl.locker()):
             return
-        case <-me.stop.LockedChan(&me.t.cl.mu):
+        case <-me.stop.LockedChan(me.t.cl.locker()):
             return
         case <-intervalChan:
         }
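The recurring LockedChan rewrites work because missinggo's Event accepts any sync.Locker and acquires it while touching its channel, so cl.locker() drops in wherever &cl.mu was passed before. Roughly (a simplified model of the dependency, not missinggo's actual implementation):

    // event is a cut-down model of missinggo.Event: LockedChan locks
    // while reading the channel so callers can select on it safely.
    type event struct {
        ch chan struct{}
    }

    func (e *event) LockedChan(l sync.Locker) <-chan struct{} {
        l.Lock()
        defer l.Unlock()
        if e.ch == nil {
            e.ch = make(chan struct{})
        }
        return e.ch
    }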