From 17e22516ff596c7dfe74c5871a1a501bb38d0263 Mon Sep 17 00:00:00 2001
From: Matt Joiner
Date: Fri, 30 Oct 2020 19:46:51 +1100
Subject: [PATCH] sqlite storage: Add batched writes

---
 storage/sqlite/sqlite-storage.go      | 106 ++++++++++++++++++++------
 storage/sqlite/sqlite-storage_test.go |   4 +-
 2 files changed, 87 insertions(+), 23 deletions(-)

diff --git a/storage/sqlite/sqlite-storage.go b/storage/sqlite/sqlite-storage.go
index 75520e77..4d000353 100644
--- a/storage/sqlite/sqlite-storage.go
+++ b/storage/sqlite/sqlite-storage.go
@@ -89,6 +89,7 @@ func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error)
 	}
 	prov, err := NewProvider(conns, provOpts)
 	if err != nil {
+		conns.Close()
 		return
 	}
 	store := storage.NewResourcePieces(prov)
@@ -97,7 +98,7 @@ func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error)
 		io.Closer
 	}{
 		store,
-		conns,
+		prov,
 	}, nil
 }
 
@@ -116,6 +117,7 @@ type ProviderOpts struct {
 	NumConns int
 	// Concurrent blob reads require WAL.
 	ConcurrentBlobRead bool
+	BatchWrites bool
 }
 
 func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
@@ -133,7 +135,7 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
 	conns, err := func() (ConnPool, error) {
 		switch opts.NumConns {
 		case 1:
-			conn, err := sqlite.OpenConn(fmt.Sprintf("file:%s", opts.Path), 0)
+			conn, err := sqlite.OpenConn(path, 0)
 			return &poolFromConn{conn: conn}, err
 		default:
 			return sqlitex.Open(path, 0, opts.NumConns)
@@ -145,6 +147,7 @@ func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
 	return conns, ProviderOpts{
 		NumConns: opts.NumConns,
 		ConcurrentBlobRead: opts.ConcurrentBlobReads,
+		BatchWrites: true,
 	}, nil
 }
 
@@ -170,16 +173,23 @@ func (me *poolFromConn) Close() error {
 	return me.conn.Close()
 }
 
-// Needs the ConnPool size so it can initialize all the connections with pragmas.
+// Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
+// the ConnPool (since it has to initialize all the connections anyway).
 func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
-	_, err = initPoolConns(context.TODO(), pool, opts.NumConns, opts.ConcurrentBlobRead)
+	_, err = initPoolConns(context.TODO(), pool, opts.NumConns, true)
 	if err != nil {
 		return
 	}
 	conn := pool.Get(context.TODO())
 	defer pool.Put(conn)
 	err = initSchema(conn)
-	return &provider{pool: pool}, err
+	if err != nil {
+		return
+	}
+	writes := make(chan writeRequest)
+	prov := &provider{pool: pool, writes: writes, opts: opts}
+	go prov.writer(writes)
+	return prov, nil
 }
 
 func initPoolConns(ctx context.Context, pool ConnPool, numConn int, wal bool) (numInited int, err error) {
@@ -212,7 +222,57 @@ type ConnPool interface {
 }
 
 type provider struct {
-	pool ConnPool
+	pool   ConnPool
+	writes chan<- writeRequest
+	opts   ProviderOpts
+}
+
+func (me *provider) Close() error {
+	close(me.writes)
+	return me.pool.Close()
+}
+
+type writeRequest struct {
+	query func(*sqlite.Conn)
+	done  chan<- struct{}
+}
+
+func (me *provider) writer(writes <-chan writeRequest) {
+	for {
+		first, ok := <-writes
+		if !ok {
+			return
+		}
+		buf := []writeRequest{first}
+	buffer:
+		for {
+			select {
+			case wr, ok := <-writes:
+				if !ok {
+					break buffer
+				}
+				buf = append(buf, wr)
+			default:
+				break buffer
+			}
+		}
+		var cantFail error
+		func() {
+			conn := me.pool.Get(context.TODO())
+			defer me.pool.Put(conn)
+			defer sqlitex.Save(conn)(&cantFail)
+			for _, wr := range buf {
+				wr.query(conn)
+			}
+		}()
+		if cantFail != nil {
+			panic(cantFail)
+		}
+		for _, wr := range buf {
+			close(wr.done)
+		}
+		log.Printf("batched %v write queries", len(buf))
+	}
 }
 
 func (p *provider) NewInstance(s string) (resource.Instance, error) {
@@ -224,17 +284,19 @@ type instance struct {
 	p        *provider
 }
 
-func (i instance) withConn(with func(conn conn)) {
-	conn := i.p.pool.Get(context.TODO())
-	//err := sqlitex.Exec(conn, "pragma synchronous", func(stmt *sqlite.Stmt) error {
-	//	log.Print(stmt.ColumnText(0))
-	//	return nil
-	//})
-	//if err != nil {
-	//	log.Print(err)
-	//}
-	defer i.p.pool.Put(conn)
-	with(conn)
+func (i instance) withConn(with func(conn conn), write bool) {
+	if write && i.p.opts.BatchWrites {
+		done := make(chan struct{})
+		i.p.writes <- writeRequest{
+			query: with,
+			done:  done,
+		}
+		<-done
+	} else {
+		conn := i.p.pool.Get(context.TODO())
+		defer i.p.pool.Put(conn)
+		with(conn)
+	}
 }
 
 func (i instance) getConn() *sqlite.Conn {
@@ -252,7 +314,7 @@ func (i instance) Readdirnames() (names []string, err error) {
 			names = append(names, stmt.ColumnText(0)[len(prefix):])
 			return nil
 		}, prefix+"%")
-	})
+	}, false)
 	//log.Printf("readdir %q gave %q", i.location, names)
 	return
 }
@@ -340,7 +402,7 @@ func (i instance) Put(reader io.Reader) (err error) {
 			}
 			break
 		}
-	})
+	}, true)
 	return
 }
 
@@ -381,7 +443,7 @@ func (i instance) Stat() (ret os.FileInfo, err error) {
 		}
 		defer blob.Close()
 		ret = fileInfo{blob.Size()}
-	})
+	}, false)
 	return
 }
 
@@ -429,7 +491,7 @@ func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
 				err = io.EOF
 			}
 		}
-	})
+	}, false)
 	return
 }
 
@@ -440,6 +502,6 @@ func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
 func (i instance) Delete() (err error) {
 	i.withConn(func(conn conn) {
 		err = sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
-	})
+	}, true)
 	return
 }
diff --git a/storage/sqlite/sqlite-storage_test.go b/storage/sqlite/sqlite-storage_test.go
index 2a413739..bb71d7fe 100644
--- a/storage/sqlite/sqlite-storage_test.go
+++ b/storage/sqlite/sqlite-storage_test.go
@@ -17,9 +17,11 @@ func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) {
 	opts.Path = filepath.Join(t.TempDir(), "sqlite3.db")
 	conns, provOpts, err := NewPool(opts)
 	require.NoError(t, err)
-	t.Cleanup(func() { conns.Close() })
+	// sqlitex.Pool.Close doesn't like being called more than once. Let it slide for now.
+	//t.Cleanup(func() { conns.Close() })
 	prov, err := NewProvider(conns, provOpts)
 	require.NoError(t, err)
+	t.Cleanup(func() { prov.Close() })
 	return conns, prov
 }
 
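Note: the core of this change is the writer goroutine's drain-then-commit loop: it blocks for the first queued write, non-blockingly drains whatever else has already queued, applies the whole batch inside one sqlitex.Save savepoint, and only then closes each request's done channel so the callers blocked in withConn return. The standalone sketch below illustrates just that batching pattern; it compiles against the standard library only, and the writeReq/batchWriter names and the plain func() query stand-in are illustrative, not part of this patch.

package main

import (
	"fmt"
	"sync"
)

// writeReq mirrors the patch's writeRequest: a queued query plus a channel
// that is closed once the batch containing it has been applied.
type writeReq struct {
	query func() // stand-in for the patch's func(*sqlite.Conn)
	done  chan struct{}
}

// batchWriter mirrors provider.writer: block for the first request, drain
// whatever else is already queued without blocking, apply the whole batch
// (one savepoint in the real code), then release the waiters.
func batchWriter(writes <-chan writeReq) {
	for first := range writes {
		buf := []writeReq{first}
	buffer:
		for {
			select {
			case wr, ok := <-writes:
				if !ok {
					break buffer
				}
				buf = append(buf, wr)
			default:
				break buffer
			}
		}
		// The real provider wraps this loop in sqlitex.Save(conn) so the
		// batch commits or rolls back as a unit; here the queries just run.
		for _, wr := range buf {
			wr.query()
		}
		for _, wr := range buf {
			close(wr.done)
		}
		fmt.Printf("batched %v write queries\n", len(buf))
	}
}

func main() {
	writes := make(chan writeReq)
	go batchWriter(writes)

	// Issue a few concurrent writes; each caller blocks until its batch has
	// been applied, just like instance.withConn when write batching is on.
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			done := make(chan struct{})
			writes <- writeReq{
				query: func() { fmt.Println("write", i) },
				done:  done,
			}
			<-done
		}(i)
	}
	wg.Wait()
	close(writes)
}

In the patch itself the same loop runs the queued func(*sqlite.Conn) queries against a pooled connection, and a failure inside the savepoint is treated as unrecoverable (panic(cantFail)) rather than being reported back to individual callers.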