Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not process batches if skip file detected in restore_helper #103

Merged
merged 23 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions helper/restore_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ func doRestoreAgent() error {
preloadCreatedPipesForRestore(oidWithBatchList, *copyQueue)

var currentPipe string

// If skip file detected for the particular tableOid, Will not process batches related to this oid
whitehawk marked this conversation as resolved.
Show resolved Hide resolved
skipOid := -1

for i, oidWithBatch := range oidWithBatchList {
tableOid := oidWithBatch.oid
batchNum := oidWithBatch.batch
Expand Down Expand Up @@ -239,6 +243,10 @@ func doRestoreAgent() error {
}
}

if tableOid == skipOid {
goto LoopEnd
dkovalev1 marked this conversation as resolved.
Show resolved Hide resolved
whitehawk marked this conversation as resolved.
Show resolved Hide resolved
RekGRpth marked this conversation as resolved.
Show resolved Hide resolved
}

if *singleDataFile {
start[contentToRestore] = tocEntries[contentToRestore][uint(tableOid)].StartByte
end[contentToRestore] = tocEntries[contentToRestore][uint(tableOid)].EndByte
Expand Down Expand Up @@ -280,6 +288,7 @@ func doRestoreAgent() error {
if *onErrorContinue && utils.FileExists(fmt.Sprintf("%s_skip_%d", *pipeFile, tableOid)) {
logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum))
err = nil
skipOid = tableOid
goto LoopEnd
} else {
// keep trying to open the pipe
Expand Down
113 changes: 112 additions & 1 deletion integration/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
"math"
"os"
"os/exec"
"path"

"path/filepath"
path "path/filepath"
RekGRpth marked this conversation as resolved.
Show resolved Hide resolved
"strings"
"time"

Expand Down Expand Up @@ -433,6 +434,116 @@ options:
Expect(err).To(HaveOccurred())
assertErrorsHandled()
})

It("skips batches if skip file discovered", func() {
dkovalev1 marked this conversation as resolved.
Show resolved Hide resolved
// Run helper only with restore for a few batches and skip file defined
//
whitehawk marked this conversation as resolved.
Show resolved Hide resolved
By("Write data file")
dataFile := dataFileFullPath
f, _ := os.Create(dataFile + ".gz")
dkovalev1 marked this conversation as resolved.
Show resolved Hide resolved
gzipf := gzip.NewWriter(f)
// Named pipes can buffer, so we need to write more than the buffer size to trigger flush error
customData := "here is some data\n"
dataLength := 128*1024 + 1
customData += strings.Repeat("a", dataLength)
customData += "here is some data\n"

_, _ = gzipf.Write([]byte(customData))
dkovalev1 marked this conversation as resolved.
Show resolved Hide resolved
_ = gzipf.Close()

// Write oid file
fOid, _ := os.Create(restoreOidFile)
dkovalev1 marked this conversation as resolved.
Show resolved Hide resolved
_, _ = fOid.WriteString("1,0\n1,1\n1,2\n")
defer func() {
_ = os.Remove(restoreOidFile)
}()

err := unix.Mkfifo(fmt.Sprintf("%s_%d_0", pipeFile, 1), 0700)
if err != nil {
Fail(fmt.Sprintf("%v", err))
}

By("Write custom TOC")
customTOC := fmt.Sprintf(`dataentries:
1:
startbyte: 0
endbyte: 18
2:
startbyte: 18
endbyte: %[1]d
3:
startbyte: %[1]d
endbyte: %d
`, dataLength+18, dataLength+18+18)
fToc, _ := os.Create(tocFile)
dkovalev1 marked this conversation as resolved.
Show resolved Hide resolved
_, _ = fToc.WriteString(customTOC)
whitehawk marked this conversation as resolved.
Show resolved Hide resolved
defer func() {
_ = os.Remove(tocFile)
}()

By("create skip file")
skipFile := fmt.Sprintf("%s_skip_%d", pipeFile, 1)
fSkip, _ := os.Create(skipFile)
fSkip.Close()

defer func() {
_ = os.Remove(skipFile)
}()

By("Create restore command")
helperCmd := gpbackupHelperRestore(gpbackupHelperPath, "--data-file", dataFileFullPath+".gz", "--on-error-continue")

pipeNames := []string{}
for i := 1; i <= 3; i++ {
pipeNames = append(pipeNames, fmt.Sprintf("%s_%d_0", pipeFile, i))
}

By("Check pipes")
// Pipe 1 attached to batch with skip
contents, err := ioutil.ReadFile(pipeNames[0])
Expect(err).ToNot(HaveOccurred())
Expect(string(contents)).To(Equal("here is some data\n"))

// Pipe 2 and 3 shall not exists
for _, v := range []int{1, 2} {
_, errOpen := os.Open(pipeNames[v])
Expect(errOpen).To(HaveOccurred())
Expect(errOpen).To(MatchError(ContainSubstring("no such file or directory")))
}
_, errOpen := os.Open(pipeNames[1])
dkovalev1 marked this conversation as resolved.
Show resolved Hide resolved
Expect(errOpen).To(HaveOccurred())
Expect(errOpen).To(MatchError(ContainSubstring("no such file or directory")))
_, errOpen = os.Open(pipeNames[2])
Expect(errOpen).To(HaveOccurred())
Expect(errOpen).To(MatchError(ContainSubstring("no such file or directory")))

// Block here until gpbackup_helper finishes (cleaning up pipes)
By("Block here until gpbackup_helper finishes (cleaning up pipes)")
_ = helperCmd.Wait()
for _, i := range []int{1, 2, 3} {
currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i)
Expect(currentPipe).ToNot(BeAnExistingFile())
}

RekGRpth marked this conversation as resolved.
Show resolved Hide resolved
By("Check in logs that batches were not restored")

homeDir := os.Getenv("HOME")
helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*"))

pattern_helper_pid := fmt.Sprintf(":%06d", helperCmd.Process.Pid)
helper_out, _ := exec.Command("grep", pattern_helper_pid, helperFiles[len(helperFiles)-1]).CombinedOutput()
helperOutput := string(helper_out)

// Batch 1 should be processed
Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 1: Skip file discovered, skipping this relation`))
Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 1: Opening pipe`))

// Batch 2 must not be processed
Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`))
Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`))

})

It("Continues restore process when encountering an error with flag --on-error-continue", func() {
// Write data file
dataFile := dataFileFullPath
Expand Down
Loading