From 5c5a26afedee70dc16adbd3e00bcec239f289a73 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Tue, 25 Oct 2016 19:07:26 +1100 Subject: [PATCH] Add bolt piece completion DB This means it can be persistent without needing cgo. Fixes issues #115 and #124. --- storage/boltPieceCompletion.go | 74 +++++++++++++++++++++ storage/boltdb.go | 5 +- storage/boltpc_test.go | 40 +++++++++++ storage/completion.go | 5 +- storage/completion_piece_map.go | 2 +- storage/file.go | 9 ++- storage/mmap.go | 8 ++- storage/piece_file.go | 2 + storage/{db.go => sqlitePieceCompletion.go} | 12 ++-- storage/storage_test.go | 5 ++ 10 files changed, 149 insertions(+), 13 deletions(-) create mode 100644 storage/boltPieceCompletion.go create mode 100644 storage/boltpc_test.go rename storage/{db.go => sqlitePieceCompletion.go} (70%) create mode 100644 storage/storage_test.go diff --git a/storage/boltPieceCompletion.go b/storage/boltPieceCompletion.go new file mode 100644 index 00000000..795b0888 --- /dev/null +++ b/storage/boltPieceCompletion.go @@ -0,0 +1,74 @@ +package storage + +import ( + "encoding/binary" + "path/filepath" + "time" + + "github.com/boltdb/bolt" + _ "github.com/mattn/go-sqlite3" + + "github.com/anacrolix/torrent/metainfo" +) + +var ( + value = []byte{} +) + +type boltPieceCompletion struct { + db *bolt.DB +} + +func newBoltPieceCompletion(dir string) (ret *boltPieceCompletion, err error) { + p := filepath.Join(dir, ".torrent.bolt.db") + db, err := bolt.Open(p, 0660, &bolt.Options{ + Timeout: time.Second, + }) + if err != nil { + return + } + ret = &boltPieceCompletion{db} + return +} + +func (me *boltPieceCompletion) Get(pk metainfo.PieceKey) (ret bool, err error) { + err = me.db.View(func(tx *bolt.Tx) error { + c := tx.Bucket(completed) + if c == nil { + return nil + } + ih := c.Bucket(pk.InfoHash[:]) + if ih == nil { + return nil + } + var key [4]byte + binary.BigEndian.PutUint32(key[:], uint32(pk.Index)) + ret = ih.Get(key[:]) != nil + return nil + }) + return +} + +func (me *boltPieceCompletion) Set(pk metainfo.PieceKey, b bool) error { + return me.db.Update(func(tx *bolt.Tx) error { + c, err := tx.CreateBucketIfNotExists(completed) + if err != nil { + return err + } + ih, err := c.CreateBucketIfNotExists(pk.InfoHash[:]) + if err != nil { + return err + } + var key [4]byte + binary.BigEndian.PutUint32(key[:], uint32(pk.Index)) + if b { + return ih.Put(key[:], value) + } else { + return ih.Delete(key[:]) + } + }) +} + +func (me *boltPieceCompletion) Close() error { + return me.db.Close() +} diff --git a/storage/boltdb.go b/storage/boltdb.go index 140dc234..09562ce8 100644 --- a/storage/boltdb.go +++ b/storage/boltdb.go @@ -26,7 +26,6 @@ var ( ) type boltDBClient struct { - // TODO: This is never closed. db *bolt.DB } @@ -51,6 +50,10 @@ func NewBoltDB(filePath string) ClientImpl { return ret } +func (me *boltDBClient) Close() error { + return me.db.Close() +} + func (me *boltDBClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) { return &boltDBTorrent{me, infoHash}, nil } diff --git a/storage/boltpc_test.go b/storage/boltpc_test.go new file mode 100644 index 00000000..e1be2579 --- /dev/null +++ b/storage/boltpc_test.go @@ -0,0 +1,40 @@ +package storage + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/anacrolix/torrent/metainfo" +) + +func TestBoltPieceCompletion(t *testing.T) { + td, err := ioutil.TempDir("", "") + require.NoError(t, err) + defer os.RemoveAll(td) + + pc, err := newBoltPieceCompletion(td) + require.NoError(t, err) + defer pc.Close() + + pk := metainfo.PieceKey{} + + b, err := pc.Get(pk) + require.NoError(t, err) + assert.False(t, b) + + require.NoError(t, pc.Set(pk, false)) + + b, err = pc.Get(pk) + require.NoError(t, err) + assert.False(t, b) + + require.NoError(t, pc.Set(pk, true)) + + b, err = pc.Get(pk) + require.NoError(t, err) + assert.True(t, b) +} diff --git a/storage/completion.go b/storage/completion.go index c3047be2..15fcc249 100644 --- a/storage/completion.go +++ b/storage/completion.go @@ -6,14 +6,15 @@ import ( "github.com/anacrolix/torrent/metainfo" ) +// Implementations track the completion of pieces. type pieceCompletion interface { Get(metainfo.PieceKey) (bool, error) Set(metainfo.PieceKey, bool) error - Close() + Close() error } func pieceCompletionForDir(dir string) (ret pieceCompletion) { - ret, err := newDBPieceCompletion(dir) + ret, err := newBoltPieceCompletion(dir) if err != nil { log.Printf("couldn't open piece completion db in %q: %s", dir, err) ret = new(mapPieceCompletion) diff --git a/storage/completion_piece_map.go b/storage/completion_piece_map.go index 26d893ce..cca13245 100644 --- a/storage/completion_piece_map.go +++ b/storage/completion_piece_map.go @@ -8,7 +8,7 @@ type mapPieceCompletion struct { m map[metainfo.PieceKey]struct{} } -func (mapPieceCompletion) Close() {} +func (mapPieceCompletion) Close() error { return nil } func (me *mapPieceCompletion) Get(pk metainfo.PieceKey) (bool, error) { _, ok := me.m[pk] diff --git a/storage/file.go b/storage/file.go index 55603bdd..19e54756 100644 --- a/storage/file.go +++ b/storage/file.go @@ -14,14 +14,20 @@ import ( // torrent. type fileClientImpl struct { baseDir string + pc pieceCompletion } func NewFile(baseDir string) ClientImpl { return &fileClientImpl{ baseDir: baseDir, + pc: pieceCompletionForDir(baseDir), } } +func (me *fileClientImpl) Close() error { + return me.pc.Close() +} + func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) { err := CreateNativeZeroLengthFiles(info, fs.baseDir) if err != nil { @@ -31,7 +37,7 @@ func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Has fs, info, infoHash, - pieceCompletionForDir(fs.baseDir), + fs.pc, }, nil } @@ -56,7 +62,6 @@ func (fts *fileTorrentImpl) Piece(p metainfo.Piece) PieceImpl { } func (fs *fileTorrentImpl) Close() error { - fs.completion.Close() return nil } diff --git a/storage/mmap.go b/storage/mmap.go index c8b4c2ea..8cc5a7d5 100644 --- a/storage/mmap.go +++ b/storage/mmap.go @@ -15,11 +15,13 @@ import ( type mmapStorage struct { baseDir string + pc pieceCompletion } func NewMMap(baseDir string) ClientImpl { return &mmapStorage{ baseDir: baseDir, + pc: pieceCompletionForDir(baseDir), } } @@ -27,11 +29,15 @@ func (s *mmapStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) ( span, err := mMapTorrent(info, s.baseDir) t = &mmapTorrentStorage{ span: span, - pc: pieceCompletionForDir(s.baseDir), + pc: s.pc, } return } +func (s *mmapStorage) Close() error { + return s.pc.Close() +} + type mmapTorrentStorage struct { span mmap_span.MMapSpan pc pieceCompletion diff --git a/storage/piece_file.go b/storage/piece_file.go index 77db0496..2760751c 100644 --- a/storage/piece_file.go +++ b/storage/piece_file.go @@ -20,6 +20,8 @@ func NewFileStorePieces(fs missinggo.FileStore) ClientImpl { } } +func (pieceFileStorage) Close() error { return nil } + type pieceFileTorrentStorage struct { s *pieceFileStorage } diff --git a/storage/db.go b/storage/sqlitePieceCompletion.go similarity index 70% rename from storage/db.go rename to storage/sqlitePieceCompletion.go index cc7f8d09..3f6ef67b 100644 --- a/storage/db.go +++ b/storage/sqlitePieceCompletion.go @@ -9,11 +9,11 @@ import ( "github.com/anacrolix/torrent/metainfo" ) -type dbPieceCompletion struct { +type sqlitePieceCompletion struct { db *sql.DB } -func newDBPieceCompletion(dir string) (ret *dbPieceCompletion, err error) { +func newSqlitePieceCompletion(dir string) (ret *sqlitePieceCompletion, err error) { p := filepath.Join(dir, ".torrent.db") db, err := sql.Open("sqlite3", p) if err != nil { @@ -24,17 +24,17 @@ func newDBPieceCompletion(dir string) (ret *dbPieceCompletion, err error) { db.Close() return } - ret = &dbPieceCompletion{db} + ret = &sqlitePieceCompletion{db} return } -func (me *dbPieceCompletion) Get(pk metainfo.PieceKey) (ret bool, err error) { +func (me *sqlitePieceCompletion) Get(pk metainfo.PieceKey) (ret bool, err error) { row := me.db.QueryRow(`select exists(select * from completed where infohash=? and "index"=?)`, pk.InfoHash.HexString(), pk.Index) err = row.Scan(&ret) return } -func (me *dbPieceCompletion) Set(pk metainfo.PieceKey, b bool) (err error) { +func (me *sqlitePieceCompletion) Set(pk metainfo.PieceKey, b bool) (err error) { if b { _, err = me.db.Exec(`insert into completed (infohash, "index") values (?, ?)`, pk.InfoHash.HexString(), pk.Index) } else { @@ -43,6 +43,6 @@ func (me *dbPieceCompletion) Set(pk metainfo.PieceKey, b bool) (err error) { return } -func (me *dbPieceCompletion) Close() { +func (me *sqlitePieceCompletion) Close() { me.db.Close() } diff --git a/storage/storage_test.go b/storage/storage_test.go new file mode 100644 index 00000000..8eee1608 --- /dev/null +++ b/storage/storage_test.go @@ -0,0 +1,5 @@ +package storage + +import ( + _ "github.com/anacrolix/envpprof" +)