From f6705c5f94c1940cc0b54bf2647433ae08db07e4 Mon Sep 17 00:00:00 2001
From: Pervakov Grigorii
Date: Wed, 6 Sep 2023 14:26:33 +0200
Subject: [PATCH] Do not redownload parts that were downloaded but not attached

Track a per-part state (downloaded / restored) in the restore context
instead of a plain list of restored part names. When a restore is
continued after an error, parts that are already marked as downloaded are
only attached again instead of being downloaded a second time.

The state enum derives from str so that its members serialize with json
as plain strings and still compare equal to them when read back. Looking
up the state of a part that was never recorded yields INVALID instead of
raising.

Change-Id: I13aba57ca58b75192481897673b6a705d7cfe0c8
---
NOTE(review): line breaks and indentation of this patch were reconstructed
from a whitespace-collapsed copy. The indentation of the context lines in
ch_backup/logic/table.py is a best guess -- verify it against the tree or
apply with `git am --ignore-whitespace`. The blob ids on the index lines
are the ones of the originally submitted patch.

 ch_backup/backup/restore_context.py | 43 ++++++++++++++++++++++++++++++++++++-------
 ch_backup/logic/table.py            | 11 ++++++++++-
 2 files changed, 46 insertions(+), 8 deletions(-)

diff --git a/ch_backup/backup/restore_context.py b/ch_backup/backup/restore_context.py
index f0d53521..2a0bbcc3 100644
--- a/ch_backup/backup/restore_context.py
+++ b/ch_backup/backup/restore_context.py
@@ -4,12 +4,25 @@
 
 import json
 from collections import defaultdict
+from enum import Enum
 from os.path import exists
-from typing import Any, Dict, List, Mapping
+from typing import Any, Dict, Mapping
 
 from ch_backup.backup.metadata import PartMetadata
 
 
+class PartState(str, Enum):
+    """
+    Restore progress of a data part.
+
+    Derives from str so that members serialize with json as plain strings
+    and compare equal to those strings after being loaded back.
+    """
+    INVALID = "invalid"
+    DOWNLOADED = "downloaded"
+    RESTORED = "restored"
+
+
 class RestoreContext:
     """
     Backup restore context. Allows continue restore process after errors.
@@ -17,7 +30,10 @@ class RestoreContext:
 
     def __init__(self, config: Dict):
         self._state_file = config["restore_context_path"]
-        self._databases: Dict[str, Dict[str, List]] = {}
+        # database -> table -> part name -> state
+        self._databases: Dict[str, Dict[str, Dict[str, PartState]]] = defaultdict(
+            lambda: defaultdict(lambda: defaultdict(lambda: PartState.INVALID))
+        )
         self._failed: Mapping[str, Any] = defaultdict(
             lambda: defaultdict(
                 lambda: {
@@ -39,17 +55,30 @@ def add_table(self, database: str, table: str) -> None:
         if table not in self._databases[database]:
-            self._databases[database][table] = []
+            self._databases[database][table] = {}
 
-    def add_part(self, part: PartMetadata) -> None:
+    def add_part(self, part: PartMetadata, state: PartState) -> None:
         """
-        Marks that data part was restored.
+        Records the restore state reached by data part.
         """
-        self._databases[part.database][part.table].append(part.name)
+        self._databases[part.database][part.table][part.name] = state
 
+    def _part(self, part: PartMetadata) -> PartState:
+        """
+        Returns recorded state of data part, INVALID if the part is unknown.
+        """
+        tables = self._databases.get(part.database, {})
+        return tables.get(part.table, {}).get(part.name, PartState.INVALID)
+
+    def part_downloaded(self, part: PartMetadata) -> bool:
+        """
+        Checks if data part was downloaded.
+        """
+        return self._part(part) == PartState.DOWNLOADED
+
     def part_restored(self, part: PartMetadata) -> bool:
         """
         Checks if data part was restored.
         """
-        return part.name in self._databases[part.database][part.table]
+        return self._part(part) == PartState.RESTORED
 
     def add_failed_chown(self, database: str, table: str, path: str) -> None:
         """
diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py
index f4793142..d16bbafb 100644
--- a/ch_backup/logic/table.py
+++ b/ch_backup/logic/table.py
@@ -16,6 +16,7 @@
     deduplicate_part,
 )
 from ch_backup.backup.metadata import PartMetadata, TableMetadata
+from ch_backup.backup.restore_context import PartState
 from ch_backup.backup_context import BackupContext
 from ch_backup.clickhouse.client import ClickhouseError
 from ch_backup.clickhouse.disks import ClickHouseTemporaryDisks
@@ -587,6 +588,13 @@ def _restore_data(
                         )
                         continue
 
+                    if context.restore_context.part_downloaded(part):
+                        logging.debug(
+                            f"{table.database}.{table.name} part {part.name} already downloaded, only attach it"
+                        )
+                        attach_parts.append(part)
+                        continue
+
                     try:
                         if part.disk_name in context.backup_meta.cloud_storage.disks:
                             if skip_cloud_storage:
@@ -605,6 +613,7 @@ def _restore_data(
                                 context.backup_meta, part, fs_part_path
                             )
 
+                        context.restore_context.add_part(part, PartState.DOWNLOADED)
                         attach_parts.append(part)
                     except Exception:
                         if keep_going:
@@ -628,7 +637,7 @@ def _restore_data(
                     )
                     try:
                         context.ch_ctl.attach_part(table, part.name)
-                        context.restore_context.add_part(part)
+                        context.restore_context.add_part(part, PartState.RESTORED)
                     except Exception as e:
                         logging.warning(
                             'Attaching "%s.%s" part %s failed: %s',