Add write incomplete/consecutive chunks interfaces

Matt Joiner 2020-11-02 15:35:07 +11:00
parent 17e22516ff
commit 636b20b860
5 changed files with 97 additions and 29 deletions

View File

@@ -38,6 +38,10 @@ type PieceImpl interface {
Completion() Completion
}
+type IncompletePieceToWriter interface {
+WriteIncompleteTo(w io.Writer) error
+}
type Completion struct {
Complete bool
Ok bool

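For illustration, a minimal sketch of a type satisfying the new IncompletePieceToWriter interface by streaming incomplete data out of an in-memory buffer; the memoryPiece type is hypothetical and not part of this commit:

package storage

import (
	"bytes"
	"io"
)

// memoryPiece is a hypothetical backing store used only to demonstrate the
// interface; the real implementations are added in the files below.
type memoryPiece struct {
	incomplete bytes.Buffer
}

var _ IncompletePieceToWriter = (*memoryPiece)(nil)

// WriteIncompleteTo streams the piece's incomplete data to w in one pass.
func (p *memoryPiece) WriteIncompleteTo(w io.Writer) error {
	_, err := io.Copy(w, bytes.NewReader(p.incomplete.Bytes()))
	return err
}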
View File

@@ -13,10 +13,10 @@ import (
)
type piecePerResource struct {
-p resource.Provider
+p PieceProvider
}
-func NewResourcePieces(p resource.Provider) ClientImpl {
+func NewResourcePieces(p PieceProvider) ClientImpl {
return &piecePerResource{
p: p,
}
@@ -41,11 +41,29 @@ func (s piecePerResource) Piece(p metainfo.Piece) PieceImpl {
}
}
+type PieceProvider interface {
+resource.Provider
+}
+type ConsecutiveChunkWriter interface {
+WriteConsecutiveChunks(prefix string, _ io.Writer) error
+}
type piecePerResourcePiece struct {
mp metainfo.Piece
rp resource.Provider
}
+var _ IncompletePieceToWriter = piecePerResourcePiece{}
+func (s piecePerResourcePiece) WriteIncompleteTo(w io.Writer) error {
+if ccw, ok := s.rp.(ConsecutiveChunkWriter); ok {
+return ccw.WriteConsecutiveChunks(s.incompleteDirPath()+"/", w)
+}
+_, err := io.Copy(w, io.NewSectionReader(s.getChunks(), 0, s.mp.Length()))
+return err
+}
func (s piecePerResourcePiece) Completion() Completion {
fi, err := s.completed().Stat()
return Completion{
@@ -73,6 +91,7 @@ func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
if s.Completion().Complete {
return s.completed().ReadAt(b, off)
}
+//panic("unexpected ReadAt of incomplete piece")
return s.getChunks().ReadAt(b, off)
}

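WriteIncompleteTo above prefers the provider's optional fast path and only falls back to copying out of the assembled chunks via a section reader. As a sketch, a hypothetical in-memory provider could satisfy ConsecutiveChunkWriter like this, assuming (as the sqlite implementation below does) that chunk names are the prefix followed by a decimal byte offset:

package storagesketch

import (
	"io"
	"sort"
	"strconv"
	"strings"
)

// chunkProvider is hypothetical, for illustration only.
type chunkProvider struct {
	chunks map[string][]byte // name = prefix + decimal byte offset
}

func (p chunkProvider) WriteConsecutiveChunks(prefix string, w io.Writer) error {
	type chunk struct {
		off  int64
		data []byte
	}
	var cs []chunk
	for name, data := range p.chunks {
		if !strings.HasPrefix(name, prefix) {
			continue
		}
		off, err := strconv.ParseInt(name[len(prefix):], 10, 64)
		if err != nil {
			return err
		}
		cs = append(cs, chunk{off, data})
	}
	// Writing in ascending offset order reproduces the piece's byte stream.
	sort.Slice(cs, func(i, j int) bool { return cs[i].off < cs[j].off })
	for _, c := range cs {
		if _, err := w.Write(c.data); err != nil {
			return err
		}
	}
	return nil
}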
View File

@@ -71,10 +71,17 @@ with recursive excess(
where usage_with >= (select value from setting where name='capacity')
) select * from excess;
-CREATE TRIGGER if not exists trim_blobs_to_capacity_after_update after update on blob begin
+create trigger if not exists trim_blobs_to_capacity_after_update
+after update of data on blob
+when length(new.data)>length(old.data) and (select sum(length(cast(data as blob))) from blob)>(select value from setting where name='capacity')
+begin
delete from blob where rowid in (select blob_rowid from deletable_blob);
end;
-CREATE TRIGGER if not exists trim_blobs_to_capacity_after_insert after insert on blob begin
+create trigger if not exists trim_blobs_to_capacity_after_insert
+after insert on blob
+when (select sum(length(cast(data as blob))) from blob)>(select value from setting where name='capacity')
+begin
delete from blob where rowid in (select blob_rowid from deletable_blob);
end;
`)
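The rewritten triggers are deliberately narrower than the ones they replace: the update trigger now fires only for the data column and only when a blob grows, and both triggers test the total blob size against the capacity setting up front, so the deletable_blob view is consulted only when trimming may actually be required. For context, a minimal sketch of supplying that setting through the same sqlitex API this file already uses; the setCapacity helper, and the assumption that name keys the setting table, are mine rather than this commit's:

package sqlitestoragesketch

import (
	"crawshaw.io/go/sqlite"
	"crawshaw.io/go/sqlitex"
)

// setCapacity stores the byte limit that the triggers' when clauses read.
func setCapacity(conn *sqlite.Conn, maxBytes int64) error {
	return sqlitex.Exec(conn,
		`insert or replace into setting(name, value) values('capacity', ?)`,
		nil, maxBytes)
}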
@@ -188,7 +195,13 @@ func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
}
writes := make(chan writeRequest)
prov := &provider{pool: pool, writes: writes, opts: opts}
-go prov.writer(writes)
+runtime.SetFinalizer(prov, func(p *provider) {
+// This is done in a finalizer, as it's easier than trying to synchronize on whether the
+// channel has been closed. It also means that the provider writer can pass back errors from
+// a closed ConnPool.
+close(p.writes)
+})
+go providerWriter(writes, prov.pool)
return prov, nil
}
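The finalizer comment above glosses a real constraint: a finalizer only runs once its object becomes unreachable, so the writer goroutine must not capture prov, which is exactly why providerWriter takes the channel and pool as plain arguments. A standalone sketch of the same pattern, with hypothetical names:

package main

import (
	"fmt"
	"runtime"
	"time"
)

type worker struct{ jobs chan string }

func newWorker() *worker {
	w := &worker{jobs: make(chan string)}
	// The goroutine closes over the channel only, never over w, so w can
	// become unreachable while jobs are still being drained.
	go func(jobs <-chan string) {
		for j := range jobs {
			fmt.Println("handled", j)
		}
	}(w.jobs)
	runtime.SetFinalizer(w, func(w *worker) { close(w.jobs) })
	return w
}

func main() {
	newWorker().jobs <- "job"
	runtime.GC() // nudge the collector so the finalizer fires in this demo
	time.Sleep(10 * time.Millisecond)
}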
@@ -227,8 +240,33 @@ type provider struct {
opts ProviderOpts
}
+var _ storage.ConsecutiveChunkWriter = (*provider)(nil)
+func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (err error) {
+p.withConn(func(conn conn) {
+err = io.EOF
+err = sqlitex.Exec(conn, `
+select
+cast(data as blob),
+cast(substr(name, ?+1) as integer) as offset
+from blob
+where name like ?||'%'
+order by offset`,
+func(stmt *sqlite.Stmt) error {
+r := stmt.ColumnReader(0)
+//offset := stmt.ColumnInt64(1)
+//log.Printf("got %v bytes at offset %v", r.Len(), offset)
+_, err := io.Copy(w, r)
+return err
+},
+len(prefix),
+prefix,
+)
+}, false)
+return
+}
func (me *provider) Close() error {
close(me.writes)
return me.pool.Close()
}
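WriteConsecutiveChunks reassembles a piece prefix entirely in SQL: substr(name, ?+1) strips the name prefix, the cast turns the remainder into the chunk's byte offset, and ordering by that offset concatenates the blobs in stream order, with stmt.ColumnReader streaming each blob instead of materializing it. A hedged usage sketch; the prefix literal is illustrative only, since real callers pass the piece's incomplete-directory path as seen in the storage change above:

// prov is a *provider from NewProvider; the prefix shape is an assumption.
var buf bytes.Buffer
if err := prov.WriteConsecutiveChunks("incomplete-piece-prefix/", &buf); err != nil {
	log.Fatal(err)
}
// buf now holds the chunks' bytes in ascending offset order.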
@@ -237,7 +275,9 @@ type writeRequest struct {
done chan<- struct{}
}
-func (me *provider) writer(writes <-chan writeRequest) {
+// Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
+// stronger typing on the writes channel.
+func providerWriter(writes <-chan writeRequest, pool ConnPool) {
for {
first, ok := <-writes
if !ok {
@@ -258,8 +298,8 @@ func (me *provider) writer(writes <-chan writeRequest) {
}
var cantFail error
func() {
-conn := me.pool.Get(context.TODO())
-defer me.pool.Put(conn)
+conn := pool.Get(context.TODO())
+defer pool.Put(conn)
defer sqlitex.Save(conn)(&cantFail)
for _, wr := range buf {
wr.query(conn)
@@ -271,7 +311,7 @@ func (me *provider) writer(writes <-chan writeRequest) {
for _, wr := range buf {
close(wr.done)
}
-log.Printf("batched %v write queries", len(buf))
+//log.Printf("batched %v write queries", len(buf))
}
}
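The loop above (its middle elided by the diff) batches greedily: block for the first request, drain whatever else is already queued, run the whole batch inside one sqlitex.Save savepoint on a single pooled conn, then close each request's done channel to release the waiting callers. A sketch reconstructing the drain step under those assumptions, not the verbatim code:

// drainWrites blocks for one request, then takes everything already queued
// without blocking again; ok is false only if the channel was closed before
// yielding a request.
func drainWrites(writes <-chan writeRequest) (buf []writeRequest, ok bool) {
	first, ok := <-writes
	if !ok {
		return nil, false
	}
	buf = append(buf, first)
	for {
		select {
		case wr, ok := <-writes:
			if !ok {
				return buf, true
			}
			buf = append(buf, wr)
		default:
			return buf, true
		}
	}
}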
@@ -284,21 +324,25 @@ type instance struct {
p *provider
}
-func (i instance) withConn(with func(conn conn), write bool) {
-if write && i.p.opts.BatchWrites {
+func (p *provider) withConn(with func(conn conn), write bool) {
+if write && p.opts.BatchWrites {
done := make(chan struct{})
-i.p.writes <- writeRequest{
+p.writes <- writeRequest{
query: with,
done: done,
}
<-done
} else {
-conn := i.p.pool.Get(context.TODO())
-defer i.p.pool.Put(conn)
+conn := p.pool.Get(context.TODO())
+defer p.pool.Put(conn)
with(conn)
}
}
+func (i instance) withConn(with func(conn conn), write bool) {
+i.p.withConn(with, write)
+}
func (i instance) getConn() *sqlite.Conn {
return i.p.pool.Get(context.TODO())
}

View File

@@ -38,6 +38,16 @@ type Piece struct {
mip metainfo.Piece
}
+func (p Piece) WriteIncompleteTo(w io.Writer) error {
+if i, ok := p.PieceImpl.(IncompletePieceToWriter); ok {
+return i.WriteIncompleteTo(w)
+}
+n := p.mip.Length()
+r := io.NewSectionReader(p, 0, n)
+_, err := io.CopyN(w, r, n)
+return err
+}
func (p Piece) WriteAt(b []byte, off int64) (n int, err error) {
// Callers should not be writing to completed pieces, but it's too
// expensive to be checking this on every single write using uncached

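The fallback in WriteIncompleteTo reads the piece back through its own ReaderAt and copies exactly p.mip.Length() bytes with io.CopyN, so a short piece surfaces as io.EOF rather than silently producing truncated data, the same "no error iff pl bytes are copied" contract the old torrent.go code spelled out. A tiny self-contained illustration:

package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	// A 5-byte section over a 3-byte source: CopyN can't deliver n bytes,
	// so it reports io.EOF instead of succeeding short.
	r := io.NewSectionReader(strings.NewReader("abc"), 0, 5)
	n, err := io.CopyN(io.Discard, r, 5)
	fmt.Println(n, err) // 3 EOF
}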
View File

@@ -787,27 +787,18 @@ func (t *Torrent) pieceLength(piece pieceIndex) pp.Integer {
return pp.Integer(t.info.PieceLength)
}
-func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, copyErr error) {
+func (t *Torrent) hashPiece(piece pieceIndex) (ret metainfo.Hash, err error) {
hash := pieceHash.New()
p := t.piece(piece)
p.waitNoPendingWrites()
-ip := t.info.Piece(int(piece))
-pl := ip.Length()
-pieceReader := io.NewSectionReader(t.pieces[piece].Storage(), 0, pl)
-var hashSource io.Reader
-doCopy := func() {
-// Return no error iff pl bytes are copied.
-_, copyErr = io.CopyN(hash, hashSource, pl)
-}
+storagePiece := t.pieces[piece].Storage()
const logPieceContents = false
if logPieceContents {
var examineBuf bytes.Buffer
-hashSource = io.TeeReader(pieceReader, &examineBuf)
-doCopy()
-log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), copyErr)
+err = storagePiece.WriteIncompleteTo(io.MultiWriter(hash, &examineBuf))
+log.Printf("hashed %q with copy err %v", examineBuf.Bytes(), err)
} else {
-hashSource = pieceReader
-doCopy()
+err = storagePiece.WriteIncompleteTo(hash)
}
missinggo.CopyExact(&ret, hash.Sum(nil))
return
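hashPiece no longer pulls bytes out through piecewise reads; it hands the hash writer to the storage layer, which lets a backend like the sqlite provider stream a piece's chunks with a single ordered query. The debug branch tees that same stream into a buffer via io.MultiWriter, a pattern worth a standalone sketch; SHA-1 here matches BitTorrent v1 piece hashing, but the snippet is illustrative, not the package's code:

package main

import (
	"bytes"
	"crypto/sha1"
	"io"
	"log"
)

func main() {
	var examineBuf bytes.Buffer
	h := sha1.New()
	// Every byte written to w reaches both the hash and the capture buffer.
	w := io.MultiWriter(h, &examineBuf)
	io.WriteString(w, "piece data")
	log.Printf("hashed %q -> %x", examineBuf.Bytes(), h.Sum(nil))
}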