Skip to content

Commit

Permalink
Fixing backup_pitr flaky tests via wait-for loop on topo reads (vit…
Browse files Browse the repository at this point in the history
…essio#13781)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach committed Aug 15, 2023
1 parent 16024c2 commit d48782e
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 64 deletions.
45 changes: 40 additions & 5 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package vtctlbackup

import (
"bufio"
"context"
"encoding/json"
"fmt"
"os"
Expand Down Expand Up @@ -52,7 +53,8 @@ const (
XtraBackup = iota
BuiltinBackup
Mysqlctld
timeout = time.Duration(60 * time.Second)
timeout = time.Duration(60 * time.Second)
topoConsistencyTimeout = 20 * time.Second
)

var (
Expand Down Expand Up @@ -1077,12 +1079,13 @@ func terminateRestore(t *testing.T) {

func vtctlBackupReplicaNoDestroyNoWrites(t *testing.T, replicaIndex int) (backups []string) {
replica := getReplica(t, replicaIndex)
numBackups := len(waitForNumBackups(t, -1))

err := localCluster.VtctldClientProcess.ExecuteCommand("Backup", replica.Alias)
require.Nil(t, err)

backups, err = localCluster.ListBackups(shardKsName)
require.Nil(t, err)
backups = waitForNumBackups(t, numBackups+1)
require.NotEmpty(t, backups)

verifyTabletBackupStats(t, replica.VttabletProcess.GetVars())

Expand Down Expand Up @@ -1203,7 +1206,38 @@ func TestReplicaFullBackup(t *testing.T, replicaIndex int) (manifest *mysqlctl.B
return readManifestFile(t, backupLocation)
}

// waitForNumBackups waits for GetBackups to list exactly the given expected number.
// If expectNumBackups < 0 then any response is considered valid
func waitForNumBackups(t *testing.T, expectNumBackups int) []string {
ctx, cancel := context.WithTimeout(context.Background(), topoConsistencyTimeout)
defer cancel()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
backups, err := localCluster.ListBackups(shardKsName)
require.NoError(t, err)
if expectNumBackups < 0 {
// any result is valid
return backups
}
if len(backups) == expectNumBackups {
// what we waited for
return backups
}
assert.Less(t, len(backups), expectNumBackups)
select {
case <-ctx.Done():
assert.Failf(t, ctx.Err().Error(), "expected %d backups, got %d", expectNumBackups, len(backups))
return nil
case <-ticker.C:
}
}
}

func testReplicaIncrementalBackup(t *testing.T, replica *cluster.Vttablet, incrementalFromPos replication.Position, expectError string) (manifest *mysqlctl.BackupManifest, backupName string) {
numBackups := len(waitForNumBackups(t, -1))
incrementalFromPosArg := "auto"
if !incrementalFromPos.IsZero() {
incrementalFromPosArg = replication.EncodePosition(incrementalFromPos)
Expand All @@ -1216,8 +1250,9 @@ func testReplicaIncrementalBackup(t *testing.T, replica *cluster.Vttablet, incre
}
require.NoErrorf(t, err, "output: %v", output)

backups, err := localCluster.ListBackups(shardKsName)
require.NoError(t, err)
backups := waitForNumBackups(t, numBackups+1)
require.NotEmptyf(t, backups, "output: %v", output)

verifyTabletBackupStats(t, replica.VttabletProcess.GetVars())
backupName = backups[len(backups)-1]
backupLocation := localCluster.CurrentVTDATAROOT + "/backups/" + shardKsName + "/" + backupName
Expand Down
24 changes: 20 additions & 4 deletions go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
{
name: "first incremental backup",
},
{
name: "fail1",
expectError: "no binary logs to backup",
},
{
name: "fail2",
expectError: "no binary logs to backup",
},
{
name: "make writes, succeed",
writeBeforeBackup: true,
Expand Down Expand Up @@ -170,10 +178,10 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
if tc.writeBeforeBackup {
InsertRowOnPrimary(t, "")
}
// we wait for 1 second because backups are written to a directory named after the current timestamp,
// we wait for >1 second because backups are written to a directory named after the current timestamp,
// in 1 second resolution. We want to avoid two backups that have the same pathname. Realistically this
// is only ever a problem in this end-to-end test, not in production.
// Also, we gie the replica a chance to catch up.
// Also, we give the replica a chance to catch up.
time.Sleep(postWriteSleepDuration)
// randomly flush binary logs 0, 1 or 2 times
FlushBinaryLogsOnReplica(t, 0, rand.Intn(3))
Expand Down Expand Up @@ -295,6 +303,14 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
{
name: "first incremental backup",
},
{
name: "fail1",
expectError: "no binary logs to backup",
},
{
name: "fail2",
expectError: "no binary logs to backup",
},
{
name: "make writes, succeed",
writeBeforeBackup: true,
Expand Down Expand Up @@ -333,10 +349,10 @@ func ExecTestIncrementalBackupAndRestoreToTimestamp(t *testing.T, tcase *PITRTes
if tc.writeBeforeBackup {
insertRowOnPrimary(t, "")
}
// we wait for 1 second because backups are written to a directory named after the current timestamp,
// we wait for >1 second because backups are written to a directory named after the current timestamp,
// in 1 second resolution. We want to avoid two backups that have the same pathname. Realistically this
// is only ever a problem in this end-to-end test, not in production.
// Also, we gie the replica a chance to catch up.
// Also, we give the replica a chance to catch up.
time.Sleep(postWriteSleepDuration)
waitForReplica(t, 0)
rowsBeforeBackup := ReadRowsFromReplica(t, 0)
Expand Down
1 change: 1 addition & 0 deletions go/vt/mysqlctl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func Backup(ctx context.Context, params BackupParams) error {
if err != nil {
return vterrors.Wrap(err, "StartBackup failed")
}
params.Logger.Infof("Starting backup %v", bh.Name())

// Scope stats to selected backup engine.
beParams := params.Copy()
Expand Down
4 changes: 2 additions & 2 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par
// everything that's ever been applied, and a subset of that is gtid_purged, which are the event no longer available in binary logs.
// When we consider Vitess incremental backups, what's important for us is "what's the GTIDSet that's true when this backup was taken,
// and which will be true when we restore this backup". The answer to this is the GTIDSet that includes the purged GTIDs.
// It's also nice for icnremental backups that are taken on _other_ tablets, so that they don't need to understand what exactly was purged
// It's also nice for incremental backups that are taken on _other_ tablets, so that they don't need to understand what exactly was purged
// on _this_ tablet. They don't care, all they want to know is "what GTIDSet can we get from this".
incrementalBackupToPosition.GTIDSet = incrementalBackupToPosition.GTIDSet.Union(gtidPurged.GTIDSet)
req := &mysqlctl.ReadBinlogFilesTimestampsRequest{}
Expand All @@ -345,7 +345,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par
return false, vterrors.Wrapf(err, "reading timestamps from binlog files %v", binaryLogsToBackup)
}
if resp.FirstTimestampBinlog == "" || resp.LastTimestampBinlog == "" {
return false, vterrors.Wrapf(err, "empty binlog name in response. Request=%v, Response=%v", req, resp)
return false, vterrors.Errorf(vtrpc.Code_ABORTED, "empty binlog name in response. Request=%v, Response=%v", req, resp)
}
incrDetails := &IncrementalBackupDetails{
FirstTimestamp: FormatRFC3339(logutil.ProtoToTime(resp.FirstTimestamp)),
Expand Down
109 changes: 56 additions & 53 deletions go/vt/mysqlctl/mysqld.go
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,60 @@ func parseBinlogEntryTimestamp(logEntry string) (found bool, t time.Time, err er
return false, t, nil
}

// scanBinlogTimestamp invokes a `mysqlbinlog` binary to look for a timestamp in the given binary. The function
// either looks for the first such timestamp or the last.
func (mysqld *Mysqld) scanBinlogTimestamp(mysqlbinlogDir string, mysqlbinlogEnv []string, mysqlbinlogName string, binlogFile string, stopAtFirst bool) (matchedTime time.Time, matchFound bool, err error) {
args := []string{binlogFile}
mysqlbinlogCmd := exec.Command(mysqlbinlogName, args...)
mysqlbinlogCmd.Dir = mysqlbinlogDir
mysqlbinlogCmd.Env = mysqlbinlogEnv
log.Infof("ApplyBinlogFile: running mysqlbinlog command: %#v", mysqlbinlogCmd)
pipe, err := mysqlbinlogCmd.StdoutPipe() // to be piped into mysql
if err != nil {
return matchedTime, false, err
}
scanComplete := make(chan error)
intentionalKill := false
scan := func() {
defer close(scanComplete)
defer func() {
intentionalKill = true
mysqlbinlogCmd.Process.Kill() // ensures the binlog file is released
}()
// Read line by line and process it
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
logEntry := scanner.Text()

found, t, err := parseBinlogEntryTimestamp(logEntry)
if err != nil {
scanComplete <- err
return
}
if found {
matchedTime = t
matchFound = true
}
if found && stopAtFirst {
// Found the first timestamp and it's all we need. We won't scan any further and so we should also
// kill mysqlbinlog (otherwise it keeps waiting until we've read the entire pipe).
return
}
}
}
if err := mysqlbinlogCmd.Start(); err != nil {
return matchedTime, false, err
}
go scan()
if err := mysqlbinlogCmd.Wait(); err != nil && !intentionalKill {
return matchedTime, false, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps")
}
if err := <-scanComplete; err != nil {
return matchedTime, false, vterrors.Wrapf(err, "scanning mysqlbinlog output in ReadBinlogFilesTimestamps ")
}
return matchedTime, matchFound, nil
}

// ReadBinlogFilesTimestamps reads all given binlog files via `mysqlbinlog` command and returns the first and last found transaction timestamps
func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlctlpb.ReadBinlogFilesTimestampsRequest) (*mysqlctlpb.ReadBinlogFilesTimestampsResponse, error) {
if len(req.BinlogFileNames) == 0 {
Expand All @@ -1335,8 +1389,6 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc
defer client.Close()
return client.ReadBinlogFilesTimestamps(ctx, req)
}
var mysqlbinlogCmd *exec.Cmd

dir, err := vtenv.VtMysqlRoot()
if err != nil {
return nil, err
Expand All @@ -1350,59 +1402,10 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc
return nil, err
}

scanTimestamp := func(binlogFile string, stopAtFirst bool) (matchedTime time.Time, matchFound bool, err error) {
args := []string{binlogFile}
mysqlbinlogCmd = exec.Command(mysqlbinlogName, args...)
mysqlbinlogCmd.Dir = dir
mysqlbinlogCmd.Env = env
log.Infof("ApplyBinlogFile: running mysqlbinlog command: %#v", mysqlbinlogCmd)
pipe, err := mysqlbinlogCmd.StdoutPipe() // to be piped into mysql
if err != nil {
return matchedTime, false, err
}
scanner := bufio.NewScanner(pipe)
scanComplete := make(chan error)
intentionalKill := false
scan := func() {
defer close(scanComplete)
// Read line by line and process it
for scanner.Scan() {
logEntry := scanner.Text()

found, t, err := parseBinlogEntryTimestamp(logEntry)
if err != nil {
scanComplete <- err
return
}
if found {
matchedTime = t
matchFound = true
}
if found && stopAtFirst {
// Found the first timestamp and it's all we need. We won't scan any further and so we should also
// kill mysqlbinlog (otherwise it keeps waiting until we've read the entire pipe).
intentionalKill = true
mysqlbinlogCmd.Process.Kill()
return
}
}
}
if err := mysqlbinlogCmd.Start(); err != nil {
return matchedTime, false, err
}
go scan()
if err := mysqlbinlogCmd.Wait(); err != nil && !intentionalKill {
return matchedTime, false, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps")
}
if err := <-scanComplete; err != nil {
return matchedTime, false, vterrors.Wrapf(err, "scanning mysqlbinlog output in ReadBinlogFilesTimestamps ")
}
return matchedTime, matchFound, nil
}
resp := &mysqlctlpb.ReadBinlogFilesTimestampsResponse{}
// Find first timestamp
for _, binlogFile := range req.BinlogFileNames {
t, found, err := scanTimestamp(binlogFile, true)
t, found, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, true)
if err != nil {
return nil, err
}
Expand All @@ -1415,7 +1418,7 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc
// Find last timestamp
for i := len(req.BinlogFileNames) - 1; i >= 0; i-- {
binlogFile := req.BinlogFileNames[i]
t, found, err := scanTimestamp(binlogFile, false)
t, found, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, false)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit d48782e

Please sign in to comment.