diff --git a/bencode/api.go b/bencode/api.go
index 3e3c1633..10375c43 100644
--- a/bencode/api.go
+++ b/bencode/api.go
@@ -130,7 +130,7 @@ func MustMarshal(v interface{}) []byte {
 // Unmarshal the bencode value in the 'data' to a value pointed by the 'v'
 // pointer, return a non-nil error if any.
 func Unmarshal(data []byte, v interface{}) (err error) {
-	buf := bytes.NewBuffer(data)
+	buf := bytes.NewReader(data)
 	e := Decoder{r: buf}
 	err = e.Decode(v)
 	if err == nil && buf.Len() != 0 {
diff --git a/go.sum b/go.sum
index e12648a1..7cbf393e 100644
--- a/go.sum
+++ b/go.sum
@@ -268,6 +268,8 @@ github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
 github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
+github.com/getlantern/sqlite v0.3.3-0.20210215090556-4f83cf7731f0 h1:zvFSvII5rTbMZ3idAqSUjUCDgZFbWMKzxQot3/Y7nzA=
+github.com/getlantern/sqlite v0.3.3-0.20210215090556-4f83cf7731f0/go.mod h1:igAO5JulrQ1DbdZdtVq48mnZUBAPOeFzer7VhDWNtW4=
 github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
 github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
 github.com/gliderlabs/ssh v0.1.1 h1:j3L6gSLQalDETeEg/Jg0mGY0/y/N6zI2xX1978P0Uqw=
diff --git a/misc_test.go b/misc_test.go
index 419ec4d0..f3a37683 100644
--- a/misc_test.go
+++ b/misc_test.go
@@ -23,14 +23,20 @@ func TestTorrentOffsetRequest(t *testing.T) {
 	check(13, 5, 13, Request{}, false)
 }
 
-func TestIterBitmapsDistinct(t *testing.T) {
-	var skip, first, second bitmap.Bitmap
-	skip.Add(1)
-	first.Add(1, 0, 3)
-	second.Add(1, 2, 0)
-	skipCopy := skip.Copy()
-	assert.Equal(t, []interface{}{0, 3, 2}, iter.ToSlice(iterBitmapsDistinct(&skipCopy, first, second)))
-	assert.Equal(t, []int{1}, skip.ToSortedSlice())
+func BenchmarkIterBitmapsDistinct(t *testing.B) {
+	t.ReportAllocs()
+	for range iter.N(t.N) {
+		var skip, first, second bitmap.Bitmap
+		skip.Add(1)
+		first.Add(1, 0, 3)
+		second.Add(1, 2, 0)
+		skipCopy := skip.Copy()
+		t.StartTimer()
+		output := iter.ToSlice(iterBitmapsDistinct(&skipCopy, first, second))
+		t.StopTimer()
+		assert.Equal(t, []interface{}{0, 3, 2}, output)
+		assert.Equal(t, []int{1}, skip.ToSortedSlice())
+	}
 }
 
 func TestSpewConnStats(t *testing.T) {
diff --git a/peer_protocol/pex_test.go b/peer_protocol/pex_test.go
index adfab295..cc45059e 100644
--- a/peer_protocol/pex_test.go
+++ b/peer_protocol/pex_test.go
@@ -48,7 +48,7 @@ func TestMarshalPexMessage(t *testing.T) {
 
 	msg = Message{}
 	dec := Decoder{
-		R:         bufio.NewReader(bytes.NewBuffer(b)),
+		R:         bufio.NewReader(bytes.NewReader(b)),
 		MaxLength: 128,
 	}
 	pmOut := PexMsg{}
diff --git a/peerconn.go b/peerconn.go
index de82cf3b..0c276808 100644
--- a/peerconn.go
+++ b/peerconn.go
@@ -743,12 +743,13 @@ func (cn *PeerConn) updateRequests() {
 func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
 	return func(cb iter.Callback) {
 		for _, bm := range bms {
+			bm.Sub(*skip)
 			if !iter.All(
 				func(i interface{}) bool {
 					skip.Add(i.(int))
 					return cb(i)
 				},
-				bitmap.Sub(bm, *skip).Iter,
+				bm.Iter,
 			) {
 				return
 			}
 		}
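The `iterBitmapsDistinct` change above subtracts the skip set from each bitmap once, up front (`bm.Sub(*skip)`), instead of building a fresh `bitmap.Sub(bm, *skip)` view on every pass over `bms`; the new `BenchmarkIterBitmapsDistinct` with `ReportAllocs` exists to measure exactly that saving. A minimal model of the intended semantics, using a map and slices rather than the real `bitmap.Bitmap` type (hypothetical names, illustration only):

```go
package main

import "fmt"

// iterDistinct yields each value at most once across all input sets, never
// yielding anything already in skip, and adds everything it yields to skip.
// This mirrors iterBitmapsDistinct: values yielded from an earlier set are
// excluded from later ones because skip grows as iteration proceeds.
func iterDistinct(skip map[int]bool, sets ...[]int) (out []int) {
	for _, set := range sets {
		for _, i := range set {
			if skip[i] { // the real code removes these up front with bm.Sub(*skip)
				continue
			}
			skip[i] = true
			out = append(out, i)
		}
	}
	return
}

func main() {
	skip := map[int]bool{1: true}
	// Matches the expected output asserted in the benchmark: [0 3 2].
	fmt.Println(iterDistinct(skip, []int{1, 0, 3}, []int{1, 2, 0}))
}
```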
diff --git a/storage/boltPieceCompletion.go b/storage/bolt-piece-completion.go
similarity index 100%
rename from storage/boltPieceCompletion.go
rename to storage/bolt-piece-completion.go
diff --git a/storage/boltpc_test.go b/storage/bolt-piece-completion_test.go
similarity index 100%
rename from storage/boltpc_test.go
rename to storage/bolt-piece-completion_test.go
diff --git a/storage/bolt_piece.go b/storage/bolt-piece.go
similarity index 74%
rename from storage/bolt_piece.go
rename to storage/bolt-piece.go
index 663ac35a..867ef3a3 100644
--- a/storage/bolt_piece.go
+++ b/storage/bolt-piece.go
@@ -9,7 +9,7 @@ import (
 	"github.com/anacrolix/torrent/metainfo"
 )
 
-type boltDBPiece struct {
+type boltPiece struct {
 	db *bbolt.DB
 	p  metainfo.Piece
 	ih metainfo.Hash
@@ -17,19 +17,19 @@ type boltDBPiece struct {
 }
 
 var (
-	_ PieceImpl = (*boltDBPiece)(nil)
+	_ PieceImpl = (*boltPiece)(nil)
 	dataBucketKey = []byte("data")
 )
 
-func (me *boltDBPiece) pc() PieceCompletionGetSetter {
+func (me *boltPiece) pc() PieceCompletionGetSetter {
 	return boltPieceCompletion{me.db}
 }
 
-func (me *boltDBPiece) pk() metainfo.PieceKey {
+func (me *boltPiece) pk() metainfo.PieceKey {
 	return metainfo.PieceKey{me.ih, me.p.Index()}
 }
 
-func (me *boltDBPiece) Completion() Completion {
+func (me *boltPiece) Completion() Completion {
 	c, err := me.pc().Get(me.pk())
 	switch err {
 	case bbolt.ErrDatabaseNotOpen:
@@ -41,14 +41,14 @@ func (me *boltDBPiece) Completion() Completion {
 	return c
 }
 
-func (me *boltDBPiece) MarkComplete() error {
+func (me *boltPiece) MarkComplete() error {
 	return me.pc().Set(me.pk(), true)
 }
 
-func (me *boltDBPiece) MarkNotComplete() error {
+func (me *boltPiece) MarkNotComplete() error {
 	return me.pc().Set(me.pk(), false)
 }
 
-func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) {
+func (me *boltPiece) ReadAt(b []byte, off int64) (n int, err error) {
 	err = me.db.View(func(tx *bbolt.Tx) error {
 		db := tx.Bucket(dataBucketKey)
 		if db == nil {
@@ -74,13 +74,13 @@ func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) {
 	return
 }
 
-func (me *boltDBPiece) chunkKey(index int) (ret [26]byte) {
+func (me *boltPiece) chunkKey(index int) (ret [26]byte) {
 	copy(ret[:], me.key[:])
 	binary.BigEndian.PutUint16(ret[24:], uint16(index))
 	return
 }
 
-func (me *boltDBPiece) WriteAt(b []byte, off int64) (n int, err error) {
+func (me *boltPiece) WriteAt(b []byte, off int64) (n int, err error) {
 	err = me.db.Update(func(tx *bbolt.Tx) error {
 		db, err := tx.CreateBucketIfNotExists(dataBucketKey)
 		if err != nil {
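`boltPiece` stores piece data as one bbolt item per chunk, addressed by the 26-byte keys that `chunkKey` builds. Assuming `me.key` is the 20-byte infohash followed by a 4-byte piece index (which matches its 24-byte length, though the diff doesn't show its construction), the layout and the limit it implies look like this sketch:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Presumed key layout for one chunk of a piece in the bbolt "data" bucket:
//
//	key[0:20]  infohash
//	key[20:24] piece index (big-endian uint32, assumed)
//	key[24:26] chunk index (big-endian uint16)
func chunkKey(pieceKey [24]byte, chunk uint16) (ret [26]byte) {
	copy(ret[:], pieceKey[:])
	binary.BigEndian.PutUint16(ret[24:], chunk)
	return
}

func main() {
	var pk [24]byte // infohash + piece index, per the assumption above
	key := chunkKey(pk, 3)
	fmt.Printf("chunk suffix: % x\n", key[24:]) // 00 03

	// With chunkSize = 1<<14 (16 KiB, the usual torrent chunk size), a
	// uint16 chunk index bounds the addressable piece size at 1 GiB:
	const chunkSize = 1 << 14
	fmt.Println("max piece size:", int64(chunkSize)*(1<<16))
}
```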
diff --git a/storage/boltdb.go b/storage/bolt.go
similarity index 58%
rename from storage/boltdb.go
rename to storage/bolt.go
index f95d2a60..af75061a 100644
--- a/storage/boltdb.go
+++ b/storage/bolt.go
@@ -12,17 +12,17 @@ import (
 )
 
 const (
-	// Chosen to match the usual chunk size in a torrent client. This way,
-	// most chunk writes are to exactly one full item in bbolt DB.
+	// Chosen to match the usual chunk size in a torrent client. This way, most chunk writes are to
+	// exactly one full item in bbolt DB.
 	chunkSize = 1 << 14
 )
 
-type boltDBClient struct {
+type boltClient struct {
 	db *bbolt.DB
 }
 
-type boltDBTorrent struct {
-	cl *boltDBClient
+type boltTorrent struct {
+	cl *boltClient
 	ih metainfo.Hash
 }
 
@@ -32,19 +32,19 @@ func NewBoltDB(filePath string) ClientImplCloser {
 	})
 	expect.Nil(err)
 	db.NoSync = true
-	return &boltDBClient{db}
+	return &boltClient{db}
 }
 
-func (me *boltDBClient) Close() error {
+func (me *boltClient) Close() error {
 	return me.db.Close()
 }
 
-func (me *boltDBClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
-	return &boltDBTorrent{me, infoHash}, nil
+func (me *boltClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
+	return &boltTorrent{me, infoHash}, nil
}
 
-func (me *boltDBTorrent) Piece(p metainfo.Piece) PieceImpl {
-	ret := &boltDBPiece{
+func (me *boltTorrent) Piece(p metainfo.Piece) PieceImpl {
+	ret := &boltPiece{
 		p:  p,
 		db: me.cl.db,
 		ih: me.ih,
@@ -54,4 +54,4 @@ func (me *boltDBTorrent) Piece(p metainfo.Piece) PieceImpl {
 	return ret
 }
 
-func (boltDBTorrent) Close() error { return nil }
+func (boltTorrent) Close() error { return nil }
diff --git a/storage/file_misc.go b/storage/file-misc.go
similarity index 100%
rename from storage/file_misc.go
rename to storage/file-misc.go
diff --git a/storage/file_misc_test.go b/storage/file-misc_test.go
similarity index 100%
rename from storage/file_misc_test.go
rename to storage/file-misc_test.go
diff --git a/storage/file_piece.go b/storage/file-piece.go
similarity index 100%
rename from storage/file_piece.go
rename to storage/file-piece.go
diff --git a/storage/completion_piece_map.go b/storage/map-piece-completion.go
similarity index 61%
rename from storage/completion_piece_map.go
rename to storage/map-piece-completion.go
index e12aca7c..5f7f87ef 100644
--- a/storage/completion_piece_map.go
+++ b/storage/map-piece-completion.go
@@ -7,31 +7,27 @@ import (
 )
 
 type mapPieceCompletion struct {
-	mu sync.Mutex
-	m  map[metainfo.PieceKey]bool
+	m sync.Map
 }
 
 var _ PieceCompletion = (*mapPieceCompletion)(nil)
 
 func NewMapPieceCompletion() PieceCompletion {
-	return &mapPieceCompletion{m: make(map[metainfo.PieceKey]bool)}
+	return &mapPieceCompletion{}
 }
 
 func (*mapPieceCompletion) Close() error { return nil }
 
 func (me *mapPieceCompletion) Get(pk metainfo.PieceKey) (c Completion, err error) {
-	me.mu.Lock()
-	defer me.mu.Unlock()
-	c.Complete, c.Ok = me.m[pk]
+	v, ok := me.m.Load(pk)
+	if ok {
+		c.Complete = v.(bool)
+	}
+	c.Ok = ok
 	return
 }
 
 func (me *mapPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
-	me.mu.Lock()
-	defer me.mu.Unlock()
-	if me.m == nil {
-		me.m = make(map[metainfo.PieceKey]bool)
-	}
-	me.m[pk] = b
+	me.m.Store(pk, b)
 	return nil
 }
diff --git a/storage/mark-complete_test.go b/storage/mark-complete_test.go
index 3f76f699..7e50832d 100644
--- a/storage/mark-complete_test.go
+++ b/storage/mark-complete_test.go
@@ -10,7 +10,7 @@ import (
 func BenchmarkMarkComplete(b *testing.B) {
 	bench := func(b *testing.B, ci storage.ClientImpl) {
 		test_storage.BenchmarkPieceMarkComplete(
-			b, ci, test_storage.DefaultPieceSize, test_storage.DefaultNumPieces, test_storage.DefaultCapacity)
+			b, ci, test_storage.DefaultPieceSize, test_storage.DefaultNumPieces, 0)
 	}
 	b.Run("File", func(b *testing.B) {
 		ci := storage.NewFile(b.TempDir())
diff --git a/storage/completion.go b/storage/piece-completion.go
similarity index 100%
rename from storage/completion.go
rename to storage/piece-completion.go
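The `mapPieceCompletion` change above swaps a mutex-guarded `map[metainfo.PieceKey]bool` for `sync.Map`. Two properties make that a clean substitution here: the zero value of `sync.Map` is ready to use (so the lazy-init branch in `Set` and the allocation in `NewMapPieceCompletion` disappear), and `Load`/`Store` are individually atomic, which is all `Get`/`Set` ever did under the old lock. A standalone sketch of the same pattern:

```go
package main

import (
	"fmt"
	"sync"
)

type completion struct{ Complete, Ok bool }

func main() {
	var m sync.Map // zero value is usable; no make() needed

	// Set
	m.Store("piece-0", true)

	// Get
	var c completion
	if v, ok := m.Load("piece-0"); ok {
		c.Complete = v.(bool) // values only ever go in as bool, so this assertion is safe
		c.Ok = true
	}
	fmt.Printf("%+v\n", c) // {Complete:true Ok:true}
}
```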
diff --git a/storage/piece_resource.go b/storage/piece-resource.go
similarity index 96%
rename from storage/piece_resource.go
rename to storage/piece-resource.go
index 8df44389..d56280bc 100644
--- a/storage/piece_resource.go
+++ b/storage/piece-resource.go
@@ -21,8 +21,11 @@ type piecePerResource struct {
 }
 
 type ResourcePiecesOpts struct {
+	// After marking a piece complete, don't bother deleting its incomplete blobs.
 	LeaveIncompleteChunks bool
-	NoSizedPuts           bool
+	// Sized puts require being able to stream from a statement executed on another connection.
+	// Without them, we buffer the entire read and then put that.
+	NoSizedPuts bool
}
 
 func NewResourcePieces(p PieceProvider) ClientImpl {
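The new `NoSizedPuts` doc comment is terse, so here is the trade-off it describes, as a hypothetical sketch rather than the package's actual put path: with sized puts, the destination can be pre-allocated and streamed into while the source statement reads on another connection; without them, the entire source must be buffered in memory before the put begins.

```go
package main

import (
	"bytes"
	"io"
	"io/ioutil"
	"strings"
)

// putUnsized sketches the NoSizedPuts fallback: the whole source is read
// into memory, then written out as a single put of now-known length.
func putUnsized(dst io.Writer, src io.Reader) (int64, error) {
	buf, err := ioutil.ReadAll(src) // "we buffer the entire read"
	if err != nil {
		return 0, err
	}
	n, err := dst.Write(buf) // "and then put that"
	return int64(n), err
}

func main() {
	var dst bytes.Buffer
	if _, err := putUnsized(&dst, strings.NewReader("chunk data")); err != nil {
		panic(err)
	}
}
```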
diff --git a/storage/sqlite/direct.go b/storage/sqlite/direct.go
new file mode 100644
index 00000000..e9782296
--- /dev/null
+++ b/storage/sqlite/direct.go
@@ -0,0 +1,243 @@
+package sqliteStorage
+
+import (
+	"errors"
+	"runtime"
+	"sync"
+	"time"
+
+	"crawshaw.io/sqlite"
+	"crawshaw.io/sqlite/sqlitex"
+	"github.com/anacrolix/torrent/metainfo"
+	"github.com/anacrolix/torrent/storage"
+)
+
+type NewDirectStorageOpts struct {
+	NewConnOpts
+	InitDbOpts
+	InitConnOpts
+	GcBlobs           bool
+	CacheBlobs        bool
+	BlobFlushInterval time.Duration
+}
+
+// A convenience function that creates a connection pool, resource provider, and a pieces storage
+// ClientImpl and returns them all with a Close attached.
+func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
+	conn, err := newConn(opts.NewConnOpts)
+	if err != nil {
+		return
+	}
+	err = initConn(conn, opts.InitConnOpts)
+	if err != nil {
+		conn.Close()
+		return
+	}
+	err = initDatabase(conn, opts.InitDbOpts)
+	if err != nil {
+		return
+	}
+	cl := &client{
+		conn:  conn,
+		blobs: make(map[string]*sqlite.Blob),
+		opts:  opts,
+	}
+	if opts.BlobFlushInterval != 0 {
+		cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
+	}
+	return cl, nil
+}
+
+type client struct {
+	l           sync.Mutex
+	conn        conn
+	blobs       map[string]*sqlite.Blob
+	blobFlusher *time.Timer
+	opts        NewDirectStorageOpts
+	closed      bool
+}
+
+func (c *client) blobFlusherFunc() {
+	c.l.Lock()
+	defer c.l.Unlock()
+	c.flushBlobs()
+	if !c.closed {
+		c.blobFlusher.Reset(c.opts.BlobFlushInterval)
+	}
+}
+
+func (c *client) flushBlobs() {
+	for key, b := range c.blobs {
+		// Need the lock to prevent racing with the GC finalizers.
+		b.Close()
+		delete(c.blobs, key)
+	}
+}
+
+func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
+	return torrent{c}, nil
+}
+
+func (c *client) Close() error {
+	c.l.Lock()
+	defer c.l.Unlock()
+	c.flushBlobs()
+	c.closed = true
+	if c.opts.BlobFlushInterval != 0 {
+		c.blobFlusher.Stop()
+	}
+	return c.conn.Close()
+}
+
+type torrent struct {
+	c *client
+}
+
+func rowidForBlob(c conn, name string, length int64) (rowid int64, err error) {
+	err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
+		rowid = stmt.ColumnInt64(0)
+		return nil
+	}, name)
+	if err != nil {
+		return
+	}
+	if rowid != 0 {
+		return
+	}
+	err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
+	if err != nil {
+		return
+	}
+	rowid = c.LastInsertRowID()
+	return
+}
+
+func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
+	t.c.l.Lock()
+	defer t.c.l.Unlock()
+	name := p.Hash().HexString()
+	return piece{
+		name,
+		p.Length(),
+		t.c,
+	}
+}
+
+func (t torrent) Close() error {
+	return nil
+}
+
+type piece struct {
+	name   string
+	length int64
+	*client
+}
+
+func (p2 piece) doAtIoWithBlob(
+	atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
+	p []byte,
+	off int64,
+) (n int, err error) {
+	p2.l.Lock()
+	defer p2.l.Unlock()
+	if !p2.opts.CacheBlobs {
+		defer p2.forgetBlob()
+	}
+	n, err = atIo(p2.getBlob())(p, off)
+	if err == nil {
+		return
+	}
+	var se sqlite.Error
+	if !errors.As(err, &se) {
+		return
+	}
+	if se.Code != sqlite.SQLITE_ABORT && !(p2.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
+		return
+	}
+	p2.forgetBlob()
+	return atIo(p2.getBlob())(p, off)
+}
+
+func (p2 piece) ReadAt(p []byte, off int64) (n int, err error) {
+	return p2.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
+		return blob.ReadAt
+	}, p, off)
+}
+
+func (p2 piece) WriteAt(p []byte, off int64) (n int, err error) {
+	return p2.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
+		return blob.WriteAt
+	}, p, off)
+}
+
+func (p2 piece) MarkComplete() error {
+	p2.l.Lock()
+	defer p2.l.Unlock()
+	err := sqlitex.Exec(p2.conn, "update blob set verified=true where name=?", nil, p2.name)
+	if err != nil {
+		return err
+	}
+	changes := p2.conn.Changes()
+	if changes != 1 {
+		panic(changes)
+	}
+	return nil
+}
+
+func (p2 piece) forgetBlob() {
+	blob, ok := p2.blobs[p2.name]
+	if !ok {
+		return
+	}
+	blob.Close()
+	delete(p2.blobs, p2.name)
+}
+
+func (p2 piece) MarkNotComplete() error {
+	return sqlitex.Exec(p2.conn, "update blob set verified=false where name=?", nil, p2.name)
+}
+
+func (p2 piece) Completion() (ret storage.Completion) {
+	p2.l.Lock()
+	defer p2.l.Unlock()
+	err := sqlitex.Exec(p2.conn, "select verified from blob where name=?", func(stmt *sqlite.Stmt) error {
+		ret.Complete = stmt.ColumnInt(0) != 0
+		return nil
+	}, p2.name)
+	ret.Ok = err == nil
+	if err != nil {
+		panic(err)
+	}
+	return
+}
+
+func (p2 piece) closeBlobIfExists() {
+	if b, ok := p2.blobs[p2.name]; ok {
+		b.Close()
+		delete(p2.blobs, p2.name)
+	}
+}
+
+func (p2 piece) getBlob() *sqlite.Blob {
+	blob, ok := p2.blobs[p2.name]
+	if !ok {
+		rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
+		if err != nil {
+			panic(err)
+		}
+		blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
+		if err != nil {
+			panic(err)
+		}
+		if p2.opts.GcBlobs {
+			herp := new(byte)
+			runtime.SetFinalizer(herp, func(*byte) {
+				p2.l.Lock()
+				defer p2.l.Unlock()
+				blob.Close()
+			})
+		}
+		p2.blobs[p2.name] = blob
+	}
+	return blob
+}
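`direct.go` gives sqlite storage its own `ClientImpl` that does piece IO through incremental blob handles on a single connection, bypassing the resource-pieces layer. SQLite invalidates a `*sqlite.Blob` handle when its row is modified (reads and writes then fail with `SQLITE_ABORT`), which is why `doAtIoWithBlob` drops the cached handle and retries once with a fresh one. Construction looks like this sketch, based on the options above (the path and interval are illustrative):

```go
package main

import (
	"log"
	"time"

	sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
)

func main() {
	var opts sqliteStorage.NewDirectStorageOpts
	opts.Path = "torrent-storage.db"      // NewConnOpts; empty means a private temporary on-disk db
	opts.CacheBlobs = true                // keep blob handles open across piece IO
	opts.BlobFlushInterval = time.Second  // periodically close cached handles

	cl, err := sqliteStorage.NewDirectStorage(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
	// cl satisfies storage.ClientImplCloser, suitable e.g. as a torrent
	// client's default storage.
}
```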
diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go
index 4b5abb24..ed9aefb3 100644
--- a/storage/sqlite/sqlite-storage.go
+++ b/storage/sqlite/sqlite-storage.go
@@ -25,7 +25,22 @@ import (
 
 type conn = *sqlite.Conn
 
-func initConn(conn conn, opts ProviderOpts) error {
+type InitConnOpts struct {
+	SetJournalMode string
+	MmapSizeOk     bool  // If false, a package-specific default will be used.
+	MmapSize       int64 // If MmapSizeOk is set, use sqlite default if < 0, otherwise this value.
+}
+
+func (me InitConnOpts) JournalMode() string {
+	if me.SetJournalMode != "" {
+		return me.SetJournalMode
+	}
+	return "wal"
+}
+
+var UnexpectedJournalMode = errors.New("unexpected journal mode")
+
+func initConn(conn conn, opts InitConnOpts) error {
 	// Recursive triggers are required because we need to trim the blob_meta size after trimming to
 	// capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown.
 	err := sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
@@ -36,13 +51,25 @@ func initConn(conn conn, opts ProviderOpts) error {
 	if err != nil {
 		return err
 	}
-	if opts.NoConcurrentBlobReads {
-		err = sqlitex.ExecTransient(conn, `pragma journal_mode=off`, nil)
+	if opts.SetJournalMode != "" {
+		err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma journal_mode=%s`, opts.SetJournalMode), func(stmt *sqlite.Stmt) error {
+			ret := stmt.ColumnText(0)
+			if ret != opts.SetJournalMode {
+				return UnexpectedJournalMode
+			}
+			return nil
+		})
 		if err != nil {
 			return err
 		}
 	}
-	if opts.MmapSizeOk {
+	if !opts.MmapSizeOk {
+		// Set the default. Currently it seems the library picks reasonable defaults, especially for
+		// wal.
+		opts.MmapSize = -1
+		//opts.MmapSize = 1 << 24 // 16 MiB
+	}
+	if opts.MmapSize >= 0 {
 		err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma mmap_size=%d`, opts.MmapSize), nil)
 		if err != nil {
 			return err
@@ -68,6 +95,7 @@ func InitSchema(conn conn, pageSize int, triggers bool) error {
 			name text,
 			last_used timestamp default (datetime('now')),
 			data blob,
+			verified bool,
 			primary key (name)
 		);
 
@@ -151,6 +179,7 @@ func InitSchema(conn conn, pageSize int, triggers bool) error {
 
 type NewPiecesStorageOpts struct {
 	NewPoolOpts
+	InitDbOpts
 	ProvOpts    func(*ProviderOpts)
 	StorageOpts func(*storage.ResourcePiecesOpts)
 }
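A detail worth calling out in the `initConn` change above: `pragma journal_mode=X` replies with the mode actually in effect, and SQLite can legitimately answer with something other than what was requested (an in-memory database, for instance, only ever runs as `memory` or `off`). Hence the result callback returning `UnexpectedJournalMode` instead of assuming success; the benchmarks further down skip combinations the target database rejects. Condensed into a hypothetical in-package helper (same calls as the diff):

```go
package sqliteStorage

import (
	"fmt"

	"crawshaw.io/sqlite"
	"crawshaw.io/sqlite/sqlitex"
)

// setJournalMode condenses initConn's verification step: request a mode and
// fail with UnexpectedJournalMode if sqlite reports a different one back.
func setJournalMode(conn *sqlite.Conn, mode string) error {
	return sqlitex.ExecTransient(conn,
		fmt.Sprintf(`pragma journal_mode=%s`, mode),
		func(stmt *sqlite.Stmt) error {
			if stmt.ColumnText(0) != mode {
				return UnexpectedJournalMode
			}
			return nil
		})
}
```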
@@ -158,10 +187,17 @@ type NewPiecesStorageOpts struct {
 
 // A convenience function that creates a connection pool, resource provider, and a pieces storage
 // ClientImpl and returns them all with a Close attached.
 func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, err error) {
-	conns, provOpts, err := NewPool(opts.NewPoolOpts)
+	conns, err := NewPool(opts.NewPoolOpts)
 	if err != nil {
 		return
 	}
+	err = initPoolDatabase(conns, opts.InitDbOpts)
+	if err != nil {
+		return
+	}
+	provOpts := ProviderOpts{
+		BatchWrites: conns.NumConns() > 1,
+	}
 	if f := opts.ProvOpts; f != nil {
 		f(&provOpts)
 	}
@@ -170,8 +206,27 @@ func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, er
 		conns.Close()
 		return
 	}
+	var (
+		journalMode string
+	)
+	withPoolConn(conns, func(c conn) {
+		err = sqlitex.Exec(c, "pragma journal_mode", func(stmt *sqlite.Stmt) error {
+			journalMode = stmt.ColumnText(0)
+			return nil
+		})
+	})
+	if err != nil {
+		err = fmt.Errorf("getting journal mode: %w", err)
+		prov.Close()
+		return
+	}
+	if journalMode == "" {
+		err = errors.New("didn't get journal mode")
+		prov.Close()
+		return
+	}
 	storageOpts := storage.ResourcePiecesOpts{
-		NoSizedPuts: provOpts.NoConcurrentBlobReads,
+		NoSizedPuts: journalMode != "wal" || conns.NumConns() == 1,
 	}
 	if f := opts.StorageOpts; f != nil {
 		f(&storageOpts)
@@ -187,30 +242,25 @@ func NewPiecesStorage(opts NewPiecesStorageOpts) (_ storage.ClientImplCloser, er
 }
 
 type NewPoolOpts struct {
-	// See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
-	// private, temporary on-disk database will be created. This private database will be
-	// automatically deleted as soon as the database connection is closed."
-	Path     string
-	Memory   bool
+	NewConnOpts
+	InitConnOpts
 	NumConns int
-	// Forces WAL, disables shared caching.
-	NoConcurrentBlobReads bool
-	DontInitSchema        bool
-	PageSize              int
+}
+
+type InitDbOpts struct {
+	DontInitSchema bool
+	PageSize       int
 	// If non-zero, overrides the existing setting.
-	Capacity int64
+	Capacity   int64
+	NoTriggers bool
 }
 
 // There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
 // the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
 // top-level option type.
-type ProviderOpts struct {
-	NumConns int
-	// Concurrent blob reads require WAL.
-	NoConcurrentBlobReads bool
-	BatchWrites           bool
-	MmapSize              int64
-	MmapSizeOk            bool
+type PoolConf struct {
+	NumConns    int
+	JournalMode string
 }
 
 // Remove any capacity limits.
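With the option types split as above (`NewPoolOpts` embedding `NewConnOpts` and `InitConnOpts`, and database-level settings moved into `InitDbOpts`), callers configure everything through promoted fields on the one top-level struct, exactly as the updated tests do. For example (values illustrative):

```go
package main

import (
	"log"

	sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
)

func main() {
	var opts sqliteStorage.NewPiecesStorageOpts
	opts.Path = "sqlite3.db"    // NewConnOpts, promoted via NewPoolOpts
	opts.SetJournalMode = "wal" // InitConnOpts
	opts.NumConns = 2           // NewPoolOpts
	opts.Capacity = 4 << 20     // InitDbOpts; non-zero overrides the existing setting

	ci, err := sqliteStorage.NewPiecesStorage(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer ci.Close()
}
```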
@@ -223,10 +273,18 @@ func SetCapacity(conn conn, cap int64) error {
 	return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
 }
 
-func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
-	if opts.NumConns == 0 {
-		opts.NumConns = runtime.NumCPU()
-	}
+type NewConnOpts struct {
+	// See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
+	// private, temporary on-disk database will be created. This private database will be
+	// automatically deleted as soon as the database connection is closed."
+	Path   string
+	Memory bool
+	// Whether multiple blobs will not be read simultaneously. Enables journal modes other than WAL,
+	// and NumConns < 2.
+	NoConcurrentBlobReads bool
+}
+
+func newOpenUri(opts NewConnOpts) string {
 	path := url.PathEscape(opts.Path)
 	if opts.Memory {
 		path = ":memory:"
@@ -235,31 +293,18 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
 	if opts.NoConcurrentBlobReads || opts.Memory {
 		values.Add("cache", "shared")
 	}
-	uri := fmt.Sprintf("file:%s?%s", path, values.Encode())
-	conns, err := func() (ConnPool, error) {
-		switch opts.NumConns {
-		case 1:
-			conn, err := sqlite.OpenConn(uri, 0)
-			return &poolFromConn{conn: conn}, err
-		default:
-			return sqlitex.Open(uri, 0, opts.NumConns)
-		}
-	}()
-	if err != nil {
-		return
-	}
-	defer func() {
-		if err != nil {
-			conns.Close()
-		}
-	}()
-	conn := conns.Get(context.TODO())
-	defer conns.Put(conn)
+	return fmt.Sprintf("file:%s?%s", path, values.Encode())
+}
+
+func initDatabase(conn conn, opts InitDbOpts) (err error) {
 	if !opts.DontInitSchema {
 		if opts.PageSize == 0 {
-			opts.PageSize = 1 << 14
+			// There doesn't seem to be an optimal size. I did try with the standard chunk size, but
+			// the difference is not convincing.
+
+			//opts.PageSize = 1 << 14
 		}
-		err = InitSchema(conn, opts.PageSize, true)
+		err = InitSchema(conn, opts.PageSize, !opts.NoTriggers)
 		if err != nil {
 			return
 		}
@@ -270,13 +315,55 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
 			return
 		}
 	}
-	return conns, ProviderOpts{
-		NumConns:              opts.NumConns,
-		NoConcurrentBlobReads: opts.NoConcurrentBlobReads || opts.Memory || opts.NumConns == 1,
-		BatchWrites:           opts.NumConns > 1,
-		MmapSize:              1 << 23, // 8 MiB
-		MmapSizeOk:            true,
-	}, nil
+	return
+}
+
+func initPoolDatabase(pool ConnPool, opts InitDbOpts) (err error) {
+	withPoolConn(pool, func(c conn) {
+		err = initDatabase(c, opts)
+	})
+	return
+}
+
+func newConn(opts NewConnOpts) (conn, error) {
+	return sqlite.OpenConn(newOpenUri(opts), 0)
+}
+
+type poolWithNumConns struct {
+	*sqlitex.Pool
+	numConns int
+}
+
+func (me poolWithNumConns) NumConns() int {
+	return me.numConns
+}
+
+func NewPool(opts NewPoolOpts) (_ ConnPool, err error) {
+	if opts.NumConns == 0 {
+		opts.NumConns = runtime.NumCPU()
+	}
+	conns, err := func() (ConnPool, error) {
+		switch opts.NumConns {
+		case 1:
+			conn, err := newConn(opts.NewConnOpts)
+			return &poolFromConn{conn: conn}, err
+		default:
+			_pool, err := sqlitex.Open(newOpenUri(opts.NewConnOpts), 0, opts.NumConns)
+			return poolWithNumConns{_pool, opts.NumConns}, err
+		}
+	}()
+	if err != nil {
+		return
+	}
+	defer func() {
+		if err != nil {
+			conns.Close()
+		}
+	}()
+	return conns, initPoolConns(nil, conns, InitPoolOpts{
+		NumConns:     opts.NumConns,
+		InitConnOpts: opts.InitConnOpts,
+	})
 }
 
 // Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
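With `NewPool` now returning only the pool, the wiring that used to happen inside it becomes the caller's job: initialize the schema on one connection, then derive `ProviderOpts` from the pool itself. In-package, the sequence looks like this sketch (`newProviderSketch` is a hypothetical name; compare `newConnsAndProv` in the test changes below):

```go
package sqliteStorage

func newProviderSketch(opts NewPoolOpts) (*provider, error) {
	pool, err := NewPool(opts) // connections are already init'd via opts.InitConnOpts
	if err != nil {
		return nil, err
	}
	if err := initPoolDatabase(pool, InitDbOpts{}); err != nil {
		return nil, err
	}
	// Batch writes only pay off when another conn can serve reads meanwhile.
	return NewProvider(pool, ProviderOpts{BatchWrites: pool.NumConns() > 1})
}
```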
@@ -301,20 +388,17 @@ func (me *poolFromConn) Close() error {
 	return me.conn.Close()
 }
 
+func (poolFromConn) NumConns() int { return 1 }
+
+type ProviderOpts struct {
+	BatchWrites bool
+}
+
 // Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
 // the ConnPool (since it has to initialize all the connections anyway).
 func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
-	_, err = initPoolConns(context.TODO(), pool, opts)
-	if err != nil {
-		err = fmt.Errorf("initing pool conns: %w", err)
-		return
-	}
 	prov := &provider{pool: pool, opts: opts}
 	if opts.BatchWrites {
-		if opts.NumConns < 2 {
-			err = errors.New("batch writes requires more than 1 conn")
-			return
-		}
 		writes := make(chan writeRequest)
 		prov.writes = writes
 		// This is retained for backwards compatibility. It may not be necessary.
@@ -326,7 +410,12 @@ func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
 	return prov, nil
 }
 
-func initPoolConns(ctx context.Context, pool ConnPool, opts ProviderOpts) (numInited int, err error) {
+type InitPoolOpts struct {
+	NumConns int
+	InitConnOpts
+}
+
+func initPoolConns(ctx context.Context, pool ConnPool, opts InitPoolOpts) (err error) {
 	var conns []conn
 	defer func() {
 		for _, c := range conns {
@@ -339,12 +428,11 @@ func initPoolConns(ctx context.Context, pool ConnPool, opts ProviderOpts) (numIn
 			break
 		}
 		conns = append(conns, conn)
-		err = initConn(conn, opts)
+		err = initConn(conn, opts.InitConnOpts)
 		if err != nil {
 			err = fmt.Errorf("initing conn %v: %w", len(conns), err)
 			return
 		}
-		numInited++
 	}
 	return
 }
@@ -353,6 +441,13 @@ type ConnPool interface {
 	Get(context.Context) conn
 	Put(conn)
 	Close() error
+	NumConns() int
+}
+
+func withPoolConn(pool ConnPool, with func(conn)) {
+	c := pool.Get(nil)
+	defer pool.Put(c)
+	with(c)
 }
 
 type provider struct {
diff --git a/storage/sqlite/sqlite-storage_test.go b/storage/sqlite/sqlite-storage_test.go
index a903e3a9..9e5d8275 100644
--- a/storage/sqlite/sqlite-storage_test.go
+++ b/storage/sqlite/sqlite-storage_test.go
@@ -2,15 +2,19 @@ package sqliteStorage
 
 import (
 	"bytes"
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
 	"path/filepath"
 	"sync"
 	"testing"
+	"time"
 
 	_ "github.com/anacrolix/envpprof"
+	"github.com/anacrolix/torrent/storage"
 	test_storage "github.com/anacrolix/torrent/storage/test"
+	"github.com/dustin/go-humanize"
 	qt "github.com/frankban/quicktest"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -18,14 +22,15 @@ import (
 
 func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) {
 	opts.Path = filepath.Join(t.TempDir(), "sqlite3.db")
-	conns, provOpts, err := NewPool(opts)
-	require.NoError(t, err)
+	pool, err := NewPool(opts)
+	qt.Assert(t, err, qt.IsNil)
 	// sqlitex.Pool.Close doesn't like being called more than once. Let it slide for now.
-	//t.Cleanup(func() { conns.Close() })
-	prov, err := NewProvider(conns, provOpts)
+	//t.Cleanup(func() { pool.Close() })
+	qt.Assert(t, initPoolDatabase(pool, InitDbOpts{}), qt.IsNil)
+	prov, err := NewProvider(pool, ProviderOpts{BatchWrites: pool.NumConns() > 1})
 	require.NoError(t, err)
 	t.Cleanup(func() { prov.Close() })
-	return conns, prov
+	return pool, prov
 }
 
 func TestTextBlobSize(t *testing.T) {
@@ -68,31 +73,76 @@ func TestSimultaneousIncrementalBlob(t *testing.T) {
 
 func BenchmarkMarkComplete(b *testing.B) {
 	const pieceSize = test_storage.DefaultPieceSize
-	const capacity = test_storage.DefaultCapacity
+	const noTriggers = false
+	var capacity int64 = test_storage.DefaultNumPieces * pieceSize / 2
+	if noTriggers {
+		// Since we won't push out old pieces, we have to mark them incomplete manually.
+		capacity = 0
+	}
+	runBench := func(b *testing.B, ci storage.ClientImpl) {
+		test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
+	}
 	c := qt.New(b)
 	for _, memory := range []bool{false, true} {
 		b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) {
-			for _, batchWrites := range []bool{false, true} {
-				b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) {
-					dbPath := filepath.Join(b.TempDir(), "storage.db")
-					//b.Logf("storage db path: %q", dbPath)
-					ci, err := NewPiecesStorage(NewPiecesStorageOpts{
-						NewPoolOpts: NewPoolOpts{
-							Path:                  dbPath,
-							Capacity:              4*pieceSize - 1,
-							NoConcurrentBlobReads: false,
-							PageSize:              1 << 14,
-							Memory:                memory,
-						},
-						ProvOpts: func(opts *ProviderOpts) {
-							opts.BatchWrites = batchWrites
-						},
-					})
+			b.Run("Direct", func(b *testing.B) {
+				var opts NewDirectStorageOpts
+				opts.Memory = memory
+				opts.Path = filepath.Join(b.TempDir(), "storage.db")
+				opts.Capacity = capacity
+				opts.CacheBlobs = true
+				//opts.GcBlobs = true
+				opts.BlobFlushInterval = time.Second
+				opts.NoTriggers = noTriggers
+				directBench := func(b *testing.B) {
+					ci, err := NewDirectStorage(opts)
+					if errors.Is(err, UnexpectedJournalMode) {
+						b.Skipf("setting journal mode %q: %v", opts.SetJournalMode, err)
+					}
 					c.Assert(err, qt.IsNil)
 					defer ci.Close()
-					test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
-				})
-			}
+					runBench(b, ci)
+				}
+				for _, journalMode := range []string{"", "wal", "off", "truncate", "delete", "persist", "memory"} {
+					opts.SetJournalMode = journalMode
+					b.Run("JournalMode="+journalMode, func(b *testing.B) {
+						for _, mmapSize := range []int64{-1, 0, 1 << 23, 1 << 24, 1 << 25} {
+							if memory && mmapSize >= 0 {
+								continue
+							}
+							b.Run(fmt.Sprintf("MmapSize=%s", func() string {
+								if mmapSize < 0 {
+									return "default"
+								} else {
+									return humanize.IBytes(uint64(mmapSize))
+								}
+							}()), func(b *testing.B) {
+								opts.MmapSize = mmapSize
+								opts.MmapSizeOk = true
+								directBench(b)
+							})
+						}
+					})
+				}
+			})
+			b.Run("ResourcePieces", func(b *testing.B) {
+				for _, batchWrites := range []bool{false, true} {
+					b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) {
+						var opts NewPiecesStorageOpts
+						opts.Path = filepath.Join(b.TempDir(), "storage.db")
+						//b.Logf("storage db path: %q", dbPath)
+						opts.Capacity = capacity
+						opts.Memory = memory
+						opts.ProvOpts = func(opts *ProviderOpts) {
+							opts.BatchWrites = batchWrites
+						}
+						ci, err := NewPiecesStorage(opts)
+						c.Assert(err, qt.IsNil)
+						defer ci.Close()
+						runBench(b, ci)
+					})
+				}
+			})
 		})
 	}
 }
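Net effect of the benchmark rewrite: `BenchmarkMarkComplete` now sweeps the direct storage across journal modes (`wal`, `off`, `truncate`, `delete`, `persist`, `memory`, plus the package default `""`) and mmap sizes, alongside the old batch-writes matrix for the resource-pieces path. Capacity is set to half the total data, i.e. 16 pieces × 2 MiB / 2 = 16 MiB, so trigger-driven eviction stays in play; with `noTriggers` flipped on it drops to 0, since nothing would push old pieces out. Individual cells can be selected by composed sub-benchmark name, e.g. something like `go test -run NONE -bench 'BenchmarkMarkComplete/Memory=false/Direct/JournalMode=wal' ./storage/sqlite/` (the exact name is assembled from the `b.Run` labels above).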
diff --git a/storage/test/bench-resource-pieces.go b/storage/test/bench-piece-mark-complete.go
similarity index 76%
rename from storage/test/bench-resource-pieces.go
rename to storage/test/bench-piece-mark-complete.go
index 4a79c313..6e04702b 100644
--- a/storage/test/bench-resource-pieces.go
+++ b/storage/test/bench-piece-mark-complete.go
@@ -2,8 +2,6 @@ package test_storage
 
 import (
 	"bytes"
-	"io"
-	"io/ioutil"
 	"math/rand"
 	"sync"
 	"testing"
@@ -17,15 +15,16 @@ import (
 const (
 	ChunkSize        = 1 << 14
 	DefaultPieceSize = 2 << 20
-	DefaultCapacity  = 0
 	DefaultNumPieces = 16
 )
 
 func BenchmarkPieceMarkComplete(
 	b *testing.B, ci storage.ClientImpl,
-	pieceSize int64, numPieces int, capacity int64,
+	pieceSize int64, numPieces int,
+	// This drives any special handling around capacity that may be configured into the storage
+	// implementation.
+	capacity int64,
 ) {
-	const check = true
 	c := qt.New(b)
 	info := &metainfo.Info{
 		Pieces:      make([]byte, numPieces*metainfo.HashSize),
@@ -35,16 +34,17 @@ func BenchmarkPieceMarkComplete(
 	}
 	ti, err := ci.OpenTorrent(info, metainfo.Hash{})
 	c.Assert(err, qt.IsNil)
-	defer ti.Close()
+	tw := storage.Torrent{ti}
+	defer tw.Close()
 	rand.Read(info.Pieces)
 	data := make([]byte, pieceSize)
+	readData := make([]byte, pieceSize)
 	b.SetBytes(int64(numPieces) * pieceSize)
 	oneIter := func() {
 		for pieceIndex := range iter.N(numPieces) {
-			pi := ti.Piece(info.Piece(pieceIndex))
-			if check {
-				rand.Read(data)
-			}
+			pi := tw.Piece(info.Piece(pieceIndex))
+			rand.Read(data)
+			b.StartTimer()
 			var wg sync.WaitGroup
 			for off := int64(0); off < int64(len(data)); off += ChunkSize {
 				wg.Add(1)
@@ -67,12 +67,11 @@ func BenchmarkPieceMarkComplete(
 			c.Assert(pi.Completion(), qt.Equals, storage.Completion{Complete: false, Ok: true})
 			c.Assert(pi.MarkComplete(), qt.IsNil)
 			c.Assert(pi.Completion(), qt.Equals, storage.Completion{true, true})
-			if check {
-				readData, err := ioutil.ReadAll(io.NewSectionReader(pi, 0, int64(len(data))))
-				c.Assert(err, qt.IsNil)
-				c.Assert(len(readData), qt.Equals, len(data))
-				c.Assert(bytes.Equal(readData, data), qt.IsTrue)
-			}
+			n, err := pi.WriteTo(bytes.NewBuffer(readData[:0]))
+			b.StopTimer()
+			c.Assert(err, qt.IsNil)
+			c.Assert(n, qt.Equals, int64(len(data)))
+			c.Assert(bytes.Equal(readData[:n], data), qt.IsTrue)
 		}
 	}
 	// Fill the cache
diff --git a/test/init_test.go b/test/init_test.go
index 38335ef8..b862d4ba 100644
--- a/test/init_test.go
+++ b/test/init_test.go
@@ -1,5 +1,11 @@
 package test
 
 import (
+	"log"
+
 	_ "github.com/anacrolix/envpprof"
 )
+
+func init() {
+	log.SetFlags(log.Flags() | log.Lshortfile)
+}
diff --git a/test/transfer_test.go b/test/transfer_test.go
index f232b186..e58bb53b 100644
--- a/test/transfer_test.go
+++ b/test/transfer_test.go
@@ -110,6 +110,7 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
 		cfg.DownloadRateLimiter = ps.LeecherDownloadRateLimiter
 	}
 	cfg.Seed = false
+	//cfg.Debug = true
 	if ps.ConfigureLeecher.Config != nil {
 		ps.ConfigureLeecher.Config(cfg)
 	}
@@ -311,13 +312,10 @@ type leecherStorageTestCase struct {
 func sqliteLeecherStorageTestCase(numConns int) leecherStorageTestCase {
 	return leecherStorageTestCase{
 		fmt.Sprintf("SqliteFile,NumConns=%v", numConns),
-		sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPiecesStorageOpts {
-			return sqliteStorage.NewPiecesStorageOpts{
-				NewPoolOpts: sqliteStorage.NewPoolOpts{
-					Path:     filepath.Join(dataDir, "sqlite.db"),
-					NumConns: numConns,
-				},
-			}
+		sqliteClientStorageFactory(func(dataDir string) (opts sqliteStorage.NewPiecesStorageOpts) {
+			opts.Path = filepath.Join(dataDir, "sqlite.db")
+			opts.NumConns = numConns
+			return
 		}),
 		numConns,
 	}
@@ -330,16 +328,23 @@ func TestClientTransferVarious(t *testing.T) {
 			Wrapper: fileCachePieceResourceStorage,
 		}), 0},
 		{"Boltdb", storage.NewBoltDB, 0},
+		{"SqliteDirect", func(s string) storage.ClientImplCloser {
+			path := filepath.Join(s, "sqlite3.db")
+			var opts sqliteStorage.NewDirectStorageOpts
+			opts.Path = path
+			cl, err := sqliteStorage.NewDirectStorage(opts)
+			if err != nil {
+				panic(err)
+			}
+			return cl
+		}, 0},
 		sqliteLeecherStorageTestCase(1),
 		sqliteLeecherStorageTestCase(2),
 		// This should use a number of connections equal to the number of CPUs
 		sqliteLeecherStorageTestCase(0),
-		{"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) sqliteStorage.NewPiecesStorageOpts {
-			return sqliteStorage.NewPiecesStorageOpts{
-				NewPoolOpts: sqliteStorage.NewPoolOpts{
-					Memory: true,
-				},
-			}
+		{"SqliteMemory", sqliteClientStorageFactory(func(dataDir string) (opts sqliteStorage.NewPiecesStorageOpts) {
+			opts.Memory = true
+			return
 		}), 0},
 	} {
 		t.Run(fmt.Sprintf("LeecherStorage=%s", ls.name), func(t *testing.T) {
@@ -362,7 +367,7 @@ func TestClientTransferVarious(t *testing.T) {
 					GOMAXPROCS: ls.gomaxprocs,
 				})
 			})
-			for _, readahead := range []int64{-1, 0, 1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13, 14, 15, 20} {
+			for _, readahead := range []int64{-1, 0, 1, 2, 9, 20} {
 				t.Run(fmt.Sprintf("readahead=%v", readahead), func(t *testing.T) {
 					testClientTransfer(t, testClientTransferParams{
 						SeederStorage: ss.f,