Save progress

This commit is contained in:
Matt Joiner 2020-05-30 17:52:27 +10:00
parent de88c620bc
commit 3d1f08b12f
3 changed files with 81 additions and 17 deletions

View File

@ -41,6 +41,8 @@ type peerImpl interface {
cancel(request) bool cancel(request) bool
request(request) bool request(request) bool
connectionFlags() string connectionFlags() string
close()
postCancel(request)
drop() drop()
} }
@ -335,16 +337,20 @@ func (cn *peer) writeStatus(w io.Writer, t *Torrent) {
) )
} }
func (cn *PeerConn) close() { func (cn *peer) close() {
if !cn.closed.Set() { if !cn.closed.Set() {
return return
} }
cn.discardPieceInclination()
cn._pieceRequestOrder.Clear()
cn.peerImpl.close()
}
func (cn *PeerConn) close() {
if cn.pex.IsEnabled() { if cn.pex.IsEnabled() {
cn.pex.Close() cn.pex.Close()
} }
cn.tickleWriter() cn.tickleWriter()
cn.discardPieceInclination()
cn._pieceRequestOrder.Clear()
if cn.conn != nil { if cn.conn != nil {
cn.conn.Close() cn.conn.Close()
} }
@ -367,6 +373,15 @@ func (cn *PeerConn) post(msg pp.Message) {
cn.tickleWriter() cn.tickleWriter()
} }
func (cn *PeerConn) write(msg pp.Message) bool {
cn.wroteMsg(&msg)
cn.writeBuffer.Write(msg.MustMarshalBinary())
torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
// 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update
// when https://github.com/pion/datachannel/issues/59 is fixed.
return cn.writeBuffer.Len() < 1<<15
}
func (cn *PeerConn) requestMetadataPiece(index int) { func (cn *PeerConn) requestMetadataPiece(index int) {
eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata] eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata]
if eID == 0 { if eID == 0 {
@ -606,15 +621,6 @@ func (cn *PeerConn) fillWriteBuffer() {
cn.upload(cn.write) cn.upload(cn.write)
} }
func (cn *PeerConn) write(msg pp.Message) bool {
cn.wroteMsg(&msg)
cn.writeBuffer.Write(msg.MustMarshalBinary())
torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
// 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update
// when https://github.com/pion/datachannel/issues/59 is fixed.
return cn.writeBuffer.Len() < 1<<15
}
// Routine that writes to the peer. Some of what to write is buffered by // Routine that writes to the peer. Some of what to write is buffered by
// activity elsewhere in the Client, and some is determined locally when the // activity elsewhere in the Client, and some is determined locally when the
// connection is writable. // connection is writable.
@ -803,7 +809,7 @@ func (cn *PeerConn) getPieceInclination() []int {
return cn.pieceInclination return cn.pieceInclination
} }
func (cn *PeerConn) discardPieceInclination() { func (cn *peer) discardPieceInclination() {
if cn.pieceInclination == nil { if cn.pieceInclination == nil {
return return
} }
@ -1475,7 +1481,7 @@ func (c *peer) deleteRequest(r request) bool {
return true return true
} }
func (c *PeerConn) deleteAllRequests() { func (c *peer) deleteAllRequests() {
for r := range c.requests { for r := range c.requests {
c.deleteRequest(r) c.deleteRequest(r)
} }
@ -1491,14 +1497,18 @@ func (c *PeerConn) tickleWriter() {
c.writerCond.Broadcast() c.writerCond.Broadcast()
} }
func (c *PeerConn) postCancel(r request) bool { func (c *peer) postCancel(r request) bool {
if !c.deleteRequest(r) { if !c.deleteRequest(r) {
return false return false
} }
c.post(makeCancelMessage(r)) c.peerImpl.postCancel(r)
return true return true
} }
func (c *PeerConn) postCancel(r request) {
c.post(makeCancelMessage(r))
}
func (c *PeerConn) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) { func (c *PeerConn) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) {
// Count the chunk being sent, even if it isn't. // Count the chunk being sent, even if it isn't.
b := make([]byte, r.Length) b := make([]byte, r.Length)

View File

@ -77,6 +77,8 @@ type Torrent struct {
info *metainfo.Info info *metainfo.Info
files *[]*File files *[]*File
webSeeds map[string]*peer
// Active peer connections, running message stream loops. TODO: Make this // Active peer connections, running message stream loops. TODO: Make this
// open (not-closed) connections only. // open (not-closed) connections only.
conns map[*PeerConn]struct{} conns map[*PeerConn]struct{}
@ -1226,12 +1228,19 @@ func (t *Torrent) deleteConnection(c *PeerConn) (ret bool) {
} }
torrent.Add("deleted connections", 1) torrent.Add("deleted connections", 1)
c.deleteAllRequests() c.deleteAllRequests()
if len(t.conns) == 0 { if t.numActivePeers() == 0 {
t.assertNoPendingRequests() t.assertNoPendingRequests()
} }
return return
} }
func (t *Torrent) numActivePeers() (num int) {
t.iterPeers(func(*peer) {
num++
})
return
}
func (t *Torrent) assertNoPendingRequests() { func (t *Torrent) assertNoPendingRequests() {
if len(t.pendingRequests) != 0 { if len(t.pendingRequests) != 0 {
panic(t.pendingRequests) panic(t.pendingRequests)
@ -1979,6 +1988,18 @@ func (t *Torrent) iterPeers(f func(*peer)) {
for pc := range t.conns { for pc := range t.conns {
f(&pc.peer) f(&pc.peer)
} }
for _, ws := range t.webSeeds {
f(ws)
}
}
func (t *Torrent) addWebSeed(url string) {
if _, ok := t.webSeeds[url]; ok {
return
}
t.webSeeds[url] = &peer{
peerImpl: &webSeed{},
}
} }
func (t *Torrent) peerIsActive(p *peer) (active bool) { func (t *Torrent) peerIsActive(p *peer) (active bool) {

33
web_seed.go Normal file
View File

@ -0,0 +1,33 @@
package torrent
import (
"net/http"
)
type webSeed struct {
peer *peer
httpClient *http.Client
}
func (ws *webSeed) writeInterested(interested bool) bool {
return true
}
func (ws *webSeed) cancel(r request) bool {
panic("implement me")
}
func (ws *webSeed) request(r request) bool {
panic("implement me")
}
func (ws *webSeed) connectionFlags() string {
return "WS"
}
func (ws *webSeed) drop() {
}
func (ws *webSeed) updateRequests() {
ws.peer.doRequestState()
}