Skip to content

Commit

Permalink
Support origin limit (#48)
Browse files Browse the repository at this point in the history
* Refactor entry and exit interface

* Support tag limit in flow rule

* Sharp select strategy of statistics node in flow rule controller

* Fix reference problem by incorrect use of auto

* Refactor some code in tag flow control module
  • Loading branch information
chenneal committed May 10, 2024
1 parent f2a9e21 commit 36871db
Show file tree
Hide file tree
Showing 37 changed files with 246 additions and 188 deletions.
2 changes: 1 addition & 1 deletion sentinel-core/circuitbreaker/rule_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ CircuitBreakerMap RulePropertyListener::BuildCircuitBreakerMap(
if (it == m.end()) {
m.insert({rule.resource(), {cb}});
} else {
auto vec = it->second;
auto& vec = it->second;
vec.push_back(std::move(cb));
}
}
Expand Down
18 changes: 9 additions & 9 deletions sentinel-core/circuitbreaker/slot.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "sentinel-core/circuitbreaker/slot.h"

#include "sentinel-core/circuitbreaker/rule_manager.h"
#include "sentinel-core/utils/time_utils.h"

Expand All @@ -8,10 +9,10 @@ namespace CircuitBreaker {
// CheckerSlot

Sentinel::Slot::TokenResultSharedPtr CheckerSlot::Entry(
const EntrySharedPtr& entry, const ResourceWrapperSharedPtr& resource,
Stat::NodeSharedPtr& node, int count, int flag,
const EntrySharedPtr& entry, Stat::NodeSharedPtr& node, int count, int flag,
const std::vector<absl::any>& params) {
auto cbs = RuleManager::GetInstance().GetCircuitBreakers(resource->name());
auto cbs =
RuleManager::GetInstance().GetCircuitBreakers(entry->resource()->name());
if (cbs.empty()) {
return Sentinel::Slot::TokenResult::Ok();
}
Expand All @@ -23,14 +24,13 @@ Sentinel::Slot::TokenResultSharedPtr CheckerSlot::Entry(
return Sentinel::Slot::TokenResult::Ok();
}

void CheckerSlot::Exit(const EntrySharedPtr& entry,
const ResourceWrapperSharedPtr& resource, int count,
void CheckerSlot::Exit(const EntrySharedPtr& entry, int count,
const std::vector<absl::any>& params) {}

// CompleteStatSlot

Sentinel::Slot::TokenResultSharedPtr CompleteStatSlot::Entry(
const EntrySharedPtr& entry, const ResourceWrapperSharedPtr& resource,
const EntrySharedPtr& entry,
/*const*/ Stat::NodeSharedPtr& node, int count, int flag,
const std::vector<absl::any>& params) {
if (entry == nullptr || entry->context() == nullptr) {
Expand All @@ -43,13 +43,13 @@ Sentinel::Slot::TokenResultSharedPtr CompleteStatSlot::Entry(
return prev_result;
}

void CompleteStatSlot::Exit(const EntrySharedPtr& entry,
const ResourceWrapperSharedPtr& resource, int count,
void CompleteStatSlot::Exit(const EntrySharedPtr& entry, int count,
const std::vector<absl::any>& params) {
if (entry == nullptr || entry->HasBlockError()) {
return;
}
auto cbs = RuleManager::GetInstance().GetCircuitBreakers(resource->name());
auto cbs =
RuleManager::GetInstance().GetCircuitBreakers(entry->resource()->name());
if (cbs.empty()) {
return;
}
Expand Down
13 changes: 5 additions & 8 deletions sentinel-core/circuitbreaker/slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ class CheckerSlot : public Slot::RuleCheckerSlot {
virtual ~CheckerSlot() = default;

Sentinel::Slot::TokenResultSharedPtr Entry(
const EntrySharedPtr& entry, const ResourceWrapperSharedPtr& resource,
Stat::NodeSharedPtr& node, int count, int flag,
const std::vector<absl::any>& params) override;
void Exit(const EntrySharedPtr& entry,
const ResourceWrapperSharedPtr& resource, int count,
const EntrySharedPtr& entry, Stat::NodeSharedPtr& node, int count,
int flag, const std::vector<absl::any>& params) override;
void Exit(const EntrySharedPtr& entry, int count,
const std::vector<absl::any>& params) override;
const std::string& Name() const override { return name_; };

Expand All @@ -34,11 +32,10 @@ class CompleteStatSlot : public Slot::StatsSlot {

const std::string& Name() const override { return name_; };
Sentinel::Slot::TokenResultSharedPtr Entry(
const EntrySharedPtr& entry, const ResourceWrapperSharedPtr& resource,
const EntrySharedPtr& entry,
/*const*/ Stat::NodeSharedPtr& node, int count, int flag,
const std::vector<absl::any>& params) override;
void Exit(const EntrySharedPtr& entry,
const ResourceWrapperSharedPtr& resource, int count,
void Exit(const EntrySharedPtr& entry, int count,
const std::vector<absl::any>& params) override;

private:
Expand Down
18 changes: 9 additions & 9 deletions sentinel-core/circuitbreaker/slot_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ Sentinel::Slot::TokenResultSharedPtr Entry_And_Exit(
const EntrySharedPtr& entry, const ResourceWrapperSharedPtr& resource,
Stat::NodeSharedPtr& node, int count, int flag,
const std::vector<absl::any>& params) {
auto result = slot_checker.Entry(entry, resource, node, count, flag, params);
auto result = slot_checker.Entry(entry, node, count, flag, params);
EXPECT_EQ(Sentinel::Slot::TokenStatus::RESULT_STATUS_OK, result->status());
slot_complete.Entry(entry, resource, node, count, flag, params);
slot_complete.Exit(entry, resource, count, params);
slot_complete.Entry(entry, node, count, flag, params);
slot_complete.Exit(entry, count, params);

return result;
}
Expand Down Expand Up @@ -78,13 +78,13 @@ TEST(CircuitBreakerSlotTest, CircuitBreakerErrorRatioTest) {
sleep(1);

// Switch state to kHalfOpen
auto result = slot_checker.Entry(entry, resource, node, 1, 0, myParams);
auto result = slot_checker.Entry(entry, node, 1, 0, myParams);
EXPECT_EQ(Sentinel::Slot::TokenStatus::RESULT_STATUS_OK, result->status());
EXPECT_EQ(cbs[0]->CurrentState(), State::kHalfOpen);

// Switch state to kClosed
slot_complete.Entry(entry, resource, node, 1, 0, myParams);
slot_complete.Exit(entry, resource, 1, myParams);
slot_complete.Entry(entry, node, 1, 0, myParams);
slot_complete.Exit(entry, 1, myParams);
EXPECT_EQ(cbs[0]->CurrentState(), State::kClosed);

m.LoadRules({});
Expand Down Expand Up @@ -141,13 +141,13 @@ TEST(CircuitBreakerSlotTest, CircuitBreakerSlowRatioTest) {
sleep(1);

// Switch state to kHalfOpen
auto result = slot_checker.Entry(entry, resource, node, 1, 0, myParams);
auto result = slot_checker.Entry(entry, node, 1, 0, myParams);
EXPECT_EQ(Sentinel::Slot::TokenStatus::RESULT_STATUS_OK, result->status());
EXPECT_EQ(cbs[0]->CurrentState(), State::kHalfOpen);

// Switch state to kClosed
slot_complete.Entry(entry, resource, node, 1, 0, myParams);
slot_complete.Exit(entry, resource, 1, myParams);
slot_complete.Entry(entry, node, 1, 0, myParams);
slot_complete.Exit(entry, 1, myParams);
EXPECT_EQ(cbs[0]->CurrentState(), State::kClosed);

m.LoadRules({});
Expand Down
3 changes: 2 additions & 1 deletion sentinel-core/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ static constexpr const char* kLimitOriginOther = "other";
static constexpr const char* kDefaultContextName = "sentinel_default_context";

static constexpr int kMaxAllowedRt = 4900;
static constexpr int kMaxResourceSize = 6000;
static constexpr int kMaxResourceSize = 10000;
static constexpr int kMaxTagSize = 1000;

}; // namespace Constants
} // namespace Sentinel
3 changes: 0 additions & 3 deletions sentinel-core/common/entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ class Entry {
std::chrono::milliseconds create_time() const { return create_time_; }
EntryContextSharedPtr context() const { return context_; }
Stat::NodeSharedPtr cur_node() const { return cur_node_; }
Stat::NodeSharedPtr origin_node() const { return origin_node_; }
int64_t rt() const { return rt_; }
std::vector<absl::any> params() const { return params_; }
bool exited() const { return exited_; }
Expand All @@ -39,7 +38,6 @@ class Entry {
void set_error(const std::string& message) { error_ = message; }
void set_block_error(const std::string& message) { block_error_ = message; }
void set_cur_node(const Stat::NodeSharedPtr& node) { cur_node_ = node; }
void set_origin_node(const Stat::NodeSharedPtr& node) { origin_node_ = node; }
void set_params(const std::vector<absl::any>&& params) { params_ = params; }

private:
Expand All @@ -52,7 +50,6 @@ class Entry {
std::string error_{};
std::string block_error_{};
Stat::NodeSharedPtr cur_node_;
Stat::NodeSharedPtr origin_node_;
std::vector<absl::any> params_;
};

Expand Down
12 changes: 8 additions & 4 deletions sentinel-core/common/entry_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ namespace Sentinel {
class EntryContext {
public:
explicit EntryContext(const std::string& name) : EntryContext(name, "") {}
EntryContext(const std::string& name, const std::string& origin)
: name_(name), origin_(origin) {}
EntryContext(const std::string& name, const std::string& tag)
: name_(name), tag_(tag) {}

const std::string& name() const { return name_; };
const std::string& origin() const { return origin_; };
const std::string& tag() const { return tag_; };
Stat::NodeSharedPtr tag_node() { return tag_node_; }

const Slot::TokenResultSharedPtr& last_token_result() const {
return last_token_result_;
Expand All @@ -26,10 +27,13 @@ class EntryContext {
void set_last_token_result(const Slot::TokenResultSharedPtr& r) {
last_token_result_ = r;
}
void set_tag_node(Stat::NodeSharedPtr& node) { tag_node_ = node; }

private:
const std::string name_;
const std::string origin_;
// Maybe multiple tags in the future, using vector instead of string.
const std::string tag_;
Stat::NodeSharedPtr tag_node_;

Slot::TokenResultSharedPtr last_token_result_;
};
Expand Down
2 changes: 1 addition & 1 deletion sentinel-core/common/entry_result.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ bool EntryResult::Exit(int count) {
Slot::SlotChainSharedPtr chain = Slot::GetGlobalSlotChain();
if (chain != nullptr) {
// NOTE: keep consistent with exit operation in SphU::Entry when blocked.
chain->Exit(entry_, entry_->resource(), count, params);
chain->Exit(entry_, count, params);
}
entry_->exited_ = true;
return true;
Expand Down
45 changes: 43 additions & 2 deletions sentinel-core/flow/flow_rule_checker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Slot::TokenResultSharedPtr FlowRuleChecker::PassLocalCheck(
const FlowRule& rule, const EntrySharedPtr& entry,
const Stat::NodeSharedPtr& node, int count, int flag) {
Stat::NodeSharedPtr selected_node =
SelectNodeByRelStrategy(rule, entry, node);
selectNodeByRequesterAndStrategy(rule, entry, node);
if (selected_node == nullptr) {
return Slot::TokenResult::Ok();
}
Expand All @@ -40,8 +40,43 @@ Slot::TokenResultSharedPtr FlowRuleChecker::PassLocalCheck(
return controller->CanPass(selected_node, count, flag);
}

Stat::NodeSharedPtr FlowRuleChecker::selectNodeByRequesterAndStrategy(
const FlowRule& rule, const EntrySharedPtr& entry,
const Stat::NodeSharedPtr& node) {
FlowRuleManager& m = FlowRuleManager::GetInstance();
std::string tag = entry->context()->tag();
std::string limit_origin = rule.limit_origin();
FlowRelationStrategy strategy = rule.strategy();
Stat::NodeSharedPtr tag_node = entry->context()->tag_node();

if ((tag == limit_origin) && IsValidTag(tag)) {
if (strategy == FlowRelationStrategy::kDirect) {
// When tag matches, return tag node.
return tag_node;
}
return SelectNodeByRelStrategy(rule, entry, node);
} else if (limit_origin == Constants::kLimitOriginDefault) {
if (strategy == FlowRelationStrategy::kDirect) {
// When rule contains default tag, which means all request should follow
// rule's limit count.
return node;
}
return SelectNodeByRelStrategy(rule, entry, node);
} else if ((limit_origin == Constants::kLimitOriginOther) &&
m.IsTagNotInFlowRuleList(rule.resource(), tag)) {
if (strategy == FlowRelationStrategy::kDirect) {
// When rule contains other tag, which means all request except this tag
// should follow this rule.
return tag_node;
}
return SelectNodeByRelStrategy(rule, entry, node);
}

return nullptr;
}

Stat::NodeSharedPtr FlowRuleChecker::SelectNodeByRelStrategy(
const FlowRule& rule, const EntrySharedPtr&,
const FlowRule& rule, const EntrySharedPtr& entry,
const Stat::NodeSharedPtr& node) {
const std::string& ref_resource = rule.ref_resource();
auto rel_strategy = rule.strategy();
Expand All @@ -50,8 +85,14 @@ Stat::NodeSharedPtr FlowRuleChecker::SelectNodeByRelStrategy(
return Stat::ResourceNodeStorage::GetInstance().GetClusterNode(
ref_resource);
}

return node;
}

bool FlowRuleChecker::IsValidTag(const std::string& tag) {
return !tag.empty() && (tag != Constants::kLimitOriginDefault) &&
(tag != Constants::kLimitOriginOther);
}

} // namespace Flow
} // namespace Sentinel
5 changes: 5 additions & 0 deletions sentinel-core/flow/flow_rule_checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ class FlowRuleChecker {
const Stat::NodeSharedPtr& node,
int count, int flag);

Stat::NodeSharedPtr selectNodeByRequesterAndStrategy(
const FlowRule& rule, const EntrySharedPtr& entry,
const Stat::NodeSharedPtr& node);

Stat::NodeSharedPtr SelectNodeByRelStrategy(const FlowRule& rule,
const EntrySharedPtr& entry,
const Stat::NodeSharedPtr& node);
bool IsValidTag(const std::string& tag);
};

} // namespace Flow
Expand Down
22 changes: 21 additions & 1 deletion sentinel-core/flow/flow_rule_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,26 @@ std::shared_ptr<TrafficShapingController> FlowRuleManager::GenerateController(
return CreateDefaultController(rule);
}

bool FlowRuleManager::IsTagNotInFlowRuleList(const std::string& resource_name,
const std::string& tag) {
if (tag.empty()) {
return false;
}
absl::ReaderMutexLock lck(&update_mtx_);
auto got = rule_map_.find(resource_name);
if (got == rule_map_.end()) {
return true;
}

for (const auto rule : got->second) {
if (rule.limit_origin() == tag) {
return false;
}
}

return true;
}

// FlowPropertyListener

void LogFlowMap(const std::unordered_map<std::string, FlowRuleList>& map) {
Expand Down Expand Up @@ -169,7 +189,7 @@ void FlowPropertyListener::ConfigUpdate(const FlowRuleList& value, bool) {
if (it == new_rule_map.end()) {
new_rule_map.insert({rule.resource(), {rule}});
} else {
auto vec = it->second;
auto& vec = it->second;
vec.push_back(std::move(rule));
}
}
Expand Down
2 changes: 2 additions & 0 deletions sentinel-core/flow/flow_rule_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class FlowRuleManager {

std::shared_ptr<TrafficShapingController> GetTrafficControllerFor(
const FlowRule& rule) const;
bool IsTagNotInFlowRuleList(const std::string& resource_name,
const std::string& tag);

private:
FlowRuleManager();
Expand Down
5 changes: 2 additions & 3 deletions sentinel-core/flow/flow_slot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ namespace Slot {
const std::string& FlowSlot::Name() const { return name_; }

TokenResultSharedPtr FlowSlot::Entry(const EntrySharedPtr& entry,
const ResourceWrapperSharedPtr& resource,
Stat::NodeSharedPtr& node, int count,
int flag,
const std::vector<absl::any>& params) {
std::vector<Flow::FlowRule> rules =
Flow::FlowRuleManager::GetInstance().GetRulesForResource(
resource->name());
entry->resource()->name());
if (!rules.empty()) {
for (const auto& rule : rules) {
// check in order
Expand All @@ -35,7 +34,7 @@ TokenResultSharedPtr FlowSlot::Entry(const EntrySharedPtr& entry,
return TokenResult::Ok();
}

void FlowSlot::Exit(const EntrySharedPtr&, const ResourceWrapperSharedPtr&, int,
void FlowSlot::Exit(const EntrySharedPtr&, int,
const std::vector<absl::any>& params) {
// Do nothing
}
Expand Down
4 changes: 1 addition & 3 deletions sentinel-core/flow/flow_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ class FlowSlot : public RuleCheckerSlot {
virtual ~FlowSlot() = default;

TokenResultSharedPtr Entry(const EntrySharedPtr& entry,
const ResourceWrapperSharedPtr& resource,
Stat::NodeSharedPtr& node, int count, int flag,
const std::vector<absl::any>& params) override;
void Exit(const EntrySharedPtr& entry,
const ResourceWrapperSharedPtr& resource, int count,
void Exit(const EntrySharedPtr& entry, int count,
const std::vector<absl::any>& params) override;
const std::string& Name() const override;

Expand Down
Loading

0 comments on commit 36871db

Please sign in to comment.