Use separate squirrel module

Matt Joiner 2021-08-25 14:37:00 +10:00
parent 19d5905b6c
commit d6fcf7a32a
9 changed files with 23 additions and 638 deletions

go.mod

@@ -15,6 +15,7 @@ require (
github.com/anacrolix/missinggo/perf v1.0.0
github.com/anacrolix/missinggo/v2 v2.5.2
github.com/anacrolix/multiless v0.1.1-0.20210529082330-de2f6cf29619
github.com/anacrolix/squirrel v0.0.0-00010101000000-000000000000
github.com/anacrolix/sync v0.4.0
github.com/anacrolix/tagflag v1.3.0
github.com/anacrolix/upnp v0.1.2-0.20200416075019-5e9378ed1425
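The all-zeros pseudo-version on the new squirrel requirement means no published tag has been resolved yet; a requirement in that state normally only builds with a local replace directive alongside it (illustrative, not shown in this diff):

replace github.com/anacrolix/squirrel => ../squirrel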


@@ -0,0 +1,7 @@
package sqliteStorage
import (
"github.com/anacrolix/squirrel"
)
type NewDirectStorageOpts = squirrel.NewCacheOpts
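The alias keeps existing storage/sqlite users compiling: NewDirectStorageOpts and squirrel.NewCacheOpts are now the same type, not merely convertible. A minimal sketch of an unchanged call site (the Capacity field is taken from the benchmark hunk below; the rest is illustrative):

package main

import (
	sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
)

func main() {
	// Pre-existing code keeps using the old name; thanks to the alias this
	// is literally a squirrel.NewCacheOpts.
	var opts sqliteStorage.NewDirectStorageOpts
	opts.Capacity = 1 << 30 // cache limit; field per the benchmark below
	ci, err := sqliteStorage.NewDirectStorage(opts)
	if err != nil {
		panic(err)
	}
	defer ci.Close()
}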


@@ -4,180 +4,44 @@
package sqliteStorage
import (
"errors"
"fmt"
"io"
"runtime"
"sync"
"time"
"crawshaw.io/sqlite"
"crawshaw.io/sqlite/sqlitex"
"github.com/anacrolix/squirrel"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/storage"
)
type NewDirectStorageOpts struct {
NewConnOpts
InitDbOpts
InitConnOpts
GcBlobs bool
NoCacheBlobs bool
BlobFlushInterval time.Duration
}
func NewSquirrelCache(opts NewDirectStorageOpts) (_ *SquirrelCache, err error) {
conn, err := newConn(opts.NewConnOpts)
if err != nil {
return
}
if opts.PageSize == 0 {
// The largest size sqlite supports. I think we want this to be the smallest SquirrelBlob size we
// can expect, which is probably 1<<17.
opts.PageSize = 1 << 16
}
err = initDatabase(conn, opts.InitDbOpts)
if err != nil {
conn.Close()
return
}
err = initConn(conn, opts.InitConnOpts)
if err != nil {
conn.Close()
return
}
if opts.BlobFlushInterval == 0 && !opts.GcBlobs {
// This is influenced by typical busy timeouts, of 5-10s. We want to give other connections
// a few chances at getting a transaction through.
opts.BlobFlushInterval = time.Second
}
cl := &SquirrelCache{
conn: conn,
blobs: make(map[string]*sqlite.Blob),
opts: opts,
}
// Avoid race with cl.blobFlusherFunc
cl.l.Lock()
defer cl.l.Unlock()
if opts.BlobFlushInterval != 0 {
cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
}
cl.capacity = cl.getCapacity
return cl, nil
}
// A convenience function that creates a connection pool, resource provider, and a pieces storage
// ClientImpl and returns them all with a Close attached.
func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
cache, err := NewSquirrelCache(opts)
cache, err := squirrel.NewCache(opts)
if err != nil {
return
}
return client{cache}, nil
}
func (cl *SquirrelCache) getCapacity() (ret *int64) {
cl.l.Lock()
defer cl.l.Unlock()
err := sqlitex.Exec(cl.conn, "select value from setting where name='capacity'", func(stmt *sqlite.Stmt) error {
ret = new(int64)
*ret = stmt.ColumnInt64(0)
return nil
})
if err != nil {
panic(err)
}
return
}
type SquirrelCache struct {
l sync.Mutex
conn conn
blobs map[string]*sqlite.Blob
blobFlusher *time.Timer
opts NewDirectStorageOpts
closed bool
capacity func() *int64
return &client{
cache,
cache.GetCapacity}, nil
}
type client struct {
*SquirrelCache
*squirrel.Cache
capacity func() *int64
}
func (c *SquirrelCache) blobFlusherFunc() {
c.l.Lock()
defer c.l.Unlock()
c.flushBlobs()
if !c.closed {
c.blobFlusher.Reset(c.opts.BlobFlushInterval)
}
}
func (c *SquirrelCache) flushBlobs() {
for key, b := range c.blobs {
// Need the lock to prevent racing with the GC finalizers.
b.Close()
delete(c.blobs, key)
}
}
func (c client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
t := torrent{c.SquirrelCache}
func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
t := torrent{c.Cache}
return storage.TorrentImpl{Piece: t.Piece, Close: t.Close, Capacity: &c.capacity}, nil
}
func (c *SquirrelCache) Close() (err error) {
c.l.Lock()
defer c.l.Unlock()
c.flushBlobs()
if c.opts.BlobFlushInterval != 0 {
c.blobFlusher.Stop()
}
if !c.closed {
c.closed = true
err = c.conn.Close()
c.conn = nil
}
return
}
type torrent struct {
c *SquirrelCache
}
func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64, err error) {
rowidOk := false
err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
if rowidOk {
panic("expected at most one row")
}
// TODO: How do we know if we got this wrong?
rowid = stmt.ColumnInt64(0)
rowidOk = true
return nil
}, name)
if err != nil {
return
}
if rowidOk {
return
}
if !create {
err = errors.New("no existing row")
return
}
err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
if err != nil {
return
}
rowid = c.LastInsertRowID()
return
c *squirrel.Cache
}
func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
ret := piece{
sb: SquirrelBlob{
sb: squirrel.Blob{
p.Hash().HexString(),
p.Length(),
t.c,
@@ -192,104 +56,20 @@ func (t torrent) Close() error {
return nil
}
type SquirrelBlob struct {
name string
length int64
*SquirrelCache
}
type piece struct {
sb SquirrelBlob
sb squirrel.Blob
io.ReaderAt
io.WriterAt
}
func (p SquirrelBlob) doAtIoWithBlob(
atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
b []byte,
off int64,
create bool,
) (n int, err error) {
p.l.Lock()
defer p.l.Unlock()
if p.opts.NoCacheBlobs {
defer p.forgetBlob()
}
blob, err := p.getBlob(create)
if err != nil {
err = fmt.Errorf("getting blob: %w", err)
return
}
n, err = atIo(blob)(b, off)
if err == nil {
return
}
var se sqlite.Error
if !errors.As(err, &se) {
return
}
// "ABORT" occurs if the row the blob is on is modified elsewhere. "ERROR: invalid blob" occurs
// if the blob has been closed. We don't forget blobs that are closed by our GC finalizers,
// because they may be attached to names that have since moved on to another blob.
if se.Code != sqlite.SQLITE_ABORT && !(p.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
return
}
p.forgetBlob()
// Try again, this time we're guaranteed to get a fresh blob, and so errors are no excuse. It
// might be possible to skip to this version if we don't cache blobs.
blob, err = p.getBlob(create)
if err != nil {
err = fmt.Errorf("getting blob: %w", err)
return
}
return atIo(blob)(b, off)
}
func (p SquirrelBlob) ReadAt(b []byte, off int64) (n int, err error) {
return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
return blob.ReadAt
}, b, off, false)
}
func (p SquirrelBlob) WriteAt(b []byte, off int64) (n int, err error) {
return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
return blob.WriteAt
}, b, off, true)
}
func (p SquirrelBlob) SetTag(name string, value interface{}) error {
p.l.Lock()
defer p.l.Unlock()
return sqlitex.Exec(p.conn, "insert or replace into tag (blob_name, tag_name, value) values (?, ?, ?)", nil,
p.name, name, value)
}
func (p piece) MarkComplete() error {
return p.sb.SetTag("verified", true)
}
func (p SquirrelBlob) forgetBlob() {
blob, ok := p.blobs[p.name]
if !ok {
return
}
blob.Close()
delete(p.blobs, p.name)
}
func (p piece) MarkNotComplete() error {
return p.sb.SetTag("verified", false)
}
func (p SquirrelBlob) GetTag(name string, result func(*sqlite.Stmt)) error {
p.l.Lock()
defer p.l.Unlock()
return sqlitex.Exec(p.conn, "select value from tag where blob_name=? and tag_name=?", func(stmt *sqlite.Stmt) error {
result(stmt)
return nil
}, p.name, name)
}
func (p piece) Completion() (ret storage.Completion) {
err := p.sb.GetTag("verified", func(stmt *sqlite.Stmt) {
ret.Complete = stmt.ColumnInt(0) != 0
@@ -300,30 +80,3 @@ func (p piece) Completion() (ret storage.Completion) {
}
return
}
func (p SquirrelBlob) getBlob(create bool) (*sqlite.Blob, error) {
blob, ok := p.blobs[p.name]
if !ok {
rowid, err := rowidForBlob(p.conn, p.name, p.length, create)
if err != nil {
return nil, fmt.Errorf("getting rowid for blob: %w", err)
}
blob, err = p.conn.OpenBlob("main", "blob", "data", rowid, true)
if err != nil {
panic(err)
}
if p.opts.GcBlobs {
herp := new(byte)
runtime.SetFinalizer(herp, func(*byte) {
p.l.Lock()
defer p.l.Unlock()
// Note there's no guarantee that the finalizer fired while this blob is the same
// one in the blob cache. It might be possible to rework this so that we check, or
// strip finalizers as appropriate.
blob.Close()
})
}
p.blobs[p.name] = blob
}
return blob, nil
}
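With the cache internals gone, the added lines above reduce this file to a thin shim over squirrel. Reassembled here for readability (a reconstruction from the additions in this hunk, not the verbatim resulting file):

type client struct {
	*squirrel.Cache
	capacity func() *int64
}

type torrent struct {
	c *squirrel.Cache
}

type piece struct {
	sb squirrel.Blob
	io.ReaderAt
	io.WriterAt
}

func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
	cache, err := squirrel.NewCache(opts)
	if err != nil {
		return
	}
	return &client{cache, cache.GetCapacity}, nil
}

func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
	t := torrent{c.Cache}
	return storage.TorrentImpl{Piece: t.Piece, Close: t.Close, Capacity: &c.capacity}, nil
}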


@ -1,25 +0,0 @@
create trigger if not exists delete_blob_tags_before_blob_deleted
before delete on blob
begin
delete from tag where blob_name=old.name;
end;
create trigger if not exists after_insert_blob
after insert on blob
begin
update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
delete from blob where rowid in (select blob_rowid from deletable_blob);
end;
create trigger if not exists after_update_blob
after update of data on blob
begin
update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
delete from blob where rowid in (select blob_rowid from deletable_blob);
end;
create trigger if not exists after_delete_blob
after delete on blob
begin
update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
end;
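These triggers (now maintained in squirrel) keep the blob_meta 'size' row in lockstep with the total blob payload, so capacity checks never need a full table scan, and they trim to capacity after every insert or update. A hedged Go sketch of the invariant they enforce, using the same crawshaw.io/sqlite API as the rest of this package (the helper itself is illustrative, not from the commit):

import (
	"fmt"

	"crawshaw.io/sqlite"
	"crawshaw.io/sqlite/sqlitex"
)

// checkSizeInvariant asserts that the incrementally-maintained counter
// matches a direct recount, which is what the triggers guarantee.
func checkSizeInvariant(conn *sqlite.Conn) error {
	var cached, actual int64
	err := sqlitex.Exec(conn, "select value from blob_meta where key='size'", func(stmt *sqlite.Stmt) error {
		cached = stmt.ColumnInt64(0)
		return nil
	})
	if err != nil {
		return err
	}
	err = sqlitex.Exec(conn, "select coalesce(sum(length(cast(data as blob))), 0) from blob", func(stmt *sqlite.Stmt) error {
		actual = stmt.ColumnInt64(0)
		return nil
	})
	if err != nil {
		return err
	}
	if cached != actual {
		return fmt.Errorf("blob_meta size %v, recount %v", cached, actual)
	}
	return nil
}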


@ -1,64 +0,0 @@
-- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
-- can trim the database file size with partial vacuums without having to do a full vacuum, which
-- locks everything.
pragma auto_vacuum=incremental;
create table if not exists blob (
name text,
last_used timestamp default (datetime('now')),
data blob,
primary key (name)
);
create table if not exists blob_meta (
key text primary key,
value
);
create index if not exists blob_last_used on blob(last_used);
-- While sqlite *seems* to be faster to get sum(length(data)) instead of
-- sum(length(cast(data as blob))), it may still require a large table scan at start-up or with a
-- cold-cache. With this we can be assured that it doesn't.
insert or ignore into blob_meta values ('size', 0);
create table if not exists setting (
name primary key on conflict replace,
value
);
create table if not exists tag (
blob_name references blob(name),
tag_name,
value,
primary key (blob_name, tag_name)
);
create view if not exists deletable_blob as
with recursive excess (
usage_with,
last_used,
blob_rowid,
data_length
) as (
select *
from (
select
(select value from blob_meta where key='size') as usage_with,
last_used,
rowid,
length(data)
from blob order by last_used, rowid limit 1
)
where usage_with > (select value from setting where name='capacity')
union all
select
usage_with-data_length as new_usage_with,
blob.last_used,
blob.rowid,
length(data)
from excess join blob
on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
where new_usage_with > (select value from setting where name='capacity')
)
select * from excess;
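deletable_blob is a recursive CTE that walks blobs oldest-first (by last_used, then rowid), carrying the running usage, and yields rows until usage falls back to the configured capacity; the triggers above delete exactly that set. The same eviction logic in plain Go, for intuition only:

type blobRow struct {
	lastUsed string // ordering key, oldest first
	rowid    int64
	length   int64
}

// evictable returns the rowids the deletable_blob view would select.
// blobs must already be sorted by (last_used, rowid).
func evictable(blobs []blobRow, usage, capacity int64) (rowids []int64) {
	for _, b := range blobs {
		if usage <= capacity {
			break
		}
		rowids = append(rowids, b.rowid)
		usage -= b.length
	}
	return
}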


@ -1,29 +0,0 @@
pragma auto_vacuum=incremental;
create table if not exists blob(
name text,
last_used timestamp default (datetime('now')),
data blob,
primary key (name)
);
create view if not exists deletable_blob as
with recursive excess_blob(
usage_with,
last_used,
blob_rowid,
data_length
) as (
select * from (select (select sum(length(data)) from blob) as usage_with, last_used, rowid, length(data) from blob order by last_used, rowid limit 1)
where usage_with >= (select value from setting where name='capacity')
union all
select usage_with-data_length, blob.last_used, blob.rowid, length(data) from excess_blob join blob
on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess_blob.last_used, blob_rowid))
where usage_with >= (select value from setting where name='capacity')
) select * from excess_blob;
CREATE TRIGGER if not exists trim_blobs_to_capacity_after_update after update on blob begin
delete from blob where rowid in (select blob_rowid from deletable_blob);
end;
CREATE TRIGGER if not exists trim_blobs_to_capacity_after_insert after insert on blob begin
delete from blob where rowid in (select blob_rowid from deletable_blob);
end;


@ -1,46 +0,0 @@
//go:build cgo
// +build cgo
package main
import (
"fmt"
"log"
"os"
"crawshaw.io/sqlite"
"github.com/alexflint/go-arg"
sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
)
type InitCommand struct {
Path string `arg:"positional"`
}
func main() {
err := mainErr()
if err != nil {
log.Printf("error in main: %v", err)
os.Exit(1)
}
}
func mainErr() error {
var args struct {
Init *InitCommand `arg:"subcommand"`
}
p := arg.MustParse(&args)
switch {
case args.Init != nil:
conn, err := sqlite.OpenConn(args.Init.Path, 0)
if err != nil {
return fmt.Errorf("opening sqlite conn: %w", err)
}
defer conn.Close()
return sqliteStorage.InitSchema(conn, 1<<14, true)
default:
p.Fail("expected subcommand")
panic("unreachable")
}
}
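The standalone init command is deleted along with the in-tree schema. Since the old NewSquirrelCache ran initDatabase on open, an equivalent one-off initialization should now amount to opening and closing a cache through the new module; a sketch under that assumption:

package main

import (
	"log"
	"os"

	"github.com/anacrolix/squirrel"
)

func main() {
	var opts squirrel.NewCacheOpts
	opts.Path = os.Args[1] // database file to initialize
	c, err := squirrel.NewCache(opts)
	if err != nil {
		log.Fatalf("creating cache: %v", err)
	}
	// Closing immediately leaves an initialized database behind, assuming
	// NewCache initializes the schema the way the removed code did.
	if err := c.Close(); err != nil {
		log.Fatalf("closing cache: %v", err)
	}
}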


@ -1,213 +0,0 @@
//go:build cgo
// +build cgo
package sqliteStorage
import (
_ "embed"
"errors"
"fmt"
"net/url"
"crawshaw.io/sqlite"
"crawshaw.io/sqlite/sqlitex"
)
type conn = *sqlite.Conn
type InitConnOpts struct {
SetSynchronous int
SetJournalMode string
MmapSizeOk bool // If false, a package-specific default will be used.
MmapSize int64 // If MmapSizeOk is set, use sqlite default if < 0, otherwise this value.
}
type UnexpectedJournalMode struct {
JournalMode string
}
func (me UnexpectedJournalMode) Error() string {
return fmt.Sprintf("unexpected journal mode: %q", me.JournalMode)
}
func setSynchronous(conn conn, syncInt int) (err error) {
err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma synchronous=%v`, syncInt), nil)
if err != nil {
return err
}
var (
actual int
actualOk bool
)
err = sqlitex.ExecTransient(conn, `pragma synchronous`, func(stmt *sqlite.Stmt) error {
actual = stmt.ColumnInt(0)
actualOk = true
return nil
})
if err != nil {
return
}
if !actualOk {
return errors.New("synchronous setting query didn't return anything")
}
if actual != syncInt {
return fmt.Errorf("set synchronous %q, got %q", syncInt, actual)
}
return nil
}
func initConn(conn conn, opts InitConnOpts) (err error) {
err = sqlitex.ExecTransient(conn, "pragma foreign_keys=on", nil)
if err != nil {
return err
}
err = setSynchronous(conn, opts.SetSynchronous)
if err != nil {
return
}
// Recursive triggers are required because we need to trim the blob_meta size after trimming to
// capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown.
err = sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
if err != nil {
return err
}
if opts.SetJournalMode != "" {
err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma journal_mode=%s`, opts.SetJournalMode), func(stmt *sqlite.Stmt) error {
ret := stmt.ColumnText(0)
if ret != opts.SetJournalMode {
return UnexpectedJournalMode{ret}
}
return nil
})
if err != nil {
return err
}
}
if !opts.MmapSizeOk {
// Set the default. Currently it seems the library picks reasonable defaults, especially for
// wal.
opts.MmapSize = -1
//opts.MmapSize = 1 << 24 // 16 MiB
}
if opts.MmapSize >= 0 {
err = sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma mmap_size=%d`, opts.MmapSize), nil)
if err != nil {
return err
}
}
return nil
}
func setPageSize(conn conn, pageSize int) error {
if pageSize == 0 {
return nil
}
var retSize int64
err := sqlitex.ExecTransient(conn, fmt.Sprintf(`pragma page_size=%d`, pageSize), nil)
if err != nil {
return err
}
err = sqlitex.ExecTransient(conn, "pragma page_size", func(stmt *sqlite.Stmt) error {
retSize = stmt.ColumnInt64(0)
return nil
})
if err != nil {
return err
}
if retSize != int64(pageSize) {
return fmt.Errorf("requested page size %v but got %v", pageSize, retSize)
}
return nil
}
var (
//go:embed init.sql
initScript string
//go:embed init-triggers.sql
initTriggers string
)
func InitSchema(conn conn, pageSize int, triggers bool) error {
err := setPageSize(conn, pageSize)
if err != nil {
return fmt.Errorf("setting page size: %w", err)
}
err = sqlitex.ExecScript(conn, initScript)
if err != nil {
return err
}
if triggers {
err := sqlitex.ExecScript(conn, initTriggers)
if err != nil {
return err
}
}
return nil
}
type InitDbOpts struct {
DontInitSchema bool
PageSize int
// If non-zero, overrides the existing setting.
Capacity int64
NoTriggers bool
}
// Remove any capacity limits.
func unlimitCapacity(conn conn) error {
return sqlitex.Exec(conn, "delete from setting where key='capacity'", nil)
}
// Set the capacity limit to exactly this value.
func setCapacity(conn conn, cap int64) error {
return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
}
type NewConnOpts struct {
// See https://www.sqlite.org/c3ref/open.html. NB: "If the filename is an empty string, then a
// private, temporary on-disk database will be created. This private database will be
// automatically deleted as soon as the database connection is closed."
Path string
Memory bool
// Whether multiple blobs will not be read simultaneously. Enables journal mode other than WAL,
// and NumConns < 2.
NoConcurrentBlobReads bool
}
func newOpenUri(opts NewConnOpts) string {
path := url.PathEscape(opts.Path)
if opts.Memory {
path = ":memory:"
}
values := make(url.Values)
if opts.NoConcurrentBlobReads || opts.Memory {
values.Add("cache", "shared")
}
return fmt.Sprintf("file:%s?%s", path, values.Encode())
}
func initDatabase(conn conn, opts InitDbOpts) (err error) {
if !opts.DontInitSchema {
err = InitSchema(conn, opts.PageSize, !opts.NoTriggers)
if err != nil {
return
}
}
if opts.Capacity < 0 {
err = unlimitCapacity(conn)
} else if opts.Capacity > 0 {
err = setCapacity(conn, opts.Capacity)
}
return
}
// Go fmt, why you so shit?
const openConnFlags = 0 |
sqlite.SQLITE_OPEN_READWRITE |
sqlite.SQLITE_OPEN_CREATE |
sqlite.SQLITE_OPEN_URI |
sqlite.SQLITE_OPEN_NOMUTEX
func newConn(opts NewConnOpts) (conn, error) {
return sqlite.OpenConn(newOpenUri(opts), openConnFlags)
}
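A pattern worth noting in this removed file: sqlite silently ignores pragma values it can't honor, so setSynchronous and setPageSize both write the pragma and then read it back, failing loudly on a mismatch. Distilled into one hedged helper (illustrative, not from the commit):

// requirePragma sets an integer pragma and verifies sqlite accepted it.
// name must be a trusted constant, since pragmas can't be parameterized.
func requirePragma(conn *sqlite.Conn, name string, value int) error {
	err := sqlitex.ExecTransient(conn, fmt.Sprintf("pragma %s=%d", name, value), nil)
	if err != nil {
		return err
	}
	got, gotOk := 0, false
	err = sqlitex.ExecTransient(conn, fmt.Sprintf("pragma %s", name), func(stmt *sqlite.Stmt) error {
		got = stmt.ColumnInt(0)
		gotOk = true
		return nil
	})
	if err != nil {
		return err
	}
	if !gotOk {
		return fmt.Errorf("pragma %s returned no rows", name)
	}
	if got != value {
		return fmt.Errorf("set pragma %s=%v, got %v", name, value, got)
	}
	return nil
}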


@@ -11,6 +11,7 @@ import (
"time"
_ "github.com/anacrolix/envpprof"
"github.com/anacrolix/squirrel"
"github.com/anacrolix/torrent/storage"
test_storage "github.com/anacrolix/torrent/storage/test"
"github.com/dustin/go-humanize"
@@ -30,7 +31,7 @@ func BenchmarkMarkComplete(b *testing.B) {
}
c := qt.New(b)
b.Run("CustomDirect", func(b *testing.B) {
var opts NewDirectStorageOpts
var opts squirrel.NewCacheOpts
opts.Capacity = capacity
opts.NoTriggers = noTriggers
benchOpts := func(b *testing.B) {
@@ -54,7 +55,7 @@ func BenchmarkMarkComplete(b *testing.B) {
directBench := func(b *testing.B) {
opts.Path = filepath.Join(b.TempDir(), "storage.db")
ci, err := NewDirectStorage(opts)
var ujm UnexpectedJournalMode
var ujm squirrel.UnexpectedJournalMode
if errors.As(err, &ujm) {
b.Skipf("setting journal mode %q: %v", opts.SetJournalMode, err)
}