Skip to content

Commit

Permalink
Add backup parallel&retry (#15828)
Browse files Browse the repository at this point in the history
Add backup parallel&retry

Approved by: @XuPeng-SH, @daviszhen
  • Loading branch information
LeftHandCold committed May 3, 2024
1 parent 09d98cb commit 6e94513
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 92 deletions.
3 changes: 2 additions & 1 deletion pkg/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func backupConfigs(ctx context.Context, cfg *Config) error {

var backupTae = func(ctx context.Context, config *Config) error {
fs := fileservice.SubPath(config.TaeDir, taeDir)
return BackupData(ctx, config.SharedFs, fs, "")
return BackupData(ctx, config.SharedFs, fs, "", int(config.Parallelism))
}

func backupHakeeper(ctx context.Context, config *Config) error {
Expand Down Expand Up @@ -218,6 +218,7 @@ func saveTaeFilesList(ctx context.Context, Fs fileservice.FileService, taeFiles
if err != nil {
return err
}

return writeFile(ctx, Fs, taeSum, []byte(metas))
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestBackupData(t *testing.T) {
for _, location := range files {
locations = append(locations, location)
}
err = execBackup(ctx, db.Opts.Fs, service, locations)
err = execBackup(ctx, db.Opts.Fs, service, locations, 1)
assert.Nil(t, err)
db.Opts.Fs = service
db.Restart(ctx)
Expand Down
50 changes: 32 additions & 18 deletions pkg/backup/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,23 @@ func etlFSDir(filepath string) string {
func writeFile(ctx context.Context, fs fileservice.FileService, path string, data []byte) error {
var err error
//write file
err = fs.Write(ctx, fileservice.IOVector{
FilePath: path,
Entries: []fileservice.IOEntry{
{
Offset: 0,
Size: int64(len(data)),
Data: data,
},
_, err = fileservice.DoWithRetry(
"BackupWrite",
func() (int, error) {
return 0, fs.Write(ctx, fileservice.IOVector{
FilePath: path,
Entries: []fileservice.IOEntry{
{
Offset: 0,
Size: int64(len(data)),
Data: data,
},
},
})
},
})
64,
fileservice.IsRetryableError,
)
if err != nil {
return err
}
Expand All @@ -135,16 +142,23 @@ func writeFile(ctx context.Context, fs fileservice.FileService, path string, dat

//write checksum file for the file
checksumFile := path + ".sha256"
err = fs.Write(ctx, fileservice.IOVector{
FilePath: checksumFile,
Entries: []fileservice.IOEntry{
{
Offset: 0,
Size: int64(len(checksum)),
Data: checksum[:],
},
_, err = fileservice.DoWithRetry(
"BackupWrite",
func() (int, error) {
return 0, fs.Write(ctx, fileservice.IOVector{
FilePath: checksumFile,
Entries: []fileservice.IOEntry{
{
Offset: 0,
Size: int64(len(checksum)),
Data: checksum[:],
},
},
})
},
})
64,
fileservice.IsRetryableError,
)
return err
}

Expand Down
Loading

0 comments on commit 6e94513

Please sign in to comment.