sqlite storage: Provide helpers and reasonable defaults
parent c28e9aaeae
commit b75ebbf9e7
@@ -7,7 +7,9 @@ import (
	"fmt"
	"io"
	"log"
	"net/url"
	"os"
	"runtime"
	"sync"
	"time"

@@ -15,6 +17,7 @@ import (
	"crawshaw.io/sqlite/sqlitex"
	"github.com/anacrolix/missinggo/iter"
	"github.com/anacrolix/missinggo/v2/resource"
	"github.com/anacrolix/torrent/storage"
)

type conn = *sqlite.Conn

@@ -77,7 +80,75 @@ end;
`)
}

// Emulates a pool from a single Conn.
// A convenience function that creates a connection pool, resource provider, and a pieces storage
// ClientImpl and returns them all with a Close attached.
func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error) {
	conns, provOpts, err := NewPool(opts)
	if err != nil {
		return
	}
	prov, err := NewProvider(conns, provOpts)
	if err != nil {
		return
	}
	store := storage.NewResourcePieces(prov)
	return struct {
		storage.ClientImpl
		io.Closer
	}{
		store,
		conns,
	}, nil
}

type NewPoolOpts struct {
	Path     string
	Memory   bool
	NumConns int
	// Forces WAL, disables shared caching.
	ConcurrentBlobReads bool
}

// There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
// the fact that the pool opts are a superset means our helper NewPiecesStorage can just take the
// top-level option type.
type ProviderOpts struct {
	NumConns int
	// Concurrent blob reads require WAL.
	ConcurrentBlobRead bool
}

func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
	if opts.NumConns == 0 {
		opts.NumConns = runtime.NumCPU()
	}
	if opts.Memory {
		opts.Path = ":memory:"
	}
	values := make(url.Values)
	if !opts.ConcurrentBlobReads {
		values.Add("cache", "shared")
	}
	path := fmt.Sprintf("file:%s?%s", opts.Path, values.Encode())
	conns, err := func() (ConnPool, error) {
		switch opts.NumConns {
		case 1:
			conn, err := sqlite.OpenConn(fmt.Sprintf("file:%s", opts.Path), 0)
			return &poolFromConn{conn: conn}, err
		default:
			return sqlitex.Open(path, 0, opts.NumConns)
		}
	}()
	if err != nil {
		return
	}
	return conns, ProviderOpts{
		NumConns:           opts.NumConns,
		ConcurrentBlobRead: opts.ConcurrentBlobReads,
	}, nil
}

// Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
type poolFromConn struct {
	mu   sync.Mutex
	conn conn
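
Taken together, NewPool, NewProvider, and NewPiecesStorage give callers a one-call way to get sqlite-backed piece storage with sensible defaults. A minimal caller sketch follows; the import path and database filename are illustrative assumptions, not part of this diff:

```go
package main

import (
	"log"

	sqliteStorage "github.com/anacrolix/torrent/storage/sqlite" // assumed import path
)

func main() {
	// Zero values in NewPoolOpts fall back to the defaults added in this
	// commit: NumConns defaults to runtime.NumCPU(), and the shared cache is
	// enabled unless concurrent blob reads are requested.
	store, err := sqliteStorage.NewPiecesStorage(sqliteStorage.NewPoolOpts{
		Path: "torrent-pieces.db", // illustrative path
	})
	if err != nil {
		log.Fatal(err)
	}
	// The returned value bundles the pieces ClientImpl with a Close that
	// closes the underlying connection pool.
	defer store.Close()
}
```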
@@ -95,18 +166,13 @@ func (me *poolFromConn) Put(conn conn) {
	me.mu.Unlock()
}

func NewProvider(conn *sqlite.Conn) (_ *provider, err error) {
	err = initConn(conn, false)
	if err != nil {
		return
	}
	err = initSchema(conn)
	return &provider{&poolFromConn{conn: conn}}, err
func (me *poolFromConn) Close() error {
	return me.conn.Close()
}

// Needs the pool size so it can initialize all the connections with pragmas.
func NewProviderPool(pool *sqlitex.Pool, numConns int, wal bool) (_ *provider, err error) {
	_, err = initPoolConns(context.TODO(), pool, numConns, wal)
// Needs the ConnPool size so it can initialize all the connections with pragmas.
func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
	_, err = initPoolConns(context.TODO(), pool, opts.NumConns, opts.ConcurrentBlobRead)
	if err != nil {
		return
	}

@@ -116,7 +182,7 @@ func NewProviderPool(pool *sqlitex.Pool, numConns int, wal bool) (_ *provider, e
	return &provider{pool: pool}, err
}

func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int, wal bool) (numInited int, err error) {
func initPoolConns(ctx context.Context, pool ConnPool, numConn int, wal bool) (numInited int, err error) {
	var conns []conn
	defer func() {
		for _, c := range conns {
@@ -139,13 +205,14 @@ func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int, wal boo
	return
}

type pool interface {
type ConnPool interface {
	Get(context.Context) conn
	Put(conn)
	Close() error
}

type provider struct {
	pool pool
	pool ConnPool
}

func (p *provider) NewInstance(s string) (resource.Instance, error) {
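
The provider returned by NewProvider is what storage.NewResourcePieces builds on: NewInstance hands back a missinggo resource.Instance keyed by a string. For callers who want the two-step NewPool/NewProvider path rather than the NewPiecesStorage helper, a hedged sketch (mirroring the package tests below; the key name and import path are illustrative):

```go
package main

import (
	"bytes"
	"fmt"

	sqliteStorage "github.com/anacrolix/torrent/storage/sqlite" // assumed import path
)

func main() {
	// An in-memory pool using the package defaults; the ProviderOpts returned
	// by NewPool describe how the pool was opened, so the provider's pragmas
	// match.
	conns, provOpts, err := sqliteStorage.NewPool(sqliteStorage.NewPoolOpts{Memory: true})
	if err != nil {
		panic(err)
	}
	defer conns.Close()
	prov, err := sqliteStorage.NewProvider(conns, provOpts)
	if err != nil {
		panic(err)
	}
	inst, _ := prov.NewInstance("piece-0") // arbitrary key
	inst.Put(bytes.NewBufferString("hello"))
	fi, _ := inst.Stat()
	fmt.Println(fi.Size()) // prints 5
}
```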
@@ -2,33 +2,40 @@ package sqliteStorage

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"path/filepath"
	"sync"
	"testing"

	"crawshaw.io/sqlite"
	"crawshaw.io/sqlite/sqlitex"
	_ "github.com/anacrolix/envpprof"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) {
	opts.Path = filepath.Join(t.TempDir(), "sqlite3.db")
	conns, provOpts, err := NewPool(opts)
	require.NoError(t, err)
	t.Cleanup(func() { conns.Close() })
	prov, err := NewProvider(conns, provOpts)
	require.NoError(t, err)
	return conns, prov
}

func TestTextBlobSize(t *testing.T) {
	_, prov := newConnsAndProv(t, NewPoolOpts{})
	a, _ := prov.NewInstance("a")
	a.Put(bytes.NewBufferString("\x00hello"))
	fi, _ := a.Stat()
	assert.EqualValues(t, 6, fi.Size())
}

func TestSimultaneousIncrementalBlob(t *testing.T) {
	const poolSize = 10
	pool, err := sqlitex.Open(
		// We don't do this in memory, because it seems to have some locking issues with updating
		// last_used.
		fmt.Sprintf("file:%s", filepath.Join(t.TempDir(), "sqlite3.db")),
		// We can't disable WAL in this test because then we can't open 2 blobs simultaneously for read.
		sqlite.OpenFlagsDefault, /* &^sqlite.SQLITE_OPEN_WAL */
		poolSize)
	require.NoError(t, err)
	defer pool.Close()
	p, err := NewProviderPool(pool, poolSize, true)
	require.NoError(t, err)
	_, p := newConnsAndProv(t, NewPoolOpts{
		NumConns:            2,
		ConcurrentBlobReads: true,
	})
	a, err := p.NewInstance("a")
	require.NoError(t, err)
	const contents = "hello, world"
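
The reworked TestSimultaneousIncrementalBlob illustrates the ConcurrentBlobReads trade-off: enabling it forgoes the shared cache (per the NewPool implementation above) so that multiple blobs can be open for reading at once. Outside the tests, the equivalent options might look like this sketch; the path is illustrative:

```go
// Options for a multi-connection pool that allows concurrent blob reads.
// This disables shared caching and relies on WAL, as the option comments note.
opts := sqliteStorage.NewPoolOpts{
	Path:                "/tmp/pieces.db", // illustrative
	NumConns:            2,
	ConcurrentBlobReads: true,
}
```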
@@ -11,8 +11,6 @@ import (
	"testing"
	"time"

	"crawshaw.io/sqlite"
	"crawshaw.io/sqlite/sqlitex"
	"github.com/anacrolix/missinggo/v2/filecache"
	"github.com/anacrolix/torrent"
	"github.com/anacrolix/torrent/internal/testutil"
@@ -272,46 +270,15 @@ func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) {
	testClientTransferSmallCache(t, false, -1)
}

func sqliteClientStorageFactory(connPathMaker func(dataDir string) string) storageFactory {
func sqliteClientStorageFactory(connOptsMaker func(dataDir string) sqliteStorage.NewPoolOpts) storageFactory {
	return func(dataDir string) storage.ClientImplCloser {
		path := connPathMaker(dataDir)
		log.Printf("opening sqlite db at %q", path)
		if true {
			conn, err := sqlite.OpenConn(path, 0)
			if err != nil {
				panic(err)
			}
			prov, err := sqliteStorage.NewProvider(conn)
			if err != nil {
				panic(err)
			}
			return struct {
				storage.ClientImpl
				io.Closer
			}{
				storage.NewResourcePieces(prov),
				conn,
			}
		} else {
			// Test pool implementation for SQLITE_BUSY when we want SQLITE_LOCKED (so the
			// crawshaw.io/sqlite unlock notify handler kicks in for us).
			const poolSize = 1
			pool, err := sqlitex.Open(path, 0, poolSize)
			if err != nil {
				panic(err)
			}
			prov, err := sqliteStorage.NewProviderPool(pool, poolSize, false)
			if err != nil {
				panic(err)
			}
			return struct {
				storage.ClientImpl
				io.Closer
			}{
				storage.NewResourcePieces(prov),
				pool,
			}
		connOpts := connOptsMaker(dataDir)
		log.Printf("opening sqlite db: %#v", connOpts)
		ret, err := sqliteStorage.NewPiecesStorage(connOpts)
		if err != nil {
			panic(err)
		}
		return ret
	}
}
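
The test factory above now funnels everything through NewPiecesStorage instead of hand-wiring connections and providers. An application wiring the same storage into a torrent client might look like the following sketch; the ClientConfig field name and import paths are assumptions about the surrounding library, not part of this diff:

```go
package main

import (
	"log"

	"github.com/anacrolix/torrent"
	sqliteStorage "github.com/anacrolix/torrent/storage/sqlite" // assumed import path
)

func main() {
	store, err := sqliteStorage.NewPiecesStorage(sqliteStorage.NewPoolOpts{
		Path: "pieces.db", // illustrative
	})
	if err != nil {
		log.Fatal(err)
	}
	defer store.Close()

	cfg := torrent.NewDefaultClientConfig()
	cfg.DefaultStorage = store // assumed field for the client's piece storage
	cl, err := torrent.NewClient(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
}
```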
|
@ -325,11 +292,15 @@ func TestClientTransferVarious(t *testing.T) {
|
|||
Wrapper: fileCachePieceResourceStorage,
|
||||
})},
|
||||
{"Boltdb", storage.NewBoltDB},
|
||||
{"SqliteFile", sqliteClientStorageFactory(func(dataDir string) string {
|
||||
return "file:" + filepath.Join(dataDir, "sqlite.db")
|
||||
{"SqliteFile", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPoolOpts {
|
||||
return sqliteStorage.NewPoolOpts{
|
||||
Path: filepath.Join(dataDir, "sqlite.db"),
|
||||
}
|
||||
})},
|
||||
{"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) string {
|
||||
return "file:memory:?mode=memory&cache=shared"
|
||||
{"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPoolOpts {
|
||||
return sqliteStorage.NewPoolOpts{
|
||||
Memory: true,
|
||||
}
|
||||
})},
|
||||
} {
|
||||
t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {