feat(sync): support for periodic repo sync in scale-out cluster (#2424)
This commit adds support for periodic repo sync in a scale-out
cluster.

Before this commit, every cluster member would sync all repos, since
the config is shared across members.

With this change, during periodic sync, each cluster member checks
whether it manages the repo. If it does not manage the repo, it skips
the sync.

This commit also includes a unit test covering on-demand sync, though
no logic changes were needed for it as it is handled implicitly by the
proxying logic.

Signed-off-by: Vishwas Rajashekar <[email protected]>
vrajashkr committed May 31, 2024
1 parent 2bb46b0 commit 767f81d
Showing 7 changed files with 521 additions and 21 deletions.
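For context, the diff below revolves around three cluster settings: a shared siphash key (HashKey), the ordered list of member sockets (Members), and the local member's position in that list (Proxy.LocalMemberClusterSocketIndex). A hypothetical sketch of how the first two fit together, using only field names visible in the diff (the values are made up):

```go
// Illustration only, not part of the commit. Every member is configured with
// the same Members list and HashKey, so all members agree on which one of
// them owns any given repo.
clusterCfg := &config.ClusterConfig{
	Members: []string{"127.0.0.1:9000", "127.0.0.1:9001"},
	HashKey: "loremipsumdolors", // siphash needs a 16-byte key
}
```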
20 changes: 2 additions & 18 deletions pkg/api/proxy.go
@@ -8,11 +8,10 @@ import (
 	"net"
 	"net/http"
 
-	"github.com/dchest/siphash"
 	"github.com/gorilla/mux"
 
 	"zotregistry.dev/zot/pkg/api/config"
 	"zotregistry.dev/zot/pkg/api/constants"
+	"zotregistry.dev/zot/pkg/cluster"
 	"zotregistry.dev/zot/pkg/common"
 )
 
@@ -46,8 +45,7 @@ func ClusterProxy(ctrlr *Controller) func(http.HandlerFunc) http.HandlerFunc {
 
 			// the target member is the only one which should do read/write for the dist-spec APIs
 			// for the given repository.
-			targetMemberIndex, targetMember := computeTargetMember(config, name)
-
+			targetMemberIndex, targetMember := cluster.ComputeTargetMember(config.Cluster.HashKey, config.Cluster.Members, name)
 			logger.Debug().Str(constants.RepositoryLogKey, name).
 				Msg(fmt.Sprintf("target member socket: %s index: %d", targetMember, targetMemberIndex))
 
@@ -86,20 +84,6 @@ func ClusterProxy(ctrlr *Controller) func(http.HandlerFunc) http.HandlerFunc {
 	}
 }
 
-// computes the target member using siphash and returns the index and the member
-// siphash was chosen to prevent against hash attacks where an attacker
-// can target all requests to one given instance instead of balancing across the cluster
-// resulting in a Denial-of-Service (DOS).
-// ref: https://en.wikipedia.org/wiki/SipHash
-func computeTargetMember(config *config.Config, name string) (uint64, string) {
-	h := siphash.New([]byte(config.Cluster.HashKey))
-	h.Write([]byte(name))
-	sum64 := h.Sum64()
-	targetIdx := sum64 % uint64(len(config.Cluster.Members))
-
-	return targetIdx, config.Cluster.Members[targetIdx]
-}
-
 // gets all the server sockets of a target member - IP:Port.
 // for IPv6, the socket is [IPv6]:Port.
 // if the input is an IP address, returns the same targetMember in an array.
17 changes: 17 additions & 0 deletions pkg/cluster/cluster.go
@@ -0,0 +1,17 @@
+package cluster
+
+import "github.com/dchest/siphash"
+
+// ComputeTargetMember computes the target member using siphash and returns the index and the member.
+// siphash was chosen to protect against hash attacks where an attacker
+// can target all requests to one given instance instead of balancing across the cluster,
+// resulting in a Denial-of-Service (DOS).
+// ref: https://en.wikipedia.org/wiki/SipHash
+func ComputeTargetMember(hashKey string, members []string, repoName string) (uint64, string) {
+	h := siphash.New([]byte(hashKey))
+	h.Write([]byte(repoName))
+	sum64 := h.Sum64()
+	targetIdx := sum64 % uint64(len(members))
+
+	return targetIdx, members[targetIdx]
+}
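A minimal usage sketch (not part of the commit): because the hash is keyed and deterministic, every member that shares the same key and member list computes the same owner for a given repo, so exactly one member ends up syncing it.

```go
package main

import (
	"fmt"

	"zotregistry.dev/zot/pkg/cluster"
)

func main() {
	members := []string{"10.0.0.1:9000", "10.0.0.2:9000"}

	// The 16-character key satisfies siphash's 16-byte key requirement.
	for _, repo := range []string{"alpine", "ubuntu", "zot-test"} {
		idx, member := cluster.ComputeTargetMember("loremipsumdolors", members, repo)
		fmt.Printf("%s -> member %d (%s)\n", repo, idx, member)
	}
}
```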
25 changes: 25 additions & 0 deletions pkg/cluster/cluster_test.go
@@ -0,0 +1,25 @@
+package cluster_test
+
+import (
+	"testing"
+
+	. "github.com/smartystreets/goconvey/convey"
+
+	"zotregistry.dev/zot/pkg/cluster"
+)
+
+func TestComputeTargetMember(t *testing.T) {
+	Convey("Should panic when the hashKey is not long enough", t, func() {
+		So(func() { cluster.ComputeTargetMember("lorem", []string{"member1", "member2"}, "zot-test") }, ShouldPanic)
+	})
+
+	Convey("Should panic when there are no members", t, func() {
+		So(func() { cluster.ComputeTargetMember("loremipsumdolors", []string{}, "zot-test") }, ShouldPanic)
+	})
+
+	Convey("Should return a valid result when input is valid", t, func() {
+		index, member := cluster.ComputeTargetMember("loremipsumdolors", []string{"member1", "member2"}, "zot-test")
+		So(index, ShouldEqual, 1)
+		So(member, ShouldEqual, "member2")
+	})
+}
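Both panic expectations fall directly out of the implementation: siphash.New panics when the key is not exactly 16 bytes (hence the 16-character "loremipsumdolors" in the other cases), and an empty members slice makes the modulo an integer divide by zero.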
3 changes: 2 additions & 1 deletion pkg/extensions/extension_sync.go
@@ -47,8 +47,9 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB,
 
 		tmpDir := config.Extensions.Sync.DownloadDir
 		credsPath := config.Extensions.Sync.CredentialsFile
+		clusterCfg := config.Cluster
 
-		service, err := sync.New(registryConfig, credsPath, tmpDir, storeController, metaDB, log)
+		service, err := sync.New(registryConfig, credsPath, clusterCfg, tmpDir, storeController, metaDB, log)
 		if err != nil {
 			log.Error().Err(err).Msg("failed to initialize sync extension")
 
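This is the only wiring change the extension needs: the controller's cluster config (which may be nil when clustering is not configured) is forwarded into the sync service at construction time.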
26 changes: 26 additions & 0 deletions pkg/extensions/sync/service.go
@@ -13,6 +13,9 @@ import (
 	"github.com/opencontainers/go-digest"
 
 	zerr "zotregistry.dev/zot/errors"
+	"zotregistry.dev/zot/pkg/api/config"
+	"zotregistry.dev/zot/pkg/api/constants"
+	"zotregistry.dev/zot/pkg/cluster"
 	"zotregistry.dev/zot/pkg/common"
 	syncconf "zotregistry.dev/zot/pkg/extensions/config/sync"
 	client "zotregistry.dev/zot/pkg/extensions/sync/httpclient"
@@ -25,6 +28,7 @@ import (
 type BaseService struct {
 	config        syncconf.RegistryConfig
 	credentials   syncconf.CredentialsFile
+	clusterConfig *config.ClusterConfig
 	remote        Remote
 	destination   Destination
 	retryOptions  *retry.RetryOptions
@@ -40,6 +44,7 @@
 func New(
 	opts syncconf.RegistryConfig,
 	credentialsFilepath string,
+	clusterConfig *config.ClusterConfig,
 	tmpDir string,
 	storeController storage.StoreController,
 	metadb mTypes.MetaDB,
@@ -64,6 +69,10 @@
 
 	service.credentials = credentialsFile
 
+	// load the cluster config into the object
+	// can be nil if the user did not configure cluster config
+	service.clusterConfig = clusterConfig
+
 	service.contentManager = NewContentManager(opts.Content, log)
 
 	if len(tmpDir) == 0 {
@@ -229,6 +238,23 @@ func (service *BaseService) GetNextRepo(lastRepo string) (string, error) {
 			break
 		}
 
+		if service.clusterConfig != nil {
+			targetIdx, targetMember := cluster.ComputeTargetMember(
+				service.clusterConfig.HashKey, service.clusterConfig.Members, lastRepo)
+
+			// if the target index does not match with the local socket index,
+			// then the local instance is not responsible for syncing the repo and should skip the sync
+			if targetIdx != service.clusterConfig.Proxy.LocalMemberClusterSocketIndex {
+				service.log.Debug().
+					Str(constants.RepositoryLogKey, lastRepo).
+					Str("targetMemberIndex", fmt.Sprintf("%d", targetIdx)).
+					Str("targetMember", targetMember).
+					Msg("skipping sync of repo not managed by local instance")
+
+				continue
+			}
+		}
+
 		matches = service.contentManager.MatchesContent(lastRepo)
 	}
 
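To restate the new guard outside the surrounding loop, here is a hypothetical helper (not in the commit, and assuming the zotregistry.dev/zot/pkg/cluster import) that captures the same rule: a member syncs a repo only when the repo hashes to its own index.

```go
// shouldSync distills the guard added above: sync only the repos whose
// computed target index equals the local member's index in the shared
// members list. No coordination between members is needed.
func shouldSync(localIdx uint64, hashKey string, members []string, repo string) bool {
	targetIdx, _ := cluster.ComputeTargetMember(hashKey, members, repo)

	return targetIdx == localIdx
}
```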
4 changes: 2 additions & 2 deletions pkg/extensions/sync/sync_internal_test.go
@@ -162,7 +162,7 @@ func TestService(t *testing.T) {
 			URLs: []string{"http://localhost"},
 		}
 
-		service, err := New(conf, "", os.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{})
+		service, err := New(conf, "", nil, os.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{})
 		So(err, ShouldBeNil)
 
 		err = service.SyncRepo(context.Background(), "repo")
@@ -176,7 +176,7 @@ func TestSyncRepo(t *testing.T) {
 			URLs: []string{"http://localhost"},
 		}
 
-		service, err := New(conf, "", os.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{})
+		service, err := New(conf, "", nil, os.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{})
 		So(err, ShouldBeNil)
 
 		service.remote = mocks.SyncRemote{
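Note that passing nil for the new clusterConfig parameter preserves the pre-existing behavior: the clusterConfig != nil guard in GetNextRepo means a non-clustered instance still considers every repo for periodic sync.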