Skip to content

Commit

Permalink
Avoid using stderr to detect plugin failures, wait for plugin process…
Browse files Browse the repository at this point in the history
…es (#89)

Previously, gpbackup_helper would error out and abort restore operations if any
plugin wrote anything to stderr. Additionally, when using the adb_ddp_plugin to
restore data, gpbackup_helper did not wait for plugin processes, leading to a
large number of zombie processes when restoring with the --resize-cluster flag,
causing the process to stop.

This patch removes the requirement for stderr to be empty. Now, messages
directed to stderr are logged as warnings, allowing the process to continue
without interruption. The helper can still detect when a plugin process has
exited because the exit of a plugin process closes the associated reader
handles, causing an error during subsequent read attempts.

The patch also adds logic to wait and reap plugin processes. Instead of turning
plugin processes into zombies, gpbackup_helper now calls Wait() on them. This
action is performed every time a reader finishes copying its content. Wait() is
not done in case of --single-data-file, because Wait() closes pipes immediately,
but helper will reuse the same reader and read from its stdout pipe multiple
times.

Two new tests are introduced: the first one verifies that gpbackup_helper does
not fail when a plugin writes something to stderr during the restore operation.
The second test ensures that gpbackup_helper errors out when a plugin process
terminates in the middle of the restore operation.

Changes comparing to the original commit:
1. logWarning() is replaced with already existing logWarn(), that has the same
functionality.
2. One of the calls to waitForPlugin() is removed as no more necessary, because
there is no more nested loop over batches, and we can leave only one call for
waitForPlugin() after 'LoopEnd' label.
3. Several variable names in the test were updated as old names do not exist
anymore. Plus the pipefile name in the test was updated, as now it includes
batch number.
4. log() doesn't exist anymore and is replaced with logVerbose().
5. Unreachable call to logPlugin() is removed.
6. New tests are added to cover the case with cluster resize.
7. logPlugin() is merged into waitForPlugin().
8. Tests are reworked to avoid goroutines.
9. Cleanup of plugin's test control files in the test is now done from a defer
function in order not to leave them if test failed (otherwise a failed test
could affect the subsequent tests).
10. SpecTimeout is added to some new tests to ensure that if the delta from
this commit is broken, the test will not hang and will provide more useful
output.

(cherry picked from commit bb75d5a)

Co-authored-by: Roman Eskin <[email protected]>
  • Loading branch information
2 people authored and RekGRpth committed Aug 1, 2024
1 parent d1eb0bb commit bfd98bd
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 24 deletions.
95 changes: 72 additions & 23 deletions helper/restore_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,29 @@ type RestoreReader struct {
fileHandle *os.File
bufReader *bufio.Reader
seekReader io.ReadSeeker
pluginCmd *PluginCmd
readerType ReaderType
errBuf bytes.Buffer
}

// Wait for plugin process that should be already finished. This should be
// called on every reader that used a plugin as to not leave any zombies behind.
func (r *RestoreReader) waitForPlugin() error {
var err error
if r.pluginCmd != nil && r.pluginCmd.Process != nil {
logVerbose(fmt.Sprintf("Waiting for the plugin process (%d)", r.pluginCmd.Process.Pid))
err = r.pluginCmd.Wait()
if err != nil {
logError(fmt.Sprintf("Plugin process exited with an error: %s", err))
}
// Log plugin's stderr as warnings.
errLog := strings.Trim(r.pluginCmd.errBuf.String(), "\x00")
if len(errLog) != 0 {
logWarn(fmt.Sprintf("Plugin log: %s", errLog))
// Consume the entire buffer.
r.pluginCmd.errBuf.Next(r.pluginCmd.errBuf.Len())
}
}
return err
}

func (r *RestoreReader) positionReader(pos uint64, oid int) error {
Expand Down Expand Up @@ -164,7 +185,20 @@ func doRestoreAgent() error {

filename := replaceContentInFilename(*dataFile, contentToRestore)
readers[contentToRestore], err = getRestoreDataReader(filename, segmentTOC[contentToRestore], oidList)

if readers[contentToRestore] != nil {
// NOTE: If we reach here with batches > 1, there will be
// *origSize / *destSize (N old segments / N new segments)
// readers + 1, which is presumably a small number, so we just
// defer the cleanup.
//
// The loops under are constructed in a way that needs to keep
// all readers open for the entire duration of restore (oid is
// in outer loop -- batches in inner loop, we'll need all
// readers for every outer loop iteration), so we can't properly
// close any of the readers until we restore every oid yet,
// unless The Big Refactoring will arrive.
defer readers[contentToRestore].waitForPlugin()
}
if err != nil {
logError(fmt.Sprintf("Error encountered getting restore data reader for single data file: %v", err))
return err
Expand Down Expand Up @@ -303,12 +337,7 @@ func doRestoreAgent() error {
if *singleDataFile {
lastByte[contentToRestore] = start[contentToRestore] + uint64(bytesRead)
}
errBuf := readers[contentToRestore].errBuf
if errBuf.Len() > 0 {
err = errors.Wrap(err, strings.Trim(errBuf.String(), "\x00"))
} else {
err = errors.Wrap(err, "Error copying data")
}
err = errors.Wrap(err, "Error copying data")
goto LoopEnd
}

Expand All @@ -326,6 +355,17 @@ func doRestoreAgent() error {

logVerbose(fmt.Sprintf("Oid %d, Batch %d: End batch restore", tableOid, batchNum))

// On resize restore reader might be nil.
if !*singleDataFile && !(*isResizeRestore && contentToRestore >= *origSize) {
if errPlugin := readers[contentToRestore].waitForPlugin(); errPlugin != nil {
if err != nil {
err = errors.Wrap(err, errPlugin.Error())
} else {
err = errPlugin
}
}
}

logVerbose(fmt.Sprintf("Oid %d, Batch %d: Attempt to delete pipe %s", tableOid, batchNum, currentPipe))
errPipe := deletePipe(currentPipe)
if errPipe != nil {
Expand Down Expand Up @@ -371,10 +411,12 @@ func getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []i
var seekHandle io.ReadSeeker
var isSubset bool
var err error = nil
var pluginCmd *PluginCmd = nil
restoreReader := new(RestoreReader)

if *pluginConfigFile != "" {
readHandle, isSubset, err = startRestorePluginCommand(fileToRead, objToc, oidList, &restoreReader.errBuf)
pluginCmd, readHandle, isSubset, err = startRestorePluginCommand(fileToRead, objToc, oidList)
restoreReader.pluginCmd = pluginCmd
if isSubset {
// Reader that operates on subset data
restoreReader.readerType = SUBSET
Expand All @@ -400,6 +442,9 @@ func getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []i
// error logging handled by calling functions
return nil, err
}
if pluginCmd != nil {
logVerbose(fmt.Sprintf("Started plugin process (%d)", pluginCmd.Process.Pid))
}

// Set the underlying stream reader in restoreReader
if restoreReader.readerType == SEEKABLE {
Expand All @@ -422,12 +467,6 @@ func getRestoreDataReader(fileToRead string, objToc *toc.SegmentTOC, oidList []i
restoreReader.bufReader = bufio.NewReader(readHandle)
}

// Check that no error has occurred in plugin command
errMsg := strings.Trim(restoreReader.errBuf.String(), "\x00")
if len(errMsg) != 0 {
return nil, errors.New(errMsg)
}

return restoreReader, err
}

Expand Down Expand Up @@ -466,12 +505,19 @@ func getSubsetFlag(fileToRead string, pluginConfig *utils.PluginConfig) bool {
return true
}

func startRestorePluginCommand(fileToRead string, objToc *toc.SegmentTOC, oidList []int, errBuffer *bytes.Buffer) (io.Reader, bool, error) {
// pluginCmd is needed to keep track of readable stderr and whether the command
// has already been ended.
type PluginCmd struct {
*exec.Cmd
errBuf bytes.Buffer
}

func startRestorePluginCommand(fileToRead string, objToc *toc.SegmentTOC, oidList []int) (*PluginCmd, io.Reader, bool, error) {
isSubset := false
pluginConfig, err := utils.ReadPluginConfig(*pluginConfigFile)
if err != nil {
logError(fmt.Sprintf("Error encountered when reading plugin config: %v", err))
return nil, false, err
return nil, nil, false, err
}
cmdStr := ""
if objToc != nil && getSubsetFlag(fileToRead, pluginConfig) {
Expand All @@ -492,14 +538,17 @@ func startRestorePluginCommand(fileToRead string, objToc *toc.SegmentTOC, oidLis
cmdStr = fmt.Sprintf("%s restore_data %s %s", pluginConfig.ExecutablePath, pluginConfig.ConfigPath, fileToRead)
}
logVerbose(cmdStr)
cmd := exec.Command("bash", "-c", cmdStr)

readHandle, err := cmd.StdoutPipe()
pluginCmd := &PluginCmd{}
pluginCmd.Cmd = exec.Command("bash", "-c", cmdStr)
pluginCmd.Stderr = &pluginCmd.errBuf

readHandle, err := pluginCmd.StdoutPipe()
if err != nil {
return nil, false, err
return nil, nil, false, err
}
cmd.Stderr = errBuffer

err = cmd.Start()
return readHandle, isSubset, err
err = pluginCmd.Start()

return pluginCmd, readHandle, isSubset, err
}
162 changes: 162 additions & 0 deletions integration/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,168 @@ options:
Expect(err).ToNot(HaveOccurred())
assertNoErrors()
})
It("gpbackup_helper will not error out when plugin writes something to stderr", func() {
setupRestoreFiles("", true)

err := exec.Command("touch", "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR").Run()
Expect(err).ToNot(HaveOccurred())
defer exec.Command("rm", "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR").Run()

args := []string{
"--toc-file", tocFile,
"--oid-file", restoreOidFile,
"--pipe-file", pipeFile,
"--content", "1",
"--single-data-file",
"--restore-agent",
"--data-file", dataFileFullPath,
"--plugin-config", examplePluginTestConfig}
helperCmd := exec.Command(gpbackupHelperPath, args...)

var outBuffer bytes.Buffer
helperCmd.Stdout = &outBuffer
helperCmd.Stderr = &outBuffer

err = helperCmd.Start()
Expect(err).ToNot(HaveOccurred())

for _, i := range []int{1, 3} {
contents, _ := ioutil.ReadFile(fmt.Sprintf("%s_%d_0", pipeFile, i))
Expect(string(contents)).To(Equal("here is some data\n"))
}

err = helperCmd.Wait()
printHelperLogOnError(err)
Expect(err).ToNot(HaveOccurred())

outputStr := outBuffer.String()
Expect(outputStr).To(ContainSubstring("Some plugin warning"))

assertNoErrors()
})
It("gpbackup_helper will not error out when plugin writes something to stderr with cluster resize", func() {
setupRestoreFiles("", true)
for _, i := range []int{1, 3} {
f, _ := os.Create(fmt.Sprintf("%s_%d", examplePluginTestDataFile, i))
f.WriteString("here is some data\n")
}

err := exec.Command("touch", "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR").Run()
Expect(err).ToNot(HaveOccurred())
defer exec.Command("rm", "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR").Run()

args := []string{
"--toc-file", tocFile,
"--oid-file", restoreOidFile,
"--pipe-file", pipeFile,
"--content", "1",
"--resize-cluster",
"--orig-seg-count", "6",
"--dest-seg-count", "3",
"--restore-agent",
"--data-file", examplePluginTestDataFile,
"--plugin-config", examplePluginTestConfig}
helperCmd := exec.Command(gpbackupHelperPath, args...)

var outBuffer bytes.Buffer
helperCmd.Stdout = &outBuffer
helperCmd.Stderr = &outBuffer

err = helperCmd.Start()
Expect(err).ToNot(HaveOccurred())

for _, i := range []int{1, 3} {
contents, _ := ioutil.ReadFile(fmt.Sprintf("%s_%d_0", pipeFile, i))
Expect(string(contents)).To(Equal("here is some data\n"))
}

err = helperCmd.Wait()
printHelperLogOnError(err)
Expect(err).ToNot(HaveOccurred())

outputStr := outBuffer.String()
Expect(outputStr).To(ContainSubstring("Some plugin warning"))

assertNoErrors()
})
It("gpbackup_helper will error out if plugin exits early", func(ctx SpecContext) {
setupRestoreFiles("", true)

err := exec.Command("touch", "/tmp/GPBACKUP_PLUGIN_DIE").Run()
Expect(err).ToNot(HaveOccurred())
defer exec.Command("rm", "/tmp/GPBACKUP_PLUGIN_DIE").Run()

args := []string{
"--toc-file", tocFile,
"--oid-file", restoreOidFile,
"--pipe-file", pipeFile,
"--content", "1",
"--single-data-file",
"--restore-agent",
"--data-file", dataFileFullPath,
"--plugin-config", examplePluginTestConfig}
helperCmd := exec.Command(gpbackupHelperPath, args...)

var outBuffer bytes.Buffer
helperCmd.Stdout = &outBuffer
helperCmd.Stderr = &outBuffer

err = helperCmd.Start()
Expect(err).ToNot(HaveOccurred())

for _, i := range []int{1, 3} {
contents, _ := ioutil.ReadFile(fmt.Sprintf("%s_%d_0", pipeFile, i))
// Empty output
Expect(contents).To(Equal([]byte{}))
}

err = helperCmd.Wait()
Expect(err).To(HaveOccurred())

outputStr := outBuffer.String()
Expect(outputStr).To(ContainSubstring("Plugin process exited with an error"))

assertErrorsHandled()
}, SpecTimeout(time.Second*10))
It("gpbackup_helper will error out if plugin exits early with cluster resize", func(ctx SpecContext) {
setupRestoreFiles("", true)

err := exec.Command("touch", "/tmp/GPBACKUP_PLUGIN_DIE").Run()
Expect(err).ToNot(HaveOccurred())
defer exec.Command("rm", "/tmp/GPBACKUP_PLUGIN_DIE").Run()

args := []string{
"--toc-file", tocFile,
"--oid-file", restoreOidFile,
"--pipe-file", pipeFile,
"--content", "1",
"--resize-cluster",
"--orig-seg-count", "6",
"--dest-seg-count", "3",
"--restore-agent",
"--data-file", examplePluginTestDataFile,
"--plugin-config", examplePluginTestConfig}
helperCmd := exec.Command(gpbackupHelperPath, args...)

var outBuffer bytes.Buffer
helperCmd.Stdout = &outBuffer
helperCmd.Stderr = &outBuffer

err = helperCmd.Start()
Expect(err).ToNot(HaveOccurred())

contents, _ := ioutil.ReadFile(fmt.Sprintf("%s_%d_0", pipeFile, 1))
// Empty output
Expect(contents).To(Equal([]byte{}))

err = helperCmd.Wait()
Expect(err).To(HaveOccurred())

outputStr := outBuffer.String()
Expect(outputStr).To(ContainSubstring("Plugin process exited with an error"))

assertErrorsHandled()
}, SpecTimeout(time.Second*10))
It("Generates error file when restore agent interrupted", FlakeAttempts(5), func() {
setupRestoreFiles("gzip", false)
helperCmd := gpbackupHelperRestore(gpbackupHelperPath, "--data-file", dataFileFullPath+".gz", "--single-data-file")
Expand Down
2 changes: 1 addition & 1 deletion plugins/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ gpbackup and gprestore will call the plugin executable in the format
[plugin_executable_name] [command] arg1 arg2
```

If an error occurs during plugin execution, plugins should write an error message to stderr and return a non-zero error code.
If an error occurs during plugin execution, the plugin should exit with a non-zero code. Plugins may write log messages to stderr without affecting the execution.



Expand Down
5 changes: 5 additions & 0 deletions plugins/example_plugin.bash
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ restore_data() {
filename=`basename "$2"`
timestamp_dir=`basename $(dirname "$2")`
timestamp_day_dir=${timestamp_dir%??????}
if [ -e "/tmp/GPBACKUP_PLUGIN_LOG_TO_STDERR" ] ; then
echo 'Some plugin warning' >&2
elif [ -e "/tmp/GPBACKUP_PLUGIN_DIE" ] ; then
exit 1
fi
cat /tmp/plugin_dest/$timestamp_day_dir/$timestamp_dir/$filename
}

Expand Down

0 comments on commit bfd98bd

Please sign in to comment.