Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
MikhailBurdukov committed Sep 25, 2023
1 parent 725d11d commit f2aca39
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 72 deletions.
2 changes: 1 addition & 1 deletion ch_backup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def cli(
if zk_hosts is not None:
cfg["zookeeper"]["hosts"] = zk_hosts

logging.configure(cfg["logging"],cfg["loguru"])
logging.configure(cfg["loguru"])
setup_environment(cfg["main"])

if not drop_privileges(cfg["main"]):
Expand Down
63 changes: 24 additions & 39 deletions ch_backup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ def _as_seconds(t: str) -> int:
return int(parse_timespan(t))


def _handler_configuration(name: str, sink: str, level: str) -> dict:
return {
"name": name,
"sink": sink,
"format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[logger_name]}: {message}",
"level": level,
"enqueue": True,
}


DEFAULT_CONFIG = {
"clickhouse": {
"data_path": "/var/lib/clickhouse",
Expand Down Expand Up @@ -154,47 +164,22 @@ def _as_seconds(t: str) -> int:
},
"loguru": {
"handlers": [
{
'name': 'ch-backup',
"sink": "/var/log/ch-backup/ch-backup.log",
"format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[name]}: {message}",
'level': 'DEBUG',
'enqueue': True,
},
{
'name': 'kazoo',
"sink": "/var/log/ch-backup/ch-backup.log",
"format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[name]}: {message}",
'level': 'DEBUG',
'enqueue': True,

},
{
'name': 'boto',
"sink": "/var/log/ch-backup/boto.log",
"format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[name]}: {message}",
'level': 'DEBUG',
'enqueue': True,
},
{
'name': 'clickhouse-disks',
"sink": "/var/log/ch-backup/clickhouse-disks.log",
"format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[name]}: {message}",
'level': 'INFO',
'enqueue': True,
},
{
'name': 'urllib3.connectionpool',
"sink": "/var/log/ch-backup/boto.log",
"format": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[name]}: {message}",
'level': 'DEBUG',
'enqueue': True,
},


_handler_configuration(
"ch-backup", "/var/log/ch-backup/ch-backup.log", "DEBUG"
),
_handler_configuration(
"kazoo", "/var/log/ch-backup/ch-backup.log", "DEBUG"
),
_handler_configuration("boto", "/var/log/ch-backup/ch-boto.log", "DEBUG"),
_handler_configuration(
"clickhouse-disks", "/var/log/ch-backup/clickhouse-disks.log", "INFO"
),
_handler_configuration(
"urllib3.connectionpool", "/var/log/ch-backup/boto.log", "DEBUG"
),
],
"activation": [
("",True),
("", True),
],
},
"zookeeper": {
Expand Down
91 changes: 63 additions & 28 deletions ch_backup/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import inspect
import logging
from typing import Any

import psutil
from loguru import logger
Expand All @@ -12,36 +13,55 @@


class Filter:
"""
Filter for luguru handler.
"""

def __init__(self, name):
self._name = name

def __call__(self, record):
if 'name' in record.get('extra', {}):
return record["extra"].get("name") == self._name
"""
Filter callback to decide for each logged message whether it should be sent to the sink or not.
If the log comes from ch-backup code then the logger name will be in `logger_name`.
If the log comes from other module and it caught by InterceptHandler then we filtering by the module_name.
"""

if "logger_name" in record.get("extra", {}):
return record["extra"].get("logger_name") == self._name

if 'name' not in record:
if "module_name" not in record.get("extra", {}):
return False

if record['name'][:len(self._name)] == self._name:
record["extra"]['name'] = self._name
if record["extra"]["module_name"][: len(self._name)] == self._name:
record["extra"]["logger_name"] = self._name
return True
return False



def make_filter(name):
"""
Factory for filter creation.
"""

return Filter(name)


class InterceptHandler(logging.Handler):
"""
Helper class for logging interception.
"""

def emit(self, record: logging.LogRecord) -> None:
"""
Intercept all records from the logging module and redirect them into loguru.
The handler for loguru will be choosen based on module name.
The handler for loguru will be chosen based on module name.
"""

# Get corresponding Loguru level if it exists.
level: str | int
level: int or str # type: ignore
try:
level = logger.level(record.levelname).name
except ValueError:
Expand All @@ -52,39 +72,48 @@ def emit(self, record: logging.LogRecord) -> None:
while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__):
frame = frame.f_back
depth += 1
logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())

try:
frame_name = frame.f_globals["__name__"] # type: ignore
except KeyError:
frame_name = None

logger.bind(module_name=frame_name).opt(
depth=depth, exception=record.exc_info
).log(level, record.getMessage())


def configure(config: dict, config_loguru: dict) -> None:
def configure(config_loguru: dict) -> None:
"""
Configure logger.
"""
# Configure loguru.
for handler in config_loguru["handlers"]:
handler['filter'] = make_filter(handler['name'])
del handler['name']

logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
handler["filter"] = make_filter(handler["name"])
del handler["name"]

logger.configure(
handlers = config_loguru["handlers"],
activation = config_loguru["activation"]
handlers=config_loguru["handlers"], activation=config_loguru["activation"]
)

# Configure logging.
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)


def critical(msg, *args, **kwargs):
"""
Log a message with severity 'CRITICAL'.
"""
with_exception = kwargs.get("exc_info", False)
getLogger('ch-backup').opt(exception=with_exception).critical(msg, *args, **kwargs)
getLogger("ch-backup").opt(exception=with_exception).critical(msg, *args, **kwargs)


def error(msg, exc_info=False,*args, **kwargs):
def error(msg, *args, **kwargs):
"""
Log a message with severity 'ERROR'.
"""
with_exception = kwargs.get("exc_info", False)
getLogger('ch-backup').opt(exception=with_exception).error(msg, *args, **kwargs)

getLogger("ch-backup").opt(exception=with_exception).error(msg, *args, **kwargs)


def exception(msg, *args, **kwargs):
Expand All @@ -93,31 +122,31 @@ def exception(msg, *args, **kwargs):
"""

with_exception = kwargs.get("exc_info", False)
getLogger('ch-backup').opt(exception=with_exception).debug(msg, *args, **kwargs)
getLogger("ch-backup").opt(exception=with_exception).debug(msg, *args, **kwargs)


def warning(msg, exc_info=False, *args, **kwargs):
def warning(msg, *args, **kwargs):
"""
Log a message with severity 'WARNING'.
"""
with_exception = kwargs.get("exc_info", False)
getLogger('ch-backup').opt(exception=with_exception).warning(msg, *args, **kwargs)
getLogger("ch-backup").opt(exception=with_exception).warning(msg, *args, **kwargs)


def info(msg, *args, **kwargs):
"""
Log a message with severity 'INFO'.
"""
with_exception = kwargs.get("exc_info", False)
getLogger('ch-backup').opt(exception=with_exception).info(msg, *args, **kwargs)
getLogger("ch-backup").opt(exception=with_exception).info(msg, *args, **kwargs)


def debug(msg, *args, **kwargs):
"""
Log a message with severity 'DEBUG'.
"""
with_exception = kwargs.get("exc_info", False)
getLogger('ch-backup').opt(exception=with_exception).debug(msg, *args, **kwargs)
getLogger("ch-backup").opt(exception=with_exception).debug(msg, *args, **kwargs)


def memory_usage():
Expand Down Expand Up @@ -146,7 +175,13 @@ def memory_usage():
)

except Exception:
warning("Unable to get memory usage",exc_info=True)
warning("Unable to get memory usage", exc_info=True)


# pylint: disable=invalid-name
def getLogger(name: str) -> Any:
"""
Get logger with specific name.
"""

def getLogger(name: str):
return logger.bind(name=name)
return logger.bind(logger_name=name)
5 changes: 2 additions & 3 deletions ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def submit(self, future_id: str, func: Callable, *args: Any, **kwargs: Any) -> N
if future_id in self._futures:
raise RuntimeError("Duplicate")
future = self._pool.submit(func, *args, **kwargs)
future.add_done_callback(lambda _: logging.debug('Future {} completed', future_id)) # type: ignore[misc]
future.add_done_callback(lambda _: logging.debug("Future {} completed", future_id)) # type: ignore[misc]
self._futures[future_id] = future

def wait_all(self, keep_going: bool = False) -> None:
Expand All @@ -55,8 +55,7 @@ def wait_all(self, keep_going: bool = False) -> None:
)
continue
logging.error(
'Future "{}" generated an exception:', future_id
, exc_info=True
'Future "{}" generated an exception:', future_id, exc_info=True
)
raise
self._futures = {}
Expand Down
2 changes: 1 addition & 1 deletion ch_backup/zookeeper/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from kazoo.exceptions import KazooException, NoNodeError
from kazoo.handlers.threading import KazooTimeoutError

import ch_backup.logging as logging
from ch_backup import logging

from ..clickhouse.models import Table
from ..util import retry
Expand Down

0 comments on commit f2aca39

Please sign in to comment.