393 lines
10 KiB
Go
393 lines
10 KiB
Go
package storage
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"path/filepath"
|
|
"runtime"
|
|
"sync"
|
|
|
|
"github.com/anacrolix/missinggo/v2"
|
|
"github.com/anacrolix/torrent/common"
|
|
"github.com/anacrolix/torrent/metainfo"
|
|
"github.com/anacrolix/torrent/segments"
|
|
)
|
|
|
|
type MemoryBuf struct {
|
|
Data []byte
|
|
Length int64
|
|
}
|
|
|
|
// 专门用来对内存进行读写
|
|
// 基于指定的[]byte生成一个buffer, 可以从某个偏移量开始写入指定的数据
|
|
// 返回实际写入的字节数, error
|
|
type memoryBuf struct {
|
|
data []byte
|
|
length int64
|
|
totalWrite int64 // 实际写入的字节数, 这里需要加锁
|
|
mu sync.Mutex
|
|
}
|
|
|
|
func min(a, b int64) int64 {
|
|
if a < b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|
|
|
|
func (fb *memoryBuf) WriteAt(p []byte, off int64) (int, error) {
|
|
if off < 0 {
|
|
return 0, fmt.Errorf("WriteAt, off < 0")
|
|
}
|
|
n := min(int64(fb.length)-off, int64(len(p))) // n是写入的字节数
|
|
// p的[0,n)写入fb.data的[off,off+n)
|
|
srcSlice := p[0:n]
|
|
dstSlice := fb.data[off : off+n]
|
|
copy(dstSlice, srcSlice)
|
|
fb.mu.Lock()
|
|
fb.totalWrite += n
|
|
fb.mu.Unlock()
|
|
return int(n), nil
|
|
}
|
|
|
|
func (fb *memoryBuf) ReadAt(p []byte, off int64) (int, error) {
|
|
if off < 0 {
|
|
return 0, fmt.Errorf("ReadAt, off < 0")
|
|
}
|
|
// 从off开始, 从fb.data中读字节, 直到fb.data结束或p满
|
|
n := min(int64(fb.length)-off, int64(len(p))) // n是读出的字节数
|
|
// fb.data的[off,off+n)写入到p的[0,n)
|
|
srcSlice := fb.data[off : off+n]
|
|
dstSlice := p[0:n]
|
|
copy(dstSlice, srcSlice)
|
|
|
|
return int(n), nil
|
|
}
|
|
|
|
// type memoryBuf struct {
|
|
// *bytes.Buffer
|
|
// }
|
|
|
|
// func (buf *memoryBuf) WriteAt(p []byte, off int64) (n int, err error) {
|
|
// return buf.WriteAt(p, off)
|
|
// }
|
|
// func (buf *memoryBuf) ReadAt(p []byte, off int64) (n int, err error) {
|
|
// return buf.ReadAt(p, off)
|
|
// }
|
|
|
|
// NewMemoryClientOpts 必须和NewFileClientOpts一样, 实现Close/OpenTorrent两个函数
|
|
type NewMemoryClientOpts struct {
|
|
Torrent *memoryBuf
|
|
PieceCompletion PieceCompletion // 用来记录piece completion情况
|
|
}
|
|
|
|
// MemoryClientImpl is Memory-based storage for torrents, that isn't yet bound to a particular torrent.
|
|
type MemoryClientImpl struct {
|
|
opts NewMemoryClientOpts
|
|
}
|
|
|
|
// GetData return a torrent's data in memory
|
|
func (mci MemoryClientImpl) GetData() *MemoryBuf {
|
|
return &MemoryBuf{
|
|
Data: mci.opts.Torrent.data,
|
|
Length: mci.opts.Torrent.length,
|
|
}
|
|
}
|
|
|
|
// Close 关闭操作piece completion的sqlite连接
|
|
func (mci MemoryClientImpl) Close() error {
|
|
return mci.opts.PieceCompletion.Close()
|
|
}
|
|
|
|
func (mci MemoryClientImpl) BytesWrittenToMemory() int64 {
|
|
return mci.opts.Torrent.totalWrite
|
|
}
|
|
|
|
// OpenTorrent 是ClientImpl接口中定义的一个函数, 必须实现
|
|
// 功能就是给定一个
|
|
func (mci MemoryClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (_ TorrentImpl, err error) {
|
|
// // Data storage bound to a torrent.
|
|
// type TorrentImpl struct {
|
|
// Piece func(p metainfo.Piece) PieceImpl
|
|
// Close func() error
|
|
// Flush func() error
|
|
// // Storages that share the same space, will provide equal pointers. The function is called once
|
|
// // to determine the storage for torrents sharing the same function pointer, and mutated in
|
|
// // place.
|
|
// Capacity TorrentCapacity
|
|
// }
|
|
upvertedFiles := info.UpvertedFiles()
|
|
files := make([]file, 0, len(upvertedFiles))
|
|
for _, fileInfo := range upvertedFiles {
|
|
f := file{
|
|
path: filepath.Join(fileInfo.Path...),
|
|
length: fileInfo.Length,
|
|
}
|
|
files = append(files, f)
|
|
}
|
|
log.Printf("%v",upvertedFiles)
|
|
t := memoryTorrentImpl{
|
|
Torrent: mci.opts.Torrent,
|
|
files: files, // from torrent
|
|
segmentLocater: segments.NewIndex(common.LengthIterFromUpvertedFiles(upvertedFiles)),
|
|
infoHash: infoHash,
|
|
completion: mci.opts.PieceCompletion,
|
|
}
|
|
return TorrentImpl{
|
|
Piece: t.Piece,
|
|
Close: t.Close,
|
|
}, nil
|
|
}
|
|
|
|
// NewMemory acts differently for seeders and leechers
|
|
// for seeders, mbpp not nill, mbp not nill, mb not nil
|
|
// for leechers, mbpp not nill, mbp nill, mb nill.
|
|
// this function make mbp not nill, mb not nill.
|
|
// we can access the downloaded date through the returned mbpp
|
|
func NewMemory(totalLength int64, mbpp **MemoryBuf) (ClientImplCloser, error) {
|
|
return NewMemoryWithCompletion(totalLength, mbpp)
|
|
}
|
|
|
|
// NewMemoryWithCompletion 中Completion是指piece completion, 用来记录piece的完成情况(已经实现的是sqlite)
|
|
// 但这个并不是必须的
|
|
func NewMemoryWithCompletion(totalLength int64, mbpp **MemoryBuf) (ClientImplCloser, error) {
|
|
var opts NewMemoryClientOpts
|
|
if *mbpp != nil { // seeders
|
|
// storage.ClientImplCloser 实际返回 memoryClientImpl
|
|
opts = NewMemoryClientOpts{
|
|
Torrent: &memoryBuf{
|
|
data: (**mbpp).Data,
|
|
length: (**mbpp).Length,
|
|
totalWrite: 0,
|
|
},
|
|
PieceCompletion: pieceCompletionForDir("./"), // 默认选择sqlite, sqlite的db文件放在./下面
|
|
}
|
|
} else { // leechers
|
|
// allocate memory
|
|
if flag, avail := isAllocatedOutOfHeapMemory(totalLength); !flag {
|
|
return nil, fmt.Errorf("try to allocate too much memory, want %d, available %d", totalLength, avail)
|
|
}
|
|
data := make([]byte, totalLength)
|
|
var p *byte = &data[0]
|
|
log.Printf("%d %d %v", len(data), cap(data), p)
|
|
|
|
// storage.ClientImplCloser 实际返回 memoryClientImpl
|
|
opts = NewMemoryClientOpts{
|
|
Torrent: &memoryBuf{
|
|
data: data,
|
|
length: totalLength,
|
|
totalWrite: 0,
|
|
},
|
|
PieceCompletion: pieceCompletionForDir("./"), // 默认选择sqlite, sqlite的db文件放在./下面
|
|
}
|
|
*mbpp = &MemoryBuf{
|
|
Data: data,
|
|
Length: totalLength,
|
|
}
|
|
}
|
|
|
|
return MemoryClientImpl{opts}, nil
|
|
}
|
|
|
|
// NewMemoryOpts creates a new MemoryImplCloser that stores files using memory
|
|
func NewMemoryOpts(opts NewMemoryClientOpts) ClientImplCloser {
|
|
if opts.Torrent == nil {
|
|
return nil
|
|
}
|
|
if opts.PieceCompletion == nil {
|
|
opts.PieceCompletion = pieceCompletionForDir("./")
|
|
}
|
|
return MemoryClientImpl{opts: opts}
|
|
}
|
|
|
|
// 检查待分配的内存是否超过堆的大小
|
|
func isAllocatedOutOfHeapMemory(allocated int64) (flag bool, available uint64) {
|
|
var m runtime.MemStats
|
|
if uint64(allocated) > m.HeapIdle {
|
|
flag = true
|
|
}
|
|
available = m.HeapIdle
|
|
return
|
|
}
|
|
|
|
// memoryTorrentImpl是对TorrentImpl接口的一个实现
|
|
//
|
|
// type TorrentImpl struct {
|
|
// Piece func(p metainfo.Piece) PieceImpl
|
|
// Close func() error
|
|
// Flush func() error
|
|
// // Storages that share the same space, will provide equal pointers. The function is called once
|
|
// // to determine the storage for torrents sharing the same function pointer, and mutated in
|
|
// // place.
|
|
// Capacity TorrentCapacity
|
|
// }
|
|
type memoryTorrentImpl struct {
|
|
Torrent *memoryBuf
|
|
files []file
|
|
segmentLocater segments.Index
|
|
infoHash metainfo.Hash
|
|
completion PieceCompletion
|
|
}
|
|
|
|
func (mti *memoryTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
|
|
// Create a view onto the file-based torrent storage.
|
|
_io := memoryTorrentImplIO{mti}
|
|
// Return the appropriate segments of this.
|
|
return &memoryPieceImpl{
|
|
mti,
|
|
p,
|
|
missinggo.NewSectionWriter(_io, p.Offset(), p.Length()),
|
|
io.NewSectionReader(_io, p.Offset(), p.Length()),
|
|
}
|
|
}
|
|
|
|
func (fs *memoryTorrentImpl) Close() error {
|
|
return nil
|
|
}
|
|
|
|
// 实际进行IO操作的部分
|
|
// 需要实现ReadAt/WriteAt
|
|
type memoryTorrentImplIO struct {
|
|
mti *memoryTorrentImpl
|
|
}
|
|
|
|
// Returns EOF on short or missing file.
|
|
func (mtii *memoryTorrentImplIO) readFileAt(file file, b []byte, off int64) (n int, err error) {
|
|
// f, err := os.Open(file.path)
|
|
// if os.IsNotExist(err) {
|
|
// // File missing is treated the same as a short file.
|
|
// err = io.EOF
|
|
// return
|
|
// }
|
|
// if err != nil {
|
|
// return
|
|
// }
|
|
// defer f.Close()
|
|
|
|
f := mtii.mti.Torrent
|
|
|
|
// Limit the read to within the expected bounds of this file.
|
|
if int64(len(b)) > file.length-off {
|
|
b = b[:file.length-off]
|
|
}
|
|
for off < file.length && len(b) != 0 {
|
|
n1, err1 := f.ReadAt(b, off)
|
|
b = b[n1:]
|
|
n += n1
|
|
off += int64(n1)
|
|
if n1 == 0 {
|
|
err = err1
|
|
break
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// Only returns EOF at the end of the torrent. Premature EOF is ErrUnexpectedEOF.
|
|
func (mtii memoryTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) {
|
|
mtii.mti.segmentLocater.Locate(
|
|
segments.Extent{
|
|
Start: off,
|
|
Length: int64(len(b)),
|
|
},
|
|
func(i int, e segments.Extent) bool {
|
|
n1, err1 := mtii.readFileAt(mtii.mti.files[i], b[:e.Length], e.Start)
|
|
n += n1
|
|
b = b[n1:]
|
|
err = err1
|
|
return err == nil // && int64(n1) == e.Length
|
|
})
|
|
if len(b) != 0 && err == nil {
|
|
err = io.EOF
|
|
}
|
|
return
|
|
}
|
|
|
|
func (mtii memoryTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) {
|
|
// log.Printf("write at %v: %v bytes", off, len(p))
|
|
mtii.mti.segmentLocater.Locate(
|
|
segments.Extent{
|
|
Start: off,
|
|
Length: int64(len(p)),
|
|
},
|
|
func(i int, e segments.Extent) bool {
|
|
// name := mtii.mti.files[i].path
|
|
// os.MkdirAll(filepath.Dir(name), 0o777)
|
|
// var f *os.File
|
|
// f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0o666)
|
|
// if err != nil {
|
|
// return false
|
|
// }
|
|
|
|
// 需要从mtii中知道Torrent(写入位置)
|
|
f := mtii.mti.Torrent
|
|
|
|
var n1 int
|
|
n1, err = f.WriteAt(p[:e.Length], e.Start)
|
|
// log.Printf("%v %v wrote %v: %v", i, e, n1, err)
|
|
|
|
n += n1
|
|
p = p[n1:]
|
|
if int64(n1) != e.Length {
|
|
err = io.ErrShortWrite
|
|
}
|
|
return err == nil
|
|
})
|
|
return
|
|
}
|
|
|
|
// memoryPieceImpl是对PieceImpl接口的一个实现
|
|
type memoryPieceImpl struct {
|
|
*memoryTorrentImpl
|
|
p metainfo.Piece
|
|
io.WriterAt
|
|
io.ReaderAt
|
|
}
|
|
|
|
// 获取piece的key
|
|
func (mpi *memoryPieceImpl) pieceKey() metainfo.PieceKey {
|
|
return metainfo.PieceKey{mpi.infoHash, mpi.p.Index()}
|
|
}
|
|
|
|
// 获取某个piece的完成情况
|
|
func (mpi *memoryPieceImpl) Completion() Completion {
|
|
c, err := mpi.completion.Get(mpi.pieceKey())
|
|
if err != nil {
|
|
log.Printf("error getting piece completion: %s", err)
|
|
c.Ok = false
|
|
return c
|
|
}
|
|
|
|
// 对于内存存储, 由于不需要写入磁盘, 无需检查
|
|
// verified := true
|
|
// if c.Complete {
|
|
// // If it's allegedly complete, check that its constituent files have the necessary length.
|
|
// for _, fi := range extentCompleteRequiredLengths(mpi.p.Info, mpi.p.Offset(), mpi.p.Length()) {
|
|
// s, err := os.Stat(mpi.files[fi.fileIndex].path)
|
|
// if err != nil || s.Size() < fi.length {
|
|
// verified = false
|
|
// break
|
|
// }
|
|
// }
|
|
// }
|
|
|
|
// if !verified {
|
|
// // The completion was wrong, fix it.
|
|
// c.Complete = false
|
|
// mpi.completion.Set(mpi.pieceKey(), false)
|
|
// }
|
|
|
|
return c
|
|
}
|
|
|
|
// 将某个piece标记为完成
|
|
func (mpi *memoryPieceImpl) MarkComplete() error {
|
|
return mpi.completion.Set(mpi.pieceKey(), true)
|
|
}
|
|
|
|
// 将某个piece标记未完成
|
|
func (mpi *memoryPieceImpl) MarkNotComplete() error {
|
|
return mpi.completion.Set(mpi.pieceKey(), false)
|
|
}
|