Skip to content

Commit

Permalink
Do not process batches if skip file detected in restore_helper (#103)
Browse files Browse the repository at this point in the history
* Do not process batches if skip file detected in restore_helper

* Fix pattern format in log analyze

* Fix test output pattern

* Take into account forward pipe opening

* Add more checks for skip file test

* Remove unneeded command line argument

* Add one more test, fix test with multiple files and fix typos

* A few cosmetic changes

* Fix typo and spelling

* Rename tests to be more clear

* Fix typos

* Add one more test, cosmetic changes

* Add more checks

* Extract tests content into a function

* Do not open new pipes if skip file detected

* Code cleanup

* Revert uninyentional change, use defer for closing files

* use defer to close file

* Use variadic function

* Move tests under DescribeTable/Entry construct

* Update integration/helper_test.go

Co-authored-by: Georgy Shelkovy <[email protected]>

* Update integration/helper_test.go

Co-authored-by: Georgy Shelkovy <[email protected]>

---------

Co-authored-by: Georgy Shelkovy <[email protected]>
  • Loading branch information
dkovalev1 and RekGRpth committed Sep 9, 2024
1 parent 21ba1a1 commit ae78091
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 20 deletions.
67 changes: 47 additions & 20 deletions helper/restore_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,21 @@ func (r *RestoreReader) copyAllData() (int64, error) {
return bytesRead, err
}

func closeAndDeletePipe(tableOid int, batchNum int) {
pipe := fmt.Sprintf("%s_%d_%d", *pipeFile, tableOid, batchNum)
logInfo(fmt.Sprintf("Oid %d, Batch %d: Closing pipe %s", tableOid, batchNum, pipe))
err := flushAndCloseRestoreWriter(pipe, tableOid)
if err != nil {
logVerbose(fmt.Sprintf("Oid %d, Batch %d: Failed to flush and close pipe: %s", tableOid, batchNum, err))
}

logVerbose(fmt.Sprintf("Oid %d, Batch %d: Attempt to delete pipe %s", tableOid, batchNum, pipe))
err = deletePipe(pipe)
if err != nil {
logError("Oid %d, Batch %d: Failed to remove pipe %s: %v", tableOid, batchNum, pipe, err)
}
}

type oidWithBatch struct {
oid int
batch int
Expand Down Expand Up @@ -212,6 +227,10 @@ func doRestoreAgent() error {
preloadCreatedPipesForRestore(oidWithBatchList, *copyQueue)

var currentPipe string

// If skip file is detected for the particular tableOid, will not process batches related to this oid
skipOid := -1

for i, oidWithBatch := range oidWithBatchList {
tableOid := oidWithBatch.oid
batchNum := oidWithBatch.batch
Expand All @@ -226,19 +245,27 @@ func doRestoreAgent() error {
if i < len(oidWithBatchList)-*copyQueue {
nextOidWithBatch := oidWithBatchList[i+*copyQueue]
nextOid := nextOidWithBatch.oid
nextBatchNum := nextOidWithBatch.batch
nextPipeToCreate := fmt.Sprintf("%s_%d_%d", *pipeFile, nextOid, nextBatchNum)
logVerbose(fmt.Sprintf("Oid %d, Batch %d: Creating pipe %s\n", nextOid, nextBatchNum, nextPipeToCreate))
err := createPipe(nextPipeToCreate)
if err != nil {
logError(fmt.Sprintf("Oid %d, Batch %d: Failed to create pipe %s\n", nextOid, nextBatchNum, nextPipeToCreate))
// In the case this error is hit it means we have lost the
// ability to create pipes normally, so hard quit even if
// --on-error-continue is given
return err

if nextOid != skipOid {
nextBatchNum := nextOidWithBatch.batch
nextPipeToCreate := fmt.Sprintf("%s_%d_%d", *pipeFile, nextOid, nextBatchNum)
logVerbose(fmt.Sprintf("Oid %d, Batch %d: Creating pipe %s\n", nextOid, nextBatchNum, nextPipeToCreate))
err := createPipe(nextPipeToCreate)
if err != nil {
logError(fmt.Sprintf("Oid %d, Batch %d: Failed to create pipe %s\n", nextOid, nextBatchNum, nextPipeToCreate))
// In the case this error is hit it means we have lost the
// ability to create pipes normally, so hard quit even if
// --on-error-continue is given
return err
}
}
}

if tableOid == skipOid {
logVerbose(fmt.Sprintf("Oid %d, Batch %d: skip due to skip file\n", tableOid, batchNum))
goto LoopEnd
}

if *singleDataFile {
start[contentToRestore] = tocEntries[contentToRestore][uint(tableOid)].StartByte
end[contentToRestore] = tocEntries[contentToRestore][uint(tableOid)].EndByte
Expand Down Expand Up @@ -280,6 +307,14 @@ func doRestoreAgent() error {
if *onErrorContinue && utils.FileExists(fmt.Sprintf("%s_skip_%d", *pipeFile, tableOid)) {
logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum))
err = nil
skipOid = tableOid
/* Close up to *copyQueue files with this tableOid */
for idx := 0; idx < *copyQueue; idx++ {
batchToDelete := batchNum + idx
if batchToDelete < batches {
closeAndDeletePipe(tableOid, batchToDelete)
}
}
goto LoopEnd
} else {
// keep trying to open the pipe
Expand Down Expand Up @@ -347,10 +382,8 @@ func doRestoreAgent() error {
logInfo(fmt.Sprintf("Oid %d, Batch %d: Copied %d bytes into the pipe", tableOid, batchNum, bytesRead))

LoopEnd:
logInfo(fmt.Sprintf("Oid %d, Batch %d: Closing pipe %s", tableOid, batchNum, currentPipe))
errPipe := flushAndCloseRestoreWriter(currentPipe, tableOid)
if errPipe != nil {
logVerbose(fmt.Sprintf("Oid %d, Batch %d: Failed to flush and close pipe: %s", tableOid, batchNum, errPipe))
if tableOid != skipOid {
closeAndDeletePipe(tableOid, batchNum)
}

logVerbose(fmt.Sprintf("Oid %d, Batch %d: End batch restore", tableOid, batchNum))
Expand All @@ -366,12 +399,6 @@ func doRestoreAgent() error {
}
}

logVerbose(fmt.Sprintf("Oid %d, Batch %d: Attempt to delete pipe %s", tableOid, batchNum, currentPipe))
errPipe = deletePipe(currentPipe)
if errPipe != nil {
logError("Oid %d, Batch %d: Failed to remove pipe %s: %v", tableOid, batchNum, currentPipe, errPipe)
}

if err != nil {
logError(fmt.Sprintf("Oid %d, Batch %d: Error encountered: %v", tableOid, batchNum, err))
if *onErrorContinue {
Expand Down
155 changes: 155 additions & 0 deletions integration/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,63 @@ options:
Expect(err).To(HaveOccurred())
assertErrorsHandled()
})
DescribeTable("Skips batches if skip file is discovered with",
func(oid int, withPlugin bool, args ...string) {
filesToDelete := setupRestoreWithSkipFiles(oid, withPlugin)
for _, f := range filesToDelete {
defer func(filename string) {
os.Remove(filename)
}(f)
}

args = append([]string{
"--toc-file", tocFile,
"--pipe-file", pipeFile,
"--content", "1",
"--restore-agent",
"--oid-file", restoreOidFile,
"--data-file", dataFileFullPath + ".gz",
"--on-error-continue",
}, args...)

helperCmd := exec.Command(gpbackupHelperPath, args...)
err := helperCmd.Start()
Expect(err).ToNot(HaveOccurred())

// Block here until gpbackup_helper finishes (cleaning up pipes)
err = helperCmd.Wait()
Expect(err).ToNot(HaveOccurred())
for _, i := range []int{1, 2, 3} {
currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i)
Expect(currentPipe).ToNot(BeAnExistingFile())
}

By("Check in logs that batches were not restored")

homeDir := os.Getenv("HOME")
helperFiles, _ := filepath.Glob(filepath.Join(homeDir, "gpAdminLogs/gpbackup_helper_*"))
Expect(helperFiles).ToNot(BeEmpty())

patternHelperPid := fmt.Sprintf(":%06d", helperCmd.Process.Pid)
helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput()
helperOutput := string(helperOut)

Expect(helperOutput).ToNot(BeEmpty())

// Batch 0 should be processed
Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`))
Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 0: Opening pipe`))

// Batch 2 must not be processed
Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`))
Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`))
},
Entry("single datafile config", -1, false, "--single-data-file"),
Entry("resize restore", 1, false, "--resize-cluster", "--orig-seg-count", "6", "--dest-seg-count", "3"),
Entry("single datafile config using a plugin", -1, true, "--single-data-file", "--restore-agent", "--plugin-config", examplePluginTestConfig),
Entry("resize restore using a plugin", 1, true, "--resize-cluster", "--orig-seg-count", "6", "--dest-seg-count", "3", "--restore-agent", "--plugin-config", examplePluginTestConfig),
)

It("Continues restore process when encountering an error with flag --on-error-continue", func() {
// Write data file
dataFile := dataFileFullPath
Expand Down Expand Up @@ -543,6 +600,104 @@ func setupRestoreFiles(compressionType string, withPlugin bool) {
_, _ = f.WriteString(expectedTOC)
}

func createDataFile(dataFile string, dataLength int) {
// Write data file
f, err := os.Create(dataFile + ".gz")
if err != nil {
Fail(fmt.Sprintf("%v", err))
}
defer f.Close()
gzipf := gzip.NewWriter(f)
defer gzipf.Close()

// Named pipes can buffer, so we need to write more than the buffer size to trigger flush error
customData := "here is some data\n"

customData += strings.Repeat("a", dataLength)
customData += "here is some data\n"

if _, err := gzipf.Write([]byte(customData)); err != nil {
Fail(fmt.Sprintf("%v", err))
}
}

func createOidFile(fname string, content string) {
fOid, err := os.Create(fname)
if err != nil {
Fail(fmt.Sprintf("Could not create %s: %v", fname, err))
}
defer fOid.Close()

if _, err := fOid.WriteString(content); err != nil {
Fail(fmt.Sprintf("Could not write to %s: %v", fname, err))
}
}

func createCustomTOCFile(fname string, dataLength int) {
customTOC := fmt.Sprintf(`dataentries:
1:
startbyte: 0
endbyte: 18
2:
startbyte: 18
endbyte: %[1]d
3:
startbyte: %[1]d
endbyte: %d
`, dataLength+18, dataLength+18+18)
fToc, err := os.Create(fname)
if err != nil {
Fail(fmt.Sprintf("%v", err))
}
defer fToc.Close()

if _, err = fToc.WriteString(customTOC); err != nil {
Fail(fmt.Sprintf("%v", err))
}
}

/*
Tests with skip files and the one with flag --on-error-continue
require a bit more complicated setup, do different setup function.
Returns file name list which must be deleted when done.
*/
func setupRestoreWithSkipFiles(oid int, withPlugin bool) []string {
dataLength := 128*1024 + 1

ret := []string{}

fileName := dataFileFullPath
if withPlugin {
fileName = examplePluginTestDataFile
}
if oid > 0 {
fileName = fileName + fmt.Sprintf("_%d", oid)
}

createDataFile(fileName, dataLength)
ret = append(ret, fileName)

// Write oid file
createOidFile(restoreOidFile, "1,0\n1,1\n1,2\n")
ret = append(ret, restoreOidFile)

pipename := fmt.Sprintf("%s_%d_0", pipeFile, 1)
err := unix.Mkfifo(pipename, 0700)
if err != nil {
Fail(fmt.Sprintf("%v", err))
}

createCustomTOCFile(tocFile, dataLength)
ret = append(ret, tocFile)

skipFile := fmt.Sprintf("%s_skip_%d", pipeFile, 1)
err = exec.Command("touch", skipFile).Run()
Expect(err).ToNot(HaveOccurred())

ret = append(ret, skipFile)
return ret
}

func assertNoErrors() {
Expect(errorFile).To(Not(BeARegularFile()))
pipes, err := filepath.Glob(pipeFile + "_[1-9]*")
Expand Down

0 comments on commit ae78091

Please sign in to comment.