Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug, add method Resize #33

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 45 additions & 18 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,16 @@ import (
"github.com/cespare/xxhash"
)

// Cache sizing constants.
// NOTE(review): Go convention prefers MixedCaps (e.g. minCacheSize) over
// ALL_CAPS, but renaming these would break external references — kept as-is.
const (
// Smallest total cache size accepted by NewCache (512 KiB); smaller
// requests are clamped up to this value.
MIN_CACHE_SIZE = 512 * 1024
// Number of independent segments the cache is sharded into.
SEGMENT_NUMBER = 256
// Bit mask (SEGMENT_NUMBER - 1) applied to a key's hash to pick a segment.
SEG_HASH_REGION = 255
)

type Cache struct {
locks [256]sync.Mutex
segments [256]segment
locks [SEGMENT_NUMBER]sync.Mutex
segments [SEGMENT_NUMBER]segment
cacheSize int
hitCount int64
missCount int64
}
Expand All @@ -24,12 +31,13 @@ func hashFunc(data []byte) uint64 {
// `debug.SetGCPercent()`, set it to a much smaller value
// to limit the memory consumption and GC pause time.
func NewCache(size int) (cache *Cache) {
if size < 512*1024 {
size = 512 * 1024
if size < MIN_CACHE_SIZE {
size = MIN_CACHE_SIZE
}
cache = new(Cache)
for i := 0; i < 256; i++ {
cache.segments[i] = newSegment(size/256, i)
cache.cacheSize = size
for i := 0; i < SEGMENT_NUMBER; i++ {
cache.segments[i] = newSegment(size/SEGMENT_NUMBER, i)
}
return
}
Expand All @@ -39,7 +47,7 @@ func NewCache(size int) (cache *Cache) {
// but it can be evicted when cache is full.
func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) {
hashVal := hashFunc(key)
segId := hashVal & 255
segId := hashVal & SEG_HASH_REGION
cache.locks[segId].Lock()
err = cache.segments[segId].set(key, value, hashVal, expireSeconds)
cache.locks[segId].Unlock()
Expand All @@ -49,7 +57,7 @@ func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) {
// Get the value or not found error.
func (cache *Cache) Get(key []byte) (value []byte, err error) {
hashVal := hashFunc(key)
segId := hashVal & 255
segId := hashVal & SEG_HASH_REGION
cache.locks[segId].Lock()
value, _, err = cache.segments[segId].get(key, hashVal)
cache.locks[segId].Unlock()
Expand All @@ -64,7 +72,7 @@ func (cache *Cache) Get(key []byte) (value []byte, err error) {
// Get the value or not found error.
func (cache *Cache) GetWithExpiration(key []byte) (value []byte, expireAt uint32, err error) {
hashVal := hashFunc(key)
segId := hashVal & 255
segId := hashVal & SEG_HASH_REGION
cache.locks[segId].Lock()
value, expireAt, err = cache.segments[segId].get(key, hashVal)
cache.locks[segId].Unlock()
Expand All @@ -78,14 +86,14 @@ func (cache *Cache) GetWithExpiration(key []byte) (value []byte, expireAt uint32

// TTL returns the remaining lifetime in seconds of the entry stored under
// key, or an error if the entry is not found.
//
// Fix: the segment was read without holding its lock, unlike Set/Get/Del,
// which mutate the same segment state under cache.locks[segId] — an
// unsynchronized read is a data race. Lock around the ttl call for
// consistency with the other accessors.
func (cache *Cache) TTL(key []byte) (timeLeft uint32, err error) {
	hashVal := hashFunc(key)
	segId := hashVal & SEG_HASH_REGION
	cache.locks[segId].Lock()
	timeLeft, err = cache.segments[segId].ttl(key, hashVal)
	cache.locks[segId].Unlock()
	return
}

func (cache *Cache) Del(key []byte) (affected bool) {
hashVal := hashFunc(key)
segId := hashVal & 255
segId := hashVal & SEG_HASH_REGION
cache.locks[segId].Lock()
affected = cache.segments[segId].del(key, hashVal)
cache.locks[segId].Unlock()
Expand Down Expand Up @@ -117,21 +125,21 @@ func (cache *Cache) DelInt(key int64) (affected bool) {
}

func (cache *Cache) EvacuateCount() (count int64) {
for i := 0; i < 256; i++ {
for i := 0; i < SEGMENT_NUMBER; i++ {
count += atomic.LoadInt64(&cache.segments[i].totalEvacuate)
}
return
}

func (cache *Cache) ExpiredCount() (count int64) {
for i := 0; i < 256; i++ {
for i := 0; i < SEGMENT_NUMBER; i++ {
count += atomic.LoadInt64(&cache.segments[i].totalExpired)
}
return
}

func (cache *Cache) EntryCount() (entryCount int64) {
for i := 0; i < 256; i++ {
for i := 0; i < SEGMENT_NUMBER; i++ {
entryCount += atomic.LoadInt64(&cache.segments[i].entryCount)
}
return
Expand All @@ -142,7 +150,7 @@ func (cache *Cache) EntryCount() (entryCount int64) {
// is about to be overwritten by new value.
func (cache *Cache) AverageAccessTime() int64 {
var entryCount, totalTime int64
for i := 0; i < 256; i++ {
for i := 0; i < SEGMENT_NUMBER; i++ {
totalTime += atomic.LoadInt64(&cache.segments[i].totalTime)
entryCount += atomic.LoadInt64(&cache.segments[i].totalCount)
}
Expand Down Expand Up @@ -171,14 +179,14 @@ func (cache *Cache) HitRate() float64 {
}

func (cache *Cache) OverwriteCount() (overwriteCount int64) {
for i := 0; i < 256; i++ {
for i := 0; i < SEGMENT_NUMBER; i++ {
overwriteCount += atomic.LoadInt64(&cache.segments[i].overwrites)
}
return
}

func (cache *Cache) Clear() {
for i := 0; i < 256; i++ {
for i := 0; i < SEGMENT_NUMBER; i++ {
cache.locks[i].Lock()
newSeg := newSegment(len(cache.segments[i].rb.data), i)
cache.segments[i] = newSeg
Expand All @@ -188,10 +196,29 @@ func (cache *Cache) Clear() {
atomic.StoreInt64(&cache.missCount, 0)
}

// Resize changes the total cache capacity to newSize bytes (clamped up to
// MIN_CACHE_SIZE). Growing preserves existing entries by resizing each
// segment's ring buffer in place; shrinking discards all cached data and
// rebuilds the segments at the new size.
//
// Fixes over the submitted version:
//   - On shrink, segments were rebuilt with newSegment(len(...rb.data), i),
//     i.e. the OLD per-segment size, so shrinking never actually reduced
//     memory. Use the new per-segment size instead.
//   - cache.cacheSize was never updated, so a subsequent Resize compared
//     against a stale capacity. Record the new size.
func (cache *Cache) Resize(newSize int) {
	size := newSize
	if size < MIN_CACHE_SIZE {
		size = MIN_CACHE_SIZE
	}
	segSize := size / SEGMENT_NUMBER
	for i := 0; i < SEGMENT_NUMBER; i++ {
		cache.locks[i].Lock()
		if size >= cache.cacheSize {
			// Growing (or same size): keep data, enlarge the ring buffer.
			cache.segments[i].resize(segSize)
		} else {
			// Shrinking: discard all data, allocate at the new size.
			cache.segments[i] = newSegment(segSize, i)
		}
		cache.locks[i].Unlock()
	}
	cache.cacheSize = size
}

func (cache *Cache) ResetStatistics() {
atomic.StoreInt64(&cache.hitCount, 0)
atomic.StoreInt64(&cache.missCount, 0)
for i := 0; i < 256; i++ {
for i := 0; i < SEGMENT_NUMBER; i++ {
cache.locks[i].Lock()
cache.segments[i].resetStatistics()
cache.locks[i].Unlock()
Expand Down
20 changes: 17 additions & 3 deletions ringbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,22 +213,36 @@ func (rb *RingBuf) Resize(newSize int) {
if len(rb.data) == newSize {
return
}
newData := make([]byte, newSize)

newIndex := rb.index
var offset int
if rb.end-rb.begin == int64(len(rb.data)) {
offset = rb.index
if int(rb.end-rb.begin) > newSize {
offset = rb.index
} else {
offset = 0
}
newIndex = len(rb.data)
}

if int(rb.end-rb.begin) > newSize {
newIndex = 0
discard := int(rb.end-rb.begin) - newSize
offset = (offset + discard) % len(rb.data)
rb.begin = rb.end - int64(newSize)
} else if len(rb.data) > newSize {
offset = 0
newIndex = int(rb.end-rb.begin) % newSize
}

newData := make([]byte, newSize)
n := copy(newData, rb.data[offset:])
if n < newSize {
copy(newData[n:], rb.data[:offset])
}

rb.index = newIndex
rb.data = newData
rb.index = 0
}

func (rb *RingBuf) Skip(length int64) {
Expand Down
53 changes: 33 additions & 20 deletions segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,19 @@ import (
"unsafe"
)

const HASH_ENTRY_SIZE = 16
const ENTRY_HDR_SIZE = 24
// Layout and limit constants for segments.
// (The diff rendering had left the deleted `var Err…` declarations
// interleaved inside this const block, which is invalid Go; this is the
// post-change const block only — the error variables live in their own
// var block.)
const (
	// HASH_ENTRY_SIZE is the size in bytes of one hash-slot entry
	// (presumably sizeof(entryPtr) — TODO confirm against entryPtr).
	HASH_ENTRY_SIZE = 16
	// ENTRY_HDR_SIZE is the size in bytes of an entry header in the
	// ring buffer.
	ENTRY_HDR_SIZE = 24

	// MAX_LARGE_KEY is the largest accepted key length in bytes.
	MAX_LARGE_KEY = 65535
	// SLOT_COUNT is the number of hash slots per segment.
	SLOT_COUNT = 256
)

// Sentinel errors returned by segment operations; callers should compare
// with errors.Is.
// NOTE(review): Go style (staticcheck ST1005) wants error strings to start
// lowercase without punctuation; the messages are kept verbatim here to
// avoid changing observable error text.
var (
// Returned by set when the key exceeds 65535 bytes.
ErrLargeKey = errors.New("The key is larger than 65535")
// Returned when a single key+value entry would exceed 1/1024 of the cache size.
ErrLargeEntry = errors.New("The entry size is larger than 1/1024 of cache size")
// Returned by lookups when no matching entry exists.
ErrNotFound = errors.New("Entry not found")
)

// entry pointer struct points to an entry in ring buffer
type entryPtr struct {
Expand Down Expand Up @@ -40,28 +47,28 @@ type segment struct {
rb RingBuf // ring buffer that stores data
segId int
entryCount int64
totalCount int64 // number of entries in ring buffer, including deleted entries.
totalTime int64 // used to calculate least recent used entry.
totalEvacuate int64 // used for debug
totalExpired int64 // used for debug
overwrites int64 // used for debug
vacuumLen int64 // up to vacuumLen, new data can be written without overwriting old data.
slotLens [256]int32 // The actual length for every slot.
slotCap int32 // max number of entry pointers a slot can hold.
slotsData []entryPtr // shared by all 256 slots
totalCount int64 // number of entries in ring buffer, including deleted entries.
totalTime int64 // used to calculate least recent used entry.
totalEvacuate int64 // used for debug
totalExpired int64 // used for debug
overwrites int64 // used for debug
vacuumLen int64 // up to vacuumLen, new data can be written without overwriting old data.
slotLens [SLOT_COUNT]int32 // The actual length for every slot.
slotCap int32 // max number of entry pointers a slot can hold.
slotsData []entryPtr // shared by all 256 slots
}

// newSegment returns a segment backed by a ring buffer of bufSize bytes,
// with an initial slot capacity of one entry pointer per slot.
// (Removed the stale pre-change slotsData line with the hard-coded 256 that
// the diff rendering had left duplicated above the new one.)
func newSegment(bufSize int, segId int) (seg segment) {
	seg.rb = NewRingBuf(bufSize, 0)
	seg.segId = segId
	seg.vacuumLen = int64(bufSize)
	seg.slotCap = 1
	seg.slotsData = make([]entryPtr, SLOT_COUNT*seg.slotCap)
	return
}

func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (err error) {
if len(key) > 65535 {
if len(key) > MAX_LARGE_KEY {
return ErrLargeKey
}
maxKeyValLen := len(seg.rb.data)/4 - ENTRY_HDR_SIZE
Expand Down Expand Up @@ -103,11 +110,11 @@ func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (e
return
}
// avoid unnecessary memory copy.
seg.delEntryPtr(slotId, hash16, seg.slotsData[idx].offset)
seg.delEntryPtr(slotId, hash16, slot[idx].offset)
match = false
// increase capacity and limit entry len.
for hdr.valCap < hdr.valLen {
hdr.valCap *= 2
hdr.valCap *= hdr.valCap
[Review comment from the repository owner on the line `hdr.valCap *= hdr.valCap`: this squares the capacity instead of doubling it — it should be `hdr.valCap *= 2`.]
}
if hdr.valCap > uint32(maxKeyValLen-len(key)) {
hdr.valCap = uint32(maxKeyValLen - len(key))
Expand Down Expand Up @@ -263,8 +270,8 @@ func (seg *segment) ttl(key []byte, hashVal uint64) (timeLeft uint32, err error)
}

func (seg *segment) expand() {
newSlotData := make([]entryPtr, seg.slotCap*2*256)
for i := 0; i < 256; i++ {
newSlotData := make([]entryPtr, seg.slotCap*2*SLOT_COUNT)
for i := 0; i < SLOT_COUNT; i++ {
off := int32(i) * seg.slotCap
copy(newSlotData[off*2:], seg.slotsData[off:off+seg.slotLens[i]])
}
Expand Down Expand Up @@ -366,3 +373,9 @@ func (seg *segment) resetStatistics() {
seg.totalExpired = 0
seg.overwrites = 0
}

// resize changes the segment's ring buffer to newsize bytes and adjusts
// vacuumLen by the size delta, so the amount of writable-without-eviction
// space grows or shrinks with the buffer.
func (seg *segment) resize(newsize int) {
	// Capture the delta before resizing, since Resize replaces rb.data.
	delta := int64(newsize) - seg.rb.Size()
	seg.rb.Resize(newsize)
	seg.vacuumLen += delta
}