Skip to content

Commit

Permalink
refactor: remove global locking feature from the imagestore
Browse files Browse the repository at this point in the history
Signed-off-by: Andrei Aaron <[email protected]>
  • Loading branch information
andaaron committed Aug 17, 2024
1 parent 501c79b commit 7343d22
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 170 deletions.
111 changes: 47 additions & 64 deletions pkg/storage/imagestore/imagestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,40 +89,6 @@ func NewImageStore(rootDir string, cacheDir string, dedupe, commit bool, log zlo
return imgStore
}

// RLock read-lock.
func (is *ImageStore) RLock(lockStart *time.Time) {
*lockStart = time.Now()

is.lock.RLock()
}

// RUnlock read-unlock.
func (is *ImageStore) RUnlock(lockStart *time.Time) {
is.lock.RUnlock()

lockEnd := time.Now()
// includes time spent in acquiring and holding a lock
latency := lockEnd.Sub(*lockStart)
monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RLOCK) // histogram
}

// Lock write-lock.
func (is *ImageStore) Lock(lockStart *time.Time) {
*lockStart = time.Now()

is.lock.Lock()
}

// Unlock write-unlock.
func (is *ImageStore) Unlock(lockStart *time.Time) {
is.lock.Unlock()

lockEnd := time.Now()
// includes time spent in acquiring and holding a lock
latency := lockEnd.Sub(*lockStart)
monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RWLOCK) // histogram
}

// RLock read-lock for specific repo.
func (is *ImageStore) RLockRepo(repo string, lockStart *time.Time) {
*lockStart = time.Now()
Expand Down Expand Up @@ -296,13 +262,12 @@ func (is *ImageStore) ValidateRepo(name string) (bool, error) {

// GetRepositories returns a list of all the repositories under this store.
func (is *ImageStore) GetRepositories() ([]string, error) {
var lockLatency time.Time

// Ideally this function would lock while walking in order to avoid concurrency issues
// but we can't lock everything as we don't have a valid list of all repositories
// let's assume the result of this function is a best effort and some repos may be
// added or removed by the time it returns
dir := is.rootDir

is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)

stores := make([]string, 0)

err := is.storeDriver.Walk(dir, func(fileInfo driver.FileInfo) error {
Expand Down Expand Up @@ -342,13 +307,12 @@ func (is *ImageStore) GetRepositories() ([]string, error) {

// GetNextRepository returns next repository under this store.
func (is *ImageStore) GetNextRepository(repo string) (string, error) {
var lockLatency time.Time

// Ideally this function would lock while walking in order to avoid concurrency issues
// but we can't lock everything as we don't have a valid list of all repositories
// let's assume the result of this function is a best effort and some repos may be
// added or removed by the time it returns
dir := is.rootDir

is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)

_, err := is.storeDriver.List(dir)
if err != nil {
if errors.As(err, &driver.PathNotFoundError{}) {
Expand Down Expand Up @@ -1190,8 +1154,6 @@ func (is *ImageStore) BlobPath(repo string, digest godigest.Digest) string {
}

func (is *ImageStore) GetAllDedupeReposCandidates(digest godigest.Digest) ([]string, error) {
var lockLatency time.Time

if err := digest.Validate(); err != nil {
return nil, err
}
Expand All @@ -1200,9 +1162,6 @@ func (is *ImageStore) GetAllDedupeReposCandidates(digest godigest.Digest) ([]str
return nil, nil //nolint:nilnil
}

is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)

blobsPaths, err := is.cache.GetAllBlobs(digest)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1799,41 +1758,62 @@ func (is *ImageStore) GetAllBlobs(repo string) ([]godigest.Digest, error) {
return ret, nil
}

func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest,
func (is *ImageStore) GetNextDigestWithBlobPaths(allRepos []string, lastDigests []godigest.Digest,
) (godigest.Digest, []string, []string, error) {
var lockLatency time.Time

dir := is.rootDir

is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
for _, repo := range allRepos {
is.RLockRepo(repo, &lockLatency)
defer is.RUnlockRepo(repo, &lockLatency)
}

var duplicateBlobs, duplicateRepos []string

var digest godigest.Digest

var repo string

err := is.storeDriver.Walk(dir, func(fileInfo driver.FileInfo) error {
// skip blobs under .sync and .uploads
if strings.HasSuffix(fileInfo.Path(), syncConstants.SyncBlobUploadDir) ||
strings.HasSuffix(fileInfo.Path(), storageConstants.BlobUploadDir) {
return driver.ErrSkipDir
}

if strings.HasSuffix(fileInfo.Path(), ispec.ImageLayoutFile) ||
strings.HasSuffix(fileInfo.Path(), ispec.ImageIndexFile) ||
strings.HasSuffix(fileInfo.Path(), ".db") {
return driver.ErrSkipDir
}

// the path is always under root dir because the walk function walks the root dir
rel, _ := filepath.Rel(is.rootDir, fileInfo.Path())

if fileInfo.IsDir() {
// skip repositories not found in repos
repo = path.Base(fileInfo.Path())
if !zcommon.Contains(repos, repo) && repo != ispec.ImageBlobsDir {
candidateAlgorithm := godigest.Algorithm(repo)
if fileInfo.Path() == is.rootDir || zcommon.Contains(allRepos, rel) {
// this is the root directory or a repo, go deeper into subfolders
return nil
}

if !candidateAlgorithm.Available() {
return driver.ErrSkipDir
}
// attempt to determine what the base folder of this path is
lastFolderInPath := path.Base(rel)
if lastFolderInPath == ispec.ImageBlobsDir {
// this is the blobs dir, go deeper into subfolders
return nil
}

// this is not the root dir, a repo, or a blobs dir
// it is also unclear if we are under a repo, as this could be .trivy
// skip entire directory if the base name does not match a valid hash algorithm
candidateAlgorithm := godigest.Algorithm(lastFolderInPath)
if !candidateAlgorithm.Available() {
return driver.ErrSkipDir
} else {
// this is the folder sha256 or similar
return nil
}
}

repo = path.Dir(path.Dir(fileInfo.Path()))
digestHash := path.Base(fileInfo.Path())
digestAlgorithm := godigest.Algorithm(path.Base(path.Dir(fileInfo.Path())))

Expand All @@ -1853,6 +1833,7 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g
if blobDigest == digest {
duplicateBlobs = append(duplicateBlobs, fileInfo.Path())

repo := path.Dir(path.Dir(path.Dir(rel)))
if !zcommon.Contains(duplicateRepos, repo) {
duplicateRepos = append(duplicateRepos, repo)
}
Expand Down Expand Up @@ -2047,8 +2028,10 @@ func (is *ImageStore) RunDedupeForDigest(ctx context.Context, digest godigest.Di
) error {
var lockLatency time.Time

is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
for _, repo := range duplicateRepos {
is.LockRepo(repo, &lockLatency)
defer is.UnlockRepo(repo, &lockLatency)
}

if dedupe {
return is.dedupeBlobs(ctx, digest, duplicateBlobs)
Expand Down
105 changes: 52 additions & 53 deletions pkg/storage/imagestore/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,93 +6,92 @@ import (

type ImageStoreLock struct {
// locks per repository paths
repoLocks sync.Map
// lock for the entire storage, needed in case all repos need to be processed
// including blocking creating new repos
globalLock *sync.RWMutex
repoLocks map[string]*sync.RWMutex
// lock for managing the content of the repo lock map
internalLock *sync.Mutex
}

func NewImageStoreLock() *ImageStoreLock {
return &ImageStoreLock{
repoLocks: sync.Map{},
globalLock: &sync.RWMutex{},
repoLocks: map[string]*sync.RWMutex{},
internalLock: &sync.Mutex{},
}
}

func (sl *ImageStoreLock) RLock() {
// block reads and writes to the entire storage, including new repos
sl.globalLock.RLock()
}

func (sl *ImageStoreLock) RUnlock() {
// unlock to the storage in general
sl.globalLock.RUnlock()
}

func (sl *ImageStoreLock) Lock() {
// block reads and writes to the entire storage, including new repos
sl.globalLock.Lock()
}

func (sl *ImageStoreLock) Unlock() {
// unlock to the storage in general
sl.globalLock.Unlock()
}

func (sl *ImageStoreLock) RLockRepo(repo string) {
// besides the individual repo increment the read counter for the
// global lock, this will make sure the storage cannot be
// write-locked at global level while individual repos are accessed
sl.globalLock.RLock()

val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{})
repoLock, _ := sl.loadLock(repo)

// lock individual repo
repoLock, _ := val.(*sync.RWMutex)
repoLock.RLock()
}

func (sl *ImageStoreLock) RUnlockRepo(repo string) {
val, ok := sl.repoLocks.Load(repo)
repoLock, ok := sl.loadLock(repo)
if !ok {
// somehow the unlock is called for repo that was never locked
// somehow the unlock is called for a repo that was not locked
return
}

// read-unlock individual repo
repoLock, _ := val.(*sync.RWMutex)
repoLock.RUnlock()

// decrement the global read counter after the one for the individual repo is decremented
sl.globalLock.RUnlock()
}

func (sl *ImageStoreLock) LockRepo(repo string) {
// besides the individual repo increment the read counter for the
// global lock, this will make sure the storage cannot be
// write-locked at global level while individual repos are accessed
// we are not using the write lock here, as that would make all repos
// wait for one another
sl.globalLock.RLock()

val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{})
repoLock, _ := sl.loadLock(repo)

// write-lock individual repo
repoLock, _ := val.(*sync.RWMutex)
repoLock.Lock()
}

func (sl *ImageStoreLock) UnlockRepo(repo string) {
val, ok := sl.repoLocks.Load(repo)
repoLock, ok := sl.loadLock(repo)
if !ok {
// somehow the unlock is called for a repo that was never locked
// somehow the unlock is called for a repo that was not locked
return
}

// write-unlock individual repo
repoLock, _ := val.(*sync.RWMutex)
repoLock.Unlock()

// decrement the global read counter after the individual repo was unlocked
sl.globalLock.RUnlock()
// attempt to clean up the map of unused locks
sl.discardLockIfPossible(repo)
}

// loadLock returns the per-repo lock for repo, creating and registering a
// new one under internalLock if none exists yet. The boolean result reports
// whether the lock was already present before this call (mirroring the
// map comma-ok idiom), which callers such as RUnlockRepo/UnlockRepo use to
// detect an unlock without a matching lock.
func (sl *ImageStoreLock) loadLock(repo string) (*sync.RWMutex, bool) {
	sl.internalLock.Lock()
	defer sl.internalLock.Unlock()

	repoLock, ok := sl.repoLocks[repo]
	if !ok {
		// store the new lock through the local variable instead of
		// re-reading the map, avoiding a redundant second lookup
		repoLock = &sync.RWMutex{}
		sl.repoLocks[repo] = repoLock
	}

	return repoLock, ok
}

// discardLockIfPossible removes the lock entry for repo from the map when no
// other goroutine currently holds it, so the map does not grow without bound
// as repos come and go. It is a best-effort cleanup: if the lock is busy the
// entry is simply kept for next time.
func (sl *ImageStoreLock) discardLockIfPossible(repo string) {
	sl.internalLock.Lock()
	defer sl.internalLock.Unlock()

	repoLock, found := sl.repoLocks[repo]
	if !found {
		// nothing registered for this repo, nothing to clean up
		return
	}

	// TryLock is non-blocking and succeeds only when nobody holds the lock.
	// Because internalLock is held, no other goroutine can fetch this lock
	// from the map while we decide whether to discard it.
	if !repoLock.TryLock() {
		// someone else is still using the lock, keep the entry as is
		return
	}
	// release the probe lock once the entry has been dropped
	defer repoLock.Unlock()

	// nobody holds it and nobody can acquire it right now, safe to forget
	delete(sl.repoLocks, repo)
}
8 changes: 7 additions & 1 deletion pkg/storage/local/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,13 @@ func (driver *Driver) WriteFile(filepath string, content []byte) (int, error) {
func (driver *Driver) Walk(path string, walkFn storagedriver.WalkFn) error {
children, err := driver.List(path)
if err != nil {
return err
switch errors.As(err, &storagedriver.PathNotFoundError{}) {
case true:
// repository was removed in between listing and enumeration. Ignore it.
return nil
default:
return err
}
}

sort.Stable(sort.StringSlice(children))
Expand Down
28 changes: 0 additions & 28 deletions pkg/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"path"
"slices"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -889,33 +888,6 @@ func TestStorageAPIs(t *testing.T) {
_, _, err = imgStore.PutImageManifest("replace", "1.0", ispec.MediaTypeImageManifest, manifestBuf)
So(err, ShouldBeNil)
})

Convey("Locks", func() {
// in parallel, a mix of read and write locks - mainly for coverage
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(2)

go func() {
var lockLatency time.Time

defer wg.Done()
imgStore.Lock(&lockLatency)
func() {}()
imgStore.Unlock(&lockLatency)
}()
go func() {
var lockLatency time.Time

defer wg.Done()
imgStore.RLock(&lockLatency)
func() {}()
imgStore.RUnlock(&lockLatency)
}()
}

wg.Wait()
})
})
})
}
Expand Down
Loading

0 comments on commit 7343d22

Please sign in to comment.