Skip to content

Commit

Permalink
Feat/traffic proto (#4095)
Browse files Browse the repository at this point in the history
* init traffic mod

* add DIF properties;
add kb backup pvc monitoring name;
optimize traffic monitor;

* go mod tidy

* add TRAFFICS_SERVICE_CONNECT_ADDRESS env

* ignore return getPodTrafficUsed error

* change to english annotation

* old data content of the monitor collection is automatically deleted

* add Property omitempty

* fix conflict

* change traffics stat request identity to {2,8}
  • Loading branch information
bxy4543 committed Oct 16, 2023
1 parent 57beea9 commit a3728ec
Show file tree
Hide file tree
Showing 12 changed files with 338 additions and 153 deletions.
67 changes: 47 additions & 20 deletions controllers/pkg/database/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const (
PermanentRetention = "PERMANENT_RETENTION"
)

// override this value at build time
const defaultCryptoKey = "Af0b2Bc5e9d0C84adF0A5887cF43aB63"

var cryptoKey = defaultCryptoKey
Expand All @@ -73,7 +74,7 @@ type MongoDB struct {

type AccountBalanceSpecBSON struct {
// Time metav1.Time `json:"time" bson:"time"`
// time字段如果为time.Time类型无法转换为json crd,所以使用metav1.Time,但是使用metav1.Time无法插入到mongo中,所以需要转换为time.Time
// If the Time field is of the time. time type, it cannot be converted to json crd, so use metav1.Time. However, metav1.Time cannot be inserted into mongo, so you need to convert it to time.Time
Time time.Time `json:"time" bson:"time"`
accountv1.BillingRecordQueryItemInline `json:",inline" bson:",inline"`
}
Expand Down Expand Up @@ -135,9 +136,8 @@ func (m *MongoDB) GetUnsettingBillingHandler(owner string) ([]resources.BillingH
}

func (m *MongoDB) UpdateBillingStatus(orderID string, status resources.BillingStatus) error {
// 创建一个查询过滤器
// create a query filter
filter := bson.M{"order_id": orderID}
// 更新文档
update := bson.M{
"$set": bson.M{
"status": status,
Expand Down Expand Up @@ -456,8 +456,6 @@ func (m *MongoDB) GenerateMeteringData(startTime, endTime time.Time, prices map[
Name: resourceMap[name].Name(),
})
*/
//按照type, name, namespace 来统计billing数据
//统计该time范围内 所有type, name, namespace相同的monitor数据的used平均值或总值(按照其PropertyType)得出一个billing数据并写入billing表
func (m *MongoDB) GenerateBillingData(startTime, endTime time.Time, prols *resources.PropertyTypeLS, namespaces []string, owner string) (orderID []string, amount int64, err error) {
minutes := endTime.Sub(startTime).Minutes()

Expand All @@ -473,25 +471,49 @@ func (m *MongoDB) GenerateBillingData(startTime, endTime time.Time, prols *resou
primitive.E{Key: "category", Value: "$_id.category"},
}

// 初始化 used 阶段
// initialize the used phase
usedStage := bson.M{}

// 根据 EnumMap 动态构建 $group $project 阶段
for key := range prols.EnumMap {
// Build the $group and $project phases dynamically from EnumMap
for key, value := range prols.EnumMap {
keyStr := strconv.Itoa(int(key))

// 添加到 $group 阶段
groupStage = append(groupStage, primitive.E{Key: keyStr, Value: bson.D{{Key: "$sum", Value: "$used." + keyStr}}})
// $max - $min;
// When max is not zero, the minimum value other than the zero value is used to prevent some data from obtaining a value in special cases
// max-min=0 if the hour has only one data piece or no data piece
if value.PriceType == resources.DIF {
// for non 0 $min
minWithCondition := bson.D{
{Key: "$min", Value: bson.D{
{Key: "$cond", Value: bson.A{
bson.D{{Key: "$eq", Value: bson.A{"$used." + keyStr, 0}}},
nil, // 将0值排除在外
"$used." + keyStr,
}},
}},
}

// 添加到 used 阶段
groupStage = append(groupStage,
primitive.E{Key: keyStr + "_max", Value: bson.D{{Key: "$max", Value: "$used." + keyStr}}}, // 正常计算$max
primitive.E{Key: keyStr + "_min", Value: minWithCondition},
)

// added to the used phase
usedStage[keyStr] = bson.D{{Key: "$subtract", Value: bson.A{
"$" + keyStr + "_max",
"$" + keyStr + "_min",
}}}
continue
}
groupStage = append(groupStage, primitive.E{Key: keyStr, Value: bson.D{{Key: "$sum", Value: "$used." + keyStr}}})
usedStage[keyStr] = bson.D{{Key: "$toInt", Value: bson.D{{Key: "$round", Value: bson.D{{Key: "$divide", Value: bson.A{
"$" + keyStr, bson.D{{Key: "$cond", Value: bson.A{bson.D{{Key: "$gt", Value: bson.A{"$count", minutes}}}, "$count", minutes}}}}}}}}}}
}

// used 阶段添加到 $project 阶段
// add the used phase to the $project phase
projectStage = append(projectStage, primitive.E{Key: "used", Value: usedStage})

// 构建 pipeline
// construction-pipeline
pipeline := mongo.Pipeline{
{{Key: "$match", Value: bson.D{{Key: "time", Value: bson.D{{Key: "$gte", Value: startTime}, {Key: "$lt", Value: endTime}}}, {Key: "category", Value: bson.D{{Key: "$in", Value: namespaces}}}}}},
{{Key: "$group", Value: groupStage}},
Expand Down Expand Up @@ -521,6 +543,9 @@ func (m *MongoDB) GenerateBillingData(startTime, endTime time.Time, prols *resou
return nil, 0, fmt.Errorf("decode error: %v", err)
}

//TODO delete
//logger.Info("generate billing data", "result", result)

if _, ok := appCostsMap[result.Namespace]; !ok {
appCostsMap[result.Namespace] = make(map[uint8][]resources.AppCost)
}
Expand Down Expand Up @@ -574,6 +599,8 @@ func (m *MongoDB) GenerateBillingData(startTime, endTime time.Time, prols *resou
if err != nil {
return nil, 0, fmt.Errorf("insert error: %v", err)
}
//TODO delete
//logger.Info("generate billing data", "billing", billing)
}
}

Expand Down Expand Up @@ -767,7 +794,7 @@ func (m *MongoDB) QueryBillingRecords(billingRecordQuery *accountv1.BillingRecor

totalCount := 0

// 总数量
// total quantity
cursorAll, err := billingColl.Aggregate(ctx, pipelineAll)
if err != nil {
return fmt.Errorf("failed to execute aggregate all query: %w", err)
Expand All @@ -783,7 +810,7 @@ func (m *MongoDB) QueryBillingRecords(billingRecordQuery *accountv1.BillingRecor
totalCount = int(result.Result)
}

// 消费总金额Costs Executing the second pipeline for getting the total count, recharge and deduction amount
// Costs Executing the second pipeline for getting the total count, recharge and deduction amount
cursorCountAndAmount, err := billingColl.Aggregate(ctx, pipelineCountAndAmount)
if err != nil {
return fmt.Errorf("failed to execute aggregate query for count and amount: %w", err)
Expand All @@ -809,7 +836,7 @@ func (m *MongoDB) QueryBillingRecords(billingRecordQuery *accountv1.BillingRecor
}
}

// 充值总金额
// the total amount
cursorRechargeAmount, err := billingColl.Aggregate(ctx, pipelineRechargeAmount)
if err != nil {
return fmt.Errorf("failed to execute aggregate query for recharge amount: %w", err)
Expand Down Expand Up @@ -880,7 +907,7 @@ func (m *MongoDB) getMonitorCollection(collTime time.Time) *mongo.Collection {
}

func (m *MongoDB) getMonitorCollectionName(collTime time.Time) string {
// 按天计算尾缀,如202012月1号 尾缀为20201201
// Calculate the suffix by day, for example, the suffix on the first day of 202012 is 20201201
return fmt.Sprintf("%s_%s", m.MonitorConnPrefix, collTime.Format("20060102"))
}

Expand All @@ -906,15 +933,15 @@ func (m *MongoDB) CreateBillingIfNotExist() error {
return fmt.Errorf("failed to create collection for billing: %w", err)
}

// 创建索引
// create index
_, err = m.getBillingCollection().Indexes().CreateMany(ctx, []mongo.IndexModel{
{
// 唯一索引 owner + order_id
// unique index owner order_id
Keys: bson.D{primitive.E{Key: "owner", Value: 1}, primitive.E{Key: "order_id", Value: 1}},
Options: options.Index().SetUnique(true),
},
{
// owner + time + type 索引
// owner + time + type indexes
Keys: bson.D{
primitive.E{Key: "owner", Value: 1},
primitive.E{Key: "time", Value: 1},
Expand Down
3 changes: 3 additions & 0 deletions controllers/pkg/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ replace (

require (
github.com/containers/storage v1.50.2
github.com/dinoallo/sealos-networkmanager-protoapi v0.0.0-20230928031328-cf9649d6af49
github.com/dustin/go-humanize v1.0.1
github.com/gin-gonic/gin v1.9.1
github.com/go-logr/logr v1.2.4
Expand Down Expand Up @@ -113,6 +114,8 @@ require (
golang.org/x/text v0.10.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.3.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions controllers/pkg/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dinoallo/sealos-networkmanager-protoapi v0.0.0-20230928031328-cf9649d6af49 h1:4GI5eviCwbPxDE311KryyyPUTO7IDVyHGp3Iyl+fEZY=
github.com/dinoallo/sealos-networkmanager-protoapi v0.0.0-20230928031328-cf9649d6af49/go.mod h1:sbm1DAsayX+XsXCOC2CFAAU9JZhX0SPKwnybDjSd0Ls=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
Expand Down Expand Up @@ -359,13 +361,17 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20220107163113-42d7afdf6368/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
110 changes: 102 additions & 8 deletions controllers/pkg/resources/named.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ package resources
import (
"strings"

sealos_networkmanager "github.com/dinoallo/sealos-networkmanager-protoapi"

"sigs.k8s.io/controller-runtime/pkg/client"
)

/*
数据库: app.kubernetes.io/instance=gitea app.kubernetes.io/managed-by=kubeblocks apps.kubeblocks.io/component-name
应用:app: xxx
终端: TerminalID: xxx
定时任务:job-name: xxx
other: xxx
map[string][]string{}
AppType (sort by label) :
Database: app.kubernetes.io/instance=gitea app.kubernetes.io/managed-by=kubeblocks apps.kubeblocks.io/component-name
AppLaunchpad:app: xxx
Terminal: TerminalID: xxx
Cronjob:job-name: xxx
Other: in addition to the above all labels
*/

const (
Expand All @@ -28,17 +30,19 @@ const (
TerminalIDLabelKey = "TerminalID"
AppLabelKey = "app"
JobNameLabelKey = "job-name"
KubeBlocksBackUpName = "kubeblocks-backup-data"
)

type ResourceNamed struct {
_name string
// db or app or terminal or job or other
_type string
_type string
labels map[string]string
}

func NewResourceNamed(cr client.Object) *ResourceNamed {
p := &ResourceNamed{}
labels := cr.GetLabels()
p := &ResourceNamed{labels: labels}
switch {
case labels[DBPodLabelComponentNameKey] != "":
p._type = DB
Expand All @@ -52,6 +56,9 @@ func NewResourceNamed(cr client.Object) *ResourceNamed {
case labels[JobNameLabelKey] != "":
p._type = JOB
p._name = strings.SplitN(labels[JobNameLabelKey], "-", 2)[0]
case cr.GetName() == KubeBlocksBackUpName:
p._type = JOB
p._name = KubeBlocksBackUpName
default:
p._type = OTHER
p._name = ""
Expand All @@ -63,6 +70,93 @@ func (p *ResourceNamed) Type() uint8 {
return AppType[p._type]
}

func (p *ResourceNamed) Labels() map[string]string {
label := make(map[string]string)
switch p.Type() {
case db:
label[DBPodLabelComponentNameKey] = p.labels[DBPodLabelComponentNameKey]
label[DBPodLabelInstanceKey] = p.labels[DBPodLabelInstanceKey]
case terminal:
label[TerminalIDLabelKey] = p.labels[TerminalIDLabelKey]
case app:
label[AppLabelKey] = p.labels[AppLabelKey]
case job:
label[JobNameLabelKey] = p.labels[JobNameLabelKey]
//case other:
//default:
}
return label
}

var notExistLabels = func() map[uint8][]*sealos_networkmanager.Label {
labels := make(map[uint8][]*sealos_networkmanager.Label)
for k := range AppTypeReverse {
labels[k] = getNotExistLabels(k)
}
return labels
}()

func (p *ResourceNamed) GetNotExistLabels() []*sealos_networkmanager.Label {
return notExistLabels[p.Type()]
}

func getNotExistLabels(tp uint8) []*sealos_networkmanager.Label {
var labels []*sealos_networkmanager.Label
for appType := range AppTypeReverse {
if tp == appType {
continue
}
switch appType {
case db:
labels = append(labels, &sealos_networkmanager.Label{
Key: DBPodLabelComponentNameKey,
}, &sealos_networkmanager.Label{
Key: DBPodLabelManagedByKey,
})
case app:
labels = append(labels, &sealos_networkmanager.Label{
Key: AppLabelKey,
})
case terminal:
labels = append(labels, &sealos_networkmanager.Label{
Key: TerminalIDLabelKey,
})
case job:
labels = append(labels, &sealos_networkmanager.Label{
Key: JobNameLabelKey,
})
}
}
return labels
}

func (p *ResourceNamed) GetInLabels() []*sealos_networkmanager.Label {
var labelsEqual []*sealos_networkmanager.Label
switch p.Type() {
case db:
labelsEqual = append(labelsEqual, &sealos_networkmanager.Label{
Key: DBPodLabelComponentNameKey,
Value: []string{p.labels[DBPodLabelComponentNameKey]},
})
case terminal:
labelsEqual = append(labelsEqual, &sealos_networkmanager.Label{
Key: TerminalIDLabelKey,
Value: []string{p.labels[TerminalIDLabelKey]},
})
case app:
labelsEqual = append(labelsEqual, &sealos_networkmanager.Label{
Key: AppLabelKey,
Value: []string{p.labels[AppLabelKey]},
})
case job:
labelsEqual = append(labelsEqual, &sealos_networkmanager.Label{
Key: JobNameLabelKey,
Value: []string{p.labels[JobNameLabelKey]},
})
}
return labelsEqual
}

func (p *ResourceNamed) TypeString() string {
return p._type
}
Expand Down
Loading

0 comments on commit a3728ec

Please sign in to comment.