From c965402a05d8f6f26e747a9d56632d12c5591ccd Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Fri, 9 Aug 2024 18:11:57 +0300 Subject: [PATCH 01/22] Do not process batches if skip file detected in restore_helper --- helper/restore_helper.go | 9 +++ integration/helper_test.go | 113 ++++++++++++++++++++++++++++++++++++- 2 files changed, 121 insertions(+), 1 deletion(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index d49029bb7..534fcd605 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -212,10 +212,18 @@ func doRestoreAgent() error { preloadCreatedPipesForRestore(oidWithBatchList, *copyQueue) var currentPipe string + + // If skip file detected for the particular tableOid, Will not process batches related to this oid + skipOid := -1 + for i, oidWithBatch := range oidWithBatchList { tableOid := oidWithBatch.oid batchNum := oidWithBatch.batch + if tableOid == skipOid { + continue + } + contentToRestore := *content + (*destSize * batchNum) if wasTerminated { logError("Terminated due to user request") @@ -280,6 +288,7 @@ func doRestoreAgent() error { if *onErrorContinue && utils.FileExists(fmt.Sprintf("%s_skip_%d", *pipeFile, tableOid)) { logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum)) err = nil + skipOid = tableOid goto LoopEnd } else { // keep trying to open the pipe diff --git a/integration/helper_test.go b/integration/helper_test.go index e518ca93b..2f44d0e9c 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -8,8 +8,9 @@ import ( "math" "os" "os/exec" - "path" + "path/filepath" + path "path/filepath" "strings" "time" @@ -433,6 +434,116 @@ options: Expect(err).To(HaveOccurred()) assertErrorsHandled() }) + + It("skips batches if skip file discovered", func() { + // Run helper only with restore for a few batches and skip file defined + // + By("Write data file") + dataFile := dataFileFullPath + f, _ := os.Create(dataFile + ".gz") + gzipf := gzip.NewWriter(f) + // Named pipes can buffer, so we need to write more than the buffer size to trigger flush error + customData := "here is some data\n" + dataLength := 128*1024 + 1 + customData += strings.Repeat("a", dataLength) + customData += "here is some data\n" + + _, _ = gzipf.Write([]byte(customData)) + _ = gzipf.Close() + + // Write oid file + fOid, _ := os.Create(restoreOidFile) + _, _ = fOid.WriteString("1,0\n1,1\n1,2\n") + defer func() { + _ = os.Remove(restoreOidFile) + }() + + err := unix.Mkfifo(fmt.Sprintf("%s_%d_0", pipeFile, 1), 0700) + if err != nil { + Fail(fmt.Sprintf("%v", err)) + } + + By("Write custom TOC") + customTOC := fmt.Sprintf(`dataentries: + 1: + startbyte: 0 + endbyte: 18 + 2: + startbyte: 18 + endbyte: %[1]d + 3: + startbyte: %[1]d + endbyte: %d +`, dataLength+18, dataLength+18+18) + fToc, _ := os.Create(tocFile) + _, _ = fToc.WriteString(customTOC) + defer func() { + _ = os.Remove(tocFile) + }() + + By("create skip file") + skipFile := fmt.Sprintf("%s_skip_%d", pipeFile, 1) + fSkip, _ := os.Create(skipFile) + fSkip.Close() + + defer func() { + _ = os.Remove(skipFile) + }() + + By("Create restore command") + helperCmd := gpbackupHelperRestore(gpbackupHelperPath, "--data-file", dataFileFullPath+".gz", "--on-error-continue") + + pipeNames := []string{} + for i := 1; i <= 3; i++ { + pipeNames = append(pipeNames, fmt.Sprintf("%s_%d_0", pipeFile, i)) + } + + By("Check pipes") + // Pipe 1 attached to batch with skip + contents, err := ioutil.ReadFile(pipeNames[0]) + Expect(err).ToNot(HaveOccurred()) + Expect(string(contents)).To(Equal("here is some data\n")) + + // Pipe 2 and 3 shall not exists + for _, v := range []int{1, 2} { + _, errOpen := os.Open(pipeNames[v]) + Expect(errOpen).To(HaveOccurred()) + Expect(errOpen).To(MatchError(ContainSubstring("no such file or directory"))) + } + _, errOpen := os.Open(pipeNames[1]) + Expect(errOpen).To(HaveOccurred()) + Expect(errOpen).To(MatchError(ContainSubstring("no such file or directory"))) + _, errOpen = os.Open(pipeNames[2]) + Expect(errOpen).To(HaveOccurred()) + Expect(errOpen).To(MatchError(ContainSubstring("no such file or directory"))) + + // Block here until gpbackup_helper finishes (cleaning up pipes) + By("Block here until gpbackup_helper finishes (cleaning up pipes)") + _ = helperCmd.Wait() + for _, i := range []int{1, 2, 3} { + currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i) + Expect(currentPipe).ToNot(BeAnExistingFile()) + } + + By("Check in logs that batches were not restored") + + homeDir := os.Getenv("HOME") + helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) + + pattern_helper_pid := fmt.Sprintf(":%d", helperCmd.Process.Pid) + helper_out, _ := exec.Command("grep", pattern_helper_pid, helperFiles[len(helperFiles)-1]).CombinedOutput() + helperOutput := string(helper_out) + + // Batch 1 should be processed + Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 1: Skip file discovered, skipping this relation`)) + Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 1: Opening pipe`)) + + // Batch 2 must not be processed + Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) + Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) + + }) + It("Continues restore process when encountering an error with flag --on-error-continue", func() { // Write data file dataFile := dataFileFullPath From 85e95ccbc6fff8eaed0b1151ed6b3dda40994c4c Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Mon, 12 Aug 2024 15:41:38 +0300 Subject: [PATCH 02/22] Fix pattern format in log analyze --- integration/helper_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/helper_test.go b/integration/helper_test.go index 2f44d0e9c..27ce06198 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -530,7 +530,7 @@ options: homeDir := os.Getenv("HOME") helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) - pattern_helper_pid := fmt.Sprintf(":%d", helperCmd.Process.Pid) + pattern_helper_pid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) helper_out, _ := exec.Command("grep", pattern_helper_pid, helperFiles[len(helperFiles)-1]).CombinedOutput() helperOutput := string(helper_out) From ecb0437c2803d211e3950eb856404b45b2ac7b7e Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Tue, 13 Aug 2024 00:40:14 +0300 Subject: [PATCH 03/22] Fix test output pattern --- end_to_end/end_to_end_suite_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/end_to_end/end_to_end_suite_test.go b/end_to_end/end_to_end_suite_test.go index feebe7d2b..3748430ad 100644 --- a/end_to_end/end_to_end_suite_test.go +++ b/end_to_end/end_to_end_suite_test.go @@ -2464,7 +2464,7 @@ LANGUAGE plpgsql NO SQL;`) Expect(err).To(HaveOccurred()) Expect(string(output)).To(ContainSubstring("Encountered 1 errors during metadata restore")) Expect(string(output)).To(ContainSubstring("Error loading data into table schemaone.tabletwo")) - Expect(string(output)).To(ContainSubstring("Encountered 1 error(s) during table data restore")) + Expect(string(output)).To(ContainSubstring("Encountered errors with 3 helper agent(s).")) Expect(string(output)).To(ContainSubstring("Data restore completed with failures")) }) It("Will clean up segments helper processes after error during restore", func() { From d40cdda0bbbd0cf3fe3e2f7cb66481ec2164f13c Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Wed, 14 Aug 2024 00:02:40 +0300 Subject: [PATCH 04/22] Take into account forward pipe opening --- end_to_end/end_to_end_suite_test.go | 2 +- helper/restore_helper.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/end_to_end/end_to_end_suite_test.go b/end_to_end/end_to_end_suite_test.go index 3748430ad..feebe7d2b 100644 --- a/end_to_end/end_to_end_suite_test.go +++ b/end_to_end/end_to_end_suite_test.go @@ -2464,7 +2464,7 @@ LANGUAGE plpgsql NO SQL;`) Expect(err).To(HaveOccurred()) Expect(string(output)).To(ContainSubstring("Encountered 1 errors during metadata restore")) Expect(string(output)).To(ContainSubstring("Error loading data into table schemaone.tabletwo")) - Expect(string(output)).To(ContainSubstring("Encountered errors with 3 helper agent(s).")) + Expect(string(output)).To(ContainSubstring("Encountered 1 error(s) during table data restore")) Expect(string(output)).To(ContainSubstring("Data restore completed with failures")) }) It("Will clean up segments helper processes after error during restore", func() { diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 534fcd605..cf47c3daf 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -220,10 +220,6 @@ func doRestoreAgent() error { tableOid := oidWithBatch.oid batchNum := oidWithBatch.batch - if tableOid == skipOid { - continue - } - contentToRestore := *content + (*destSize * batchNum) if wasTerminated { logError("Terminated due to user request") @@ -247,6 +243,10 @@ func doRestoreAgent() error { } } + if tableOid == skipOid { + goto LoopEnd + } + if *singleDataFile { start[contentToRestore] = tocEntries[contentToRestore][uint(tableOid)].StartByte end[contentToRestore] = tocEntries[contentToRestore][uint(tableOid)].EndByte From 86be017e1bc0bb91885a425c6ad1a9d94419b94c Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Thu, 15 Aug 2024 20:10:11 +0300 Subject: [PATCH 05/22] Add more checks for skip file test --- helper/restore_helper.go | 1 + integration/helper_test.go | 223 +++++++++++++++++++++++++------------ 2 files changed, 151 insertions(+), 73 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index cf47c3daf..c79a0e054 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -244,6 +244,7 @@ func doRestoreAgent() error { } if tableOid == skipOid { + logVerbose(fmt.Sprintf("Oid %d, Batch %d: skip due to skip file\n", tableOid, batchNum)) goto LoopEnd } diff --git a/integration/helper_test.go b/integration/helper_test.go index 27ce06198..dd85d2967 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -74,6 +74,16 @@ func gpbackupHelper(helperPath string, args ...string) *exec.Cmd { return command } +func gpbackupHelperRestoreNoSingle(helperPath string, args ...string) *exec.Cmd { + + args = append([]string{"--toc-file", tocFile, "--pipe-file", pipeFile, + "--content", "1", "--restore-agent", "--oid-file", restoreOidFile}, args...) + command := exec.Command(helperPath, args...) + err := command.Start() + Expect(err).ToNot(HaveOccurred()) + return command +} + func buildAndInstallBinaries() string { _ = os.Chdir("..") command := exec.Command("make", "build") @@ -242,7 +252,7 @@ options: }) It("runs restore gpbackup_helper with gzip compression with plugin", func() { setupRestoreFiles("gzip", true) - helperCmd := gpbackupHelperRestore(gpbackupHelperPath, "--data-file", dataFileFullPath+".gz", "--plugin-config", examplePluginTestConfig) + helperCmd := gpbackupHelperRestore(gpbackupHelperPath, "--verbose", "--data-file", dataFileFullPath+".gz", "--plugin-config", examplePluginTestConfig) for _, i := range []int{1, 3} { contents, _ := ioutil.ReadFile(fmt.Sprintf("%s_%d_0", pipeFile, i)) Expect(string(contents)).To(Equal("here is some data\n")) @@ -434,91 +444,67 @@ options: Expect(err).To(HaveOccurred()) assertErrorsHandled() }) - - It("skips batches if skip file discovered", func() { + It("skips batches if skip file discovered single file", func() { // Run helper only with restore for a few batches and skip file defined // - By("Write data file") - dataFile := dataFileFullPath - f, _ := os.Create(dataFile + ".gz") - gzipf := gzip.NewWriter(f) - // Named pipes can buffer, so we need to write more than the buffer size to trigger flush error - customData := "here is some data\n" - dataLength := 128*1024 + 1 - customData += strings.Repeat("a", dataLength) - customData += "here is some data\n" + files_to_delete := setupRestoreWithSkipFiles() + for _, f := range files_to_delete { + defer func(filename string) { + os.Remove(filename) + }(f) + } - _, _ = gzipf.Write([]byte(customData)) - _ = gzipf.Close() + By("Create restore command") + helperCmd := gpbackupHelperRestore(gpbackupHelperPath, "--data-file", dataFileFullPath+".gz", "--on-error-continue") - // Write oid file - fOid, _ := os.Create(restoreOidFile) - _, _ = fOid.WriteString("1,0\n1,1\n1,2\n") - defer func() { - _ = os.Remove(restoreOidFile) - }() + pipeNames := []string{} + for i := 1; i <= 3; i++ { + pipeNames = append(pipeNames, fmt.Sprintf("%s_%d_0", pipeFile, i)) + } - err := unix.Mkfifo(fmt.Sprintf("%s_%d_0", pipeFile, 1), 0700) - if err != nil { - Fail(fmt.Sprintf("%v", err)) + // Block here until gpbackup_helper finishes (cleaning up pipes) + _ = helperCmd.Wait() + for _, i := range []int{1, 2, 3} { + currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i) + Expect(currentPipe).ToNot(BeAnExistingFile()) } - By("Write custom TOC") - customTOC := fmt.Sprintf(`dataentries: - 1: - startbyte: 0 - endbyte: 18 - 2: - startbyte: 18 - endbyte: %[1]d - 3: - startbyte: %[1]d - endbyte: %d -`, dataLength+18, dataLength+18+18) - fToc, _ := os.Create(tocFile) - _, _ = fToc.WriteString(customTOC) - defer func() { - _ = os.Remove(tocFile) - }() + By("Check in logs that batches were not restored") - By("create skip file") - skipFile := fmt.Sprintf("%s_skip_%d", pipeFile, 1) - fSkip, _ := os.Create(skipFile) - fSkip.Close() + homeDir := os.Getenv("HOME") + helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) - defer func() { - _ = os.Remove(skipFile) - }() + pattern_helper_pid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) + helper_out, _ := exec.Command("grep", pattern_helper_pid, helperFiles[len(helperFiles)-1]).CombinedOutput() + helperOutput := string(helper_out) + + // Batch 0 should be processed + Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`)) + Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 0: Opening pipe`)) + + // Batch 2 must not be processed + Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) + Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) + }) + It("skips batches if skip file discovered", func() { + // Run helper only with restore for a few batches and skip file defined + // + files_to_delete := setupRestoreWithSkipFiles() + for _, f := range files_to_delete { + defer func(filename string) { + os.Remove(filename) + }(f) + } By("Create restore command") - helperCmd := gpbackupHelperRestore(gpbackupHelperPath, "--data-file", dataFileFullPath+".gz", "--on-error-continue") + helperCmd := gpbackupHelperRestoreNoSingle(gpbackupHelperPath, "--data-file", dataFileFullPath+".gz", "--on-error-continue") pipeNames := []string{} for i := 1; i <= 3; i++ { pipeNames = append(pipeNames, fmt.Sprintf("%s_%d_0", pipeFile, i)) } - By("Check pipes") - // Pipe 1 attached to batch with skip - contents, err := ioutil.ReadFile(pipeNames[0]) - Expect(err).ToNot(HaveOccurred()) - Expect(string(contents)).To(Equal("here is some data\n")) - - // Pipe 2 and 3 shall not exists - for _, v := range []int{1, 2} { - _, errOpen := os.Open(pipeNames[v]) - Expect(errOpen).To(HaveOccurred()) - Expect(errOpen).To(MatchError(ContainSubstring("no such file or directory"))) - } - _, errOpen := os.Open(pipeNames[1]) - Expect(errOpen).To(HaveOccurred()) - Expect(errOpen).To(MatchError(ContainSubstring("no such file or directory"))) - _, errOpen = os.Open(pipeNames[2]) - Expect(errOpen).To(HaveOccurred()) - Expect(errOpen).To(MatchError(ContainSubstring("no such file or directory"))) - // Block here until gpbackup_helper finishes (cleaning up pipes) - By("Block here until gpbackup_helper finishes (cleaning up pipes)") _ = helperCmd.Wait() for _, i := range []int{1, 2, 3} { currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i) @@ -534,16 +520,14 @@ options: helper_out, _ := exec.Command("grep", pattern_helper_pid, helperFiles[len(helperFiles)-1]).CombinedOutput() helperOutput := string(helper_out) - // Batch 1 should be processed - Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 1: Skip file discovered, skipping this relation`)) - Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 1: Opening pipe`)) + // Batch 0 should be processed + Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`)) + Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 0: Opening pipe`)) // Batch 2 must not be processed Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) - }) - It("Continues restore process when encountering an error with flag --on-error-continue", func() { // Write data file dataFile := dataFileFullPath @@ -654,6 +638,99 @@ func setupRestoreFiles(compressionType string, withPlugin bool) { _, _ = f.WriteString(expectedTOC) } +func createDataFile(dataFile string, dataLength int) { + // Write data file + f, err := os.Create(dataFile + ".gz") + if err != nil { + Fail(fmt.Sprintf("%v", err)) + } + gzipf := gzip.NewWriter(f) + // Named pipes can buffer, so we need to write more than the buffer size to trigger flush error + customData := "here is some data\n" + + customData += strings.Repeat("a", dataLength) + customData += "here is some data\n" + + if _, err := gzipf.Write([]byte(customData)); err != nil { + Fail(fmt.Sprintf("%v", err)) + } + gzipf.Close() + f.Close() +} + +func createOidFile(fname string, content string) { + fOid, err := os.Create(fname) + if err != nil { + Fail(fmt.Sprintf("Could not create %s: %v", fname, err)) + } + defer fOid.Close() + + if _, err := fOid.WriteString(content); err != nil { + Fail(fmt.Sprintf("Could not write to %s: %v", fname, err)) + } +} + +func createCuctomTOCFile(fname string, dataLength int) { + customTOC := fmt.Sprintf(`dataentries: + 1: + startbyte: 0 + endbyte: 18 + 2: + startbyte: 18 + endbyte: %[1]d + 3: + startbyte: %[1]d + endbyte: %d +`, dataLength+18, dataLength+18+18) + fToc, err := os.Create(fname) + if err != nil { + Fail(fmt.Sprintf("%v", err)) + } + + if _, err = fToc.WriteString(customTOC); err != nil { + Fail(fmt.Sprintf("%v", err)) + } + + fToc.Close() +} + +/* +Tests with skip files and the one with flag with flag --on-error-continue +require a bit more complicated setup, do different setup function. +Returns file name list which must be deleted when done. +*/ +func setupRestoreWithSkipFiles() []string { + dataLength := 128*1024 + 1 + + ret := []string{} + + createDataFile(dataFileFullPath, dataLength) + ret = append(ret, dataFileFullPath) + + // Write oid file + createOidFile(restoreOidFile, "1,0\n1,1\n1,2\n") + ret = append(ret, restoreOidFile) + + pipename := fmt.Sprintf("%s_%d_0", pipeFile, 1) + err := unix.Mkfifo(pipename, 0700) + if err != nil { + Fail(fmt.Sprintf("%v", err)) + } + + createCuctomTOCFile(tocFile, dataLength) + ret = append(ret, tocFile) + + skipFile := fmt.Sprintf("%s_skip_%d", pipeFile, 1) + fSkip, err2 := os.Create(skipFile) + if err2 != nil { + Fail(fmt.Sprintf("%v", err2)) + } + fSkip.Close() + + ret = append(ret, skipFile) + return ret +} + func assertNoErrors() { Expect(errorFile).To(Not(BeARegularFile())) pipes, err := filepath.Glob(pipeFile + "_[1-9]*") From cb6e4f90141742de0d2af8d43c047d33fe146d55 Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Mon, 19 Aug 2024 10:34:16 +0300 Subject: [PATCH 06/22] Remove unneeded command line argument --- integration/helper_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/helper_test.go b/integration/helper_test.go index dd85d2967..5eb0346ba 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -252,7 +252,7 @@ options: }) It("runs restore gpbackup_helper with gzip compression with plugin", func() { setupRestoreFiles("gzip", true) - helperCmd := gpbackupHelperRestore(gpbackupHelperPath, "--verbose", "--data-file", dataFileFullPath+".gz", "--plugin-config", examplePluginTestConfig) + helperCmd := gpbackupHelperRestore(gpbackupHelperPath, "--data-file", dataFileFullPath+".gz", "--plugin-config", examplePluginTestConfig) for _, i := range []int{1, 3} { contents, _ := ioutil.ReadFile(fmt.Sprintf("%s_%d_0", pipeFile, i)) Expect(string(contents)).To(Equal("here is some data\n")) From 7077cc26d7523e8d4b22290d8005dfb0eaeda86a Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Thu, 22 Aug 2024 01:50:18 +0500 Subject: [PATCH 07/22] Add one more test, fix test with multiple files and fix typos --- integration/helper_test.go | 127 ++++++++++++++++++++++++++++++++----- 1 file changed, 111 insertions(+), 16 deletions(-) diff --git a/integration/helper_test.go b/integration/helper_test.go index 5eb0346ba..e20e33b35 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -75,7 +75,6 @@ func gpbackupHelper(helperPath string, args ...string) *exec.Cmd { } func gpbackupHelperRestoreNoSingle(helperPath string, args ...string) *exec.Cmd { - args = append([]string{"--toc-file", tocFile, "--pipe-file", pipeFile, "--content", "1", "--restore-agent", "--oid-file", restoreOidFile}, args...) command := exec.Command(helperPath, args...) @@ -447,7 +446,7 @@ options: It("skips batches if skip file discovered single file", func() { // Run helper only with restore for a few batches and skip file defined // - files_to_delete := setupRestoreWithSkipFiles() + files_to_delete := setupRestoreWithSkipFiles(-1, false) for _, f := range files_to_delete { defer func(filename string) { os.Remove(filename) @@ -463,7 +462,8 @@ options: } // Block here until gpbackup_helper finishes (cleaning up pipes) - _ = helperCmd.Wait() + err = helperCmd.Wait() + Expect(err).ToNot(HaveOccurred()) for _, i := range []int{1, 2, 3} { currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i) Expect(currentPipe).ToNot(BeAnExistingFile()) @@ -486,10 +486,10 @@ options: Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) }) - It("skips batches if skip file discovered", func() { + It("skips batches if skip file discovered at resize restore", func() { // Run helper only with restore for a few batches and skip file defined // - files_to_delete := setupRestoreWithSkipFiles() + files_to_delete := setupRestoreWithSkipFiles(1, false) for _, f := range files_to_delete { defer func(filename string) { os.Remove(filename) @@ -497,7 +497,22 @@ options: } By("Create restore command") - helperCmd := gpbackupHelperRestoreNoSingle(gpbackupHelperPath, "--data-file", dataFileFullPath+".gz", "--on-error-continue") + args := []string{ + "--toc-file", tocFile, + "--oid-file", restoreOidFile, + "--pipe-file", pipeFile, + "--content", "1", + "--resize-cluster", + "--orig-seg-count", "6", + "--dest-seg-count", "3", + "--restore-agent", + "--data-file", dataFileFullPath + ".gz", + "--on-error-continue", + } + helperCmd := exec.Command(gpbackupHelperPath, args...) + + err := helperCmd.Start() + Expect(err).ToNot(HaveOccurred()) pipeNames := []string{} for i := 1; i <= 3; i++ { @@ -505,7 +520,8 @@ options: } // Block here until gpbackup_helper finishes (cleaning up pipes) - _ = helperCmd.Wait() + err = helperCmd.Wait() + Expect(err).ToNot(HaveOccurred()) for _, i := range []int{1, 2, 3} { currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i) Expect(currentPipe).ToNot(BeAnExistingFile()) @@ -516,10 +532,81 @@ options: homeDir := os.Getenv("HOME") helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) + Expect(helperFiles).NotTo(BeEmpty()) + pattern_helper_pid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) helper_out, _ := exec.Command("grep", pattern_helper_pid, helperFiles[len(helperFiles)-1]).CombinedOutput() helperOutput := string(helper_out) + fmt.Printf("pattern_helper_pid = %s", pattern_helper_pid) + + Expect(helperOutput).ToNot(BeEmpty()) + + // Batch 0 should be processed + Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`)) + Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 0: Opening pipe`)) + + // Batch 2 must not be processed + Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) + Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) + }) + It("skips batches if skip file discovered at resize with a plugin", func() { + // Run helper only with restore for a few batches and skip file defined + // + files_to_delete := setupRestoreWithSkipFiles(1, true) + for _, f := range files_to_delete { + defer func(filename string) { + os.Remove(filename) + }(f) + } + + args := []string{ + "--toc-file", tocFile, + "--oid-file", restoreOidFile, + "--pipe-file", pipeFile, + "--content", "1", + "--resize-cluster", + "--orig-seg-count", "6", + "--dest-seg-count", "3", + "--restore-agent", + "--plugin-config", examplePluginTestConfig, + "--data-file", dataFileFullPath + ".gz", + "--on-error-continue", + } + helperCmd := exec.Command(gpbackupHelperPath, args...) + + err := helperCmd.Start() + Expect(err).ToNot(HaveOccurred()) + + pipeNames := []string{} + for i := 1; i <= 3; i++ { + pipeNames = append(pipeNames, fmt.Sprintf("%s_%d_0", pipeFile, i)) + } + + // Block here until gpbackup_helper finishes (cleaning up pipes) + err = helperCmd.Wait() + Expect(err).ToNot(HaveOccurred()) + + for _, i := range []int{1, 2, 3} { + currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i) + Expect(currentPipe).ToNot(BeAnExistingFile()) + } + + By("Check in logs that batches were not restored") + + homeDir := os.Getenv("HOME") + helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) + + Expect(helperFiles).NotTo(BeEmpty()) + + pattern_helper_pid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) + helper_out, _ := exec.Command("grep", pattern_helper_pid, helperFiles[len(helperFiles)-1]).CombinedOutput() + helperOutput := string(helper_out) + + fmt.Printf("pattern_helper_pid = %s", pattern_helper_pid) + + Expect(helperOutput).ToNot(BeEmpty()) + // Batch 0 should be processed Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`)) Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 0: Opening pipe`)) @@ -670,7 +757,7 @@ func createOidFile(fname string, content string) { } } -func createCuctomTOCFile(fname string, dataLength int) { +func createCustomTOCFile(fname string, dataLength int) { customTOC := fmt.Sprintf(`dataentries: 1: startbyte: 0 @@ -695,17 +782,25 @@ func createCuctomTOCFile(fname string, dataLength int) { } /* -Tests with skip files and the one with flag with flag --on-error-continue +Tests with skip files and the one with flag --on-error-continue require a bit more complicated setup, do different setup function. Returns file name list which must be deleted when done. */ -func setupRestoreWithSkipFiles() []string { +func setupRestoreWithSkipFiles(oid int, withPlugin bool) []string { dataLength := 128*1024 + 1 ret := []string{} - createDataFile(dataFileFullPath, dataLength) - ret = append(ret, dataFileFullPath) + fileName := dataFileFullPath + if withPlugin { + fileName = examplePluginTestDataFile + } + if oid > 0 { + fileName = fileName + fmt.Sprintf("_%d", oid) + } + + createDataFile(fileName, dataLength) + ret = append(ret, fileName) // Write oid file createOidFile(restoreOidFile, "1,0\n1,1\n1,2\n") @@ -717,13 +812,13 @@ func setupRestoreWithSkipFiles() []string { Fail(fmt.Sprintf("%v", err)) } - createCuctomTOCFile(tocFile, dataLength) + createCustomTOCFile(tocFile, dataLength) ret = append(ret, tocFile) skipFile := fmt.Sprintf("%s_skip_%d", pipeFile, 1) - fSkip, err2 := os.Create(skipFile) - if err2 != nil { - Fail(fmt.Sprintf("%v", err2)) + fSkip, err := os.Create(skipFile) + if err != nil { + Fail(fmt.Sprintf("%v", err)) } fSkip.Close() From ff62e1b224b8e2b8d2e84c87ef92f7b011ead3db Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Fri, 23 Aug 2024 09:35:29 +0500 Subject: [PATCH 08/22] A few cosmetic changes --- integration/helper_test.go | 60 +++++++++++--------------------------- 1 file changed, 17 insertions(+), 43 deletions(-) diff --git a/integration/helper_test.go b/integration/helper_test.go index e20e33b35..47593121f 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -74,15 +74,6 @@ func gpbackupHelper(helperPath string, args ...string) *exec.Cmd { return command } -func gpbackupHelperRestoreNoSingle(helperPath string, args ...string) *exec.Cmd { - args = append([]string{"--toc-file", tocFile, "--pipe-file", pipeFile, - "--content", "1", "--restore-agent", "--oid-file", restoreOidFile}, args...) - command := exec.Command(helperPath, args...) - err := command.Start() - Expect(err).ToNot(HaveOccurred()) - return command -} - func buildAndInstallBinaries() string { _ = os.Chdir("..") command := exec.Command("make", "build") @@ -446,8 +437,8 @@ options: It("skips batches if skip file discovered single file", func() { // Run helper only with restore for a few batches and skip file defined // - files_to_delete := setupRestoreWithSkipFiles(-1, false) - for _, f := range files_to_delete { + filesToDelete := setupRestoreWithSkipFiles(-1, false) + for _, f := range filesToDelete { defer func(filename string) { os.Remove(filename) }(f) @@ -456,11 +447,6 @@ options: By("Create restore command") helperCmd := gpbackupHelperRestore(gpbackupHelperPath, "--data-file", dataFileFullPath+".gz", "--on-error-continue") - pipeNames := []string{} - for i := 1; i <= 3; i++ { - pipeNames = append(pipeNames, fmt.Sprintf("%s_%d_0", pipeFile, i)) - } - // Block here until gpbackup_helper finishes (cleaning up pipes) err = helperCmd.Wait() Expect(err).ToNot(HaveOccurred()) @@ -474,9 +460,9 @@ options: homeDir := os.Getenv("HOME") helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) - pattern_helper_pid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) - helper_out, _ := exec.Command("grep", pattern_helper_pid, helperFiles[len(helperFiles)-1]).CombinedOutput() - helperOutput := string(helper_out) + patternHelperPid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) + helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput() + helperOutput := string(helperOut) // Batch 0 should be processed Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`)) @@ -488,9 +474,8 @@ options: }) It("skips batches if skip file discovered at resize restore", func() { // Run helper only with restore for a few batches and skip file defined - // - files_to_delete := setupRestoreWithSkipFiles(1, false) - for _, f := range files_to_delete { + filesToDelete := setupRestoreWithSkipFiles(1, false) + for _, f := range filesToDelete { defer func(filename string) { os.Remove(filename) }(f) @@ -514,11 +499,6 @@ options: err := helperCmd.Start() Expect(err).ToNot(HaveOccurred()) - pipeNames := []string{} - for i := 1; i <= 3; i++ { - pipeNames = append(pipeNames, fmt.Sprintf("%s_%d_0", pipeFile, i)) - } - // Block here until gpbackup_helper finishes (cleaning up pipes) err = helperCmd.Wait() Expect(err).ToNot(HaveOccurred()) @@ -534,11 +514,11 @@ options: Expect(helperFiles).NotTo(BeEmpty()) - pattern_helper_pid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) - helper_out, _ := exec.Command("grep", pattern_helper_pid, helperFiles[len(helperFiles)-1]).CombinedOutput() - helperOutput := string(helper_out) + patternHelperPid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) + helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput() + helperOutput := string(helperOut) - fmt.Printf("pattern_helper_pid = %s", pattern_helper_pid) + fmt.Printf("pattern_helper_pid = %s", patternHelperPid) Expect(helperOutput).ToNot(BeEmpty()) @@ -552,9 +532,8 @@ options: }) It("skips batches if skip file discovered at resize with a plugin", func() { // Run helper only with restore for a few batches and skip file defined - // - files_to_delete := setupRestoreWithSkipFiles(1, true) - for _, f := range files_to_delete { + filesToDelete := setupRestoreWithSkipFiles(1, true) + for _, f := range filesToDelete { defer func(filename string) { os.Remove(filename) }(f) @@ -578,11 +557,6 @@ options: err := helperCmd.Start() Expect(err).ToNot(HaveOccurred()) - pipeNames := []string{} - for i := 1; i <= 3; i++ { - pipeNames = append(pipeNames, fmt.Sprintf("%s_%d_0", pipeFile, i)) - } - // Block here until gpbackup_helper finishes (cleaning up pipes) err = helperCmd.Wait() Expect(err).ToNot(HaveOccurred()) @@ -599,11 +573,11 @@ options: Expect(helperFiles).NotTo(BeEmpty()) - pattern_helper_pid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) - helper_out, _ := exec.Command("grep", pattern_helper_pid, helperFiles[len(helperFiles)-1]).CombinedOutput() - helperOutput := string(helper_out) + patternHelperPid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) + helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput() + helperOutput := string(helperOut) - fmt.Printf("pattern_helper_pid = %s", pattern_helper_pid) + fmt.Printf("pattern_helper_pid = %s", patternHelperPid) Expect(helperOutput).ToNot(BeEmpty()) From c61a33e0b600f08d7f3bd4551d2e02cd739242e6 Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Mon, 26 Aug 2024 09:18:07 +0500 Subject: [PATCH 09/22] Fix typo and spelling --- helper/restore_helper.go | 2 +- integration/helper_test.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index c79a0e054..f914d6bb7 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -213,7 +213,7 @@ func doRestoreAgent() error { var currentPipe string - // If skip file detected for the particular tableOid, Will not process batches related to this oid + // If skip file is detected for the particular tableOid, will not process batches related to this oid skipOid := -1 for i, oidWithBatch := range oidWithBatchList { diff --git a/integration/helper_test.go b/integration/helper_test.go index 47593121f..ce9df6ded 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -436,7 +436,6 @@ options: }) It("skips batches if skip file discovered single file", func() { // Run helper only with restore for a few batches and skip file defined - // filesToDelete := setupRestoreWithSkipFiles(-1, false) for _, f := range filesToDelete { defer func(filename string) { From 401a71fcd7c0d5aba1b42be8e244aa38e14a58ea Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Mon, 26 Aug 2024 15:02:56 +0500 Subject: [PATCH 10/22] Rename tests to be more clear --- integration/helper_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integration/helper_test.go b/integration/helper_test.go index ce9df6ded..fa2b87562 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -434,7 +434,7 @@ options: Expect(err).To(HaveOccurred()) assertErrorsHandled() }) - It("skips batches if skip file discovered single file", func() { + It("skips batches if skip file is discovered with single datafile config", func() { // Run helper only with restore for a few batches and skip file defined filesToDelete := setupRestoreWithSkipFiles(-1, false) for _, f := range filesToDelete { @@ -471,7 +471,7 @@ options: Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) }) - It("skips batches if skip file discovered at resize restore", func() { + It("skips batches if skip file is discovered with resize restore", func() { // Run helper only with restore for a few batches and skip file defined filesToDelete := setupRestoreWithSkipFiles(1, false) for _, f := range filesToDelete { @@ -529,7 +529,7 @@ options: Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) }) - It("skips batches if skip file discovered at resize with a plugin", func() { + It("skips batches if skip file is discovered widb resize sertore using a plugin", func() { // Run helper only with restore for a few batches and skip file defined filesToDelete := setupRestoreWithSkipFiles(1, true) for _, f := range filesToDelete { From d66e19e5be621e3d95f7b5b7145c52b628040f8e Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Mon, 26 Aug 2024 15:15:18 +0500 Subject: [PATCH 11/22] Fix typos --- integration/helper_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/helper_test.go b/integration/helper_test.go index fa2b87562..7527d5d4f 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -529,7 +529,7 @@ options: Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) }) - It("skips batches if skip file is discovered widb resize sertore using a plugin", func() { + It("skips batches if skip file is discovered with resize restore using a plugin", func() { // Run helper only with restore for a few batches and skip file defined filesToDelete := setupRestoreWithSkipFiles(1, true) for _, f := range filesToDelete { From 9e85ef124df9f4ac4e14e9d2c779c436c9a9c2fc Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Tue, 27 Aug 2024 17:20:37 +0500 Subject: [PATCH 12/22] Add one more test, cosmetic changes --- integration/helper_test.go | 66 +++++++++++++++++++++++++++++++------- 1 file changed, 55 insertions(+), 11 deletions(-) diff --git a/integration/helper_test.go b/integration/helper_test.go index 7527d5d4f..4102a5272 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -511,14 +511,12 @@ options: homeDir := os.Getenv("HOME") helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) - Expect(helperFiles).NotTo(BeEmpty()) + Expect(helperFiles).ToNot(BeEmpty()) patternHelperPid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput() helperOutput := string(helperOut) - fmt.Printf("pattern_helper_pid = %s", patternHelperPid) - Expect(helperOutput).ToNot(BeEmpty()) // Batch 0 should be processed @@ -529,6 +527,57 @@ options: Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) }) + It("skips batches if skip file is discovered with single datafile config using a plugin", func() { + filesToDelete := setupRestoreWithSkipFiles(-1, true) + for _, f := range filesToDelete { + defer func(filename string) { + os.Remove(filename) + }(f) + } + By("Create restore command") + args := []string{ + "--toc-file", tocFile, + "--oid-file", restoreOidFile, + "--pipe-file", pipeFile, + "--content", "1", + "--single-data-file", + "--restore-agent", + "--plugin-config", examplePluginTestConfig, + "--data-file", dataFileFullPath + ".gz", + "--on-error-continue", + } + helperCmd := exec.Command(gpbackupHelperPath, args...) + err := helperCmd.Start() + Expect(err).ToNot(HaveOccurred()) + + // Block here until gpbackup_helper finishes (cleaning up pipes) + err = helperCmd.Wait() + Expect(err).ToNot(HaveOccurred()) + + for _, i := range []int{1, 2, 3} { + currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i) + Expect(currentPipe).ToNot(BeAnExistingFile()) + } + + By("Check in logs that batches were not restored") + + homeDir := os.Getenv("HOME") + helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) + + Expect(helperFiles).ToNot(BeEmpty()) + + patternHelperPid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) + helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput() + helperOutput := string(helperOut) + + // Batch 0 should be processed + Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`)) + Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 0: Opening pipe`)) + + // Batch 2 must not be processed + Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) + Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) + }) It("skips batches if skip file is discovered with resize restore using a plugin", func() { // Run helper only with restore for a few batches and skip file defined filesToDelete := setupRestoreWithSkipFiles(1, true) @@ -570,14 +619,12 @@ options: homeDir := os.Getenv("HOME") helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) - Expect(helperFiles).NotTo(BeEmpty()) + Expect(helperFiles).ToNot(BeEmpty()) patternHelperPid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput() helperOutput := string(helperOut) - fmt.Printf("pattern_helper_pid = %s", patternHelperPid) - Expect(helperOutput).ToNot(BeEmpty()) // Batch 0 should be processed @@ -789,11 +836,8 @@ func setupRestoreWithSkipFiles(oid int, withPlugin bool) []string { ret = append(ret, tocFile) skipFile := fmt.Sprintf("%s_skip_%d", pipeFile, 1) - fSkip, err := os.Create(skipFile) - if err != nil { - Fail(fmt.Sprintf("%v", err)) - } - fSkip.Close() + err = exec.Command("touch", skipFile).Run() + Expect(err).ToNot(HaveOccurred()) ret = append(ret, skipFile) return ret From 701438e2d8bf894da92e34a6fbae7ad1908beb6f Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Tue, 27 Aug 2024 19:28:22 +0500 Subject: [PATCH 13/22] Add more checks --- integration/helper_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/integration/helper_test.go b/integration/helper_test.go index 4102a5272..f9ef71751 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -463,6 +463,8 @@ options: helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput() helperOutput := string(helperOut) + Expect(helperOutput).ToNot(BeEmpty()) + // Batch 0 should be processed Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`)) Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 0: Opening pipe`)) @@ -570,6 +572,8 @@ options: helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput() helperOutput := string(helperOut) + Expect(helperOutput).ToNot(BeEmpty()) + // Batch 0 should be processed Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`)) Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 0: Opening pipe`)) From 752ace4419da7dfcaffd17fc9b51c05371871a99 Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Wed, 28 Aug 2024 14:36:16 +0500 Subject: [PATCH 14/22] Extract tests content into a function --- integration/helper_test.go | 214 ++++++++++--------------------------- 1 file changed, 55 insertions(+), 159 deletions(-) diff --git a/integration/helper_test.go b/integration/helper_test.go index f9ef71751..0e45ed213 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -87,6 +87,48 @@ func buildAndInstallBinaries() string { return fmt.Sprintf("%s/gpbackup_helper", binDir) } +func doTestSkipFiles(oid int, withPlugin bool, args []string) { + filesToDelete := setupRestoreWithSkipFiles(oid, withPlugin) + for _, f := range filesToDelete { + defer func(filename string) { + os.Remove(filename) + }(f) + } + + By("Create restore command") + helperCmd := exec.Command(gpbackupHelperPath, args...) + err := helperCmd.Start() + Expect(err).ToNot(HaveOccurred()) + + // Block here until gpbackup_helper finishes (cleaning up pipes) + err = helperCmd.Wait() + Expect(err).ToNot(HaveOccurred()) + for _, i := range []int{1, 2, 3} { + currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i) + Expect(currentPipe).ToNot(BeAnExistingFile()) + } + + By("Check in logs that batches were not restored") + + homeDir := os.Getenv("HOME") + helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) + Expect(helperFiles).ToNot(BeEmpty()) + + patternHelperPid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) + helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput() + helperOutput := string(helperOut) + + Expect(helperOutput).ToNot(BeEmpty()) + + // Batch 0 should be processed + Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`)) + Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 0: Opening pipe`)) + + // Batch 2 must not be processed + Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) + Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) +} + var _ = Describe("gpbackup_helper end to end integration tests", func() { // Setup example plugin based on current working directory err := os.RemoveAll(examplePluginTestDir) @@ -435,54 +477,19 @@ options: assertErrorsHandled() }) It("skips batches if skip file is discovered with single datafile config", func() { - // Run helper only with restore for a few batches and skip file defined - filesToDelete := setupRestoreWithSkipFiles(-1, false) - for _, f := range filesToDelete { - defer func(filename string) { - os.Remove(filename) - }(f) - } - - By("Create restore command") - helperCmd := gpbackupHelperRestore(gpbackupHelperPath, "--data-file", dataFileFullPath+".gz", "--on-error-continue") - - // Block here until gpbackup_helper finishes (cleaning up pipes) - err = helperCmd.Wait() - Expect(err).ToNot(HaveOccurred()) - for _, i := range []int{1, 2, 3} { - currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i) - Expect(currentPipe).ToNot(BeAnExistingFile()) + args := []string{ + "--toc-file", tocFile, + "--pipe-file", pipeFile, + "--content", "1", + "--single-data-file", + "--restore-agent", + "--oid-file", restoreOidFile, + "--data-file", dataFileFullPath + ".gz", + "--on-error-continue", } - - By("Check in logs that batches were not restored") - - homeDir := os.Getenv("HOME") - helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) - - patternHelperPid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) - helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput() - helperOutput := string(helperOut) - - Expect(helperOutput).ToNot(BeEmpty()) - - // Batch 0 should be processed - Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`)) - Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 0: Opening pipe`)) - - // Batch 2 must not be processed - Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) - Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) + doTestSkipFiles(-1, false, args) }) It("skips batches if skip file is discovered with resize restore", func() { - // Run helper only with restore for a few batches and skip file defined - filesToDelete := setupRestoreWithSkipFiles(1, false) - for _, f := range filesToDelete { - defer func(filename string) { - os.Remove(filename) - }(f) - } - - By("Create restore command") args := []string{ "--toc-file", tocFile, "--oid-file", restoreOidFile, @@ -495,48 +502,9 @@ options: "--data-file", dataFileFullPath + ".gz", "--on-error-continue", } - helperCmd := exec.Command(gpbackupHelperPath, args...) - - err := helperCmd.Start() - Expect(err).ToNot(HaveOccurred()) - - // Block here until gpbackup_helper finishes (cleaning up pipes) - err = helperCmd.Wait() - Expect(err).ToNot(HaveOccurred()) - for _, i := range []int{1, 2, 3} { - currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i) - Expect(currentPipe).ToNot(BeAnExistingFile()) - } - - By("Check in logs that batches were not restored") - - homeDir := os.Getenv("HOME") - helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) - - Expect(helperFiles).ToNot(BeEmpty()) - - patternHelperPid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) - helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput() - helperOutput := string(helperOut) - - Expect(helperOutput).ToNot(BeEmpty()) - - // Batch 0 should be processed - Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`)) - Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 0: Opening pipe`)) - - // Batch 2 must not be processed - Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) - Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) + doTestSkipFiles(1, false, args) }) It("skips batches if skip file is discovered with single datafile config using a plugin", func() { - filesToDelete := setupRestoreWithSkipFiles(-1, true) - for _, f := range filesToDelete { - defer func(filename string) { - os.Remove(filename) - }(f) - } - By("Create restore command") args := []string{ "--toc-file", tocFile, "--oid-file", restoreOidFile, @@ -548,49 +516,9 @@ options: "--data-file", dataFileFullPath + ".gz", "--on-error-continue", } - helperCmd := exec.Command(gpbackupHelperPath, args...) - err := helperCmd.Start() - Expect(err).ToNot(HaveOccurred()) - - // Block here until gpbackup_helper finishes (cleaning up pipes) - err = helperCmd.Wait() - Expect(err).ToNot(HaveOccurred()) - - for _, i := range []int{1, 2, 3} { - currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i) - Expect(currentPipe).ToNot(BeAnExistingFile()) - } - - By("Check in logs that batches were not restored") - - homeDir := os.Getenv("HOME") - helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) - - Expect(helperFiles).ToNot(BeEmpty()) - - patternHelperPid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) - helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput() - helperOutput := string(helperOut) - - Expect(helperOutput).ToNot(BeEmpty()) - - // Batch 0 should be processed - Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`)) - Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 0: Opening pipe`)) - - // Batch 2 must not be processed - Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) - Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) + doTestSkipFiles(-1, true, args) }) It("skips batches if skip file is discovered with resize restore using a plugin", func() { - // Run helper only with restore for a few batches and skip file defined - filesToDelete := setupRestoreWithSkipFiles(1, true) - for _, f := range filesToDelete { - defer func(filename string) { - os.Remove(filename) - }(f) - } - args := []string{ "--toc-file", tocFile, "--oid-file", restoreOidFile, @@ -604,40 +532,8 @@ options: "--data-file", dataFileFullPath + ".gz", "--on-error-continue", } - helperCmd := exec.Command(gpbackupHelperPath, args...) - - err := helperCmd.Start() - Expect(err).ToNot(HaveOccurred()) - - // Block here until gpbackup_helper finishes (cleaning up pipes) - err = helperCmd.Wait() - Expect(err).ToNot(HaveOccurred()) - - for _, i := range []int{1, 2, 3} { - currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i) - Expect(currentPipe).ToNot(BeAnExistingFile()) - } - - By("Check in logs that batches were not restored") - - homeDir := os.Getenv("HOME") - helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) - - Expect(helperFiles).ToNot(BeEmpty()) - - patternHelperPid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) - helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput() - helperOutput := string(helperOut) - - Expect(helperOutput).ToNot(BeEmpty()) - - // Batch 0 should be processed - Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`)) - Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 0: Opening pipe`)) - // Batch 2 must not be processed - Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) - Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) + doTestSkipFiles(1, true, args) }) It("Continues restore process when encountering an error with flag --on-error-continue", func() { // Write data file From 1c58b940ef7b0483d103fb93add4eaeb96155e3f Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Tue, 3 Sep 2024 10:52:08 +0500 Subject: [PATCH 15/22] Do not open new pipes if skip file detected --- helper/restore_helper.go | 58 +++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index f914d6bb7..1da8a7d46 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -118,6 +118,21 @@ func (r *RestoreReader) copyAllData() (int64, error) { return bytesRead, err } +func closeAndDeletePipe(tableOid int, batchNum int) { + pipe := fmt.Sprintf("%s_%d_%d", *pipeFile, tableOid, batchNum) + logInfo(fmt.Sprintf("Oid %d, Batch %d: Closing pipe %s", tableOid, batchNum, pipe)) + errPipe := flushAndCloseRestoreWriter(pipe, tableOid) + if errPipe != nil { + logVerbose(fmt.Sprintf("Oid %d, Batch %d: Failed to flush and close pipe: %s", tableOid, batchNum, errPipe)) + } + + logVerbose(fmt.Sprintf("Oid %d, Batch %d: Attempt to delete pipe %s", tableOid, batchNum, pipe)) + errPipe = deletePipe(pipe) + if errPipe != nil { + logError("Oid %d, Batch %d: Failed to remove pipe %s: %v", tableOid, batchNum, pipe, errPipe) + } +} + type oidWithBatch struct { oid int batch int @@ -230,16 +245,19 @@ func doRestoreAgent() error { if i < len(oidWithBatchList)-*copyQueue { nextOidWithBatch := oidWithBatchList[i+*copyQueue] nextOid := nextOidWithBatch.oid - nextBatchNum := nextOidWithBatch.batch - nextPipeToCreate := fmt.Sprintf("%s_%d_%d", *pipeFile, nextOid, nextBatchNum) - logVerbose(fmt.Sprintf("Oid %d, Batch %d: Creating pipe %s\n", nextOid, nextBatchNum, nextPipeToCreate)) - err := createPipe(nextPipeToCreate) - if err != nil { - logError(fmt.Sprintf("Oid %d, Batch %d: Failed to create pipe %s\n", nextOid, nextBatchNum, nextPipeToCreate)) - // In the case this error is hit it means we have lost the - // ability to create pipes normally, so hard quit even if - // --on-error-continue is given - return err + + if nextOid != skipOid { + nextBatchNum := nextOidWithBatch.batch + nextPipeToCreate := fmt.Sprintf("%s_%d_%d", *pipeFile, nextOid, nextBatchNum) + logVerbose(fmt.Sprintf("Oid %d, Batch %d: Creating pipe %s\n", nextOid, nextBatchNum, nextPipeToCreate)) + err := createPipe(nextPipeToCreate) + if err != nil { + logError(fmt.Sprintf("Oid %d, Batch %d: Failed to create pipe %s\n", nextOid, nextBatchNum, nextPipeToCreate)) + // In the case this error is hit it means we have lost the + // ability to create pipes normally, so hard quit even if + // --on-error-continue is given + return err + } } } @@ -290,6 +308,13 @@ func doRestoreAgent() error { logWarn(fmt.Sprintf("Oid %d, Batch %d: Skip file discovered, skipping this relation.", tableOid, batchNum)) err = nil skipOid = tableOid + /* Close up to *copyQueue files with this tableOid */ + for idx := 0; idx < *copyQueue; idx++ { + batchToDelete := batchNum + idx + if batchToDelete < batches { + closeAndDeletePipe(tableOid, batchToDelete) + } + } goto LoopEnd } else { // keep trying to open the pipe @@ -355,12 +380,9 @@ func doRestoreAgent() error { lastByte[contentToRestore] = end[contentToRestore] } logInfo(fmt.Sprintf("Oid %d, Batch %d: Copied %d bytes into the pipe", tableOid, batchNum, bytesRead)) - LoopEnd: - logInfo(fmt.Sprintf("Oid %d, Batch %d: Closing pipe %s", tableOid, batchNum, currentPipe)) - errPipe := flushAndCloseRestoreWriter(currentPipe, tableOid) - if errPipe != nil { - logVerbose(fmt.Sprintf("Oid %d, Batch %d: Failed to flush and close pipe: %s", tableOid, batchNum, errPipe)) + if tableOid != skipOid { + closeAndDeletePipe(tableOid, batchNum) } logVerbose(fmt.Sprintf("Oid %d, Batch %d: End batch restore", tableOid, batchNum)) @@ -376,12 +398,6 @@ func doRestoreAgent() error { } } - logVerbose(fmt.Sprintf("Oid %d, Batch %d: Attempt to delete pipe %s", tableOid, batchNum, currentPipe)) - errPipe = deletePipe(currentPipe) - if errPipe != nil { - logError("Oid %d, Batch %d: Failed to remove pipe %s: %v", tableOid, batchNum, currentPipe, errPipe) - } - if err != nil { logError(fmt.Sprintf("Oid %d, Batch %d: Error encountered: %v", tableOid, batchNum, err)) if *onErrorContinue { From 563eac9e99d904c640afb14b45f773a1e72a087f Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Wed, 4 Sep 2024 10:51:16 +0500 Subject: [PATCH 16/22] Code cleanup --- helper/restore_helper.go | 13 +++--- integration/helper_test.go | 85 +++++++++++++++++++------------------- 2 files changed, 49 insertions(+), 49 deletions(-) diff --git a/helper/restore_helper.go b/helper/restore_helper.go index 1da8a7d46..5aaa8987a 100644 --- a/helper/restore_helper.go +++ b/helper/restore_helper.go @@ -121,15 +121,15 @@ func (r *RestoreReader) copyAllData() (int64, error) { func closeAndDeletePipe(tableOid int, batchNum int) { pipe := fmt.Sprintf("%s_%d_%d", *pipeFile, tableOid, batchNum) logInfo(fmt.Sprintf("Oid %d, Batch %d: Closing pipe %s", tableOid, batchNum, pipe)) - errPipe := flushAndCloseRestoreWriter(pipe, tableOid) - if errPipe != nil { - logVerbose(fmt.Sprintf("Oid %d, Batch %d: Failed to flush and close pipe: %s", tableOid, batchNum, errPipe)) + err := flushAndCloseRestoreWriter(pipe, tableOid) + if err != nil { + logVerbose(fmt.Sprintf("Oid %d, Batch %d: Failed to flush and close pipe: %s", tableOid, batchNum, err)) } logVerbose(fmt.Sprintf("Oid %d, Batch %d: Attempt to delete pipe %s", tableOid, batchNum, pipe)) - errPipe = deletePipe(pipe) - if errPipe != nil { - logError("Oid %d, Batch %d: Failed to remove pipe %s: %v", tableOid, batchNum, pipe, errPipe) + err = deletePipe(pipe) + if err != nil { + logError("Oid %d, Batch %d: Failed to remove pipe %s: %v", tableOid, batchNum, pipe, err) } } @@ -380,6 +380,7 @@ func doRestoreAgent() error { lastByte[contentToRestore] = end[contentToRestore] } logInfo(fmt.Sprintf("Oid %d, Batch %d: Copied %d bytes into the pipe", tableOid, batchNum, bytesRead)) + LoopEnd: if tableOid != skipOid { closeAndDeletePipe(tableOid, batchNum) diff --git a/integration/helper_test.go b/integration/helper_test.go index 0e45ed213..a9233dde4 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -87,48 +87,6 @@ func buildAndInstallBinaries() string { return fmt.Sprintf("%s/gpbackup_helper", binDir) } -func doTestSkipFiles(oid int, withPlugin bool, args []string) { - filesToDelete := setupRestoreWithSkipFiles(oid, withPlugin) - for _, f := range filesToDelete { - defer func(filename string) { - os.Remove(filename) - }(f) - } - - By("Create restore command") - helperCmd := exec.Command(gpbackupHelperPath, args...) - err := helperCmd.Start() - Expect(err).ToNot(HaveOccurred()) - - // Block here until gpbackup_helper finishes (cleaning up pipes) - err = helperCmd.Wait() - Expect(err).ToNot(HaveOccurred()) - for _, i := range []int{1, 2, 3} { - currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i) - Expect(currentPipe).ToNot(BeAnExistingFile()) - } - - By("Check in logs that batches were not restored") - - homeDir := os.Getenv("HOME") - helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) - Expect(helperFiles).ToNot(BeEmpty()) - - patternHelperPid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) - helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput() - helperOutput := string(helperOut) - - Expect(helperOutput).ToNot(BeEmpty()) - - // Batch 0 should be processed - Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`)) - Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 0: Opening pipe`)) - - // Batch 2 must not be processed - Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) - Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) -} - var _ = Describe("gpbackup_helper end to end integration tests", func() { // Setup example plugin based on current working directory err := os.RemoveAll(examplePluginTestDir) @@ -532,7 +490,6 @@ options: "--data-file", dataFileFullPath + ".gz", "--on-error-continue", } - doTestSkipFiles(1, true, args) }) It("Continues restore process when encountering an error with flag --on-error-continue", func() { @@ -743,6 +700,48 @@ func setupRestoreWithSkipFiles(oid int, withPlugin bool) []string { return ret } +func doTestSkipFiles(oid int, withPlugin bool, args []string) { + filesToDelete := setupRestoreWithSkipFiles(oid, withPlugin) + for _, f := range filesToDelete { + defer func(filename string) { + os.Remove(filename) + }(f) + } + + By("Create restore command") + helperCmd := exec.Command(gpbackupHelperPath, args...) + err := helperCmd.Start() + Expect(err).ToNot(HaveOccurred()) + + // Block here until gpbackup_helper finishes (cleaning up pipes) + err = helperCmd.Wait() + Expect(err).ToNot(HaveOccurred()) + for _, i := range []int{1, 2, 3} { + currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i) + Expect(currentPipe).ToNot(BeAnExistingFile()) + } + + By("Check in logs that batches were not restored") + + homeDir := os.Getenv("HOME") + helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) + Expect(helperFiles).ToNot(BeEmpty()) + + patternHelperPid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) + helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput() + helperOutput := string(helperOut) + + Expect(helperOutput).ToNot(BeEmpty()) + + // Batch 0 should be processed + Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`)) + Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 0: Opening pipe`)) + + // Batch 2 must not be processed + Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) + Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) +} + func assertNoErrors() { Expect(errorFile).To(Not(BeARegularFile())) pipes, err := filepath.Glob(pipeFile + "_[1-9]*") From 676cbf227866e3732231c516c0f9a68a9d530a3d Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Wed, 4 Sep 2024 14:55:12 +0500 Subject: [PATCH 17/22] Revert uninyentional change, use defer for closing files --- integration/helper_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/integration/helper_test.go b/integration/helper_test.go index a9233dde4..b06be2ad9 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -8,9 +8,8 @@ import ( "math" "os" "os/exec" - + "path" "path/filepath" - path "path/filepath" "strings" "time" @@ -608,7 +607,10 @@ func createDataFile(dataFile string, dataLength int) { if err != nil { Fail(fmt.Sprintf("%v", err)) } + defer f.Close() gzipf := gzip.NewWriter(f) + defer gzipf.Close() + // Named pipes can buffer, so we need to write more than the buffer size to trigger flush error customData := "here is some data\n" @@ -618,8 +620,6 @@ func createDataFile(dataFile string, dataLength int) { if _, err := gzipf.Write([]byte(customData)); err != nil { Fail(fmt.Sprintf("%v", err)) } - gzipf.Close() - f.Close() } func createOidFile(fname string, content string) { @@ -724,7 +724,7 @@ func doTestSkipFiles(oid int, withPlugin bool, args []string) { By("Check in logs that batches were not restored") homeDir := os.Getenv("HOME") - helperFiles, _ := path.Glob(path.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) + helperFiles, _ := filepath.Glob(filepath.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) Expect(helperFiles).ToNot(BeEmpty()) patternHelperPid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) From d96c8f4dea3cadf5534ebcf82e52e959aa98ada0 Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Wed, 4 Sep 2024 16:44:57 +0500 Subject: [PATCH 18/22] use defer to close file --- integration/helper_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/integration/helper_test.go b/integration/helper_test.go index b06be2ad9..0ef488cb7 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -650,12 +650,11 @@ func createCustomTOCFile(fname string, dataLength int) { if err != nil { Fail(fmt.Sprintf("%v", err)) } + defer fToc.Close() if _, err = fToc.WriteString(customTOC); err != nil { Fail(fmt.Sprintf("%v", err)) } - - fToc.Close() } /* From dd0386f11fbc56bb3233948cd3dc78366afa7a63 Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Thu, 5 Sep 2024 15:39:47 +0500 Subject: [PATCH 19/22] Use variadic function --- integration/helper_test.go | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/integration/helper_test.go b/integration/helper_test.go index 0ef488cb7..4454aedc4 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -434,20 +434,19 @@ options: assertErrorsHandled() }) It("skips batches if skip file is discovered with single datafile config", func() { - args := []string{ + doTestSkipFiles(-1, false, "--toc-file", tocFile, "--pipe-file", pipeFile, "--content", "1", "--single-data-file", "--restore-agent", "--oid-file", restoreOidFile, - "--data-file", dataFileFullPath + ".gz", + "--data-file", dataFileFullPath+".gz", "--on-error-continue", - } - doTestSkipFiles(-1, false, args) + ) }) It("skips batches if skip file is discovered with resize restore", func() { - args := []string{ + doTestSkipFiles(1, false, "--toc-file", tocFile, "--oid-file", restoreOidFile, "--pipe-file", pipeFile, @@ -456,13 +455,12 @@ options: "--orig-seg-count", "6", "--dest-seg-count", "3", "--restore-agent", - "--data-file", dataFileFullPath + ".gz", + "--data-file", dataFileFullPath+".gz", "--on-error-continue", - } - doTestSkipFiles(1, false, args) + ) }) It("skips batches if skip file is discovered with single datafile config using a plugin", func() { - args := []string{ + doTestSkipFiles(-1, true, "--toc-file", tocFile, "--oid-file", restoreOidFile, "--pipe-file", pipeFile, @@ -470,13 +468,12 @@ options: "--single-data-file", "--restore-agent", "--plugin-config", examplePluginTestConfig, - "--data-file", dataFileFullPath + ".gz", + "--data-file", dataFileFullPath+".gz", "--on-error-continue", - } - doTestSkipFiles(-1, true, args) + ) }) It("skips batches if skip file is discovered with resize restore using a plugin", func() { - args := []string{ + doTestSkipFiles(1, true, "--toc-file", tocFile, "--oid-file", restoreOidFile, "--pipe-file", pipeFile, @@ -486,10 +483,9 @@ options: "--dest-seg-count", "3", "--restore-agent", "--plugin-config", examplePluginTestConfig, - "--data-file", dataFileFullPath + ".gz", + "--data-file", dataFileFullPath+".gz", "--on-error-continue", - } - doTestSkipFiles(1, true, args) + ) }) It("Continues restore process when encountering an error with flag --on-error-continue", func() { // Write data file @@ -699,7 +695,7 @@ func setupRestoreWithSkipFiles(oid int, withPlugin bool) []string { return ret } -func doTestSkipFiles(oid int, withPlugin bool, args []string) { +func doTestSkipFiles(oid int, withPlugin bool, args ...string) { filesToDelete := setupRestoreWithSkipFiles(oid, withPlugin) for _, f := range filesToDelete { defer func(filename string) { From 842a7037d32a77c7fd3a19605d94ff060ab7be73 Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Fri, 6 Sep 2024 10:18:07 +0500 Subject: [PATCH 20/22] Move tests under DescribeTable/Entry construct --- integration/helper_test.go | 153 ++++++++++++++----------------------- 1 file changed, 57 insertions(+), 96 deletions(-) diff --git a/integration/helper_test.go b/integration/helper_test.go index 4454aedc4..18e6cda10 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -433,60 +433,63 @@ options: Expect(err).To(HaveOccurred()) assertErrorsHandled() }) - It("skips batches if skip file is discovered with single datafile config", func() { - doTestSkipFiles(-1, false, - "--toc-file", tocFile, - "--pipe-file", pipeFile, - "--content", "1", - "--single-data-file", - "--restore-agent", - "--oid-file", restoreOidFile, - "--data-file", dataFileFullPath+".gz", - "--on-error-continue", - ) - }) - It("skips batches if skip file is discovered with resize restore", func() { - doTestSkipFiles(1, false, - "--toc-file", tocFile, - "--oid-file", restoreOidFile, - "--pipe-file", pipeFile, - "--content", "1", - "--resize-cluster", - "--orig-seg-count", "6", - "--dest-seg-count", "3", - "--restore-agent", - "--data-file", dataFileFullPath+".gz", - "--on-error-continue", - ) - }) - It("skips batches if skip file is discovered with single datafile config using a plugin", func() { - doTestSkipFiles(-1, true, - "--toc-file", tocFile, - "--oid-file", restoreOidFile, - "--pipe-file", pipeFile, - "--content", "1", - "--single-data-file", - "--restore-agent", - "--plugin-config", examplePluginTestConfig, - "--data-file", dataFileFullPath+".gz", - "--on-error-continue", - ) - }) - It("skips batches if skip file is discovered with resize restore using a plugin", func() { - doTestSkipFiles(1, true, - "--toc-file", tocFile, - "--oid-file", restoreOidFile, - "--pipe-file", pipeFile, - "--content", "1", - "--resize-cluster", - "--orig-seg-count", "6", - "--dest-seg-count", "3", - "--restore-agent", - "--plugin-config", examplePluginTestConfig, - "--data-file", dataFileFullPath+".gz", - "--on-error-continue", - ) - }) + DescribeTable("Skip files", + func(oid int, withPlugin bool, args ...string) { + filesToDelete := setupRestoreWithSkipFiles(oid, withPlugin) + for _, f := range filesToDelete { + defer func(filename string) { + os.Remove(filename) + }(f) + } + + args = append([]string{ + "--toc-file", tocFile, + "--pipe-file", pipeFile, + "--content", "1", + "--restore-agent", + "--oid-file", restoreOidFile, + "--data-file", dataFileFullPath + ".gz", + "--on-error-continue", + }, args...) + + helperCmd := exec.Command(gpbackupHelperPath, args...) + err := helperCmd.Start() + Expect(err).ToNot(HaveOccurred()) + + // Block here until gpbackup_helper finishes (cleaning up pipes) + err = helperCmd.Wait() + Expect(err).ToNot(HaveOccurred()) + for _, i := range []int{1, 2, 3} { + currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i) + Expect(currentPipe).ToNot(BeAnExistingFile()) + } + + By("Check in logs that batches were not restored") + + homeDir := os.Getenv("HOME") + helperFiles, _ := filepath.Glob(filepath.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) + Expect(helperFiles).ToNot(BeEmpty()) + + patternHelperPid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) + helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput() + helperOutput := string(helperOut) + + Expect(helperOutput).ToNot(BeEmpty()) + + // Batch 0 should be processed + Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`)) + Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 0: Opening pipe`)) + + // Batch 2 must not be processed + Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) + Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) + }, + Entry("skips batches if skip file is discovered with single datafile config", -1, false, "--single-data-file"), + Entry("skips batches if skip file is discovered with resize restore", 1, false, "--resize-cluster", "--orig-seg-count", "6", "--dest-seg-count", "3"), + Entry("skips batches if skip file is discovered with single datafile config using a plugin", -1, true, "--single-data-file", "--restore-agent", "--plugin-config", examplePluginTestConfig), + Entry("skips batches if skip file is discovered with resize restore using a plugin", 1, true, "--resize-cluster", "--orig-seg-count", "6", "--dest-seg-count", "3", "--restore-agent", "--plugin-config", examplePluginTestConfig), + ) + It("Continues restore process when encountering an error with flag --on-error-continue", func() { // Write data file dataFile := dataFileFullPath @@ -695,48 +698,6 @@ func setupRestoreWithSkipFiles(oid int, withPlugin bool) []string { return ret } -func doTestSkipFiles(oid int, withPlugin bool, args ...string) { - filesToDelete := setupRestoreWithSkipFiles(oid, withPlugin) - for _, f := range filesToDelete { - defer func(filename string) { - os.Remove(filename) - }(f) - } - - By("Create restore command") - helperCmd := exec.Command(gpbackupHelperPath, args...) - err := helperCmd.Start() - Expect(err).ToNot(HaveOccurred()) - - // Block here until gpbackup_helper finishes (cleaning up pipes) - err = helperCmd.Wait() - Expect(err).ToNot(HaveOccurred()) - for _, i := range []int{1, 2, 3} { - currentPipe := fmt.Sprintf("%s_%d_0", pipeFile, i) - Expect(currentPipe).ToNot(BeAnExistingFile()) - } - - By("Check in logs that batches were not restored") - - homeDir := os.Getenv("HOME") - helperFiles, _ := filepath.Glob(filepath.Join(homeDir, "gpAdminLogs/gpbackup_helper_*")) - Expect(helperFiles).ToNot(BeEmpty()) - - patternHelperPid := fmt.Sprintf(":%06d", helperCmd.Process.Pid) - helperOut, _ := exec.Command("grep", patternHelperPid, helperFiles[len(helperFiles)-1]).CombinedOutput() - helperOutput := string(helperOut) - - Expect(helperOutput).ToNot(BeEmpty()) - - // Batch 0 should be processed - Expect(helperOutput).To(ContainSubstring(`: Skip file discovered, skipping this relation`)) - Expect(helperOutput).To(ContainSubstring(`Segment 1: Oid 1, Batch 0: Opening pipe`)) - - // Batch 2 must not be processed - Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) - Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) -} - func assertNoErrors() { Expect(errorFile).To(Not(BeARegularFile())) pipes, err := filepath.Glob(pipeFile + "_[1-9]*") From 08cd51079439f5cd58ac4afc2e2386b16b12075c Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Fri, 6 Sep 2024 10:53:40 +0500 Subject: [PATCH 21/22] Update integration/helper_test.go Co-authored-by: Georgy Shelkovy --- integration/helper_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/helper_test.go b/integration/helper_test.go index 18e6cda10..b782039fa 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -433,7 +433,7 @@ options: Expect(err).To(HaveOccurred()) assertErrorsHandled() }) - DescribeTable("Skip files", + DescribeTable("Skips batches if skip file is discovered with", func(oid int, withPlugin bool, args ...string) { filesToDelete := setupRestoreWithSkipFiles(oid, withPlugin) for _, f := range filesToDelete { From d4d44f97b15a3c8fe219f5efc0855bd266ee4ec0 Mon Sep 17 00:00:00 2001 From: Denis Kovalev Date: Fri, 6 Sep 2024 11:44:06 +0500 Subject: [PATCH 22/22] Update integration/helper_test.go Co-authored-by: Georgy Shelkovy --- integration/helper_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/integration/helper_test.go b/integration/helper_test.go index b782039fa..f856ab458 100644 --- a/integration/helper_test.go +++ b/integration/helper_test.go @@ -484,10 +484,10 @@ options: Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Skip file discovered, skipping this relation`)) Expect(helperOutput).ToNot(ContainSubstring(`Segment 1: Oid 1, Batch 2: Opening pipe`)) }, - Entry("skips batches if skip file is discovered with single datafile config", -1, false, "--single-data-file"), - Entry("skips batches if skip file is discovered with resize restore", 1, false, "--resize-cluster", "--orig-seg-count", "6", "--dest-seg-count", "3"), - Entry("skips batches if skip file is discovered with single datafile config using a plugin", -1, true, "--single-data-file", "--restore-agent", "--plugin-config", examplePluginTestConfig), - Entry("skips batches if skip file is discovered with resize restore using a plugin", 1, true, "--resize-cluster", "--orig-seg-count", "6", "--dest-seg-count", "3", "--restore-agent", "--plugin-config", examplePluginTestConfig), + Entry("single datafile config", -1, false, "--single-data-file"), + Entry("resize restore", 1, false, "--resize-cluster", "--orig-seg-count", "6", "--dest-seg-count", "3"), + Entry("single datafile config using a plugin", -1, true, "--single-data-file", "--restore-agent", "--plugin-config", examplePluginTestConfig), + Entry("resize restore using a plugin", 1, true, "--resize-cluster", "--orig-seg-count", "6", "--dest-seg-count", "3", "--restore-agent", "--plugin-config", examplePluginTestConfig), ) It("Continues restore process when encountering an error with flag --on-error-continue", func() {