Revert "Revert "Refactor rate_limit.go for more clarify""

This reverts commit 526416b1e4.

Figured out a fix for the deadlocks; resubmitting the patch.
The first version was deadlocking because the switch statement in
zipWriter.write would choose a specific zip entry to start writing
before all of that entry's chunks had necessarily been compressed.
Because each chunk had to request its own memory allocation,
compressing the chunks of the file currently being written could block
waiting for memory to be freed by chunks of other files that hadn't
yet started being written.

This patch is much like the original, except that it preallocates the
memory for the entire file upfront (and happens to use the total file
size rather than the compressed size, though I didn't observe that to
cause any performance difference).
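
To illustrate the ordering change, here is a minimal sketch using the
MemoryRateLimiter added below (compressChunk and the chunk list are
hypothetical names for illustration, not the actual soong_zip code):

// This patch: reserve the entire file's size before compressing any
// chunk, and release it only once the file's entry has been written.
// Previously each chunk called Request for its own allocation, which
// could block behind chunks of files that hadn't started being written.
func compressFile(mem *MemoryRateLimiter, fileSize int64, chunkSizes []int64) {
	mem.Request(fileSize) // single upfront reservation for the whole file
	for _, size := range chunkSizes {
		go compressChunk(size) // hypothetical worker; never blocks on memory
	}
	// ... after the zip writer finishes writing this file's entry:
	mem.Finish(fileSize)
}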

Bug: 64536066
Test: m -j dist showcommands # which runs soong_zip to package everything

Change-Id: Id1d7ff415e54d3a6be71188abbdbbbab5a719fcf
Jeff Gaston 2017-08-17 21:43:21 -07:00
parent fd697f4256
commit 175f34c5c3
2 changed files with 128 additions and 90 deletions


@@ -15,71 +15,54 @@
package main
import (
"fmt"
"runtime"
)
type RateLimit struct {
requests chan struct{}
finished chan int
released chan int
stop chan struct{}
requests chan request
completions chan int64
stop chan struct{}
}
// NewRateLimit starts a new rate limiter with maxExecs number of executions
// allowed to happen at a time. If maxExecs is <= 0, it will default to the
// number of logical CPUs on the system.
//
// With Finish and Release, we'll keep track of outstanding buffer sizes to be
// written. If that size goes above maxMem, we'll prevent starting new
// executions.
//
// The total memory use may be higher due to current executions. This just
// prevents runaway memory use due to slower writes.
func NewRateLimit(maxExecs int, maxMem int64) *RateLimit {
if maxExecs <= 0 {
maxExecs = runtime.NumCPU()
}
if maxMem <= 0 {
// Default to 512MB
maxMem = 512 * 1024 * 1024
}
type request struct {
size int64
serviced chan struct{}
}
// NewRateLimit starts a new rate limiter that permits the usage of up to <capacity> at once,
// except when no capacity is in use, in which case the first caller is always permitted
func NewRateLimit(capacity int64) *RateLimit {
ret := &RateLimit{
requests: make(chan struct{}),
requests: make(chan request),
completions: make(chan int64),
// Let all of the pending executions mark themselves as finished,
// even if our goroutine isn't processing input.
finished: make(chan int, maxExecs),
released: make(chan int),
stop: make(chan struct{}),
stop: make(chan struct{}),
}
go ret.goFunc(maxExecs, maxMem)
go ret.monitorChannels(capacity)
return ret
}
// RequestExecution blocks until another execution can be allowed to run.
func (r *RateLimit) RequestExecution() Execution {
<-r.requests
return r.finished
// Request blocks until another execution of size <size> can be allowed to run.
func (r *RateLimit) Request(size int64) {
request := request{
size: size,
serviced: make(chan struct{}, 1),
}
// wait for the request to be received
r.requests <- request
// wait for the request to be accepted
<-request.serviced
}
type Execution chan<- int
// Finish will mark your execution as finished, and allow another request to be
// approved.
//
// bufferSize may be specified to count memory buffer sizes, and must be
// matched with calls to RateLimit.Release to mark the buffers as released.
func (e Execution) Finish(bufferSize int) {
e <- bufferSize
}
// Call Release when finished with a buffer recorded with Finish.
func (r *RateLimit) Release(bufferSize int) {
r.released <- bufferSize
// Finish declares the completion of an execution of size <size>
func (r *RateLimit) Finish(size int64) {
r.completions <- size
}
// Stop the background goroutine
@@ -87,29 +70,83 @@ func (r *RateLimit) Stop() {
close(r.stop)
}
func (r *RateLimit) goFunc(maxExecs int, maxMem int64) {
var curExecs int
var curMemory int64
// monitorChannels processes incoming requests from channels
func (r *RateLimit) monitorChannels(capacity int64) {
var usedCapacity int64
var currentRequest *request
for {
var requests chan struct{}
if curExecs < maxExecs && curMemory < maxMem {
var requests chan request
if currentRequest == nil {
// If we don't already have a queued request, then we should check for a new request
requests = r.requests
}
select {
case requests <- struct{}{}:
curExecs++
case amount := <-r.finished:
curExecs--
curMemory += int64(amount)
if curExecs < 0 {
panic("curExecs < 0")
case newRequest := <-requests:
currentRequest = &newRequest
case amountCompleted := <-r.completions:
usedCapacity -= amountCompleted
if usedCapacity < 0 {
panic(fmt.Sprintf("usedCapacity < 0: %v (decreased by %v)", usedCapacity, amountCompleted))
}
case amount := <-r.released:
curMemory -= int64(amount)
case <-r.stop:
return
}
if currentRequest != nil {
accepted := false
if usedCapacity == 0 {
accepted = true
} else if capacity >= usedCapacity+currentRequest.size {
accepted = true
}
if accepted {
usedCapacity += currentRequest.size
currentRequest.serviced <- struct{}{}
currentRequest = nil
}
}
}
}
// A CPURateLimiter limits the number of active calls based on CPU requirements
type CPURateLimiter struct {
impl *RateLimit
}
func NewCPURateLimiter(capacity int64) *CPURateLimiter {
if capacity <= 0 {
capacity = int64(runtime.NumCPU())
}
impl := NewRateLimit(capacity)
return &CPURateLimiter{impl: impl}
}
func (e CPURateLimiter) Request() {
e.impl.Request(1)
}
func (e CPURateLimiter) Finish() {
e.impl.Finish(1)
}
func (e CPURateLimiter) Stop() {
e.impl.Stop()
}
// A MemoryRateLimiter limits the number of active calls based on Memory requirements
type MemoryRateLimiter struct {
*RateLimit
}
func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter {
if capacity <= 0 {
capacity = 512 * 1024 * 1024 // 512MB
}
impl := NewRateLimit(capacity)
return &MemoryRateLimiter{RateLimit: impl}
}
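
Taken together, each Request must be paired with a Finish of the same
size; a minimal usage sketch of the types above (the buffer size is
illustrative):

func exampleUsage() {
	cpu := NewCPURateLimiter(0)    // <= 0 falls back to runtime.NumCPU()
	mem := NewMemoryRateLimiter(0) // <= 0 falls back to 512MB
	defer cpu.Stop()
	defer mem.Stop()

	var bufSize int64 = 1 << 20
	mem.Request(bufSize) // blocks until bufSize fits within unused capacity
	cpu.Request()        // each CPU execution counts as size 1
	// ... compress a buffer of up to bufSize bytes ...
	cpu.Finish()
	mem.Finish(bufSize) // must match the size passed to Request
}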


@@ -163,7 +163,8 @@ type zipWriter struct {
errors chan error
writeOps chan chan *zipEntry
rateLimit *RateLimit
cpuRateLimiter *CPURateLimiter
memoryRateLimiter *MemoryRateLimiter
compressorPool sync.Pool
compLevel int
@@ -174,6 +175,10 @@ type zipEntry struct {
// List of delayed io.Reader
futureReaders chan chan io.Reader
// Only used for passing into the MemoryRateLimiter to ensure we
// release as much memory as we request
allocatedSize int64
}
func main() {
@@ -295,9 +300,12 @@ func (z *zipWriter) write(out string, pathMappings []pathMapping, manifest strin
// The rate limiters will put upper bounds on the number of
// parallel compressions and outstanding buffers.
z.writeOps = make(chan chan *zipEntry, 1000)
z.rateLimit = NewRateLimit(*parallelJobs, 0)
defer z.rateLimit.Stop()
z.cpuRateLimiter = NewCPURateLimiter(int64(*parallelJobs))
z.memoryRateLimiter = NewMemoryRateLimiter(0)
defer func() {
z.cpuRateLimiter.Stop()
z.memoryRateLimiter.Stop()
}()
go func() {
var err error
defer close(z.writeOps)
@@ -369,6 +377,7 @@ func (z *zipWriter) write(out string, pathMappings []pathMapping, manifest strin
currentWriter.Close()
currentWriter = nil
}
z.memoryRateLimiter.Finish(op.allocatedSize)
case futureReader, ok := <-readersChan:
if !ok {
@@ -381,12 +390,10 @@ func (z *zipWriter) write(out string, pathMappings []pathMapping, manifest strin
currentReader = futureReader
case reader := <-currentReader:
var count int64
count, err = io.Copy(currentWriter, reader)
_, err = io.Copy(currentWriter, reader)
if err != nil {
return err
}
z.rateLimit.Release(int(count))
currentReader = nil
@@ -456,7 +463,9 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
return err
}
exec := z.rateLimit.RequestExecution()
ze.allocatedSize = fileSize
z.cpuRateLimiter.Request()
z.memoryRateLimiter.Request(ze.allocatedSize)
if method == zip.Deflate && fileSize >= minParallelFileSize {
wg := new(sync.WaitGroup)
@@ -473,14 +482,14 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
// know the result before we can begin writing the compressed
// data out to the zipfile.
wg.Add(1)
go z.crcFile(r, ze, exec, compressChan, wg)
go z.crcFile(r, ze, compressChan, wg)
for start := int64(0); start < fileSize; start += parallelBlockSize {
sr := io.NewSectionReader(r, start, parallelBlockSize)
resultChan := make(chan io.Reader, 1)
ze.futureReaders <- resultChan
exec := z.rateLimit.RequestExecution()
z.cpuRateLimiter.Request()
last := !(start+parallelBlockSize < fileSize)
var dict []byte
@@ -489,7 +498,7 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
}
wg.Add(1)
go z.compressPartialFile(sr, dict, last, exec, resultChan, wg)
go z.compressPartialFile(sr, dict, last, resultChan, wg)
}
close(ze.futureReaders)
@@ -500,15 +509,15 @@ func (z *zipWriter) writeFile(dest, src string, method uint16) error {
f.Close()
}(wg, r)
} else {
go z.compressWholeFile(ze, r, exec, compressChan)
go z.compressWholeFile(ze, r, compressChan)
}
return nil
}
func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, exec Execution, resultChan chan *zipEntry, wg *sync.WaitGroup) {
func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, resultChan chan *zipEntry, wg *sync.WaitGroup) {
defer wg.Done()
defer exec.Finish(0)
defer z.cpuRateLimiter.Finish()
crc := crc32.NewIEEE()
_, err := io.Copy(crc, r)
@@ -522,7 +531,7 @@ func (z *zipWriter) crcFile(r io.Reader, ze *zipEntry, exec Execution, resultCha
close(resultChan)
}
func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, exec Execution, resultChan chan io.Reader, wg *sync.WaitGroup) {
func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, resultChan chan io.Reader, wg *sync.WaitGroup) {
defer wg.Done()
result, err := z.compressBlock(r, dict, last)
@@ -531,7 +540,8 @@ func (z *zipWriter) compressPartialFile(r io.Reader, dict []byte, last bool, exe
return
}
exec.Finish(result.Len())
z.cpuRateLimiter.Finish()
resultChan <- result
}
@@ -569,9 +579,7 @@ func (z *zipWriter) compressBlock(r io.Reader, dict []byte, last bool) (*bytes.B
return buf, nil
}
func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution, compressChan chan *zipEntry) {
var bufSize int
func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, compressChan chan *zipEntry) {
defer r.Close()
crc := crc32.NewIEEE()
@@ -616,7 +624,6 @@ func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution,
}
if uint64(compressed.Len()) < ze.fh.UncompressedSize64 {
futureReader <- compressed
bufSize = compressed.Len()
} else {
buf, err := readFile(r)
if err != nil {
@@ -625,7 +632,6 @@ func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution,
}
ze.fh.Method = zip.Store
futureReader <- bytes.NewReader(buf)
bufSize = int(ze.fh.UncompressedSize64)
}
} else {
buf, err := readFile(r)
@@ -635,10 +641,10 @@ func (z *zipWriter) compressWholeFile(ze *zipEntry, r *os.File, exec Execution,
}
ze.fh.Method = zip.Store
futureReader <- bytes.NewReader(buf)
bufSize = int(ze.fh.UncompressedSize64)
}
exec.Finish(bufSize)
z.cpuRateLimiter.Finish()
close(futureReader)
compressChan <- ze
@@ -706,11 +712,6 @@ func (z *zipWriter) writeSymlink(rel, file string) error {
futureReader <- bytes.NewBufferString(dest)
close(futureReader)
// We didn't ask permission to execute, since this should be very short
// but we still need to increment the outstanding buffer sizes, since
// the read will decrement the buffer size.
z.rateLimit.Release(-len(dest))
ze <- &zipEntry{
fh: fileHeader,
futureReaders: futureReaders,
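
The invariant running through the zipWriter changes above: the amount
passed to memoryRateLimiter.Request when a file is queued is recorded
on the entry, and the same amount is passed to Finish once that entry
has been fully written. Condensed from the hunks above:

// producer (writeFile): reserve before any compression starts
ze.allocatedSize = fileSize
z.cpuRateLimiter.Request()
z.memoryRateLimiter.Request(ze.allocatedSize)

// consumer (write loop): release once the entry is fully written
z.memoryRateLimiter.Finish(op.allocatedSize)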