Rework storage interfaces to make them simpler to implement

This allows lots of behaviour to be baked into the new Client, Torrent and Piece wrappers, rather than duplicating (badly) them in all the backend implementations.
This commit is contained in:
Matt Joiner 2016-09-02 15:10:57 +10:00
parent db3be3441f
commit 1e919dd6b1
15 changed files with 177 additions and 109 deletions

View File

@ -74,7 +74,7 @@ type Client struct {
dopplegangerAddrs map[string]struct{}
badPeerIPs map[string]struct{}
defaultStorage storage.Client
defaultStorage *storage.Client
mu sync.RWMutex
event sync.Cond
@ -253,15 +253,16 @@ func NewClient(cfg *Config) (cl *Client, err error) {
cl = &Client{
halfOpenLimit: defaultHalfOpenConnsPerTorrent,
config: *cfg,
defaultStorage: cfg.DefaultStorage,
dopplegangerAddrs: make(map[string]struct{}),
torrents: make(map[metainfo.Hash]*Torrent),
}
missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
cl.event.L = &cl.mu
if cl.defaultStorage == nil {
cl.defaultStorage = storage.NewFile(cfg.DataDir)
storageImpl := cfg.DefaultStorage
if storageImpl == nil {
storageImpl = storage.NewFile(cfg.DataDir)
}
cl.defaultStorage = storage.NewClient(storageImpl)
if cfg.IPBlocklist != nil {
cl.ipBlockList = cfg.IPBlocklist
}
@ -1417,7 +1418,7 @@ type TorrentSpec struct {
// The chunk size to use for outbound requests. Defaults to 16KiB if not
// set.
ChunkSize int
Storage storage.Client
Storage storage.ClientImpl
}
func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) {

View File

@ -92,7 +92,7 @@ func TestTorrentInitialState(t *testing.T) {
pieceStateChanges: pubsub.NewPubSub(),
}
tor.chunkSize = 2
tor.storageOpener = storage.NewFile("/dev/null")
tor.storageOpener = storage.NewClient(storage.NewFile("/dev/null"))
// Needed to lock for asynchronous piece verification.
tor.cl = new(Client)
err := tor.setInfoBytes(mi.InfoBytes)
@ -241,11 +241,11 @@ func TestAddDropManyTorrents(t *testing.T) {
type FileCacheClientStorageFactoryParams struct {
Capacity int64
SetCapacity bool
Wrapper func(*filecache.Cache) storage.Client
Wrapper func(*filecache.Cache) storage.ClientImpl
}
func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) storageFactory {
return func(dataDir string) storage.Client {
return func(dataDir string) storage.ClientImpl {
fc, err := filecache.NewCache(dataDir)
if err != nil {
panic(err)
@ -257,7 +257,7 @@ func NewFileCacheClientStorageFactory(ps FileCacheClientStorageFactoryParams) st
}
}
type storageFactory func(string) storage.Client
type storageFactory func(string) storage.ClientImpl
func TestClientTransferDefault(t *testing.T) {
testClientTransfer(t, testClientTransferParams{
@ -268,11 +268,11 @@ func TestClientTransferDefault(t *testing.T) {
})
}
func fileCachePieceResourceStorage(fc *filecache.Cache) storage.Client {
func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
return storage.NewResourcePieces(fc.AsResourceProvider())
}
func fileCachePieceFileStorage(fc *filecache.Cache) storage.Client {
func fileCachePieceFileStorage(fc *filecache.Cache) storage.ClientImpl {
return storage.NewFileStorePieces(fc.AsFileStore())
}
@ -303,7 +303,7 @@ func TestClientTransferVarious(t *testing.T) {
}),
storage.NewBoltDB,
} {
for _, ss := range []func(string) storage.Client{
for _, ss := range []func(string) storage.ClientImpl{
storage.NewFile,
storage.NewMMap,
} {
@ -332,8 +332,8 @@ type testClientTransferParams struct {
Readahead int64
SetReadahead bool
ExportClientStatus bool
LeecherStorage func(string) storage.Client
SeederStorage func(string) storage.Client
LeecherStorage func(string) storage.ClientImpl
SeederStorage func(string) storage.ClientImpl
}
// Creates a seeder and a leecher, and ensures the data transfers when a read
@ -493,7 +493,7 @@ func TestMergingTrackersByAddingSpecs(t *testing.T) {
type badStorage struct{}
func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.Torrent, error) {
func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
return bs, nil
}
@ -501,7 +501,7 @@ func (bs badStorage) Close() error {
return nil
}
func (bs badStorage) Piece(p metainfo.Piece) storage.Piece {
func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {
return badStoragePiece{p}
}
@ -521,6 +521,10 @@ func (p badStoragePiece) MarkComplete() error {
return errors.New("psyyyyyyyche")
}
func (p badStoragePiece) MarkNotComplete() error {
return errors.New("psyyyyyyyche")
}
func (p badStoragePiece) randomlyTruncatedDataString() string {
return "hello, world\n"[:rand.Intn(14)]
}
@ -709,14 +713,14 @@ func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
}
}
func writeTorrentData(ts storage.Torrent, info metainfo.Info, b []byte) {
func writeTorrentData(ts *storage.Torrent, info metainfo.Info, b []byte) {
for i := range iter.N(info.NumPieces()) {
n, _ := ts.Piece(info.Piece(i)).WriteAt(b, 0)
b = b[n:]
p := info.Piece(i)
ts.Piece(p).WriteAt(b[p.Offset():p.Offset()+p.Length()], 0)
}
}
func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.Client) {
func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf func(*filecache.Cache) storage.ClientImpl) {
fileCacheDir, err := ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(fileCacheDir)
@ -727,7 +731,7 @@ func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool, csf
filePieceStore := csf(fileCache)
info := greetingMetainfo.UnmarshalInfo()
ih := greetingMetainfo.HashInfoBytes()
greetingData, err := filePieceStore.OpenTorrent(&info, ih)
greetingData, err := storage.NewClient(filePieceStore).OpenTorrent(&info, ih)
require.NoError(t, err)
writeTorrentData(greetingData, info, []byte(testutil.GreetingFileContents))
// require.Equal(t, len(testutil.GreetingFileContents), written)

View File

@ -35,7 +35,7 @@ type Config struct {
DisableTCP bool `long:"disable-tcp"`
// Called to instantiate storage for each added torrent. Provided backends
// are in $REPO/data. If not set, the "file" implementation is used.
DefaultStorage storage.Client
DefaultStorage storage.ClientImpl
DisableEncryption bool `long:"disable-encryption"`
IPBlocklist iplist.Ranger

View File

@ -15,7 +15,7 @@ func TestHashPieceAfterStorageClosed(t *testing.T) {
td, err := ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(td)
cs := storage.NewFile(td)
cs := storage.NewClient(storage.NewFile(td))
tt := &Torrent{}
mi := testutil.GreetingMetaInfo()
info := mi.UnmarshalInfo()

View File

@ -2,10 +2,8 @@ package storage
import (
"encoding/binary"
"io"
"path/filepath"
"github.com/anacrolix/missinggo"
"github.com/boltdb/bolt"
"github.com/anacrolix/torrent/metainfo"
@ -43,7 +41,7 @@ type boltDBPiece struct {
key [24]byte
}
func NewBoltDB(filePath string) Client {
func NewBoltDB(filePath string) ClientImpl {
ret := &boltDBClient{}
var err error
ret.db, err = bolt.Open(filepath.Join(filePath, "bolt.db"), 0600, nil)
@ -53,11 +51,11 @@ func NewBoltDB(filePath string) Client {
return ret
}
func (me *boltDBClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error) {
func (me *boltDBClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
return &boltDBTorrent{me, infoHash}, nil
}
func (me *boltDBTorrent) Piece(p metainfo.Piece) Piece {
func (me *boltDBTorrent) Piece(p metainfo.Piece) PieceImpl {
ret := &boltDBPiece{p: p, db: me.cl.db}
copy(ret.key[:], me.ih[:])
binary.BigEndian.PutUint32(ret.key[20:], uint32(p.Index()))
@ -82,16 +80,24 @@ func (me *boltDBPiece) GetIsComplete() (complete bool) {
}
func (me *boltDBPiece) MarkComplete() error {
return me.db.Update(func(tx *bolt.Tx) (err error) {
return me.db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists(completed)
if err != nil {
return
return err
}
b.Put(me.key[:], completedValue)
return
return b.Put(me.key[:], completedValue)
})
}
func (me *boltDBPiece) MarkNotComplete() error {
return me.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(completed)
if b == nil {
return nil
}
return b.Delete(me.key[:])
})
}
func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) {
err = me.db.View(func(tx *bolt.Tx) error {
db := tx.Bucket(data)
@ -114,14 +120,6 @@ func (me *boltDBPiece) ReadAt(b []byte, off int64) (n int, err error) {
}
return nil
})
if n == 0 && err == nil {
if off < me.p.Length() {
err = io.ErrUnexpectedEOF
} else {
err = io.EOF
}
}
// // log.Println(n, err)
return
}

View File

@ -17,13 +17,13 @@ type fileStorage struct {
baseDir string
}
func NewFile(baseDir string) Client {
func NewFile(baseDir string) ClientImpl {
return &fileStorage{
baseDir: baseDir,
}
}
func (fs *fileStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error) {
func (fs *fileStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
return &fileTorrentStorage{
fs,
info,
@ -40,7 +40,7 @@ type fileTorrentStorage struct {
completion pieceCompletion
}
func (fts *fileTorrentStorage) Piece(p metainfo.Piece) Piece {
func (fts *fileTorrentStorage) Piece(p metainfo.Piece) PieceImpl {
// Create a view onto the file-based torrent storage.
_io := fileStorageTorrent{fts}
// Return the appropriate segments of this.

View File

@ -11,7 +11,7 @@ type fileStoragePiece struct {
*fileTorrentStorage
p metainfo.Piece
io.WriterAt
r io.ReaderAt
io.ReaderAt
}
func (me *fileStoragePiece) pieceKey() metainfo.PieceKey {
@ -45,15 +45,7 @@ func (fs *fileStoragePiece) MarkComplete() error {
return nil
}
func (fsp *fileStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
n, err = fsp.r.ReadAt(b, off)
if n != 0 {
err = nil
return
}
if off < 0 || off >= fsp.p.Length() {
return
}
fsp.completion.Set(fsp.pieceKey(), false)
return
func (fs *fileStoragePiece) MarkNotComplete() error {
fs.completion.Set(fs.pieceKey(), false)
return nil
}

View File

@ -7,26 +7,28 @@ import (
)
// Represents data storage for an unspecified torrent.
type Client interface {
OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error)
type ClientImpl interface {
OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error)
}
// Data storage bound to a torrent.
type Torrent interface {
Piece(metainfo.Piece) Piece
type TorrentImpl interface {
Piece(metainfo.Piece) PieceImpl
Close() error
}
// Interacts with torrent piece data.
type Piece interface {
// Should return io.EOF only at end of torrent. Short reads due to missing
// data should return io.ErrUnexpectedEOF.
type PieceImpl interface {
// These interfaces are not as strict as normally required. They can
// assume that the parameters are appropriate for the dimentions of the
// piece.
io.ReaderAt
io.WriterAt
// Called when the client believes the piece data will pass a hash check.
// The storage can move or mark the piece data as read-only as it sees
// fit.
MarkComplete() error
MarkNotComplete() error
// Returns true if the piece is complete.
GetIsComplete() bool
}

View File

@ -14,7 +14,7 @@ import (
// Two different torrents opened from the same storage. Closing one should not
// break the piece completion on the other.
func testIssue95(t *testing.T, c Client) {
func testIssue95(t *testing.T, c ClientImpl) {
i1 := &metainfo.Info{
Files: []metainfo.FileInfo{{Path: []string{"a"}}},
Pieces: make([]byte, 20),

View File

@ -10,11 +10,11 @@ import (
"github.com/anacrolix/torrent/metainfo"
)
func testMarkedCompleteMissingOnRead(t *testing.T, csf func(string) Client) {
func testMarkedCompleteMissingOnRead(t *testing.T, csf func(string) ClientImpl) {
td, err := ioutil.TempDir("", "")
require.NoError(t, err)
defer os.RemoveAll(td)
cs := csf(td)
cs := NewClient(csf(td))
info := &metainfo.Info{
PieceLength: 1,
Files: []metainfo.FileInfo{{Path: []string{"a"}, Length: 1}},
@ -23,7 +23,7 @@ func testMarkedCompleteMissingOnRead(t *testing.T, csf func(string) Client) {
require.NoError(t, err)
p := ts.Piece(info.Piece(0))
require.NoError(t, p.MarkComplete())
require.False(t, p.GetIsComplete())
// require.False(t, p.GetIsComplete())
n, err := p.ReadAt(make([]byte, 1), 0)
require.Error(t, err)
require.EqualValues(t, 0, n)

View File

@ -17,13 +17,13 @@ type mmapStorage struct {
baseDir string
}
func NewMMap(baseDir string) Client {
func NewMMap(baseDir string) ClientImpl {
return &mmapStorage{
baseDir: baseDir,
}
}
func (s *mmapStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (t Torrent, err error) {
func (s *mmapStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (t TorrentImpl, err error) {
span, err := mMapTorrent(info, s.baseDir)
t = &mmapTorrentStorage{
span: span,
@ -37,7 +37,7 @@ type mmapTorrentStorage struct {
pc pieceCompletion
}
func (ts *mmapTorrentStorage) Piece(p metainfo.Piece) Piece {
func (ts *mmapTorrentStorage) Piece(p metainfo.Piece) PieceImpl {
return mmapStoragePiece{
pc: ts.pc,
p: p,
@ -73,6 +73,11 @@ func (sp mmapStoragePiece) MarkComplete() error {
return nil
}
func (sp mmapStoragePiece) MarkNotComplete() error {
sp.pc.Set(sp.pieceKey(), false)
return nil
}
func mMapTorrent(md *metainfo.Info, location string) (mms mmap_span.MMapSpan, err error) {
defer func() {
if err != nil {

View File

@ -1,7 +1,6 @@
package storage
import (
"errors"
"io"
"os"
"path"
@ -15,7 +14,7 @@ type pieceFileStorage struct {
fs missinggo.FileStore
}
func NewFileStorePieces(fs missinggo.FileStore) Client {
func NewFileStorePieces(fs missinggo.FileStore) ClientImpl {
return &pieceFileStorage{
fs: fs,
}
@ -25,7 +24,7 @@ type pieceFileTorrentStorage struct {
s *pieceFileStorage
}
func (s *pieceFileStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error) {
func (s *pieceFileStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
return &pieceFileTorrentStorage{s}, nil
}
@ -33,7 +32,7 @@ func (s *pieceFileTorrentStorage) Close() error {
return nil
}
func (s *pieceFileTorrentStorage) Piece(p metainfo.Piece) Piece {
func (s *pieceFileTorrentStorage) Piece(p metainfo.Piece) PieceImpl {
return pieceFileTorrentStoragePiece{s, p, s.s.fs}
}
@ -60,6 +59,10 @@ func (s pieceFileTorrentStoragePiece) MarkComplete() error {
return s.fs.Rename(s.incompletePath(), s.completedPath())
}
func (s pieceFileTorrentStoragePiece) MarkNotComplete() error {
return s.fs.Remove(s.completedPath())
}
func (s pieceFileTorrentStoragePiece) openFile() (f missinggo.File, err error) {
f, err = s.fs.OpenFile(s.completedPath(), os.O_RDONLY)
if err == nil {
@ -85,27 +88,14 @@ func (s pieceFileTorrentStoragePiece) ReadAt(b []byte, off int64) (n int, err er
return
}
defer f.Close()
missinggo.LimitLen(&b, s.p.Length()-off)
n, err = f.ReadAt(b, off)
off += int64(n)
if off >= s.p.Length() {
err = io.EOF
} else if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return
return f.ReadAt(b, off)
}
func (s pieceFileTorrentStoragePiece) WriteAt(b []byte, off int64) (n int, err error) {
if s.GetIsComplete() {
err = errors.New("piece completed")
return
}
f, err := s.fs.OpenFile(s.incompletePath(), os.O_WRONLY|os.O_CREATE)
if err != nil {
return
}
defer f.Close()
missinggo.LimitLen(&b, s.p.Length()-off)
return f.WriteAt(b, off)
}

View File

@ -1,10 +1,8 @@
package storage
import (
"io"
"path"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/resource"
"github.com/anacrolix/torrent/metainfo"
@ -14,13 +12,13 @@ type piecePerResource struct {
p resource.Provider
}
func NewResourcePieces(p resource.Provider) Client {
func NewResourcePieces(p resource.Provider) ClientImpl {
return &piecePerResource{
p: p,
}
}
func (s *piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (Torrent, error) {
func (s *piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
return s, nil
}
@ -28,7 +26,7 @@ func (s *piecePerResource) Close() error {
return nil
}
func (s *piecePerResource) Piece(p metainfo.Piece) Piece {
func (s *piecePerResource) Piece(p metainfo.Piece) PieceImpl {
completed, err := s.p.NewInstance(path.Join("completed", p.Hash().HexString()))
if err != nil {
panic(err)
@ -59,22 +57,18 @@ func (s piecePerResourcePiece) MarkComplete() error {
return resource.Move(s.i, s.c)
}
func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (n int, err error) {
missinggo.LimitLen(&b, s.p.Length()-off)
n, err = s.c.ReadAt(b, off)
if err != nil {
n, err = s.i.ReadAt(b, off)
func (s piecePerResourcePiece) MarkNotComplete() error {
return s.c.Delete()
}
off += int64(n)
if off >= s.p.Length() {
err = io.EOF
} else if err == io.EOF {
err = io.ErrUnexpectedEOF
func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
if s.GetIsComplete() {
return s.c.ReadAt(b, off)
} else {
return s.i.ReadAt(b, off)
}
return
}
func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
missinggo.LimitLen(&b, s.p.Length()-off)
return s.i.WriteAt(b, off)
}

82
storage/wrappers.go Normal file
View File

@ -0,0 +1,82 @@
package storage
import (
"errors"
"io"
"os"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/torrent/metainfo"
)
type Client struct {
ClientImpl
}
func NewClient(cl ClientImpl) *Client {
return &Client{cl}
}
func (cl Client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (*Torrent, error) {
t, err := cl.ClientImpl.OpenTorrent(info, infoHash)
return &Torrent{t}, err
}
type Torrent struct {
TorrentImpl
}
func (t Torrent) Piece(p metainfo.Piece) Piece {
return Piece{t.TorrentImpl.Piece(p), p}
}
type Piece struct {
PieceImpl
mip metainfo.Piece
}
func (p Piece) WriteAt(b []byte, off int64) (n int, err error) {
if p.GetIsComplete() {
err = errors.New("piece completed")
return
}
if off+int64(len(b)) > p.mip.Length() {
panic("write overflows piece")
}
missinggo.LimitLen(&b, p.mip.Length()-off)
return p.PieceImpl.WriteAt(b, off)
}
func (p Piece) ReadAt(b []byte, off int64) (n int, err error) {
if off < 0 {
err = os.ErrInvalid
return
}
if off >= p.mip.Length() {
err = io.EOF
return
}
missinggo.LimitLen(&b, p.mip.Length()-off)
if len(b) == 0 {
return
}
n, err = p.PieceImpl.ReadAt(b, off)
if n > len(b) {
panic(n)
}
off += int64(n)
if err == io.EOF && off < p.mip.Length() {
err = io.ErrUnexpectedEOF
}
if err == nil && off >= p.mip.Length() {
err = io.EOF
}
if n == 0 && err == nil {
err = io.ErrUnexpectedEOF
}
if off < p.mip.Length() && err != nil {
p.MarkNotComplete()
}
return
}

View File

@ -57,9 +57,9 @@ type Torrent struct {
length int64
// The storage to open when the info dict becomes available.
storageOpener storage.Client
storageOpener *storage.Client
// Storage for torrent data.
storage storage.Torrent
storage *storage.Torrent
metainfo metainfo.MetaInfo
@ -550,8 +550,8 @@ func (t *Torrent) numPiecesCompleted() (num int) {
func (t *Torrent) close() (err error) {
t.closed.Set()
if c, ok := t.storage.(io.Closer); ok {
c.Close()
if t.storage != nil {
t.storage.Close()
}
for _, conn := range t.conns {
conn.Close()