Break up the DHT announcer code into smaller functions, and add a 5 minute delay between announces

This commit is contained in:
Matt Joiner 2016-07-23 22:38:31 +10:00
parent 5cc735ef32
commit 980cd69ab2
2 changed files with 67 additions and 49 deletions

View File

@ -1473,7 +1473,7 @@ func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bo
new = true
t = cl.newTorrent(infoHash)
if cl.dHT != nil {
go t.announceDHT(true)
go t.dhtAnnouncer()
}
cl.torrents[infoHash] = t
t.updateWantPeersEvent()

View File

@ -23,6 +23,7 @@ import (
"github.com/bradfitz/iter"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/dht"
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
"github.com/anacrolix/torrent/storage"
@ -1208,7 +1209,60 @@ func (t *Torrent) announceRequest() tracker.AnnounceRequest {
}
}
func (t *Torrent) announceDHT(impliedPort bool) {
// Adds peers revealed in an announce until the announce ends, or we have
// enough peers.
func (t *Torrent) consumeDHTAnnounce(pvs <-chan dht.PeersValues) {
cl := t.cl
// Count all the unique addresses we got during this announce.
allAddrs := make(map[string]struct{})
for {
select {
case v, ok := <-pvs:
if !ok {
return
}
addPeers := make([]Peer, 0, len(v.Peers))
for _, cp := range v.Peers {
if cp.Port == 0 {
// Can't do anything with this.
continue
}
addPeers = append(addPeers, Peer{
IP: cp.IP[:],
Port: cp.Port,
Source: peerSourceDHT,
})
key := (&net.UDPAddr{
IP: cp.IP[:],
Port: cp.Port,
}).String()
allAddrs[key] = struct{}{}
}
cl.mu.Lock()
t.addPeers(addPeers)
numPeers := len(t.peers)
cl.mu.Unlock()
if numPeers >= torrentPeersHighWater {
return
}
case <-t.closed.LockedChan(&cl.mu):
return
}
}
}
func (t *Torrent) announceDHT(impliedPort bool) (err error) {
cl := t.cl
ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
if err != nil {
return
}
t.consumeDHTAnnounce(ps.Peers)
ps.Close()
return
}
func (t *Torrent) dhtAnnouncer() {
cl := t.cl
for {
select {
@ -1216,55 +1270,19 @@ func (t *Torrent) announceDHT(impliedPort bool) {
case <-t.closed.LockedChan(&cl.mu):
return
}
// log.Printf("getting peers for %q from DHT", t)
ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
if err != nil {
log.Printf("error getting peers from dht: %s", err)
err := t.announceDHT(true)
if err == nil {
cl.mu.Lock()
t.numDHTAnnounces++
cl.mu.Unlock()
} else {
log.Printf("error announcing %q to DHT: %s", t, err)
}
select {
case <-t.closed.LockedChan(&cl.mu):
return
case <-time.After(5 * time.Minute):
}
cl.mu.Lock()
t.numDHTAnnounces++
cl.mu.Unlock()
// Count all the unique addresses we got during this announce.
allAddrs := make(map[string]struct{})
getPeers:
for {
select {
case v, ok := <-ps.Peers:
if !ok {
break getPeers
}
addPeers := make([]Peer, 0, len(v.Peers))
for _, cp := range v.Peers {
if cp.Port == 0 {
// Can't do anything with this.
continue
}
addPeers = append(addPeers, Peer{
IP: cp.IP[:],
Port: cp.Port,
Source: peerSourceDHT,
})
key := (&net.UDPAddr{
IP: cp.IP[:],
Port: cp.Port,
}).String()
allAddrs[key] = struct{}{}
}
cl.mu.Lock()
t.addPeers(addPeers)
numPeers := len(t.peers)
cl.mu.Unlock()
if numPeers >= torrentPeersHighWater {
break getPeers
}
case <-t.closed.LockedChan(&cl.mu):
ps.Close()
return
}
}
ps.Close()
// log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
}
}