From 24c4c449fff15a75e081f5f4061fda947e85887f Mon Sep 17 00:00:00 2001 From: Priya Bibra Date: Thu, 31 Aug 2023 14:39:43 -0700 Subject: [PATCH] apply patch 13655: snapshot keyspace fix for pitr Signed-off-by: Priya Bibra --- go/test/endtoend/recovery/recovery_util.go | 8 +- .../recovery/unshardedrecovery/recovery.go | 115 ++++++++++-------- go/vt/vttablet/tabletmanager/restore.go | 6 +- 3 files changed, 76 insertions(+), 53 deletions(-) diff --git a/go/test/endtoend/recovery/recovery_util.go b/go/test/endtoend/recovery/recovery_util.go index acc1d8ce616..cf413e3d2e0 100644 --- a/go/test/endtoend/recovery/recovery_util.go +++ b/go/test/endtoend/recovery/recovery_util.go @@ -51,10 +51,14 @@ func VerifyQueriesUsingVtgate(t *testing.T, session *vtgateconn.VTGateSession, q } // RestoreTablet performs a PITR restore. -func RestoreTablet(t *testing.T, localCluster *cluster.LocalProcessCluster, tablet *cluster.Vttablet, restoreKSName string, shardName string, keyspaceName string, commonTabletArg []string) { +func RestoreTablet(t *testing.T, localCluster *cluster.LocalProcessCluster, tablet *cluster.Vttablet, restoreKSName string, shardName string, keyspaceName string, commonTabletArg []string, restoreTime time.Time) { tablet.ValidateTabletRestart(t) replicaTabletArgs := commonTabletArg + if restoreTime.IsZero() { + restoreTime = time.Now().UTC() + } + _, err := localCluster.VtctlProcess.ExecuteCommandWithOutput("GetKeyspace", restoreKSName) if err != nil { @@ -62,7 +66,7 @@ func RestoreTablet(t *testing.T, localCluster *cluster.LocalProcessCluster, tabl tm.Format(time.RFC3339) _, err := localCluster.VtctlProcess.ExecuteCommandWithOutput("CreateKeyspace", "--", "--keyspace_type=SNAPSHOT", "--base_keyspace="+keyspaceName, - "--snapshot_time", tm.Format(time.RFC3339), restoreKSName) + "--snapshot_time", restoreTime.Format(time.RFC3339), restoreKSName) require.Nil(t, err) } diff --git a/go/test/endtoend/recovery/unshardedrecovery/recovery.go 
b/go/test/endtoend/recovery/unshardedrecovery/recovery.go index db76cc3654f..3b4c3aaf060 100644 --- a/go/test/endtoend/recovery/unshardedrecovery/recovery.go +++ b/go/test/endtoend/recovery/unshardedrecovery/recovery.go @@ -24,6 +24,7 @@ import ( "os/exec" "path" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -173,25 +174,29 @@ SET GLOBAL old_alter_table = ON; } -// TestRecoveryImpl does following -// - create a shard with primary and replica1 only -// - run InitShardPrimary -// - insert some data -// - take a backup -// - insert more data on the primary -// - take another backup -// - create a recovery keyspace after first backup -// - bring up tablet_replica2 in the new keyspace -// - check that new tablet does not have data created after backup1 -// - create second recovery keyspace after second backup -// - bring up tablet_replica3 in second keyspace -// - check that new tablet has data created after backup1 but not data created after backup2 -// - check that vtgate queries work correctly +// 1. create a shard with primary and replica1 only +// - run InitShardPrimary +// - insert some data +// +// 2. take a backup +// 3. create a recovery keyspace after first backup +// - bring up tablet_replica2 in the new keyspace +// - check that new tablet has data from backup1 +// +// 4. insert more data on the primary +// +// 5. take another backup +// 6. create a recovery keyspace after second backup +// - bring up tablet_replica3 in the new keyspace +// - check that new tablet only has data from backup1, since the snapshot time predates backup2 +// +// 7.
check that vtgate queries work correctly func TestRecoveryImpl(t *testing.T) { defer cluster.PanicHandler(t) defer tabletsTeardown() verifyInitialReplication(t) + // take first backup of value = test1 err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias) assert.NoError(t, err) @@ -199,10 +204,6 @@ func TestRecoveryImpl(t *testing.T) { require.Equal(t, len(backups), 1) assert.Contains(t, backups[0], replica1.Alias) - _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) - assert.NoError(t, err) - cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 2) - err = localCluster.VtctlclientProcess.ApplyVSchema(keyspaceName, vSchema) assert.NoError(t, err) @@ -210,7 +211,9 @@ func TestRecoveryImpl(t *testing.T) { assert.NoError(t, err) assert.Contains(t, output, "vt_insert_test") - recovery.RestoreTablet(t, localCluster, replica2, recoveryKS1, "0", keyspaceName, commonTabletArg) + // restore with latest backup + restoreTime := time.Now().UTC() + recovery.RestoreTablet(t, localCluster, replica2, recoveryKS1, "0", keyspaceName, commonTabletArg, restoreTime) output, err = localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetSrvVSchema", cell) assert.NoError(t, err) @@ -219,57 +222,69 @@ func TestRecoveryImpl(t *testing.T) { err = localCluster.VtctlclientProcess.ExecuteCommand("GetSrvKeyspace", cell, keyspaceName) assert.NoError(t, err) - output, err = localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetVSchema", recoveryKS1) assert.NoError(t, err) assert.Contains(t, output, "vt_insert_test") - cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 1) + // verify that restored replica has value = test1 + qr, err := replica2.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true) + assert.NoError(t, err) + assert.Equal(t, "test1", qr.Rows[0][0].ToString()) - cluster.VerifyLocalMetadata(t, replica2, recoveryKS1, shardName, cell) + // 
insert new row on primary + _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) + assert.NoError(t, err) + cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 2) // update the original row in primary _, err = primary.VttabletProcess.QueryTablet("update vt_insert_test set msg = 'msgx1' where id = 1", keyspaceName, true) assert.NoError(t, err) //verify that primary has new value - qr, err := primary.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true) + qr, err = primary.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true) assert.NoError(t, err) assert.Equal(t, "msgx1", qr.Rows[0][0].ToString()) - //verify that restored replica has old value - qr, err = replica2.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true) - assert.NoError(t, err) - assert.Equal(t, "test1", qr.Rows[0][0].ToString()) + // check that replica1, used for the backup, has the new value + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() - err = localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias) - assert.NoError(t, err) + ticker := time.NewTicker(time.Second) + defer ticker.Stop() - _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test3')", keyspaceName, true) - assert.NoError(t, err) - cluster.VerifyRowsInTablet(t, replica1, keyspaceName, 3) + for { + qr, err = replica1.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true) + assert.NoError(t, err) + if qr.Rows[0][0].ToString() == "msgx1" { + break + } - recovery.RestoreTablet(t, localCluster, replica3, recoveryKS2, "0", keyspaceName, commonTabletArg) + select { + case <-ctx.Done(): + t.Error("timeout waiting for new value to be replicated on replica 1") + break + case <-ticker.C: + } + } - output, err = 
localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetVSchema", recoveryKS2) + // take second backup of value = msgx1 + err = localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias) assert.NoError(t, err) - assert.Contains(t, output, "vt_insert_test") - cluster.VerifyRowsInTablet(t, replica3, keyspaceName, 2) + // restore to first backup + recovery.RestoreTablet(t, localCluster, replica3, recoveryKS2, "0", keyspaceName, commonTabletArg, restoreTime) - // update the original row in primary - _, err = primary.VttabletProcess.QueryTablet("update vt_insert_test set msg = 'msgx2' where id = 1", keyspaceName, true) + output, err = localCluster.VtctlclientProcess.ExecuteCommandWithOutput("GetVSchema", recoveryKS2) assert.NoError(t, err) + assert.Contains(t, output, "vt_insert_test") - //verify that primary has new value - qr, err = primary.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true) - assert.NoError(t, err) - assert.Equal(t, "msgx2", qr.Rows[0][0].ToString()) + // only one row from first backup + cluster.VerifyRowsInTablet(t, replica3, keyspaceName, 1) - //verify that restored replica has old value + // verify that restored replica has value = test1 qr, err = replica3.VttabletProcess.QueryTablet("select msg from vt_insert_test where id = 1", keyspaceName, true) assert.NoError(t, err) - assert.Equal(t, "msgx1", qr.Rows[0][0].ToString()) + assert.Equal(t, "test1", qr.Rows[0][0].ToString()) vtgateInstance := localCluster.NewVtgateInstance() vtgateInstance.TabletTypesToWait = "REPLICA" @@ -294,26 +309,26 @@ func TestRecoveryImpl(t *testing.T) { session := vtgateConn.Session("@replica", nil) //check that vtgate doesn't route queries to new tablet - recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(3)") - recovery.VerifyQueriesUsingVtgate(t, session, "select msg from vt_insert_test where id = 1", `VARCHAR("msgx2")`) + recovery.VerifyQueriesUsingVtgate(t, session, 
"select count(*) from vt_insert_test", "INT64(2)") + recovery.VerifyQueriesUsingVtgate(t, session, "select msg from vt_insert_test where id = 1", `VARCHAR("msgx1")`) recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select count(*) from %s.vt_insert_test", recoveryKS1), "INT64(1)") recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select msg from %s.vt_insert_test where id = 1", recoveryKS1), `VARCHAR("test1")`) - recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select count(*) from %s.vt_insert_test", recoveryKS2), "INT64(2)") - recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select msg from %s.vt_insert_test where id = 1", recoveryKS2), `VARCHAR("msgx1")`) + recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select count(*) from %s.vt_insert_test", recoveryKS2), "INT64(1)") + recovery.VerifyQueriesUsingVtgate(t, session, fmt.Sprintf("select msg from %s.vt_insert_test where id = 1", recoveryKS2), `VARCHAR("test1")`) // check that new keyspace is accessible with 'use ks' cluster.ExecuteQueriesUsingVtgate(t, session, "use "+recoveryKS1+"@replica") recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(1)") cluster.ExecuteQueriesUsingVtgate(t, session, "use "+recoveryKS2+"@replica") - recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(2)") + recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(1)") // check that new tablet is accessible with use `ks:shard` cluster.ExecuteQueriesUsingVtgate(t, session, "use `"+recoveryKS1+":0@replica`") recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(1)") cluster.ExecuteQueriesUsingVtgate(t, session, "use `"+recoveryKS2+":0@replica`") - recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(2)") + recovery.VerifyQueriesUsingVtgate(t, session, "select count(*) from vt_insert_test", "INT64(1)") } 
// verifyInitialReplication will create schema in primary, insert some data to primary and verify the same data in replica. diff --git a/go/vt/vttablet/tabletmanager/restore.go b/go/vt/vttablet/tabletmanager/restore.go index 62b35c2c818..260b21ede6b 100644 --- a/go/vt/vttablet/tabletmanager/restore.go +++ b/go/vt/vttablet/tabletmanager/restore.go @@ -164,6 +164,10 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L log.Infof("Using base_keyspace %v to restore keyspace %v using a backup time of %v", keyspace, tablet.Keyspace, backupTime) } + if backupTime.IsZero() { + backupTime = logutil.ProtoToTime(keyspaceInfo.SnapshotTime) + } + params := mysqlctl.RestoreParams{ Cnf: tm.Cnf, Mysqld: tm.MysqlDaemon, @@ -305,7 +309,7 @@ func (tm *TabletManager) restoreToTimeFromBinlog(ctx context.Context, pos mysql. // getGTIDFromTimestamp computes 2 GTIDs based on restoreTime // afterPos is the GTID of the first event at or after restoreTime. // beforePos is the GTID of the last event before restoreTime. This is the GTID upto which replication will be applied -// afterPos can be used directly in the query `START SLAVE UNTIL SQL_BEFORE_GTIDS = ''` +// afterPos can be used directly in the query `START SLAVE UNTIL SQL_BEFORE_GTIDS = ”` // beforePos will be used to check if replication was able to catch up from the binlog server func (tm *TabletManager) getGTIDFromTimestamp(ctx context.Context, pos mysql.Position, restoreTime int64) (afterPos string, beforePos string, err error) { connParams := &mysql.ConnParams{