add comment, use announceList[0] to fullfille announce field

This commit is contained in:
whr819987540 2023-05-06 11:44:23 +08:00
parent 64c11ff7b5
commit 1346784b01
2 changed files with 18 additions and 2 deletions

View File

@ -269,6 +269,8 @@ func (cn *PeerConn) fillWriteBuffer() {
return return
} }
} }
// write负责将msg写入缓冲区
// upload负责send
cn.upload(cn.write) cn.upload(cn.write)
} }
@ -481,6 +483,7 @@ func (c *PeerConn) maximumPeerRequestChunkLength() (_ Option[int]) {
// startFetch is for testing purposes currently. // startFetch is for testing purposes currently.
func (c *PeerConn) onReadRequest(r Request, startFetch bool) error { func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1) requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
// 来自某个peer的重复请求
if _, ok := c.peerRequests[r]; ok { if _, ok := c.peerRequests[r]; ok {
torrent.Add("duplicate requests received", 1) torrent.Add("duplicate requests received", 1)
if c.fastEnabled() { if c.fastEnabled() {
@ -488,6 +491,7 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
} }
return nil return nil
} }
// 当前peer choking, 无法返回chunk
if c.choking { if c.choking {
torrent.Add("requests received while choking", 1) torrent.Add("requests received while choking", 1)
if c.fastEnabled() { if c.fastEnabled() {
@ -505,6 +509,7 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
// BEP 6 says we may close here if we choose. // BEP 6 says we may close here if we choose.
return nil return nil
} }
// chunk size is usually 16 KiB
if opt := c.maximumPeerRequestChunkLength(); opt.Ok && int(r.Length) > opt.Value { if opt := c.maximumPeerRequestChunkLength(); opt.Ok && int(r.Length) > opt.Value {
err := fmt.Errorf("peer requested chunk too long (%v)", r.Length) err := fmt.Errorf("peer requested chunk too long (%v)", r.Length)
c.logger.Levelf(log.Warning, err.Error()) c.logger.Levelf(log.Warning, err.Error())
@ -515,11 +520,13 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
return err return err
} }
} }
// 当前peer没有该piece, 因此没有该chunk
if !c.t.havePiece(pieceIndex(r.Index)) { if !c.t.havePiece(pieceIndex(r.Index)) {
// TODO: Tell the peer we don't have the piece, and reject this request. // TODO: Tell the peer we don't have the piece, and reject this request.
requestsReceivedForMissingPieces.Add(1) requestsReceivedForMissingPieces.Add(1)
return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int()) return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
} }
// 检查chunk是否超过piece的范围
pieceLength := c.t.pieceLength(pieceIndex(r.Index)) pieceLength := c.t.pieceLength(pieceIndex(r.Index))
// Check this after we know we have the piece, so that the piece length will be known. // Check this after we know we have the piece, so that the piece length will be known.
if chunkOverflowsPiece(r.ChunkSpec, pieceLength) { if chunkOverflowsPiece(r.ChunkSpec, pieceLength) {
@ -541,7 +548,7 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
} }
func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) { func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
b, err := c.readPeerRequestData(r, prs) b, err := c.readPeerRequestData(r, prs) // 读取数据(读取方式与该torrent的storage有关)
c.locker().Lock() c.locker().Lock()
defer c.locker().Unlock() defer c.locker().Unlock()
if err != nil { if err != nil {
@ -598,6 +605,7 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) {
} }
} }
// 利用 torrent.Piece.Storage().ReadAt() 读取数据到b
func (c *PeerConn) readPeerRequestData(r Request, prs *peerRequestState) ([]byte, error) { func (c *PeerConn) readPeerRequestData(r Request, prs *peerRequestState) ([]byte, error) {
// Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere, // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere,
// or fail to read and then cleanup. // or fail to read and then cleanup.
@ -735,12 +743,14 @@ func (c *PeerConn) mainReadLoop() (err error) {
err = c.peerSentHave(pieceIndex(msg.Index)) err = c.peerSentHave(pieceIndex(msg.Index))
case pp.Bitfield: case pp.Bitfield:
err = c.peerSentBitfield(msg.Bitfield) err = c.peerSentBitfield(msg.Bitfield)
// peer对chunk的请求(需要发送chunk, load from memory)
case pp.Request: case pp.Request:
r := newRequestFromMessage(&msg) r := newRequestFromMessage(&msg)
err = c.onReadRequest(r, true) err = c.onReadRequest(r, true)
if err != nil { if err != nil {
err = fmt.Errorf("on reading request %v: %w", r, err) err = fmt.Errorf("on reading request %v: %w", r, err)
} }
// 收到peer的chunk(需要接收chunk, store into memory)
case pp.Piece: case pp.Piece:
c.doChunkReadStats(int64(len(msg.Piece))) c.doChunkReadStats(int64(len(msg.Piece)))
err = c.receiveChunk(&msg) err = c.receiveChunk(&msg)
@ -939,6 +949,8 @@ another:
if !more { if !more {
return false return false
} }
// 完成一次写入, 打印日志
log.Printf("write %d to %s",len(state.data),c.RemoteAddr)
goto another goto another
} }
return true return true
@ -960,6 +972,9 @@ func (c *PeerConn) tickleWriter() {
c.messageWriter.writeCond.Broadcast() c.messageWriter.writeCond.Broadcast()
} }
// 函数式编程
// fillWriteBuffer => cn.upload(cn.write)
// msg即cn.write负责将数据写入缓冲区(套接字的缓冲区)
func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) { func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
c.lastChunkSent = time.Now() c.lastChunkSent = time.Now()
state.allocReservation.Release() state.allocReservation.Release()

View File

@ -749,7 +749,7 @@ func (t *Torrent) writeStatus(w io.Writer) {
ml.StrictNext(lu.String() == ru.String(), lu.String() < ru.String()) ml.StrictNext(lu.String() == ru.String(), lu.String() < ru.String())
return ml.Less() return ml.Less()
}).([]torrentTrackerAnnouncer) { }).([]torrentTrackerAnnouncer) {
fmt.Fprintf(tw, " %q\t%v\n", ta.URL(), ta.statusLine()) fmt.Fprintf(tw, " %q\t%v\n", ta.URL(), ta.statusLine()) // tracker的状态
} }
tw.Flush() tw.Flush()
}() }()
@ -1447,6 +1447,7 @@ func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]st
func (t *Torrent) addTrackers(announceList [][]string) { func (t *Torrent) addTrackers(announceList [][]string) {
fullAnnounceList := &t.metainfo.AnnounceList fullAnnounceList := &t.metainfo.AnnounceList
t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList)) t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
t.metainfo.Announce = announceList[0][0]
for tierIndex, trackerURLs := range announceList { for tierIndex, trackerURLs := range announceList {
(*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs) (*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
} }