From 1155da1df60e19b18c609f9cd3337005c7fa6f0a Mon Sep 17 00:00:00 2001 From: Daniel Pepper Date: Tue, 2 Mar 2021 15:54:37 -0800 Subject: [PATCH] dynamic limits --- lib/berater.rb | 8 ++- lib/berater/concurrency_limiter.rb | 19 ++++++- lib/berater/limiter.rb | 38 ++++++++++++- lib/berater/rate_limiter.rb | 25 ++++++++- spec/benchmark.rb | 16 ++++++ spec/concurrency_limiter_spec.rb | 84 ++++++++++++++++++++++++++++ spec/rate_limiter_spec.rb | 89 +++++++++++++++++++++++++++++- 7 files changed, 269 insertions(+), 10 deletions(-) diff --git a/lib/berater.rb b/lib/berater.rb index 7534156..2896e50 100644 --- a/lib/berater.rb +++ b/lib/berater.rb @@ -9,14 +9,14 @@ module Berater class Overloaded < StandardError; end - attr_accessor :redis + attr_accessor :redis, :dynamic_limits def configure yield self end def reset - @redis = nil + @redis = @dynamic_limits = nil end def new(key, capacity, interval = nil, **opts) @@ -37,6 +37,10 @@ def new(key, capacity, interval = nil, **opts) end end + def load_limits(key, redis: Berater.redis) + Berater::Limiter.load_limits(key, redis: redis) + end + def expunge redis.scan_each(match: "#{self.name}*") do |key| redis.del key diff --git a/lib/berater/concurrency_limiter.rb b/lib/berater/concurrency_limiter.rb index 813981d..bbd8703 100644 --- a/lib/berater/concurrency_limiter.rb +++ b/lib/berater/concurrency_limiter.rb @@ -20,12 +20,25 @@ def initialize(key, capacity, **opts) LUA_SCRIPT = Berater::LuaScript(<<~LUA local key = KEYS[1] local lock_key = KEYS[2] + local conf_key = KEYS[3] local capacity = tonumber(ARGV[1]) local ts = tonumber(ARGV[2]) local ttl = tonumber(ARGV[3]) local cost = tonumber(ARGV[4]) local lock_ids = {} + if conf_key then + local config = redis.call('GET', conf_key) + + if config then + -- use dynamic capacity limit + capacity = tonumber(config) + + -- reset ttl for a week + redis.call('EXPIRE', conf_key, 604800) + end + end + -- purge stale hosts if ttl > 0 then redis.call('ZREMRANGEBYSCORE', key, '-inf', ts - ttl) @@ -64,15 +77,17 @@ def initialize(key, capacity, **opts) ) def limit(capacity: nil, cost: 1, &block) + limit_key = if capacity.nil? && dynamic_limits + config_key + end capacity ||= @capacity - # cost is Integer >= 0 # timestamp in microseconds ts = (Time.now.to_f * 10**6).to_i count, *lock_ids = LUA_SCRIPT.eval( redis, - [ cache_key(key), cache_key('lock_id') ], + [ cache_key(key), cache_key('lock_id'), limit_key ], [ capacity, ts, @timeout_usec, cost ] ) diff --git a/lib/berater/limiter.rb b/lib/berater/limiter.rb index 9d7aaab..e7f3cf7 100644 --- a/lib/berater/limiter.rb +++ b/lib/berater/limiter.rb @@ -1,12 +1,18 @@ module Berater class Limiter + CONF_TTL = 60 * 60 * 24 * 7 # 1 week + attr_reader :key, :capacity, :options def redis options[:redis] || Berater.redis end + def dynamic_limits + options.fetch(:dynamic_limits, Berater.dynamic_limits) || false + end + def limit raise NotImplementedError end @@ -19,11 +25,27 @@ def to_s "#<#{self.class}>" end + def save_limits + limit = [ capacity, *@args ].map(&:to_s).join(':') + redis.setex(config_key, CONF_TTL, limit) + end + + def self.load_limits(key, redis: Berater.redis) + res = redis.get(config_key(key)) + case res + when "Infinity" + [ Float::INFINITY ] + when String + res.split(':').map(&:to_i) + end + end + protected - def initialize(key, capacity, **opts) + def initialize(key, capacity, *args, **opts) @key = key self.capacity = capacity + @args = args @options = opts end @@ -38,7 +60,19 @@ def capacity=(capacity) end def cache_key(key) - "#{self.class}:#{key}" + self.class.cache_key(key) + end + + def self.cache_key(key) + "Berater:#{key}" + end + + def config_key + self.class.config_key(key) + end + + def self.config_key(key) + cache_key("#{key}-conf") end def yield_lock(lock, &block) diff --git a/lib/berater/rate_limiter.rb b/lib/berater/rate_limiter.rb index 0580c9a..57f2eec 100644 --- a/lib/berater/rate_limiter.rb +++ b/lib/berater/rate_limiter.rb @@ -6,9 +6,9 @@ class Overrated < Overloaded; end attr_accessor :interval def initialize(key, capacity, interval, **opts) - super(key, capacity, **opts) - self.interval = interval + + super(key, capacity, @interval_usec, **opts) end private def interval=(interval) @@ -19,12 +19,28 @@ def initialize(key, capacity, interval, **opts) LUA_SCRIPT = Berater::LuaScript(<<~LUA local key = KEYS[1] local ts_key = KEYS[2] + local conf_key = KEYS[3] local ts = tonumber(ARGV[1]) local capacity = tonumber(ARGV[2]) local interval_usec = tonumber(ARGV[3]) local cost = tonumber(ARGV[4]) local count = 0 local allowed + + if conf_key then + local config = redis.call('GET', conf_key) + + if config then + -- use dynamic capacity limit + capacity, interval_usec = string.match(config, "(%d+):(%d+)") + capacity = tonumber(capacity) + interval_usec = tonumber(interval_usec) + + -- reset ttl for a week + redis.call('EXPIRE', conf_key, 604800) + end + end + local usec_per_drip = interval_usec / capacity -- timestamp of last update @@ -61,6 +77,9 @@ def initialize(key, capacity, interval, **opts) ) def limit(capacity: nil, cost: 1, &block) + limit_key = if capacity.nil? && dynamic_limits + config_key + end capacity ||= @capacity # timestamp in microseconds @@ -68,7 +87,7 @@ def limit(capacity: nil, cost: 1, &block) count, allowed = LUA_SCRIPT.eval( redis, - [ cache_key(key), cache_key("#{key}-ts") ], + [ cache_key(key), cache_key("#{key}-ts"), limit_key ], [ ts, capacity, @interval_usec, cost ] ) diff --git a/spec/benchmark.rb b/spec/benchmark.rb index f0bb746..a74d1a6 100644 --- a/spec/benchmark.rb +++ b/spec/benchmark.rb @@ -8,6 +8,8 @@ COUNT = 10_000 +Berater.expunge + Benchmark.bmbm(30) do |x| x.report('RateLimiter') do COUNT.times do |i| @@ -15,11 +17,25 @@ end end + x.report('RateLimiter(dynamic_limits: true)') do + Berater.new(:key, COUNT * 2, :second).save_limits + COUNT.times do |i| + Berater(:key, COUNT, :second, dynamic_limits: true) { i } + end + end + x.report('ConcurrencyLimiter') do COUNT.times do |i| Berater(:key, COUNT) { i } end end + + x.report('ConcurrencyLimiter(dynamic_limits: true)') do + Berater.new(:key, COUNT * 2).save_limits + COUNT.times do |i| + Berater(:key, COUNT, dynamic_limits: true) { i } + end + end end Berater.expunge diff --git a/spec/concurrency_limiter_spec.rb b/spec/concurrency_limiter_spec.rb index 925893e..2770ed7 100644 --- a/spec/concurrency_limiter_spec.rb +++ b/spec/concurrency_limiter_spec.rb @@ -237,6 +237,90 @@ def expect_bad_capacity(capacity) end end + describe 'dynamic_limits' do + let(:limiter) { described_class.new(:key, 1) } + + describe '#save_limits' do + it 'saves to redis' do + expect(limiter.redis).to receive(:setex) + expect(limiter).to receive(:config_key) + limiter.save_limits + end + end + + describe '.load_limits' do + it 'loads from redis' do + limiter.save_limits + capacity = Berater.load_limits(limiter.key)[0] + expect(capacity).to eq limiter.capacity + end + end + + it 'is disabled by default' do + expect(limiter.dynamic_limits).to be false + expect(limiter).not_to receive(:config_key) + limiter.limit + end + + it 'can be enabled per instance' do + limiter = described_class.new(:key, 1, dynamic_limits: true) + expect(limiter).to receive(:config_key) + limiter.limit + end + + context 'with dynamic_limits enabled' do + before do + Berater.configure do |c| + c.dynamic_limits = true + end + + limiter.save_limits + end + + it 'is enabled' do + expect(limiter).to receive(:config_key) + limiter.limit + end + + it 'overrides instance limit' do + limiter.limit + expect { limiter }.to be_incapacitated + + limiter_two = described_class.new(:key, 2) + expect { limiter_two }.to be_incapacitated + end + + it 'yields to limit parameter' do + limiter.limit + expect { limiter }.to be_incapacitated + + limiter.limit(capacity: 2) + end + + describe '#overloaded?' do + let(:limiter) { described_class.new(:key, 2) } + + before do + 2.times { limiter.limit } + end + + it 'respects limit' do + expect(limiter).to be_incapacitated + end + + it 'respects saved limit override' do + expect( + described_class.new(:key, 1) + ).to be_incapacitated + + expect( + described_class.new(:key, 3) + ).to be_incapacitated + end + end + end + end + describe '#to_s' do def check(capacity, expected) expect( diff --git a/spec/rate_limiter_spec.rb b/spec/rate_limiter_spec.rb index 13f71d0..29b1fe1 100644 --- a/spec/rate_limiter_spec.rb +++ b/spec/rate_limiter_spec.rb @@ -88,7 +88,7 @@ def expect_bad_capacity(capacity) expect(limiter).to be_overrated end - it 'accepts a dynamic capacity' do + it 'accepts a capacity override' do limiter = described_class.new(:key, 1, :second) expect { limiter.limit(capacity: 0) }.to be_overrated @@ -159,6 +159,93 @@ def expect_bad_capacity(capacity) end end + describe 'dynamic_limits' do + let(:limiter) { described_class.new(:key, 1, :minute) } + + describe '#save_limits' do + it 'saves to redis' do + expect(limiter.redis).to receive(:setex) + expect(limiter).to receive(:config_key) + limiter.save_limits + end + end + + describe '.load_limits' do + it 'loads from redis' do + limiter.save_limits + + capacity, interval = Berater.load_limits(limiter.key) + expect(capacity).to eq limiter.capacity + expect(interval).to eq limiter.instance_variable_get(:@interval_usec) + end + end + + it 'is disabled by default' do + expect(limiter.dynamic_limits).to be false + expect(limiter).not_to receive(:config_key) + limiter.limit + end + + it 'can be enabled per instance' do + limiter = described_class.new(:key, 1, :minute, dynamic_limits: true) + expect(limiter).to receive(:config_key) + limiter.limit + end + + context 'with dynamic_limits enabled' do + before do + Berater.configure do |c| + c.dynamic_limits = true + end + + limiter.save_limits + end + + it 'is enabled' do + expect(limiter).to receive(:config_key) + limiter.limit + end + + it 'overrides instance limit' do + limiter.limit + expect { limiter }.to be_overrated + + limiter_two = described_class.new(:key, 2, :second) + Timecop.freeze(1) + expect { limiter_two }.to be_overrated + end + + it 'yields to limit parameter' do + limiter.limit + expect { limiter }.to be_overrated + + limiter.limit(capacity: 2) + end + + describe '#overloaded?' do + let(:limiter) { described_class.new(:key, 2, :minute) } + + before do + 2.times { limiter.limit } + end + + it 'respects limit' do + expect(limiter).to be_overrated + end + + it 'respects saved limit override' do + expect( + described_class.new(:key, 1, :second) + ).to be_overrated + + expect( + described_class.new(:key, 3, :second) + ).to be_overrated + end + end + end + end + describe '#to_s' do def check(capacity, interval, expected) expect(