Skip to content

Commit

Permalink
Do not redownload parts that was downloaded, but not attached
Browse files Browse the repository at this point in the history
Change-Id: I13aba57ca58b75192481897673b6a705d7cfe0c8
  • Loading branch information
Pervakov Grigorii committed Sep 6, 2023
1 parent 2656373 commit f6705c5
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 5 deletions.
26 changes: 22 additions & 4 deletions ch_backup/backup/restore_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,29 @@

import json
from collections import defaultdict
from enum import Enum
from os.path import exists
from typing import Any, Dict, List, Mapping
from typing import Any, Dict, Mapping

from ch_backup.backup.metadata import PartMetadata


class PartState(Enum):
INVALID = "invalid"
DOWNLOADED = "downloaded"
RESTORED = "restored"


class RestoreContext:
"""
Backup restore context. Allows continue restore process after errors.
"""

def __init__(self, config: Dict):
self._state_file = config["restore_context_path"]
self._databases: Dict[str, Dict[str, List]] = {}
self._databases: Mapping[str, Mapping[str, Dict[str, PartState]]] = defaultdict(
lambda: defaultdict(defaultdict(PartState.INVALID))
)
self._failed: Mapping[str, Any] = defaultdict(
lambda: defaultdict(
lambda: {
Expand All @@ -39,17 +48,26 @@ def add_table(self, database: str, table: str) -> None:
if table not in self._databases[database]:
self._databases[database][table] = []

def add_part(self, part: PartMetadata) -> None:
def add_part(self, part: PartMetadata, state: PartState) -> None:
"""
Marks that data part was restored.
"""
self._databases[part.database][part.table].append(part.name)

def _part(self, part: PartMetadata) -> PartState:
return self._databases[part.database][part.table][part.name]

def part_downloaded(self, part: PartMetadata) -> bool:
"""
Checks if data part was downloaded.
"""
return self._part(part) == PartState.DOWNLOADED

def part_restored(self, part: PartMetadata) -> bool:
"""
Checks if data part was restored.
"""
return part.name in self._databases[part.database][part.table]
return self._part(part) == PartState.RESTORED

def add_failed_chown(self, database: str, table: str, path: str) -> None:
"""
Expand Down
11 changes: 10 additions & 1 deletion ch_backup/logic/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
deduplicate_part,
)
from ch_backup.backup.metadata import PartMetadata, TableMetadata
from ch_backup.backup.restore_context import PartState
from ch_backup.backup_context import BackupContext
from ch_backup.clickhouse.client import ClickhouseError
from ch_backup.clickhouse.disks import ClickHouseTemporaryDisks
Expand Down Expand Up @@ -587,6 +588,13 @@ def _restore_data(
)
continue

if context.restore_context.part_downloaded(part):
logging.debug(
f"{table.database}.{table.name} part {part.name} already downloading, only attach it"
)
attach_parts.append(part)
continue

try:
if part.disk_name in context.backup_meta.cloud_storage.disks:
if skip_cloud_storage:
Expand All @@ -605,6 +613,7 @@ def _restore_data(
context.backup_meta, part, fs_part_path
)

context.restore_context.add_part(part, PartState.DOWNLOADED)
attach_parts.append(part)
except Exception:
if keep_going:
Expand All @@ -628,7 +637,7 @@ def _restore_data(
)
try:
context.ch_ctl.attach_part(table, part.name)
context.restore_context.add_part(part)
context.restore_context.add_part(part, PartState.RESTORED)
except Exception as e:
logging.warning(
'Attaching "%s.%s" part %s failed: %s',
Expand Down

0 comments on commit f6705c5

Please sign in to comment.