Skip to content

Commit

Permalink
Use asset_stats() stored procedure instead of table
Browse files Browse the repository at this point in the history
  • Loading branch information
chr4 committed Jul 7, 2018
1 parent 67226be commit f22c26b
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 29 deletions.
39 changes: 39 additions & 0 deletions db/migrations/0007_asset_stats_procedure.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
-- Recreate asset_stats table (taken from migration 0005)
CREATE TABLE asset_stats (
id serial PRIMARY KEY,
paging_token character varying(64),
asset_type character varying(64),
asset_code character varying(12),
asset_issuer character varying(56),
issued bigint,
transferred bigint,
accounts_with_trustline integer,
accounts_with_payments integer,
payments integer,
created_at timestamp without time zone
);

CREATE OR REPLACE FUNCTION repopulate_asset_stats()
RETURNS VOID
AS
$$
DECLARE
t_row record;
BEGIN
TRUNCATE asset_stats;
FOR t_row in SELECT paging_token, effect_id, asset_type, asset_code, asset_issuer, created_at FROM effects ORDER BY effect_id LOOP
INSERT INTO asset_stats(paging_token, asset_code, asset_issuer, asset_type, created_at, issued, transferred, accounts_with_trustline, accounts_with_payments, payments)
VALUES (t_row.paging_token, t_row.asset_code, t_row.asset_issuer, t_row.asset_type, t_row.created_at,
(SELECT COALESCE(SUM(amount), 0) FROM effects WHERE type = 'account_debited' AND account = t_row.asset_issuer AND effect_id <= t_row.effect_id),
(SELECT COALESCE(SUM(amount), 0) FROM effects WHERE type = 'account_debited' AND account != t_row.asset_issuer AND effect_id <= t_row.effect_id),
(SELECT COUNT(DISTINCT account) FROM effects WHERE account != t_row.asset_issuer AND type = 'trustline_created' AND effect_id <= t_row.effect_id),
(SELECT COUNT(DISTINCT account) FROM effects WHERE account != t_row.asset_issuer AND type = 'account_debited' AND effect_id <= t_row.effect_id),
(SELECT COUNT(*) FROM effects WHERE type = 'account_debited' AND account != t_row.asset_issuer AND effect_id <= t_row.effect_id)
);
END LOOP;
END;
$$
LANGUAGE plpgsql;

-- Re-populate asset_stats table
SELECT repopulate_asset_stats();
44 changes: 44 additions & 0 deletions db/migrations/0007_asset_stats_procedure.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
CREATE OR REPLACE FUNCTION asset_stats(asset_code_filter character varying(12), asset_issuer_filter character varying(56))
RETURNS TABLE (
-- TODO: ID to order required?
effect_id character varying(56),
paging_token character varying(64),
asset_type character varying(64),
asset_code character varying(12),
asset_issuer character varying(56),
issued bigint,
transferred bigint,
accounts_with_trustline integer,
accounts_with_payments integer,
payments integer,
created_at timestamp without time zone
)
AS
$$
#variable_conflict use_column
DECLARE
t_row record;
BEGIN
FOR t_row in SELECT effect_id, paging_token, asset_type, asset_code, asset_issuer, created_at FROM effects ORDER BY created_at LOOP
-- Next if asset_code and asset_issuer do not match
CONTINUE WHEN t_row.asset_code != asset_code_filter AND t_row.asset_issuer != asset_issuer_filter;

effect_id := t_row.effect_id;
paging_token := t_row.paging_token;
asset_type := t_row.asset_type;
asset_code := t_row.asset_code;
asset_issuer := t_row.asset_issuer;
created_at := t_row.created_at;
issued := (SELECT COALESCE(SUM(amount), 0) FROM effects WHERE type = 'account_debited' AND account = t_row.asset_issuer AND effect_id <= t_row.effect_id);
transferred := (SELECT COALESCE(SUM(amount), 0) FROM effects WHERE type = 'account_debited' AND account != t_row.asset_issuer AND effect_id <= t_row.effect_id);
accounts_with_trustline := (SELECT COUNT(DISTINCT account) FROM effects WHERE account != t_row.asset_issuer AND type = 'trustline_created' AND effect_id <= t_row.effect_id);
accounts_with_payments := (SELECT COUNT(DISTINCT account) FROM effects WHERE account != t_row.asset_issuer AND type = 'account_debited' AND effect_id <= t_row.effect_id);
payments := (SELECT COUNT(*) FROM effects WHERE type = 'account_debited' AND account != t_row.asset_issuer AND effect_id <= t_row.effect_id);
RETURN NEXT;
END LOOP;
END;
$$
LANGUAGE plpgsql;

DROP TABLE asset_stats;
DROP FUNCTION repopulate_asset_stats;
33 changes: 16 additions & 17 deletions models/asset_stat/asset_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ import (
"github.com/cndy-store/analytics/utils/bigint"
"github.com/cndy-store/analytics/utils/filter"
"github.com/cndy-store/analytics/utils/sql"
"github.com/stellar/go/clients/horizon"
"time"
)

type AssetStat struct {
Id *uint32 `db:"id" json:"-"`
EffectId *string `db:"effect_id", json:"effect_id"` // TODO: omitempty, also add to tests?
PagingToken *string `db:"paging_token" json:"paging_token,omitempty"`
AssetType *string `db:"asset_type" json:"asset_type,omitempty"`
AssetCode *string `db:"asset_code" json:"asset_code,omitempty"`
Expand All @@ -29,26 +28,26 @@ type AssetStat struct {
JsonTransferred *string `db:"-" json:"transferred"`
}

func New(db sql.Database, effect horizon.Effect, timestamp time.Time) (err error) {
// Store amount_transfered and amount_issued upon insert in a different table
// (analogue to the asset endpoint of Horizon)
type Filter struct {
From *time.Time
To *time.Time
}

_, err = db.Exec(`INSERT INTO asset_stats(paging_token, asset_code, asset_issuer, asset_type, created_at, issued, transferred, accounts_with_trustline, accounts_with_payments, payments)
VALUES ($1, $2, $3, $4, $5,
(SELECT COALESCE(SUM(amount), 0) FROM effects WHERE type='account_debited' AND account=$6),
(SELECT COALESCE(SUM(amount), 0) FROM effects WHERE type='account_debited' AND account!=$6),
(SELECT COUNT(DISTINCT account) FROM effects WHERE type='trustline_created' AND account!=$6),
(SELECT COUNT(DISTINCT account) FROM effects WHERE type='account_debited' AND account!=$6),
(SELECT COUNT(*) FROM effects WHERE type='account_debited' AND account!=$6)
)`,
effect.PT, effect.Asset.Code, effect.Asset.Issuer, effect.Asset.Type, timestamp, effect.Asset.Issuer)
func (f *Filter) Defaults() {
if f.From == nil {
t := time.Unix(0, 0)
f.From = &t
}

return
if f.To == nil {
t := time.Now()
f.To = &t
}
}

func Get(db sql.Database, filter filter.Filter) (stats []AssetStat, err error) {
filter.Defaults()
err = db.Select(&stats, `SELECT * FROM asset_stats WHERE asset_code=$1 AND asset_issuer=$2 AND created_at BETWEEN $3::timestamp AND $4::timestamp ORDER BY id`,
err = db.Select(&stats, `SELECT * FROM asset_stats($1, $2) WHERE created_at BETWEEN $3::timestamp AND $4::timestamp ORDER BY effect_id`,
filter.AssetCode, filter.AssetIssuer, filter.From, filter.To)
if err == sql.ErrNoRows {
err = nil
Expand All @@ -68,7 +67,7 @@ func Get(db sql.Database, filter filter.Filter) (stats []AssetStat, err error) {

func Latest(db sql.Database, filter filter.Filter) (stats AssetStat, err error) {
filter.Defaults()
err = db.Get(&stats, `SELECT * FROM asset_stats WHERE asset_code=$1 AND asset_issuer=$2 ORDER BY id DESC LIMIT 1`,
err = db.Get(&stats, `SELECT * FROM asset_stats($1, $2) ORDER BY effect_id DESC LIMIT 1`,
filter.AssetCode, filter.AssetIssuer)
if err == sql.ErrNoRows {
err = nil
Expand Down
7 changes: 0 additions & 7 deletions models/effect/effect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package effect

import (
"encoding/json"
"github.com/cndy-store/analytics/models/asset_stat"
"github.com/cndy-store/analytics/utils/bigint"
"github.com/cndy-store/analytics/utils/filter"
"github.com/cndy-store/analytics/utils/sql"
Expand Down Expand Up @@ -109,12 +108,6 @@ func New(db sql.Database, effect horizon.Effect) (err error) {
return
}

// Store asset stats upon insert in a different table
err = assetStat.New(db, effect, operation.CreatedAt)
if err != nil {
return
}

log.Printf("--+--[ %s ]", effect.Asset.Code)
log.Printf(" |")
log.Printf(" +-> Type: %s", effect.Type)
Expand Down
5 changes: 0 additions & 5 deletions utils/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,5 @@ func InsertTestData(tx *sqlx.Tx) (err error) {
}
}

_, err = tx.Exec(`SELECT repopulate_asset_stats()`)
if err != nil {
return
}

return
}

0 comments on commit f22c26b

Please sign in to comment.