diff --git a/db.go b/db.go index ba060cc57..45ed3e59b 100644 --- a/db.go +++ b/db.go @@ -746,6 +746,8 @@ func (db *DB) getMemTables() ([]*memTable, func()) { // get returns the value in memtable or disk for given key. // Note that value will include meta byte. // +// getBatch would return the values of list of keys in order +// // IMPORTANT: We should never write an entry with an older timestamp for the same key, We need to // maintain this invariant to search for the latest value of a key, or else we need to search in all // tables and find the max version among them. To maintain this invariant, we also need to ensure @@ -757,7 +759,54 @@ func (db *DB) getMemTables() ([]*memTable, func()) { // do that. For every get("fooX") call where X is the version, we will search // for "fooX" in all the levels of the LSM tree. This is expensive but it // removes the overhead of handling move keys completely. +func (db *DB) getBatch(keys [][]byte, done []bool) ([]y.ValueStruct, error) { + if db.IsClosed() { + return []y.ValueStruct{}, ErrDBClosed + } + tables, decr := db.getMemTables() // Lock should be released. + defer decr() + + maxVs := make([]y.ValueStruct, len(keys)) + + y.NumGetsAdd(db.opt.MetricsEnabled, 1) + // For memtable, we need to check every memtable each time + for j, key := range keys { + if done[j] { + continue + } + version := y.ParseTs(key) + for i := 0; i < len(tables); i++ { + vs := tables[i].sl.Get(key) + y.NumMemtableGetsAdd(db.opt.MetricsEnabled, 1) + if vs.Meta == 0 && vs.Value == nil { + continue + } + // Found the required version of the key, mark as done, no need to process + // it further + if vs.Version == version { + y.NumGetsWithResultsAdd(db.opt.MetricsEnabled, 1) + maxVs[j] = vs + done[j] = true + break + } + if maxVs[j].Version < vs.Version { + maxVs[j] = vs + } + } + } + return db.lc.getBatch(keys, maxVs, 0, done) +} + func (db *DB) get(key []byte) (y.ValueStruct, error) { + if db.opt.useGetBatch { + done := make([]bool, 1) + vals, err := db.getBatch([][]byte{key}, done) + if len(vals) != 0 { + return vals[0], err + } + return y.ValueStruct{}, err + } + if db.IsClosed() { return y.ValueStruct{}, ErrDBClosed } diff --git a/go.mod b/go.mod index bd48bf6c5..c65e16747 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/google/flatbuffers v24.3.25+incompatible github.com/klauspost/compress v1.17.9 github.com/pkg/errors v0.9.1 + github.com/pkg/profile v1.7.0 github.com/spf13/cobra v1.8.1 github.com/stretchr/testify v1.9.0 go.opencensus.io v0.24.0 @@ -20,7 +21,9 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/felixge/fgprof v0.9.3 // indirect github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect + github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spf13/pflag v1.0.5 // indirect diff --git a/go.sum b/go.sum index 8a6bb9ec3..2cde9a348 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,9 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= @@ -18,6 +21,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/felixge/fgprof v0.9.3 h1:VvyZxILNuCiUCSXtPtYmmtGvb65nqXh2QFWc0Wpf2/g= +github.com/felixge/fgprof v0.9.3/go.mod h1:RdbpDgzqYVh/T9fPELJyV7EYJuHB55UTEULNun8eiPw= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -44,7 +49,10 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/pprof v0.0.0-20211214055906-6f57359322fd h1:1FjCyPC+syAzJ5/2S8fqdZK1R22vvA0J7JZKcuOIQ7Y= +github.com/google/pprof v0.0.0-20211214055906-6f57359322fd/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -53,6 +61,8 @@ github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2 github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/profile v1.7.0 h1:hnbDkaNWPCLMO9wGLdBFTIZvzDrDfBM2072E1S9gJkA= +github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD+OHknK4YDY07LPUC6dEvqDjvNo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -103,6 +113,7 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/level_handler.go b/level_handler.go index fc81cc452..a603d81aa 100644 --- a/level_handler.go +++ b/level_handler.go @@ -273,6 +273,113 @@ func (s *levelHandler) getTableForKey(key []byte) ([]*table.Table, func() error) return []*table.Table{tbl}, tbl.DecrRef } +func (s *levelHandler) getBatch(keys [][]byte, done []bool) ([]y.ValueStruct, error) { + // Find the table for which the key is in, and then seek it + getForKey := func(key []byte) (y.ValueStruct, func() error, []*table.Iterator) { + tables, decr := s.getTableForKey(key) + keyNoTs := y.ParseKey(key) + itrs := make([]*table.Iterator, 0) + + hash := y.Hash(keyNoTs) + var maxVs y.ValueStruct + for _, th := range tables { + if th.DoesNotHave(hash) { + y.NumLSMBloomHitsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1) + continue + } + + it := th.NewIterator(0) + itrs = append(itrs, it) + + y.NumLSMGetsAdd(s.db.opt.MetricsEnabled, s.strLevel, 1) + it.Seek(key) + if !it.Valid() { + continue + } + if y.SameKey(key, it.Key()) { + if version := y.ParseTs(it.Key()); maxVs.Version < version { + maxVs = it.ValueCopy() + maxVs.Version = version + } + } + } + + return maxVs, decr, itrs + } + + // Use old results from getForKey and find in those tables. + findInIter := func(key []byte, itrs []*table.Iterator) y.ValueStruct { + var maxVs y.ValueStruct + + for _, it := range itrs { + it.Seek(key) + if !it.Valid() { + continue + } + if y.SameKey(key, it.Key()) { + if version := y.ParseTs(it.Key()); maxVs.Version < version { + maxVs = it.ValueCopy() + maxVs.Version = version + } + } + } + + return maxVs + } + + results := make([]y.ValueStruct, len(keys)) + // For L0, we need to search all tables each time, so we can just call get() as required + if s.level == 0 { + var err error + for i, key := range keys { + if done[i] { + continue + } + results[i], err = s.get(key) + if err != nil { + return results, err + } + } + return results, nil + } else { + decr := func() error { return nil } + var itrs []*table.Iterator + + started := false + for i := 0; i < len(keys); i++ { + if done[i] { + continue + } + if !started { + var maxVs y.ValueStruct + maxVs, decr, itrs = getForKey(keys[0]) + results[i] = maxVs + started = true + } else { + results[i] = findInIter(keys[i], itrs) + // If we can't find in the current tables, maybe the + // data is there in other tables + if len(results[i].Value) == 0 { + for i := 0; i < len(itrs); i++ { + itrs[i].Close() + } + err := decr() + if err != nil { + return nil, err + } + results[i], decr, itrs = getForKey(keys[i]) + } + } + } + + for i := 0; i < len(itrs); i++ { + itrs[i].Close() + } + return results, decr() + } + +} + // get returns value for a given key or the key after that. If not found, return nil. func (s *levelHandler) get(key []byte) (y.ValueStruct, error) { tables, decr := s.getTableForKey(key) diff --git a/levels.go b/levels.go index 6bbaf55ca..ba23683c4 100644 --- a/levels.go +++ b/levels.go @@ -1601,6 +1601,55 @@ func (s *levelsController) close() error { return y.Wrap(err, "levelsController.Close") } +func (s *levelsController) getBatch(keys [][]byte, maxVs []y.ValueStruct, startLevel int, done []bool) ( + []y.ValueStruct, error) { + if s.kv.IsClosed() { + return []y.ValueStruct{}, ErrDBClosed + } + // It's important that we iterate the levels from 0 on upward. The reason is, if we iterated + // in opposite order, or in parallel (naively calling all the h.RLock() in some order) we could + // read level L's tables post-compaction and level L+1's tables pre-compaction. (If we do + // parallelize this, we will need to call the h.RLock() function by increasing order of level + // number.) + for _, h := range s.levels { + // Ignore all levels below startLevel. This is useful for GC when L0 is kept in memory. + if h.level < startLevel { + continue + } + vs, err := h.getBatch(keys, done) // Calls h.RLock() and h.RUnlock(). + if err != nil { + return []y.ValueStruct{}, y.Wrapf(err, "get keys: %q", keys) + } + + for i, v := range vs { + // Done is only update by this function or one in db. levelhandler will + // not update done. No need to do anything is done is set. + if done[i] { + continue + } + if v.Value == nil && v.Meta == 0 { + continue + } + y.NumBytesReadsLSMAdd(s.kv.opt.MetricsEnabled, int64(len(v.Value))) + version := y.ParseTs(keys[i]) + if v.Version == version { + maxVs[i] = v + done[i] = true + } + if maxVs[i].Version < v.Version { + maxVs[i] = v + } + } + } + + for i := 0; i < len(maxVs); i++ { + if len(maxVs[i].Value) > 0 { + y.NumGetsWithResultsAdd(s.kv.opt.MetricsEnabled, 1) + } + } + return maxVs, nil +} + // get searches for a given key in all the levels of the LSM tree. It returns // key version <= the expected version (version in key). If not found, // it returns an empty y.ValueStruct. diff --git a/options.go b/options.go index bb6131b30..52416352e 100644 --- a/options.go +++ b/options.go @@ -129,6 +129,9 @@ type Options struct { maxBatchSize int64 // max batch size in bytes maxValueThreshold float64 + + // This would let you use get batch instead of get, an experimental api instead + useGetBatch bool } // DefaultOptions sets a list of recommended options for good performance. @@ -187,6 +190,7 @@ func DefaultOptions(path string) Options { EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days. DetectConflicts: true, NamespaceOffset: -1, + useGetBatch: true, } } diff --git a/txn.go b/txn.go index 438af8d5d..21c13b558 100644 --- a/txn.go +++ b/txn.go @@ -440,6 +440,83 @@ func (txn *Txn) Delete(key []byte) error { return txn.modify(e) } +func (txn *Txn) GetBatch(keys [][]byte) (items []*Item, rerr error) { + if txn.discarded { + return nil, ErrDiscardedTxn + } + + for _, key := range keys { + if len(key) == 0 { + return nil, ErrEmptyKey + } + if err := txn.db.isBanned(key); err != nil { + return nil, err + } + } + + items = make([]*Item, len(keys)) + done := make([]bool, len(keys)) + + if txn.update { + doneAll := 0 + for i, key := range keys { + item := items[i] + if e, has := txn.pendingWrites[string(key)]; has && bytes.Equal(key, e.Key) { + if isDeletedOrExpired(e.meta, e.ExpiresAt) { + return nil, ErrKeyNotFound + } + // Fulfill from cache. + item.meta = e.meta + item.val = e.Value + item.userMeta = e.UserMeta + item.key = key + item.status = prefetched + item.version = txn.readTs + item.expiresAt = e.ExpiresAt + // We probably don't need to set db on item here. + done[i] = true + doneAll += 1 + } + // Only track reads if this is update txn. No need to track read if txn serviced it + // internally. + txn.addReadKey(key) + } + if doneAll == len(keys) { + return items, nil + } + } + + seeks := make([][]byte, len(keys)) + for i, key := range keys { + seeks[i] = y.KeyWithTs(key, txn.readTs) + } + vss, err := txn.db.getBatch(seeks, done) + if err != nil { + return nil, y.Wrapf(err, "DB::Get keys: %q", keys) + } + + for i, vs := range vss { + if vs.Value == nil && vs.Meta == 0 { + items[i] = nil + } + if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) { + items[i] = nil + } + + items[i] = new(Item) + items[i].key = keys[i] + items[i].version = vs.Version + items[i].meta = vs.Meta + items[i].userMeta = vs.UserMeta + items[i].vptr = y.SafeCopy(items[i].vptr, vs.Value) + items[i].txn = txn + items[i].expiresAt = vs.ExpiresAt + } + + return items, nil + +} + // Get looks for key and returns corresponding Item. // If key is not found, ErrKeyNotFound is returned. func (txn *Txn) Get(key []byte) (item *Item, rerr error) { diff --git a/txn_test.go b/txn_test.go index 3830855fe..da4985e89 100644 --- a/txn_test.go +++ b/txn_test.go @@ -32,6 +32,45 @@ import ( "github.com/dgraph-io/ristretto/z" ) +func TestTxnSimpleTsRead(t *testing.T) { + dir, err := os.MkdirTemp("", "badger-test") + require.NoError(t, err) + defer removeDir(dir) + opts := getTestOptions(dir) + opts.Dir = dir + opts.ValueDir = dir + + opts.managedTxns = true + + db, err := Open(opts) + require.NoError(t, err) + defer func() { + require.NoError(t, db.Close()) + }() + + for i := 0; i < 10; i++ { + txn := db.NewTransactionAt(uint64(i)+1, true) + k := []byte(fmt.Sprintf("key=%d", 1)) + v := []byte(fmt.Sprintf("val=%d", i)) + require.NoError(t, txn.SetEntry(NewEntry(k, v))) + err = txn.CommitAt(uint64(i)*3+1, nil) + require.NoError(t, err) + } + + for i := 7; i < 10; i++ { + txn := db.NewTransactionAt(uint64(i), false) + item, err := txn.Get([]byte("key=1")) + require.NoError(t, err) + + require.NoError(t, item.Value(func(val []byte) error { + require.Equal(t, []byte("val=2"), val) + return nil + })) + + txn.Discard() + } +} + func TestTxnSimple(t *testing.T) { runBadgerTest(t, nil, func(t *testing.T, db *DB) { txn := db.NewTransaction(true)