From 0eb418360bfee3b234c9fa38efdab8336773beb2 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 25 Feb 2015 14:48:39 +1100 Subject: [PATCH] Allow configuring Client torrent data opener, config dir, disabling metainfo cache, and prune with a timer instead of goroutine --- client.go | 144 ++++++++++++++++++++++++++++--------------- client_test.go | 2 +- config.go | 4 ++ data/file/file.go | 15 +++-- fs/torrentfs_test.go | 8 ++- torrent.go | 37 +++++------ torrent_test.go | 2 + 7 files changed, 131 insertions(+), 81 deletions(-) diff --git a/client.go b/client.go index 55bb9aca..28ba233e 100644 --- a/client.go +++ b/client.go @@ -36,6 +36,8 @@ import ( "syscall" "time" + filePkg "bitbucket.org/anacrolix/go.torrent/data/file" + "bitbucket.org/anacrolix/go.torrent/dht" "bitbucket.org/anacrolix/go.torrent/internal/pieceordering" "bitbucket.org/anacrolix/go.torrent/iplist" @@ -80,6 +82,8 @@ const ( // impact of a few bad apples. 4s loses 1% of successful handshakes that // are obtained with 60s timeout, and 5% of unsuccessful handshakes. handshakeTimeout = 4 * time.Second + + pruneInterval = 10 * time.Second ) // Currently doesn't really queue, but should in the future. @@ -116,6 +120,11 @@ type Client struct { disableTCP bool ipBlockList *iplist.IPList bannedTorrents map[InfoHash]struct{} + _configDir string + config Config + pruneTimer *time.Timer + + torrentDataOpener TorrentDataOpener mu sync.RWMutex event sync.Cond @@ -219,8 +228,7 @@ func (cl *Client) WriteStatus(_w io.Writer) { } } -// Read torrent data at the given offset. Returns ErrDataNotReady if the data -// isn't available. +// Read torrent data at the given offset. Will block until it is available. func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err error) { cl.mu.Lock() defer cl.mu.Unlock() @@ -248,10 +256,10 @@ func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err err if len(p) == 0 { panic(len(p)) } - for !piece.Complete() { + for !piece.Complete() && !t.isClosed() { piece.Event.Wait() } - return t.Data.ReadAt(p, off) + return t.data.ReadAt(p, off) } func (cl *Client) readRaisePiecePriorities(t *torrent, off, _len int64) { @@ -272,7 +280,10 @@ func (cl *Client) readRaisePiecePriorities(t *torrent, off, _len int64) { } func (cl *Client) configDir() string { - return filepath.Join(os.Getenv("HOME"), ".config/torrent") + if cl._configDir == "" { + return filepath.Join(os.Getenv("HOME"), ".config/torrent") + } + return cl._configDir } func (cl *Client) ConfigDir() string { @@ -393,6 +404,11 @@ func NewClient(cfg *Config) (cl *Client, err error) { dataDir: cfg.DataDir, disableUTP: cfg.DisableUTP, disableTCP: cfg.DisableTCP, + _configDir: cfg.ConfigDir, + config: *cfg, + torrentDataOpener: func(md *metainfo.Info) (TorrentData, error) { + return filePkg.TorrentData(md, cfg.DataDir), nil + }, quit: make(chan struct{}), torrents: make(map[InfoHash]*torrent), @@ -1163,7 +1179,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error { // routine. // c.PeerRequests[request] = struct{}{} p := make([]byte, msg.Length) - n, err := t.Data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin)) + n, err := t.data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin)) if err != nil { return fmt.Errorf("reading t data to serve request %q: %s", request, err) } @@ -1499,22 +1515,10 @@ func (cl *Client) saveTorrentFile(t *torrent) error { return nil } -func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) { - err = t.setMetadata(md, cl.dataDir, bytes, &cl.mu) - if err != nil { - return +func (cl *Client) startTorrent(t *torrent) { + if t.Info == nil || t.data == nil { + panic("nope") } - - if err := cl.saveTorrentFile(t); err != nil { - log.Printf("error saving torrent file for %s: %s", t, err) - } - - if strings.Contains(strings.ToLower(md.Name), "porn") { - cl.dropTorrent(t.InfoHash) - err = errors.New("no porn plx") - return - } - // If the client intends to upload, it needs to know what state pieces are // in. if !cl.noUpload { @@ -1529,9 +1533,43 @@ func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err e } }() } - cl.downloadStrategy.TorrentStarted(t) +} + +// Storage cannot be changed once it's set. +func (cl *Client) setStorage(t *torrent, td TorrentData) (err error) { + err = t.setStorage(td) + cl.event.Broadcast() + if err != nil { + return + } + cl.startTorrent(t) + return +} + +type TorrentDataOpener func(*metainfo.Info) (TorrentData, error) + +func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) { + err = t.setMetadata(md, bytes, &cl.mu) + if err != nil { + return + } + if !cl.config.DisableMetainfoCache { + if err := cl.saveTorrentFile(t); err != nil { + log.Printf("error saving torrent file for %s: %s", t, err) + } + } + if strings.Contains(strings.ToLower(md.Name), "porn") { + cl.dropTorrent(t.InfoHash) + err = errors.New("no porn plx") + return + } close(t.gotMetainfo) + td, err := cl.torrentDataOpener(&md) + if err != nil { + return + } + err = cl.setStorage(t, td) return } @@ -1722,6 +1760,9 @@ func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) { // Returns nil metainfo if it isn't in the cache. func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) { + if cl.config.DisableMetainfoCache { + return + } f, err := os.Open(cl.torrentFileCachePath(ih)) if err != nil { if os.IsNotExist(err) { @@ -1768,34 +1809,33 @@ func (cl *Client) AddMagnet(uri string) (T Torrent, err error) { return } -// Actively prunes unused connections. This is required to make space to dial -// for replacements. -func (cl *Client) connectionPruner(t *torrent) { - for { - select { - case <-t.ceasingNetworking: - return - case <-t.closing: - return - case <-time.After(15 * time.Second): - } - cl.mu.Lock() - license := len(t.Conns) - (socketsPerTorrent+1)/2 - for _, c := range t.Conns { - if license <= 0 { - break - } - if time.Now().Sub(c.lastUsefulChunkReceived) < time.Minute { - continue - } - if time.Now().Sub(c.completedHandshake) < time.Minute { - continue - } - c.Close() - license-- - } - cl.mu.Unlock() +// Prunes unused connections. This is required to make space to dial for +// replacements. +func (cl *Client) pruneConnectionsUnlocked(t *torrent) { + select { + case <-t.ceasingNetworking: + return + case <-t.closing: + return + default: } + cl.mu.Lock() + license := len(t.Conns) - (socketsPerTorrent+1)/2 + for _, c := range t.Conns { + if license <= 0 { + break + } + if time.Now().Sub(c.lastUsefulChunkReceived) < time.Minute { + continue + } + if time.Now().Sub(c.completedHandshake) < time.Minute { + continue + } + c.Close() + license-- + } + cl.mu.Unlock() + t.pruneTimer.Reset(pruneInterval) } func (me *Client) dropTorrent(infoHash InfoHash) (err error) { @@ -1835,7 +1875,9 @@ func (me *Client) addOrMergeTorrent(ih InfoHash, announceList [][]string) (T Tor if me.dHT != nil { go me.announceTorrentDHT(T.torrent, true) } - go me.connectionPruner(T.torrent) + T.torrent.pruneTimer = time.AfterFunc(0, func() { + me.pruneConnectionsUnlocked(T.torrent) + }) } return } @@ -2178,7 +2220,7 @@ func (cl *Client) verifyPiece(t *torrent, index pp.Integer) { cl.mu.Lock() defer cl.mu.Unlock() p := t.Pieces[index] - for p.Hashing { + for p.Hashing || t.data == nil { cl.event.Wait() } if t.isClosed() { diff --git a/client_test.go b/client_test.go index 71f419c7..c480fecd 100644 --- a/client_test.go +++ b/client_test.go @@ -70,7 +70,7 @@ func TestTorrentInitialState(t *testing.T) { if err != nil { t.Fatal(err) } - err = tor.setMetadata(mi.Info.Info, dir, mi.Info.Bytes, nil) + err = tor.setMetadata(mi.Info.Info, mi.Info.Bytes, nil) if err != nil { t.Fatal(err) } diff --git a/config.go b/config.go index c78db5be..29d35c6a 100644 --- a/config.go +++ b/config.go @@ -16,4 +16,8 @@ type Config struct { DisableUTP bool DisableTCP bool NoDefaultBlocklist bool + // Defaults to "$HOME/.config/torrent" + ConfigDir string + DisableMetainfoCache bool + TorrentDataOpener } diff --git a/data/file/file.go b/data/file/file.go index 7017e105..5e6608de 100644 --- a/data/file/file.go +++ b/data/file/file.go @@ -13,12 +13,11 @@ type data struct { loc string } -func TorrentData(md *metainfo.Info, location string) (ret *data, err error) { - ret = &data{md, location} - return +func TorrentData(md *metainfo.Info, location string) data { + return data{md, location} } -func (me *data) ReadAt(p []byte, off int64) (n int, err error) { +func (me data) ReadAt(p []byte, off int64) (n int, err error) { for _, fi := range me.info.UpvertedFiles() { if off >= fi.Length { off -= fi.Length @@ -48,9 +47,9 @@ func (me *data) ReadAt(p []byte, off int64) (n int, err error) { return } -func (me *data) Close() {} +func (me data) Close() {} -func (me *data) WriteAt(p []byte, off int64) (n int, err error) { +func (me data) WriteAt(p []byte, off int64) (n int, err error) { for _, fi := range me.info.UpvertedFiles() { if off >= fi.Length { off -= fi.Length @@ -82,7 +81,7 @@ func (me *data) WriteAt(p []byte, off int64) (n int, err error) { return } -func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) { +func (me data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err error) { for _, fi := range me.info.UpvertedFiles() { if off >= fi.Length { off -= fi.Length @@ -112,6 +111,6 @@ func (me *data) WriteSectionTo(w io.Writer, off, n int64) (written int64, err er return } -func (me *data) fileInfoName(fi metainfo.FileInfo) string { +func (me data) fileInfoName(fi metainfo.FileInfo) string { return filepath.Join(append([]string{me.loc, me.info.Name}, fi.Path...)...) } diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go index 049e5f8c..f6a3cef9 100644 --- a/fs/torrentfs_test.go +++ b/fs/torrentfs_test.go @@ -169,6 +169,9 @@ func TestDownloadOnDemand(t *testing.T) { ListenAddr: ":0", NoDefaultBlocklist: true, + // Ensure that the metainfo is obtained over the wire, since we added + // the torrent to the seeder by magnet. + DisableMetainfoCache: true, }) if err != nil { t.Fatalf("error creating seeder client: %s", err) @@ -183,7 +186,6 @@ func TestDownloadOnDemand(t *testing.T) { t.Fatal(err) } leecher, err := torrent.NewClient(&torrent.Config{ - DataDir: filepath.Join(layout.BaseDir, "download"), DisableTrackers: true, NoDHT: true, ListenAddr: ":0", @@ -191,6 +193,10 @@ func TestDownloadOnDemand(t *testing.T) { NoDefaultBlocklist: true, + TorrentDataOpener: func(info *metainfo.Info) (torrent.TorrentData, error) { + return mmap.TorrentData(info, filepath.Join(layout.BaseDir, "download")) + }, + // This can be used to check if clients can connect to other clients // with the same ID. diff --git a/torrent.go b/torrent.go index 450f15fe..e962fdca 100644 --- a/torrent.go +++ b/torrent.go @@ -10,8 +10,6 @@ import ( "sync" "time" - "bitbucket.org/anacrolix/go.torrent/data/file" - pp "bitbucket.org/anacrolix/go.torrent/peer_protocol" "bitbucket.org/anacrolix/go.torrent/tracker" "bitbucket.org/anacrolix/go.torrent/util" @@ -40,7 +38,7 @@ type peersKey struct { Port int } -type torrentData interface { +type TorrentData interface { ReadAt(p []byte, off int64) (n int, err error) Close() WriteAt(p []byte, off int64) (n int, err error) @@ -60,9 +58,7 @@ type torrent struct { Pieces []*piece length int64 - // Prevent mutations to Data memory maps while in use as they're not safe. - dataLock sync.RWMutex - Data torrentData + data TorrentData Info *MetaInfo // Active peer connections. @@ -85,6 +81,8 @@ type torrent struct { gotMetainfo chan struct{} GotMetainfo <-chan struct{} + + pruneTimer *time.Timer } func (t *torrent) numConnsUnchoked() (num int) { @@ -129,6 +127,7 @@ func (t *torrent) ceaseNetworking() { for _, c := range t.Conns { c.Close() } + t.pruneTimer.Stop() } func (t *torrent) AddPeers(pp []Peer) { @@ -183,7 +182,7 @@ func infoPieceHashes(info *metainfo.Info) (ret []string) { } // Called when metadata for a torrent becomes available. -func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte, eventLocker sync.Locker) (err error) { +func (t *torrent) setMetadata(md metainfo.Info, infoBytes []byte, eventLocker sync.Locker) (err error) { t.Info = newMetaInfo(&md) t.length = 0 for _, f := range t.Info.UpvertedFiles() { @@ -204,11 +203,14 @@ func (t *torrent) setMetadata(md metainfo.Info, dataDir string, infoBytes []byte conn.Close() } } - t.Data, err = file.TorrentData(&md, dataDir) - if err != nil { - err = fmt.Errorf("error mmap'ing torrent data: %s", err) - return + return +} + +func (t *torrent) setStorage(td TorrentData) (err error) { + if t.data != nil { + t.data.Close() } + t.data = td return } @@ -477,12 +479,9 @@ func (t *torrent) close() (err error) { } t.ceaseNetworking() close(t.closing) - t.dataLock.Lock() - if t.Data != nil { - t.Data.Close() - t.Data = nil + if t.data != nil { + t.data.Close() } - t.dataLock.Unlock() for _, conn := range t.Conns { conn.Close() } @@ -525,7 +524,7 @@ func (t *torrent) offsetRequest(off int64) (req request, ok bool) { } func (t *torrent) WriteChunk(piece int, begin int64, data []byte) (err error) { - _, err = t.Data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin) + _, err = t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin) return } @@ -583,9 +582,7 @@ func (t *torrent) PieceLength(piece pp.Integer) (len_ pp.Integer) { func (t *torrent) HashPiece(piece pp.Integer) (ps pieceSum) { hash := pieceHash.New() - t.dataLock.RLock() - t.Data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength) - t.dataLock.RUnlock() + t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, t.Info.PieceLength) util.CopyExact(ps[:], hash.Sum(nil)) return } diff --git a/torrent_test.go b/torrent_test.go index ab92429b..cd73b62a 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -3,6 +3,7 @@ package torrent import ( "sync" "testing" + "time" "bitbucket.org/anacrolix/go.torrent/peer_protocol" ) @@ -45,6 +46,7 @@ func TestTorrentRequest(t *testing.T) { func TestTorrentDoubleClose(t *testing.T) { tt, err := newTorrent(InfoHash{}, nil, 0) + tt.pruneTimer = time.NewTimer(0) if err != nil { t.Fatal(err) }