Further progress on webseeding

This commit is contained in:
Matt Joiner 2020-06-01 18:25:45 +10:00
parent 67c9021e97
commit ff53ab860c
9 changed files with 228 additions and 59 deletions

View File

@ -1233,9 +1233,9 @@ func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
var ss []string var ss []string
slices.MakeInto(&ss, mi.Nodes) slices.MakeInto(&ss, mi.Nodes)
cl.AddDHTNodes(ss) cl.AddDHTNodes(ss)
//for _, url := range mi.UrlList { for _, url := range mi.UrlList {
//T.addWebSeed(url) T.addWebSeed(url)
//} }
return return
} }

View File

@ -39,6 +39,7 @@ type PeerImpl interface {
UpdateRequests() UpdateRequests()
WriteInterested(interested bool) bool WriteInterested(interested bool) bool
Cancel(request) bool Cancel(request) bool
// Return true if there's room for more activity.
Request(request) bool Request(request) bool
ConnectionFlags() string ConnectionFlags() string
Close() Close()
@ -211,7 +212,7 @@ func (cn *PeerConn) localAddr() net.Addr {
return cn.conn.LocalAddr() return cn.conn.LocalAddr()
} }
func (cn *PeerConn) supportsExtension(ext pp.ExtensionName) bool { func (cn *peer) supportsExtension(ext pp.ExtensionName) bool {
_, ok := cn.PeerExtensionIDs[ext] _, ok := cn.PeerExtensionIDs[ext]
return ok return ok
} }
@ -373,6 +374,7 @@ func (cn *PeerConn) post(msg pp.Message) {
cn.tickleWriter() cn.tickleWriter()
} }
// Returns true if there's room to write more.
func (cn *PeerConn) write(msg pp.Message) bool { func (cn *PeerConn) write(msg pp.Message) bool {
cn.wroteMsg(&msg) cn.wroteMsg(&msg)
cn.writeBuffer.Write(msg.MustMarshalBinary()) cn.writeBuffer.Write(msg.MustMarshalBinary())
@ -954,7 +956,7 @@ func (cn *PeerConn) readBytes(n int64) {
// Returns whether the connection could be useful to us. We're seeding and // Returns whether the connection could be useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc. // they want data, we don't have metainfo and they can provide it, etc.
func (c *PeerConn) useful() bool { func (c *peer) useful() bool {
t := c.t t := c.t
if c.closed.IsSet() { if c.closed.IsSet() {
return false return false
@ -1448,7 +1450,7 @@ func (cn *peer) netGoodPiecesDirtied() int64 {
return cn._stats.PiecesDirtiedGood.Int64() - cn._stats.PiecesDirtiedBad.Int64() return cn._stats.PiecesDirtiedGood.Int64() - cn._stats.PiecesDirtiedBad.Int64()
} }
func (c *PeerConn) peerHasWantedPieces() bool { func (c *peer) peerHasWantedPieces() bool {
return !c._pieceRequestOrder.IsEmpty() return !c._pieceRequestOrder.IsEmpty()
} }
@ -1541,7 +1543,7 @@ func (c *PeerConn) setTorrent(t *Torrent) {
t.reconcileHandshakeStats(c) t.reconcileHandshakeStats(c)
} }
func (c *PeerConn) peerPriority() (peerPriority, error) { func (c *peer) peerPriority() (peerPriority, error) {
return bep40Priority(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp())) return bep40Priority(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp()))
} }
@ -1549,7 +1551,7 @@ func (c *peer) remoteIp() net.IP {
return addrIpOrNil(c.remoteAddr) return addrIpOrNil(c.remoteAddr)
} }
func (c *PeerConn) remoteIpPort() IpPort { func (c *peer) remoteIpPort() IpPort {
ipa, _ := tryIpPortFromNetAddr(c.remoteAddr) ipa, _ := tryIpPortFromNetAddr(c.remoteAddr)
return IpPort{ipa.IP, uint16(ipa.Port)} return IpPort{ipa.IP, uint16(ipa.Port)}
} }

View File

@ -29,17 +29,17 @@ func (me Index) iterSegments() func() (Length, bool) {
} }
} }
func (me Index) Locate(e Extent, output Callback) { func (me Index) Locate(e Extent, output Callback) bool {
first := sort.Search(len(me.segments), func(i int) bool { first := sort.Search(len(me.segments), func(i int) bool {
_e := me.segments[i] _e := me.segments[i]
return _e.End() > e.Start return _e.End() > e.Start
}) })
if first == len(me.segments) { if first == len(me.segments) {
return return false
} }
e.Start -= me.segments[first].Start e.Start -= me.segments[first].Start
me.segments = me.segments[first:] me.segments = me.segments[first:]
Scan(me.iterSegments(), e, func(i int, e Extent) bool { return Scan(me.iterSegments(), e, func(i int, e Extent) bool {
return output(i+first, e) return output(i+first, e)
}) })
} }

View File

@ -27,12 +27,12 @@ type (
LengthIter = func() (Length, bool) LengthIter = func() (Length, bool)
) )
func Scan(haystack func() (Length, bool), needle Extent, callback Callback) { func Scan(haystack LengthIter, needle Extent, callback Callback) bool {
i := 0 i := 0
for needle.Length != 0 { for needle.Length != 0 {
l, ok := haystack() l, ok := haystack()
if !ok { if !ok {
return return false
} }
if needle.Start < l || needle.Start == l && l == 0 { if needle.Start < l || needle.Start == l && l == 0 {
e1 := Extent{ e1 := Extent{
@ -41,7 +41,7 @@ func Scan(haystack func() (Length, bool), needle Extent, callback Callback) {
} }
if e1.Length >= 0 { if e1.Length >= 0 {
if !callback(i, e1) { if !callback(i, e1) {
return return true
} }
needle.Start = 0 needle.Start = 0
needle.Length -= e1.Length needle.Length -= e1.Length
@ -51,12 +51,13 @@ func Scan(haystack func() (Length, bool), needle Extent, callback Callback) {
} }
i++ i++
} }
return true
} }
func LocaterFromLengthIter(li LengthIter) Locater { func LocaterFromLengthIter(li LengthIter) Locater {
return func(e Extent, c Callback) { return func(e Extent, c Callback) bool {
Scan(li, e, c) return Scan(li, e, c)
} }
} }
type Locater func(Extent, Callback) type Locater func(Extent, Callback) bool

View File

@ -8,12 +8,17 @@ import (
"fmt" "fmt"
"io" "io"
"math/rand" "math/rand"
"net/http"
"net/url" "net/url"
"sort"
"sync" "sync"
"text/tabwriter" "text/tabwriter"
"time" "time"
"unsafe" "unsafe"
"github.com/anacrolix/torrent/common"
"github.com/anacrolix/torrent/segments"
"github.com/anacrolix/torrent/webseed"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/pion/datachannel" "github.com/pion/datachannel"
@ -74,8 +79,9 @@ type Torrent struct {
metainfo metainfo.MetaInfo metainfo metainfo.MetaInfo
// The info dict. nil if we don't have it (yet). // The info dict. nil if we don't have it (yet).
info *metainfo.Info info *metainfo.Info
files *[]*File fileIndex segments.Index
files *[]*File
webSeeds map[string]*peer webSeeds map[string]*peer
@ -392,6 +398,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error {
t.nameMu.Lock() t.nameMu.Lock()
t.info = info t.info = info
t.nameMu.Unlock() t.nameMu.Unlock()
t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
t.displayName = "" // Save a few bytes lol. t.displayName = "" // Save a few bytes lol.
t.initFiles() t.initFiles()
t.cacheLength() t.cacheLength()
@ -630,9 +637,11 @@ func (t *Torrent) writeStatus(w io.Writer) {
spew.NewDefaultConfig() spew.NewDefaultConfig()
spew.Fdump(w, t.statsLocked()) spew.Fdump(w, t.statsLocked())
conns := t.connsAsSlice() peers := t.peersAsSlice()
slices.Sort(conns, worseConn) sort.Slice(peers, func(i, j int) bool {
for i, c := range conns { return worseConn(peers[i], peers[j])
})
for i, c := range peers {
fmt.Fprintf(w, "%2d. ", i+1) fmt.Fprintf(w, "%2d. ", i+1)
c.writeStatus(w, t) c.writeStatus(w, t)
} }
@ -849,10 +858,9 @@ func (t *Torrent) wantPieceIndex(index pieceIndex) bool {
}) })
} }
// The worst connection is one that hasn't been sent, or sent anything useful // The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
// for the longest. A bad connection is one that usually sends us unwanted // connection is one that usually sends us unwanted pieces, or has been in worser half of the
// pieces, or has been in worser half of the established connections for more // established connections for more than a minute.
// than a minute.
func (t *Torrent) worstBadConn() *PeerConn { func (t *Torrent) worstBadConn() *PeerConn {
wcs := worseConnSlice{t.unclosedConnsAsSlice()} wcs := worseConnSlice{t.unclosedConnsAsSlice()}
heap.Init(&wcs) heap.Init(&wcs)
@ -1650,7 +1658,9 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
defer t.cl.unlock() defer t.cl.unlock()
oldMax = t.maxEstablishedConns oldMax = t.maxEstablishedConns
t.maxEstablishedConns = max t.maxEstablishedConns = max
wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn) wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), func(l, r *PeerConn) bool {
return worseConn(&l.peer, &r.peer)
})
for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 { for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
t.dropConnection(wcs.Pop().(*PeerConn)) t.dropConnection(wcs.Pop().(*PeerConn))
} }
@ -1844,10 +1854,10 @@ func (t *Torrent) clearPieceTouchers(pi pieceIndex) {
} }
} }
func (t *Torrent) connsAsSlice() (ret []*PeerConn) { func (t *Torrent) peersAsSlice() (ret []*peer) {
for c := range t.conns { t.iterPeers(func(p *peer) {
ret = append(ret, c) ret = append(ret, p)
} })
return return
} }
@ -1999,19 +2009,26 @@ func (t *Torrent) addWebSeed(url string) {
if _, ok := t.webSeeds[url]; ok { if _, ok := t.webSeeds[url]; ok {
return return
} }
p := &peer{
t: t,
connString: url,
outgoing: true,
network: "http",
reconciledHandshakeStats: true,
peerSentHaveAll: true,
}
ws := webSeed{ ws := webSeed{
peer: p, peer: peer{
t: t,
connString: url,
outgoing: true,
network: "http",
reconciledHandshakeStats: true,
peerSentHaveAll: true,
PeerMaxRequests: 10,
},
client: webseed.Client{
HttpClient: http.DefaultClient,
Url: url,
FileIndex: t.fileIndex,
Info: t.info,
Events: make(chan webseed.ClientEvent),
},
} }
p.PeerImpl = &ws ws.peer.PeerImpl = &ws
t.webSeeds[url] = p t.webSeeds[url] = &ws.peer
} }

View File

@ -2,16 +2,42 @@ package torrent
import ( import (
"net/http" "net/http"
"github.com/anacrolix/torrent/segments"
"github.com/anacrolix/torrent/webseed"
) )
type webSeed struct { type httpRequestResult struct {
peer *peer resp *http.Response
httpClient *http.Client err error
url string
} }
type requestPart struct {
req *http.Request
e segments.Extent
result chan httpRequestResult
}
type webseedRequest struct {
cancel func()
}
type webSeed struct {
client webseed.Client
peer peer
}
type webseedClientEvent interface{}
type webseedRequestFailed struct {
r request
err error
}
var _ PeerImpl = (*webSeed)(nil)
func (ws *webSeed) PostCancel(r request) { func (ws *webSeed) PostCancel(r request) {
panic("implement me") ws.Cancel(r)
} }
func (ws *webSeed) WriteInterested(interested bool) bool { func (ws *webSeed) WriteInterested(interested bool) bool {
@ -19,11 +45,13 @@ func (ws *webSeed) WriteInterested(interested bool) bool {
} }
func (ws *webSeed) Cancel(r request) bool { func (ws *webSeed) Cancel(r request) bool {
panic("implement me") //panic("implement me")
return true
} }
func (ws *webSeed) Request(r request) bool { func (ws *webSeed) Request(r request) bool {
panic("implement me") ws.client.Request(webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)})
return true
} }
func (ws *webSeed) ConnectionFlags() string { func (ws *webSeed) ConnectionFlags() string {

View File

@ -1,24 +1,119 @@
package webseed package webseed
import ( import (
"bytes"
"context"
"fmt"
"io"
"net/http" "net/http"
pp "github.com/anacrolix/torrent/peer_protocol" "github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/segments"
) )
type RequestSpec = pp.RequestSpec type RequestSpec = segments.Extent
type Client struct { type httpRequestResult struct {
HttpClient *http.Client resp *http.Response
Url string err error
}
requests map[RequestSpec]request type requestPart struct {
req *http.Request
e segments.Extent
result chan httpRequestResult
} }
type request struct { type request struct {
cancel func() cancel func()
} }
func (cl *Client) Request(r RequestSpec) { type Client struct {
//cl.HttpClient.Do() HttpClient *http.Client
Url string
FileIndex segments.Index
Info *metainfo.Info
requests map[RequestSpec]request
Events chan ClientEvent
}
type ClientEvent struct {
RequestSpec RequestSpec
Bytes []byte
Err error
}
func (ws *Client) Cancel(r RequestSpec) {
ws.requests[r].cancel()
}
func (ws *Client) Request(r RequestSpec) {
ctx, cancel := context.WithCancel(context.Background())
var requestParts []requestPart
if !ws.FileIndex.Locate(r, func(i int, e segments.Extent) bool {
req, err := NewRequest(ws.Url, i, ws.Info, e.Start, e.Length)
if err != nil {
panic(err)
}
req = req.WithContext(ctx)
part := requestPart{
req: req,
result: make(chan httpRequestResult, 1),
e: e,
}
go func() {
resp, err := ws.HttpClient.Do(req)
part.result <- httpRequestResult{
resp: resp,
err: err,
}
}()
requestParts = append(requestParts, part)
return true
}) {
panic("request out of file bounds")
}
if ws.requests == nil {
ws.requests = make(map[RequestSpec]request)
}
ws.requests[r] = request{cancel}
go func() {
b, err := readRequestPartResponses(requestParts)
ws.Events <- ClientEvent{
RequestSpec: r,
Bytes: b,
Err: err,
}
}()
}
func recvPartResult(buf io.Writer, part requestPart) error {
result := <-part.result
if result.err != nil {
return result.err
}
defer result.resp.Body.Close()
if part.e.Start != 0 && result.resp.StatusCode != http.StatusPartialContent {
return fmt.Errorf("expected partial content response got %v", result.resp.StatusCode)
}
copied, err := io.Copy(buf, result.resp.Body)
if err != nil {
return err
}
if copied != part.e.Length {
return fmt.Errorf("got %v bytes, expected %v", copied, part.e.Length)
}
return nil
}
func readRequestPartResponses(parts []requestPart) ([]byte, error) {
var buf bytes.Buffer
for _, part := range parts {
err := recvPartResult(&buf, part)
if err != nil {
return buf.Bytes(), err
}
}
return buf.Bytes(), nil
} }

26
webseed/misc.go Normal file
View File

@ -0,0 +1,26 @@
package webseed
import (
"fmt"
"net/http"
"path"
"strings"
"github.com/anacrolix/torrent/metainfo"
)
// Creates a request per BEP 19.
func NewRequest(url string, fileIndex int, info *metainfo.Info, offset, length int64) (*http.Request, error) {
fileInfo := info.UpvertedFiles()[fileIndex]
if strings.HasSuffix(url, "/") {
url += path.Join(append([]string{info.Name}, fileInfo.Path...)...)
}
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
if offset != 0 || length != fileInfo.Length {
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
}
return req, nil
}

View File

@ -8,7 +8,7 @@ import (
"github.com/anacrolix/multiless" "github.com/anacrolix/multiless"
) )
func worseConn(l, r *PeerConn) bool { func worseConn(l, r *peer) bool {
less, ok := multiless.New().Bool( less, ok := multiless.New().Bool(
l.useful(), r.useful()).CmpInt64( l.useful(), r.useful()).CmpInt64(
l.lastHelpful().Sub(r.lastHelpful()).Nanoseconds()).CmpInt64( l.lastHelpful().Sub(r.lastHelpful()).Nanoseconds()).CmpInt64(
@ -45,7 +45,7 @@ func (me worseConnSlice) Len() int {
} }
func (me worseConnSlice) Less(i, j int) bool { func (me worseConnSlice) Less(i, j int) bool {
return worseConn(me.conns[i], me.conns[j]) return worseConn(&me.conns[i].peer, &me.conns[j].peer)
} }
func (me *worseConnSlice) Pop() interface{} { func (me *worseConnSlice) Pop() interface{} {