Skip to content

Commit

Permalink
added use get batch
Browse files Browse the repository at this point in the history
  • Loading branch information
harshil-goel committed Aug 27, 2024
1 parent b120e27 commit 723e58f
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 191 deletions.
68 changes: 35 additions & 33 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,39 +798,41 @@ func (db *DB) getBatch(keys [][]byte, done []bool) ([]y.ValueStruct, error) {
}

func (db *DB) get(key []byte) (y.ValueStruct, error) {
done := make([]bool, 1)
vals, err := db.getBatch([][]byte{key}, done)
if len(vals) != 0 {
return vals[0], err
}
return y.ValueStruct{}, err

//if db.IsClosed() {
// return y.ValueStruct{}, ErrDBClosed
//}
//tables, decr := db.getMemTables() // Lock should be released.
//defer decr()

//var maxVs y.ValueStruct
//version := y.ParseTs(key)

//y.NumGetsAdd(db.opt.MetricsEnabled, 1)
//for i := 0; i < len(tables); i++ {
// vs := tables[i].sl.Get(key)
// y.NumMemtableGetsAdd(db.opt.MetricsEnabled, 1)
// if vs.Meta == 0 && vs.Value == nil {
// continue
// }
// // Found the required version of the key, return immediately.
// if vs.Version == version {
// y.NumGetsWithResultsAdd(db.opt.MetricsEnabled, 1)
// return vs, nil
// }
// if maxVs.Version < vs.Version {
// maxVs = vs
// }
//}
//return db.lc.get(key, maxVs, 0)
if db.opt.useGetBatch {
done := make([]bool, 1)
vals, err := db.getBatch([][]byte{key}, done)
if len(vals) != 0 {
return vals[0], err
}
return y.ValueStruct{}, err
}

if db.IsClosed() {
return y.ValueStruct{}, ErrDBClosed
}
tables, decr := db.getMemTables() // Lock should be released.
defer decr()

var maxVs y.ValueStruct
version := y.ParseTs(key)

y.NumGetsAdd(db.opt.MetricsEnabled, 1)
for i := 0; i < len(tables); i++ {
vs := tables[i].sl.Get(key)
y.NumMemtableGetsAdd(db.opt.MetricsEnabled, 1)
if vs.Meta == 0 && vs.Value == nil {
continue
}
// Found the required version of the key, return immediately.
if vs.Version == version {
y.NumGetsWithResultsAdd(db.opt.MetricsEnabled, 1)
return vs, nil
}
if maxVs.Version < vs.Version {
maxVs = vs
}
}
return db.lc.get(key, maxVs, 0)
}

var requestPool = sync.Pool{
Expand Down
158 changes: 0 additions & 158 deletions discard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,170 +17,12 @@
package badger

import (
"bufio"
"bytes"
"fmt"
"os"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/pkg/profile"
"github.com/stretchr/testify/require"
)

func farEnough(itrKey, key []byte) int {
n := len(itrKey)
m := len(key)
if m > n {
m = n
}

for i := 0; i < m; i++ {
if itrKey[i] != key[i] {
return m - i
}
}

return 0

}

type ByteSliceArray [][]byte

// Implementing the sort.Interface for ByteSliceArray

// Len returns the length of the ByteSliceArray.
func (b ByteSliceArray) Len() int {
return len(b)
}

// Less compares two byte arrays at given indices and returns true if the byte array at index i is less than the byte array at index j.
func (b ByteSliceArray) Less(i, j int) bool {
return bytesLessThan(b[i], b[j])
}

// Swap swaps the byte arrays at given indices.
func (b ByteSliceArray) Swap(i, j int) {
b[i], b[j] = b[j], b[i]
}

// bytesLessThan compares two byte arrays lexicographically.
func bytesLessThan(a, b []byte) bool {
return bytes.Compare(a, b) >= 0
}

func TestReadC(t *testing.T) {
t.Skip()
allKeysF, err := os.Open("/home/harshil/all_keys_2")
require.NoError(t, err)
defer allKeysF.Close()

scanner := bufio.NewScanner(allKeysF)
// optionally, resize scanner's capacity for lines over 64K, see next example
keysList := [][]byte{}
for scanner.Scan() {
f := strings.Fields(scanner.Text())
b := []byte{}
for _, c := range f {
ic, err := strconv.Atoi(c)
require.NoError(t, err)
b = append(b, uint8(ic))
}
keysList = append(keysList, b)
}

dir := "/home/harshil/data/p/"
opt := DefaultOptions(dir)
opt.managedTxns = true
opt.Compression = 0
opt.IndexCacheSize = 0
db, err := Open(opt)
require.NoError(t, err)

numCh := 64
numPer := len(keysList) / numCh

var wg sync.WaitGroup
defer profile.Start(profile.CPUProfile).Stop()

s := 0

calculateS := func(start int) {
m := 0

for i := start * numPer; i < start*numPer+numPer; i += 1 {
txn := db.NewTransactionAt(270005, false)

key := keysList[i]
item, err := txn.Get(key)
require.NoError(t, err)

item.Value(func(val []byte) error {
m += len(val) + len(key)
return nil
})
txn.Discard()
}
wg.Done()
s += m
}

calculate := func(start int) {
m := 0

num := 500
for i := start * numPer; i < start*numPer+numPer; i += num {
txn := db.NewTransactionAt(270005, false)

keys := ByteSliceArray{}
for j := i; j < start*numPer+numPer && j < i+num; j++ {
keys = append(keys, keysList[j])
}
sort.Sort(keys)
items, err := txn.GetBatch(keys)
require.NoError(t, err)

for j, item := range items {
item.Value(func(val []byte) error {
m += len(val) + len(keys[j])
return nil
})
}
txn.Discard()
}
wg.Done()
s += m
}

t1 := time.Now()
for i := 0; i < numCh; i++ {
wg.Add(1)
go func(startPos int) {
calculateS(startPos)
}(i)
}

wg.Wait()
fmt.Println(time.Since(t1), s)

s = 0
t1 = time.Now()
for i := 0; i < numCh; i++ {
wg.Add(1)
go func(startPos int) {
calculate(startPos)
}(i)
}

wg.Wait()

fmt.Println(time.Since(t1), s)
}

func TestDiscardStats(t *testing.T) {
dir, err := os.MkdirTemp("", "badger-test")
require.NoError(t, err)
Expand Down
4 changes: 4 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ type Options struct {
maxBatchSize int64 // max batch size in bytes

maxValueThreshold float64

// This would let you use get batch instead of get, an experimental api instead
useGetBatch bool
}

// DefaultOptions sets a list of recommended options for good performance.
Expand Down Expand Up @@ -187,6 +190,7 @@ func DefaultOptions(path string) Options {
EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days.
DetectConflicts: true,
NamespaceOffset: -1,
useGetBatch: true,
}
}

Expand Down

0 comments on commit 723e58f

Please sign in to comment.