Skip to content

Commit

Permalink
support max-allowed Pods for nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
islinwb committed Jun 5, 2018
1 parent 5fc5403 commit a0ce28f
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 2 deletions.
46 changes: 44 additions & 2 deletions src/scheduling/flow/cpu_cost_model.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ CpuCostModel::CpuCostModel(shared_ptr<ResourceMap_t> resource_map,
task_map_(task_map),
knowledge_base_(knowledge_base) {}

void CpuCostModel::AccumulateResourceStats(ResourceDescriptor* accumulator,
ResourceDescriptor* other) {
// Track the aggregate available resources below the machine node
ResourceVector* acc_avail = accumulator->mutable_available_resources();
ResourceVector* other_avail = other->mutable_available_resources();
acc_avail->set_cpu_cores(acc_avail->cpu_cores() + other_avail->cpu_cores());
// Running/idle task count
accumulator->set_num_running_tasks_below(
accumulator->num_running_tasks_below() +
other->num_running_tasks_below());
accumulator->set_num_slots_below(accumulator->num_slots_below() +
other->num_slots_below());
}

ArcDescriptor CpuCostModel::TaskToUnscheduledAgg(TaskID_t task_id) {
return ArcDescriptor(2560000, 1ULL, 0ULL);
}
Expand Down Expand Up @@ -254,7 +268,7 @@ void CpuCostModel::AddMachine(ResourceTopologyNodeDescriptor* rtnd_ptr) {
CHECK(rd.type() == ResourceDescriptor::RESOURCE_MACHINE);
ResourceID_t res_id = ResourceIDFromString(rd.uuid());
vector<EquivClass_t> machine_ecs;
for (uint64_t index = 0; index < FLAGS_max_multi_arcs_for_cpu; ++index) {
for (uint64_t index = 0; index < min(FLAGS_max_multi_arcs_for_cpu, rd.num_slots_below()); ++index) {
EquivClass_t multi_machine_ec = GetMachineEC(rd.friendly_name(), index);
machine_ecs.push_back(multi_machine_ec);
CHECK(InsertIfNotPresent(&ec_to_index_, multi_machine_ec, index));
Expand Down Expand Up @@ -315,7 +329,32 @@ FlowGraphNode* CpuCostModel::GatherStats(FlowGraphNode* accumulator,
CHECK_NOTNULL(other->rd_ptr_);
ResourceDescriptor* rd_ptr = accumulator->rd_ptr_;
CHECK_NOTNULL(rd_ptr);
if (accumulator->type_ == FlowNodeType::MACHINE) {
if (accumulator->type_ == FlowNodeType::PU) {
CHECK(other->resource_id_.is_nil());
ResourceStats latest_stats;
bool have_sample = knowledge_base_->GetLatestStatsForMachine(other->resource_id_,
&latest_stats);
if (have_sample) {
VLOG(2) << "Updating PU " << accumulator->resource_id_ << "'s "
<< "resource stats!";
// Get the CPU stats for this PU
string label = rd_ptr->friendly_name();
uint64_t idx = label.find("PU #");
if (idx != string::npos) {
string core_id_substr = label.substr(idx + 4, label.size() - idx - 4);
uint32_t core_id = strtoul(core_id_substr.c_str(), 0, 10);
float available_cpu_cores =
latest_stats.cpus_stats(core_id).cpu_capacity() *
(1.0 - latest_stats.cpus_stats(core_id).cpu_utilization());
rd_ptr->mutable_available_resources()->set_cpu_cores(
available_cpu_cores);
}
// Running/idle task count
rd_ptr->set_num_running_tasks_below(rd_ptr->current_running_tasks_size());
rd_ptr->set_num_slots_below(FLAGS_max_tasks_per_pu);
return accumulator;
}
} else if (accumulator->type_ == FlowNodeType::MACHINE) {
// Grab the latest available resource sample from the machine
ResourceStats latest_stats;
// Take the most recent sample for now
Expand All @@ -338,6 +377,9 @@ FlowGraphNode* CpuCostModel::GatherStats(FlowGraphNode* accumulator,
rd_ptr->mutable_available_resources()->set_ram_cap(available_ram_cap);
}
}
if (accumulator->rd_ptr_ && other->rd_ptr_) {
AccumulateResourceStats(accumulator->rd_ptr_, other->rd_ptr_);
}

return accumulator;
}
Expand Down
2 changes: 2 additions & 0 deletions src/scheduling/flow/cpu_cost_model.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class CpuCostModel : public CostModelInterface {
vector<ResourceID_t>* GetOutgoingEquivClassPrefArcs(EquivClass_t tec);
vector<ResourceID_t>* GetTaskPreferenceArcs(TaskID_t task_id);
vector<EquivClass_t>* GetEquivClassToEquivClassesArcs(EquivClass_t tec);
void AccumulateResourceStats(ResourceDescriptor* accumulator,
ResourceDescriptor* other);
void AddMachine(ResourceTopologyNodeDescriptor* rtnd_ptr);
void AddTask(TaskID_t task_id);
void RemoveMachine(ResourceID_t res_id);
Expand Down

0 comments on commit a0ce28f

Please sign in to comment.