Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug, add method Resize #33

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 73 additions & 42 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@ import (
"github.com/cespare/xxhash"
)

// Cache sizing constants.
const (
	// MIN_CACHE_SIZE is the smallest total cache size allowed (512 KiB);
	// NewCache and Resize clamp smaller requests up to this value.
	MIN_CACHE_SIZE = 512 * 1024
	// SEGMENT_NUMBER is the number of independent segments the cache is
	// split into; each segment is guarded by its own lock.
	SEGMENT_NUMBER = 256
	// SEG_HASH_REGION masks a hash value down to a segment index
	// (it equals SEGMENT_NUMBER - 1).
	SEG_HASH_REGION = 255
)

type Cache struct {
locks [256]sync.Mutex
segments [256]segment
hitCount int64
missCount int64
locks [SEGMENT_NUMBER]sync.Mutex
segments [SEGMENT_NUMBER]segment
cacheSize int
}

func hashFunc(data []byte) uint64 {
Expand All @@ -24,12 +29,13 @@ func hashFunc(data []byte) uint64 {
// `debug.SetGCPercent()`, set it to a much smaller value
// to limit the memory consumption and GC pause time.
func NewCache(size int) (cache *Cache) {
if size < 512*1024 {
size = 512 * 1024
if size < MIN_CACHE_SIZE {
size = MIN_CACHE_SIZE
}
cache = new(Cache)
for i := 0; i < 256; i++ {
cache.segments[i] = newSegment(size/256, i)
cache.cacheSize = size
for i := 0; i < SEGMENT_NUMBER; i++ {
cache.segments[i] = newSegment(size/SEGMENT_NUMBER, i)
}
return
}
Expand All @@ -39,7 +45,7 @@ func NewCache(size int) (cache *Cache) {
// but it can be evicted when cache is full.
func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) {
hashVal := hashFunc(key)
segId := hashVal & 255
segId := hashVal & SEG_HASH_REGION
cache.locks[segId].Lock()
err = cache.segments[segId].set(key, value, hashVal, expireSeconds)
cache.locks[segId].Unlock()
Expand All @@ -49,43 +55,33 @@ func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) {
// Get returns the stored value for key, or a not-found error.
// Hit/miss statistics are tracked inside the segment.
func (cache *Cache) Get(key []byte) (value []byte, err error) {
	hashVal := hashFunc(key)
	segId := hashVal & SEG_HASH_REGION // pick the segment from the hash
	cache.locks[segId].Lock()
	value, _, err = cache.segments[segId].get(key, hashVal)
	cache.locks[segId].Unlock()
	return
}

// GetWithExpiration returns the value together with its expiration
// timestamp (epoch seconds), or a not-found error.
func (cache *Cache) GetWithExpiration(key []byte) (value []byte, expireAt uint32, err error) {
	hashVal := hashFunc(key)
	segId := hashVal & SEG_HASH_REGION
	cache.locks[segId].Lock()
	value, expireAt, err = cache.segments[segId].get(key, hashVal)
	cache.locks[segId].Unlock()
	return
}

// TTL returns the remaining seconds before key expires, or a not-found
// error.
// NOTE(review): unlike Get/Set/Del, this reads the segment without taking
// cache.locks[segId] — looks racy; confirm segment.ttl is safe unlocked.
func (cache *Cache) TTL(key []byte) (timeLeft uint32, err error) {
	hashVal := hashFunc(key)
	segId := hashVal & SEG_HASH_REGION
	timeLeft, err = cache.segments[segId].ttl(key, hashVal)
	return
}

func (cache *Cache) Del(key []byte) (affected bool) {
hashVal := hashFunc(key)
segId := hashVal & 255
segId := hashVal & SEG_HASH_REGION
cache.locks[segId].Lock()
affected = cache.segments[segId].del(key, hashVal)
cache.locks[segId].Unlock()
Expand Down Expand Up @@ -117,22 +113,28 @@ func (cache *Cache) DelInt(key int64) (affected bool) {
}

func (cache *Cache) EvacuateCount() (count int64) {
for i := 0; i < 256; i++ {
for i := 0; i < SEGMENT_NUMBER; i++ {
count += atomic.LoadInt64(&cache.segments[i].totalEvacuate)
}
return
}

func (cache *Cache) ExpiredCount() (count int64) {
for i := 0; i < 256; i++ {
for i := 0; i < SEGMENT_NUMBER; i++ {
count += atomic.LoadInt64(&cache.segments[i].totalExpired)
}
return
}

func (cache *Cache) EntryCount() (entryCount int64) {
for i := 0; i < 256; i++ {
entryCount += atomic.LoadInt64(&cache.segments[i].entryCount)
it := cache.NewIterator()
for {
entry := it.Next()
if entry == nil {
break
}
//entryCount =
atomic.AddInt64(&entryCount, 1)
}
return
}
Expand All @@ -142,7 +144,7 @@ func (cache *Cache) EntryCount() (entryCount int64) {
// is about to be overwritten by new value.
func (cache *Cache) AverageAccessTime() int64 {
var entryCount, totalTime int64
for i := 0; i < 256; i++ {
for i := 0; i < SEGMENT_NUMBER; i++ {
totalTime += atomic.LoadInt64(&cache.segments[i].totalTime)
entryCount += atomic.LoadInt64(&cache.segments[i].totalCount)
}
Expand All @@ -153,47 +155,76 @@ func (cache *Cache) AverageAccessTime() int64 {
}
}

func (cache *Cache) HitCount() int64 {
return atomic.LoadInt64(&cache.hitCount)
func (cache *Cache) HitCount() (count int64) {
for i := range cache.segments {
count += atomic.LoadInt64(&cache.segments[i].hitCount)
}
return
}

// MissCount returns the total number of failed lookups, summed across all
// segments.
func (cache *Cache) MissCount() (count int64) {
	for idx := 0; idx < SEGMENT_NUMBER; idx++ {
		count += atomic.LoadInt64(&cache.segments[idx].missCount)
	}
	return
}

// LookupCount returns the total number of lookups (hits plus misses).
func (cache *Cache) LookupCount() int64 {
	return cache.HitCount() + cache.MissCount()
}

// HitRate returns the fraction of lookups that were hits, or 0 when no
// lookup has happened yet. Hit and miss counts are sampled once so the
// ratio is computed from a consistent pair of values.
func (cache *Cache) HitRate() float64 {
	hitCount, missCount := cache.HitCount(), cache.MissCount()
	lookupCount := hitCount + missCount
	if lookupCount == 0 {
		return 0
	}
	return float64(hitCount) / float64(lookupCount)
}

func (cache *Cache) OverwriteCount() (overwriteCount int64) {
for i := 0; i < 256; i++ {
for i := 0; i < SEGMENT_NUMBER; i++ {
overwriteCount += atomic.LoadInt64(&cache.segments[i].overwrites)
}
return
}

func (cache *Cache) Clear() {
for i := 0; i < 256; i++ {
for i := 0; i < SEGMENT_NUMBER; i++ {
cache.locks[i].Lock()
newSeg := newSegment(len(cache.segments[i].rb.data), i)
cache.segments[i] = newSeg
cache.segments[i].clear()
cache.locks[i].Unlock()
}
atomic.StoreInt64(&cache.hitCount, 0)
atomic.StoreInt64(&cache.missCount, 0)
}

func (cache *Cache) ResetStatistics() {
atomic.StoreInt64(&cache.hitCount, 0)
atomic.StoreInt64(&cache.missCount, 0)
for i := 0; i < 256; i++ {
for i := 0; i < SEGMENT_NUMBER; i++ {
cache.locks[i].Lock()
cache.segments[i].resetStatistics()
cache.locks[i].Unlock()
}
}

// Resize changes the total cache capacity to newSize bytes (clamped up to
// MIN_CACHE_SIZE). Growing preserves existing entries; shrinking rebuilds
// every segment at the new size and discards all data.
//
// NOTE(review): cacheSize is read and written without holding any lock,
// so concurrent Resize calls race — confirm callers serialize Resize.
func (cache *Cache) Resize(newSize int) {
	size := newSize
	if size < MIN_CACHE_SIZE {
		size = MIN_CACHE_SIZE
	}

	if size == cache.cacheSize {
		return
	}

	grow := size > cache.cacheSize
	for i := 0; i < SEGMENT_NUMBER; i++ {
		cache.locks[i].Lock()
		if grow {
			cache.segments[i].resize(size / SEGMENT_NUMBER)
		} else {
			// Shrinking cannot keep entries: replace the segment with a
			// fresh one sized for the NEW capacity. (Bug fix: previously
			// this used len(cache.segments[i].rb.data), the OLD size, so
			// shrinking never actually reduced capacity.)
			cache.segments[i] = newSegment(size/SEGMENT_NUMBER, i)
		}
		cache.locks[i].Unlock()
	}
	// Bug fix: record the new size so a subsequent Resize compares
	// against the actual capacity instead of a stale value.
	cache.cacheSize = size
}
140 changes: 140 additions & 0 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"crypto/rand"
"encoding/binary"
"fmt"
mrand "math/rand"
"strings"
"sync"
"testing"
"time"
)
Expand Down Expand Up @@ -186,6 +188,34 @@ func TestExpire(t *testing.T) {
}
}

// TestEntryCount checks that EntryCount reflects the live entries and
// drops an entry once its TTL elapses.
func TestEntryCount(t *testing.T) {
	cache := NewCache(1024)

	if err := cache.Set([]byte("key1"), []byte("value1"), 1); err != nil {
		t.Error("err should be nil")
	}
	if err := cache.Set([]byte("key2"), []byte("value2"), 3); err != nil {
		t.Error("err should be nil")
	}

	if got := cache.EntryCount(); got != 2 {
		t.Error("entryCount should be 2")
	}

	// Wait for the 1-second TTL on key1 to pass.
	time.Sleep(time.Second)

	if got := cache.EntryCount(); got != 1 {
		t.Error("entryCount should be 1")
	}
}

func TestTTL(t *testing.T) {
cache := NewCache(1024)
key := []byte("abcd")
Expand Down Expand Up @@ -380,6 +410,116 @@ func TestIterator(t *testing.T) {
}
}

// TestSetLargerEntryDeletesWrongEntry overwrites one key with successively
// larger values and verifies iteration still yields exactly that key with
// the latest value.
func TestSetLargerEntryDeletesWrongEntry(t *testing.T) {
	cache := NewCache(512 * 1024)

	key1 := []byte("key1")
	value1 := "aaa"

	// expectOnlyEntry asserts the iterator yields exactly one entry with
	// the given key/value.
	expectOnlyEntry := func(wantKey []byte, wantVal string) {
		it := cache.NewIterator()
		entry := it.Next()
		if !bytes.Equal(entry.Key, wantKey) {
			t.Fatalf("key %s not equal to %s", entry.Key, wantKey)
		}
		if !bytes.Equal(entry.Value, []byte(wantVal)) {
			t.Fatalf("value %s not equal to %s", entry.Value, wantVal)
		}
		if extra := it.Next(); extra != nil {
			t.Fatalf("expected nil entry but got %s %s", extra.Key, extra.Value)
		}
	}

	value := value1
	cache.Set(key1, []byte(value), 0)
	expectOnlyEntry(key1, value)

	// Grow the value twice; the entry must be relocated, not duplicated.
	cache.Set(key1, []byte(value1+"XXXXXX"), 0)

	value = value1 + "XXXXYYYYYYY"
	cache.Set(key1, []byte(value), 0)
	expectOnlyEntry(key1, value)
}

// TestRace hammers the cache from six goroutines at once — writer, reader,
// deleter, statistics reader, statistics resetter and clearer — so that
// `go test -race` can detect unsynchronized access.
func TestRace(t *testing.T) {
	cache := NewCache(MIN_CACHE_SIZE)
	keySpace := 8
	const iters = 1000

	var wg sync.WaitGroup
	wg.Add(6)

	go func() { // writer
		defer wg.Done()
		for n := 0; n < iters; n++ {
			if err := cache.SetInt(int64(mrand.Intn(keySpace)), []byte("abc"), 1); err != nil {
				t.Errorf("err: %s", err)
			}
		}
	}()
	go func() { // reader
		defer wg.Done()
		for n := 0; n < iters; n++ {
			_, _ = cache.GetInt(int64(mrand.Intn(keySpace))) // errors expected while the deleter runs
		}
	}()
	go func() { // deleter
		defer wg.Done()
		for n := 0; n < iters; n++ {
			cache.DelInt(int64(mrand.Intn(keySpace)))
		}
	}()
	go func() { // statistics reader
		defer wg.Done()
		for n := 0; n < iters; n++ {
			_ = cache.EvacuateCount()
			_ = cache.ExpiredCount()
			_ = cache.EntryCount()
			_ = cache.AverageAccessTime()
			_ = cache.HitCount()
			_ = cache.LookupCount()
			_ = cache.HitRate()
			_ = cache.OverwriteCount()
		}
	}()
	go func() { // statistics resetter
		defer wg.Done()
		for n := 0; n < iters; n++ {
			cache.ResetStatistics()
		}
	}()
	go func() { // clearer
		defer wg.Done()
		for n := 0; n < iters; n++ {
			cache.Clear()
		}
	}()

	wg.Wait()
}

func BenchmarkCacheSet(b *testing.B) {
cache := NewCache(256 * 1024 * 1024)
var key [8]byte
Expand Down
Loading