Skip to content

Commit

Permalink
-
Browse files — browse the repository at this point in the history
  • Loading branch information
ivsnk committed Sep 12, 2024
1 parent cfdb8ca commit e832790
Show file tree
Hide file tree
Showing 13 changed files with 80 additions and 158 deletions.
8 changes: 7 additions & 1 deletion ch_backup/backup/deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from ch_backup.util import Slotted, utcnow


# pylint: disable=too-many-instance-attributes
class PartDedupInfo(Slotted):
"""
Information about data part to use for deduplication / creation incremental backups.
Expand All @@ -31,6 +32,7 @@ class PartDedupInfo(Slotted):
"tarball",
"disk_name",
"verified",
"encrypted",
)

# pylint: disable=too-many-arguments
Expand All @@ -46,6 +48,7 @@ def __init__(
tarball: bool,
disk_name: str,
verified: bool,
encrypted: bool,
) -> None:
self.database = database
self.table = table
Expand All @@ -57,13 +60,14 @@ def __init__(
self.tarball = tarball
self.disk_name = disk_name
self.verified = verified
self.encrypted = encrypted

def to_sql(self):
    """
    Convert this part's deduplication info to a ClickHouse tuple literal
    for use in an INSERT query.

    Column order matches the deduplication table DDL:
    (database, table, name, backup_path, checksum, size, files, tarball,
    disk_name, verified, encrypted) — including the newly added
    ``encrypted`` flag.

    NOTE(review): values are interpolated directly into the SQL string;
    this assumes database/table/part names and paths contain no quote
    characters — confirm upstream sanitization.
    """
    # Render the file list as a ClickHouse Array(String) literal.
    files_array = "[" + ",".join(f"'{file}'" for file in self.files) + "]"
    # Booleans are emitted as 0/1 to match Bool / Nullable(Bool) columns.
    return (
        f"('{self.database}','{self.table}','{self.name}','{self.backup_path}',"
        f"'{self.checksum}',{self.size},{files_array},{int(self.tarball)},"
        f"'{self.disk_name}',{int(self.verified)},{int(self.encrypted)})"
    )


TableDedupReferences = Set[str]
Expand Down Expand Up @@ -204,6 +208,7 @@ def _populate_dedup_info(
tarball=part.tarball,
disk_name=part.disk_name,
verified=verified,
encrypted=part.encrypted,
)

table_dedup_info.add(part.name)
Expand Down Expand Up @@ -247,6 +252,7 @@ def deduplicate_parts(
files=existing_part["files"],
tarball=existing_part["tarball"],
disk_name=existing_part["disk_name"],
encrypted=existing_part.get("encrypted", True),
)

if not existing_part["verified"]:
Expand Down
36 changes: 15 additions & 21 deletions ch_backup/backup/layout.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def upload_database_create_statement(
self._storage_loader.upload_file(
local_path,
remote_path=remote_path,
encryption=backup_meta.is_encryption_enabled,
encryption=True,
)
except Exception as e:
msg = f"Failed to create async upload of {remote_path}"
Expand Down Expand Up @@ -109,7 +109,7 @@ def upload_table_create_statement(
create_statement,
remote_path,
is_async=True,
encryption=backup_meta.is_encryption_enabled,
encryption=True,
)
except Exception as e:
msg = f"Failed to create async upload of {remote_path}"
Expand All @@ -130,7 +130,7 @@ def upload_access_control_file(
self._storage_loader.upload_file(
local_path=local_path,
remote_path=remote_path,
encryption=backup_meta.is_encryption_enabled,
encryption=True,
)
except Exception as e:
msg = f'Failed to upload access control metadata file "{remote_path}"'
Expand All @@ -151,7 +151,7 @@ def upload_access_control_files(
local_path,
remote_path,
files=file_names,
encryption=backup_meta.is_encryption_enabled,
encryption=True,
)

except Exception as e:
Expand All @@ -169,7 +169,7 @@ def upload_udf(
self._storage_loader.upload_data(
data=metadata,
remote_path=remote_path,
encryption=backup_meta.is_encryption_enabled,
encryption=True,
)
except Exception as e:
msg = f'Failed to upload udf metadata "{remote_path}"'
Expand Down Expand Up @@ -201,7 +201,7 @@ def upload_data_part(
files=fpart.files,
remote_path=remote_path,
is_async=True,
encryption=backup_meta.is_encryption_enabled,
encryption=backup_meta.encrypted,
delete=True,
callback=callback,
)
Expand Down Expand Up @@ -260,7 +260,7 @@ def upload_named_collections_create_statement(
self._storage_loader.upload_file(
local_path,
remote_path=remote_path,
encryption=backup_meta.is_encryption_enabled,
encryption=True,
)
except Exception as e:
msg = f"Failed to create async upload of {remote_path}"
Expand All @@ -277,7 +277,7 @@ def get_udf_create_statement(
)
return self._storage_loader.download_data(
remote_path,
encryption=backup_meta.is_encryption_enabled,
encryption=True, # backup_meta.encrypted,
)

def get_local_nc_create_statement(self, nc_name: str) -> Optional[str]:
Expand Down Expand Up @@ -306,7 +306,7 @@ def get_named_collection_create_statement(
remote_path = _named_collections_data_path(backup_meta.path, filename)
return self._storage_loader.download_data(
remote_path,
encryption=backup_meta.is_encryption_enabled,
encryption=True,
)

def get_backup_names(self) -> Sequence[str]:
Expand Down Expand Up @@ -395,10 +395,7 @@ def get_database_create_statement(
Download and return database create statement.
"""
remote_path = _db_metadata_path(backup_meta.path, db_name)
return self._storage_loader.download_data(
remote_path,
encryption=backup_meta.is_encryption_enabled,
)
return self._storage_loader.download_data(remote_path, encryption=True)

def write_database_metadata(self, db: Database, db_sql: str) -> None:
"""
Expand All @@ -417,10 +414,7 @@ def get_table_create_statement(
Download and return table create statement.
"""
remote_path = _table_metadata_path(backup_meta.path, db_name, table_name)
return self._storage_loader.download_data(
remote_path,
encryption=backup_meta.is_encryption_enabled,
)
return self._storage_loader.download_data(remote_path, encryption=True)

def download_access_control_file(
self, local_path: str, backup_meta: BackupMetadata, file_name: str
Expand All @@ -439,7 +433,7 @@ def download_access_control_file(
self._storage_loader.download_file(
remote_path,
local_path,
encryption=backup_meta.is_encryption_enabled,
encryption=True,
)
except Exception as e:
msg = f"Failed to download access control metadata file {remote_path}"
Expand All @@ -461,7 +455,7 @@ def download_access_control(
self._storage_loader.download_files(
remote_path,
local_path,
encryption=backup_meta.is_encryption_enabled,
encryption=True,
)
except Exception as e:
msg = f"Failed to download access control metadata file {remote_path}"
Expand Down Expand Up @@ -498,7 +492,7 @@ def download_data_part(
remote_path=remote_path,
local_path=fs_part_path,
is_async=True,
encryption=backup_meta.is_encryption_enabled,
encryption=part.encrypted,
)
except Exception as e:
msg = f"Failed to download part tarball file {remote_path}"
Expand All @@ -513,7 +507,7 @@ def download_data_part(
remote_path=remote_path,
local_path=local_path,
is_async=True,
encryption=backup_meta.is_encryption_enabled,
encryption=part.encrypted,
)
except Exception as e:
msg = f"Failed to download part file {remote_path}"
Expand Down
8 changes: 4 additions & 4 deletions ch_backup/backup/metadata/backup_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def __init__(
hostname: str = None,
labels: dict = None,
schema_only: bool = False,
is_encryption_enabled: bool = True,
encrypted: bool = True,
) -> None:
self.name = name
self.path = path
Expand All @@ -62,7 +62,7 @@ def __init__(
self.size = 0
self.real_size = 0
self.schema_only = schema_only
self.is_encryption_enabled = is_encryption_enabled
self.encrypted = encrypted
self.cloud_storage: CloudStorageMetadata = CloudStorageMetadata()

self._state = BackupState.CREATING
Expand Down Expand Up @@ -147,7 +147,7 @@ def dump(self, light: bool = False) -> dict:
# to replace 'date_fmt' with 'time_format'.
"date_fmt": self.time_format,
"schema_only": self.schema_only,
"is_encryption_enabled": self.is_encryption_enabled,
"encrypted": self.encrypted,
},
}

Expand Down Expand Up @@ -199,7 +199,7 @@ def load(cls, data: dict) -> "BackupMetadata":
backup.labels = meta["labels"]
backup.version = meta["version"]
backup.schema_only = meta.get("schema_only", False)
backup.is_encryption_enabled = meta.get("is_encryption_enabled", True)
backup.encrypted = meta.get("encrypted", True)
# TODO remove after a several weeks/months, when backups rotated
# OR NOT TODO, because of backward compatibility
backup._user_defined_functions = data.get(
Expand Down
16 changes: 14 additions & 2 deletions ch_backup/backup/metadata/part_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class RawMetadata(Slotted):
Raw metadata for ClickHouse data part.
"""

__slots__ = "checksum", "size", "files", "tarball", "link", "disk_name"
__slots__ = "checksum", "size", "files", "tarball", "link", "disk_name", "encrypted"

def __init__(
self,
Expand All @@ -23,13 +23,15 @@ def __init__(
tarball: bool,
link: str = None,
disk_name: str = None,
encrypted: bool = True,
) -> None:
self.checksum = checksum
self.size = size
self.files = files
self.tarball = tarball
self.link = link
self.disk_name = disk_name
self.encrypted = encrypted


class PartMetadata(Slotted):
Expand All @@ -51,12 +53,13 @@ def __init__(
tarball: bool,
link: str = None,
disk_name: str = None,
encrypted: bool = True,
) -> None:
self.database: str = database
self.table: str = table
self.name: str = name
self.raw_metadata: RawMetadata = RawMetadata(
checksum, size, files, tarball, link, disk_name
checksum, size, files, tarball, link, disk_name, encrypted
)

@property
Expand Down Expand Up @@ -101,6 +104,13 @@ def tarball(self) -> bool:
"""
return self.raw_metadata.tarball

@property
def encrypted(self) -> bool:
    """
    Whether this data part's payload is stored encrypted.

    Delegates to the underlying raw part metadata.
    """
    raw = self.raw_metadata
    return raw.encrypted

@classmethod
def load(
cls, db_name: str, table_name: str, part_name: str, raw_metadata: dict
Expand All @@ -118,6 +128,7 @@ def load(
tarball=raw_metadata.get("tarball", False),
link=raw_metadata["link"],
disk_name=raw_metadata.get("disk_name", "default"),
encrypted=raw_metadata.get("encrypted", True),
)

@classmethod
Expand All @@ -134,4 +145,5 @@ def from_frozen_part(cls, frozen_part: FrozenPart) -> "PartMetadata":
files=frozen_part.files,
tarball=True,
disk_name=frozen_part.disk_name,
encrypted=frozen_part.encrypted,
)
1 change: 1 addition & 0 deletions ch_backup/backup/metadata/table_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def add_part(self, part: PartMetadata) -> None:
"link": part.link,
"tarball": part.tarball,
"disk_name": part.disk_name,
"encrypted": part.encrypted,
}

@classmethod
Expand Down
4 changes: 2 additions & 2 deletions ch_backup/ch_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ def backup(
ch_version=self._context.ch_ctl.get_version(),
time_format=self._context.config["time_format"],
schema_only=sources.schema_only,
is_encryption_enabled=self._config.get(EncryptStage.stype, {}).get(
"is_enabled", True
encrypted=self._config.get(EncryptStage.stype, {}).get(
"enabled", True
),
)

Expand Down
12 changes: 8 additions & 4 deletions ch_backup/clickhouse/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from pkg_resources import parse_version

from ch_backup import logging
from ch_backup.backup.metadata import TableMetadata
from ch_backup.backup.metadata import BackupMetadata, TableMetadata
from ch_backup.backup.restore_context import RestoreContext
from ch_backup.calculators import calc_aligned_files_size
from ch_backup.clickhouse.client import ClickhouseClient
Expand Down Expand Up @@ -182,7 +182,8 @@
files Array(String),
tarball Bool,
disk_name String,
verified Bool
verified Bool,
encrypted Nullable(Bool)
)
ENGINE = MergeTree()
ORDER BY (database, table, name, checksum)
Expand Down Expand Up @@ -724,13 +725,15 @@ def get_zookeeper_admin_uuid(self) -> Dict[str, str]:

@staticmethod
def scan_frozen_parts(
table: Table, disk: Disk, data_path: str, backup_name: str
table: Table, disk: Disk, data_path: str, backup_meta: BackupMetadata,
) -> Iterable[FrozenPart]:
"""
Yield frozen parts from specific disk and path.
"""
table_relative_path = os.path.relpath(data_path, disk.path)
path = os.path.join(disk.path, "shadow", backup_name, table_relative_path)
path = os.path.join(
disk.path, "shadow", backup_meta.get_sanitized_name(), table_relative_path
)

if not os.path.exists(path):
logging.debug("Shadow path {} is empty", path)
Expand All @@ -757,6 +760,7 @@ def scan_frozen_parts(
checksum,
size,
rel_paths,
backup_meta.encrypted,
)

@staticmethod
Expand Down
3 changes: 3 additions & 0 deletions ch_backup/clickhouse/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ class FrozenPart(Slotted):
"checksum",
"size",
"files",
"encrypted",
)

def __init__(
Expand All @@ -248,6 +249,7 @@ def __init__(
checksum: str,
size: int,
files: List[str],
encrypted: bool,
):
super().__init__()
self.database = database
Expand All @@ -258,3 +260,4 @@ def __init__(
self.checksum = checksum
self.size = size
self.files = files
self.encrypted = encrypted
2 changes: 1 addition & 1 deletion ch_backup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def _as_seconds(t: str) -> int:
"queue_size": 10,
},
"encryption": {
"is_enabled": True,
"enabled": True,
"type": "nacl",
# Chunk size used when encrypting / decrypting data, in bytes.
"chunk_size": parse_size("8 MiB"),
Expand Down
Loading

0 comments on commit e832790

Please sign in to comment.