Get rid of obsoleted Torrent.ceaseNetworking, and clean up Torrent.closing

This commit is contained in:
Matt Joiner 2016-05-11 21:44:55 +10:00
parent dce3a7f675
commit 528836ab4c
5 changed files with 75 additions and 130 deletions

View File

@ -921,7 +921,7 @@ func (cl *Client) runHandshookConn(c *connection, t *Torrent) (err error) {
if !cl.addConnection(t, c) {
return
}
defer cl.dropConnection(t, c)
defer t.dropConnection(c)
go c.writer(time.Minute)
cl.sendInitialMessages(c, t)
err = cl.connectionLoop(t, c)
@ -1080,7 +1080,7 @@ func (cl *Client) upload(t *Torrent, c *connection) {
if !c.PeerInterested {
return
}
seeding := cl.seeding(t)
seeding := t.seeding()
if !seeding && !t.connHasWantedPieces(c) {
return
}
@ -1336,40 +1336,11 @@ func (cl *Client) connectionLoop(t *Torrent, c *connection) error {
}
}
// Returns true if connection is removed from torrent.Conns.
func (cl *Client) deleteConnection(t *Torrent, c *connection) bool {
for i0, _c := range t.conns {
if _c != c {
continue
}
i1 := len(t.conns) - 1
if i0 != i1 {
t.conns[i0] = t.conns[i1]
}
t.conns = t.conns[:i1]
return true
}
return false
}
func (cl *Client) dropConnection(t *Torrent, c *connection) {
cl.event.Broadcast()
c.Close()
if cl.deleteConnection(t, c) {
cl.openNewConns(t)
}
}
// Returns true if the connection is added.
func (cl *Client) addConnection(t *Torrent, c *connection) bool {
if cl.closed.IsSet() {
return false
}
select {
case <-t.ceasingNetworking:
return false
default:
}
if !cl.wantConns(t) {
return false
}
@ -1389,7 +1360,7 @@ func (cl *Client) addConnection(t *Torrent, c *connection) bool {
log.Printf("%s: dropping connection to make room for new one:\n %s", t, c)
}
c.Close()
cl.deleteConnection(t, c)
t.deleteConnection(c)
}
if len(t.conns) >= socketsPerTorrent {
panic(len(t.conns))
@ -1406,14 +1377,14 @@ func (cl *Client) usefulConn(t *Torrent, c *connection) bool {
if !t.haveInfo() {
return c.supportsExtension("ut_metadata")
}
if cl.seeding(t) {
if t.seeding() {
return c.PeerInterested
}
return t.connHasWantedPieces(c)
}
func (cl *Client) wantConns(t *Torrent) bool {
if !cl.seeding(t) && !t.needData() {
if !t.seeding() && !t.needData() {
return false
}
if len(t.conns) < socketsPerTorrent {
@ -1423,11 +1394,6 @@ func (cl *Client) wantConns(t *Torrent) bool {
}
func (cl *Client) openNewConns(t *Torrent) {
select {
case <-t.ceasingNetworking:
return
default:
}
for len(t.peers) != 0 {
if !cl.wantConns(t) {
return
@ -1477,9 +1443,6 @@ func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
chunkSize: defaultChunkSize,
peers: make(map[peersKey]Peer),
closing: make(chan struct{}),
ceasingNetworking: make(chan struct{}),
halfOpen: make(map[string]struct{}),
pieceStateChanges: pubsub.NewPubSub(),
@ -1635,43 +1598,8 @@ func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
return
}
// Returns true when peers are required, or false if the torrent is closing.
func (cl *Client) waitWantPeers(t *Torrent) bool {
cl.mu.Lock()
defer cl.mu.Unlock()
for {
select {
case <-t.ceasingNetworking:
return false
default:
}
if len(t.peers) > torrentPeersLowWater {
goto wait
}
if t.needData() || cl.seeding(t) {
return true
}
wait:
t.wantPeers.Wait()
}
}
// Returns whether the client should make effort to seed the torrent.
func (cl *Client) seeding(t *Torrent) bool {
if cl.config.NoUpload {
return false
}
if !cl.config.Seed {
return false
}
if t.needData() {
return false
}
return true
}
func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
for cl.waitWantPeers(t) {
for t.waitWantPeers() {
// log.Printf("getting peers for %q from DHT", t)
ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
if err != nil {
@ -1711,7 +1639,7 @@ func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
if numPeers >= torrentPeersHighWater {
break getPeers
}
case <-t.ceasingNetworking:
case <-t.closed.LockedChan(&cl.mu):
ps.Close()
return
}
@ -1800,7 +1728,7 @@ func (cl *Client) announceTorrentTrackers(t *Torrent) {
PeerId: cl.peerID,
InfoHash: t.infoHash,
}
if !cl.waitWantPeers(t) {
if !t.waitWantPeers() {
return
}
cl.mu.RLock()
@ -1811,7 +1739,7 @@ func (cl *Client) announceTorrentTrackers(t *Torrent) {
req.Event = tracker.None
}
newAnnounce:
for cl.waitWantPeers(t) {
for t.waitWantPeers() {
cl.mu.RLock()
req.Left = t.bytesLeftAnnounce()
trackers = t.trackers
@ -1975,7 +1903,7 @@ func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) {
} else if len(touchers) != 0 {
log.Printf("dropping %d conns that touched piece", len(touchers))
for _, c := range touchers {
cl.dropConnection(t, c)
t.dropConnection(c)
}
}
cl.pieceChanged(t, piece)
@ -2034,7 +1962,7 @@ func (cl *Client) verifyPiece(t *Torrent, piece int) {
cl.event.Wait()
}
p.QueuedForHash = false
if t.isClosed() || t.pieceComplete(piece) {
if t.closed.IsSet() || t.pieceComplete(piece) {
t.updatePiecePriority(piece)
t.publishPieceChange(piece)
return
@ -2044,11 +1972,6 @@ func (cl *Client) verifyPiece(t *Torrent, piece int) {
cl.mu.Unlock()
sum := t.hashPiece(piece)
cl.mu.Lock()
select {
case <-t.closing:
return
default:
}
p.Hashing = false
cl.pieceHashed(t, piece, sum == p.Hash)
}

View File

@ -45,7 +45,7 @@ func (r *Reader) SetReadahead(readahead int64) {
}
func (r *Reader) readable(off int64) (ret bool) {
if r.torrentClosed() {
if r.t.closed.IsSet() {
return true
}
req, ok := r.t.offsetRequest(off)
@ -137,11 +137,6 @@ func (r *Reader) ReadContext(b []byte, ctx context.Context) (n int, err error) {
return
}
// Safe to call with or without client lock.
func (r *Reader) torrentClosed() bool {
return r.t.isClosed()
}
// Wait until some data should be available to read. Tickles the client if it
// isn't. Returns how much should be readable without blocking.
func (r *Reader) waitAvailable(pos, wanted int64, ctxErr *error) (avail int64) {
@ -162,7 +157,7 @@ func (r *Reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err erro
for {
avail := r.waitAvailable(pos, int64(len(b)), ctxErr)
if avail == 0 {
if r.torrentClosed() {
if r.t.closed.IsSet() {
err = errors.New("torrent closed")
return
}

2
t.go
View File

@ -87,7 +87,7 @@ func (t *Torrent) SubscribePieceStateChanges() *pubsub.Subscription {
func (t *Torrent) Seeding() bool {
t.cl.mu.Lock()
defer t.cl.mu.Unlock()
return t.cl.seeding(t)
return t.seeding()
}
// Clobbers the torrent display name. The display name is used as the torrent

View File

@ -38,12 +38,7 @@ type peersKey struct {
type Torrent struct {
cl *Client
closing chan struct{}
// Closed when no more network activity is desired. This includes
// announcing, and communicating with peers.
ceasingNetworking chan struct{}
closed missinggo.Event
infoHash metainfo.Hash
pieces []piece
// Values are the piece indices that changed.
@ -142,18 +137,6 @@ func (t *Torrent) worstConns(cl *Client) (wcs *worstConns) {
return
}
func (t *Torrent) ceaseNetworking() {
select {
case <-t.ceasingNetworking:
return
default:
}
close(t.ceasingNetworking)
for _, c := range t.conns {
c.Close()
}
}
func (t *Torrent) addPeer(p Peer, cl *Client) {
cl.openNewConns(t)
if len(t.peers) >= torrentPeersHighWater {
@ -527,22 +510,8 @@ func (t *Torrent) numPiecesCompleted() (num int) {
return t.completedPieces.Len()
}
// Safe to call with or without client lock.
func (t *Torrent) isClosed() bool {
select {
case <-t.closing:
return true
default:
return false
}
}
func (t *Torrent) close() (err error) {
if t.isClosed() {
return
}
t.ceaseNetworking()
close(t.closing)
t.closed.Set()
if c, ok := t.storage.(io.Closer); ok {
c.Close()
}
@ -1107,3 +1076,61 @@ func (t *Torrent) SetInfoBytes(b []byte) (err error) {
}
return t.setInfoBytes(b)
}
// Returns true if connection is removed from torrent.Conns.
func (t *Torrent) deleteConnection(c *connection) bool {
for i0, _c := range t.conns {
if _c != c {
continue
}
i1 := len(t.conns) - 1
if i0 != i1 {
t.conns[i0] = t.conns[i1]
}
t.conns = t.conns[:i1]
return true
}
return false
}
func (t *Torrent) dropConnection(c *connection) {
t.cl.event.Broadcast()
c.Close()
if t.deleteConnection(c) {
t.openNewConns()
}
}
// Returns true when peers are required, or false if the torrent is closing.
func (t *Torrent) waitWantPeers() bool {
t.cl.mu.Lock()
defer t.cl.mu.Unlock()
for {
if t.closed.IsSet() {
return false
}
if len(t.peers) > torrentPeersLowWater {
goto wait
}
if t.needData() || t.seeding() {
return true
}
wait:
t.wantPeers.Wait()
}
}
// Returns whether the client should make effort to seed the torrent.
func (t *Torrent) seeding() bool {
cl := t.cl
if cl.config.NoUpload {
return false
}
if !cl.config.Seed {
return false
}
if t.needData() {
return false
}
return true
}

View File

@ -45,7 +45,7 @@ func (wc worstConnsSortKey) Less(other worstConnsSortKey) bool {
func (wc *worstConns) key(i int) (key worstConnsSortKey) {
c := wc.c[i]
key.useful = wc.cl.usefulConn(wc.t, c)
if wc.cl.seeding(wc.t) {
if wc.t.seeding() {
key.lastHelpful = c.lastChunkSent
}
// Intentionally consider the last time a chunk was received when seeding,