Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dynamic limits #2

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions lib/berater.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ module Berater

class Overloaded < StandardError; end

attr_accessor :redis
attr_accessor :redis, :dynamic_limits

def configure
yield self
end

def reset
@redis = nil
@redis = @dynamic_limits = nil
end

def new(key, capacity, interval = nil, **opts)
Expand All @@ -37,6 +37,10 @@ def new(key, capacity, interval = nil, **opts)
end
end

def load_limits(key, redis: Berater.redis)
Berater::Limiter.load_limits(key, redis: redis)
end

def expunge
redis.scan_each(match: "#{self.name}*") do |key|
redis.del key
Expand Down
19 changes: 17 additions & 2 deletions lib/berater/concurrency_limiter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,25 @@ def initialize(key, capacity, **opts)
LUA_SCRIPT = Berater::LuaScript(<<~LUA
local key = KEYS[1]
local lock_key = KEYS[2]
local conf_key = KEYS[3]
local capacity = tonumber(ARGV[1])
local ts = tonumber(ARGV[2])
local ttl = tonumber(ARGV[3])
local cost = tonumber(ARGV[4])
local lock_ids = {}

if conf_key then
local config = redis.call('GET', conf_key)

if config then
-- use dynamic capacity limit
capacity = tonumber(config)

-- reset ttl for a week
redis.call('EXPIRE', conf_key, 604800)
end
end

-- purge stale hosts
if ttl > 0 then
redis.call('ZREMRANGEBYSCORE', key, '-inf', ts - ttl)
Expand Down Expand Up @@ -64,15 +77,17 @@ def initialize(key, capacity, **opts)
)

def limit(capacity: nil, cost: 1, &block)
limit_key = if capacity.nil? && dynamic_limits
config_key
end
capacity ||= @capacity
# cost is Integer >= 0

# timestamp in microseconds
ts = (Time.now.to_f * 10**6).to_i

count, *lock_ids = LUA_SCRIPT.eval(
redis,
[ cache_key(key), cache_key('lock_id') ],
[ cache_key(key), cache_key('lock_id'), limit_key ],
[ capacity, ts, @timeout_usec, cost ]
)

Expand Down
38 changes: 36 additions & 2 deletions lib/berater/limiter.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
module Berater
class Limiter

CONF_TTL = 60 * 60 * 24 * 7 # 1 week

attr_reader :key, :capacity, :options

def redis
options[:redis] || Berater.redis
end

def dynamic_limits
options.fetch(:dynamic_limits, Berater.dynamic_limits) || false
end

def limit
raise NotImplementedError
end
Expand All @@ -19,11 +25,27 @@ def to_s
"#<#{self.class}>"
end

def save_limits
limit = [ capacity, *@args ].map(&:to_s).join(':')
redis.setex(config_key, CONF_TTL, limit)
end

def self.load_limits(key, redis: Berater.redis)
res = redis.get(config_key(key))
case res
when "Infinity"
[ Float::INFINITY ]
when String
res.split(':').map(&:to_i)
end
end

protected

def initialize(key, capacity, **opts)
def initialize(key, capacity, *args, **opts)
@key = key
self.capacity = capacity
@args = args
@options = opts
end

Expand All @@ -38,7 +60,19 @@ def capacity=(capacity)
end

def cache_key(key)
"#{self.class}:#{key}"
self.class.cache_key(key)
end

def self.cache_key(key)
"Berater:#{key}"
end

def config_key
self.class.config_key(key)
end

def self.config_key(key)
cache_key("#{key}-conf")
end

def yield_lock(lock, &block)
Expand Down
25 changes: 22 additions & 3 deletions lib/berater/rate_limiter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ class Overrated < Overloaded; end
attr_accessor :interval

def initialize(key, capacity, interval, **opts)
super(key, capacity, **opts)

self.interval = interval

super(key, capacity, @interval_usec, **opts)
end

private def interval=(interval)
Expand All @@ -19,12 +19,28 @@ def initialize(key, capacity, interval, **opts)
LUA_SCRIPT = Berater::LuaScript(<<~LUA
local key = KEYS[1]
local ts_key = KEYS[2]
local conf_key = KEYS[3]
local ts = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local interval_usec = tonumber(ARGV[3])
local cost = tonumber(ARGV[4])
local count = 0
local allowed

if conf_key then
local config = redis.call('GET', conf_key)

if config then
-- use dynamic capacity limit
capacity, interval_usec = string.match(config, "(%d+):(%d+)")
capacity = tonumber(capacity)
interval_usec = tonumber(interval_usec)

-- reset ttl for a week
redis.call('EXPIRE', conf_key, 604800)
end
end

local usec_per_drip = interval_usec / capacity

-- timestamp of last update
Expand Down Expand Up @@ -61,14 +77,17 @@ def initialize(key, capacity, interval, **opts)
)

def limit(capacity: nil, cost: 1, &block)
limit_key = if capacity.nil? && dynamic_limits
config_key
end
capacity ||= @capacity

# timestamp in microseconds
ts = (Time.now.to_f * 10**6).to_i

count, allowed = LUA_SCRIPT.eval(
redis,
[ cache_key(key), cache_key("#{key}-ts") ],
[ cache_key(key), cache_key("#{key}-ts"), limit_key ],
[ ts, capacity, @interval_usec, cost ]
)

Expand Down
16 changes: 16 additions & 0 deletions spec/benchmark.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,34 @@

COUNT = 10_000

Berater.expunge

Benchmark.bmbm(30) do |x|
x.report('RateLimiter') do
COUNT.times do |i|
Berater(:key, COUNT, :second) { i }
end
end

x.report('RateLimiter(dynamic_limits: true)') do
Berater.new(:key, COUNT * 2, :second).save_limits
COUNT.times do |i|
Berater(:key, COUNT, :second, dynamic_limits: true) { i }
end
end

x.report('ConcurrencyLimiter') do
COUNT.times do |i|
Berater(:key, COUNT) { i }
end
end

x.report('ConcurrencyLimiter(dynamic_limits: true)') do
Berater.new(:key, COUNT * 2).save_limits
COUNT.times do |i|
Berater(:key, COUNT, dynamic_limits: true) { i }
end
end
end

Berater.expunge
84 changes: 84 additions & 0 deletions spec/concurrency_limiter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,90 @@ def expect_bad_capacity(capacity)
end
end

describe 'dynamic_limits' do
let(:limiter) { described_class.new(:key, 1) }

describe '#save_limits' do
it 'saves to redis' do
expect(limiter.redis).to receive(:setex)
expect(limiter).to receive(:config_key)
limiter.save_limits
end
end

describe '.load_limits' do
it 'loads from redis' do
limiter.save_limits
capacity = Berater.load_limits(limiter.key)[0]
expect(capacity).to eq limiter.capacity
end
end

it 'is disabled by default' do
expect(limiter.dynamic_limits).to be false
expect(limiter).not_to receive(:config_key)
limiter.limit
end

it 'can be enabled per instance' do
limiter = described_class.new(:key, 1, dynamic_limits: true)
expect(limiter).to receive(:config_key)
limiter.limit
end

context 'with dynamic_limits enabled' do
before do
Berater.configure do |c|
c.dynamic_limits = true
end

limiter.save_limits
end

it 'is enabled' do
expect(limiter).to receive(:config_key)
limiter.limit
end

it 'overrides instance limit' do
limiter.limit
expect { limiter }.to be_incapacitated

limiter_two = described_class.new(:key, 2)
expect { limiter_two }.to be_incapacitated
end

it 'yields to limit parameter' do
limiter.limit
expect { limiter }.to be_incapacitated

limiter.limit(capacity: 2)
end

describe '#overloaded?' do
let(:limiter) { described_class.new(:key, 2) }

before do
2.times { limiter.limit }
end

it 'respects limit' do
expect(limiter).to be_incapacitated
end

it 'respects saved limit override' do
expect(
described_class.new(:key, 1)
).to be_incapacitated

expect(
described_class.new(:key, 3)
).to be_incapacitated
end
end
end
end

describe '#to_s' do
def check(capacity, expected)
expect(
Expand Down
Loading