Allow chunk size to be specified per torrent

This commit is contained in:
Matt Joiner 2015-07-15 15:31:18 +10:00
parent 4807f12ea1
commit c018c660f0
6 changed files with 51 additions and 40 deletions

View File

@ -324,7 +324,7 @@ func (cl *Client) addUrgentRequests(t *torrent, off int64, n int) {
}
if _, ok := t.urgent[req]; !ok && !t.haveChunk(req) {
if t.urgent == nil {
t.urgent = make(map[request]struct{}, (n+chunkSize-1)/chunkSize)
t.urgent = make(map[request]struct{}, (n+int(t.chunkSize)-1)/int(t.chunkSize))
}
t.urgent[req] = struct{}{}
cl.event.Broadcast() // Why?
@ -1912,6 +1912,7 @@ func (cl *Client) setMetaData(t *torrent, md *metainfo.Info, bytes []byte) (err
func newTorrent(ih InfoHash) (t *torrent, err error) {
t = &torrent{
InfoHash: ih,
chunkSize: defaultChunkSize,
Peers: make(map[peersKey]Peer),
closing: make(chan struct{}),
@ -2078,10 +2079,15 @@ func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err
// Specifies a new torrent for adding to a client. There are helpers for
// magnet URIs and torrent metainfo files.
type TorrentSpec struct {
// The tiered tracker URIs.
Trackers [][]string
InfoHash InfoHash
Info *metainfo.InfoEx
// The name to use if the Name field from the Info isn't available.
DisplayName string
// The chunk size to use for outbound requests. Defaults to 16KiB if not
// set.
ChunkSize int
}
func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) {
@ -2129,6 +2135,9 @@ func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (T Torrent, new bool, err er
if err != nil {
return
}
if spec.ChunkSize != 0 {
t.chunkSize = pp.Integer(spec.ChunkSize)
}
}
if spec.DisplayName != "" {
t.DisplayName = spec.DisplayName
@ -2463,7 +2472,7 @@ func (me *Client) fillRequests(t *torrent, c *connection) {
panic("unwanted piece in connection request order")
}
piece := t.Pieces[pieceIndex]
for _, cs := range piece.shuffledPendingChunkSpecs(t.pieceLength(pieceIndex)) {
for _, cs := range piece.shuffledPendingChunkSpecs(t.pieceLength(pieceIndex), pp.Integer(t.chunkSize)) {
r := request{pp.Integer(pieceIndex), cs}
if !addRequest(r) {
return
@ -2524,7 +2533,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
// log.Println("got chunk", req)
piece.Event.Broadcast()
// Record that we have the chunk.
piece.unpendChunkIndex(chunkIndex(req.chunkSpec))
piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
delete(t.urgent, req)
if piece.numPendingChunks() == 0 {
for _, c := range t.Conns {

View File

@ -93,6 +93,7 @@ func TestTorrentInitialState(t *testing.T) {
if err != nil {
t.Fatal(err)
}
tor.chunkSize = 2
err = tor.setMetadata(&mi.Info.Info, mi.Info.Bytes, nil)
if err != nil {
t.Fatal(err)
@ -102,13 +103,8 @@ func TestTorrentInitialState(t *testing.T) {
}
p := tor.Pieces[0]
tor.pendAllChunkSpecs(0)
if p.numPendingChunks() != 1 {
t.Fatalf("should only be 1 chunk: %v", p.PendingChunkSpecs)
}
// TODO: Set chunkSize to 2, to test odd/even silliness.
if chunkIndexSpec(0, tor.pieceLength(0)).Length != 5 {
t.Fatal("pending chunk spec is incorrect")
}
assert.EqualValues(t, 3, p.numPendingChunks())
assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
}
func TestUnmarshalPEXMsg(t *testing.T) {
@ -271,7 +267,11 @@ func TestClientTransfer(t *testing.T) {
cfg.TorrentDataOpener = blob.NewStore(leecherDataDir).OpenTorrent
leecher, _ := NewClient(&cfg)
defer leecher.Close()
leecherGreeting, _, _ := leecher.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
ret = TorrentSpecFromMetaInfo(mi)
ret.ChunkSize = 2
return
}())
leecherGreeting.AddPeers([]Peer{
Peer{
IP: util.AddrIP(seeder.ListenAddr()),

View File

@ -13,7 +13,7 @@ import (
const (
pieceHash = crypto.SHA1
maxRequests = 250 // Maximum pending requests we allow peers to send us.
chunkSize = 0x4000 // 16KiB
defaultChunkSize = 0x4000 // 16KiB
// Peer ID client identifier prefix. We'll update this occasionally to
// reflect changes to client behaviour that other clients may depend on.
// Also see `extendedHandshakeClientVersion`.
@ -35,7 +35,7 @@ func (ih *InfoHash) HexString() string {
return fmt.Sprintf("%x", ih[:])
}
func lastChunkSpec(pieceLength pp.Integer) (cs chunkSpec) {
func lastChunkSpec(pieceLength, chunkSize pp.Integer) (cs chunkSpec) {
cs.Begin = (pieceLength - 1) / chunkSize * chunkSize
cs.Length = pieceLength - cs.Begin
return

View File

@ -4,7 +4,7 @@ import . "gopkg.in/check.v1"
func (suite) TestTorrentOffsetRequest(c *C) {
check := func(tl, ps, off int64, expected request, ok bool) {
req, _ok := torrentOffsetRequest(tl, ps, chunkSize, off)
req, _ok := torrentOffsetRequest(tl, ps, defaultChunkSize, off)
c.Check(_ok, Equals, ok)
c.Check(req, Equals, expected)
}

View File

@ -32,11 +32,11 @@ type piece struct {
Priority piecePriority
}
func (p *piece) pendingChunk(cs chunkSpec) bool {
func (p *piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool {
if p.PendingChunkSpecs == nil {
return false
}
return p.PendingChunkSpecs[chunkIndex(cs)]
return p.PendingChunkSpecs[chunkIndex(cs, chunkSize)]
}
func (p *piece) numPendingChunks() (ret int) {
@ -55,7 +55,7 @@ func (p *piece) unpendChunkIndex(i int) {
p.PendingChunkSpecs[i] = false
}
func chunkIndexSpec(index int, pieceLength pp.Integer) chunkSpec {
func chunkIndexSpec(index int, pieceLength, chunkSize pp.Integer) chunkSpec {
ret := chunkSpec{pp.Integer(index) * chunkSize, chunkSize}
if ret.Begin+ret.Length > pieceLength {
ret.Length = pieceLength - ret.Begin
@ -63,14 +63,14 @@ func chunkIndexSpec(index int, pieceLength pp.Integer) chunkSpec {
return ret
}
func (p *piece) shuffledPendingChunkSpecs(pieceLength pp.Integer) (css []chunkSpec) {
func (p *piece) shuffledPendingChunkSpecs(pieceLength, chunkSize pp.Integer) (css []chunkSpec) {
if p.numPendingChunks() == 0 {
return
}
css = make([]chunkSpec, 0, p.numPendingChunks())
for i, pending := range p.PendingChunkSpecs {
if pending {
css = append(css, chunkIndexSpec(i, pieceLength))
css = append(css, chunkIndexSpec(i, pieceLength, chunkSize))
}
}
if len(css) <= 1 {

View File

@ -32,7 +32,7 @@ func (t *torrent) pieceNumPendingBytes(index int) (count pp.Integer) {
}
for i, pending := range piece.PendingChunkSpecs {
if pending {
count += chunkIndexSpec(i, pieceLength).Length
count += chunkIndexSpec(i, pieceLength, t.chunkSize).Length
}
}
return
@ -63,6 +63,7 @@ type torrent struct {
InfoHash InfoHash
Pieces []*piece
chunkSize pp.Integer
// Chunks that are wanted before all others. This is for
// responsive/streaming readers that want to unblock ASAP.
urgent map[request]struct{}
@ -552,7 +553,7 @@ func (t *torrent) requestOffset(r request) int64 {
// Return the request that would include the given offset into the torrent
// data. Returns !ok if there is no such request.
func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
return torrentOffsetRequest(t.Length(), t.Info.PieceLength, chunkSize, off)
return torrentOffsetRequest(t.Length(), t.Info.PieceLength, int64(t.chunkSize), off)
}
func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
@ -575,26 +576,26 @@ func (t *torrent) validOutgoingRequest(r request) bool {
if r.Index >= pp.Integer(t.Info.NumPieces()) {
return false
}
if r.Begin%chunkSize != 0 {
if r.Begin%t.chunkSize != 0 {
return false
}
if r.Length > chunkSize {
if r.Length > t.chunkSize {
return false
}
pieceLength := t.pieceLength(int(r.Index))
if r.Begin+r.Length > pieceLength {
return false
}
return r.Length == chunkSize || r.Begin+r.Length == pieceLength
return r.Length == t.chunkSize || r.Begin+r.Length == pieceLength
}
func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
css = make([]chunkSpec, 0, (t.pieceLength(piece)+chunkSize-1)/chunkSize)
css = make([]chunkSpec, 0, (t.pieceLength(piece)+t.chunkSize-1)/t.chunkSize)
var cs chunkSpec
for left := t.pieceLength(piece); left != 0; left -= cs.Length {
cs.Length = left
if cs.Length > chunkSize {
cs.Length = chunkSize
if cs.Length > t.chunkSize {
cs.Length = t.chunkSize
}
css = append(css, cs)
cs.Begin += cs.Length
@ -606,7 +607,7 @@ func (t *torrent) pendAllChunkSpecs(pieceIndex int) {
piece := t.Pieces[pieceIndex]
if piece.PendingChunkSpecs == nil {
// Allocate to exact size.
piece.PendingChunkSpecs = make([]bool, (t.pieceLength(pieceIndex)+chunkSize-1)/chunkSize)
piece.PendingChunkSpecs = make([]bool, (t.pieceLength(pieceIndex)+t.chunkSize-1)/t.chunkSize)
}
// Pend all the chunks.
pcss := piece.PendingChunkSpecs
@ -671,10 +672,11 @@ func (t *torrent) haveChunk(r request) bool {
if !t.haveInfo() {
return false
}
return !t.Pieces[r.Index].pendingChunk(r.chunkSpec)
p := t.Pieces[r.Index]
return !p.pendingChunk(r.chunkSpec, t.chunkSize)
}
func chunkIndex(cs chunkSpec) int {
func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
return int(cs.Begin / chunkSize)
}
@ -683,7 +685,7 @@ func (t *torrent) wantChunk(r request) bool {
if !t.wantPiece(int(r.Index)) {
return false
}
if t.Pieces[r.Index].pendingChunk(r.chunkSpec) {
if t.Pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
return true
}
_, ok := t.urgent[r]