diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go index fc419c29..771c3452 100644 --- a/storage/sqlite/sqlite-storage.go +++ b/storage/sqlite/sqlite-storage.go @@ -7,7 +7,9 @@ import ( "fmt" "io" "log" + "net/url" "os" + "runtime" "sync" "time" @@ -15,6 +17,7 @@ import ( "crawshaw.io/sqlite/sqlitex" "github.com/anacrolix/missinggo/iter" "github.com/anacrolix/missinggo/v2/resource" + "github.com/anacrolix/torrent/storage" ) type conn = *sqlite.Conn @@ -77,7 +80,75 @@ end; `) } -// Emulates a pool from a single Conn. +// A convenience function that creates a connection pool, resource provider, and a pieces storage +// ClientImpl and returns them all with a Close attached. +func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error) { + conns, provOpts, err := NewPool(opts) + if err != nil { + return + } + prov, err := NewProvider(conns, provOpts) + if err != nil { + return + } + store := storage.NewResourcePieces(prov) + return struct { + storage.ClientImpl + io.Closer + }{ + store, + conns, + }, nil +} + +type NewPoolOpts struct { + Path string + Memory bool + NumConns int + // Forces WAL, disables shared caching. + ConcurrentBlobReads bool +} + +// There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now, +// the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the +// top-level option type. +type ProviderOpts struct { + NumConns int + // Concurrent blob reads require WAL. + ConcurrentBlobRead bool +} + +func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) { + if opts.NumConns == 0 { + opts.NumConns = runtime.NumCPU() + } + if opts.Memory { + opts.Path = ":memory:" + } + values := make(url.Values) + if !opts.ConcurrentBlobReads { + values.Add("cache", "shared") + } + path := fmt.Sprintf("file:%s?%s", opts.Path, values.Encode()) + conns, err := func() (ConnPool, error) { + switch opts.NumConns { + case 1: + conn, err := sqlite.OpenConn(fmt.Sprintf("file:%s", opts.Path), 0) + return &poolFromConn{conn: conn}, err + default: + return sqlitex.Open(path, 0, opts.NumConns) + } + }() + if err != nil { + return + } + return conns, ProviderOpts{ + NumConns: opts.NumConns, + ConcurrentBlobRead: opts.ConcurrentBlobReads, + }, nil +} + +// Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool. type poolFromConn struct { mu sync.Mutex conn conn @@ -95,18 +166,13 @@ func (me *poolFromConn) Put(conn conn) { me.mu.Unlock() } -func NewProvider(conn *sqlite.Conn) (_ *provider, err error) { - err = initConn(conn, false) - if err != nil { - return - } - err = initSchema(conn) - return &provider{&poolFromConn{conn: conn}}, err +func (me *poolFromConn) Close() error { + return me.conn.Close() } -// Needs the pool size so it can initialize all the connections with pragmas. -func NewProviderPool(pool *sqlitex.Pool, numConns int, wal bool) (_ *provider, err error) { - _, err = initPoolConns(context.TODO(), pool, numConns, wal) +// Needs the ConnPool size so it can initialize all the connections with pragmas. +func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) { + _, err = initPoolConns(context.TODO(), pool, opts.NumConns, opts.ConcurrentBlobRead) if err != nil { return } @@ -116,7 +182,7 @@ func NewProviderPool(pool *sqlitex.Pool, numConns int, wal bool) (_ *provider, e return &provider{pool: pool}, err } -func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int, wal bool) (numInited int, err error) { +func initPoolConns(ctx context.Context, pool ConnPool, numConn int, wal bool) (numInited int, err error) { var conns []conn defer func() { for _, c := range conns { @@ -139,13 +205,14 @@ func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int, wal boo return } -type pool interface { +type ConnPool interface { Get(context.Context) conn Put(conn) + Close() error } type provider struct { - pool pool + pool ConnPool } func (p *provider) NewInstance(s string) (resource.Instance, error) { diff --git a/storage/sqlite/sqlite-storage_test.go b/storage/sqlite/sqlite-storage_test.go index 71d5540a..2a413739 100644 --- a/storage/sqlite/sqlite-storage_test.go +++ b/storage/sqlite/sqlite-storage_test.go @@ -2,33 +2,40 @@ package sqliteStorage import ( "bytes" - "fmt" "io" "io/ioutil" "path/filepath" "sync" "testing" - "crawshaw.io/sqlite" - "crawshaw.io/sqlite/sqlitex" _ "github.com/anacrolix/envpprof" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) { + opts.Path = filepath.Join(t.TempDir(), "sqlite3.db") + conns, provOpts, err := NewPool(opts) + require.NoError(t, err) + t.Cleanup(func() { conns.Close() }) + prov, err := NewProvider(conns, provOpts) + require.NoError(t, err) + return conns, prov +} + +func TestTextBlobSize(t *testing.T) { + _, prov := newConnsAndProv(t, NewPoolOpts{}) + a, _ := prov.NewInstance("a") + a.Put(bytes.NewBufferString("\x00hello")) + fi, _ := a.Stat() + assert.EqualValues(t, 6, fi.Size()) +} + func TestSimultaneousIncrementalBlob(t *testing.T) { - const poolSize = 10 - pool, err := sqlitex.Open( - // We don't do this in memory, because it seems to have some locking issues with updating - // last_used. - fmt.Sprintf("file:%s", filepath.Join(t.TempDir(), "sqlite3.db")), - // We can't disable WAL in this test because then we can't open 2 blobs simultaneously for read. - sqlite.OpenFlagsDefault, /* &^sqlite.SQLITE_OPEN_WAL */ - poolSize) - require.NoError(t, err) - defer pool.Close() - p, err := NewProviderPool(pool, poolSize, true) - require.NoError(t, err) + _, p := newConnsAndProv(t, NewPoolOpts{ + NumConns: 2, + ConcurrentBlobReads: true, + }) a, err := p.NewInstance("a") require.NoError(t, err) const contents = "hello, world" diff --git a/test/transfer_test.go b/test/transfer_test.go index 25d03b49..6c955b36 100644 --- a/test/transfer_test.go +++ b/test/transfer_test.go @@ -11,8 +11,6 @@ import ( "testing" "time" - "crawshaw.io/sqlite" - "crawshaw.io/sqlite/sqlitex" "github.com/anacrolix/missinggo/v2/filecache" "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/internal/testutil" @@ -272,46 +270,15 @@ func TestClientTransferSmallCacheDefaultReadahead(t *testing.T) { testClientTransferSmallCache(t, false, -1) } -func sqliteClientStorageFactory(connPathMaker func(dataDir string) string) storageFactory { +func sqliteClientStorageFactory(connOptsMaker func(dataDir string) sqliteStorage.NewPoolOpts) storageFactory { return func(dataDir string) storage.ClientImplCloser { - path := connPathMaker(dataDir) - log.Printf("opening sqlite db at %q", path) - if true { - conn, err := sqlite.OpenConn(path, 0) - if err != nil { - panic(err) - } - prov, err := sqliteStorage.NewProvider(conn) - if err != nil { - panic(err) - } - return struct { - storage.ClientImpl - io.Closer - }{ - storage.NewResourcePieces(prov), - conn, - } - } else { - // Test pool implementation for SQLITE_BUSY when we want SQLITE_LOCKED (so the - // crawshaw.io/sqlite unlock notify handler kicks in for us). - const poolSize = 1 - pool, err := sqlitex.Open(path, 0, poolSize) - if err != nil { - panic(err) - } - prov, err := sqliteStorage.NewProviderPool(pool, poolSize, false) - if err != nil { - panic(err) - } - return struct { - storage.ClientImpl - io.Closer - }{ - storage.NewResourcePieces(prov), - pool, - } + connOpts := connOptsMaker(dataDir) + log.Printf("opening sqlite db: %#v", connOpts) + ret, err := sqliteStorage.NewPiecesStorage(connOpts) + if err != nil { + panic(err) } + return ret } } @@ -325,11 +292,15 @@ func TestClientTransferVarious(t *testing.T) { Wrapper: fileCachePieceResourceStorage, })}, {"Boltdb", storage.NewBoltDB}, - {"SqliteFile", sqliteClientStorageFactory(func(dataDir string) string { - return "file:" + filepath.Join(dataDir, "sqlite.db") + {"SqliteFile", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPoolOpts { + return sqliteStorage.NewPoolOpts{ + Path: filepath.Join(dataDir, "sqlite.db"), + } })}, - {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) string { - return "file:memory:?mode=memory&cache=shared" + {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPoolOpts { + return sqliteStorage.NewPoolOpts{ + Memory: true, + } })}, } { t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {