diff --git a/cmd/soong_zip/rate_limit.go b/cmd/soong_zip/rate_limit.go index 9e95bc14b..9cb5fdd51 100644 --- a/cmd/soong_zip/rate_limit.go +++ b/cmd/soong_zip/rate_limit.go @@ -15,71 +15,54 @@ package main import ( + "fmt" "runtime" ) type RateLimit struct { - requests chan struct{} - finished chan int - released chan int - stop chan struct{} + requests chan request + completions chan int64 + + stop chan struct{} } -// NewRateLimit starts a new rate limiter with maxExecs number of executions -// allowed to happen at a time. If maxExecs is <= 0, it will default to the -// number of logical CPUs on the system. -// -// With Finish and Release, we'll keep track of outstanding buffer sizes to be -// written. If that size goes above maxMem, we'll prevent starting new -// executions. -// -// The total memory use may be higher due to current executions. This just -// prevents runaway memory use due to slower writes. -func NewRateLimit(maxExecs int, maxMem int64) *RateLimit { - if maxExecs <= 0 { - maxExecs = runtime.NumCPU() - } - if maxMem <= 0 { - // Default to 512MB - maxMem = 512 * 1024 * 1024 - } +type request struct { + size int64 + serviced chan struct{} +} +// NewRateLimit starts a new rate limiter that permits the usage of up to at once, +// except when no capacity is in use, in which case the first caller is always permitted +func NewRateLimit(capacity int64) *RateLimit { ret := &RateLimit{ - requests: make(chan struct{}), + requests: make(chan request), + completions: make(chan int64), - // Let all of the pending executions to mark themselves as finished, - // even if our goroutine isn't processing input. - finished: make(chan int, maxExecs), - - released: make(chan int), - stop: make(chan struct{}), + stop: make(chan struct{}), } - go ret.goFunc(maxExecs, maxMem) + go ret.monitorChannels(capacity) return ret } -// RequestExecution blocks until another execution can be allowed to run. -func (r *RateLimit) RequestExecution() Execution { - <-r.requests - return r.finished +// RequestExecution blocks until another execution of size can be allowed to run. +func (r *RateLimit) Request(size int64) { + request := request{ + size: size, + serviced: make(chan struct{}, 1), + } + + // wait for the request to be received + r.requests <- request + + // wait for the request to be accepted + <-request.serviced } -type Execution chan<- int - -// Finish will mark your execution as finished, and allow another request to be -// approved. -// -// bufferSize may be specified to count memory buffer sizes, and must be -// matched with calls to RateLimit.Release to mark the buffers as released. -func (e Execution) Finish(bufferSize int) { - e <- bufferSize -} - -// Call Release when finished with a buffer recorded with Finish. -func (r *RateLimit) Release(bufferSize int) { - r.released <- bufferSize +// Finish declares the completion of an execution of size +func (r *RateLimit) Finish(size int64) { + r.completions <- size } // Stop the background goroutine @@ -87,29 +70,83 @@ func (r *RateLimit) Stop() { close(r.stop) } -func (r *RateLimit) goFunc(maxExecs int, maxMem int64) { - var curExecs int - var curMemory int64 +// monitorChannels processes incoming requests from channels +func (r *RateLimit) monitorChannels(capacity int64) { + var usedCapacity int64 + var currentRequest *request for { - var requests chan struct{} - if curExecs < maxExecs && curMemory < maxMem { + var requests chan request + if currentRequest == nil { + // If we don't already have a queued request, then we should check for a new request requests = r.requests } select { - case requests <- struct{}{}: - curExecs++ - case amount := <-r.finished: - curExecs-- - curMemory += int64(amount) - if curExecs < 0 { - panic("curExecs < 0") + case newRequest := <-requests: + currentRequest = &newRequest + case amountCompleted := <-r.completions: + usedCapacity -= amountCompleted + + if usedCapacity < 0 { + panic(fmt.Sprintf("usedCapacity < 0: %v (decreased by %v)", usedCapacity, amountCompleted)) } - case amount := <-r.released: - curMemory -= int64(amount) case <-r.stop: return } + + if currentRequest != nil { + accepted := false + if usedCapacity == 0 { + accepted = true + } else { + if capacity >= usedCapacity+currentRequest.size { + accepted = true + } + } + if accepted { + usedCapacity += currentRequest.size + currentRequest.serviced <- struct{}{} + currentRequest = nil + } + } } } + +// A CPURateLimiter limits the number of active calls based on CPU requirements +type CPURateLimiter struct { + impl *RateLimit +} + +func NewCPURateLimiter(capacity int64) *CPURateLimiter { + if capacity <= 0 { + capacity = int64(runtime.NumCPU()) + } + impl := NewRateLimit(capacity) + return &CPURateLimiter{impl: impl} +} + +func (e CPURateLimiter) Request() { + e.impl.Request(1) +} + +func (e CPURateLimiter) Finish() { + e.impl.Finish(1) +} + +func (e CPURateLimiter) Stop() { + e.impl.Stop() +} + +// A MemoryRateLimiter limits the number of active calls based on Memory requirements +type MemoryRateLimiter struct { + *RateLimit +} + +func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter { + if capacity <= 0 { + capacity = 512 * 1024 * 1024 // 512MB + } + impl := NewRateLimit(capacity) + return &MemoryRateLimiter{RateLimit: impl} +} diff --git a/cmd/soong_zip/soong_zip.go b/cmd/soong_zip/soong_zip.go index d634dda3c..cbec1029a 100644 --- a/cmd/soong_zip/soong_zip.go +++ b/cmd/soong_zip/soong_zip.go @@ -163,7 +163,8 @@ type zipWriter struct { errors chan error writeOps chan chan *zipEntry - rateLimit *RateLimit + cpuRateLimiter *CPURateLimiter + memoryRateLimiter *MemoryRateLimiter compressorPool sync.Pool compLevel int @@ -174,6 +175,10 @@ type zipEntry struct { // List of delayed io.Reader futureReaders chan chan io.Reader + + // Only used for passing into the MemoryRateLimiter to ensure we + // release as much memory as much as we request + allocatedSize int64 } func main() { @@ -295,9 +300,12 @@ func (z *zipWriter) write(out string, pathMappings []pathMapping, manifest strin // The RateLimit object will put the upper bounds on the number of // parallel compressions and outstanding buffers. z.writeOps = make(chan chan *zipEntry, 1000) - z.rateLimit = NewRateLimit(*parallelJobs, 0) - defer z.rateLimit.Stop() - + z.cpuRateLimiter = NewCPURateLimiter(int64(*parallelJobs)) + z.memoryRateLimiter = NewMemoryRateLimiter(0) + defer func() { + z.cpuRateLimiter.Stop() + z.memoryRateLimiter.Stop() + }() go func() { var err error defer close(z.writeOps) @@ -369,6 +377,7 @@ func (z *zipWriter) write(out string, pathMappings []pathMapping, manifest strin currentWriter.Close() currentWriter = nil } + z.memoryRateLimiter.Finish(op.allocatedSize) case futureReader, ok := <-readersChan: if !ok { @@ -381,12 +390,10 @@ func (z *zipWriter) write(out string, pathMappings []pathMapping, manifest strin currentReader = futureReader case reader := <-currentReader: - var count int64 - count, err = io.Copy(currentWriter, reader) + _, err = io.Copy(currentWriter, reader) if err != nil { return err } - z.rateLimit.Release(int(count)) currentReader = nil @@ -456,7 +463,9 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error { return err } - exec := z.rateLimit.RequestExecution() + ze.allocatedSize = fileSize + z.cpuRateLimiter.Request() + z.memoryRateLimiter.Request(ze.allocatedSize) if method == zip.Deflate && fileSize >= minParallelFileSize { wg := new(sync.WaitGroup) @@ -473,14 +482,14 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error { // know the result before we can begin writing the compressed // data out to the zipfile. wg.Add(1) - go z.crcFile(r, ze, exec, compressChan, wg) + go z.crcFile(r, ze, compressChan, wg) for start := int64(0); start < fileSize; start += parallelBlockSize { sr := io.NewSectionReader(r, start, parallelBlockSize) resultChan := make(chan io.Reader, 1) ze.futureReaders <- resultChan - exec := z.rateLimit.RequestExecution() + z.cpuRateLimiter.Request() last := !(start+parallelBlockSize < fileSize) var dict []byte @@ -489,7 +498,7 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error { } wg.Add(1) - go z.compressPartialFile(sr, dict, last, exec, resultChan, wg) + go z.compressPartialFile(sr, dict, last, resultChan, wg) } close(ze.futureReaders) @@ -500,15 +509,15 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error { f.Close() }(wg, r) } else { - go z.compressWholeFile(ze, r, exec, compressChan) + go z.compressWholeFile(ze, r, compressChan) } return nil } -func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, exec Execution, resultChan chan *zipEntry, wg *sync.WaitGroup) { +func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, resultChan chan *zipEntry, wg *sync.WaitGroup) { defer wg.Done() - defer exec.Finish(0) + defer z.cpuRateLimiter.Finish() crc := crc32.NewIEEE() _, err := io.Copy(crc, r) @@ -522,7 +531,7 @@ func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, exec Execution, resultCha close(resultChan) } -func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, exec Execution, resultChan chan io.Reader, wg *sync.WaitGroup) { +func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, resultChan chan io.Reader, wg *sync.WaitGroup) { defer wg.Done() result, err := z.compressBlock(r, dict, last) @@ -531,7 +540,8 @@ func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, exe return } - exec.Finish(result.Len()) + z.cpuRateLimiter.Finish() + resultChan <- result } @@ -569,9 +579,7 @@ func (z *zipWriter) compressBlock(r io.Reader, dict []byte, last bool) (*bytes.B return buf, nil } -func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution, compressChan chan *zipEntry) { - var bufSize int - +func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, compressChan chan *zipEntry) { defer r.Close() crc := crc32.NewIEEE() @@ -616,7 +624,6 @@ func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution, } if uint64(compressed.Len()) < ze.fh.UncompressedSize64 { futureReader <- compressed - bufSize = compressed.Len() } else { buf, err := readFile(r) if err != nil { @@ -625,7 +632,6 @@ func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution, } ze.fh.Method = zip.Store futureReader <- bytes.NewReader(buf) - bufSize = int(ze.fh.UncompressedSize64) } } else { buf, err := readFile(r) @@ -635,10 +641,10 @@ func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution, } ze.fh.Method = zip.Store futureReader <- bytes.NewReader(buf) - bufSize = int(ze.fh.UncompressedSize64) } - exec.Finish(bufSize) + z.cpuRateLimiter.Finish() + close(futureReader) compressChan <- ze @@ -706,11 +712,6 @@ func (z *zipWriter) writeSymlink(rel, file string) error { futureReader <- bytes.NewBufferString(dest) close(futureReader) - // We didn't ask permission to execute, since this should be very short - // but we still need to increment the outstanding buffer sizes, since - // the read will decrement the buffer size. - z.rateLimit.Release(-len(dest)) - ze <- &zipEntry{ fh: fileHeader, futureReaders: futureReaders,