Merge branch 'sqlite-direct'

Matt Joiner 2021-05-14 15:36:35 +10:00
commit 627b75c8b4
22 changed files with 576 additions and 170 deletions

View File

@ -130,7 +130,7 @@ func MustMarshal(v interface{}) []byte {
// Unmarshal the bencode value in the 'data' to a value pointed by the 'v'
// pointer, return a non-nil error if any.
func Unmarshal(data []byte, v interface{}) (err error) {
buf := bytes.NewBuffer(data)
buf := bytes.NewReader(data)
e := Decoder{r: buf}
err = e.Decode(v)
if err == nil && buf.Len() != 0 {
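
For reference, a minimal usage sketch of Unmarshal with the reader-backed buffer (the struct and input are illustrative): it decodes into the pointed-to value and reports an error if decoding fails or if bytes remain after the value.

package bencode_test

import (
	"testing"

	"github.com/anacrolix/torrent/bencode"
)

func TestUnmarshalSketch(t *testing.T) {
	var v struct {
		Name string `bencode:"name"`
	}
	// "d4:name3:fooe" is a bencoded dict {name: "foo"}.
	if err := bencode.Unmarshal([]byte("d4:name3:fooe"), &v); err != nil || v.Name != "foo" {
		t.Fatalf("unexpected result: %v %q", err, v.Name)
	}
	// Trailing bytes after the value cause Unmarshal to return an error.
	if err := bencode.Unmarshal([]byte("d4:name3:fooeextra"), &v); err == nil {
		t.Fatal("expected error for trailing data")
	}
}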

go.sum (2 changes)
View File

@ -268,6 +268,8 @@ github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/getlantern/sqlite v0.3.3-0.20210215090556-4f83cf7731f0 h1:zvFSvII5rTbMZ3idAqSUjUCDgZFbWMKzxQot3/Y7nzA=
github.com/getlantern/sqlite v0.3.3-0.20210215090556-4f83cf7731f0/go.mod h1:igAO5JulrQ1DbdZdtVq48mnZUBAPOeFzer7VhDWNtW4=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gliderlabs/ssh v0.1.1 h1:j3L6gSLQalDETeEg/Jg0mGY0/y/N6zI2xX1978P0Uqw=

View File

@ -23,14 +23,20 @@ func TestTorrentOffsetRequest(t *testing.T) {
check(13, 5, 13, Request{}, false)
}
func TestIterBitmapsDistinct(t *testing.T) {
var skip, first, second bitmap.Bitmap
skip.Add(1)
first.Add(1, 0, 3)
second.Add(1, 2, 0)
skipCopy := skip.Copy()
assert.Equal(t, []interface{}{0, 3, 2}, iter.ToSlice(iterBitmapsDistinct(&skipCopy, first, second)))
assert.Equal(t, []int{1}, skip.ToSortedSlice())
func BenchmarkIterBitmapsDistinct(t *testing.B) {
t.ReportAllocs()
for range iter.N(t.N) {
var skip, first, second bitmap.Bitmap
skip.Add(1)
first.Add(1, 0, 3)
second.Add(1, 2, 0)
skipCopy := skip.Copy()
t.StartTimer()
output := iter.ToSlice(iterBitmapsDistinct(&skipCopy, first, second))
t.StopTimer()
assert.Equal(t, []interface{}{0, 3, 2}, output)
assert.Equal(t, []int{1}, skip.ToSortedSlice())
}
}
func TestSpewConnStats(t *testing.T) {

View File

@ -48,7 +48,7 @@ func TestMarshalPexMessage(t *testing.T) {
msg = Message{}
dec := Decoder{
R: bufio.NewReader(bytes.NewBuffer(b)),
R: bufio.NewReader(bytes.NewReader(b)),
MaxLength: 128,
}
pmOut := PexMsg{}

View File

@ -743,12 +743,13 @@ func (cn *PeerConn) updateRequests() {
func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
return func(cb iter.Callback) {
for _, bm := range bms {
bm.Sub(*skip)
if !iter.All(
func(i interface{}) bool {
skip.Add(i.(int))
return cb(i)
},
bitmap.Sub(bm, *skip).Iter,
bm.Iter,
) {
return
}

View File

@ -9,7 +9,7 @@ import (
"github.com/anacrolix/torrent/metainfo"
)
type boltDBPiece struct {
type boltPiece struct {
db *bbolt.DB
p metainfo.Piece
ih metainfo.Hash
@ -17,19 +17,19 @@ type boltDBPiece struct {
}
var (
_ PieceImpl = (*boltDBPiece)(nil)
_ PieceImpl = (*boltPiece)(nil)
dataBucketKey = []byte("data")
)
func (me *boltDBPiece) pc() PieceCompletionGetSetter {
func (me *boltPiece) pc() PieceCompletionGetSetter {
return boltPieceCompletion{me.db}
}
func (me *boltDBPiece) pk() metainfo.PieceKey {
func (me *boltPiece) pk() metainfo.PieceKey {
return metainfo.PieceKey{me.ih, me.p.Index()}
}
func (me *boltDBPiece) Completion() Completion {
func (me *boltPiece) Completion() Completion {
c, err := me.pc().Get(me.pk())
switch err {
case bbolt.ErrDatabaseNotOpen:
@ -41,14 +41,14 @@ func (me *boltDBPiece) Completion() Completion {
return c
}
func (me *boltDBPiece) MarkComplete() error {
func (me *boltPiece) MarkComplete() error {
return me.pc().Set(me.pk(), true)
}
func (me *boltDBPiece) MarkNotComplete() error {
func (me *boltPiece) MarkNotComplete() error {
return me.pc().Set(me.pk(), false)
}
func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) {
func (me *boltPiece) ReadAt(b []byte, off int64) (n int, err error) {
err = me.db.View(func(tx *bbolt.Tx) error {
db := tx.Bucket(dataBucketKey)
if db == nil {
@ -74,13 +74,13 @@ func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) {
return
}
func (me *boltDBPiece) chunkKey(index int) (ret [26]byte) {
func (me *boltPiece) chunkKey(index int) (ret [26]byte) {
copy(ret[:], me.key[:])
binary.BigEndian.PutUint16(ret[24:], uint16(index))
return
}
func (me *boltDBPiece) WriteAt(b []byte, off int64) (n int, err error) {
func (me *boltPiece) WriteAt(b []byte, off int64) (n int, err error) {
err = me.db.Update(func(tx *bbolt.Tx) error {
db, err := tx.CreateBucketIfNotExists(dataBucketKey)
if err != nil {

View File

@ -12,17 +12,17 @@ import (
)
const (
// Chosen to match the usual chunk size in a torrent client. This way,
// most chunk writes are to exactly one full item in bbolt DB.
// Chosen to match the usual chunk size in a torrent client. This way, most chunk writes are to
// exactly one full item in bbolt DB.
chunkSize = 1 << 14
)
type boltDBClient struct {
type boltClient struct {
db *bbolt.DB
}
type boltDBTorrent struct {
cl *boltDBClient
type boltTorrent struct {
cl *boltClient
ih metainfo.Hash
}
@ -32,19 +32,19 @@ func NewBoltDB(filePath string) ClientImplCloser {
})
expect.Nil(err)
db.NoSync = true
return &boltDBClient{db}
return &boltClient{db}
}
func (me *boltDBClient) Close() error {
func (me *boltClient) Close() error {
return me.db.Close()
}
func (me *boltDBClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
return &boltDBTorrent{me, infoHash}, nil
func (me *boltClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
return &boltTorrent{me, infoHash}, nil
}
func (me *boltDBTorrent) Piece(p metainfo.Piece) PieceImpl {
ret := &boltDBPiece{
func (me *boltTorrent) Piece(p metainfo.Piece) PieceImpl {
ret := &boltPiece{
p: p,
db: me.cl.db,
ih: me.ih,
@ -54,4 +54,4 @@ func (me *boltDBTorrent) Piece(p metainfo.Piece) PieceImpl {
return ret
}
func (boltDBTorrent) Close() error { return nil }
func (boltTorrent) Close() error { return nil }

View File

@ -7,31 +7,27 @@ import (
)
type mapPieceCompletion struct {
mu sync.Mutex
m map[metainfo.PieceKey]bool
m sync.Map
}
var _ PieceCompletion = (*mapPieceCompletion)(nil)
func NewMapPieceCompletion() PieceCompletion {
return &mapPieceCompletion{m: make(map[metainfo.PieceKey]bool)}
return &mapPieceCompletion{}
}
func (*mapPieceCompletion) Close() error { return nil }
func (me *mapPieceCompletion) Get(pk metainfo.PieceKey) (c Completion, err error) {
me.mu.Lock()
defer me.mu.Unlock()
c.Complete, c.Ok = me.m[pk]
v, ok := me.m.Load(pk)
if ok {
c.Complete = v.(bool)
}
c.Ok = ok
return
}
func (me *mapPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
me.mu.Lock()
defer me.mu.Unlock()
if me.m == nil {
me.m = make(map[metainfo.PieceKey]bool)
}
me.m[pk] = b
me.m.Store(pk, b)
return nil
}
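
A minimal sketch of exercising the completion map (using the storage and metainfo packages; the zero info hash and piece index 0 are illustrative):

pc := storage.NewMapPieceCompletion()
defer pc.Close()
pk := metainfo.PieceKey{metainfo.Hash{}, 0}
// Before any Set, Get reports Ok == false.
_ = pc.Set(pk, true)
c, _ := pc.Get(pk)
fmt.Println(c.Complete, c.Ok) // true true; concurrent Get/Set calls are safe via the sync.Map.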

View File

@ -10,7 +10,7 @@ import (
func BenchmarkMarkComplete(b *testing.B) {
bench := func(b *testing.B, ci storage.ClientImpl) {
test_storage.BenchmarkPieceMarkComplete(
b, ci, test_storage.DefaultPieceSize, test_storage.DefaultNumPieces, test_storage.DefaultCapacity)
b, ci, test_storage.DefaultPieceSize, test_storage.DefaultNumPieces, 0)
}
b.Run("File", func(b *testing.B) {
ci := storage.NewFile(b.TempDir())

View File

@ -21,8 +21,11 @@ type piecePerResource struct {
}
type ResourcePiecesOpts struct {
// After marking a piece complete, don't bother deleting its incomplete blobs.
LeaveIncompleteChunks bool
NoSizedPuts bool
// Sized puts require being able to stream from a statement executed on another connection.
// Without them, we buffer the entire read and then put that.
NoSizedPuts bool
}
func NewResourcePieces(p PieceProvider) ClientImpl {

storage/sqlite/direct.go (new file, 243 lines)
View File

@ -0,0 +1,243 @@
package sqliteStorage
import (
"errors"
"runtime"
"sync"
"time"
"crawshaw.io/sqlite"
"crawshaw.io/sqlite/sqlitex"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
)
type NewDirectStorageOpts struct {
NewConnOpts
InitDbOpts
InitConnOpts
GcBlobs bool
CacheBlobs bool
BlobFlushInterval time.Duration
}
// A convenience function that creates a single sqlite connection and a pieces storage ClientImpl
// backed directly by it, returned with a Close attached.
func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
conn, err := newConn(opts.NewConnOpts)
if err != nil {
return
}
err = initConn(conn, opts.InitConnOpts)
if err != nil {
conn.Close()
return
}
err = initDatabase(conn, opts.InitDbOpts)
if err != nil {
return
}
cl := &client{
conn: conn,
blobs: make(map[string]*sqlite.Blob),
opts: opts,
}
if opts.BlobFlushInterval != 0 {
cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
}
return cl, nil
}
type client struct {
l sync.Mutex
conn conn
blobs map[string]*sqlite.Blob
blobFlusher *time.Timer
opts NewDirectStorageOpts
closed bool
}
func (c *client) blobFlusherFunc() {
c.l.Lock()
defer c.l.Unlock()
c.flushBlobs()
if !c.closed {
c.blobFlusher.Reset(c.opts.BlobFlushInterval)
}
}
func (c *client) flushBlobs() {
for key, b := range c.blobs {
// Need the lock to prevent racing with the GC finalizers.
b.Close()
delete(c.blobs, key)
}
}
func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
return torrent{c}, nil
}
func (c *client) Close() error {
c.l.Lock()
defer c.l.Unlock()
c.flushBlobs()
c.closed = true
if c.opts.BlobFlushInterval != 0 {
c.blobFlusher.Stop()
}
return c.conn.Close()
}
type torrent struct {
c *client
}
func rowidForBlob(c conn, name string, length int64) (rowid int64, err error) {
err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
rowid = stmt.ColumnInt64(0)
return nil
}, name)
if err != nil {
return
}
if rowid != 0 {
return
}
err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
if err != nil {
return
}
rowid = c.LastInsertRowID()
return
}
func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
t.c.l.Lock()
defer t.c.l.Unlock()
name := p.Hash().HexString()
return piece{
name,
p.Length(),
t.c,
}
}
func (t torrent) Close() error {
return nil
}
type piece struct {
name string
length int64
*client
}
func (p2 piece) doAtIoWithBlob(
atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
p []byte,
off int64,
) (n int, err error) {
p2.l.Lock()
defer p2.l.Unlock()
if !p2.opts.CacheBlobs {
defer p2.forgetBlob()
}
n, err = atIo(p2.getBlob())(p, off)
if err == nil {
return
}
var se sqlite.Error
if !errors.As(err, &se) {
return
}
if se.Code != sqlite.SQLITE_ABORT && !(p2.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
return
}
p2.forgetBlob()
return atIo(p2.getBlob())(p, off)
}
func (p2 piece) ReadAt(p []byte, off int64) (n int, err error) {
return p2.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
return blob.ReadAt
}, p, off)
}
func (p2 piece) WriteAt(p []byte, off int64) (n int, err error) {
return p2.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
return blob.WriteAt
}, p, off)
}
func (p2 piece) MarkComplete() error {
p2.l.Lock()
defer p2.l.Unlock()
err := sqlitex.Exec(p2.conn, "update blob set verified=true where name=?", nil, p2.name)
if err != nil {
return err
}
changes := p2.conn.Changes()
if changes != 1 {
panic(changes)
}
return nil
}
func (p2 piece) forgetBlob() {
blob, ok := p2.blobs[p2.name]
if !ok {
return
}
blob.Close()
delete(p2.blobs, p2.name)
}
func (p2 piece) MarkNotComplete() error {
return sqlitex.Exec(p2.conn, "update blob set verified=false where name=?", nil, p2.name)
}
func (p2 piece) Completion() (ret storage.Completion) {
p2.l.Lock()
defer p2.l.Unlock()
err := sqlitex.Exec(p2.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
ret.Complete = stmt.ColumnInt(0) != 0
return nil
}, p2.name)
ret.Ok = err == nil
if err != nil {
panic(err)
}
return
}
func (p2 piece) closeBlobIfExists() {
if b, ok := p2.blobs[p2.name]; ok {
b.Close()
delete(p2.blobs, p2.name)
}
}
func (p2 piece) getBlob() *sqlite.Blob {
blob, ok := p2.blobs[p2.name]
if !ok {
rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
if err != nil {
panic(err)
}
blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
if err != nil {
panic(err)
}
if p2.opts.GcBlobs {
herp := new(byte)
runtime.SetFinalizer(herp, func(*byte) {
p2.l.Lock()
defer p2.l.Unlock()
blob.Close()
})
}
p2.blobs[p2.name] = blob
}
return blob
}
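
A usage sketch for the new direct backend, wiring it in as a torrent client's default storage (paths and option values here are illustrative, not defaults):

package main

import (
	"log"
	"path/filepath"
	"time"

	"github.com/anacrolix/torrent"
	sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
)

func main() {
	var opts sqliteStorage.NewDirectStorageOpts
	opts.Path = filepath.Join("some-dir", "storage.db")
	opts.CacheBlobs = true
	opts.BlobFlushInterval = time.Second
	ci, err := sqliteStorage.NewDirectStorage(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer ci.Close()

	cfg := torrent.NewDefaultClientConfig()
	cfg.DefaultStorage = ci
	cl, err := torrent.NewClient(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
	// Torrents added to cl now read and write piece data via the sqlite blob table.
}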

View File

@ -25,7 +25,22 @@ import (
type conn = *sqlite.Conn
func initConn(conn conn, opts ProviderOpts) error {
type InitConnOpts struct {
SetJournalMode string
MmapSizeOk bool // If false, a package-specific default will be used.
MmapSize int64 // If MmapSizeOk is set, use sqlite default if < 0, otherwise this value.
}
func (me InitConnOpts) JournalMode() string {
if me.SetJournalMode != "" {
return me.SetJournalMode
}
return "wal"
}
var UnexpectedJournalMode = errors.New("unexpected journal mode")
func initConn(conn conn, opts InitConnOpts) error {
// Recursive triggers are required because we need to trim the blob_meta size after trimming to
// capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown.
err := sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
@ -36,13 +51,25 @@ func initConn(conn conn, opts ProviderOpts) error {
if err != nil {
return err
}
if opts.NoConcurrentBlobReads {
err = sqlitex.ExecTransient(conn, `pragma journal_mode=off`, nil)
if opts.SetJournalMode != "" {
err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma journal_mode=%s`, opts.SetJournalMode), func(stmt *sqlite.Stmt) error {
ret := stmt.ColumnText(0)
if ret != opts.SetJournalMode {
return UnexpectedJournalMode
}
return nil
})
if err != nil {
return err
}
}
if opts.MmapSizeOk {
if !opts.MmapSizeOk {
// Set the default. Currently it seems the library picks reasonable defaults, especially for
// wal.
opts.MmapSize = -1
//opts.MmapSize = 1 << 24 // 8 MiB
}
if opts.MmapSize >= 0 {
err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma mmap_size=%d`, opts.MmapSize), nil)
if err != nil {
return err
@ -68,6 +95,7 @@ func InitSchema(conn conn, pageSize int, triggers bool) error {
name text,
last_used timestamp default (datetime('now')),
data blob,
verified bool,
primary key (name)
);
@ -151,6 +179,7 @@ func InitSchema(conn conn, pageSize int, triggers bool) error {
type NewPiecesStorageOpts struct {
NewPoolOpts
InitDbOpts
ProvOpts func(*ProviderOpts)
StorageOpts func(*storage.ResourcePiecesOpts)
}
@ -158,10 +187,17 @@ type NewPiecesStorageOpts struct {
// A convenience function that creates a connection pool, resource provider, and a pieces storage
// ClientImpl and returns them all with a Close attached.
func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, err error) {
conns, provOpts, err := NewPool(opts.NewPoolOpts)
conns, err := NewPool(opts.NewPoolOpts)
if err != nil {
return
}
err = initPoolDatabase(conns, opts.InitDbOpts)
if err != nil {
return
}
provOpts := ProviderOpts{
BatchWrites: conns.NumConns() > 1,
}
if f := opts.ProvOpts; f != nil {
f(&provOpts)
}
@ -170,8 +206,27 @@ func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, er
conns.Close()
return
}
var (
journalMode string
)
withPoolConn(conns, func(c conn) {
err = sqlitex.Exec(c, "pragma journal_mode", func(stmt *sqlite.Stmt) error {
journalMode = stmt.ColumnText(0)
return nil
})
})
if err != nil {
err = fmt.Errorf("getting journal mode: %w", err)
prov.Close()
return
}
if journalMode == "" {
err = errors.New("didn't get journal mode")
prov.Close()
return
}
storageOpts := storage.ResourcePiecesOpts{
NoSizedPuts: provOpts.NoConcurrentBlobReads,
NoSizedPuts: journalMode != "wal" || conns.NumConns() == 1,
}
if f := opts.StorageOpts; f != nil {
f(&storageOpts)
@ -187,30 +242,25 @@ func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, er
}
type NewPoolOpts struct {
// See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
// private, temporary on-disk database will be created. This private database will be
// automatically deleted as soon as the database connection is closed."
Path string
Memory bool
NewConnOpts
InitConnOpts
NumConns int
// Forces WAL, disables shared caching.
NoConcurrentBlobReads bool
DontInitSchema bool
PageSize int
}
type InitDbOpts struct {
DontInitSchema bool
PageSize int
// If non-zero, overrides the existing setting.
Capacity int64
Capacity int64
NoTriggers bool
}
// There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
// the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
// top-level option type.
type ProviderOpts struct {
NumConns int
// Concurrent blob reads require WAL.
NoConcurrentBlobReads bool
BatchWrites bool
MmapSize int64
MmapSizeOk bool
type PoolConf struct {
NumConns int
JournalMode string
}
// Remove any capacity limits.
@ -223,10 +273,18 @@ func SetCapacity(conn conn, cap int64) error {
return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
}
func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
if opts.NumConns == 0 {
opts.NumConns = runtime.NumCPU()
}
type NewConnOpts struct {
// See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
// private, temporary on-disk database will be created. This private database will be
// automatically deleted as soon as the database connection is closed."
Path string
Memory bool
// Set when multiple blobs will not be read simultaneously. This permits a journal mode other
// than WAL, and NumConns < 2.
NoConcurrentBlobReads bool
}
func newOpenUri(opts NewConnOpts) string {
path := url.PathEscape(opts.Path)
if opts.Memory {
path = ":memory:"
@ -235,31 +293,18 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
if opts.NoConcurrentBlobReads || opts.Memory {
values.Add("cache", "shared")
}
uri := fmt.Sprintf("file:%s?%s", path, values.Encode())
conns, err := func() (ConnPool, error) {
switch opts.NumConns {
case 1:
conn, err := sqlite.OpenConn(uri, 0)
return &poolFromConn{conn: conn}, err
default:
return sqlitex.Open(uri, 0, opts.NumConns)
}
}()
if err != nil {
return
}
defer func() {
if err != nil {
conns.Close()
}
}()
conn := conns.Get(context.TODO())
defer conns.Put(conn)
return fmt.Sprintf("file:%s?%s", path, values.Encode())
}
func initDatabase(conn conn, opts InitDbOpts) (err error) {
if !opts.DontInitSchema {
if opts.PageSize == 0 {
opts.PageSize = 1 << 14
// There doesn't seem to be an optimal size. I did try with the standard chunk size, but
// the difference is not convincing.
//opts.PageSize = 1 << 14
}
err = InitSchema(conn, opts.PageSize, true)
err = InitSchema(conn, opts.PageSize, !opts.NoTriggers)
if err != nil {
return
}
@ -270,13 +315,55 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
return
}
}
return conns, ProviderOpts{
NumConns: opts.NumConns,
NoConcurrentBlobReads: opts.NoConcurrentBlobReads || opts.Memory || opts.NumConns == 1,
BatchWrites: opts.NumConns > 1,
MmapSize: 1 << 23, // 8 MiB
MmapSizeOk: true,
}, nil
return
}
func initPoolDatabase(pool ConnPool, opts InitDbOpts) (err error) {
withPoolConn(pool, func(c conn) {
err = initDatabase(c, opts)
})
return
}
func newConn(opts NewConnOpts) (conn, error) {
return sqlite.OpenConn(newOpenUri(opts), 0)
}
type poolWithNumConns struct {
*sqlitex.Pool
numConns int
}
func (me poolWithNumConns) NumConns() int {
return me.numConns
}
func NewPool(opts NewPoolOpts) (_ ConnPool, err error) {
if opts.NumConns == 0 {
opts.NumConns = runtime.NumCPU()
}
conns, err := func() (ConnPool, error) {
switch opts.NumConns {
case 1:
conn, err := newConn(opts.NewConnOpts)
return &poolFromConn{conn: conn}, err
default:
_pool, err := sqlitex.Open(newOpenUri(opts.NewConnOpts), 0, opts.NumConns)
return poolWithNumConns{_pool, opts.NumConns}, err
}
}()
if err != nil {
return
}
defer func() {
if err != nil {
conns.Close()
}
}()
return conns, initPoolConns(nil, conns, InitPoolOpts{
NumConns: opts.NumConns,
InitConnOpts: opts.InitConnOpts,
})
}
// Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
@ -301,20 +388,17 @@ func (me *poolFromConn) Close() error {
return me.conn.Close()
}
func (poolFromConn) NumConns() int { return 1 }
type ProviderOpts struct {
BatchWrites bool
}
// Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
// the ConnPool (since it has to initialize all the connections anyway).
func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
_, err = initPoolConns(context.TODO(), pool, opts)
if err != nil {
err = fmt.Errorf("initing pool conns: %w", err)
return
}
prov := &provider{pool: pool, opts: opts}
if opts.BatchWrites {
if opts.NumConns < 2 {
err = errors.New("batch writes requires more than 1 conn")
return
}
writes := make(chan writeRequest)
prov.writes = writes
// This is retained for backwards compatibility. It may not be necessary.
@ -326,7 +410,12 @@ func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
return prov, nil
}
func initPoolConns(ctx context.Context, pool ConnPool, opts ProviderOpts) (numInited int, err error) {
type InitPoolOpts struct {
NumConns int
InitConnOpts
}
func initPoolConns(ctx context.Context, pool ConnPool, opts InitPoolOpts) (err error) {
var conns []conn
defer func() {
for _, c := range conns {
@ -339,12 +428,11 @@ func initPoolConns(ctx context.Context, pool ConnPool, opts ProviderOpts) (numIn
break
}
conns = append(conns, conn)
err = initConn(conn, opts)
err = initConn(conn, opts.InitConnOpts)
if err != nil {
err = fmt.Errorf("initing conn %v: %w", len(conns), err)
return
}
numInited++
}
return
}
@ -353,6 +441,13 @@ type ConnPool interface {
Get(context.Context) conn
Put(conn)
Close() error
NumConns() int
}
func withPoolConn(pool ConnPool, with func(conn)) {
c := pool.Get(nil)
defer pool.Put(c)
with(c)
}
type provider struct {
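
For comparison, a sketch of how the reorganized pooled options compose after this change (Path, NumConns, SetJournalMode and Capacity are promoted from the embedded NewConnOpts, InitConnOpts and InitDbOpts; values illustrative, same imports as the earlier sketch plus the storage package):

func newPooledStorage() (storage.ClientImplCloser, error) {
	var opts sqliteStorage.NewPiecesStorageOpts
	opts.Path = filepath.Join("some-dir", "storage.db") // NewConnOpts
	opts.NumConns = 4                                   // NewPoolOpts
	opts.SetJournalMode = "wal"                         // InitConnOpts; checked against what sqlite reports back
	opts.Capacity = 1 << 30                             // InitDbOpts; illustrative 1 GiB cap
	return sqliteStorage.NewPiecesStorage(opts)
}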

View File

@ -2,15 +2,19 @@ package sqliteStorage
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"path/filepath"
"sync"
"testing"
"time"
_ "github.com/anacrolix/envpprof"
"github.com/anacrolix/torrent/storage"
test_storage "github.com/anacrolix/torrent/storage/test"
"github.com/dustin/go-humanize"
qt "github.com/frankban/quicktest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -18,14 +22,15 @@ import (
func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) {
opts.Path = filepath.Join(t.TempDir(), "sqlite3.db")
conns, provOpts, err := NewPool(opts)
require.NoError(t, err)
pool, err := NewPool(opts)
qt.Assert(t, err, qt.IsNil)
// sqlitex.Pool.Close doesn't like being called more than once. Let it slide for now.
//t.Cleanup(func() { conns.Close() })
prov, err := NewProvider(conns, provOpts)
//t.Cleanup(func() { pool.Close() })
qt.Assert(t, initPoolDatabase(pool, InitDbOpts{}), qt.IsNil)
prov, err := NewProvider(pool, ProviderOpts{BatchWrites: pool.NumConns() > 1})
require.NoError(t, err)
t.Cleanup(func() { prov.Close() })
return conns, prov
return pool, prov
}
func TestTextBlobSize(t *testing.T) {
@ -68,31 +73,76 @@ func TestSimultaneousIncrementalBlob(t *testing.T) {
func BenchmarkMarkComplete(b *testing.B) {
const pieceSize = test_storage.DefaultPieceSize
const capacity = test_storage.DefaultCapacity
const noTriggers = false
var capacity int64 = test_storage.DefaultNumPieces * pieceSize / 2
if noTriggers {
// Since we won't push out old pieces, we have to mark them incomplete manually.
capacity = 0
}
runBench := func(b *testing.B, ci storage.ClientImpl) {
test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
}
c := qt.New(b)
for _, memory := range []bool{false, true} {
b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) {
for _, batchWrites := range []bool{false, true} {
b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) {
dbPath := filepath.Join(b.TempDir(), "storage.db")
//b.Logf("storage db path: %q", dbPath)
ci, err := NewPiecesStorage(NewPiecesStorageOpts{
NewPoolOpts: NewPoolOpts{
Path: dbPath,
Capacity: 4*pieceSize - 1,
NoConcurrentBlobReads: false,
PageSize: 1 << 14,
Memory: memory,
},
ProvOpts: func(opts *ProviderOpts) {
opts.BatchWrites = batchWrites
},
})
b.Run("Direct", func(b *testing.B) {
var opts NewDirectStorageOpts
opts.Memory = memory
opts.Path = filepath.Join(b.TempDir(), "storage.db")
opts.Capacity = capacity
opts.CacheBlobs = true
//opts.GcBlobs = true
opts.BlobFlushInterval = time.Second
opts.NoTriggers = noTriggers
directBench := func(b *testing.B) {
ci, err := NewDirectStorage(opts)
if errors.Is(err, UnexpectedJournalMode) {
b.Skipf("setting journal mode %q: %v", opts.SetJournalMode, err)
}
c.Assert(err, qt.IsNil)
defer ci.Close()
test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
})
}
runBench(b, ci)
}
for _, journalMode := range []string{"", "wal", "off", "truncate", "delete", "persist", "memory"} {
opts.SetJournalMode = journalMode
b.Run("JournalMode="+journalMode, func(b *testing.B) {
for _, mmapSize := range []int64{-1, 0, 1 << 23, 1 << 24, 1 << 25} {
if memory && mmapSize >= 0 {
continue
}
b.Run(fmt.Sprintf("MmapSize=%s", func() string {
if mmapSize < 0 {
return "default"
} else {
return humanize.IBytes(uint64(mmapSize))
}
}()), func(b *testing.B) {
opts.MmapSize = mmapSize
opts.MmapSizeOk = true
directBench(b)
})
}
})
}
})
b.Run("ResourcePieces", func(b *testing.B) {
for _, batchWrites := range []bool{false, true} {
b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) {
var opts NewPiecesStorageOpts
opts.Path = filepath.Join(b.TempDir(), "storage.db")
//b.Logf("storage db path: %q", dbPath)
opts.Capacity = capacity
opts.Memory = memory
opts.ProvOpts = func(opts *ProviderOpts) {
opts.BatchWrites = batchWrites
}
ci, err := NewPiecesStorage(opts)
c.Assert(err, qt.IsNil)
defer ci.Close()
runBench(b, ci)
})
}
})
})
}
}

View File

@ -2,8 +2,6 @@ package test_storage
import (
"bytes"
"io"
"io/ioutil"
"math/rand"
"sync"
"testing"
@ -17,15 +15,16 @@ import (
const (
ChunkSize = 1 << 14
DefaultPieceSize = 2 << 20
DefaultCapacity = 0
DefaultNumPieces = 16
)
func BenchmarkPieceMarkComplete(
b *testing.B, ci storage.ClientImpl,
pieceSize int64, numPieces int, capacity int64,
pieceSize int64, numPieces int,
// This drives any special handling around capacity that may be configured into the storage
// implementation.
capacity int64,
) {
const check = true
c := qt.New(b)
info := &metainfo.Info{
Pieces: make([]byte, numPieces*metainfo.HashSize),
@ -35,16 +34,17 @@ func BenchmarkPieceMarkComplete(
}
ti, err := ci.OpenTorrent(info, metainfo.Hash{})
c.Assert(err, qt.IsNil)
defer ti.Close()
tw := storage.Torrent{ti}
defer tw.Close()
rand.Read(info.Pieces)
data := make([]byte, pieceSize)
readData := make([]byte, pieceSize)
b.SetBytes(int64(numPieces) * pieceSize)
oneIter := func() {
for pieceIndex := range iter.N(numPieces) {
pi := ti.Piece(info.Piece(pieceIndex))
if check {
rand.Read(data)
}
pi := tw.Piece(info.Piece(pieceIndex))
rand.Read(data)
b.StartTimer()
var wg sync.WaitGroup
for off := int64(0); off < int64(len(data)); off += ChunkSize {
wg.Add(1)
@ -67,12 +67,11 @@ func BenchmarkPieceMarkComplete(
c.Assert(pi.Completion(), qt.Equals, storage.Completion{Complete: false, Ok: true})
c.Assert(pi.MarkComplete(), qt.IsNil)
c.Assert(pi.Completion(), qt.Equals, storage.Completion{true, true})
if check {
readData, err := ioutil.ReadAll(io.NewSectionReader(pi, 0, int64(len(data))))
c.Assert(err, qt.IsNil)
c.Assert(len(readData), qt.Equals, len(data))
c.Assert(bytes.Equal(readData, data), qt.IsTrue)
}
n, err := pi.WriteTo(bytes.NewBuffer(readData[:0]))
b.StopTimer()
c.Assert(err, qt.IsNil)
c.Assert(n, qt.Equals, int64(len(data)))
c.Assert(bytes.Equal(readData[:n], data), qt.IsTrue)
}
}
// Fill the cache

View File

@ -1,5 +1,11 @@
package test
import (
"log"
_ "github.com/anacrolix/envpprof"
)
func init() {
log.SetFlags(log.Flags() | log.Lshortfile)
}

View File

@ -110,6 +110,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
}
cfg.Seed = false
//cfg.Debug = true
if ps.ConfigureLeecher.Config != nil {
ps.ConfigureLeecher.Config(cfg)
}
@ -311,13 +312,10 @@ type leecherStorageTestCase struct {
func sqliteLeecherStorageTestCase(numConns int) leecherStorageTestCase {
return leecherStorageTestCase{
fmt.Sprintf("SqliteFile,NumConns=%v", numConns),
sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPiecesStorageOpts {
return sqliteStorage.NewPiecesStorageOpts{
NewPoolOpts: sqliteStorage.NewPoolOpts{
Path: filepath.Join(dataDir, "sqlite.db"),
NumConns: numConns,
},
}
sqliteClientStorageFactory(func(dataDir string) (opts sqliteStorage.NewPiecesStorageOpts) {
opts.Path = filepath.Join(dataDir, "sqlite.db")
opts.NumConns = numConns
return
}),
numConns,
}
@ -330,16 +328,23 @@ func TestClientTransferVarious(t *testing.T) {
Wrapper: fileCachePieceResourceStorage,
}), 0},
{"Boltdb", storage.NewBoltDB, 0},
{"SqliteDirect", func(s string) storage.ClientImplCloser {
path := filepath.Join(s, "sqlite3.db")
var opts sqliteStorage.NewDirectStorageOpts
opts.Path = path
cl, err := sqliteStorage.NewDirectStorage(opts)
if err != nil {
panic(err)
}
return cl
}, 0},
sqliteLeecherStorageTestCase(1),
sqliteLeecherStorageTestCase(2),
// This should use a number of connections equal to the number of CPUs
sqliteLeecherStorageTestCase(0),
{"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPiecesStorageOpts {
return sqliteStorage.NewPiecesStorageOpts{
NewPoolOpts: sqliteStorage.NewPoolOpts{
Memory: true,
},
}
{"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) (opts sqliteStorage.NewPiecesStorageOpts) {
opts.Memory = true
return
}), 0},
} {
t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
@ -362,7 +367,7 @@ func TestClientTransferVarious(t *testing.T) {
GOMAXPROCS: ls.gomaxprocs,
})
})
for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
for _, readahead := range []int64{-1, 0, 1, 2, 9, 20} {
t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
testClientTransfer(t, testClientTransferParams{
SeederStorage: ss.f,