Skip to content

Commit

Permalink
Add batching for KVstore (cosmos#149)
Browse files Browse the repository at this point in the history
* add batching to KVStore
* add batch for PrefixKV
  • Loading branch information
Raneet10 committed Oct 17, 2021
1 parent 7d2f4ae commit 42da76f
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 22 deletions.
4 changes: 2 additions & 2 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ func TestInitialState(t *testing.T) {
LastBlockHeight: 128,
}

emptyStore := store.New(store.NewInMemoryKVStore())
emptyStore := store.New(store.NewDefaultInMemoryKVStore())

fullStore := store.New(store.NewInMemoryKVStore())
fullStore := store.New(store.NewDefaultInMemoryKVStore())
err := fullStore.UpdateState(sampleState)
require.NoError(t, err)

Expand Down
6 changes: 3 additions & 3 deletions da/mock/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

func TestLifecycle(t *testing.T) {
var da da.DataAvailabilityLayerClient = &MockDataAvailabilityLayerClient{}
dalcKV := store.NewInMemoryKVStore()
dalcKV := store.NewDefaultInMemoryKVStore()

require := require.New(t)

Expand All @@ -31,7 +31,7 @@ func TestLifecycle(t *testing.T) {

func TestMockDALC(t *testing.T) {
var dalc da.DataAvailabilityLayerClient = &MockDataAvailabilityLayerClient{}
dalcKV := store.NewInMemoryKVStore()
dalcKV := store.NewDefaultInMemoryKVStore()

require := require.New(t)
assert := assert.New(t)
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestRetrieve(t *testing.T) {
var dalc da.DataAvailabilityLayerClient = mock
var retriever da.BlockRetriever = mock

dalcKV := store.NewInMemoryKVStore()
dalcKV := store.NewDefaultInMemoryKVStore()

require := require.New(t)
assert := assert.New(t)
Expand Down
2 changes: 1 addition & 1 deletion node/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func createNodes(num int, t *testing.T) ([]*Node, []*mocks.Application) {
nodes := make([]*Node, num)
apps := make([]*mocks.Application, num)
dalc := &mockda.MockDataAvailabilityLayerClient{}
_ = dalc.Init(nil, store.NewInMemoryKVStore(), log.TestingLogger())
_ = dalc.Init(nil, store.NewDefaultInMemoryKVStore(), log.TestingLogger())
_ = dalc.Start()
nodes[0], apps[0] = createNode(0, true, dalc, keys, t)
for i := 1; i < num; i++ {
Expand Down
4 changes: 2 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey
var baseKV store.KVStore
if conf.RootDir == "" && conf.DBPath == "" { // this is used for testing
logger.Info("WARNING: working in in-memory mode")
baseKV = store.NewInMemoryKVStore()
baseKV = store.NewDefaultInMemoryKVStore()
} else {
baseKV = store.NewKVStore(conf.RootDir, conf.DBPath, "optimint")
baseKV = store.NewDefaultKVStore(conf.RootDir, conf.DBPath, "optimint")
}
mainKV := store.NewPrefixKV(baseKV, mainPrefix)
dalcKV := store.NewPrefixKV(baseKV, dalcPrefix)
Expand Down
38 changes: 38 additions & 0 deletions store/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
)

var _ KVStore = &BadgerKV{}
var _ Batch = &BadgerBatch{}

var (
// ErrKeyNotFound is returned if key is not found in KVStore.
Expand Down Expand Up @@ -53,3 +54,40 @@ func (b *BadgerKV) Delete(key []byte) error {
}
return txn.Commit()
}

// NewBatch creates new batch.
// Note: badger batches should be short lived as they use extra resources.
func (b *BadgerKV) NewBatch() Batch {
return &BadgerBatch{
txn: b.db.NewTransaction(true),
}
}

// BadgerBatch encapsulates badger transaction
type BadgerBatch struct {
txn *badger.Txn
}

// Set accumulates key-value entries in a transaction
func (bb *BadgerBatch) Set(key, value []byte) error {
if err := bb.txn.Set(key, value); err != nil {
return err
}

return nil
}

// Delete removes the key and associated value from store
func (bb *BadgerBatch) Delete(key []byte) error {
return bb.txn.Delete(key)
}

// Commit commits a transaction
func (bb *BadgerBatch) Commit() error {
return bb.txn.Commit()
}

// Discard cancels a transaction
func (bb *BadgerBatch) Discard() {
bb.txn.Discard()
}
6 changes: 3 additions & 3 deletions store/badger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func TestGetErrors(t *testing.T) {
dalcKV := NewInMemoryKVStore()
dalcKV := NewDefaultInMemoryKVStore()

tc := []struct {
name string
Expand All @@ -30,7 +30,7 @@ func TestGetErrors(t *testing.T) {
}

func TestSetErrors(t *testing.T) {
dalcKV := NewInMemoryKVStore()
dalcKV := NewDefaultInMemoryKVStore()

tc := []struct {
name string
Expand All @@ -53,7 +53,7 @@ func TestSetErrors(t *testing.T) {
}

func TestDeleteErrors(t *testing.T) {
dalcKV := NewInMemoryKVStore()
dalcKV := NewDefaultInMemoryKVStore()

tc := []struct {
name string
Expand Down
13 changes: 11 additions & 2 deletions store/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,19 @@ type KVStore interface {
Get(key []byte) ([]byte, error) // Get gets the value for a key.
Set(key []byte, value []byte) error // Set updates the value for a key.
Delete(key []byte) error // Delete deletes a key.
NewBatch() Batch
}

// Batch enables batching of transactions
type Batch interface {
Set(key, value []byte) error // Accumulates KV entries in a transaction
Delete(key []byte) error // Deletes the given key
Commit() error // Commits the transaction
Discard() // Discards the transaction
}

// NewInMemoryKVStore builds KVStore that works in-memory (without accessing disk).
func NewInMemoryKVStore() KVStore {
func NewDefaultInMemoryKVStore() KVStore {
db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
if err != nil {
panic(err)
Expand All @@ -26,7 +35,7 @@ func NewInMemoryKVStore() KVStore {
}
}

func NewKVStore(rootDir, dbPath, dbName string) KVStore {
func NewDefaultKVStore(rootDir, dbPath, dbName string) KVStore {
path := filepath.Join(rootify(rootDir, dbPath), dbName)
db, err := badger.Open(badger.DefaultOptions(path))
if err != nil {
Expand Down
30 changes: 29 additions & 1 deletion store/prefix.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package store

var _ KVStore = &PrefixKV{}
var _ Batch = &PrefixKVBatch{}

type PrefixKV struct {
kv KVStore
prefix []byte
Expand All @@ -23,5 +26,30 @@ func (p *PrefixKV) Set(key []byte, value []byte) error {
func (p *PrefixKV) Delete(key []byte) error {
return p.kv.Delete(append(p.prefix, key...))
}
func (p *PrefixKV) NewBatch() Batch {
return &PrefixKVBatch{
b: p.kv.NewBatch(),
prefix: p.prefix,
}
}

var _ KVStore = &PrefixKV{}
type PrefixKVBatch struct {
b Batch
prefix []byte
}

func (pb *PrefixKVBatch) Set(key, value []byte) error {
return pb.b.Set(append(pb.prefix, key...), value)
}

func (pb *PrefixKVBatch) Delete(key []byte) error {
return pb.b.Delete(append(pb.prefix, key...))
}

func (pb *PrefixKVBatch) Commit() error {
return pb.b.Commit()
}

func (pb *PrefixKVBatch) Discard() {
pb.b.Discard()
}
35 changes: 34 additions & 1 deletion store/prefix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestPrefixKV(t *testing.T) {
assert := assert.New(t)
require := require.New(t)

base := NewInMemoryKVStore()
base := NewDefaultInMemoryKVStore()

p1 := NewPrefixKV(base, []byte{1})
p2 := NewPrefixKV(base, []byte{2})
Expand Down Expand Up @@ -71,3 +71,36 @@ func TestPrefixKV(t *testing.T) {
require.NoError(err)
assert.Equal(val22, v)
}

func TestPrefixKVBatch(t *testing.T) {
t.Parallel()

assert := assert.New(t)
require := require.New(t)

basekv := NewDefaultInMemoryKVStore()
prefixkv := NewPrefixKV(basekv, []byte("prefix1"))
prefixbatchkv1 := prefixkv.NewBatch()

keys := [][]byte{[]byte("key1"), []byte("key2"), []byte("key3"), []byte("key4")}
values := [][]byte{[]byte("value1"), []byte("value2"), []byte("value3"), []byte("value4")}

for i := 0; i < len(keys); i++ {
err := prefixbatchkv1.Set(keys[i], values[i])
require.NoError(err)
}

err := prefixbatchkv1.Commit()
require.NoError(err)

for i := 0; i < len(keys); i++ {
vals, err := prefixkv.Get(keys[i])
assert.Equal(vals, values[i])
require.NoError(err)
}

prefixbatchkv2 := prefixkv.NewBatch()
err = prefixbatchkv2.Delete([]byte("key1"))
require.NoError(err)

}
18 changes: 13 additions & 5 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ var _ Store = &DefaultStore{}

// New returns new, default store.
func New(kv KVStore) Store {
return &DefaultStore{db: kv}
return &DefaultStore{
db: kv,
}
}

// Height returns height of the highest block saved in the Store.
Expand All @@ -59,12 +61,18 @@ func (s *DefaultStore) SaveBlock(block *types.Block, commit *types.Commit) error

s.mtx.Lock()
defer s.mtx.Unlock()
// TODO(tzdybal): use transaction for consistency of DB (https://github.com/celestiaorg/optimint/issues/80)
err = multierr.Append(err, s.db.Set(getBlockKey(hash), blockBlob))
err = multierr.Append(err, s.db.Set(getCommitKey(hash), commitBlob))
err = multierr.Append(err, s.db.Set(getIndexKey(block.Header.Height), hash[:]))

bb := s.db.NewBatch()
err = multierr.Append(err, bb.Set(getBlockKey(hash), blockBlob))
err = multierr.Append(err, bb.Set(getCommitKey(hash), commitBlob))
err = multierr.Append(err, bb.Set(getIndexKey(block.Header.Height), hash[:]))

if err != nil {
bb.Discard()
return err
}

if err = bb.Commit(); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestBlockstoreHeight(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
assert := assert.New(t)
bstore := New(NewInMemoryKVStore())
bstore := New(NewDefaultInMemoryKVStore())
assert.Equal(uint64(0), bstore.Height())

for _, block := range c.blocks {
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestBlockstoreLoad(t *testing.T) {
}
}()

for _, kv := range []KVStore{NewInMemoryKVStore(), NewKVStore(tmpDir, "db", "test")} {
for _, kv := range []KVStore{NewDefaultInMemoryKVStore(), NewDefaultKVStore(tmpDir, "db", "test")} {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
assert := assert.New(t)
Expand Down

0 comments on commit 42da76f

Please sign in to comment.