Use missinggo.Event for connection closing event

This commit is contained in:
Matt Joiner 2016-02-01 21:08:52 +11:00
parent eb838a894c
commit 3ed628356b
3 changed files with 10 additions and 25 deletions

View File

@ -1391,10 +1391,8 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1) receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
me.mu.Lock() me.mu.Lock()
c.lastMessageReceived = time.Now() c.lastMessageReceived = time.Now()
select { if c.closed.IsSet() {
case <-c.closing:
return nil return nil
default:
} }
if err != nil { if err != nil {
if me.stopped() || err == io.EOF { if me.stopped() || err == io.EOF {
@ -1691,10 +1689,8 @@ func (t *torrent) needData() bool {
} }
func (cl *Client) usefulConn(t *torrent, c *connection) bool { func (cl *Client) usefulConn(t *torrent, c *connection) bool {
select { if c.closed.IsSet() {
case <-c.closing:
return false return false
default:
} }
if !t.haveInfo() { if !t.haveInfo() {
return c.supportsExtension("ut_metadata") return c.supportsExtension("ut_metadata")

View File

@ -10,9 +10,9 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"sync"
"time" "time"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/bencode"
pp "github.com/anacrolix/torrent/peer_protocol" pp "github.com/anacrolix/torrent/peer_protocol"
) )
@ -36,8 +36,7 @@ type connection struct {
encrypted bool encrypted bool
Discovery peerSource Discovery peerSource
uTP bool uTP bool
closing chan struct{} closed missinggo.Event
mu sync.Mutex // Only for closing.
post chan pp.Message post chan pp.Message
writeCh chan []byte writeCh chan []byte
@ -84,7 +83,6 @@ func newConnection() (c *connection) {
PeerChoked: true, PeerChoked: true,
PeerMaxRequests: 250, PeerMaxRequests: 250,
closing: make(chan struct{}),
writeCh: make(chan []byte), writeCh: make(chan []byte),
post: make(chan pp.Message), post: make(chan pp.Message),
} }
@ -235,14 +233,7 @@ func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
} }
func (c *connection) Close() { func (c *connection) Close() {
c.mu.Lock() c.closed.Set()
defer c.mu.Unlock()
select {
case <-c.closing:
return
default:
}
close(c.closing)
// TODO: This call blocks sometimes, why? // TODO: This call blocks sometimes, why?
go c.conn.Close() go c.conn.Close()
} }
@ -260,7 +251,7 @@ func (c *connection) PeerHasPiece(piece int) bool {
func (c *connection) Post(msg pp.Message) { func (c *connection) Post(msg pp.Message) {
select { select {
case c.post <- msg: case c.post <- msg:
case <-c.closing: case <-c.closed.C():
} }
} }
@ -422,7 +413,7 @@ func (conn *connection) writer() {
conn.Close() conn.Close()
return return
} }
case <-conn.closing: case <-conn.closed.C():
return return
} }
} else { } else {
@ -439,7 +430,7 @@ func (conn *connection) writer() {
conn.Close() conn.Close()
return return
} }
case <-conn.closing: case <-conn.closed.C():
return return
default: default:
connectionWriterFlush.Add(1) connectionWriterFlush.Add(1)
@ -505,7 +496,7 @@ func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
if pending.Len() == 0 { if pending.Len() == 0 {
timer.Reset(keepAliveDelay) timer.Reset(keepAliveDelay)
} }
case <-conn.closing: case <-conn.closed.C():
return return
} }
} }

View File

@ -146,9 +146,7 @@ func (t *torrent) worstConns(cl *Client) (wcs *worstConns) {
cl: cl, cl: cl,
} }
for _, c := range t.Conns { for _, c := range t.Conns {
select { if !c.closed.IsSet() {
case <-c.closing:
default:
wcs.c = append(wcs.c, c) wcs.c = append(wcs.c, c)
} }
} }