From aa1b71292993a0a5766664143ba6ac517d0c0b0e Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Thu, 21 Sep 2023 13:08:48 +0000 Subject: [PATCH 1/8] init --- ch_backup/backup/deduplication.py | 6 +- ch_backup/backup/layout.py | 32 +++++----- ch_backup/ch_backup.py | 15 ++--- ch_backup/cli.py | 8 +-- ch_backup/clickhouse/client.py | 2 +- ch_backup/clickhouse/control.py | 6 +- ch_backup/clickhouse/disks.py | 4 +- ch_backup/config.py | 49 ++++++++++++++-- ch_backup/logging.py | 58 +++++++++++-------- ch_backup/logic/database.py | 4 +- ch_backup/logic/table.py | 38 ++++++------ ch_backup/logic/udf.py | 8 +-- .../async_pipeline/base_pipeline/exec_pool.py | 9 +-- ch_backup/storage/pipeline.py | 4 +- ch_backup/util.py | 2 +- requirements.txt | 1 + 16 files changed, 149 insertions(+), 97 deletions(-) diff --git a/ch_backup/backup/deduplication.py b/ch_backup/backup/deduplication.py index e50010a0..84ff904b 100644 --- a/ch_backup/backup/deduplication.py +++ b/ch_backup/backup/deduplication.py @@ -229,7 +229,7 @@ def deduplicate_part( """ part_name = fpart.name - logging.debug('Looking for deduplication of part "%s"', part_name) + logging.debug('Looking for deduplication of part "{}"', part_name) existing_part = dedup_info.get(part_name) if not existing_part: @@ -253,14 +253,14 @@ def deduplicate_part( if not existing_part.verified: if not layout.check_data_part(existing_part.backup_path, part): logging.debug( - 'Part "%s" found in "%s", but it\'s invalid, skipping', + 'Part "{}" found in "{}", but it\'s invalid, skipping', part_name, existing_part.backup_path, ) return None logging.debug( - 'Part "%s" found in "%s", reusing', part_name, existing_part.backup_path + 'Part "{}" found in "{}", reusing', part_name, existing_part.backup_path ) return part diff --git a/ch_backup/backup/layout.py b/ch_backup/backup/layout.py index 99f850b1..22490a44 100644 --- a/ch_backup/backup/layout.py +++ b/ch_backup/backup/layout.py @@ -45,11 +45,11 @@ def upload_backup_metadata(self, backup: BackupMetadata) -> None: remote_path = self._backup_metadata_path(backup.name) remote_light_path = self._backup_light_metadata_path(backup.name) try: - logging.debug("Saving backup metadata in %s", remote_path) + logging.debug("Saving backup metadata in {}", remote_path) self._storage_loader.upload_data( backup.dump_json(light=False), remote_path=remote_path ) - logging.debug("Saving backup light metadata in %s", remote_light_path) + logging.debug("Saving backup light metadata in {}", remote_light_path) self._storage_loader.upload_data( backup.dump_json(light=True), remote_path=remote_light_path ) @@ -66,7 +66,7 @@ def upload_database_create_statement(self, backup_name: str, db: Database) -> No remote_path = _db_metadata_path(self.get_backup_path(backup_name), db.name) try: logging.debug( - 'Uploading metadata (create statement) for database "%s"', db.name + 'Uploading metadata (create statement) for database "{}"', db.name ) self._storage_loader.upload_file( local_path, remote_path=remote_path, encryption=True @@ -88,7 +88,7 @@ def upload_table_create_statement( ) try: logging.debug( - 'Uploading metadata (create statement) for table "%s"."%s"', + 'Uploading metadata (create statement) for table "{}"."{}"', db.name, table.name, ) @@ -108,7 +108,7 @@ def upload_access_control_file(self, backup_name: str, file_name: str) -> None: self.get_backup_path(backup_name), file_name ) try: - logging.debug('Uploading access control data "%s"', local_path) + logging.debug('Uploading access control data "{}"', local_path) 
self._storage_loader.upload_file( local_path=local_path, remote_path=remote_path, encryption=True ) @@ -127,7 +127,7 @@ def upload_access_control_files( self.get_backup_path(backup_name), ACCESS_CONTROL_FNAME ) try: - logging.debug('Uploading access control data "%s"', local_path) + logging.debug('Uploading access control data "{}"', local_path) self._storage_loader.upload_files_tarball( self._access_control_path, file_names, remote_path, encryption=True ) @@ -154,7 +154,7 @@ def upload_data_part(self, backup_name: str, fpart: FrozenPart) -> None: Upload part data. """ logging.debug( - 'Uploading data part %s of "%s"."%s"', + 'Uploading data part {} of "{}"."{}"', fpart.name, fpart.database, fpart.table, @@ -255,7 +255,7 @@ def get_backups(self, use_light_meta: bool = False) -> List[BackupMetadata]: Return list of existing backups sorted by start_time in descent order. """ logging.debug( - "Collecting %s of existing backups", + "Collecting {} of existing backups", "light metadata" if use_light_meta else "metadata", ) @@ -322,7 +322,7 @@ def download_access_control_file(self, backup_name: str, file_name: str) -> None ) local_path = os.path.join(self._access_control_path, file_name) logging.debug( - 'Downloading access control metadata "%s" to "%s', remote_path, local_path + 'Downloading access control metadata "{}" to "{}', remote_path, local_path ) try: self._storage_loader.download_file(remote_path, local_path, encryption=True) @@ -339,7 +339,7 @@ def download_access_control(self, backup_name: str) -> None: ) local_path = self._access_control_path logging.debug( - 'Downloading access control metadata "%s" to "%s', remote_path, local_path + 'Downloading access control metadata "{}" to "{}', remote_path, local_path ) try: self._storage_loader.download_files( @@ -356,7 +356,7 @@ def download_data_part( Download part data to the specified directory. 
""" logging.debug( - 'Downloading data part %s of "%s"."%s"', + 'Downloading data part {} of "{}"."{}"', part.name, part.database, part.table, @@ -370,7 +370,7 @@ def download_data_part( if part.tarball: remote_path = os.path.join(remote_dir_path, f"{part.name}.tar") - logging.debug("Downloading part tarball file: %s", remote_path) + logging.debug("Downloading part tarball file: {}", remote_path) try: self._storage_loader.download_files( remote_path=remote_path, @@ -386,7 +386,7 @@ def download_data_part( local_path = os.path.join(fs_part_path, filename) remote_path = os.path.join(remote_dir_path, filename) try: - logging.debug("Downloading part file: %s", remote_path) + logging.debug("Downloading part file: {}", remote_path) self._storage_loader.download_file( remote_path=remote_path, local_path=local_path, @@ -422,7 +422,7 @@ def check_data_part(self, backup_path: str, part: PartMetadata) -> bool: notfound_files = set(part.files) - set(remote_files) if notfound_files: logging.warning( - "Some part files were not found in %s: %s", + "Some part files were not found in {}: {}", remote_dir_path, ", ".join(notfound_files), ) @@ -469,7 +469,7 @@ def delete_backup(self, backup_name: str) -> None: """ backup_path = self.get_backup_path(backup_name) - logging.debug("Deleting data in %s", backup_path) + logging.debug("Deleting data in {}", backup_path) deleting_files = self._storage_loader.list_dir( backup_path, recursive=True, absolute=True @@ -490,7 +490,7 @@ def delete_data_parts( part_path = _part_path( part.link or backup_meta.path, part.database, part.table, part.name ) - logging.debug("Deleting data part %s", part_path) + logging.debug("Deleting data part {}", part_path) if part.tarball: deleting_files.append(os.path.join(part_path, f"{part.name}.tar")) else: diff --git a/ch_backup/ch_backup.py b/ch_backup/ch_backup.py index 713ace20..d627a947 100644 --- a/ch_backup/ch_backup.py +++ b/ch_backup/ch_backup.py @@ -165,7 +165,7 @@ def backup( self._context.backup_meta ) logging.debug( - 'Starting backup "%s" for databases: %s', + 'Starting backup "{}" for databases: {}', self._context.backup_meta.name, ", ".join(map(lambda db: db.name, databases)), ) @@ -191,7 +191,8 @@ def backup( self._context.backup_meta.state = BackupState.CREATED except (Exception, TerminatingSignal): - logging.critical("Backup failed", exc_info=True) + logging.critical("Backup failed") + #exc_info=True) self._context.backup_meta.state = BackupState.FAILED raise finally: @@ -273,7 +274,7 @@ def restore( ] if missed_databases: logging.critical( - "Required databases %s were not found in backup metadata: %s", + "Required databases {} were not found in backup metadata: {}", ", ".join(missed_databases), self._context.backup_meta.path, ) @@ -360,13 +361,13 @@ def purge(self) -> Tuple[Sequence[str], Optional[str]]: with self._context.locker(): for backup in self._context.backup_layout.get_backups(use_light_meta=False): if backup.name not in backup_names: - logging.info("Deleting backup without metadata: %s", backup.name) + logging.info("Deleting backup without metadata: {}", backup.name) self._context.backup_layout.delete_backup(backup.name) continue if retain_count > 0: logging.info( - "Preserving backup per retain count policy: %s, state %s", + "Preserving backup per retain count policy: {}, state {}", backup.name, backup.state, ) @@ -377,7 +378,7 @@ def purge(self) -> Tuple[Sequence[str], Optional[str]]: if retain_time_limit and backup.start_time >= retain_time_limit: logging.info( - "Preserving backup per retain time policy: 
%s, state %s", + "Preserving backup per retain time policy: {}, state {}", backup.name, backup.state, ) @@ -413,7 +414,7 @@ def _delete( self, backup: BackupMetadata, dedup_references: DedupReferences ) -> Tuple[Optional[str], Optional[str]]: logging.info( - "Deleting backup %s, state: %s", + "Deleting backup {}, state: {}", backup.name, backup.state, ) diff --git a/ch_backup/cli.py b/ch_backup/cli.py index 17f33dc2..0dd498da 100755 --- a/ch_backup/cli.py +++ b/ch_backup/cli.py @@ -127,7 +127,7 @@ def cli( if zk_hosts is not None: cfg["zookeeper"]["hosts"] = zk_hosts - logging.configure(cfg["logging"]) + logging.configure(cfg["logging"],cfg["loguru"]) setup_environment(cfg["main"]) if not drop_privileges(cfg["main"]): @@ -149,7 +149,7 @@ def decorator(f): def wrapper(ctx, *args, **kwargs): try: logging.info( - "Executing command '%s', params: %s, args: %s, version: %s", + "Executing command '{}', params: {}, args: {}, version: {}", ctx.command.name, { **ctx.parent.params, @@ -159,10 +159,10 @@ def wrapper(ctx, *args, **kwargs): get_version(), ) result = ctx.invoke(f, ctx, ctx.obj["backup"], *args, **kwargs) - logging.info("Command '%s' completed", ctx.command.name) + logging.info("Command '{}' completed", ctx.command.name) return result except (Exception, TerminatingSignal): - logging.exception("Command '%s' failed", ctx.command.name) + logging.exception("Command '{}' failed", ctx.command.name) raise return cli.command(*args, **kwargs)(wrapper) diff --git a/ch_backup/clickhouse/client.py b/ch_backup/clickhouse/client.py index d292e2dc..aeded6d5 100644 --- a/ch_backup/clickhouse/client.py +++ b/ch_backup/clickhouse/client.py @@ -48,7 +48,7 @@ def query( Execute query. """ try: - logging.debug("Executing query: %s", query) + logging.debug("Executing query: {}", query) if timeout is None: timeout = self.timeout diff --git a/ch_backup/clickhouse/control.py b/ch_backup/clickhouse/control.py index 394d1d0e..72d11318 100644 --- a/ch_backup/clickhouse/control.py +++ b/ch_backup/clickhouse/control.py @@ -386,14 +386,14 @@ def remove_freezed_data(self) -> None: for disk in self._disks.values(): if disk.type == "local": shadow_path = os.path.join(disk.path, "shadow") - logging.debug("Removing shadow data: %s", shadow_path) + logging.debug("Removing shadow data: {}", shadow_path) self._remove_shadow_data(shadow_path) def remove_freezed_part(self, part: FrozenPart) -> None: """ Remove the freezed part. """ - logging.debug("Removing freezed part: %s", part.path) + logging.debug("Removing freezed part: {}", part.path) self._remove_shadow_data(part.path) def get_databases( @@ -607,7 +607,7 @@ def list_frozen_parts( path = os.path.join(disk.path, "shadow", backup_name, table_relative_path) if not os.path.exists(path): - logging.debug("Shadow path %s is empty", path) + logging.debug("Shadow path {} is empty", path) return [] freezed_parts: List[FrozenPart] = [] diff --git a/ch_backup/clickhouse/disks.py b/ch_backup/clickhouse/disks.py index be157b77..87fd8a04 100644 --- a/ch_backup/clickhouse/disks.py +++ b/ch_backup/clickhouse/disks.py @@ -2,7 +2,7 @@ Clickhouse-disks controls temporary cloud storage disks management. 
""" -import logging + import os from subprocess import PIPE, Popen from typing import Dict, List, Optional @@ -184,7 +184,7 @@ def _copy_dir(from_disk: str, from_path: str, to_disk: str, to_path: str) -> Non def _exec(command: str, common_args: List[str], command_args: List[str]) -> List[str]: - logger = logging.getLogger("clickhouse-disks") + logger = ch_logging.getLogger("clickhouse-disks") command_args = [ "/usr/bin/clickhouse-disks", *common_args, diff --git a/ch_backup/config.py b/ch_backup/config.py index 20c5b1d4..d07cfd79 100644 --- a/ch_backup/config.py +++ b/ch_backup/config.py @@ -9,7 +9,7 @@ import yaml from humanfriendly import parse_size, parse_timespan -from ch_backup import logging +from loguru import logger def _as_seconds(t: str) -> int: @@ -165,17 +165,17 @@ def _as_seconds(t: str) -> int: }, "handlers": { "ch-backup": { - "class": "logging.FileHandler", + "class": "logger.FileHandler", "filename": "/var/log/ch-backup/ch-backup.log", "formatter": "ch-backup", }, "boto": { - "class": "logging.FileHandler", + "class": "logger.FileHandler", "filename": "/var/log/ch-backup/boto.log", "formatter": "boto", }, "clickhouse-disks": { - "class": "logging.FileHandler", + "class": "logger.FileHandler", "filename": "/var/log/ch-backup/clickhouse-disks.log", "formatter": "ch-backup", }, @@ -212,6 +212,43 @@ def _as_seconds(t: str) -> int: }, }, }, + "loguru": { + "handlers": [ + { + 'name': 'ch-backup', + "sink": "/var/log/ch-backup/ch-backup.log", + "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} {extra[name]} [{level:8}] {message}", + 'level': 'DEBUG', + 'enqueue': True, + }, + { + 'name': 'zookeper', + "sink": "/var/log/ch-backup/ch-backup.log", + "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} {extra[name]} [{level:8}] {message}", + 'level': 'DEBUG', + 'enqueue': True, + + }, + { + 'name': 'boto', + "sink": "/var/log/ch-backup/ch-boto.log", + "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} {extra[name]} [{level:8}] {message}", + 'level': 'DEBUG', + 'enqueue': True, + }, + { + 'name': 'clickhouse-disks', + "sink": "/var/log/ch-backup/clickhouse-disks.log", + "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} {extra[name]} [{level:8}] {message}", + 'level': 'INFO', + 'enqueue': True, + }, + + ], + "activation": [ + ("",True), + ], + }, "zookeeper": { "secure": False, "cert": None, @@ -272,14 +309,14 @@ def __getitem__(self, item): try: return self._conf[item] except KeyError: - logging.critical('Config item "%s" was not defined', item) + logger.critical('Config item "{}" was not defined', item) raise def __setitem__(self, item, value): try: self._conf[item] = value except KeyError: - logging.critical('Config item "%s" was not defined', item) + logger.critical('Config item "{}" was not defined', item) raise def get(self, key: str, default: Any = None) -> Any: diff --git a/ch_backup/logging.py b/ch_backup/logging.py index 0f80e21b..94437110 100644 --- a/ch_backup/logging.py +++ b/ch_backup/logging.py @@ -2,67 +2,80 @@ Logging module. 
""" -import logging -import logging.config import os +from typing import Any import psutil -from ch_backup.util import format_size +from ch_backup.util import format_size, cached_property +from loguru import logger -def configure(config: dict) -> None: +class Filter: + def __init__(self, name): + self._name = name + + def __call__(self, record): + return record["extra"].get("name") == self._name + + +def make_filter(name): + return Filter(name) + +def configure(config: dict, config_loguru: dict) -> None: """ - Configure logging. + Configure logger. """ - for handler in config.get("handlers", {}).values(): - filename = handler.get("filename") - if filename: - os.makedirs(os.path.dirname(filename), exist_ok=True) + for handler in config_loguru["handlers"]: + handler['filter'] = make_filter(handler['name']) - logging.config.dictConfig(config) + config_loguru + logger.configure( + handlers = config_loguru["handlers"], + activation = config_loguru["activation"] + ) def critical(msg, *args, **kwargs): """ Log a message with severity 'CRITICAL'. """ - _get_logger().critical(msg, *args, **kwargs) + getLogger('ch-backup').critical(msg, *args, **kwargs) -def error(msg, *args, **kwargs): +def error(msg, exc_info=False,*args, **kwargs): """ Log a message with severity 'ERROR'. """ - _get_logger().error(msg, *args, **kwargs) + getLogger('ch-backup').opt(exception=exc_info).error(msg, *args, **kwargs) def exception(msg, *args, **kwargs): """ Log a message with severity 'ERROR' with exception information. """ - _get_logger().exception(msg, *args, **kwargs) + getLogger('ch-backup').exception(msg, *args, **kwargs) -def warning(msg, *args, **kwargs): +def warning(msg, exc_info=False, *args, **kwargs): """ Log a message with severity 'WARNING'. """ - _get_logger().warning(msg, *args, **kwargs) + getLogger('ch-backup').opt(exception=exc_info).warning(msg, *args, **kwargs) def info(msg, *args, **kwargs): """ Log a message with severity 'INFO'. """ - _get_logger().info(msg, *args, **kwargs) + getLogger('ch-backup').info(msg, *args, **kwargs) def debug(msg, *args, **kwargs): """ Log a message with severity 'DEBUG'. """ - _get_logger().debug(msg, *args, **kwargs) + getLogger('ch-backup').debug(msg, *args, **kwargs) def memory_usage(): @@ -84,15 +97,14 @@ def memory_usage(): total_usage = main_proc_usage + worker_procs_usage debug( - "Memory usage: %s (main process: %s, worker processes: %s)", + "Memory usage: {} (main process: {}, worker processes: {})", format_size(total_usage), format_size(main_proc_usage), format_size(worker_procs_usage), ) except Exception: - warning("Unable to get memory usage", exc_info=True) - + warning("Unable to get memory usage",exc_info=True) -def _get_logger() -> logging.Logger: - return logging.getLogger("ch-backup") +def getLogger(name: str): + return logger.bind(name=name) diff --git a/ch_backup/logic/database.py b/ch_backup/logic/database.py index a23bd518..a13669e3 100644 --- a/ch_backup/logic/database.py +++ b/ch_backup/logic/database.py @@ -53,7 +53,7 @@ def restore(context: BackupContext, databases: Dict[str, Database]) -> None: databases_to_restore[name] = db continue - logging.info("Restoring databases: %s", ", ".join(databases_to_restore.keys())) + logging.info("Restoring databases: {}", ", ".join(databases_to_restore.keys())) for db in databases_to_restore.values(): if db.has_embedded_metadata(): db_sql = embedded_schema_db_sql(db) @@ -86,7 +86,7 @@ def _backup_database(context: BackupContext, db: Database) -> None: """ Backup database. 
""" - logging.debug('Performing database backup for "%s"', db.name) + logging.debug('Performing database backup for "{}"', db.name) if not db.has_embedded_metadata(): context.backup_layout.upload_database_create_statement( diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index 43b6f7d2..9e29e600 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -88,7 +88,7 @@ def _collect_local_metadata_mtime( mtime = self._get_mtime(table.metadata_path) if mtime is None: logging.warning( - 'Cannot get metadata mtime for table "%s"."%s". Skipping it', + 'Cannot get metadata mtime for table "{}"."{}". Skipping it', table.database, table.name, ) @@ -200,7 +200,7 @@ def restore( ] if missed_tables: logging.critical( - "Required tables %s were not found in backup metadata", + "Required tables {} were not found in backup metadata", ", ".join([f"{t.database}.{t.name}" for t in missed_tables]), ) raise ClickhouseBackupError( @@ -279,7 +279,7 @@ def _load_create_statement_from_disk(table: Table) -> Optional[bytes]: """ if not table.metadata_path: logging.debug( - 'Cannot load a create statement of the table "%s"."%s". Metadata is empty', + 'Cannot load a create statement of the table "{}"."{}". Metadata is empty', table.database, table.name, ) @@ -288,7 +288,7 @@ def _load_create_statement_from_disk(table: Table) -> Optional[bytes]: return Path(table.metadata_path).read_bytes() except OSError as e: logging.debug( - 'Cannot load a create statement of the table "%s"."%s": %s', + 'Cannot load a create statement of the table "{}"."{}": {}', table.database, table.name, str(e), @@ -311,14 +311,14 @@ def _backup_table( Return backup metadata of successfully backuped table, otherwise None. """ logging.debug( - 'Performing table backup for "%s"."%s"', table.database, table.name + 'Performing table backup for "{}"."{}"', table.database, table.name ) table_meta = TableMetadata(table.database, table.name, table.engine, table.uuid) create_statement = self._load_create_statement_from_disk(table) if not create_statement: logging.warning( - 'Skipping table backup for "%s"."%s". Local metadata is empty or absent', + 'Skipping table backup for "{}"."{}". Local metadata is empty or absent', db.name, table.name, ) @@ -333,7 +333,7 @@ def _backup_table( raise logging.warning( - 'Table "%s"."%s" was removed by a user during backup', + 'Table "{}"."{}" was removed by a user during backup', table.database, table.name, ) @@ -343,7 +343,7 @@ def _backup_table( new_mtime = self._get_mtime(table.metadata_path) if new_mtime is None or mtimes[table.name].mtime != new_mtime: logging.warning( - 'Skipping table backup for "%s"."%s". The metadata file was updated or removed during backup', + 'Skipping table backup for "{}"."{}". 
The metadata file was updated or removed during backup', table.database, table.name, ) @@ -375,13 +375,13 @@ def _backup_frozen_table_data( """ if not is_merge_tree(table.engine): logging.info( - 'Skipping table data backup for non MergeTree table "%s"."%s"', + 'Skipping table data backup for non MergeTree table "{}"."{}"', table.database, table.name, ) return - logging.debug('Uploading table data for "%s"."%s"', table.database, table.name) + logging.debug('Uploading table data for "{}"."{}"', table.database, table.name) uploaded_parts = [] for data_path, disk in table.paths_with_disks: @@ -390,7 +390,7 @@ def _backup_frozen_table_data( ) for fpart in freezed_parts: - logging.debug("Working on %s", fpart) + logging.debug("Working on {}", fpart) if disk.type == "s3": table_meta.add_part(PartMetadata.from_frozen_part(fpart)) @@ -470,7 +470,7 @@ def _preprocess_tables_to_restore( ): continue logging.warning( - 'Table "%s"."%s" will be recreated as its schema mismatches the schema from backup: "%s" != "%s"', + 'Table "{}"."{}" will be recreated as its schema mismatches the schema from backup: "{}" != "{}"', table.database, table.name, existing_table.create_statement, @@ -518,7 +518,7 @@ def _restore_tables( other_tables = [] for table in tables: logging.debug( - "Preparing table %s for restoring", f"{table.database}.{table.name}" + "Preparing table {} for restoring", f"{table.database}.{table.name}" ) self._rewrite_table_schema( context, databases[table.database], table, add_uuid_if_required=True @@ -540,7 +540,7 @@ def _restore_tables( ] logging.debug( - "Deleting replica metadata for replicated tables: %s", + "Deleting replica metadata for replicated tables: {}", ", ".join([f"{t.database}.{t.name}" for t in replicated_tables]), ) context.zk_ctl.delete_replica_metadata( @@ -567,7 +567,7 @@ def _restore_data( for table_meta in tables: try: logging.debug( - 'Running table "%s.%s" data restore', + 'Running table "{}.{}" data restore', table_meta.database, table_meta.name, ) @@ -631,7 +631,7 @@ def _restore_data( ) for part in attach_parts: logging.debug( - 'Attaching "%s.%s" part: %s', + 'Attaching "{}.{}" part: {}', table_meta.database, table.name, part.name, @@ -641,7 +641,7 @@ def _restore_data( context.restore_context.add_part(part, PartState.RESTORED) except Exception as e: logging.warning( - 'Attaching "%s.%s" part %s failed: %s', + 'Attaching "{}.{}" part {} failed: {}', table_meta.database, table.name, part.name, @@ -698,7 +698,7 @@ def _restore_table_objects( table = unprocessed.popleft() try: logging.debug( - "Trying to restore table object for table %s", + "Trying to restore table object for table {}", f"{table.database}.{table.name}", ) self._restore_table_object(context, databases[table.database], table) @@ -723,7 +723,7 @@ def _restore_table_objects( if errors: logging.error( - "Failed to restore tables:\n%s", + "Failed to restore tables:\n{}", "\n".join(f'"{v.database}"."{v.name}": {e!r}' for v, e in errors), ) diff --git a/ch_backup/logic/udf.py b/ch_backup/logic/udf.py index 84237bb2..84dd1670 100644 --- a/ch_backup/logic/udf.py +++ b/ch_backup/logic/udf.py @@ -23,7 +23,7 @@ def backup(self, context: BackupContext) -> None: for udf_name in udf.keys(): context.backup_meta.add_udf(udf_name) - logging.debug("Performing UDF backup for: %s", " ,".join(udf.keys())) + logging.debug("Performing UDF backup for: {}", " ,".join(udf.keys())) for udf_name, udf_statement in udf.items(): context.backup_layout.upload_udf( context.backup_meta.name, udf_name, udf_statement @@ -40,13 +40,13 @@ def 
restore(self, context: BackupContext) -> None: if not udf_list: return - logging.info("Restoring UDFs: %s", " ,".join(udf_list)) + logging.info("Restoring UDFs: {}", " ,".join(udf_list)) udf_on_clickhouse = context.ch_ctl.get_udf_query() udf_on_clickhouse_list = udf_on_clickhouse.keys() for udf_name in udf_list: - logging.debug("Restoring UDF %s", udf_name) + logging.debug("Restoring UDF {}", udf_name) statement = context.backup_layout.get_udf_create_statement( context.backup_meta, udf_name @@ -62,7 +62,7 @@ def restore(self, context: BackupContext) -> None: if udf_name not in udf_on_clickhouse_list: context.ch_ctl.restore_udf(statement) - logging.debug("UDF %s restored", udf_name) + logging.debug("UDF {} restored", udf_name) logging.info("All UDFs restored") diff --git a/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py b/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py index d189f141..e2a39704 100644 --- a/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py +++ b/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py @@ -31,7 +31,7 @@ def submit(self, future_id: str, func: Callable, *args: Any, **kwargs: Any) -> N if future_id in self._futures: raise RuntimeError("Duplicate") future = self._pool.submit(func, *args, **kwargs) - future.add_done_callback(lambda _: logging.debug('Future "%s" completed', future_id)) # type: ignore[misc] + future.add_done_callback(lambda _: logging.debug('Future {} completed', future_id)) # type: ignore[misc] self._futures[future_id] = future def wait_all(self, keep_going: bool = False) -> None: @@ -49,13 +49,14 @@ def wait_all(self, keep_going: bool = False) -> None: except Exception: if keep_going: logging.warning( - 'Future "%s" generated an exception, skipping due to keep_going flag', + 'Future "{}" generated an exception, skipping due to keep_going flag', + # exc_info=True, future_id, - exc_info=True, ) continue logging.error( - 'Future "%s" generated an exception:', future_id, exc_info=True + 'Future "{}" generated an exception:', future_id + #, exc_info=True ) raise self._futures = {} diff --git a/ch_backup/storage/pipeline.py b/ch_backup/storage/pipeline.py index 6fce62d8..87764d9b 100644 --- a/ch_backup/storage/pipeline.py +++ b/ch_backup/storage/pipeline.py @@ -88,7 +88,7 @@ def submit(self, future_id: str, func: Callable, *args: Any, **kwargs: Any) -> N Schedule job for execution """ future = self._pool.submit(func, *args, **kwargs) - future.add_done_callback(logging.debug('Future "%s" completed', future_id)) + future.add_done_callback(logging.debug('Future "{}" completed', future_id)) self._futures[future_id] = future def wait_all(self) -> None: @@ -103,7 +103,7 @@ def wait_all(self) -> None: future.result() except Exception: logging.error( - 'Future "%s" generated an exception:', future_id, exc_info=True + 'Future "{}" generated an exception:', future_id, exc_info=True ) raise self._futures = {} diff --git a/ch_backup/util.py b/ch_backup/util.py index cb0b5334..f7ba3249 100644 --- a/ch_backup/util.py +++ b/ch_backup/util.py @@ -206,7 +206,7 @@ def retry( def _log_retry(retry_state): logging.debug( - "Retrying %s.%s in %.2fs, attempt: %s, reason: %r", + "Retrying {}.{} in {}, attempt: {}, reason: {}", retry_state.fn.__module__, retry_state.fn.__qualname__, retry_state.next_action.sleep, diff --git a/requirements.txt b/requirements.txt index c2ac3a98..ba26cb5c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,3 +14,4 @@ xmltodict pypeln==0.4.9 dataclasses>=0.7,<0.8; python_version <"3.7" # required for 
pypeln==0.4.9 typing_extensions>=3.7.4,<4.0; python_version <"3.8" # required for pypeln==0.4.9 +loguru From 4e9462c0079abc5bb2f93392d13e1402ec6a659f Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Fri, 22 Sep 2023 12:44:24 +0000 Subject: [PATCH 2/8] More --- ch_backup/ch_backup.py | 3 +- ch_backup/config.py | 18 ++++-- ch_backup/logging.py | 55 +++++++++++++++---- .../async_pipeline/base_pipeline/exec_pool.py | 4 +- ch_backup/zookeeper/zookeeper.py | 8 +-- 5 files changed, 62 insertions(+), 26 deletions(-) diff --git a/ch_backup/ch_backup.py b/ch_backup/ch_backup.py index d627a947..de63e05e 100644 --- a/ch_backup/ch_backup.py +++ b/ch_backup/ch_backup.py @@ -191,8 +191,7 @@ def backup( self._context.backup_meta.state = BackupState.CREATED except (Exception, TerminatingSignal): - logging.critical("Backup failed") - #exc_info=True) + logging.critical("Backup failed", exc_info=True) self._context.backup_meta.state = BackupState.FAILED raise finally: diff --git a/ch_backup/config.py b/ch_backup/config.py index d07cfd79..b55eaf3b 100644 --- a/ch_backup/config.py +++ b/ch_backup/config.py @@ -217,32 +217,40 @@ def _as_seconds(t: str) -> int: { 'name': 'ch-backup', "sink": "/var/log/ch-backup/ch-backup.log", - "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} {extra[name]} [{level:8}] {message}", + "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[name]}: {message}", 'level': 'DEBUG', 'enqueue': True, }, { 'name': 'zookeper', "sink": "/var/log/ch-backup/ch-backup.log", - "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} {extra[name]} [{level:8}] {message}", + "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[name]}: {message}", 'level': 'DEBUG', 'enqueue': True, }, { 'name': 'boto', - "sink": "/var/log/ch-backup/ch-boto.log", - "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} {extra[name]} [{level:8}] {message}", + "sink": "/var/log/ch-backup/boto.log", + "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[name]}: {message}", 'level': 'DEBUG', 'enqueue': True, }, { 'name': 'clickhouse-disks', "sink": "/var/log/ch-backup/clickhouse-disks.log", - "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} {extra[name]} [{level:8}] {message}", + "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[name]}: {message}", 'level': 'INFO', 'enqueue': True, }, + { + 'name': 'urllib3.connectionpool', + "sink": "/var/log/ch-backup/boto.log", + "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[name]}: {message}", + 'level': 'DEBUG', + 'enqueue': True, + }, + ], "activation": [ diff --git a/ch_backup/logging.py b/ch_backup/logging.py index 94437110..9ab0e763 100644 --- a/ch_backup/logging.py +++ b/ch_backup/logging.py @@ -4,9 +4,9 @@ import os from typing import Any - +import logging import psutil - +import inspect from ch_backup.util import format_size, cached_property from loguru import logger @@ -16,20 +16,43 @@ def __init__(self, name): self._name = name def __call__(self, record): - return record["extra"].get("name") == self._name - + if 'name' in record.get('extra', {}): + return record["extra"].get("name") == self._name + if record['name'] == self._name: + record["extra"]['name'] = self._name + return True + return False + def make_filter(name): return Filter(name) +class InterceptHandler(logging.Handler): + def emit(self, 
record: logging.LogRecord) -> None: + # Get corresponding Loguru level if it exists. + level: str | int + try: + level = logger.level(record.levelname).name + except ValueError: + level = record.levelno + + # Find caller from where originated the logged message. + frame, depth = inspect.currentframe(), 0 + while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__): + frame = frame.f_back + depth += 1 + logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage()) + + def configure(config: dict, config_loguru: dict) -> None: """ Configure logger. """ for handler in config_loguru["handlers"]: handler['filter'] = make_filter(handler['name']) - - config_loguru + del handler['name'] + + logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True) logger.configure( handlers = config_loguru["handlers"], activation = config_loguru["activation"] @@ -40,42 +63,50 @@ def critical(msg, *args, **kwargs): """ Log a message with severity 'CRITICAL'. """ - getLogger('ch-backup').critical(msg, *args, **kwargs) + with_exception = kwargs.get("exc_info", False) + getLogger('ch-backup').opt(exception=with_exception).critical(msg, *args, **kwargs) def error(msg, exc_info=False,*args, **kwargs): """ Log a message with severity 'ERROR'. """ - getLogger('ch-backup').opt(exception=exc_info).error(msg, *args, **kwargs) + with_exception = kwargs.get("exc_info", False) + getLogger('ch-backup').opt(exception=with_exception).error(msg, *args, **kwargs) + def exception(msg, *args, **kwargs): """ Log a message with severity 'ERROR' with exception information. """ - getLogger('ch-backup').exception(msg, *args, **kwargs) + + with_exception = kwargs.get("exc_info", False) + getLogger('ch-backup').opt(exception=with_exception).debug(msg, *args, **kwargs) def warning(msg, exc_info=False, *args, **kwargs): """ Log a message with severity 'WARNING'. """ - getLogger('ch-backup').opt(exception=exc_info).warning(msg, *args, **kwargs) + with_exception = kwargs.get("exc_info", False) + getLogger('ch-backup').opt(exception=with_exception).warning(msg, *args, **kwargs) def info(msg, *args, **kwargs): """ Log a message with severity 'INFO'. """ - getLogger('ch-backup').info(msg, *args, **kwargs) + with_exception = kwargs.get("exc_info", False) + getLogger('ch-backup').opt(exception=with_exception).info(msg, *args, **kwargs) def debug(msg, *args, **kwargs): """ Log a message with severity 'DEBUG'. 
""" - getLogger('ch-backup').debug(msg, *args, **kwargs) + with_exception = kwargs.get("exc_info", False) + getLogger('ch-backup').opt(exception=with_exception).debug(msg, *args, **kwargs) def memory_usage(): diff --git a/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py b/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py index e2a39704..e2a33284 100644 --- a/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py +++ b/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py @@ -50,13 +50,13 @@ def wait_all(self, keep_going: bool = False) -> None: if keep_going: logging.warning( 'Future "{}" generated an exception, skipping due to keep_going flag', - # exc_info=True, future_id, + exc_info=True, ) continue logging.error( 'Future "{}" generated an exception:', future_id - #, exc_info=True + , exc_info=True ) raise self._futures = {} diff --git a/ch_backup/zookeeper/zookeeper.py b/ch_backup/zookeeper/zookeeper.py index 41ea2e09..dd12e521 100644 --- a/ch_backup/zookeeper/zookeeper.py +++ b/ch_backup/zookeeper/zookeeper.py @@ -1,8 +1,6 @@ """ ZooKeeper-control classes module """ - -import logging import os from typing import Dict, Iterable, Tuple @@ -10,7 +8,7 @@ from kazoo.exceptions import KazooException, NoNodeError from kazoo.handlers.threading import KazooTimeoutError -from ch_backup.logging import debug +import ch_backup.logging as logging from ..clickhouse.models import Table from ..util import retry @@ -102,7 +100,7 @@ def delete_replica_metadata( "replicas", replica, ) # remove leading '/' - debug(f'Deleting zk node: "{path}"') + logging.debug(f'Deleting zk node: "{path}"') try: client.delete(path, recursive=True) except NoNodeError: @@ -124,7 +122,7 @@ def delete_replicated_database_metadata( path = os.path.join( self._zk_root_path, zk_path[1:].format(**macros) ) # remove leading '/' - debug(f'Deleting zk node: "{path}"') + logging.debug(f'Deleting zk node: "{path}"') try: client.delete(path, recursive=True) except NoNodeError: From 725d11d339154bd101fcc052e872a73394ddbcf3 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Fri, 22 Sep 2023 14:38:35 +0000 Subject: [PATCH 3/8] fix kazoo --- ch_backup/config.py | 62 +------------------------------- ch_backup/logging.py | 21 ++++++++--- ch_backup/zookeeper/zookeeper.py | 3 +- 3 files changed, 18 insertions(+), 68 deletions(-) diff --git a/ch_backup/config.py b/ch_backup/config.py index b55eaf3b..75115f7d 100644 --- a/ch_backup/config.py +++ b/ch_backup/config.py @@ -8,7 +8,6 @@ import yaml from humanfriendly import parse_size, parse_timespan - from loguru import logger @@ -153,65 +152,6 @@ def _as_seconds(t: str) -> int: "ca_bundle": [], "disable_ssl_warnings": False, }, - "logging": { - "version": 1, - "formatters": { - "ch-backup": { - "format": "%(asctime)s %(processName)-11s %(process)-5d [%(levelname)-8s] %(name)s: %(message)s", - }, - "boto": { - "format": "%(asctime)s %(processName)-11s %(process)-5d [%(levelname)-8s] %(name)s: %(message)s", - }, - }, - "handlers": { - "ch-backup": { - "class": "logger.FileHandler", - "filename": "/var/log/ch-backup/ch-backup.log", - "formatter": "ch-backup", - }, - "boto": { - "class": "logger.FileHandler", - "filename": "/var/log/ch-backup/boto.log", - "formatter": "boto", - }, - "clickhouse-disks": { - "class": "logger.FileHandler", - "filename": "/var/log/ch-backup/clickhouse-disks.log", - "formatter": "ch-backup", - }, - }, - "loggers": { - "ch-backup": { - "handlers": ["ch-backup"], - "level": "DEBUG", - }, - "botocore": { - "handlers": ["boto"], - "level": 
"INFO", - }, - "botocore.endpoint": { - "level": "DEBUG", - }, - "botocore.vendored.requests": { - "level": "DEBUG", - }, - "botocore.parsers": { - "level": "DEBUG", - }, - "urllib3.connectionpool": { - "handlers": ["boto"], - "level": "DEBUG", - }, - "clickhouse-disks": { - "handlers": ["clickhouse-disks"], - "level": "INFO", - }, - "zookeeper": { - "handlers": ["ch-backup"], - "level": "DEBUG", - }, - }, - }, "loguru": { "handlers": [ { @@ -222,7 +162,7 @@ def _as_seconds(t: str) -> int: 'enqueue': True, }, { - 'name': 'zookeper', + 'name': 'kazoo', "sink": "/var/log/ch-backup/ch-backup.log", "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[name]}: {message}", 'level': 'DEBUG', diff --git a/ch_backup/logging.py b/ch_backup/logging.py index 9ab0e763..edb2bf65 100644 --- a/ch_backup/logging.py +++ b/ch_backup/logging.py @@ -2,14 +2,14 @@ Logging module. """ -import os -from typing import Any +import inspect import logging + import psutil -import inspect -from ch_backup.util import format_size, cached_property from loguru import logger +from ch_backup.util import format_size + class Filter: def __init__(self, name): @@ -19,7 +19,10 @@ def __call__(self, record): if 'name' in record.get('extra', {}): return record["extra"].get("name") == self._name - if record['name'] == self._name: + if 'name' not in record: + return False + + if record['name'][:len(self._name)] == self._name: record["extra"]['name'] = self._name return True return False @@ -28,7 +31,15 @@ def make_filter(name): return Filter(name) class InterceptHandler(logging.Handler): + """ + Helper class for logging interception. + """ def emit(self, record: logging.LogRecord) -> None: + """ + Intercept all records from the logging module and redirect them into loguru. + + The handler for loguru will be choosen based on module name. + """ # Get corresponding Loguru level if it exists. 
level: str | int try: diff --git a/ch_backup/zookeeper/zookeeper.py b/ch_backup/zookeeper/zookeeper.py index dd12e521..c5615669 100644 --- a/ch_backup/zookeeper/zookeeper.py +++ b/ch_backup/zookeeper/zookeeper.py @@ -8,7 +8,7 @@ from kazoo.exceptions import KazooException, NoNodeError from kazoo.handlers.threading import KazooTimeoutError -import ch_backup.logging as logging +import ch_backup.logging as logging from ..clickhouse.models import Table from ..util import retry @@ -30,7 +30,6 @@ def __init__(self, config: dict): certfile=config.get("cert"), keyfile=config.get("key"), ca=config.get("ca"), - logger=logging.getLogger("zookeeper"), randomize_hosts=config.get("randomize_hosts", True), ) self._zk_user = config.get("user") From f2aca39161b2da79720542b25d2e1ab6a2409754 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Mon, 25 Sep 2023 12:36:33 +0000 Subject: [PATCH 4/8] Fixes --- ch_backup/cli.py | 2 +- ch_backup/config.py | 63 +++++-------- ch_backup/logging.py | 91 +++++++++++++------ .../async_pipeline/base_pipeline/exec_pool.py | 5 +- ch_backup/zookeeper/zookeeper.py | 2 +- 5 files changed, 91 insertions(+), 72 deletions(-) diff --git a/ch_backup/cli.py b/ch_backup/cli.py index 0dd498da..4cad8872 100755 --- a/ch_backup/cli.py +++ b/ch_backup/cli.py @@ -127,7 +127,7 @@ def cli( if zk_hosts is not None: cfg["zookeeper"]["hosts"] = zk_hosts - logging.configure(cfg["logging"],cfg["loguru"]) + logging.configure(cfg["loguru"]) setup_environment(cfg["main"]) if not drop_privileges(cfg["main"]): diff --git a/ch_backup/config.py b/ch_backup/config.py index 75115f7d..a9a52abd 100644 --- a/ch_backup/config.py +++ b/ch_backup/config.py @@ -15,6 +15,16 @@ def _as_seconds(t: str) -> int: return int(parse_timespan(t)) +def _handler_configuration(name: str, sink: str, level: str) -> dict: + return { + "name": name, + "sink": sink, + "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[logger_name]}: {message}", + "level": level, + "enqueue": True, + } + + DEFAULT_CONFIG = { "clickhouse": { "data_path": "/var/lib/clickhouse", @@ -154,47 +164,22 @@ def _as_seconds(t: str) -> int: }, "loguru": { "handlers": [ - { - 'name': 'ch-backup', - "sink": "/var/log/ch-backup/ch-backup.log", - "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[name]}: {message}", - 'level': 'DEBUG', - 'enqueue': True, - }, - { - 'name': 'kazoo', - "sink": "/var/log/ch-backup/ch-backup.log", - "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[name]}: {message}", - 'level': 'DEBUG', - 'enqueue': True, - - }, - { - 'name': 'boto', - "sink": "/var/log/ch-backup/boto.log", - "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[name]}: {message}", - 'level': 'DEBUG', - 'enqueue': True, - }, - { - 'name': 'clickhouse-disks', - "sink": "/var/log/ch-backup/clickhouse-disks.log", - "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[name]}: {message}", - 'level': 'INFO', - 'enqueue': True, - }, - { - 'name': 'urllib3.connectionpool', - "sink": "/var/log/ch-backup/boto.log", - "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[name]}: {message}", - 'level': 'DEBUG', - 'enqueue': True, - }, - - + _handler_configuration( + "ch-backup", "/var/log/ch-backup/ch-backup.log", "DEBUG" + ), + _handler_configuration( + "kazoo", "/var/log/ch-backup/ch-backup.log", "DEBUG" + ), + _handler_configuration("boto", 
"/var/log/ch-backup/ch-boto.log", "DEBUG"), + _handler_configuration( + "clickhouse-disks", "/var/log/ch-backup/clickhouse-disks.log", "INFO" + ), + _handler_configuration( + "urllib3.connectionpool", "/var/log/ch-backup/boto.log", "DEBUG" + ), ], "activation": [ - ("",True), + ("", True), ], }, "zookeeper": { diff --git a/ch_backup/logging.py b/ch_backup/logging.py index edb2bf65..69d69299 100644 --- a/ch_backup/logging.py +++ b/ch_backup/logging.py @@ -4,6 +4,7 @@ import inspect import logging +from typing import Any import psutil from loguru import logger @@ -12,36 +13,55 @@ class Filter: + """ + Filter for luguru handler. + """ + def __init__(self, name): self._name = name def __call__(self, record): - if 'name' in record.get('extra', {}): - return record["extra"].get("name") == self._name + """ + Filter callback to decide for each logged message whether it should be sent to the sink or not. + + If the log comes from ch-backup code then the logger name will be in `logger_name`. + If the log comes from other module and it caught by InterceptHandler then we filtering by the module_name. + """ + + if "logger_name" in record.get("extra", {}): + return record["extra"].get("logger_name") == self._name - if 'name' not in record: + if "module_name" not in record.get("extra", {}): return False - if record['name'][:len(self._name)] == self._name: - record["extra"]['name'] = self._name + if record["extra"]["module_name"][: len(self._name)] == self._name: + record["extra"]["logger_name"] = self._name return True return False - + + def make_filter(name): + """ + Factory for filter creation. + """ + return Filter(name) + class InterceptHandler(logging.Handler): """ Helper class for logging interception. """ + def emit(self, record: logging.LogRecord) -> None: """ Intercept all records from the logging module and redirect them into loguru. - The handler for loguru will be choosen based on module name. + The handler for loguru will be chosen based on module name. """ + # Get corresponding Loguru level if it exists. - level: str | int + level: int or str # type: ignore try: level = logger.level(record.levelname).name except ValueError: @@ -52,39 +72,48 @@ def emit(self, record: logging.LogRecord) -> None: while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__): frame = frame.f_back depth += 1 - logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage()) + + try: + frame_name = frame.f_globals["__name__"] # type: ignore + except KeyError: + frame_name = None + + logger.bind(module_name=frame_name).opt( + depth=depth, exception=record.exc_info + ).log(level, record.getMessage()) -def configure(config: dict, config_loguru: dict) -> None: +def configure(config_loguru: dict) -> None: """ Configure logger. """ + # Configure loguru. for handler in config_loguru["handlers"]: - handler['filter'] = make_filter(handler['name']) - del handler['name'] - - logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True) + handler["filter"] = make_filter(handler["name"]) + del handler["name"] + logger.configure( - handlers = config_loguru["handlers"], - activation = config_loguru["activation"] + handlers=config_loguru["handlers"], activation=config_loguru["activation"] ) + # Configure logging. + logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True) + def critical(msg, *args, **kwargs): """ Log a message with severity 'CRITICAL'. 
""" with_exception = kwargs.get("exc_info", False) - getLogger('ch-backup').opt(exception=with_exception).critical(msg, *args, **kwargs) + getLogger("ch-backup").opt(exception=with_exception).critical(msg, *args, **kwargs) -def error(msg, exc_info=False,*args, **kwargs): +def error(msg, *args, **kwargs): """ Log a message with severity 'ERROR'. """ with_exception = kwargs.get("exc_info", False) - getLogger('ch-backup').opt(exception=with_exception).error(msg, *args, **kwargs) - + getLogger("ch-backup").opt(exception=with_exception).error(msg, *args, **kwargs) def exception(msg, *args, **kwargs): @@ -93,15 +122,15 @@ def exception(msg, *args, **kwargs): """ with_exception = kwargs.get("exc_info", False) - getLogger('ch-backup').opt(exception=with_exception).debug(msg, *args, **kwargs) + getLogger("ch-backup").opt(exception=with_exception).debug(msg, *args, **kwargs) -def warning(msg, exc_info=False, *args, **kwargs): +def warning(msg, *args, **kwargs): """ Log a message with severity 'WARNING'. """ with_exception = kwargs.get("exc_info", False) - getLogger('ch-backup').opt(exception=with_exception).warning(msg, *args, **kwargs) + getLogger("ch-backup").opt(exception=with_exception).warning(msg, *args, **kwargs) def info(msg, *args, **kwargs): @@ -109,7 +138,7 @@ def info(msg, *args, **kwargs): Log a message with severity 'INFO'. """ with_exception = kwargs.get("exc_info", False) - getLogger('ch-backup').opt(exception=with_exception).info(msg, *args, **kwargs) + getLogger("ch-backup").opt(exception=with_exception).info(msg, *args, **kwargs) def debug(msg, *args, **kwargs): @@ -117,7 +146,7 @@ def debug(msg, *args, **kwargs): Log a message with severity 'DEBUG'. """ with_exception = kwargs.get("exc_info", False) - getLogger('ch-backup').opt(exception=with_exception).debug(msg, *args, **kwargs) + getLogger("ch-backup").opt(exception=with_exception).debug(msg, *args, **kwargs) def memory_usage(): @@ -146,7 +175,13 @@ def memory_usage(): ) except Exception: - warning("Unable to get memory usage",exc_info=True) + warning("Unable to get memory usage", exc_info=True) + + +# pylint: disable=invalid-name +def getLogger(name: str) -> Any: + """ + Get logger with specific name. 
+ """ -def getLogger(name: str): - return logger.bind(name=name) + return logger.bind(logger_name=name) diff --git a/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py b/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py index e2a33284..cb6c071f 100644 --- a/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py +++ b/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py @@ -31,7 +31,7 @@ def submit(self, future_id: str, func: Callable, *args: Any, **kwargs: Any) -> N if future_id in self._futures: raise RuntimeError("Duplicate") future = self._pool.submit(func, *args, **kwargs) - future.add_done_callback(lambda _: logging.debug('Future {} completed', future_id)) # type: ignore[misc] + future.add_done_callback(lambda _: logging.debug("Future {} completed", future_id)) # type: ignore[misc] self._futures[future_id] = future def wait_all(self, keep_going: bool = False) -> None: @@ -55,8 +55,7 @@ def wait_all(self, keep_going: bool = False) -> None: ) continue logging.error( - 'Future "{}" generated an exception:', future_id - , exc_info=True + 'Future "{}" generated an exception:', future_id, exc_info=True ) raise self._futures = {} diff --git a/ch_backup/zookeeper/zookeeper.py b/ch_backup/zookeeper/zookeeper.py index c5615669..2b928363 100644 --- a/ch_backup/zookeeper/zookeeper.py +++ b/ch_backup/zookeeper/zookeeper.py @@ -8,7 +8,7 @@ from kazoo.exceptions import KazooException, NoNodeError from kazoo.handlers.threading import KazooTimeoutError -import ch_backup.logging as logging +from ch_backup import logging from ..clickhouse.models import Table from ..util import retry From 30dacafe68ec687b47d823eea93c8f7b036004db Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Mon, 25 Sep 2023 13:22:22 +0000 Subject: [PATCH 5/8] ch_backup/logging.py --- ch_backup/logging.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ch_backup/logging.py b/ch_backup/logging.py index 69d69299..3d573afe 100644 --- a/ch_backup/logging.py +++ b/ch_backup/logging.py @@ -97,7 +97,7 @@ def configure(config_loguru: dict) -> None: ) # Configure logging. 
- logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True) + logging.basicConfig(handlers=[InterceptHandler()], level=0) def critical(msg, *args, **kwargs): From a53591f0d154db25126e42a04e7bcecee0d919a9 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Tue, 26 Sep 2023 09:19:19 +0000 Subject: [PATCH 6/8] Changes --- ch_backup/config.py | 48 ++++++++++++++++++++------------ ch_backup/logging.py | 35 +++++++++++------------ ch_backup/zookeeper/zookeeper.py | 2 ++ 3 files changed, 48 insertions(+), 37 deletions(-) diff --git a/ch_backup/config.py b/ch_backup/config.py index a9a52abd..4dce056b 100644 --- a/ch_backup/config.py +++ b/ch_backup/config.py @@ -163,24 +163,36 @@ def _handler_configuration(name: str, sink: str, level: str) -> dict: "disable_ssl_warnings": False, }, "loguru": { - "handlers": [ - _handler_configuration( - "ch-backup", "/var/log/ch-backup/ch-backup.log", "DEBUG" - ), - _handler_configuration( - "kazoo", "/var/log/ch-backup/ch-backup.log", "DEBUG" - ), - _handler_configuration("boto", "/var/log/ch-backup/ch-boto.log", "DEBUG"), - _handler_configuration( - "clickhouse-disks", "/var/log/ch-backup/clickhouse-disks.log", "INFO" - ), - _handler_configuration( - "urllib3.connectionpool", "/var/log/ch-backup/boto.log", "DEBUG" - ), - ], - "activation": [ - ("", True), - ], + "formaters":{ + "ch-backup" : "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[logger_name]}: {message}", + }, + "handlers": { + "ch-backup": { + "sink" : "/var/log/ch-backup/ch-backup.log", + "level": "DEBUG", + "format": "ch-backup", + }, + "zookeeper": { + "sink" : "/var/log/ch-backup/ch-backup.log", + "level": "DEBUG", + "format": "ch-backup", + }, + "botocore": { + "sink" : "/var/log/ch-backup/ch-backup.log", + "level": "INFO", + "format": "ch-backup", + }, + "clickhouse-disks": { + "sink" : "/var/log/ch-backup/clickhouse-disks.log", + "level": "DEBUG", + "format": "ch-backup", + }, + "urllib3.connectionpool": { + "sink" : "/var/log/ch-backup/boto.log", + "level": "DEBUG", + "format": "ch-backup", + }, + }, }, "zookeeper": { "secure": False, diff --git a/ch_backup/logging.py b/ch_backup/logging.py index 3d573afe..6de9ec71 100644 --- a/ch_backup/logging.py +++ b/ch_backup/logging.py @@ -27,16 +27,10 @@ def __call__(self, record): If the log comes from ch-backup code then the logger name will be in `logger_name`. If the log comes from other module and it caught by InterceptHandler then we filtering by the module_name. """ - + print(record, self._name) if "logger_name" in record.get("extra", {}): return record["extra"].get("logger_name") == self._name - if "module_name" not in record.get("extra", {}): - return False - - if record["extra"]["module_name"][: len(self._name)] == self._name: - record["extra"]["logger_name"] = self._name - return True return False @@ -72,13 +66,8 @@ def emit(self, record: logging.LogRecord) -> None: while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__): frame = frame.f_back depth += 1 - - try: - frame_name = frame.f_globals["__name__"] # type: ignore - except KeyError: - frame_name = None - - logger.bind(module_name=frame_name).opt( + print(record) + logger.bind(logger_name=record.name).opt( depth=depth, exception=record.exc_info ).log(level, record.getMessage()) @@ -88,12 +77,20 @@ def configure(config_loguru: dict) -> None: Configure logger. """ # Configure loguru. 
- for handler in config_loguru["handlers"]: - handler["filter"] = make_filter(handler["name"]) - del handler["name"] - + loguru_handlers = [] + + for name, value in config_loguru['handlers'].items(): + handler = { + "sink": value['sink'], + "level": value['level'], + "format": config_loguru['formaters'][value['format']], + "filter": make_filter(name), + "enqueue": True, + } + loguru_handlers.append(handler) + logger.configure( - handlers=config_loguru["handlers"], activation=config_loguru["activation"] + handlers=loguru_handlers, activation=[("",True)] ) # Configure logging. diff --git a/ch_backup/zookeeper/zookeeper.py b/ch_backup/zookeeper/zookeeper.py index 2b928363..6f20ef80 100644 --- a/ch_backup/zookeeper/zookeeper.py +++ b/ch_backup/zookeeper/zookeeper.py @@ -2,6 +2,7 @@ ZooKeeper-control classes module """ import os +import logging as py_logging from typing import Dict, Iterable, Tuple from kazoo.client import KazooClient @@ -30,6 +31,7 @@ def __init__(self, config: dict): certfile=config.get("cert"), keyfile=config.get("key"), ca=config.get("ca"), + logger=py_logging.getLogger("zookeeper"), randomize_hosts=config.get("randomize_hosts", True), ) self._zk_user = config.get("user") From 8e9973aeeb3abbbb8181630ec5db9038576bc992 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Tue, 26 Sep 2023 12:03:01 +0000 Subject: [PATCH 7/8] Review changes --- ch_backup/config.py | 33 ++++++++++++++++++-------------- ch_backup/logging.py | 25 ++++++++++++------------ ch_backup/zookeeper/zookeeper.py | 2 +- 3 files changed, 32 insertions(+), 28 deletions(-) diff --git a/ch_backup/config.py b/ch_backup/config.py index 4dce056b..f3e14cce 100644 --- a/ch_backup/config.py +++ b/ch_backup/config.py @@ -163,32 +163,37 @@ def _handler_configuration(name: str, sink: str, level: str) -> dict: "disable_ssl_warnings": False, }, "loguru": { - "formaters":{ - "ch-backup" : "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[logger_name]}: {message}", - }, + "formatters": { + "ch-backup": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[logger_name]}: {message}", + }, "handlers": { - "ch-backup": { - "sink" : "/var/log/ch-backup/ch-backup.log", + "ch-backup": { + "sink": "/var/log/ch-backup/ch-backup.log", "level": "DEBUG", "format": "ch-backup", }, - "zookeeper": { - "sink" : "/var/log/ch-backup/ch-backup.log", + "zookeeper": { + "sink": "/var/log/ch-backup/ch-backup.log", "level": "DEBUG", "format": "ch-backup", }, - "botocore": { - "sink" : "/var/log/ch-backup/ch-backup.log", - "level": "INFO", + "botocore": { + "sink": "/var/log/ch-backup/boto.log", "format": "ch-backup", + "filter": { + "botocore": "INFO", + "botocore.endpoint": "DEBUG", + "botocore.vendored.requests": "DEBUG", + "botocore.parsers": "DEBUG", + }, }, - "clickhouse-disks": { - "sink" : "/var/log/ch-backup/clickhouse-disks.log", + "clickhouse-disks": { + "sink": "/var/log/ch-backup/clickhouse-disks.log", "level": "DEBUG", "format": "ch-backup", }, - "urllib3.connectionpool": { - "sink" : "/var/log/ch-backup/boto.log", + "urllib3.connectionpool": { + "sink": "/var/log/ch-backup/boto.log", "level": "DEBUG", "format": "ch-backup", }, diff --git a/ch_backup/logging.py b/ch_backup/logging.py index 6de9ec71..e617122d 100644 --- a/ch_backup/logging.py +++ b/ch_backup/logging.py @@ -27,10 +27,9 @@ def __call__(self, record): If the log comes from ch-backup code then the logger name will be in `logger_name`. 
If the log comes from other module and it caught by InterceptHandler then we filtering by the module_name. """ - print(record, self._name) - if "logger_name" in record.get("extra", {}): - return record["extra"].get("logger_name") == self._name + if "logger_name" not in record.get("extra", {}): + return record["extra"].get("logger_name") == self._name return False @@ -66,7 +65,7 @@ def emit(self, record: logging.LogRecord) -> None: while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__): frame = frame.f_back depth += 1 - print(record) + logger.bind(logger_name=record.name).opt( depth=depth, exception=record.exc_info ).log(level, record.getMessage()) @@ -79,22 +78,22 @@ def configure(config_loguru: dict) -> None: # Configure loguru. loguru_handlers = [] - for name, value in config_loguru['handlers'].items(): + for name, value in config_loguru["handlers"].items(): handler = { - "sink": value['sink'], - "level": value['level'], - "format": config_loguru['formaters'][value['format']], - "filter": make_filter(name), + "sink": value["sink"], + "format": config_loguru["formatters"][value["format"]], "enqueue": True, + "filter": value["filter"] if "filter" in value else make_filter(name), } + if "level" in value: + handler["level"] = value["level"] loguru_handlers.append(handler) - - logger.configure( - handlers=loguru_handlers, activation=[("",True)] - ) + + logger.configure(handlers=loguru_handlers, activation=[("", True)]) # Configure logging. logging.basicConfig(handlers=[InterceptHandler()], level=0) + logging.debug("Checkroot") def critical(msg, *args, **kwargs): diff --git a/ch_backup/zookeeper/zookeeper.py b/ch_backup/zookeeper/zookeeper.py index 6f20ef80..ebed089c 100644 --- a/ch_backup/zookeeper/zookeeper.py +++ b/ch_backup/zookeeper/zookeeper.py @@ -1,8 +1,8 @@ """ ZooKeeper-control classes module """ -import os import logging as py_logging +import os from typing import Dict, Iterable, Tuple from kazoo.client import KazooClient From 842e31842de0bf07ba23b18dfe1881cb39841ef7 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Tue, 26 Sep 2023 14:33:24 +0000 Subject: [PATCH 8/8] Fixes --- ch_backup/clickhouse/disks.py | 20 +++++++++----------- ch_backup/config.py | 10 ---------- ch_backup/logging.py | 7 +------ 3 files changed, 10 insertions(+), 27 deletions(-) diff --git a/ch_backup/clickhouse/disks.py b/ch_backup/clickhouse/disks.py index 87fd8a04..c2a4e6ee 100644 --- a/ch_backup/clickhouse/disks.py +++ b/ch_backup/clickhouse/disks.py @@ -10,7 +10,7 @@ import xmltodict -import ch_backup.logging as ch_logging +from ch_backup import logging from ch_backup.backup.layout import BackupLayout from ch_backup.backup.metadata import BackupMetadata, PartMetadata from ch_backup.clickhouse.config import ClickhouseConfig @@ -78,13 +78,13 @@ def __enter__(self): def __exit__(self, exc_type, *args, **kwargs): if exc_type is not None: - ch_logging.warning( + logging.warning( f'Omitting tmp cloud storage disk cleanup due to exception: "{exc_type}"' ) return False for disk in self._created_disks.values(): - ch_logging.debug(f"Removing tmp disk {disk.name}") + logging.debug(f"Removing tmp disk {disk.name}") try: os.remove(_get_config_path(self._config_dir, disk.name)) return True @@ -100,7 +100,7 @@ def _create_temporary_disk( source_endpoint: str, ) -> None: tmp_disk_name = _get_tmp_disk_name(disk.name) - ch_logging.debug(f"Creating tmp disk {tmp_disk_name}") + logging.debug(f"Creating tmp disk {tmp_disk_name}") disk_config = self._ch_config.config["storage_configuration"]["disks"][ 
disk.name ] @@ -129,9 +129,7 @@ def _create_temporary_disk( self._ch_ctl.reload_config() source_disk = self._ch_ctl.get_disk(tmp_disk_name) - ch_logging.debug( - f'Restoring Cloud Storage "shadow" data of disk "{disk.name}"' - ) + logging.debug(f'Restoring Cloud Storage "shadow" data of disk "{disk.name}"') self._backup_layout.download_cloud_storage_metadata( backup_meta, source_disk, disk.name ) @@ -180,23 +178,23 @@ def _copy_dir(from_disk: str, from_path: str, to_disk: str, to_path: str) -> Non common_args=[], command_args=["--diskFrom", from_disk, "--diskTo", to_disk, from_path, to_path], ) - ch_logging.warning(f"clickhouse-disks copy result: {os.linesep.join(result)}") + logging.warning(f"clickhouse-disks copy result: {os.linesep.join(result)}") def _exec(command: str, common_args: List[str], command_args: List[str]) -> List[str]: - logger = ch_logging.getLogger("clickhouse-disks") + ch_disks_logger = logging.getLogger("clickhouse-disks") command_args = [ "/usr/bin/clickhouse-disks", *common_args, command, *command_args, ] - ch_logging.debug(f'Executing "{" ".join(command_args)}"') + logging.debug(f'Executing "{" ".join(command_args)}"') with Popen(command_args, stdout=PIPE, stderr=PIPE, shell=False) as proc: while proc.poll() is None: for line in proc.stderr.readlines(): # type: ignore - logger.info(line.decode("utf-8").strip()) + ch_disks_logger.info(line.decode("utf-8").strip()) if proc.returncode != 0: raise ClickHouseDisksException( f"clickhouse-disks call failed with exitcode: {proc.returncode}" diff --git a/ch_backup/config.py b/ch_backup/config.py index f3e14cce..7d1b6e6e 100644 --- a/ch_backup/config.py +++ b/ch_backup/config.py @@ -15,16 +15,6 @@ def _as_seconds(t: str) -> int: return int(parse_timespan(t)) -def _handler_configuration(name: str, sink: str, level: str) -> dict: - return { - "name": name, - "sink": sink, - "format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[logger_name]}: {message}", - "level": level, - "enqueue": True, - } - - DEFAULT_CONFIG = { "clickhouse": { "data_path": "/var/lib/clickhouse", diff --git a/ch_backup/logging.py b/ch_backup/logging.py index e617122d..6049cd83 100644 --- a/ch_backup/logging.py +++ b/ch_backup/logging.py @@ -23,14 +23,9 @@ def __init__(self, name): def __call__(self, record): """ Filter callback to decide for each logged message whether it should be sent to the sink or not. - - If the log comes from ch-backup code then the logger name will be in `logger_name`. - If the log comes from other module and it caught by InterceptHandler then we filtering by the module_name. """ - if "logger_name" not in record.get("extra", {}): - return record["extra"].get("logger_name") == self._name - return False + return record["extra"].get("logger_name") == self._name def make_filter(name):
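
Taken together, the patches above converge on one scheme: loguru owns the sinks, each sink carries a filter keyed on the `logger_name` bound into `extra`, and an InterceptHandler bridges records from the standard `logging` module (kazoo via the named "zookeeper" logger handed to KazooClient, botocore, urllib3, clickhouse-disks) into loguru under their stdlib logger names. The snippet below is a minimal, self-contained sketch of that flow, not the code from the patches: the stdout/stderr sinks and handler names are illustrative stand-ins for the real file paths and formatter in DEFAULT_CONFIG, and the frame-depth bookkeeping done by the actual InterceptHandler is omitted for brevity.

# Sketch only: illustrative sinks and logger names, assuming loguru is installed.
import logging
import sys

from loguru import logger


def make_filter(name):
    """Accept only records bound (or intercepted) with this logger name."""

    def _filter(record):
        return record["extra"].get("logger_name") == name

    return _filter


class InterceptHandler(logging.Handler):
    """Route stdlib logging records into loguru, tagged with the stdlib logger name."""

    def emit(self, record):
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno
        logger.bind(logger_name=record.name).opt(exception=record.exc_info).log(
            level, record.getMessage()
        )


# One loguru handler per logical sink; in the real config these point at
# /var/log/ch-backup/*.log and share the "ch-backup" formatter string.
logger.configure(
    handlers=[
        {"sink": sys.stdout, "level": "DEBUG", "filter": make_filter("ch-backup")},
        {"sink": sys.stderr, "level": "INFO", "filter": make_filter("zookeeper")},
    ]
)
# Funnel everything emitted through the stdlib logging module into loguru.
logging.basicConfig(handlers=[InterceptHandler()], level=0)

# ch-backup's own code binds its logger name explicitly; third-party modules
# reach the right sink through InterceptHandler and their stdlib logger name.
logger.bind(logger_name="ch-backup").debug("goes to the ch-backup sink")
logging.getLogger("zookeeper").info("intercepted and routed to the zookeeper sink")

Run as written, the first message is accepted only by the "ch-backup" sink and the second only by the "zookeeper" sink; the same filter-per-handler mechanism is what lets the patches send clickhouse-disks and boto output to separate log files without installing per-module stdlib handlers.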