diff --git a/storage/mark-complete_test.go b/storage/mark-complete_test.go index 3f76f699..7e50832d 100644 --- a/storage/mark-complete_test.go +++ b/storage/mark-complete_test.go @@ -10,7 +10,7 @@ import ( func BenchmarkMarkComplete(b *testing.B) { bench := func(b *testing.B, ci storage.ClientImpl) { test_storage.BenchmarkPieceMarkComplete( - b, ci, test_storage.DefaultPieceSize, test_storage.DefaultNumPieces, test_storage.DefaultCapacity) + b, ci, test_storage.DefaultPieceSize, test_storage.DefaultNumPieces, 0) } b.Run("File", func(b *testing.B) { ci := storage.NewFile(b.TempDir()) diff --git a/storage/piece-resource.go b/storage/piece-resource.go index 8df44389..d56280bc 100644 --- a/storage/piece-resource.go +++ b/storage/piece-resource.go @@ -21,8 +21,11 @@ type piecePerResource struct { } type ResourcePiecesOpts struct { + // After marking a piece complete, don't bother deleting its incomplete blobs. LeaveIncompleteChunks bool - NoSizedPuts bool + // Sized puts require being able to stream from a statement executed on another connection. + // Without them, we buffer the entire read and then put that. + NoSizedPuts bool } func NewResourcePieces(p PieceProvider) ClientImpl { diff --git a/storage/sqlite/direct.go b/storage/sqlite/direct.go index f5d6179d..9da0f701 100644 --- a/storage/sqlite/direct.go +++ b/storage/sqlite/direct.go @@ -11,36 +11,41 @@ import ( ) type NewDirectStorageOpts struct { - NewPoolOpts - ProvOpts func(*ProviderOpts) + NewConnOpts + InitDbOpts } // A convenience function that creates a connection pool, resource provider, and a pieces storage // ClientImpl and returns them all with a Close attached. 
func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) { - conns, provOpts, err := NewPool(opts.NewPoolOpts) + conn, err := newConn(opts.NewConnOpts) if err != nil { return } - if f := opts.ProvOpts; f != nil { - f(&provOpts) + journalMode := "delete" + if opts.Memory { + journalMode = "off" } - provOpts.BatchWrites = false - prov, err := NewProvider(conns, provOpts) + err = initConn(conn, InitConnOpts{ + SetJournalMode: journalMode, + MmapSizeOk: true, + MmapSize: 1 << 25, + }) + if err != nil { + return + } + err = initDatabase(conn, opts.InitDbOpts) if err != nil { - conns.Close() return } return &client{ - prov: prov, - conn: prov.pool.Get(nil), + conn: conn, blobs: make(map[string]*sqlite.Blob), }, nil } type client struct { l sync.Mutex - prov *provider conn conn blobs map[string]*sqlite.Blob } @@ -53,8 +58,7 @@ func (c *client) Close() error { for _, b := range c.blobs { b.Close() } - c.prov.pool.Put(c.conn) - return c.prov.Close() + return c.conn.Close() } type torrent struct { @@ -106,6 +110,7 @@ func (p2 piece) doAtIoWithBlob( ) (n int, err error) { p2.l.Lock() defer p2.l.Unlock() + //defer p2.blobWouldExpire() n, err = atIo(p2.getBlob())(p, off) var se sqlite.Error if !errors.As(err, &se) || se.Code != sqlite.SQLITE_ABORT { diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go index 49ea4d74..8c569314 100644 --- a/storage/sqlite/sqlite-storage.go +++ b/storage/sqlite/sqlite-storage.go @@ -25,7 +25,20 @@ import ( type conn = *sqlite.Conn -func initConn(conn conn, opts ProviderOpts) error { +type InitConnOpts struct { + SetJournalMode string + MmapSizeOk bool // If false, a package-specific default will be used. + MmapSize int64 // If MmapSizeOk is set, use sqlite default if < 0, otherwise this value. 
+} + +func (me InitConnOpts) JournalMode() string { + if me.SetJournalMode != "" { + return me.SetJournalMode + } + return "wal" +} + +func initConn(conn conn, opts InitConnOpts) error { // Recursive triggers are required because we need to trim the blob_meta size after trimming to // capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown. err := sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil) @@ -36,13 +49,22 @@ func initConn(conn conn, opts ProviderOpts) error { if err != nil { return err } - if opts.NoConcurrentBlobReads { - err = sqlitex.ExecTransient(conn, `pragma journal_mode=off`, nil) + if opts.SetJournalMode != "" { + err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma journal_mode=%s`, opts.SetJournalMode), func(stmt *sqlite.Stmt) error { + ret := stmt.ColumnText(0) + if ret != opts.SetJournalMode { + panic(ret) + } + return nil + }) if err != nil { return err } } - if opts.MmapSizeOk { + if !opts.MmapSizeOk { + opts.MmapSize = 1 << 24 // 16 MiB + } + if opts.MmapSize >= 0 { err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma mmap_size=%d`, opts.MmapSize), nil) if err != nil { return err @@ -152,6 +174,7 @@ func InitSchema(conn conn, pageSize int, triggers bool) error { type NewPiecesStorageOpts struct { NewPoolOpts + InitDbOpts ProvOpts func(*ProviderOpts) StorageOpts func(*storage.ResourcePiecesOpts) } @@ -159,10 +182,17 @@ type NewPiecesStorageOpts struct { // A convenience function that creates a connection pool, resource provider, and a pieces storage // ClientImpl and returns them all with a Close attached. 
func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, err error) { - conns, provOpts, err := NewPool(opts.NewPoolOpts) + conns, err := NewPool(opts.NewPoolOpts) if err != nil { return } + err = initPoolDatabase(conns, opts.InitDbOpts) + if err != nil { + return + } + provOpts := ProviderOpts{ + BatchWrites: conns.NumConns() > 1, + } if f := opts.ProvOpts; f != nil { f(&provOpts) } @@ -171,8 +201,27 @@ func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, er conns.Close() return } + var ( + journalMode string + ) + withPoolConn(conns, func(c conn) { + err = sqlitex.Exec(c, "pragma journal_mode", func(stmt *sqlite.Stmt) error { + journalMode = stmt.ColumnText(0) + return nil + }) + }) + if err != nil { + err = fmt.Errorf("getting journal mode: %w", err) + prov.Close() + return + } + if journalMode == "" { + err = errors.New("didn't get journal mode") + prov.Close() + return + } storageOpts := storage.ResourcePiecesOpts{ - NoSizedPuts: provOpts.NoConcurrentBlobReads, + NoSizedPuts: journalMode != "wal" || conns.NumConns() == 1, } if f := opts.StorageOpts; f != nil { f(&storageOpts) @@ -188,16 +237,14 @@ func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, er } type NewPoolOpts struct { - // See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a - // private, temporary on-disk database will be created. This private database will be - // automatically deleted as soon as the database connection is closed." - Path string - Memory bool + NewConnOpts + InitConnOpts NumConns int - // Forces WAL, disables shared caching. - NoConcurrentBlobReads bool - DontInitSchema bool - PageSize int +} + +type InitDbOpts struct { + DontInitSchema bool + PageSize int // If non-zero, overrides the existing setting. Capacity int64 } @@ -205,13 +252,9 @@ type NewPoolOpts struct { // There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. 
For now, // the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the // top-level option type. -type ProviderOpts struct { - NumConns int - // Concurrent blob reads require WAL. - NoConcurrentBlobReads bool - BatchWrites bool - MmapSize int64 - MmapSizeOk bool +type PoolConf struct { + NumConns int + JournalMode string } // Remove any capacity limits. @@ -224,10 +267,18 @@ func SetCapacity(conn conn, cap int64) error { return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap) } -func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) { - if opts.NumConns == 0 { - opts.NumConns = runtime.NumCPU() - } +type NewConnOpts struct { + // See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a + // private, temporary on-disk database will be created. This private database will be + // automatically deleted as soon as the database connection is closed." + Path string + Memory bool + // Whether multiple blobs will not be read simultaneously. Enables journal mode other than WAL, + // and NumConns < 2. 
+ NoConcurrentBlobReads bool +} + +func newOpenUri(opts NewConnOpts) string { path := url.PathEscape(opts.Path) if opts.Memory { path = ":memory:" @@ -236,26 +287,10 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) { if opts.NoConcurrentBlobReads || opts.Memory { values.Add("cache", "shared") } - uri := fmt.Sprintf("file:%s?%s", path, values.Encode()) - conns, err := func() (ConnPool, error) { - switch opts.NumConns { - case 1: - conn, err := sqlite.OpenConn(uri, 0) - return &poolFromConn{conn: conn}, err - default: - return sqlitex.Open(uri, 0, opts.NumConns) - } - }() - if err != nil { - return - } - defer func() { - if err != nil { - conns.Close() - } - }() - conn := conns.Get(context.TODO()) - defer conns.Put(conn) + return fmt.Sprintf("file:%s?%s", path, values.Encode()) +} + +func initDatabase(conn conn, opts InitDbOpts) (err error) { if !opts.DontInitSchema { if opts.PageSize == 0 { opts.PageSize = 1 << 14 @@ -271,13 +306,55 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) { return } } - return conns, ProviderOpts{ - NumConns: opts.NumConns, - NoConcurrentBlobReads: opts.NoConcurrentBlobReads || opts.Memory || opts.NumConns == 1, - BatchWrites: opts.NumConns > 1, - MmapSize: 1 << 23, // 8 MiB - MmapSizeOk: true, - }, nil + return +} + +func initPoolDatabase(pool ConnPool, opts InitDbOpts) (err error) { + withPoolConn(pool, func(c conn) { + err = initDatabase(c, opts) + }) + return +} + +func newConn(opts NewConnOpts) (conn, error) { + return sqlite.OpenConn(newOpenUri(opts), 0) +} + +type poolWithNumConns struct { + *sqlitex.Pool + numConns int +} + +func (me poolWithNumConns) NumConns() int { + return me.numConns +} + +func NewPool(opts NewPoolOpts) (_ ConnPool, err error) { + if opts.NumConns == 0 { + opts.NumConns = runtime.NumCPU() + } + conns, err := func() (ConnPool, error) { + switch opts.NumConns { + case 1: + conn, err := newConn(opts.NewConnOpts) + return &poolFromConn{conn: conn}, err + default: + 
_pool, err := sqlitex.Open(newOpenUri(opts.NewConnOpts), 0, opts.NumConns) + return poolWithNumConns{_pool, opts.NumConns}, err + } + }() + if err != nil { + return + } + defer func() { + if err != nil { + conns.Close() + } + }() + return conns, initPoolConns(nil, conns, InitPoolOpts{ + NumConns: opts.NumConns, + InitConnOpts: opts.InitConnOpts, + }) } // Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool. @@ -302,20 +379,17 @@ func (me *poolFromConn) Close() error { return me.conn.Close() } +func (poolFromConn) NumConns() int { return 1 } + +type ProviderOpts struct { + BatchWrites bool +} + // Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of // the ConnPool (since it has to initialize all the connections anyway). func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) { - _, err = initPoolConns(context.TODO(), pool, opts) - if err != nil { - err = fmt.Errorf("initing pool conns: %w", err) - return - } prov := &provider{pool: pool, opts: opts} if opts.BatchWrites { - if opts.NumConns < 2 { - err = errors.New("batch writes requires more than 1 conn") - return - } writes := make(chan writeRequest) prov.writes = writes // This is retained for backwards compatibility. It may not be necessary. 
@@ -327,7 +401,12 @@ func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) { return prov, nil } -func initPoolConns(ctx context.Context, pool ConnPool, opts ProviderOpts) (numInited int, err error) { +type InitPoolOpts struct { + NumConns int + InitConnOpts +} + +func initPoolConns(ctx context.Context, pool ConnPool, opts InitPoolOpts) (err error) { var conns []conn defer func() { for _, c := range conns { @@ -340,12 +419,11 @@ func initPoolConns(ctx context.Context, pool ConnPool, opts ProviderOpts) (numIn break } conns = append(conns, conn) - err = initConn(conn, opts) + err = initConn(conn, opts.InitConnOpts) if err != nil { err = fmt.Errorf("initing conn %v: %w", len(conns), err) return } - numInited++ } return } @@ -354,6 +432,13 @@ type ConnPool interface { Get(context.Context) conn Put(conn) Close() error + NumConns() int +} + +func withPoolConn(pool ConnPool, with func(conn)) { + c := pool.Get(nil) + defer pool.Put(c) + with(c) } type provider struct { diff --git a/storage/sqlite/sqlite-storage_test.go b/storage/sqlite/sqlite-storage_test.go index d18369ed..f2e7ad29 100644 --- a/storage/sqlite/sqlite-storage_test.go +++ b/storage/sqlite/sqlite-storage_test.go @@ -19,14 +19,15 @@ import ( func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) { opts.Path = filepath.Join(t.TempDir(), "sqlite3.db") - conns, provOpts, err := NewPool(opts) - require.NoError(t, err) + pool, err := NewPool(opts) + qt.Assert(t, err, qt.IsNil) // sqlitex.Pool.Close doesn't like being called more than once. Let it slide for now. 
- //t.Cleanup(func() { conns.Close() }) - prov, err := NewProvider(conns, provOpts) + //t.Cleanup(func() { pool.Close() }) + qt.Assert(t, initPoolDatabase(pool, InitDbOpts{}), qt.IsNil) + prov, err := NewProvider(pool, ProviderOpts{BatchWrites: pool.NumConns() > 1}) require.NoError(t, err) t.Cleanup(func() { prov.Close() }) - return conns, prov + return pool, prov } func TestTextBlobSize(t *testing.T) { @@ -70,52 +71,40 @@ func TestSimultaneousIncrementalBlob(t *testing.T) { func BenchmarkMarkComplete(b *testing.B) { const pieceSize = test_storage.DefaultPieceSize const capacity = test_storage.DefaultNumPieces * pieceSize / 2 + runBench := func(b *testing.B, ci storage.ClientImpl) { + test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity) + } c := qt.New(b) - for _, storage := range []struct { - name string - maker func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser - }{ - {"SqliteDirect", func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser { - ci, err := NewDirectStorage(NewDirectStorageOpts{ - NewPoolOpts: newPoolOpts, - ProvOpts: provOpts, + for _, memory := range []bool{false, true} { + b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) { + b.Run("Direct", func(b *testing.B) { + var opts NewDirectStorageOpts + opts.Memory = memory + opts.Path = filepath.Join(b.TempDir(), "storage.db") + opts.Capacity = capacity + ci, err := NewDirectStorage(opts) + c.Assert(err, qt.IsNil) + defer ci.Close() + runBench(b, ci) }) - c.Assert(err, qt.IsNil) - return ci - }}, - {"SqlitePieceStorage", func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser { - ci, err := NewPiecesStorage(NewPiecesStorageOpts{ - NewPoolOpts: newPoolOpts, - ProvOpts: provOpts, + b.Run("ResourcePieces", func(b *testing.B) { + for _, batchWrites := range []bool{false, true} { + b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) { + var opts 
NewPiecesStorageOpts + opts.Path = filepath.Join(b.TempDir(), "storage.db") + //b.Logf("storage db path: %q", dbPath) + opts.Capacity = capacity + opts.Memory = memory + opts.ProvOpts = func(opts *ProviderOpts) { + opts.BatchWrites = batchWrites + } + ci, err := NewPiecesStorage(opts) + c.Assert(err, qt.IsNil) + defer ci.Close() + runBench(b, ci) + }) + } }) - c.Assert(err, qt.IsNil) - return ci - }}, - } { - b.Run(storage.name, func(b *testing.B) { - for _, memory := range []bool{false, true} { - b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) { - for _, batchWrites := range []bool{false, true} { - b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) { - dbPath := filepath.Join(b.TempDir(), "storage.db") - //b.Logf("storage db path: %q", dbPath) - newPoolOpts := NewPoolOpts{ - Path: dbPath, - Capacity: capacity, - NoConcurrentBlobReads: false, - PageSize: 1 << 14, - Memory: memory, - } - provOpts := func(opts *ProviderOpts) { - opts.BatchWrites = batchWrites - } - ci := storage.maker(newPoolOpts, provOpts) - defer ci.Close() - test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity) - }) - } - }) - } }) } } diff --git a/storage/test/bench-piece-mark-complete.go b/storage/test/bench-piece-mark-complete.go index 9ebf6ef7..6e04702b 100644 --- a/storage/test/bench-piece-mark-complete.go +++ b/storage/test/bench-piece-mark-complete.go @@ -15,7 +15,6 @@ import ( const ( ChunkSize = 1 << 14 DefaultPieceSize = 2 << 20 - DefaultCapacity = 0 DefaultNumPieces = 16 ) diff --git a/test/transfer_test.go b/test/transfer_test.go index 0ae10060..e58bb53b 100644 --- a/test/transfer_test.go +++ b/test/transfer_test.go @@ -4,7 +4,6 @@ import ( "fmt" "io" "io/ioutil" - "log" "os" "path/filepath" "runtime" @@ -111,7 +110,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) { cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter } cfg.Seed = false - cfg.Debug = true + //cfg.Debug = true if 
ps.ConfigureLeecher.Config != nil { ps.ConfigureLeecher.Config(cfg) } @@ -313,13 +312,10 @@ type leecherStorageTestCase struct { func sqliteLeecherStorageTestCase(numConns int) leecherStorageTestCase { return leecherStorageTestCase{ fmt.Sprintf("SqliteFile,NumConns=%v", numConns), - sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPiecesStorageOpts { - return sqliteStorage.NewPiecesStorageOpts{ - NewPoolOpts: sqliteStorage.NewPoolOpts{ - Path: filepath.Join(dataDir, "sqlite.db"), - NumConns: numConns, - }, - } + sqliteClientStorageFactory(func(dataDir string) (opts sqliteStorage.NewPiecesStorageOpts) { + opts.Path = filepath.Join(dataDir, "sqlite.db") + opts.NumConns = numConns + return }), numConns, } @@ -334,13 +330,9 @@ func TestClientTransferVarious(t *testing.T) { {"Boltdb", storage.NewBoltDB, 0}, {"SqliteDirect", func(s string) storage.ClientImplCloser { path := filepath.Join(s, "sqlite3.db") - log.Print(path) - cl, err := sqliteStorage.NewDirectStorage(sqliteStorage.NewDirectStorageOpts{ - NewPoolOpts: sqliteStorage.NewPoolOpts{ - Path: path, - }, - ProvOpts: nil, - }) + var opts sqliteStorage.NewDirectStorageOpts + opts.Path = path + cl, err := sqliteStorage.NewDirectStorage(opts) if err != nil { panic(err) } @@ -350,12 +342,9 @@ func TestClientTransferVarious(t *testing.T) { sqliteLeecherStorageTestCase(2), // This should use a number of connections equal to the number of CPUs sqliteLeecherStorageTestCase(0), - {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPiecesStorageOpts { - return sqliteStorage.NewPiecesStorageOpts{ - NewPoolOpts: sqliteStorage.NewPoolOpts{ - Memory: true, - }, - } + {"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) (opts sqliteStorage.NewPiecesStorageOpts) { + opts.Memory = true + return }), 0}, } { t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {