diff --git a/backup/data.go b/backup/data.go
index e3d4c37d4..c06debb7a 100644
--- a/backup/data.go
+++ b/backup/data.go
@@ -5,6 +5,7 @@ package backup
  */

 import (
+	"context"
 	"errors"
 	"fmt"
 	"strings"
@@ -62,7 +63,7 @@ type BackupProgressCounters struct {
 	ProgressBar utils.ProgressBar
 }

-func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite string, connNum int) (int64, error) {
+func CopyTableOut(queryContext context.Context, connectionPool *dbconn.DBConn, table Table, destinationToWrite string, connNum int) (int64, error) {
 	if wasTerminated {
 		return -1, nil
 	}
@@ -112,7 +113,7 @@ func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite
 	} else {
 		utils.LogProgress(`%sExecuting "%s" on master`, workerInfo, query)
 	}
-	result, err := connectionPool.Exec(query, connNum)
+	result, err := connectionPool.ExecContext(queryContext, query, connNum)
 	if err != nil {
 		return 0, err
 	}
@@ -121,7 +122,7 @@ func CopyTableOut(connectionPool *dbconn.DBConn, table Table, destinationToWrite
 	return numRows, nil
 }

-func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters *BackupProgressCounters, whichConn int) error {
+func BackupSingleTableData(queryContext context.Context, table Table, rowsCopiedMap map[uint32]int64, counters *BackupProgressCounters, whichConn int) error {
 	workerInfo := ""
 	if gplog.GetVerbosity() >= gplog.LOGVERBOSE {
 		workerInfo = fmt.Sprintf("Worker %d: ", whichConn)
@@ -137,7 +138,7 @@ func BackupSingleTableData(table Table, rowsCopiedMap map[uint32]int64, counters
 	} else {
 		destinationToWrite = globalFPInfo.GetTableBackupFilePathForCopyCommand(table.Oid, utils.GetPipeThroughProgram().Extension, false)
 	}
-	rowsCopied, err := CopyTableOut(connectionPool, table, destinationToWrite, whichConn)
+	rowsCopied, err := CopyTableOut(queryContext, connectionPool, table, destinationToWrite, whichConn)
 	if err != nil {
 		return err
 	}
@@ -181,6 +182,15 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
 		tasks <- table
 	}

+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel() // Make sure it's called to release resources even if no errors
+
+	// Launch a checker that polls whether the backup helper has exited with an error. If it has, the checker
+	// cancels all pending COPY commands that may be hanging on pipes the helper did not close before it died.
+	if MustGetFlagBool(options.SINGLE_DATA_FILE) {
+		utils.StartHelperChecker(globalCluster, globalFPInfo, cancel)
+	}
+
 	/*
 	 * Worker 0 is a special database connection that
 	 * 1) Exports the database snapshot if the feature is supported
@@ -212,8 +222,15 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
 			 * transaction commits and the locks are released.
 			 */
 			for table := range tasks {
+				// Check whether an error has occurred in any other goroutine:
+				select {
+				case <-ctx.Done():
+					return // Error somewhere, terminate
+				default: // Default case is needed to avoid blocking
+				}
 				if wasTerminated || isErroredBackup.Load() {
 					counters.ProgressBar.(*pb.ProgressBar).NotPrint = true
+					cancel()
 					return
 				}
 				if backupSnapshot != "" && connectionPool.Tx[whichConn] == nil {
@@ -261,7 +278,7 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
 						break
 					}
 				}
-				err = BackupSingleTableData(table, rowsCopiedMaps[whichConn], &counters, whichConn)
+				err = BackupSingleTableData(ctx, table, rowsCopiedMaps[whichConn], &counters, whichConn)
 				if err != nil {
 					// if copy isn't working, skip remaining backups, and let downstream panic
 					// handling deal with it
@@ -294,14 +311,21 @@ func BackupDataForAllTables(tables []Table) []map[uint32]int64 {
 	}()
 	for _, table := range tables {
 		for {
+			// Check whether an error has occurred in any other goroutine:
+			select {
+			case <-ctx.Done():
+				return // Error somewhere, terminate
+			default: // Default case is needed to avoid blocking
+			}
 			if wasTerminated || isErroredBackup.Load() {
+				cancel()
 				return
 			}
 			state, _ := oidMap.Load(table.Oid)
 			if state.(int) == Unknown {
 				time.Sleep(time.Millisecond * 50)
 			} else if state.(int) == Deferred {
-				err := BackupSingleTableData(table, rowsCopiedMaps[0], &counters, 0)
+				err := BackupSingleTableData(ctx, table, rowsCopiedMaps[0], &counters, 0)
 				if err != nil {
 					isErroredBackup.Store(true)
 					gplog.Fatal(err, "")
diff --git a/backup/data_test.go b/backup/data_test.go
index 2743d1328..3fd8fe038 100644
--- a/backup/data_test.go
+++ b/backup/data_test.go
@@ -1,6 +1,7 @@
 package backup_test

 import (
+	"context"
 	"fmt"
 	"regexp"

@@ -79,7 +80,7 @@ var _ = Describe("backup/data tests", func() {
 			mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))

 			filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456.gz"
-			_, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
+			_, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

 			Expect(err).ShouldNot(HaveOccurred())
 		})
@@ -92,7 +93,7 @@ var _ = Describe("backup/data tests", func() {
 			mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))

 			filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456"
-			_, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
+			_, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

 			Expect(err).ShouldNot(HaveOccurred())
 		})
@@ -102,7 +103,7 @@ var _ = Describe("backup/data tests", func() {
 			mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))

 			filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456.zst"
-			_, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
+			_, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

 			Expect(err).ShouldNot(HaveOccurred())
 		})
@@ -115,7 +116,7 @@ var _ = Describe("backup/data tests", func() {
 			mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))

 			filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456"
-			_, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
+			_, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

 			Expect(err).ShouldNot(HaveOccurred())
 		})
@@ -125,7 +126,7 @@ var _ = Describe("backup/data tests", func() {
 			mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))

 			filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456"
-			_, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
+			_, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

 			Expect(err).ShouldNot(HaveOccurred())
 		})
@@ -138,7 +139,7 @@ var _ = Describe("backup/data tests", func() {
 			mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))

 			filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456"
-			_, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
+			_, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

 			Expect(err).ShouldNot(HaveOccurred())
 		})
@@ -148,7 +149,7 @@ var _ = Describe("backup/data tests", func() {
 			mock.ExpectExec(execStr).WillReturnResult(sqlmock.NewResult(10, 0))

 			filename := "/backups/20170101/20170101010101/gpbackup__20170101010101_3456"
-			_, err := backup.CopyTableOut(connectionPool, testTable, filename, defaultConnNum)
+			_, err := backup.CopyTableOut(context.Background(), connectionPool, testTable, filename, defaultConnNum)

 			Expect(err).ShouldNot(HaveOccurred())
 		})
@@ -178,7 +179,7 @@ var _ = Describe("backup/data tests", func() {
 			backupFile := fmt.Sprintf("/gpbackup__20170101010101_pipe_(.*)_%d", testTable.Oid)
 			copyCmd := fmt.Sprintf(copyFmtStr, backupFile)
 			mock.ExpectExec(copyCmd).WillReturnResult(sqlmock.NewResult(0, 10))
-			err := backup.BackupSingleTableData(testTable, rowsCopiedMap, &counters, 0)
+			err := backup.BackupSingleTableData(context.Background(), testTable, rowsCopiedMap, &counters, 0)

 			Expect(err).ShouldNot(HaveOccurred())
 			Expect(rowsCopiedMap[0]).To(Equal(int64(10)))
@@ -190,7 +191,7 @@ var _ = Describe("backup/data tests", func() {
 			backupFile := fmt.Sprintf("/backups/20170101/20170101010101/gpbackup__20170101010101_%d", testTable.Oid)
 			copyCmd := fmt.Sprintf(copyFmtStr, backupFile)
 			mock.ExpectExec(copyCmd).WillReturnResult(sqlmock.NewResult(0, 10))
-			err := backup.BackupSingleTableData(testTable, rowsCopiedMap, &counters, 0)
+			err := backup.BackupSingleTableData(context.Background(), testTable, rowsCopiedMap, &counters, 0)

 			Expect(err).ShouldNot(HaveOccurred())
 			Expect(rowsCopiedMap[0]).To(Equal(int64(10)))
diff --git a/end_to_end/plugin_test.go b/end_to_end/plugin_test.go
index 8fd8b5492..5e04483c5 100644
--- a/end_to_end/plugin_test.go
+++ b/end_to_end/plugin_test.go
@@ -5,6 +5,7 @@ import (
 	"os"
 	"os/exec"
 	path "path/filepath"
+	"time"

 	"github.com/greenplum-db/gp-common-go-libs/cluster"
 	"github.com/greenplum-db/gp-common-go-libs/dbconn"
@@ -14,6 +15,7 @@ import (
 	"github.com/greenplum-db/gpbackup/testutils"
 	"github.com/greenplum-db/gpbackup/utils"
 	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
 )

 func copyPluginToAllHosts(conn *dbconn.DBConn, pluginPath string) {
@@ -322,6 +324,83 @@ var _ = Describe("End to End plugin tests", func() {
 			Skip("This test is only needed for the most recent backup versions")
 		}
 	})
+	It("Will not hang if gpbackup and gprestore run with single-data-file and the helper goes down at its start", func(ctx SpecContext) {
+		copyPluginToAllHosts(backupConn, examplePluginExec)
+
+		testhelper.AssertQueryRuns(backupConn, "CREATE TABLE t0(a int);")
+		testhelper.AssertQueryRuns(backupConn, "INSERT INTO t0 SELECT i FROM generate_series(1, 10)i;")
+		defer testhelper.AssertQueryRuns(backupConn, "DROP TABLE t0;")
+
+		output := gpbackup(gpbackupPath, backupHelperPath,
+			"--single-data-file",
+			"--plugin-config", examplePluginTestConfig)
+		timestamp := getBackupTimestamp(string(output))
+
+		backupCluster.GenerateAndExecuteCommand(
+			"Instruct plugin to fail",
+			cluster.ON_HOSTS,
+			func(contentID int) string {
+				return "touch /tmp/GPBACKUP_PLUGIN_DIE"
+			})
+
+		defer backupCluster.GenerateAndExecuteCommand(
+			"Unset plugin instruction",
+			cluster.ON_HOSTS,
+			func(contentID int) string {
+				return "rm /tmp/GPBACKUP_PLUGIN_DIE"
+			})
+
+		gprestoreCmd := exec.Command(gprestorePath,
+			"--timestamp", timestamp,
+			"--redirect-db", "restoredb",
+			"--plugin-config", examplePluginTestConfig)
+
+		_, err := gprestoreCmd.CombinedOutput()
+		Expect(err).To(HaveOccurred())
+
+		assertArtifactsCleaned(timestamp)
+	}, SpecTimeout(time.Second*30))
+	It("Will not hang if gprestore runs with cluster resize and the helper goes down on one of the tables", func(ctx SpecContext) {
+		copyPluginToAllHosts(backupConn, examplePluginExec)
+
+		pluginBackupDirectory := `/tmp/plugin_dest`
+		os.Mkdir(pluginBackupDirectory, 0777)
+		command := exec.Command("tar", "-xzf", fmt.Sprintf("resources/%s.tar.gz", "9-segment-db-with-plugin"), "-C", pluginBackupDirectory)
+		mustRunCommand(command)
+
+		backupCluster.GenerateAndExecuteCommand(
+			"Instruct plugin to fail",
+			cluster.ON_HOSTS,
+			func(contentID int) string {
+				return "touch /tmp/GPBACKUP_PLUGIN_DIE"
+			})
+
+		defer backupCluster.GenerateAndExecuteCommand(
+			"Unset plugin instruction",
+			cluster.ON_HOSTS,
+			func(contentID int) string {
+				return "rm /tmp/GPBACKUP_PLUGIN_DIE"
+			})
+
+		timestamp := "20240812201233"
+
+		gprestoreCmd := exec.Command(gprestorePath,
+			"--resize-cluster",
+			"--timestamp", timestamp,
+			"--redirect-db", "restoredb",
+			"--plugin-config", examplePluginTestConfig)
+
+		// Instruct the plugin to die only when it reaches the last table (oid 16392)
+		gprestoreCmd.Env = os.Environ()
+		gprestoreCmd.Env = append(gprestoreCmd.Env, "GPBACKUP_PLUGIN_DIE_ON_OID=16392")
+
+		_, err := gprestoreCmd.CombinedOutput()
+		Expect(err).To(HaveOccurred())
+
+		assertArtifactsCleaned(timestamp)
+
+		os.RemoveAll(pluginBackupDirectory)
+	}, SpecTimeout(time.Second*30))
 	It("runs gpbackup and gprestore with plugin, single-data-file, and no-compression", func() {
 		copyPluginToAllHosts(backupConn, examplePluginExec)
diff --git a/end_to_end/resources/9-segment-db-with-plugin.tar.gz b/end_to_end/resources/9-segment-db-with-plugin.tar.gz
new file mode 100644
index 000000000..9ecebe887
Binary files /dev/null and b/end_to_end/resources/9-segment-db-with-plugin.tar.gz differ
diff --git a/plugins/example_plugin.bash b/plugins/example_plugin.bash
index b2e193d3f..302d5d6c8 100755
--- a/plugins/example_plugin.bash
+++ b/plugins/example_plugin.bash
@@ -79,7 +79,11 @@ restore_data() {
 	timestamp_day_dir=${timestamp_dir%??????}
 	if [ -e "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR" ] ; then
 		echo 'Some plugin warning' >&2
-	elif [ -e "/tmp/GPBACKUP_PLUGIN_DIE" ] ; then
+	elif [[ -e "/tmp/GPBACKUP_PLUGIN_DIE" && -z "$GPBACKUP_PLUGIN_DIE_ON_OID" ]] ; then
+		exit 1
+	elif [[ -e "/tmp/GPBACKUP_PLUGIN_DIE" && "$filename" == *"$GPBACKUP_PLUGIN_DIE_ON_OID"* ]] ; then
+		# sleep a while for test purposes, to let gprestore start its COPY commands
+		sleep 5
 		exit 1
 	fi
 	cat /tmp/plugin_dest/$timestamp_day_dir/$timestamp_dir/$filename
diff --git a/restore/data.go b/restore/data.go
index 3c436dd66..2396d7360 100644
--- a/restore/data.go
+++ b/restore/data.go
@@ -258,6 +258,12 @@ func restoreDataFromTimestamp(fpInfo filepath.FilePathInfo, dataEntries []toc.Co
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel() // Make sure it's called to release resources even if no errors

+	// Launch a checker that polls whether the restore helper has exited with an error. If it has, the checker
+	// cancels all pending COPY commands that may be hanging on pipes the helper did not close before it died.
+	if backupConfig.SingleDataFile || resizeCluster {
+		utils.StartHelperChecker(globalCluster, globalFPInfo, cancel)
+	}
+
 	for i := 0; i < connectionPool.NumConns; i++ {
 		workerPool.Add(1)
 		go func(whichConn int) {
diff --git a/utils/agent_remote.go b/utils/agent_remote.go
index af43f42d4..3ee401afd 100644
--- a/utils/agent_remote.go
+++ b/utils/agent_remote.go
@@ -361,3 +361,20 @@ func CreateSkipFileOnSegments(oid string, tableName string, c *cluster.Cluster,
 		return fmt.Sprintf("Could not create skip file %s_skip_%s on segments", fpInfo.GetSegmentPipeFilePath(contentID), oid)
 	})
 }
+
+func StartHelperChecker(cl *cluster.Cluster, fpInfo filepath.FilePathInfo, cancel func()) {
+	go func() {
+		for {
+			time.Sleep(5 * time.Second)
+			remoteOutput := cl.GenerateAndExecuteCommand("Checking for gpbackup_helper agent failure", cluster.ON_SEGMENTS, func(contentID int) string {
+				helperErrorFileName := fmt.Sprintf("%s_error", fpInfo.GetSegmentPipeFilePath(contentID))
+				return fmt.Sprintf("! ls %s", helperErrorFileName)
+			})
+			if remoteOutput.NumErrors != 0 {
+				gplog.Error("gpbackup_helper exited with an error on some segments")
+				cancel()
+				return
+			}
+		}
+	}()
+}
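
For reference, the poll-and-cancel flow this patch wires together, reduced to a single self-contained Go sketch. Everything below is illustrative rather than part of the patch: pollForErrorFile stands in for utils.StartHelperChecker, which instead runs "! ls <pipe>_error" on every segment (a command that fails exactly where the error file exists, so remoteOutput.NumErrors != 0 means some helper died and left an error file behind), and runCopy stands in for a COPY issued through connectionPool.ExecContext. All identifiers and paths in the sketch are hypothetical.

// Hypothetical stand-alone demo of the poll-and-cancel pattern; not patch code.
package main

import (
	"context"
	"fmt"
	"os"
	"time"
)

// pollForErrorFile mimics StartHelperChecker: it periodically checks for an
// error sentinel and cancels the shared context once the sentinel appears.
func pollForErrorFile(path string, cancel context.CancelFunc) {
	go func() {
		for {
			time.Sleep(100 * time.Millisecond)
			if _, err := os.Stat(path); err == nil {
				cancel() // unblocks every caller waiting on ctx.Done()
				return   // stop polling once cancellation is triggered
			}
		}
	}()
}

// runCopy mimics a context-aware COPY: database/sql-style ExecContext aborts
// the statement when ctx is canceled, which is what frees a COPY stuck on a
// pipe that no helper is reading.
func runCopy(ctx context.Context, table string, d time.Duration) error {
	select {
	case <-time.After(d): // pretend the COPY takes d to finish
		return nil
	case <-ctx.Done():
		return fmt.Errorf("COPY of %s aborted: %w", table, ctx.Err())
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sentinel := "/tmp/helper_error_sentinel" // stands in for <pipe>_error on a segment
	pollForErrorFile(sentinel, cancel)
	go func() { // simulate the helper dying partway through the run
		time.Sleep(300 * time.Millisecond)
		os.WriteFile(sentinel, nil, 0644)
	}()
	defer os.Remove(sentinel)

	for _, table := range []string{"t0", "t1", "t2"} {
		select { // non-blocking check, as in the worker loops above
		case <-ctx.Done():
			fmt.Println("terminating: error detected elsewhere")
			return
		default:
		}
		if err := runCopy(ctx, table, time.Second); err != nil {
			fmt.Println(err)
			return
		}
	}
}

The pattern has to act at two levels, and the sketch exercises both: tables not yet started are skipped by the non-blocking select at the top of the loop, while a COPY already in flight is interrupted because it was issued with ExecContext rather than Exec, which is what lets cancellation unblock a statement hanging on an unserviced pipe.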