Skip to content

Commit

Permalink
add sum traffic used
Browse files Browse the repository at this point in the history
delete useless code
  • Loading branch information
bxy4543 committed Jan 9, 2024
1 parent 24ceaff commit 2645d45
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 302 deletions.
121 changes: 14 additions & 107 deletions controllers/pkg/database/mongo/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"golang.org/x/sync/errgroup"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -49,6 +48,8 @@ const (
DefaultUserConn = "user"
DefaultPricesConn = "prices"
DefaultPropertiesConn = "properties"
//TODO fix
DefaultTrafficConn = "traffic"
)

const DefaultRetentionDay = 30
Expand All @@ -68,16 +69,7 @@ type mongoDB struct {
BillingConn string
PricesConn string
PropertiesConn string
}

func (m *mongoDB) GetTrafficSentBytes(_, _ time.Time, _ string, _ uint8, _ string) (int64, error) {
//TODO implement me
panic("implement me")
}

func (m *mongoDB) GetTrafficRecvBytes(_, _ time.Time, _ string, _ uint8, _ string) (int64, error) {
//TODO implement me
panic("implement me")
TrafficConn string
}

type AccountBalanceSpecBSON struct {
Expand Down Expand Up @@ -258,36 +250,34 @@ func (m *mongoDB) InsertMonitor(ctx context.Context, monitors ...*resources.Moni
}

func (m *mongoDB) GetDistinctMonitorCombinations(startTime, endTime time.Time, namespace string) ([]resources.Monitor, error) {
matchStage := bson.D{
{Key: "$match", Value: bson.M{
pipeline := mongo.Pipeline{
{{Key: "$match", Value: bson.M{
"time": bson.M{
"$gte": startTime.UTC(),
"$lt": endTime.UTC(),
},
"category": namespace,
}},
}
groupStage := bson.D{
{Key: "$group", Value: bson.M{
}}},
{{Key: "$group", Value: bson.M{
"_id": bson.M{
"category": "$category",
"name": "$name",
"type": "$type",
},
}},
}}},
}
cursor, err := m.getMonitorCollection(startTime).Aggregate(context.Background(), mongo.Pipeline{matchStage, groupStage})
cursor, err := m.getMonitorCollection(startTime).Aggregate(context.Background(), pipeline)
if err != nil {
return nil, fmt.Errorf("aggregate error: %v", err)
}
defer cursor.Close(context.Background())
var monitors []resources.Monitor
for cursor.Next(context.Background()) {
var monitor resources.Monitor
if err := cursor.Decode(&monitor); err != nil {
var result = make(map[string]resources.Monitor, 1)
if err := cursor.Decode(result); err != nil {
return nil, fmt.Errorf("decode error: %v", err)
}
monitors = append(monitors, monitor)
monitors = append(monitors, result["_id"])
}
if err := cursor.Err(); err != nil {
return nil, fmt.Errorf("cursor error: %v", err)
Expand Down Expand Up @@ -351,90 +341,6 @@ func (m *mongoDB) SavePropertyTypes(types []resources.PropertyType) error {
return err
}

// 2020-12-01 23:00:00 - 2020-12-02 00:00:00
// 2020-12-02 00:00:00 - 2020-12-02 01:00:00
func (m *mongoDB) GenerateMeteringData(startTime, endTime time.Time, prices map[string]resources.Price) error {
filter := bson.M{
"time": bson.M{
"$gte": startTime,
"$lt": endTime,
},
}
cursor, err := m.getMonitorCollection(startTime).Find(context.Background(), filter)
if err != nil {
return fmt.Errorf("find monitors error: %v", err)
}
defer cursor.Close(context.Background())

meteringMap := make(map[string]map[string]int64)
countMap := make(map[string]map[string]int64)
updateTimeMap := make(map[string]map[string]*time.Time)

for cursor.Next(context.Background()) {
var monitor resources.Monitor
if err := cursor.Decode(&monitor); err != nil {
return fmt.Errorf("decode monitor error: %v", err)
}

if _, ok := updateTimeMap[monitor.Category]; !ok {
updateTimeMap[monitor.Category] = make(map[string]*time.Time)
}
if _, ok := updateTimeMap[monitor.Category][monitor.Property]; !ok {
lastUpdateTime, err := m.GetUpdateTimeForCategoryAndPropertyFromMetering(monitor.Category, monitor.Property)
if err != nil {
logger.Debug(err, "get latest update time failed", "category", monitor.Category, "property", monitor.Property)
}
updateTimeMap[monitor.Category][monitor.Property] = &lastUpdateTime
}
lastUpdateTime := updateTimeMap[monitor.Category][monitor.Property].UTC()

if /* skip last update lte 1 hour*/ lastUpdateTime.Before(startTime) || lastUpdateTime.Equal(startTime) {
if _, ok := meteringMap[monitor.Category]; !ok {
meteringMap[monitor.Category] = make(map[string]int64)
countMap[monitor.Category] = make(map[string]int64)
}
//TODO interface will delete
//meteringMap[monitor.Category][monitor.Property] += monitor.Value
countMap[monitor.Category][monitor.Property]++
continue
}
logger.Debug("Info", "skip metering", "category", monitor.Category, "property", monitor.Property, "lastUpdateTime", updateTimeMap[monitor.Category][monitor.Property].UTC(), "startTime", startTime)
}

if err := cursor.Err(); err != nil {
return fmt.Errorf("cursor error: %v", err)
}
eg, _ := errgroup.WithContext(context.Background())

for category, propertyMap := range meteringMap {
for property, totalValue := range propertyMap {
count := countMap[category][property]
if count < 60 {
count = 60
}
unitValue := math.Ceil(float64(totalValue) / float64(count))
metering := &resources.Metering{
Category: category,
Property: property,
Time: endTime,
Amount: int64(unitValue * float64(prices[property].Price)),
Value: int64(unitValue),
//Detail: "",
}
_category, _property := category, property
eg.Go(func() error {
_, err := m.getMeteringCollection().InsertOne(context.Background(), metering)
if err != nil {
//TODO if insert failed, should todo?
logger.Error(err, "insert metering data failed", "category", _category, "property", _property)
}
return err
})
}
}
return eg.Wait()
}

/*
monitors = append(monitors, &common.Monitor{
Category: namespace.Name,
Expand Down Expand Up @@ -495,7 +401,7 @@ func (m *mongoDB) GenerateBillingData(startTime, endTime time.Time, prols *resou
}
if value.PriceType == resources.SUM {
groupStage = append(groupStage, primitive.E{Key: keyStr, Value: bson.D{{Key: "$sum", Value: "$used." + keyStr}}})
usedStage[keyStr] = primitive.E{Key: keyStr, Value: bson.D{{Key: "$sum", Value: "$used." + keyStr}}}
usedStage[keyStr] = bson.D{{Key: "$toInt", Value: "$" + keyStr}}
continue
}
groupStage = append(groupStage, primitive.E{Key: keyStr, Value: bson.D{{Key: "$sum", Value: "$used." + keyStr}}})
Expand Down Expand Up @@ -1041,5 +947,6 @@ func NewMongoInterface(ctx context.Context, URL string) (database.Interface, err
BillingConn: DefaultBillingConn,
PricesConn: DefaultPricesConn,
PropertiesConn: DefaultPropertiesConn,
TrafficConn: DefaultTrafficConn,
}, err
}
72 changes: 72 additions & 0 deletions controllers/pkg/database/mongo/traffic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package mongo

import (
"context"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

/* example:
{
_id: ObjectId("60eea26373c4cdcb6356827d"),
traffic_meta: {
pod_name: "my-pod",
pod_namespace: "my-namespace",
pod_address: "100.64.0.1",
traffic_tag: "port:80",
pod_type: 1,
pod_type_name: "mongodb"
},
timestamp: "2024-01-04T04:02:25",
sent_bytes: 31457280,
recv_bytes: 15728640
}
*/

func (m *mongoDB) GetTrafficRecvBytes(startTime, endTime time.Time, namespace string, _type uint8, name string) (int64, error) {
return m.getTrafficBytes(false, startTime, endTime, namespace, _type, name)
}

func (m *mongoDB) GetTrafficSentBytes(startTime, endTime time.Time, namespace string, _type uint8, name string) (int64, error) {
return m.getTrafficBytes(true, startTime, endTime, namespace, _type, name)
}

func (m *mongoDB) getTrafficBytes(sent bool, startTime, endTime time.Time, namespace string, _type uint8, name string) (int64, error) {
filter := bson.M{
"traffic_meta.pod_namespace": namespace,
"traffic_meta.pod_type": _type,
"traffic_meta.pod_type_name": name,
"time": bson.M{
"$gte": startTime,
"$lte": endTime,
},
}
pipeline := mongo.Pipeline{
bson.D{{"$match", filter}},
bson.D{{"$group", bson.D{{"_id", nil}, {"total", bson.D{{"$sum", "$recv_bytes"}}}}}},
}
if sent {
pipeline = append(pipeline, bson.D{{"$group", bson.D{{"_id", nil}, {"total", bson.D{{"$sum", "$sent_bytes"}}}}}})
}
cur, err := m.getTrafficCollection().Aggregate(context.Background(), pipeline)
if err != nil {
return 0, err
}
defer cur.Close(context.Background())
for cur.Next(context.Background()) {
var result struct {
Total int64 `bson:"total"`
}
if err := cur.Decode(&result); err != nil {
return 0, err
}
return result.Total, nil
}
return 0, nil
}

func (m *mongoDB) getTrafficCollection() *mongo.Collection {
return m.Client.Database(m.AuthDB).Collection(m.TrafficConn)
}
17 changes: 1 addition & 16 deletions controllers/pkg/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ var DefaultPropertyTypeList = []PropertyType{
{
Name: "network",
Enum: 3,
PriceType: DIF,
PriceType: SUM,
UnitPrice: 0,
UnitString: "1Mi",
},
Expand Down Expand Up @@ -378,21 +378,6 @@ func GetGpuResourceProduct(resource string) string {
return strings.TrimPrefix(resource, GpuResourcePrefix)
}

var DefaultPrices = map[string]Price{
"cpu": {
Property: "cpu",
Price: 67,
},
"memory": {
Property: "memory",
Price: 33,
},
"storage": {
Property: "storage",
Price: 2,
},
}

// infra residual code
//const (
// PropertyInfraCPU = "infra-cpu"
Expand Down
Loading

0 comments on commit 2645d45

Please sign in to comment.