sqlite storage: Add batched writes

Matt Joiner 2020-10-30 19:46:51 +11:00
parent ba70ad9b41
commit 17e22516ff
2 changed files with 87 additions and 23 deletions
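In short, per the diff below: writes from storage instances (Put, Delete) no longer each take a pooled connection and run in their own implicit transaction. They are now sent as writeRequests to a per-provider writer goroutine, which drains whatever requests are queued and applies the whole batch inside a single savepoint before signalling each waiting caller; reads still take a connection from the pool directly. As part of this, NewProvider takes ownership of the connection pool and starts the writer, and the provider gains a Close method that shuts both down.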

View File

@@ -89,6 +89,7 @@ func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error)
     }
     prov, err := NewProvider(conns, provOpts)
     if err != nil {
+        conns.Close()
         return
     }
     store := storage.NewResourcePieces(prov)
@@ -97,7 +98,7 @@ func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error)
         io.Closer
     }{
         store,
-        conns,
+        prov,
     }, nil
 }
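The closer stored above switches from conns to prov because NewProvider now takes ownership of the pool (see the comment it gains further down). A minimal sketch of what that means for a direct caller of NewPool/NewProvider; it assumes it sits inside this package, the helper name and the NumConns value are made up, everything else comes from the diff:

// openProvider is a hypothetical helper illustrating the ownership rule:
// close conns yourself only if NewProvider fails; once it succeeds, the
// provider owns the pool and prov.Close() shuts down both the writer and
// the pool.
func openProvider(path string) (*provider, error) {
    conns, provOpts, err := NewPool(NewPoolOpts{Path: path, NumConns: 2})
    if err != nil {
        return nil, err
    }
    prov, err := NewProvider(conns, provOpts)
    if err != nil {
        conns.Close() // NewProvider never took ownership
        return nil, err
    }
    return prov, nil
}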
@@ -116,6 +117,7 @@ type ProviderOpts struct {
     NumConns int
     // Concurrent blob reads require WAL.
     ConcurrentBlobRead bool
+    BatchWrites bool
 }

 func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
@@ -133,7 +135,7 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
     conns, err := func() (ConnPool, error) {
         switch opts.NumConns {
         case 1:
-            conn, err := sqlite.OpenConn(fmt.Sprintf("file:%s", opts.Path), 0)
+            conn, err := sqlite.OpenConn(path, 0)
             return &poolFromConn{conn: conn}, err
         default:
             return sqlitex.Open(path, 0, opts.NumConns)
@@ -145,6 +147,7 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
     return conns, ProviderOpts{
         NumConns: opts.NumConns,
         ConcurrentBlobRead: opts.ConcurrentBlobReads,
+        BatchWrites: true,
     }, nil
 }
@@ -170,16 +173,23 @@ func (me *poolFromConn) Close() error {
     return me.conn.Close()
 }

-// Needs the ConnPool size so it can initialize all the connections with pragmas.
+// Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
+// the ConnPool (since it has to initialize all the connections anyway).
 func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
-    _, err = initPoolConns(context.TODO(), pool, opts.NumConns, opts.ConcurrentBlobRead)
+    _, err = initPoolConns(context.TODO(), pool, opts.NumConns, true)
     if err != nil {
         return
     }
     conn := pool.Get(context.TODO())
     defer pool.Put(conn)
     err = initSchema(conn)
-    return &provider{pool: pool}, err
+    if err != nil {
+        return
+    }
+    writes := make(chan writeRequest)
+    prov := &provider{pool: pool, writes: writes, opts: opts}
+    go prov.writer(writes)
+    return prov, nil
 }

 func initPoolConns(ctx context.Context, pool ConnPool, numConn int, wal bool) (numInited int, err error) {
@@ -212,7 +222,57 @@ type ConnPool interface {
 }

 type provider struct {
     pool ConnPool
+    writes chan<- writeRequest
+    opts ProviderOpts
+}
+
+func (me *provider) Close() error {
+    close(me.writes)
+    return me.pool.Close()
+}
+
+type writeRequest struct {
+    query func(*sqlite.Conn)
+    done chan<- struct{}
+}
+
+func (me *provider) writer(writes <-chan writeRequest) {
+    for {
+        first, ok := <-writes
+        if !ok {
+            return
+        }
+        buf := []writeRequest{first}
+    buffer:
+        for {
+            select {
+            case wr, ok := <-writes:
+                if !ok {
+                    break buffer
+                }
+                buf = append(buf, wr)
+            default:
+                break buffer
+            }
+        }
+        var cantFail error
+        func() {
+            conn := me.pool.Get(context.TODO())
+            defer me.pool.Put(conn)
+            defer sqlitex.Save(conn)(&cantFail)
+            for _, wr := range buf {
+                wr.query(conn)
+            }
+        }()
+        if cantFail != nil {
+            panic(cantFail)
+        }
+        for _, wr := range buf {
+            close(wr.done)
+        }
+        log.Printf("batched %v write queries", len(buf))
+    }
 }

 func (p *provider) NewInstance(s string) (resource.Instance, error) {
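The writer goroutine added above blocks for the first pending write, opportunistically drains everything else already queued, runs the whole batch inside one savepoint (sqlitex.Save returns a release function that commits the savepoint, or rolls it back if the error it is handed points at a non-nil value), and only then closes each request's done channel. Below is a standalone sketch of just that drain-then-flush pattern, using plain channels and no SQLite so it runs on its own; the request type, writer loop and constants here are illustrative, not the package's API:

package main

import (
    "fmt"
    "sync"
)

// request stands in for the diff's writeRequest: some work to run, plus a
// done channel the submitter blocks on until its batch has been flushed.
type request struct {
    work func()
    done chan struct{}
}

// writer mirrors the batching loop: wait for one request, drain everything
// else already queued without blocking, flush the lot, then signal everyone.
func writer(reqs <-chan request) {
    for {
        first, ok := <-reqs
        if !ok {
            return
        }
        batch := []request{first}
    buffer:
        for {
            select {
            case r, ok := <-reqs:
                if !ok {
                    break buffer
                }
                batch = append(batch, r)
            default:
                break buffer // nothing else pending right now
            }
        }
        // In the real provider this loop runs inside one sqlitex.Save
        // savepoint so the whole batch commits (or rolls back) atomically.
        for _, r := range batch {
            r.work()
        }
        for _, r := range batch {
            close(r.done)
        }
        fmt.Printf("flushed %v requests\n", len(batch))
    }
}

func main() {
    reqs := make(chan request)
    writerDone := make(chan struct{})
    go func() { writer(reqs); close(writerDone) }()

    var wg sync.WaitGroup
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            done := make(chan struct{})
            reqs <- request{work: func() { /* e.g. one INSERT */ }, done: done}
            <-done // unblocks once the batch containing this request ran
        }()
    }
    wg.Wait()
    close(reqs)
    <-writerDone
}

Relative to the old one-implicit-transaction-per-write behaviour, this amortizes commit overhead across whatever writes happen to be pending at once, at the cost of each caller blocking until the shared batch is applied; in the diff, a failure inside the batch is treated as unrecoverable (the cantFail panic).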
@@ -224,17 +284,19 @@ type instance struct {
     p *provider
 }

-func (i instance) withConn(with func(conn conn)) {
-    conn := i.p.pool.Get(context.TODO())
-    //err := sqlitex.Exec(conn, "pragma synchronous", func(stmt *sqlite.Stmt) error {
-    //    log.Print(stmt.ColumnText(0))
-    //    return nil
-    //})
-    //if err != nil {
-    //    log.Print(err)
-    //}
-    defer i.p.pool.Put(conn)
-    with(conn)
-}
+func (i instance) withConn(with func(conn conn), write bool) {
+    if write && i.p.opts.BatchWrites {
+        done := make(chan struct{})
+        i.p.writes <- writeRequest{
+            query: with,
+            done:  done,
+        }
+        <-done
+    } else {
+        conn := i.p.pool.Get(context.TODO())
+        defer i.p.pool.Put(conn)
+        with(conn)
+    }
+}

 func (i instance) getConn() *sqlite.Conn {
@@ -252,7 +314,7 @@ func (i instance) Readdirnames() (names []string, err error) {
             names = append(names, stmt.ColumnText(0)[len(prefix):])
             return nil
         }, prefix+"%")
-    })
+    }, false)
     //log.Printf("readdir %q gave %q", i.location, names)
     return
 }
@@ -340,7 +402,7 @@ func (i instance) Put(reader io.Reader) (err error) {
             }
             break
         }
-    })
+    }, true)
     return
 }
@@ -381,7 +443,7 @@ func (i instance) Stat() (ret os.FileInfo, err error) {
         }
         defer blob.Close()
         ret = fileInfo{blob.Size()}
-    })
+    }, false)
     return
 }
@@ -429,7 +491,7 @@ func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
                 err = io.EOF
             }
         }
-    })
+    }, false)
     return
 }
@@ -440,6 +502,6 @@ func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
 func (i instance) Delete() (err error) {
     i.withConn(func(conn conn) {
         err = sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
-    })
+    }, true)
     return
 }

View File

@@ -17,9 +17,11 @@ func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) {
     opts.Path = filepath.Join(t.TempDir(), "sqlite3.db")
     conns, provOpts, err := NewPool(opts)
     require.NoError(t, err)
-    t.Cleanup(func() { conns.Close() })
+    // sqlitex.Pool.Close doesn't like being called more than once. Let it slide for now.
+    //t.Cleanup(func() { conns.Close() })
     prov, err := NewProvider(conns, provOpts)
     require.NoError(t, err)
+    t.Cleanup(func() { prov.Close() })
     return conns, prov
 }
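Purely as an illustration of the new write path (not part of this commit), here is a sketch of a test that issues many Puts concurrently so the provider's writer goroutine has a chance to coalesce them. It assumes the surrounding test file's imports plus bytes, fmt and sync; the test name and constants are made up, while newConnsAndProv, NewPoolOpts, NewInstance and Put all come from the diff.

// TestConcurrentPuts is a hypothetical sketch: with BatchWrites enabled
// (NewPool now sets it unconditionally), concurrent Puts should be coalesced
// by the provider's writer goroutine into fewer transactions.
func TestConcurrentPuts(t *testing.T) {
    _, prov := newConnsAndProv(t, NewPoolOpts{NumConns: 2})
    const n = 16
    errs := make(chan error, n)
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            inst, err := prov.NewInstance(fmt.Sprintf("blob/%v", i))
            if err == nil {
                err = inst.Put(bytes.NewReader([]byte("hello")))
            }
            errs <- err
        }(i)
    }
    wg.Wait()
    close(errs)
    for err := range errs {
        // Collect errors in the test goroutine so require can FailNow safely.
        require.NoError(t, err)
    }
}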