Add bolt piece completion DB
This means it can be persistent without needing cgo. Fixes issues #115 and #124.
This commit is contained in:
parent
1725133111
commit
5c5a26afed
|
@ -0,0 +1,74 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
)
|
||||
|
||||
var (
|
||||
value = []byte{}
|
||||
)
|
||||
|
||||
type boltPieceCompletion struct {
|
||||
db *bolt.DB
|
||||
}
|
||||
|
||||
func newBoltPieceCompletion(dir string) (ret *boltPieceCompletion, err error) {
|
||||
p := filepath.Join(dir, ".torrent.bolt.db")
|
||||
db, err := bolt.Open(p, 0660, &bolt.Options{
|
||||
Timeout: time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ret = &boltPieceCompletion{db}
|
||||
return
|
||||
}
|
||||
|
||||
func (me *boltPieceCompletion) Get(pk metainfo.PieceKey) (ret bool, err error) {
|
||||
err = me.db.View(func(tx *bolt.Tx) error {
|
||||
c := tx.Bucket(completed)
|
||||
if c == nil {
|
||||
return nil
|
||||
}
|
||||
ih := c.Bucket(pk.InfoHash[:])
|
||||
if ih == nil {
|
||||
return nil
|
||||
}
|
||||
var key [4]byte
|
||||
binary.BigEndian.PutUint32(key[:], uint32(pk.Index))
|
||||
ret = ih.Get(key[:]) != nil
|
||||
return nil
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
func (me *boltPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
|
||||
return me.db.Update(func(tx *bolt.Tx) error {
|
||||
c, err := tx.CreateBucketIfNotExists(completed)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ih, err := c.CreateBucketIfNotExists(pk.InfoHash[:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var key [4]byte
|
||||
binary.BigEndian.PutUint32(key[:], uint32(pk.Index))
|
||||
if b {
|
||||
return ih.Put(key[:], value)
|
||||
} else {
|
||||
return ih.Delete(key[:])
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (me *boltPieceCompletion) Close() error {
|
||||
return me.db.Close()
|
||||
}
|
|
@ -26,7 +26,6 @@ var (
|
|||
)
|
||||
|
||||
type boltDBClient struct {
|
||||
// TODO: This is never closed.
|
||||
db *bolt.DB
|
||||
}
|
||||
|
||||
|
@ -51,6 +50,10 @@ func NewBoltDB(filePath string) ClientImpl {
|
|||
return ret
|
||||
}
|
||||
|
||||
func (me *boltDBClient) Close() error {
|
||||
return me.db.Close()
|
||||
}
|
||||
|
||||
func (me *boltDBClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
|
||||
return &boltDBTorrent{me, infoHash}, nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
)
|
||||
|
||||
func TestBoltPieceCompletion(t *testing.T) {
|
||||
td, err := ioutil.TempDir("", "")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(td)
|
||||
|
||||
pc, err := newBoltPieceCompletion(td)
|
||||
require.NoError(t, err)
|
||||
defer pc.Close()
|
||||
|
||||
pk := metainfo.PieceKey{}
|
||||
|
||||
b, err := pc.Get(pk)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, b)
|
||||
|
||||
require.NoError(t, pc.Set(pk, false))
|
||||
|
||||
b, err = pc.Get(pk)
|
||||
require.NoError(t, err)
|
||||
assert.False(t, b)
|
||||
|
||||
require.NoError(t, pc.Set(pk, true))
|
||||
|
||||
b, err = pc.Get(pk)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, b)
|
||||
}
|
|
@ -6,14 +6,15 @@ import (
|
|||
"github.com/anacrolix/torrent/metainfo"
|
||||
)
|
||||
|
||||
// Implementations track the completion of pieces.
|
||||
type pieceCompletion interface {
|
||||
Get(metainfo.PieceKey) (bool, error)
|
||||
Set(metainfo.PieceKey, bool) error
|
||||
Close()
|
||||
Close() error
|
||||
}
|
||||
|
||||
func pieceCompletionForDir(dir string) (ret pieceCompletion) {
|
||||
ret, err := newDBPieceCompletion(dir)
|
||||
ret, err := newBoltPieceCompletion(dir)
|
||||
if err != nil {
|
||||
log.Printf("couldn't open piece completion db in %q: %s", dir, err)
|
||||
ret = new(mapPieceCompletion)
|
||||
|
|
|
@ -8,7 +8,7 @@ type mapPieceCompletion struct {
|
|||
m map[metainfo.PieceKey]struct{}
|
||||
}
|
||||
|
||||
func (mapPieceCompletion) Close() {}
|
||||
func (mapPieceCompletion) Close() error { return nil }
|
||||
|
||||
func (me *mapPieceCompletion) Get(pk metainfo.PieceKey) (bool, error) {
|
||||
_, ok := me.m[pk]
|
||||
|
|
|
@ -14,14 +14,20 @@ import (
|
|||
// torrent.
|
||||
type fileClientImpl struct {
|
||||
baseDir string
|
||||
pc pieceCompletion
|
||||
}
|
||||
|
||||
func NewFile(baseDir string) ClientImpl {
|
||||
return &fileClientImpl{
|
||||
baseDir: baseDir,
|
||||
pc: pieceCompletionForDir(baseDir),
|
||||
}
|
||||
}
|
||||
|
||||
func (me *fileClientImpl) Close() error {
|
||||
return me.pc.Close()
|
||||
}
|
||||
|
||||
func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
|
||||
err := CreateNativeZeroLengthFiles(info, fs.baseDir)
|
||||
if err != nil {
|
||||
|
@ -31,7 +37,7 @@ func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Has
|
|||
fs,
|
||||
info,
|
||||
infoHash,
|
||||
pieceCompletionForDir(fs.baseDir),
|
||||
fs.pc,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -56,7 +62,6 @@ func (fts *fileTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
|
|||
}
|
||||
|
||||
func (fs *fileTorrentImpl) Close() error {
|
||||
fs.completion.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -15,11 +15,13 @@ import (
|
|||
|
||||
type mmapStorage struct {
|
||||
baseDir string
|
||||
pc pieceCompletion
|
||||
}
|
||||
|
||||
func NewMMap(baseDir string) ClientImpl {
|
||||
return &mmapStorage{
|
||||
baseDir: baseDir,
|
||||
pc: pieceCompletionForDir(baseDir),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,11 +29,15 @@ func (s *mmapStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (
|
|||
span, err := mMapTorrent(info, s.baseDir)
|
||||
t = &mmapTorrentStorage{
|
||||
span: span,
|
||||
pc: pieceCompletionForDir(s.baseDir),
|
||||
pc: s.pc,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *mmapStorage) Close() error {
|
||||
return s.pc.Close()
|
||||
}
|
||||
|
||||
type mmapTorrentStorage struct {
|
||||
span mmap_span.MMapSpan
|
||||
pc pieceCompletion
|
||||
|
|
|
@ -20,6 +20,8 @@ func NewFileStorePieces(fs missinggo.FileStore) ClientImpl {
|
|||
}
|
||||
}
|
||||
|
||||
func (pieceFileStorage) Close() error { return nil }
|
||||
|
||||
type pieceFileTorrentStorage struct {
|
||||
s *pieceFileStorage
|
||||
}
|
||||
|
|
|
@ -9,11 +9,11 @@ import (
|
|||
"github.com/anacrolix/torrent/metainfo"
|
||||
)
|
||||
|
||||
type dbPieceCompletion struct {
|
||||
type sqlitePieceCompletion struct {
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
func newDBPieceCompletion(dir string) (ret *dbPieceCompletion, err error) {
|
||||
func newSqlitePieceCompletion(dir string) (ret *sqlitePieceCompletion, err error) {
|
||||
p := filepath.Join(dir, ".torrent.db")
|
||||
db, err := sql.Open("sqlite3", p)
|
||||
if err != nil {
|
||||
|
@ -24,17 +24,17 @@ func newDBPieceCompletion(dir string) (ret *dbPieceCompletion, err error) {
|
|||
db.Close()
|
||||
return
|
||||
}
|
||||
ret = &dbPieceCompletion{db}
|
||||
ret = &sqlitePieceCompletion{db}
|
||||
return
|
||||
}
|
||||
|
||||
func (me *dbPieceCompletion) Get(pk metainfo.PieceKey) (ret bool, err error) {
|
||||
func (me *sqlitePieceCompletion) Get(pk metainfo.PieceKey) (ret bool, err error) {
|
||||
row := me.db.QueryRow(`select exists(select * from completed where infohash=? and "index"=?)`, pk.InfoHash.HexString(), pk.Index)
|
||||
err = row.Scan(&ret)
|
||||
return
|
||||
}
|
||||
|
||||
func (me *dbPieceCompletion) Set(pk metainfo.PieceKey, b bool) (err error) {
|
||||
func (me *sqlitePieceCompletion) Set(pk metainfo.PieceKey, b bool) (err error) {
|
||||
if b {
|
||||
_, err = me.db.Exec(`insert into completed (infohash, "index") values (?, ?)`, pk.InfoHash.HexString(), pk.Index)
|
||||
} else {
|
||||
|
@ -43,6 +43,6 @@ func (me *dbPieceCompletion) Set(pk metainfo.PieceKey, b bool) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (me *dbPieceCompletion) Close() {
|
||||
func (me *sqlitePieceCompletion) Close() {
|
||||
me.db.Close()
|
||||
}
|
|
@ -0,0 +1,5 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
_ "github.com/anacrolix/envpprof"
|
||||
)
|
Loading…
Reference in New Issue