153 lines
3.5 KiB
Go
153 lines
3.5 KiB
Go
// Copyright 2016 Google Inc. All rights reserved.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package zip
|
|
|
|
import (
|
|
"fmt"
|
|
"runtime"
|
|
)
|
|
|
|
type RateLimit struct {
|
|
requests chan request
|
|
completions chan int64
|
|
|
|
stop chan struct{}
|
|
}
|
|
|
|
type request struct {
|
|
size int64
|
|
serviced chan struct{}
|
|
}
|
|
|
|
// NewRateLimit starts a new rate limiter that permits the usage of up to <capacity> at once,
|
|
// except when no capacity is in use, in which case the first caller is always permitted
|
|
func NewRateLimit(capacity int64) *RateLimit {
|
|
ret := &RateLimit{
|
|
requests: make(chan request),
|
|
completions: make(chan int64),
|
|
|
|
stop: make(chan struct{}),
|
|
}
|
|
|
|
go ret.monitorChannels(capacity)
|
|
|
|
return ret
|
|
}
|
|
|
|
// RequestExecution blocks until another execution of size <size> can be allowed to run.
|
|
func (r *RateLimit) Request(size int64) {
|
|
request := request{
|
|
size: size,
|
|
serviced: make(chan struct{}, 1),
|
|
}
|
|
|
|
// wait for the request to be received
|
|
r.requests <- request
|
|
|
|
// wait for the request to be accepted
|
|
<-request.serviced
|
|
}
|
|
|
|
// Finish declares the completion of an execution of size <size>
|
|
func (r *RateLimit) Finish(size int64) {
|
|
r.completions <- size
|
|
}
|
|
|
|
// Stop the background goroutine
|
|
func (r *RateLimit) Stop() {
|
|
close(r.stop)
|
|
}
|
|
|
|
// monitorChannels processes incoming requests from channels
|
|
func (r *RateLimit) monitorChannels(capacity int64) {
|
|
var usedCapacity int64
|
|
var currentRequest *request
|
|
|
|
for {
|
|
var requests chan request
|
|
if currentRequest == nil {
|
|
// If we don't already have a queued request, then we should check for a new request
|
|
requests = r.requests
|
|
}
|
|
|
|
select {
|
|
case newRequest := <-requests:
|
|
currentRequest = &newRequest
|
|
case amountCompleted := <-r.completions:
|
|
usedCapacity -= amountCompleted
|
|
|
|
if usedCapacity < 0 {
|
|
panic(fmt.Sprintf("usedCapacity < 0: %v (decreased by %v)", usedCapacity, amountCompleted))
|
|
}
|
|
case <-r.stop:
|
|
return
|
|
}
|
|
|
|
if currentRequest != nil {
|
|
accepted := false
|
|
if usedCapacity == 0 {
|
|
accepted = true
|
|
} else {
|
|
if capacity >= usedCapacity+currentRequest.size {
|
|
accepted = true
|
|
}
|
|
}
|
|
if accepted {
|
|
usedCapacity += currentRequest.size
|
|
currentRequest.serviced <- struct{}{}
|
|
currentRequest = nil
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// A CPURateLimiter limits the number of active calls based on CPU requirements
|
|
type CPURateLimiter struct {
|
|
impl *RateLimit
|
|
}
|
|
|
|
func NewCPURateLimiter(capacity int64) *CPURateLimiter {
|
|
if capacity <= 0 {
|
|
capacity = int64(runtime.NumCPU())
|
|
}
|
|
impl := NewRateLimit(capacity)
|
|
return &CPURateLimiter{impl: impl}
|
|
}
|
|
|
|
func (e CPURateLimiter) Request() {
|
|
e.impl.Request(1)
|
|
}
|
|
|
|
func (e CPURateLimiter) Finish() {
|
|
e.impl.Finish(1)
|
|
}
|
|
|
|
func (e CPURateLimiter) Stop() {
|
|
e.impl.Stop()
|
|
}
|
|
|
|
// A MemoryRateLimiter limits the number of active calls based on Memory requirements
|
|
type MemoryRateLimiter struct {
|
|
*RateLimit
|
|
}
|
|
|
|
func NewMemoryRateLimiter(capacity int64) *MemoryRateLimiter {
|
|
if capacity <= 0 {
|
|
capacity = 512 * 1024 * 1024 // 512MB
|
|
}
|
|
impl := NewRateLimit(capacity)
|
|
return &MemoryRateLimiter{RateLimit: impl}
|
|
}
|