Skip to content

Commit

Permalink
adding object count metric for accounts. (#18363)
Browse files Browse the repository at this point in the history
adding object count metric for accounts.

Approved by: @daviszhen, @aptend, @heni02, @XuPeng-SH, @zhangxu19830126, @qingxinhome, @xzxiong
  • Loading branch information
gouhongshen committed Sep 8, 2024
1 parent aa13822 commit 49dd70c
Show file tree
Hide file tree
Showing 25 changed files with 1,355 additions and 187 deletions.
2 changes: 2 additions & 0 deletions pkg/bootstrap/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions/v1_2_0"
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions/v1_2_1"
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions/v1_2_2"
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions/v1_2_3"
"github.com/matrixorigin/matrixone/pkg/bootstrap/versions/v1_3_0"
)

Expand All @@ -29,6 +30,7 @@ func (s *service) initUpgrade() {
s.handles = append(s.handles, v1_2_1.Handler)
// TODO: When v1.2.1 release, open the commented code as follows, Enable v1.2.2 upgrade package
s.handles = append(s.handles, v1_2_2.Handler)
s.handles = append(s.handles, v1_2_3.Handler)
s.handles = append(s.handles, v1_3_0.Handler)
}

Expand Down
14 changes: 14 additions & 0 deletions pkg/bootstrap/versions/v1_2_3/tenant_upgrade_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

var tenantUpgEntries = []versions.UpgradeEntry{
upg_mo_snapshots,
upg_system_metrics_server_object_count_total,
}

var upg_mo_snapshots = versions.UpgradeEntry{
Expand All @@ -44,3 +45,16 @@ var upg_mo_snapshots = versions.UpgradeEntry{
return false, nil
},
}

var upg_system_metrics_server_object_count_total = versions.UpgradeEntry{
Schema: catalog.MO_SYSTEM_METRICS,
TableName: "server_object_count",
UpgType: versions.CREATE_VIEW,
UpgSql: fmt.Sprintf("CREATE VIEW IF NOT EXISTS `%s`.`%s` as "+
"SELECT `collecttime`, `value`, `node`, `role`, `account`, `type` "+
"from `system_metrics`.`metric` "+
"where `metric_name` = 'server_object_count'",
catalog.MO_SYSTEM_METRICS, "server_object_count"),
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) { return false, nil },
PreSql: fmt.Sprintf("DROP VIEW IF EXISTS %s.%s;", catalog.MO_SYSTEM_METRICS, "server_object_count"),
}
6 changes: 6 additions & 0 deletions pkg/frontend/compiler_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@ func (tcc *TxnCompilerContext) SetDatabase(db string) {
tcc.dbName = db
}

func (tcc *TxnCompilerContext) GetDatabase() string {
tcc.mu.Lock()
defer tcc.mu.Unlock()
return tcc.dbName
}

func (tcc *TxnCompilerContext) DefaultDatabase() string {
tcc.mu.Lock()
defer tcc.mu.Unlock()
Expand Down
72 changes: 55 additions & 17 deletions pkg/frontend/show_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/api"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
"github.com/matrixorigin/matrixone/pkg/sql/plan/function/ctl"
"github.com/matrixorigin/matrixone/pkg/util/metric/mometric"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
Expand Down Expand Up @@ -77,6 +78,7 @@ const (
" db_tbl_counts.tbl_count," +
" CAST(0 AS DOUBLE) AS size," +
" ma.comments" +
" %s" + // possible placeholder for object count
" FROM" +
" db_tbl_counts" +
" JOIN" +
Expand All @@ -89,6 +91,7 @@ const (
)

const idxOfAccountId = 0
const idxOfObjectCount = idxOfComment + 1

const (
// column index in the result set generated by
Expand All @@ -102,14 +105,12 @@ const (
idxOfTableCount
idxOfSize
idxOfComment

totalColumnCnt = 9
)

var cnUsageCache = logtail.NewStorageUsageCache(
logtail.WithLazyThreshold(5))

func getSqlForAccountInfo(like *tree.ComparisonExpr, accId int64) string {
func getSqlForAccountInfo(like *tree.ComparisonExpr, accId int64, needObjectCount bool) string {
var likePattern = ""
var where = ""
var and = ""
Expand All @@ -132,7 +133,12 @@ func getSqlForAccountInfo(like *tree.ComparisonExpr, accId int64) string {

clause := fmt.Sprintf("%s %s %s %s", where, likePattern, and, account)

return fmt.Sprintf(getAccountInfoFormatV2, clause)
var objectCountExpr = ""
if needObjectCount {
objectCountExpr = ", CAST(0 AS BIGINT) AS object_count"
}

return fmt.Sprintf(getAccountInfoFormatV2, objectCountExpr, clause)
}

func requestStorageUsage(ctx context.Context, ses *Session, accIds [][]int64) (resp any, tried bool, err error) {
Expand All @@ -147,7 +153,7 @@ func requestStorageUsage(ctx context.Context, ses *Session, accIds [][]int64) (r
}

responseUnmarshaler := func(payload []byte) (any, error) {
usage := &db.StorageUsageResp{}
usage := &db.StorageUsageResp_V2{}
if err := usage.Unmarshal(payload); err != nil {
return nil, err
}
Expand Down Expand Up @@ -239,7 +245,7 @@ func handleStorageUsageResponse_V0(

func handleStorageUsageResponse(
ctx context.Context,
usage *db.StorageUsageResp,
usage *db.StorageUsageResp_V2,
) (map[int64]uint64, error) {
result := make(map[int64]uint64, 0)

Expand Down Expand Up @@ -274,9 +280,9 @@ func checkStorageUsageCache(accIds [][]int64) (result map[int64]uint64, succeed
return result, true
}

func updateStorageUsageCache(accIds []int64, sizes []uint64) {
func updateStorageUsageCache(usages *db.StorageUsageResp_V2) {

if len(accIds) == 0 {
if len(usages.AccIds) == 0 {
return
}

Expand All @@ -287,11 +293,19 @@ func updateStorageUsageCache(accIds []int64, sizes []uint64) {
cnUsageCache.ClearForUpdate()

// step 2: update
for x := range accIds {
usage := logtail.UsageData{AccId: uint64(accIds[x]), Size: sizes[x]}
if old, exist := cnUsageCache.Get(usage); exist {
usage.Size += old.Size
for x := range usages.AccIds {
usage := logtail.UsageData{
AccId: uint64(usages.AccIds[x]),
Size: usages.Sizes[x],
ObjectAbstract: logtail.ObjectAbstract{
TotalObjCnt: int(usages.ObjCnts[x]),
TotalBlkCnt: int(usages.BlkCnts[x]),
TotalRowCnt: int(usages.RowCnts[x]),
},
}
//if old, exist := cnUsageCache.Get(usage); exist {
// usage.Size += old.Size
//}

cnUsageCache.SetOrReplace(usage)
}
Expand Down Expand Up @@ -330,12 +344,12 @@ func getAccountsStorageUsage(ctx context.Context, ses *Session, accIds [][]int64
return handleStorageUsageResponse_V0(ctx, ses.GetService(), fs, usage, ses.GetLogger())

} else {
usage, ok := response.(*db.StorageUsageResp)
usage, ok := response.(*db.StorageUsageResp_V2)
if !ok || usage.Magic != logtail.StorageUsageMagic {
return nil, moerr.NewInternalErrorNoCtx("storage usage response decode failed, retry later")
}

updateStorageUsageCache(usage.AccIds, usage.Sizes)
updateStorageUsageCache(usage)

// step 3: handling these pulled data
return handleStorageUsageResponse(ctx, usage)
Expand All @@ -351,6 +365,10 @@ func updateCount(ori *vector.Vector, delta int64, rowIdx int) {
vector.SetFixedAtWithTypeCheck[int64](ori, rowIdx, old+delta)
}

func updateObjectCount(ori *vector.Vector, cnt int64, rowIdx int) {
vector.SetFixedAtWithTypeCheck[int64](ori, rowIdx, cnt)
}

func doShowAccounts(ctx context.Context, ses *Session, sa *tree.ShowAccounts) (err error) {
var sql string
var accIds [][]int64
Expand Down Expand Up @@ -397,8 +415,17 @@ func doShowAccounts(ctx context.Context, ses *Session, sa *tree.ShowAccounts) (e
return err
}

var needUpdateObjectCountMetric bool
if account.IsSysTenant() &&
sa.Like == nil &&
ses.GetTxnCompileCtx().GetDatabase() == mometric.MetricDBConst {
// storage usage cron task try to get storage usage for all accounts,
// adding an extra col to return object count val for all accounts.
needUpdateObjectCountMetric = true
}

if account.IsSysTenant() {
sql = getSqlForAccountInfo(sa.Like, -1)
sql = getSqlForAccountInfo(sa.Like, -1, needUpdateObjectCountMetric)
if accInfosBatches, accIds, err = getAccountInfo(ctx, bh, sql, mp); err != nil {
return err
}
Expand All @@ -410,7 +437,7 @@ func doShowAccounts(ctx context.Context, ses *Session, sa *tree.ShowAccounts) (e
}
// switch to the sys account to get account info
newCtx := defines.AttachAccountId(ctx, uint32(sysAccountID))
sql = getSqlForAccountInfo(nil, int64(account.GetTenantID()))
sql = getSqlForAccountInfo(nil, int64(account.GetTenantID()), needUpdateObjectCountMetric)
if accInfosBatches, accIds, err = getAccountInfo(newCtx, bh, sql, mp); err != nil {
return err
}
Expand All @@ -433,13 +460,24 @@ func doShowAccounts(ctx context.Context, ses *Session, sa *tree.ShowAccounts) (e
return err
}

var abstract map[uint64]logtail.ObjectAbstract
if needUpdateObjectCountMetric {
abstract = cnUsageCache.GatherObjectAbstractForAccounts()
}

for x := range accIds {
for y := range accIds[x] {
updateStorageSize(accInfosBatches[x].Vecs[idxOfSize], usage[accIds[x][y]], y)
if accIds[x][y] != sysAccountID {
updateCount(accInfosBatches[x].Vecs[idxOfDbCount], specialDBCnt, y)
updateCount(accInfosBatches[x].Vecs[idxOfTableCount], specialTableCnt, y)
}

if needUpdateObjectCountMetric {
updateObjectCount(
accInfosBatches[x].Vecs[idxOfObjectCount],
int64(abstract[uint64(accIds[x][y])].TotalObjCnt), y)
}
}
}

Expand Down Expand Up @@ -479,7 +517,7 @@ func doShowAccounts(ctx context.Context, ses *Session, sa *tree.ShowAccounts) (e
}

func initOutputRs(dest *MysqlResultSet, src *MysqlResultSet, ctx context.Context) error {
for idx := idxOfAccountName; idx < totalColumnCnt; idx++ {
for idx := idxOfAccountName; idx < int(src.GetColumnCount()); idx++ {
o, err := src.GetColumn(ctx, uint64(idx))
if err != nil {
return err
Expand Down
Loading

0 comments on commit 49dd70c

Please sign in to comment.