Don't create blobs when reading
This commit is contained in:
parent
ee572c5822
commit
e5d21dbf34
|
@ -2,6 +2,7 @@ package sqliteStorage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -93,15 +94,25 @@ type torrent struct {
|
||||||
c *client
|
c *client
|
||||||
}
|
}
|
||||||
|
|
||||||
func rowidForBlob(c conn, name string, length int64) (rowid int64, err error) {
|
func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64, err error) {
|
||||||
|
rowidOk := false
|
||||||
err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
|
err = sqlitex.Exec(c, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
|
||||||
|
if rowidOk {
|
||||||
|
panic("expected at most one row")
|
||||||
|
}
|
||||||
|
// TODO: How do we know if we got this wrong?
|
||||||
rowid = stmt.ColumnInt64(0)
|
rowid = stmt.ColumnInt64(0)
|
||||||
|
rowidOk = true
|
||||||
return nil
|
return nil
|
||||||
}, name)
|
}, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if rowid != 0 {
|
if rowidOk {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !create {
|
||||||
|
err = errors.New("no existing row")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
|
err = sqlitex.Exec(c, "insert into blob(name, data) values(?, zeroblob(?))", nil, name, length)
|
||||||
|
@ -137,13 +148,19 @@ func (p piece) doAtIoWithBlob(
|
||||||
atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
|
atIo func(*sqlite.Blob) func([]byte, int64) (int, error),
|
||||||
b []byte,
|
b []byte,
|
||||||
off int64,
|
off int64,
|
||||||
|
create bool,
|
||||||
) (n int, err error) {
|
) (n int, err error) {
|
||||||
p.l.Lock()
|
p.l.Lock()
|
||||||
defer p.l.Unlock()
|
defer p.l.Unlock()
|
||||||
if !p.opts.CacheBlobs {
|
if !p.opts.CacheBlobs {
|
||||||
defer p.forgetBlob()
|
defer p.forgetBlob()
|
||||||
}
|
}
|
||||||
n, err = atIo(p.getBlob())(b, off)
|
blob, err := p.getBlob(create)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("getting blob: %w", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
n, err = atIo(blob)(b, off)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -151,23 +168,33 @@ func (p piece) doAtIoWithBlob(
|
||||||
if !errors.As(err, &se) {
|
if !errors.As(err, &se) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// "ABORT" occurs if the row the blob is on is modified elsewhere. "ERROR: invalid blob" occurs
|
||||||
|
// if the blob has been closed. We don't forget blobs that are closed by our GC finalizers,
|
||||||
|
// because they may be attached to names that have since moved on to another blob.
|
||||||
if se.Code != sqlite.SQLITE_ABORT && !(p.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
|
if se.Code != sqlite.SQLITE_ABORT && !(p.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
p.forgetBlob()
|
p.forgetBlob()
|
||||||
return atIo(p.getBlob())(b, off)
|
// Try again, this time we're guaranteed to get a fresh blob, and so errors are no excuse. It
|
||||||
|
// might be possible to skip to this version if we don't cache blobs.
|
||||||
|
blob, err = p.getBlob(create)
|
||||||
|
if err != nil {
|
||||||
|
err = fmt.Errorf("getting blob: %w", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return atIo(blob)(b, off)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p piece) ReadAt(b []byte, off int64) (n int, err error) {
|
func (p piece) ReadAt(b []byte, off int64) (n int, err error) {
|
||||||
return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
|
return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
|
||||||
return blob.ReadAt
|
return blob.ReadAt
|
||||||
}, b, off)
|
}, b, off, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p piece) WriteAt(b []byte, off int64) (n int, err error) {
|
func (p piece) WriteAt(b []byte, off int64) (n int, err error) {
|
||||||
return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
|
return p.doAtIoWithBlob(func(blob *sqlite.Blob) func([]byte, int64) (int, error) {
|
||||||
return blob.WriteAt
|
return blob.WriteAt
|
||||||
}, b, off)
|
}, b, off, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p piece) MarkComplete() error {
|
func (p piece) MarkComplete() error {
|
||||||
|
@ -211,12 +238,12 @@ func (p piece) Completion() (ret storage.Completion) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p piece) getBlob() *sqlite.Blob {
|
func (p piece) getBlob(create bool) (*sqlite.Blob, error) {
|
||||||
blob, ok := p.blobs[p.name]
|
blob, ok := p.blobs[p.name]
|
||||||
if !ok {
|
if !ok {
|
||||||
rowid, err := rowidForBlob(p.conn, p.name, p.length)
|
rowid, err := rowidForBlob(p.conn, p.name, p.length, create)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
return nil, fmt.Errorf("getting rowid for blob: %w", err)
|
||||||
}
|
}
|
||||||
blob, err = p.conn.OpenBlob("main", "blob", "data", rowid, true)
|
blob, err = p.conn.OpenBlob("main", "blob", "data", rowid, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -227,10 +254,13 @@ func (p piece) getBlob() *sqlite.Blob {
|
||||||
runtime.SetFinalizer(herp, func(*byte) {
|
runtime.SetFinalizer(herp, func(*byte) {
|
||||||
p.l.Lock()
|
p.l.Lock()
|
||||||
defer p.l.Unlock()
|
defer p.l.Unlock()
|
||||||
|
// Note there's no guarantee that the finalizer fired while this blob is the same
|
||||||
|
// one in the blob cache. It might be possible to rework this so that we check, or
|
||||||
|
// strip finalizers as appropriate.
|
||||||
blob.Close()
|
blob.Close()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
p.blobs[p.name] = blob
|
p.blobs[p.name] = blob
|
||||||
}
|
}
|
||||||
return blob
|
return blob, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue