From 3d3441ebc91db9d8a962699967ee9506a7d1492c Mon Sep 17 00:00:00 2001 From: Vladimir Sarmin Date: Fri, 28 Jun 2024 15:02:20 +0300 Subject: [PATCH] Avoid using stderr to detect plugin failures, wait for plugin processes (#89) Previously, gpbackup_helper would error out and abort restore operations if any plugin wrote anything to stderr. Additionally, when using the adb_ddp_plugin to restore data, gpbackup_helper did not wait for plugin processes, leading to a large number of zombie processes when restoring with the --resize-cluster flag, causing the process to stop. This patch removes the requirement for stderr to be empty. Now, messages directed to stderr are logged as warnings, allowing the process to continue without interruption. The helper can still detect when a plugin process has exited because the exit of a plugin process closes the associated reader handles, causing an error during subsequent read attempts. The patch also adds logic to wait and reap plugin processes. Instead of turning plugin processes into zombies, gpbackup_helper now calls Wait() on them. This action is performed every time a reader finishes copying its content. Wait() is not done in case of --single-data-file, because Wait() closes pipes immediately, but helper will reuse the same reader and read from its stdout pipe multiple times. Two new tests are introduced: the first one verifies that gpbackup_helper does not fail when a plugin writes something to stderr during the restore operation. The second test ensures that gpbackup_helper errors out when a plugin process terminates in the middle of the restore operation. Changes comparing to the original commit: 1. logWarning() is replaced with already existing logWarn(), that has the same functionality. 2. One of the calls to waitForPlugin() is removed as no more necessary, because there is no more nested loop over batches, and we can leave only one call for waitForPlugin() after 'LoopEnd' label. 3. Several variable names in the test were updated as old names do not exist anymore. Plus the pipefile name in the test was updated, as now it includes batch number. 4. log() doesn't exist anymore and is replaced with logVerbose(). 5. Unreachable call to logPlugin() is removed. 6. New tests are added to cover the case with cluster resize. 7. logPlugin() is merged into waitForPlugin(). 8. Tests are reworked to avoid goroutines. (cherry picked from commit bb75d5a47462fe94d7e39affd1f66e706fcecf5e) Co-authored-by: Roman Eskin --- helper/restore_helper.go | 101 ++++++++++++++++----- integration/helper_test.go | 174 ++++++++++++++++++++++++++++++++++++ plugins/README.md | 2 +- plugins/example_plugin.bash | 5 ++ 4 files changed, 258 insertions(+), 24 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index e045ea344..e02fe9e65 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -47,8 +47,30 @@ type RestoreReader struct { fileHandle *os.File bufReader *bufio.Reader seekReader io.ReadSeeker + pluginCmd *pluginCmd readerType ReaderType - errBuf bytes.Buffer +} + +// Wait for plugin process that should be already finished. This should be +// called on every reader that used a plugin as to not leave any zombies behind. +func (r *RestoreReader) waitForPlugin() error { + var err error + if r.pluginCmd != nil && !r.pluginCmd.isEnded { + logVerbose(fmt.Sprintf("Waiting for the plugin process (%d)", r.pluginCmd.Process.Pid)) + err = r.pluginCmd.Wait() + r.pluginCmd.isEnded = true + if err != nil { + logError(fmt.Sprintf("Plugin process exited with an error: %s", err)) + } + // Log plugin's stderr as warnings. + errLog := strings.Trim(r.pluginCmd.stderrBuffer.String(), "\x00") + if len(errLog) != 0 { + logWarn(fmt.Sprintf("Plugin log: %s", errLog)) + // Consume the entire buffer. + r.pluginCmd.stderrBuffer.Next(r.pluginCmd.stderrBuffer.Len()) + } + } + return err } func (r *RestoreReader) positionReader(pos uint64, oid int) error { @@ -164,7 +186,20 @@ func doRestoreAgent() error { filename := replaceContentInFilename(*dataFile, contentToRestore) readers[contentToRestore], err = getRestoreDataReader(filename, segmentTOC[contentToRestore], oidList) - + if readers[contentToRestore] != nil { + // NOTE: If we reach here with batches > 1, there will be + // *origSize / *destSize (N old segments / N new segments) + // readers + 1, which is presumably a small number, so we just + // defer the cleanup. + // + // The loops under are constructed in a way that needs to keep + // all readers open for the entire duration of restore (oid is + // in outer loop -- batches in inner loop, we'll need all + // readers for every outer loop iteration), so we can't properly + // close any of the readers until we restore every oid yet, + // unless The Big Refactoring will arrive. + defer readers[contentToRestore].waitForPlugin() + } if err != nil { logError(fmt.Sprintf("Error encountered getting restore data reader for single data file: %v", err)) return err @@ -303,12 +338,7 @@ func doRestoreAgent() error { if *singleDataFile { lastByte[contentToRestore] = start[contentToRestore] + uint64(bytesRead) } - errBuf := readers[contentToRestore].errBuf - if errBuf.Len() > 0 { - err = errors.Wrap(err, strings.Trim(errBuf.String(), "\x00")) - } else { - err = errors.Wrap(err, "Error copying data") - } + err = errors.Wrap(err, "Error copying data") goto LoopEnd } @@ -326,6 +356,17 @@ func doRestoreAgent() error { logVerbose(fmt.Sprintf("Oid %d, Batch %d: End batch restore", tableOid, batchNum)) + // On resize restore reader might be nil. + if !*singleDataFile && !(*isResizeRestore && contentToRestore >= *origSize) { + if errPlugin := readers[contentToRestore].waitForPlugin(); errPlugin != nil { + if err != nil { + err = errors.Wrap(err, errPlugin.Error()) + } else { + err = errPlugin + } + } + } + logVerbose(fmt.Sprintf("Oid %d, Batch %d: Attempt to delete pipe %s", tableOid, batchNum, currentPipe)) errPipe := deletePipe(currentPipe) if errPipe != nil { @@ -371,10 +412,12 @@ func getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []i var seekHandle io.ReadSeeker var isSubset bool var err error = nil + var pluginCmd *pluginCmd = nil restoreReader := new(RestoreReader) if *pluginConfigFile != "" { - readHandle, isSubset, err = startRestorePluginCommand(fileToRead, objToc, oidList, &restoreReader.errBuf) + pluginCmd, readHandle, isSubset, err = startRestorePluginCommand(fileToRead, objToc, oidList) + restoreReader.pluginCmd = pluginCmd if isSubset { // Reader that operates on subset data restoreReader.readerType = SUBSET @@ -400,6 +443,9 @@ func getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []i // error logging handled by calling functions return nil, err } + if pluginCmd != nil { + logVerbose(fmt.Sprintf("Started plugin process (%d)", pluginCmd.Process.Pid)) + } // Set the underlying stream reader in restoreReader if restoreReader.readerType == SEEKABLE { @@ -422,12 +468,6 @@ func getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []i restoreReader.bufReader = bufio.NewReader(readHandle) } - // Check that no error has occurred in plugin command - errMsg := strings.Trim(restoreReader.errBuf.String(), "\x00") - if len(errMsg) != 0 { - return nil, errors.New(errMsg) - } - return restoreReader, err } @@ -466,12 +506,27 @@ func getSubsetFlag(fileToRead string, pluginConfig *utils.PluginConfig) bool { return true } -func startRestorePluginCommand(fileToRead string, objToc *toc.SegmentTOC, oidList []int, errBuffer *bytes.Buffer) (io.Reader, bool, error) { +// pluginCmd is needed to keep track of readable stderr and whether the command +// has already been ended. +type pluginCmd struct { + *exec.Cmd + stderrBuffer *bytes.Buffer + isEnded bool +} + +func newPluginCmd(name string, arg ...string) *pluginCmd { + var errBuf bytes.Buffer + cmd := exec.Command(name, arg...) + cmd.Stderr = &errBuf + return &pluginCmd{cmd, &errBuf, false} +} + +func startRestorePluginCommand(fileToRead string, objToc *toc.SegmentTOC, oidList []int) (*pluginCmd, io.Reader, bool, error) { isSubset := false pluginConfig, err := utils.ReadPluginConfig(*pluginConfigFile) if err != nil { logError(fmt.Sprintf("Error encountered when reading plugin config: %v", err)) - return nil, false, err + return nil, nil, false, err } cmdStr := "" if objToc != nil && getSubsetFlag(fileToRead, pluginConfig) { @@ -492,14 +547,14 @@ func startRestorePluginCommand(fileToRead string, objToc *toc.SegmentTOC, oidLis cmdStr = fmt.Sprintf("%s restore_data %s %s", pluginConfig.ExecutablePath, pluginConfig.ConfigPath, fileToRead) } logVerbose(cmdStr) - cmd := exec.Command("bash", "-c", cmdStr) + pluginCmd := newPluginCmd("bash", "-c", cmdStr) - readHandle, err := cmd.StdoutPipe() + readHandle, err := pluginCmd.StdoutPipe() if err != nil { - return nil, false, err + return nil, nil, false, err } - cmd.Stderr = errBuffer - err = cmd.Start() - return readHandle, isSubset, err + err = pluginCmd.Start() + + return pluginCmd, readHandle, isSubset, err } diff --git a/integration/helper_test.go b/integration/helper_test.go index a91dd807a..15575b914 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -263,6 +263,180 @@ options: Expect(err).ToNot(HaveOccurred()) assertNoErrors() }) + It("gpbackup_helper will not error out when plugin writes something to stderr", func() { + setupRestoreFiles("", true) + + err := exec.Command("touch", "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR").Run() + Expect(err).ToNot(HaveOccurred()) + + args := []string{ + "--toc-file", tocFile, + "--oid-file", restoreOidFile, + "--pipe-file", pipeFile, + "--content", "1", + "--single-data-file", + "--restore-agent", + "--data-file", dataFileFullPath, + "--plugin-config", examplePluginTestConfig} + helperCmd := exec.Command(gpbackupHelperPath, args...) + + var outBuffer bytes.Buffer + helperCmd.Stdout = &outBuffer + helperCmd.Stderr = &outBuffer + + err = helperCmd.Start() + Expect(err).ToNot(HaveOccurred()) + + for _, i := range []int{1, 3} { + contents, _ := ioutil.ReadFile(fmt.Sprintf("%s_%d_0", pipeFile, i)) + Expect(string(contents)).To(Equal("here is some data\n")) + } + + err = helperCmd.Wait() + printHelperLogOnError(err) + Expect(err).ToNot(HaveOccurred()) + + outputStr := outBuffer.String() + Expect(outputStr).To(ContainSubstring("Some plugin warning")) + + err = exec.Command("rm", "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR").Run() + Expect(err).ToNot(HaveOccurred()) + + assertNoErrors() + }) + It("gpbackup_helper will not error out when plugin writes something to stderr with cluster resize", func() { + setupRestoreFiles("", true) + for _, i := range []int{1, 3} { + f, _ := os.Create(fmt.Sprintf("%s_%d", examplePluginTestDataFile, i)) + f.WriteString("here is some data\n") + } + + err := exec.Command("touch", "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR").Run() + Expect(err).ToNot(HaveOccurred()) + + args := []string{ + "--toc-file", tocFile, + "--oid-file", restoreOidFile, + "--pipe-file", pipeFile, + "--content", "1", + "--resize-cluster", + "--orig-seg-count", "6", + "--dest-seg-count", "3", + "--restore-agent", + "--data-file", examplePluginTestDataFile, + "--plugin-config", examplePluginTestConfig} + helperCmd := exec.Command(gpbackupHelperPath, args...) + + var outBuffer bytes.Buffer + helperCmd.Stdout = &outBuffer + helperCmd.Stderr = &outBuffer + + err = helperCmd.Start() + Expect(err).ToNot(HaveOccurred()) + + for _, i := range []int{1, 3} { + contents, _ := ioutil.ReadFile(fmt.Sprintf("%s_%d_0", pipeFile, i)) + Expect(string(contents)).To(Equal("here is some data\n")) + } + + err = helperCmd.Wait() + printHelperLogOnError(err) + Expect(err).ToNot(HaveOccurred()) + + outputStr := outBuffer.String() + Expect(outputStr).To(ContainSubstring("Some plugin warning")) + + err = exec.Command("rm", "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR").Run() + Expect(err).ToNot(HaveOccurred()) + + assertNoErrors() + }) + It("gpbackup_helper will error out if plugin exits early", func() { + setupRestoreFiles("", true) + + err := exec.Command("touch", "/tmp/GPBACKUP_PLUGIN_DIE").Run() + Expect(err).ToNot(HaveOccurred()) + + args := []string{ + "--toc-file", tocFile, + "--oid-file", restoreOidFile, + "--pipe-file", pipeFile, + "--content", "1", + "--single-data-file", + "--restore-agent", + "--data-file", dataFileFullPath, + "--plugin-config", examplePluginTestConfig} + helperCmd := exec.Command(gpbackupHelperPath, args...) + + var outBuffer bytes.Buffer + helperCmd.Stdout = &outBuffer + helperCmd.Stderr = &outBuffer + + err = helperCmd.Start() + Expect(err).ToNot(HaveOccurred()) + + for _, i := range []int{1, 3} { + contents, _ := ioutil.ReadFile(fmt.Sprintf("%s_%d_0", pipeFile, i)) + // Empty output + Expect(contents).To(Equal([]byte{})) + } + + err = helperCmd.Wait() + Expect(err).To(HaveOccurred()) + + outputStr := outBuffer.String() + Expect(outputStr).To(ContainSubstring("Plugin process exited with an error")) + + err = exec.Command("rm", "/tmp/GPBACKUP_PLUGIN_DIE").Run() + Expect(err).ToNot(HaveOccurred()) + + assertErrorsHandled() + }) + It("gpbackup_helper will error out if plugin exits early with cluster resize", func() { + setupRestoreFiles("", true) + for _, i := range []int{1, 3} { + f, _ := os.Create(fmt.Sprintf("%s_%d", examplePluginTestDataFile, i)) + f.WriteString("here is some data\n") + } + + err := exec.Command("touch", "/tmp/GPBACKUP_PLUGIN_DIE").Run() + Expect(err).ToNot(HaveOccurred()) + + args := []string{ + "--toc-file", tocFile, + "--oid-file", restoreOidFile, + "--pipe-file", pipeFile, + "--content", "1", + "--resize-cluster", + "--orig-seg-count", "6", + "--dest-seg-count", "3", + "--restore-agent", + "--data-file", examplePluginTestDataFile, + "--plugin-config", examplePluginTestConfig} + helperCmd := exec.Command(gpbackupHelperPath, args...) + + var outBuffer bytes.Buffer + helperCmd.Stdout = &outBuffer + helperCmd.Stderr = &outBuffer + + err = helperCmd.Start() + Expect(err).ToNot(HaveOccurred()) + + contents, _ := ioutil.ReadFile(fmt.Sprintf("%s_%d_0", pipeFile, 1)) + // Empty output + Expect(contents).To(Equal([]byte{})) + + err = helperCmd.Wait() + Expect(err).To(HaveOccurred()) + + outputStr := outBuffer.String() + Expect(outputStr).To(ContainSubstring("Plugin process exited with an error")) + + err = exec.Command("rm", "/tmp/GPBACKUP_PLUGIN_DIE").Run() + Expect(err).ToNot(HaveOccurred()) + + assertErrorsHandled() + }) It("Generates error file when restore agent interrupted", FlakeAttempts(5), func() { setupRestoreFiles("gzip", false) helperCmd := gpbackupHelperRestore(gpbackupHelperPath, "--data-file", dataFileFullPath+".gz", "--single-data-file") diff --git a/plugins/README.md b/plugins/README.md index ca3657d15..0708e1f9a 100644 --- a/plugins/README.md +++ b/plugins/README.md @@ -43,7 +43,7 @@ gpbackup and gprestore will call the plugin executable in the format [plugin_executable_name] [command] arg1 arg2 ``` -If an error occurs during plugin execution, plugins should write an error message to stderr and return a non-zero error code. +If an error occurs during plugin execution, the plugin should exit with a non-zero code. Plugins may write log messages to stderr without affecting the execution. diff --git a/plugins/example_plugin.bash b/plugins/example_plugin.bash index c13ac334a..b2e193d3f 100755 --- a/plugins/example_plugin.bash +++ b/plugins/example_plugin.bash @@ -77,6 +77,11 @@ restore_data() { filename=`basename "$2"` timestamp_dir=`basename $(dirname "$2")` timestamp_day_dir=${timestamp_dir%??????} + if [ -e "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR" ] ; then + echo 'Some plugin warning' >&2 + elif [ -e "/tmp/GPBACKUP_PLUGIN_DIE" ] ; then + exit 1 + fi cat /tmp/plugin_dest/$timestamp_day_dir/$timestamp_dir/$filename }