Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not process batches if skip file detected in restore_helper #103

Merged
merged 23 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 47 additions & 20 deletions helper/restore_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,21 @@ func (r *RestoreReader) copyAllData() (int64, error) {
return bytesRead, err
}

// closeAndDeletePipe tears down the pipe belonging to one (tableOid, batchNum)
// pair. The pipe name is derived from *pipeFile, the oid and the batch number.
// The restore writer is flushed and closed first, then the pipe itself is
// deleted. Failures in either step are only logged; nothing is returned to the
// caller.
func closeAndDeletePipe(tableOid int, batchNum int) {
	pipeName := fmt.Sprintf("%s_%d_%d", *pipeFile, tableOid, batchNum)

	logInfo(fmt.Sprintf("Oid %d, Batch %d: Closing pipe %s", tableOid, batchNum, pipeName))
	if closeErr := flushAndCloseRestoreWriter(pipeName, tableOid); closeErr != nil {
		// A failed flush/close does not stop the cleanup; deletion is still attempted.
		logVerbose(fmt.Sprintf("Oid %d, Batch %d: Failed to flush and close pipe: %s", tableOid, batchNum, closeErr))
	}

	logVerbose(fmt.Sprintf("Oid %d, Batch %d: Attempt to delete pipe %s", tableOid, batchNum, pipeName))
	if removeErr := deletePipe(pipeName); removeErr != nil {
		logError("Oid %d, Batch %d: Failed to remove pipe %s: %v", tableOid, batchNum, pipeName, removeErr)
	}
}

type oidWithBatch struct {
oid int
batch int
Expand Down Expand Up @@ -212,6 +227,10 @@ func doRestoreAgent() error {
preloadCreatedPipesForRestore(oidWithBatchList, *copyQueue)

var currentPipe string

// If a skip file is detected for a particular tableOid, the remaining batches for that oid are not processed
skipOid := -1

for i, oidWithBatch := range oidWithBatchList {
tableOid := oidWithBatch.oid
batchNum := oidWithBatch.batch
Expand All @@ -226,19 +245,27 @@ func doRestoreAgent() error {
if i < len(oidWithBatchList)-*copyQueue {
nextOidWithBatch := oidWithBatchList[i+*copyQueue]
nextOid := nextOidWithBatch.oid
nextBatchNum := nextOidWithBatch.batch
nextPipeToCreate := fmt.Sprintf("%s_%d_%d", *pipeFile, nextOid, nextBatchNum)
logVerbose(fmt.Sprintf("Oid %d, Batch %d: Creating pipe %s\n", nextOid, nextBatchNum, nextPipeToCreate))
err := createPipe(nextPipeToCreate)
if err != nil {
logError(fmt.Sprintf("Oid %d, Batch %d: Failed to create pipe %s\n", nextOid, nextBatchNum, nextPipeToCreate))
// In the case this error is hit it means we have lost the
// ability to create pipes normally, so hard quit even if
// --on-error-continue is given
return err

if nextOid != skipOid {
nextBatchNum := nextOidWithBatch.batch
nextPipeToCreate := fmt.Sprintf("%s_%d_%d", *pipeFile, nextOid, nextBatchNum)
logVerbose(fmt.Sprintf("Oid %d, Batch %d: Creating pipe %s\n", nextOid, nextBatchNum, nextPipeToCreate))
err := createPipe(nextPipeToCreate)
if err != nil {
logError(fmt.Sprintf("Oid %d, Batch %d: Failed to create pipe %s\n", nextOid, nextBatchNum, nextPipeToCreate))
// In the case this error is hit it means we have lost the
// ability to create pipes normally, so hard quit even if
// --on-error-continue is given
return err
}
}
}

if tableOid == skipOid {
logVerbose(fmt.Sprintf("Oid %d, Batch %d: skip due to skip file\n", tableOid, batchNum))
goto LoopEnd
dkovalev1 marked this conversation as resolved.
Show resolved Hide resolved
whitehawk marked this conversation as resolved.
Show resolved Hide resolved
RekGRpth marked this conversation as resolved.
Show resolved Hide resolved
}

if *singleDataFile {
start[contentToRestore] = tocEntries[contentToRestore][uint(tableOid)].StartByte
end[contentToRestore] = tocEntries[contentToRestore][uint(tableOid)].EndByte
Expand Down Expand Up @@ -280,6 +307,14 @@ func doRestoreAgent() error {
if *onErrorContinue && utils.FileExists(fmt.Sprintf("%s_skip_%d", *pipeFile, tableOid)) {
logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum))
err = nil
skipOid = tableOid
/* Close up to *copyQueue files with this tableOid */
for idx := 0; idx < *copyQueue; idx++ {
batchToDelete := batchNum + idx
if batchToDelete < batches {
RekGRpth marked this conversation as resolved.
Show resolved Hide resolved
closeAndDeletePipe(tableOid, batchToDelete)
}
}
goto LoopEnd
} else {
// keep trying to open the pipe
Expand Down Expand Up @@ -347,10 +382,8 @@ func doRestoreAgent() error {
logInfo(fmt.Sprintf("Oid %d, Batch %d: Copied %d bytes into the pipe", tableOid, batchNum, bytesRead))

RekGRpth marked this conversation as resolved.
Show resolved Hide resolved
LoopEnd:
logInfo(fmt.Sprintf("Oid %d, Batch %d: Closing pipe %s", tableOid, batchNum, currentPipe))
errPipe := flushAndCloseRestoreWriter(currentPipe, tableOid)
if errPipe != nil {
logVerbose(fmt.Sprintf("Oid %d, Batch %d: Failed to flush and close pipe: %s", tableOid, batchNum, errPipe))
if tableOid != skipOid {
closeAndDeletePipe(tableOid, batchNum)
}

logVerbose(fmt.Sprintf("Oid %d, Batch %d: End batch restore", tableOid, batchNum))
Expand All @@ -366,12 +399,6 @@ func doRestoreAgent() error {
}
}

logVerbose(fmt.Sprintf("Oid %d, Batch %d: Attempt to delete pipe %s", tableOid, batchNum, currentPipe))
errPipe = deletePipe(currentPipe)
if errPipe != nil {
logError("Oid %d, Batch %d: Failed to remove pipe %s: %v", tableOid, batchNum, currentPipe, errPipe)
}

if err != nil {
logError(fmt.Sprintf("Oid %d, Batch %d: Error encountered: %v", tableOid, batchNum, err))
if *onErrorContinue {
Expand Down
155 changes: 155 additions & 0 deletions integration/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,63 @@ options:
Expect(err).To(HaveOccurred())
assertErrorsHandled()
})
DescribeTable("Skips batches if skip file is discovered with",
func(oid int, withPlugin bool, args ...string) {
filesToDelete := setupRestoreWithSkipFiles(oid, withPlugin)
for _, f := range filesToDelete {
defer func(filename string) {
os.Remove(filename)
}(f)
}

args = append([]string{
"--toc-file", tocFile,
"--pipe-file", pipeFile,
"--content", "1",
"--restore-agent",
"--oid-file", restoreOidFile,
"--data-file", dataFileFullPath + ".gz",
"--on-error-continue",
}, args...)

helperCmd := exec.Command(gpbackupHelperPath, args...)
err := helperCmd.Start()
Expect(err).ToNot(HaveOccurred())

// Block here until gpbackup_helper finishes (cleaning up pipes)
err = helperCmd.Wait()
Expect(err).ToNot(HaveOccurred())
for _, i := range []int{1, 2, 3} {
currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i)
Expect(currentPipe).ToNot(BeAnExistingFile())
}

By("Check in logs that batches were not restored")

homeDir := os.Getenv("HOME")
helperFiles, _ := filepath.Glob(filepath.Join(homeDir, "gpAdminLogs/gpbackup_helper_*"))
Expect(helperFiles).ToNot(BeEmpty())

patternHelperPid := fmt.Sprintf(":%06d", helperCmd.Process.Pid)
helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput()
helperOutput := string(helperOut)

Expect(helperOutput).ToNot(BeEmpty())

// Batch 0 should be processed
Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`))
Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 0: Opening pipe`))

// Batch 2 must not be processed
Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`))
Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`))
},
Entry("skips batches if skip file is discovered with single datafile config", -1, false, "--single-data-file"),
Entry("skips batches if skip file is discovered with resize restore", 1, false, "--resize-cluster", "--orig-seg-count", "6", "--dest-seg-count", "3"),
Entry("skips batches if skip file is discovered with single datafile config using a plugin", -1, true, "--single-data-file", "--restore-agent", "--plugin-config", examplePluginTestConfig),
Entry("skips batches if skip file is discovered with resize restore using a plugin", 1, true, "--resize-cluster", "--orig-seg-count", "6", "--dest-seg-count", "3", "--restore-agent", "--plugin-config", examplePluginTestConfig),
dkovalev1 marked this conversation as resolved.
Show resolved Hide resolved
)

RekGRpth marked this conversation as resolved.
Show resolved Hide resolved
It("Continues restore process when encountering an error with flag --on-error-continue", func() {
// Write data file
dataFile := dataFileFullPath
Expand Down Expand Up @@ -543,6 +600,104 @@ func setupRestoreFiles(compressionType string, withPlugin bool) {
_, _ = f.WriteString(expectedTOC)
}

// createDataFile writes a gzip-compressed test data file to dataFile + ".gz".
//
// The uncompressed payload is the line "here is some data\n", followed by
// dataLength 'a' characters, followed by the same line again. Any failure to
// create, write, or finalize the file is reported through Fail.
func createDataFile(dataFile string, dataLength int) {
	// Write data file
	f, err := os.Create(dataFile + ".gz")
	if err != nil {
		Fail(fmt.Sprintf("%v", err))
	}
	// Safety net for the paths that bail out through Fail; after the explicit
	// Close below succeeds this second close is a no-op error that is ignored.
	defer f.Close()

	gzipf := gzip.NewWriter(f)

	// Named pipes can buffer, so we need to write more than the buffer size to trigger flush error
	customData := "here is some data\n" + strings.Repeat("a", dataLength) + "here is some data\n"

	if _, err := gzipf.Write([]byte(customData)); err != nil {
		Fail(fmt.Sprintf("%v", err))
	}

	// Closing the gzip writer flushes the remaining compressed data and writes
	// the gzip footer, so an error here means the fixture is incomplete and
	// must not be silently ignored.
	if err := gzipf.Close(); err != nil {
		Fail(fmt.Sprintf("%v", err))
	}
	if err := f.Close(); err != nil {
		Fail(fmt.Sprintf("%v", err))
	}
}

// createOidFile creates (or truncates) the file named fname and writes content
// into it verbatim. Creation and write failures are reported through Fail with
// a message naming the file.
func createOidFile(fname string, content string) {
	oidFile, createErr := os.Create(fname)
	if createErr != nil {
		Fail(fmt.Sprintf("Could not create %s: %v", fname, createErr))
	}
	defer oidFile.Close()

	_, writeErr := oidFile.WriteString(content)
	if writeErr != nil {
		Fail(fmt.Sprintf("Could not write to %s: %v", fname, writeErr))
	}
}

// createCustomTOCFile writes a TOC file named fname that describes three
// consecutive data entries (keys 1, 2 and 3) by start and end byte:
//
//	entry 1: [0, 18)
//	entry 2: [18, dataLength+18)
//	entry 3: [dataLength+18, dataLength+36)
//
// NOTE(review): 18 appears to be the byte length of the "here is some data\n"
// line that createDataFile writes before and after the filler bytes - confirm
// the two helpers are kept in sync.
//
// Failures to create or write the file are reported through Fail.
func createCustomTOCFile(fname string, dataLength int) {
// %[1]d is used twice so that entry 2's endbyte and entry 3's startbyte are the
// same value (the first argument); the final plain %d then consumes the second
// argument.
customTOC := fmt.Sprintf(`dataentries:
1:
startbyte: 0
endbyte: 18
2:
startbyte: 18
endbyte: %[1]d
3:
startbyte: %[1]d
endbyte: %d
`, dataLength+18, dataLength+18+18)
fToc, err := os.Create(fname)
if err != nil {
Fail(fmt.Sprintf("%v", err))
}
defer fToc.Close()

if _, err = fToc.WriteString(customTOC); err != nil {
Fail(fmt.Sprintf("%v", err))
}
}

/*
Tests that use skip files, and the one that exercises --on-error-continue,
need a more involved setup than the other restore tests, so they share this
dedicated setup function.

It creates, in order: a gzip-compressed data file (in the plugin location when
withPlugin is set, and with an "_<oid>" suffix when oid > 0), an oid file
listing batches 0, 1 and 2 of oid 1, a named pipe for oid 1 batch 0, a custom
TOC file, and a skip file for oid 1.

Returns the list of file names the caller must delete when done. The named
pipe is not part of that list.
*/
func setupRestoreWithSkipFiles(oid int, withPlugin bool) []string {
	// NOTE(review): presumably chosen to exceed the pipe buffer size, per the
	// comment in createDataFile - confirm.
	dataLength := 128*1024 + 1

	ret := []string{}

	fileName := dataFileFullPath
	if withPlugin {
		fileName = examplePluginTestDataFile
	}
	if oid > 0 {
		fileName = fileName + fmt.Sprintf("_%d", oid)
	}

	// createDataFile appends ".gz" to the name it is given, so register the
	// path that actually exists on disk; otherwise the caller's cleanup would
	// target a file that was never created and leak the real one.
	createDataFile(fileName, dataLength)
	ret = append(ret, fileName+".gz")

	// Write oid file
	createOidFile(restoreOidFile, "1,0\n1,1\n1,2\n")
	ret = append(ret, restoreOidFile)

	pipename := fmt.Sprintf("%s_%d_0", pipeFile, 1)
	err := unix.Mkfifo(pipename, 0700)
	if err != nil {
		Fail(fmt.Sprintf("%v", err))
	}

	createCustomTOCFile(tocFile, dataLength)
	ret = append(ret, tocFile)

	skipFile := fmt.Sprintf("%s_skip_%d", pipeFile, 1)
	err = exec.Command("touch", skipFile).Run()
	Expect(err).ToNot(HaveOccurred())

	ret = append(ret, skipFile)
	return ret
}

func assertNoErrors() {
Expect(errorFile).To(Not(BeARegularFile()))
pipes, err := filepath.Glob(pipeFile + "_[1-9]*")
Expand Down
Loading