diff --git a/peerconn.go b/peerconn.go
index 0318cb6e..5170a749 100644
--- a/peerconn.go
+++ b/peerconn.go
@@ -269,6 +269,8 @@ func (cn *PeerConn) fillWriteBuffer() {
 			return
 		}
 	}
+	// write is responsible for putting the msg into the buffer
+	// upload is responsible for the send
 	cn.upload(cn.write)
 }
 
@@ -481,6 +483,7 @@ func (c *PeerConn) maximumPeerRequestChunkLength() (_ Option[int]) {
 // startFetch is for testing purposes currently.
 func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
 	requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
+	// A duplicate request from this peer.
 	if _, ok := c.peerRequests[r]; ok {
 		torrent.Add("duplicate requests received", 1)
 		if c.fastEnabled() {
@@ -488,6 +491,7 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
 		}
 		return nil
 	}
+	// We are currently choking this peer, so we can't return the chunk.
 	if c.choking {
 		torrent.Add("requests received while choking", 1)
 		if c.fastEnabled() {
@@ -505,6 +509,7 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
 		// BEP 6 says we may close here if we choose.
 		return nil
 	}
+	// chunk size is usually 16 KiB
 	if opt := c.maximumPeerRequestChunkLength(); opt.Ok && int(r.Length) > opt.Value {
 		err := fmt.Errorf("peer requested chunk too long (%v)", r.Length)
 		c.logger.Levelf(log.Warning, err.Error())
@@ -515,11 +520,13 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
 			return err
 		}
 	}
+	// We don't have this piece, so we don't have the chunk either.
 	if !c.t.havePiece(pieceIndex(r.Index)) {
 		// TODO: Tell the peer we don't have the piece, and reject this request.
 		requestsReceivedForMissingPieces.Add(1)
 		return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
 	}
+	// Check whether the chunk overflows the bounds of the piece.
 	pieceLength := c.t.pieceLength(pieceIndex(r.Index))
 	// Check this after we know we have the piece, so that the piece length will be known.
 	if chunkOverflowsPiece(r.ChunkSpec, pieceLength) {
@@ -541,7 +548,7 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error {
 }
 
 func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) {
-	b, err := c.readPeerRequestData(r, prs)
+	b, err := c.readPeerRequestData(r, prs) // Read the data (how it's read depends on the torrent's storage).
 	c.locker().Lock()
 	defer c.locker().Unlock()
 	if err != nil {
@@ -598,6 +605,7 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) {
 	}
 }
 
+// Reads the data into b via torrent.Piece.Storage().ReadAt().
 func (c *PeerConn) readPeerRequestData(r Request, prs *peerRequestState) ([]byte, error) {
 	// Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere,
 	// or fail to read and then cleanup.
@@ -735,12 +743,14 @@ func (c *PeerConn) mainReadLoop() (err error) {
 			err = c.peerSentHave(pieceIndex(msg.Index))
 		case pp.Bitfield:
 			err = c.peerSentBitfield(msg.Bitfield)
+		// The peer requests a chunk (we need to send the chunk: load it from storage).
 		case pp.Request:
 			r := newRequestFromMessage(&msg)
 			err = c.onReadRequest(r, true)
 			if err != nil {
 				err = fmt.Errorf("on reading request %v: %w", r, err)
 			}
+		// A chunk arrived from the peer (we need to accept the chunk: store it).
 		case pp.Piece:
 			c.doChunkReadStats(int64(len(msg.Piece)))
 			err = c.receiveChunk(&msg)
@@ -939,6 +949,8 @@ another:
 		if !more {
 			return false
 		}
+		// One write completed; log it.
+		log.Printf("write %d to %s", len(state.data), c.RemoteAddr)
 		goto another
 	}
 	return true
@@ -960,6 +972,9 @@ func (c *PeerConn) tickleWriter() {
 	c.messageWriter.writeCond.Broadcast()
 }
 
+// Functional style:
+// fillWriteBuffer => cn.upload(cn.write)
+// msg, i.e. cn.write, is responsible for writing the data into the buffer (the socket's write buffer).
 func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
 	c.lastChunkSent = time.Now()
 	state.allocReservation.Release()
diff --git a/torrent.go b/torrent.go
index b5cda964..e80a4657 100644
--- a/torrent.go
+++ b/torrent.go
@@ -749,7 +749,7 @@ func (t *Torrent) writeStatus(w io.Writer) {
 			ml.StrictNext(lu.String() == ru.String(), lu.String() < ru.String())
 			return ml.Less()
 		}).([]torrentTrackerAnnouncer) {
-			fmt.Fprintf(tw, " %q\t%v\n", ta.URL(), ta.statusLine())
+			fmt.Fprintf(tw, " %q\t%v\n", ta.URL(), ta.statusLine()) // the tracker's status
 		}
 		tw.Flush()
 	}()
@@ -1447,6 +1447,7 @@ func appendMissingTrackerTiers(existing [][]string, minNumTiers int) (ret [][]st
 func (t *Torrent) addTrackers(announceList [][]string) {
 	fullAnnounceList := &t.metainfo.AnnounceList
 	t.metainfo.AnnounceList = appendMissingTrackerTiers(*fullAnnounceList, len(announceList))
+	t.metainfo.Announce = announceList[0][0]
 	for tierIndex, trackerURLs := range announceList {
 		(*fullAnnounceList)[tierIndex] = appendMissingStrings((*fullAnnounceList)[tierIndex], trackerURLs)
 	}
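
Aside: the comments above trace the upload path as fillWriteBuffer => cn.upload(cn.write), where upload decides which queued peer request to serve and the msg callback (cn.write) only appends the encoded message to the connection's write buffer. Below is a minimal standalone sketch of that callback pattern; the names and types are simplified stand-ins chosen for illustration, not the actual anacrolix/torrent API.

package main

import (
	"bytes"
	"fmt"
)

// message is a stand-in for pp.Message.
type message struct {
	kind string
	data []byte
}

// conn is a stand-in for PeerConn: pending mimics peerRequests, and
// writeBuffer mimics the messageWriter's buffer.
type conn struct {
	writeBuffer bytes.Buffer
	pending     map[string][]byte
}

// write plays the role of cn.write: it serializes msg into the write buffer
// and reports whether the caller may keep queueing more messages.
func (c *conn) write(msg message) bool {
	fmt.Fprintf(&c.writeBuffer, "[%s %d bytes]", msg.kind, len(msg.data))
	return c.writeBuffer.Len() < 1<<10
}

// upload plays the role of cn.upload: it walks the pending peer requests and
// hands each chunk to the msg callback, stopping when told no more fits.
func (c *conn) upload(msg func(message) bool) bool {
	for id, data := range c.pending {
		more := msg(message{kind: "piece", data: data})
		delete(c.pending, id)
		if !more {
			return false
		}
	}
	return true
}

func main() {
	c := &conn{pending: map[string][]byte{"piece 0, chunk 0": make([]byte, 16<<10)}}
	// Mirrors fillWriteBuffer's cn.upload(cn.write): the bound method value
	// c.write is passed as the msg callback.
	c.upload(c.write)
	fmt.Println("buffered:", c.writeBuffer.Len(), "bytes")
}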