Commit

Drop usage of deprecated ClickHouse feature SYSTEM RESTART DISK

Alex-Burmak committed Aug 26, 2023
1 parent e6f97d4 commit 071f5ba
Showing 8 changed files with 16 additions and 146 deletions.
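
For readers unfamiliar with the dropped mechanism: ClickHouse's SYSTEM RESTART DISK statement, together with a "restore" marker file written into the S3 disk's local path, used to re-attach object-storage data during restore. The sketch below condenses the code paths deleted in this commit; the helper name restore_s3_disk and its parameter list are illustrative, not part of the codebase.

    import os

    def restore_s3_disk(ch_client, disk_path: str, disk_name: str,
                        revision: int, source_bucket: str, source_path: str) -> None:
        # Describe what to restore in a key=value marker file on the disk...
        with open(os.path.join(disk_path, "restore"), "w", encoding="utf-8") as file:
            file.write(f"revision={revision}{os.linesep}")
            file.write(f"source_bucket={source_bucket}{os.linesep}")
            file.write(f"source_path={source_path}{os.linesep}")
            file.write(f"detached=true{os.linesep}")
        # ...then ask ClickHouse to re-read the disk from object storage.
        # SYSTEM RESTART DISK is deprecated, which is why this flow is removed.
        ch_client.query(f"SYSTEM RESTART DISK {disk_name}")
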
10 changes: 0 additions & 10 deletions ch_backup/backup/metadata/backup_metadata.py
@@ -60,7 +60,6 @@ def __init__(
         self.size = 0
         self.real_size = 0
         self.schema_only = schema_only
-        self.s3_revisions: Dict[str, int] = {}  # S3 disk name -> revision counter.
         self.cloud_storage: CloudStorageMetadata = CloudStorageMetadata()
 
         self._state = BackupState.CREATING
@@ -130,7 +129,6 @@ def dump(self, light: bool = False) -> dict:
                 # to replace 'date_fmt' with 'time_format'.
                 "date_fmt": self.time_format,
                 "schema_only": self.schema_only,
-                "s3_revisions": self.s3_revisions,
             },
         }
 
@@ -181,7 +179,6 @@ def load(cls, data: dict) -> "BackupMetadata":
         backup.labels = meta["labels"]
         backup.version = meta["version"]
         backup.schema_only = meta.get("schema_only", False)
-        backup.s3_revisions = meta.get("s3_revisions", {})
         # TODO remove after a several weeks/months, when backups rotated
         backup._user_defined_functions = data.get(
             "user_defined_functions", meta.get("user_defined_functions", [])
@@ -335,13 +332,6 @@ def set_access_control(self, objects: Sequence[Dict[str, Any]]) -> None:
         """
         self._access_control = AccessControlMetadata.from_ch_objects(objects)
 
-    def has_s3_data(self) -> bool:
-        """
-        Return True if backup has data on S3 disks.
-        TODO: could be removed after denial of storing S3 revisions
-        """
-        return len(self.s3_revisions) > 0
-
     def get_sanitized_name(self) -> str:
         """
         ClickHouse will place shadow data under this directory.
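
Because load() copies only the keys it knows about, metadata written by older versions (which still contains "s3_revisions") keeps loading after this change. A minimal sketch with hypothetical data, not taken from the commit:

    # Legacy keys are simply ignored by the new load().
    legacy_meta = {
        "labels": {},
        "version": "2.x",           # hypothetical version string
        "schema_only": False,
        "s3_revisions": {"object_storage": 42},  # written by older ch-backup
    }
    schema_only = legacy_meta.get("schema_only", False)  # still read
    # legacy_meta["s3_revisions"] is never consulted after this commit
    assert schema_only is False
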
15 changes: 0 additions & 15 deletions ch_backup/backup/restore_context.py
@@ -26,7 +26,6 @@ def __init__(self, config: Dict):
                 }
             )
         )
-        self._restarted_disks: List[str] = []
         if exists(self._state_file):
             self._load_state()
 
@@ -64,18 +63,6 @@ def add_failed_part(self, part: PartMetadata, e: Exception) -> None:
         """
         self._failed[part.database][part.table]["failed_parts"][part.name] = repr(e)
 
-    def add_restarted_disk(self, disk_name: str) -> None:
-        """
-        Marks that disk was restarted.
-        """
-        self._restarted_disks.append(disk_name)
-
-    def disk_restarted(self, disk_name: str) -> bool:
-        """
-        Checks if disk was restarted.
-        """
-        return disk_name in self._restarted_disks
-
     def has_failed_parts(self) -> bool:
         """
         Returns whether some parts failed during restore.
@@ -91,7 +78,6 @@ def dump_state(self) -> None:
                 {
                     "databases": self._databases,
                     "failed": self._failed,
-                    "restarted_disks": self._restarted_disks,
                 },
                 f,
             )
@@ -100,4 +86,3 @@ def _load_state(self) -> None:
         with open(self._state_file, "r", encoding="utf-8") as f:
            state: Dict[str, Any] = json.load(f)
            self._databases = state["databases"]
-            self._restarted_disks = state.get("restarted_disks", [])
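
The restore state file is plain JSON, so dropping "restarted_disks" is compatible in both directions: json.load() leaves unknown keys untouched, and the new _load_state() reads only "databases". A small sketch under that assumption (the state payload here is made up):

    import json

    # State written by an older version mid-restore still parses cleanly.
    old_state = '{"databases": {"db1": {}}, "failed": {}, "restarted_disks": ["object_storage"]}'
    state = json.loads(old_state)
    databases = state["databases"]  # the only key the new _load_state() reads
    # state["restarted_disks"] is present but ignored after this commit
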
9 changes: 1 addition & 8 deletions ch_backup/ch_backup.py
@@ -221,7 +221,6 @@ def restore(
         cloud_storage_source_bucket: str = None,
         cloud_storage_source_path: str = None,
         cloud_storage_source_endpoint: str = None,
-        cloud_storage_latest: bool = False,
         skip_cloud_storage: bool = False,
         clean_zookeeper: bool = False,
         keep_going: bool = False,
@@ -237,10 +236,7 @@ def restore(
             and not sources.schema_only
             and not skip_cloud_storage
         ):
-            if (
-                self._context.backup_meta.has_s3_data()
-                or self._context.backup_meta.cloud_storage.enabled
-            ):
+            if self._context.backup_meta.cloud_storage.enabled:
                 raise ClickhouseBackupError(
                     "Cloud storage source bucket must be set if backup has data on S3 disks"
                 )
@@ -297,7 +293,6 @@ def restore(
             replica_name=replica_name,
             cloud_storage_source_bucket=cloud_storage_source_bucket,
             cloud_storage_source_path=cloud_storage_source_path,
-            cloud_storage_latest=cloud_storage_latest,
             cloud_storage_source_endpoint=cloud_storage_source_endpoint,
             skip_cloud_storage=skip_cloud_storage,
             clean_zookeeper=clean_zookeeper,
@@ -514,7 +509,6 @@ def _restore(
         cloud_storage_source_bucket: Optional[str] = None,
         cloud_storage_source_path: Optional[str] = None,
         cloud_storage_source_endpoint: Optional[str] = None,
-        cloud_storage_latest: bool = False,
         skip_cloud_storage: bool = False,
         clean_zookeeper: bool = False,
         keep_going: bool = False,
@@ -553,7 +547,6 @@ def _restore(
             cloud_storage_source_bucket=cloud_storage_source_bucket,
             cloud_storage_source_path=cloud_storage_source_path,
             cloud_storage_source_endpoint=cloud_storage_source_endpoint,
-            cloud_storage_latest=cloud_storage_latest,
             skip_cloud_storage=skip_cloud_storage,
             clean_zookeeper=clean_zookeeper,
             keep_going=keep_going,
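
With has_s3_data() gone, the "must provide a source bucket" guard keys on a single flag. A self-contained sketch of the new check (the function name and the stand-in exception class are illustrative; in ch_backup the logic lives inline in restore()):

    from typing import Optional

    class ClickhouseBackupError(Exception):
        """Stand-in for ch_backup's error type (illustrative)."""

    def check_cloud_storage_args(cloud_storage_enabled: bool,
                                 source_bucket: Optional[str]) -> None:
        # Only the cloud_storage.enabled flag now decides whether a source
        # bucket is required; the legacy has_s3_data() disjunct is gone.
        if source_bucket is None and cloud_storage_enabled:
            raise ClickhouseBackupError(
                "Cloud storage source bucket must be set if backup has data on S3 disks"
            )
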
7 changes: 0 additions & 7 deletions ch_backup/cli.py
@@ -595,11 +595,6 @@ def backup_command(
     option(
         "--cloud-storage-source-endpoint", type=str, help="Endpoint for source bucket."
     ),
-    option(
-        "--cloud-storage-latest",
-        is_flag=True,
-        help=f'Forces to use {style("revision=0", fg=Color.bright_green)} to cloud storage.',
-    ),
     option(
         "--skip-cloud-storage",
         is_flag=True,
@@ -664,7 +659,6 @@ def restore_command(
     cloud_storage_source_bucket: str = None,
     cloud_storage_source_path: str = None,
     cloud_storage_source_endpoint: str = None,
-    cloud_storage_latest: bool = False,
     skip_cloud_storage: bool = False,
     clean_zookeeper: bool = False,
     keep_going: bool = False,
@@ -703,7 +697,6 @@ def restore_command(
         cloud_storage_source_bucket=cloud_storage_source_bucket,
         cloud_storage_source_path=cloud_storage_source_path,
         cloud_storage_source_endpoint=cloud_storage_source_endpoint,
-        cloud_storage_latest=cloud_storage_latest,
         skip_cloud_storage=skip_cloud_storage,
         clean_zookeeper=clean_zookeeper,
         keep_going=keep_going,
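
For CLI callers, the surviving cloud-storage options on restore are the three source-location ones. A hypothetical helper in the style of the integration-test module (the function name is made up):

    def cloud_storage_restore_options(bucket: str = "", path: str = "",
                                      endpoint: str = "") -> list:
        # Only these cloud-storage options survive on `ch-backup restore`;
        # passing --cloud-storage-latest is now rejected as an unknown option.
        options = []
        if bucket:
            options.append(f"--cloud-storage-source-bucket {bucket}")
        if path:
            options.append(f"--cloud-storage-source-path {path}")
        if endpoint:
            options.append(f"--cloud-storage-source-endpoint {endpoint}")
        return options
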
29 changes: 0 additions & 29 deletions ch_backup/clickhouse/control.py
@@ -210,12 +210,6 @@
     """
 )
 
-RESTART_DISK_SQL = strip_query(
-    """
-    SYSTEM RESTART DISK {disk_name}
-    """
-)
-
 RELOAD_CONFIG_SQL = strip_query(
     """
     SYSTEM RELOAD CONFIG
@@ -753,29 +747,6 @@ def read_s3_disk_revision(self, disk_name: str, backup_name: str) -> Optional[int]:
         with open(file_path, "r", encoding="utf-8") as file:
             return int(file.read().strip())
 
-    def create_s3_disk_restore_file(
-        self, disk_name: str, revision: int, source_bucket: str, source_path: str
-    ) -> None:
-        """
-        Creates file with restore information for S3 disk.
-        """
-        file_path = os.path.join(self._disks[disk_name].path, "restore")
-        with open(file_path, "w", encoding="utf-8") as file:
-            file.write(f"revision={revision}{os.linesep}")
-            file.write(f"source_bucket={source_bucket}{os.linesep}")
-            file.write(f"source_path={source_path}{os.linesep}")
-            file.write(f"detached=true{os.linesep}")
-
-    def restart_disk(self, disk_name: str, context: RestoreContext) -> None:
-        """
-        Restarts ClickHouse and waits till it can process queries.
-        """
-        self._ch_client.query(
-            RESTART_DISK_SQL.format(disk_name=disk_name),
-            timeout=self._restart_disk_timeout,
-        )
-        context.add_restarted_disk(disk_name)
-
     def reload_config(self):
         """
         Reload ClickHouse configuration query.
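
Of the S3-disk helpers, only read_s3_disk_revision survives: the revision counter ClickHouse writes on FREEZE is still readable, it is just no longer fed back into SYSTEM RESTART DISK. A condensed standalone sketch; the shadow/<backup>/revision.txt path layout is my assumption for illustration:

    import os
    from typing import Optional

    def read_s3_disk_revision(disk_path: str, backup_name: str) -> Optional[int]:
        # Assumed layout: ClickHouse stores the counter under the shadow dir.
        file_path = os.path.join(disk_path, "shadow", backup_name, "revision.txt")
        if not os.path.exists(file_path):
            return None
        with open(file_path, "r", encoding="utf-8") as file:
            return int(file.read().strip())
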
88 changes: 15 additions & 73 deletions ch_backup/logic/table.py
@@ -169,7 +169,6 @@ def restore(
         cloud_storage_source_bucket: Optional[str],
         cloud_storage_source_path: Optional[str],
         cloud_storage_source_endpoint: Optional[str],
-        cloud_storage_latest: bool,
         skip_cloud_storage: bool,
         clean_zookeeper: bool,
         keep_going: bool,
@@ -247,28 +246,6 @@ def restore(
             )
             return
 
-        # Restore data stored on S3 disks.
-        if context.backup_meta.has_s3_data() and not skip_cloud_storage:
-            cloud_storage_source_bucket = (
-                cloud_storage_source_bucket if cloud_storage_source_bucket else ""
-            )
-            cloud_storage_source_path = (
-                cloud_storage_source_path if cloud_storage_source_path else ""
-            )
-            cloud_storage_source_endpoint = (
-                cloud_storage_source_endpoint if cloud_storage_source_endpoint else ""
-            )
-            self._restore_cloud_storage_data(
-                context,
-                cloud_storage_source_bucket,
-                cloud_storage_source_path,
-                cloud_storage_latest,
-            )
-        else:
-            logging.debug(
-                "Skipping restoring of cloud storage data as --skip-cloud-storage flag passed"
-            )
-
         failed_tables_names = [f"`{t.database}`.`{t.name}`" for t in failed_tables]
         tables_to_restore_data = filter(lambda t: is_merge_tree(t.engine), tables_meta)
         tables_to_restore_data = filter(
@@ -576,34 +553,6 @@ def _restore_tables(
             keep_going,
         )
 
-    @staticmethod
-    def _restore_cloud_storage_data(
-        context: BackupContext,
-        source_bucket: str,
-        source_path: str,
-        cloud_storage_latest: bool,
-    ) -> None:
-        for disk_name, revision in context.backup_meta.s3_revisions.items():
-            logging.debug(f"Restore disk {disk_name} to revision {revision}")
-
-            context.ch_ctl.create_s3_disk_restore_file(
-                disk_name,
-                revision if not cloud_storage_latest else 0,
-                source_bucket,
-                source_path,
-            )
-
-            if context.restore_context.disk_restarted(disk_name):
-                logging.debug(
-                    f"Skip restoring disk {disk_name} as it has already been restored"
-                )
-                continue
-
-            try:
-                context.ch_ctl.restart_disk(disk_name, context.restore_context)
-            finally:
-                context.restore_context.dump_state()
-
     @staticmethod
     def _restore_data(
         context: BackupContext,
@@ -639,29 +588,22 @@ def _restore_data(
                         continue
 
                     try:
-                        if (
-                            part.disk_name
-                            not in context.backup_meta.s3_revisions.keys()
-                        ):
-                            if (
-                                part.disk_name
-                                in context.backup_meta.cloud_storage.disks
-                            ):
-                                if skip_cloud_storage:
-                                    logging.debug(
-                                        f"Skipping restoring of {table.database}.{table.name} part {part.name} "
-                                        "on cloud storage because of --skip-cloud-storage flag"
-                                    )
-                                    continue
-
-                                disks.copy_part(context.backup_meta, table, part)
-                            else:
-                                fs_part_path = context.ch_ctl.get_detached_part_path(
-                                    table, part.disk_name, part.name
-                                )
-                                context.backup_layout.download_data_part(
-                                    context.backup_meta, part, fs_part_path
-                                )
+                        if part.disk_name in context.backup_meta.cloud_storage.disks:
+                            if skip_cloud_storage:
+                                logging.debug(
+                                    f"Skipping restoring of {table.database}.{table.name} part {part.name} "
+                                    "on cloud storage because of --skip-cloud-storage flag"
+                                )
+                                continue
+
+                            disks.copy_part(context.backup_meta, table, part)
+                        else:
+                            fs_part_path = context.ch_ctl.get_detached_part_path(
+                                table, part.disk_name, part.name
+                            )
+                            context.backup_layout.download_data_part(
+                                context.backup_meta, part, fs_part_path
+                            )
 
                         attach_parts.append(part)
                     except Exception:
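
After this change, the per-part restore branches only on whether the part lives on a cloud-storage disk. A condensed sketch of the new branch (identifiers are from the diff; the standalone function shape and name are illustrative):

    def restore_part(context, disks, table, part, skip_cloud_storage: bool) -> bool:
        """Return True if the part should be attached afterwards."""
        if part.disk_name in context.backup_meta.cloud_storage.disks:
            if skip_cloud_storage:
                return False  # honour --skip-cloud-storage
            # Copy within object storage using the cloud_storage metadata.
            disks.copy_part(context.backup_meta, table, part)
        else:
            # Regular disks: download the part into the detached directory.
            fs_part_path = context.ch_ctl.get_detached_part_path(
                table, part.disk_name, part.name
            )
            context.backup_layout.download_data_part(
                context.backup_meta, part, fs_part_path
            )
        return True
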
1 change: 0 additions & 1 deletion tests/integration/features/backup_multiple_disks.feature
@@ -228,7 +228,6 @@ Feature: Backup & Restore multiple disks and S3
     """
     cloud_storage_source_bucket: 'cloud-storage-01'
     cloud_storage_source_path: 'data'
-    cloud_storage_latest: true
     """
     And we execute query on clickhouse02
     """
3 changes: 0 additions & 3 deletions tests/integration/modules/ch_backup.py
@@ -307,7 +307,6 @@ def restore(
         replica_name: str = None,
         cloud_storage_source_bucket: str = None,
         cloud_storage_source_path: str = None,
-        cloud_storage_latest: bool = False,
         access: bool = None,
         data: bool = None,
         schema: bool = None,
@@ -335,8 +334,6 @@ def restore(
             )
         if cloud_storage_source_path:
             options.append(f"--cloud-storage-source-path {cloud_storage_source_path}")
-        if cloud_storage_latest:
-            options.append("--cloud-storage-latest")
         if access:
             options.append("--access")
         if data:
