Remove events from webseed

Manage this stuff inside the webseed peer instead.
This commit is contained in:
Matt Joiner 2020-06-02 13:54:26 +10:00
parent 026c737886
commit 054ea59e6d
3 changed files with 47 additions and 58 deletions

View File

@ -2009,6 +2009,7 @@ func (t *Torrent) addWebSeed(url string) {
if _, ok := t.webSeeds[url]; ok {
return
}
const maxRequests = 10
ws := webSeed{
peer: peer{
t: t,
@ -2017,17 +2018,16 @@ func (t *Torrent) addWebSeed(url string) {
network: "http",
reconciledHandshakeStats: true,
peerSentHaveAll: true,
PeerMaxRequests: 10,
PeerMaxRequests: maxRequests,
},
client: webseed.Client{
HttpClient: http.DefaultClient,
Url: url,
FileIndex: t.fileIndex,
Info: t.info,
Events: make(chan webseed.ClientEvent),
},
requests: make(map[request]webseed.Request, maxRequests),
}
go ws.eventProcessor()
ws.peer.PeerImpl = &ws
t.webSeeds[url] = &ws.peer
}

View File

@ -24,15 +24,9 @@ type webseedRequest struct {
}
type webSeed struct {
client webseed.Client
peer peer
}
type webseedClientEvent interface{}
type webseedRequestFailed struct {
r request
err error
client webseed.Client
requests map[request]webseed.Request
peer peer
}
var _ PeerImpl = (*webSeed)(nil)
@ -46,12 +40,18 @@ func (ws *webSeed) WriteInterested(interested bool) bool {
}
func (ws *webSeed) Cancel(r request) bool {
//panic("implement me")
ws.requests[r].Cancel()
return true
}
func (ws *webSeed) intoSpec(r request) webseed.RequestSpec {
return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
}
func (ws *webSeed) Request(r request) bool {
ws.client.Request(webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)})
webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
ws.requests[r] = webseedRequest
go ws.requestResultHandler(r, webseedRequest)
return true
}
@ -68,25 +68,17 @@ func (ws *webSeed) UpdateRequests() {
func (ws *webSeed) Close() {}
func (ws *webSeed) eventProcessor() {
for ev := range ws.client.Events {
if ev.Err != nil {
panic(ev)
}
r, ok := ws.peer.t.offsetRequest(ev.RequestSpec.Start)
if !ok {
panic(ev)
}
ws.peer.t.cl.lock()
err := ws.peer.receiveChunk(&pp.Message{
Type: pp.Piece,
Index: r.Index,
Begin: r.Begin,
Piece: ev.Bytes,
})
ws.peer.t.cl.unlock()
if err != nil {
panic(err)
}
func (ws *webSeed) requestResultHandler(r request, webseedRequest webseed.Request) {
webseedRequestResult := <-webseedRequest.Result
ws.peer.t.cl.lock()
err := ws.peer.receiveChunk(&pp.Message{
Type: pp.Piece,
Index: r.Index,
Begin: r.Begin,
Piece: webseedRequestResult.Bytes,
})
ws.peer.t.cl.unlock()
if err != nil {
panic(err)
}
}

View File

@ -13,7 +13,7 @@ import (
type RequestSpec = segments.Extent
type httpRequestResult struct {
type requestPartResult struct {
resp *http.Response
err error
}
@ -21,11 +21,16 @@ type httpRequestResult struct {
type requestPart struct {
req *http.Request
e segments.Extent
result chan httpRequestResult
result chan requestPartResult
}
type request struct {
type Request struct {
cancel func()
Result chan RequestResult
}
func (r Request) Cancel() {
r.cancel()
}
type Client struct {
@ -33,22 +38,14 @@ type Client struct {
Url string
FileIndex segments.Index
Info *metainfo.Info
requests map[RequestSpec]request
Events chan ClientEvent
}
type ClientEvent struct {
RequestSpec RequestSpec
Bytes []byte
Err error
type RequestResult struct {
Bytes []byte
Err error
}
func (ws *Client) Cancel(r RequestSpec) {
ws.requests[r].cancel()
}
func (ws *Client) Request(r RequestSpec) {
func (ws *Client) NewRequest(r RequestSpec) Request {
ctx, cancel := context.WithCancel(context.Background())
var requestParts []requestPart
if !ws.FileIndex.Locate(r, func(i int, e segments.Extent) bool {
@ -59,12 +56,12 @@ func (ws *Client) Request(r RequestSpec) {
req = req.WithContext(ctx)
part := requestPart{
req: req,
result: make(chan httpRequestResult, 1),
result: make(chan requestPartResult, 1),
e: e,
}
go func() {
resp, err := ws.HttpClient.Do(req)
part.result <- httpRequestResult{
part.result <- requestPartResult{
resp: resp,
err: err,
}
@ -74,18 +71,18 @@ func (ws *Client) Request(r RequestSpec) {
}) {
panic("request out of file bounds")
}
if ws.requests == nil {
ws.requests = make(map[RequestSpec]request)
req := Request{
cancel: cancel,
Result: make(chan RequestResult, 1),
}
ws.requests[r] = request{cancel}
go func() {
b, err := readRequestPartResponses(requestParts)
ws.Events <- ClientEvent{
RequestSpec: r,
Bytes: b,
Err: err,
req.Result <- RequestResult{
Bytes: b,
Err: err,
}
}()
return req
}
func recvPartResult(buf io.Writer, part requestPart) error {