Skip to content

Commit

Permalink
dynamic limits
Browse files Browse the repository at this point in the history
  • Loading branch information
dpep committed Mar 3, 2021
1 parent c67af90 commit bc9ab2d
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 11 deletions.
8 changes: 6 additions & 2 deletions lib/berater.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ module Berater

class Overloaded < StandardError; end

attr_accessor :redis
attr_accessor :redis, :dynamic_limits

def configure
yield self
end

def reset
@redis = nil
@redis = @dynamic_limits = nil
end

def new(key, capacity, interval = nil, **opts)
Expand All @@ -37,6 +37,10 @@ def new(key, capacity, interval = nil, **opts)
end
end

def load_limits(key, redis: Berater.redis)
Berater::Limiter.load_limits(key, redis: redis)
end

def expunge
redis.scan_each(match: "#{self.name}*") do |key|
redis.del key
Expand Down
19 changes: 17 additions & 2 deletions lib/berater/concurrency_limiter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,25 @@ def initialize(key, capacity, **opts)
LUA_SCRIPT = Berater::LuaScript(<<~LUA
local key = KEYS[1]
local lock_key = KEYS[2]
local conf_key = KEYS[3]
local capacity = tonumber(ARGV[1])
local ts = tonumber(ARGV[2])
local ttl = tonumber(ARGV[3])
local cost = tonumber(ARGV[4])
local lock_ids = {}
if conf_key then
local config = redis.call('GET', conf_key)
if config then
-- use dynamic capacity limit
capacity = tonumber(config)
-- reset ttl for a week
redis.call('EXPIRE', conf_key, 604800)
end
end
-- purge stale hosts
if ttl > 0 then
redis.call('ZREMRANGEBYSCORE', key, '-inf', ts - ttl)
Expand Down Expand Up @@ -64,15 +77,17 @@ def initialize(key, capacity, **opts)
)

def limit(capacity: nil, cost: 1, &block)
limit_key = if capacity.nil? && dynamic_limits
config_key
end
capacity ||= @capacity
# cost is Integer >= 0

# timestamp in microseconds
ts = (Time.now.to_f * 10**6).to_i

count, *lock_ids = LUA_SCRIPT.eval(
redis,
[ cache_key(key), cache_key('lock_id') ],
[ cache_key(key), cache_key('lock_id'), limit_key ],
[ capacity, ts, @timeout_usec, cost ]
)

Expand Down
38 changes: 36 additions & 2 deletions lib/berater/limiter.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
module Berater
class Limiter

CONF_TTL = 60 * 60 * 24 * 7 # 1 week

attr_reader :key, :capacity, :options

def redis
options[:redis] || Berater.redis
end

def dynamic_limits
options.fetch(:dynamic_limits, Berater.dynamic_limits) || false
end

def limit
raise NotImplementedError
end
Expand All @@ -19,11 +25,27 @@ def to_s
"#<#{self.class}>"
end

def save_limits
limit = [ capacity, *@args ].map(&:to_s).join(':')
redis.setex(config_key, CONF_TTL, limit)
end

def self.load_limits(key, redis: Berater.redis)
res = redis.get(config_key(key))
case res
when "Infinity"
[ Float::INFINITY ]
when String
res.split(':').map(&:to_i)
end
end

protected

def initialize(key, capacity, **opts)
def initialize(key, capacity, *args, **opts)
@key = key
self.capacity = capacity
@args = args
@options = opts
end

Expand All @@ -38,7 +60,19 @@ def capacity=(capacity)
end

def cache_key(key)
"#{self.class}:#{key}"
self.class.cache_key(key)
end

def self.cache_key(key)
"Berater:#{key}"
end

def config_key
self.class.config_key(key)
end

def self.config_key(key)
cache_key("#{key}-conf")
end

def yield_lock(lock, &block)
Expand Down
25 changes: 22 additions & 3 deletions lib/berater/rate_limiter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ class Overrated < Overloaded; end
attr_accessor :interval

def initialize(key, capacity, interval, **opts)
super(key, capacity, **opts)

self.interval = interval

super(key, capacity, @interval_usec, **opts)
end

private def interval=(interval)
Expand All @@ -19,12 +19,28 @@ def initialize(key, capacity, interval, **opts)
LUA_SCRIPT = Berater::LuaScript(<<~LUA
local key = KEYS[1]
local ts_key = KEYS[2]
local conf_key = KEYS[3]
local ts = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local interval_usec = tonumber(ARGV[3])
local cost = tonumber(ARGV[4])
local count = 0
local allowed
if conf_key then
local config = redis.call('GET', conf_key)
if config then
-- use dynamic capacity limit
capacity, interval_usec = string.match(config, "(%d+):(%d+)")
capacity = tonumber(capacity)
interval_usec = tonumber(interval_usec)
-- reset ttl for a week
redis.call('EXPIRE', conf_key, 604800)
end
end
local usec_per_drip = interval_usec / capacity
-- timestamp of last update
Expand Down Expand Up @@ -61,14 +77,17 @@ def initialize(key, capacity, interval, **opts)
)

def limit(capacity: nil, cost: 1, &block)
limit_key = if capacity.nil? && dynamic_limits
config_key
end
capacity ||= @capacity

# timestamp in microseconds
ts = (Time.now.to_f * 10**6).to_i

count, allowed = LUA_SCRIPT.eval(
redis,
[ cache_key(key), cache_key("#{key}-ts") ],
[ cache_key(key), cache_key("#{key}-ts"), limit_key ],
[ ts, capacity, @interval_usec, cost ]
)

Expand Down
16 changes: 16 additions & 0 deletions spec/benchmark.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,34 @@

COUNT = 10_000

Berater.expunge

Benchmark.bmbm(30) do |x|
x.report('RateLimiter') do
COUNT.times do |i|
Berater(:key, COUNT, :second) { i }
end
end

x.report('RateLimiter(dynamic_limits: true)') do
Berater.new(:key, COUNT * 2, :second).save_limits
COUNT.times do |i|
Berater(:key, COUNT, :second, dynamic_limits: true) { i }
end
end

x.report('ConcurrencyLimiter') do
COUNT.times do |i|
Berater(:key, COUNT) { i }
end
end

x.report('ConcurrencyLimiter(dynamic_limits: true)') do
Berater.new(:key, COUNT * 2).save_limits
COUNT.times do |i|
Berater(:key, COUNT, dynamic_limits: true) { i }
end
end
end

Berater.expunge
84 changes: 84 additions & 0 deletions spec/concurrency_limiter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,90 @@ def expect_bad_capacity(capacity)
end
end

describe 'dynamic_limits' do
let(:limiter) { described_class.new(:key, 1) }

describe '#save_limits' do
it 'saves to redis' do
expect(limiter.redis).to receive(:setex)
expect(limiter).to receive(:config_key)
limiter.save_limits
end
end

describe '.load_limits' do
it 'loads from redis' do
limiter.save_limits
capacity = Berater.load_limits(limiter.key)[0]
expect(capacity).to eq limiter.capacity
end
end

it 'is disabled by default' do
expect(limiter.dynamic_limits).to be false
expect(limiter).not_to receive(:config_key)
limiter.limit
end

it 'can be enabled per instance' do
limiter = described_class.new(:key, 1, dynamic_limits: true)
expect(limiter).to receive(:config_key)
limiter.limit
end

context 'with dynamic_limits enabled' do
before do
Berater.configure do |c|
c.dynamic_limits = true
end

limiter.save_limits
end

it 'is enabled' do
expect(limiter).to receive(:config_key)
limiter.limit
end

it 'overrides instance limit' do
limiter.limit
expect { limiter }.to be_incapacitated

limiter_two = described_class.new(:key, 2)
expect { limiter_two }.to be_incapacitated
end

it 'yields to limit parameter' do
limiter.limit
expect { limiter }.to be_incapacitated

limiter.limit(capacity: 2)
end

describe '#overloaded?' do
let(:limiter) { described_class.new(:key, 2) }

before do
2.times { limiter.limit }
end

it 'respects limit' do
expect(limiter).to be_incapacitated
end

it 'respects saved limit override' do
expect(
described_class.new(:key, 1)
).to be_incapacitated

expect(
described_class.new(:key, 3)
).to be_incapacitated
end
end
end
end

describe '#to_s' do
def check(capacity, expected)
expect(
Expand Down
Loading

0 comments on commit bc9ab2d

Please sign in to comment.