diff --git a/admin/c2cgeoportal_admin/schemas/treegroup.py b/admin/c2cgeoportal_admin/schemas/treegroup.py index e80b66cfbf..139ec04f4c 100644 --- a/admin/c2cgeoportal_admin/schemas/treegroup.py +++ b/admin/c2cgeoportal_admin/schemas/treegroup.py @@ -42,7 +42,7 @@ from c2cgeoportal_commons.lib.literal import Literal from c2cgeoportal_commons.models.main import LayergroupTreeitem, TreeGroup, TreeItem -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) # Correspondence between TreeItem.item_type and route table segment ITEM_TYPE_ROUTE_MAP = { @@ -149,7 +149,7 @@ def treeitem_edit_url(request: pyramid.request.Request, treeitem: TreeGroup) -> return None table = ITEM_TYPE_ROUTE_MAP.get(treeitem.item_type, None) if table is None: - LOG.warning("%s not found in ITEM_TYPE_ROUTE_MAP", treeitem.item_type) + _LOG.warning("%s not found in ITEM_TYPE_ROUTE_MAP", treeitem.item_type) return None return request.route_url( # type: ignore "c2cgeoform_item", diff --git a/admin/c2cgeoportal_admin/views/ogc_servers.py b/admin/c2cgeoportal_admin/views/ogc_servers.py index ca7ccda540..d8310261e1 100644 --- a/admin/c2cgeoportal_admin/views/ogc_servers.py +++ b/admin/c2cgeoportal_admin/views/ogc_servers.py @@ -62,7 +62,7 @@ base_schema = GeoFormSchemaNode(OGCServer, widget=FormWidget(fields_template="ogcserver_fields")) base_schema.add_unique_validator(OGCServer.name, OGCServer.id) -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) @view_defaults(match_param="table=ogc_servers") @@ -252,8 +252,8 @@ def update_cache() -> None: timeout=60, ) if not response.ok: - LOG.error("Error while cleaning the OGC server cache:\n%s", response.text) + _LOG.error("Error while cleaning the OGC server cache:\n%s", response.text) threading.Thread(target=update_cache).start() except Exception: - LOG.error("Error on cleaning the OGC server cache", exc_info=True) + _LOG.error("Error on cleaning the OGC server cache", exc_info=True) diff --git a/commons/c2cgeoportal_commons/alembic/env.py b/commons/c2cgeoportal_commons/alembic/env.py index f2a9463425..1b4b61f211 100755 --- a/commons/c2cgeoportal_commons/alembic/env.py +++ b/commons/c2cgeoportal_commons/alembic/env.py @@ -37,7 +37,7 @@ from plaster.loaders import setup_logging from sqlalchemy import engine_from_config, pool -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) # Interpret the config file for Python logging. diff --git a/commons/c2cgeoportal_commons/lib/email_.py b/commons/c2cgeoportal_commons/lib/email_.py index 70486d7251..5a667b7639 100644 --- a/commons/c2cgeoportal_commons/lib/email_.py +++ b/commons/c2cgeoportal_commons/lib/email_.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2023, Camptocamp SA +# Copyright (c) 2013-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -42,7 +42,7 @@ class HTTPInternalServerError(BaseException): # type: ignore """Fallback class.""" -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) def send_email_config(settings: dict[str, Any], email_config_name: str, email: str, **kwargs: Any) -> None: @@ -59,7 +59,7 @@ def send_email_config(settings: dict[str, Any], email_config_name: str, email: s smtp_config, ) except gaierror: - LOG.exception("Unable to send the email.") + _LOG.exception("Unable to send the email.") raise HTTPInternalServerError("See server logs for details") # pylint: disable=raise-missing-from diff --git a/commons/c2cgeoportal_commons/lib/url.py b/commons/c2cgeoportal_commons/lib/url.py index effa219067..76a26a3670 100644 --- a/commons/c2cgeoportal_commons/lib/url.py +++ b/commons/c2cgeoportal_commons/lib/url.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2023, Camptocamp SA +# Copyright (c) 2013-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -32,7 +32,7 @@ from pyramid.request import Request -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) class Url: @@ -55,7 +55,7 @@ def __init__(self, url: Optional[str] = None): try: self._port = url_split.port except ValueError as error: - LOG.debug(error) + _LOG.debug(error) self.path = url_split.path self.query = dict(urllib.parse.parse_qsl(url_split.query)) self.fragment = url_split.fragment @@ -94,7 +94,7 @@ def netloc(self, netloc: str) -> None: if len(netloc_split) == 2: allowed = re.compile(r"^[0-9]+$") if not allowed.match(netloc_split[1]): - LOG.debug("The netloc '%s' contains invalid port", netloc) + _LOG.debug("The netloc '%s' contains invalid port", netloc) self._port = None else: self._port = int(netloc_split[1]) diff --git a/commons/c2cgeoportal_commons/models/__init__.py b/commons/c2cgeoportal_commons/models/__init__.py index 2c9013f40d..fdf721692a 100644 --- a/commons/c2cgeoportal_commons/models/__init__.py +++ b/commons/c2cgeoportal_commons/models/__init__.py @@ -57,7 +57,7 @@ class BaseType(sqlalchemy.ext.declarative.DeclarativeMeta, type): DBSessions: dict[str, sqlalchemy.orm.scoping.scoped_session[sqlalchemy.orm.Session]] = {} -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) class InvalidateCacheEvent: @@ -77,4 +77,4 @@ def _cache_invalidate_cb() -> None: zope.event.notify(InvalidateCacheEvent()) except ModuleNotFoundError: - LOG.error("c2cwsgiutils broadcast not found") + _LOG.error("c2cwsgiutils broadcast not found") diff --git a/commons/c2cgeoportal_commons/models/main.py b/commons/c2cgeoportal_commons/models/main.py index 8d89f99478..66f6541f8b 100644 --- a/commons/c2cgeoportal_commons/models/main.py +++ b/commons/c2cgeoportal_commons/models/main.py @@ -91,7 +91,7 @@ def state_str(state: Any) -> str: # information sqlalchemy.orm.base.state_str = state_str -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) _schema: str = config["schema"] or "main" _srid: int = cast(int, config["srid"]) or 3857 diff --git a/commons/c2cgeoportal_commons/models/static.py b/commons/c2cgeoportal_commons/models/static.py index 6dcbd6f9d1..b61d1a279c 100644 --- a/commons/c2cgeoportal_commons/models/static.py +++ b/commons/c2cgeoportal_commons/models/static.py @@ -66,7 +66,7 @@ def __init__(self, *args: Any, **kwargs: Any): RelationSelect2Widget = GenericClass # type: ignore[misc,assignment] -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) _schema: str = config["schema_static"] or "static" diff --git a/docker/qgisserver/geomapfish_qgisserver/__init__.py b/docker/qgisserver/geomapfish_qgisserver/__init__.py index 685afdf24c..b0085139ec 100644 --- a/docker/qgisserver/geomapfish_qgisserver/__init__.py +++ b/docker/qgisserver/geomapfish_qgisserver/__init__.py @@ -15,7 +15,7 @@ from . import gmf_logging -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) def serverClassFactory( # pylint: disable=invalid-name @@ -32,12 +32,12 @@ def serverClassFactory( # pylint: disable=invalid-name level=Qgis.Critical, ) - LOG.info("Starting GeoMapFish access restriction...") + _LOG.info("Starting GeoMapFish access restriction...") try: from .accesscontrol import GeoMapFishAccessControl # pylint: disable=import-outside-toplevel return GeoMapFishAccessControl(serverIface) except Exception: # pylint: disable=broad-exception-caught - LOG.error("Cannot setup GeoMapFishAccessControl", exc_info=True) + _LOG.error("Cannot setup GeoMapFishAccessControl", exc_info=True) return None diff --git a/docker/qgisserver/geomapfish_qgisserver/accesscontrol.py b/docker/qgisserver/geomapfish_qgisserver/accesscontrol.py index ed44fee3f0..db773ae16b 100644 --- a/docker/qgisserver/geomapfish_qgisserver/accesscontrol.py +++ b/docker/qgisserver/geomapfish_qgisserver/accesscontrol.py @@ -41,13 +41,13 @@ if TYPE_CHECKING: from c2cgeoportal_commons.models import main # pylint: disable=ungrouped-imports,useless-suppression -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) def create_session_factory(url: str, configuration: dict[str, Any]) -> sessionmaker: configure_mappers() db_match = re.match(".*(@[^@]+)$", url) - LOG.info( + _LOG.info( "Connect to the database: ***%s, with config: %s", db_match.group(1) if db_match else "", ", ".join([f"{e[0]}={e[1]}" for e in configuration.items()]), @@ -97,7 +97,7 @@ def __init__(self, server_iface: qgis.server.QgsServerInterface): DBSession, ) - LOG.info("Use OGC server named '%s'.", os.environ["GEOMAPFISH_OGCSERVER"]) + _LOG.info("Use OGC server named '%s'.", os.environ["GEOMAPFISH_OGCSERVER"]) self.initialized = True elif "GEOMAPFISH_ACCESSCONTROL_CONFIG" in os.environ: self.single = False @@ -110,7 +110,7 @@ def __init__(self, server_iface: qgis.server.QgsServerInterface): server_iface, map_config["ogc_server"], map_, config.get("srid"), DBSession ) self.ogcserver_accesscontrols[map_] = map_config - LOG.info("Use config '%s'.", os.environ["GEOMAPFISH_ACCESSCONTROL_CONFIG"]) + _LOG.info("Use config '%s'.", os.environ["GEOMAPFISH_ACCESSCONTROL_CONFIG"]) self.initialized = True elif "GEOMAPFISH_ACCESSCONTROL_BASE_URL" in os.environ: self.ogcserver_accesscontrols = {} @@ -133,14 +133,14 @@ def __init__(self, server_iface: qgis.server.QgsServerInterface): ) if errors: - LOG.warning( + _LOG.warning( "Ignoring OGC server '%s', get error on parsing URL:\n%s", ogcserver.name, "\n".join(errors), ) continue if url is None: - LOG.warning("Ignoring OGC server '%s', the URL is None", ogcserver.name) + _LOG.warning("Ignoring OGC server '%s', the URL is None", ogcserver.name) continue if ( base_url.scheme == url.scheme @@ -151,12 +151,12 @@ def __init__(self, server_iface: qgis.server.QgsServerInterface): if "map" not in query: if single_ogc_server is None: single_ogc_server = ogcserver - LOG.debug( + _LOG.debug( "OGC server '%s', 'map' is not in the parameters => single server?", ogcserver.name, ) else: - LOG.error( + _LOG.error( "OGC server '%s', 'map' is not in the parameters and we already " "have a single OCG server '%s'", ogcserver.name, @@ -176,9 +176,9 @@ def __init__(self, server_iface: qgis.server.QgsServerInterface): ogcserver=ogcserver, ), } - LOG.info("OGC server '%s' registered for map", ogcserver.name) + _LOG.info("OGC server '%s' registered for map", ogcserver.name) else: - LOG.debug( + _LOG.debug( "Ignoring OGC server '%s', Don't match the base URL '%s' and '%s'", ogcserver.name, base_url, @@ -186,13 +186,13 @@ def __init__(self, server_iface: qgis.server.QgsServerInterface): ) if self.ogcserver_accesscontrols and single_ogc_server is not None: if os.environ.get("QGIS_PROJECT_FILE"): - LOG.error( + _LOG.error( "We have OGC servers with and without parameter MAP and a value in " "QGIS_PROJECT_FILE, fallback to single OGC server mode." ) self.ogcserver_accesscontrols = {} else: - LOG.error( + _LOG.error( "We have OGC servers with and without parameter MAP but no value in " "QGIS_PROJECT_FILE, fallback to multiple OGC server mode." ) @@ -208,20 +208,20 @@ def __init__(self, server_iface: qgis.server.QgsServerInterface): single_ogc_server, ) - LOG.info("Use OGC server named '%s'.", single_ogc_server.name) + _LOG.info("Use OGC server named '%s'.", single_ogc_server.name) else: self.single = False self.initialized = True finally: session.close() else: - LOG.error( + _LOG.error( "The environment variable 'GEOMAPFISH_OGCSERVER', 'GEOMAPFISH_ACCESSCONTROL_CONFIG' " "or 'GEOMAPFISH_ACCESSCONTROL_BASE_URL' should be defined.", ) except Exception: # pylint: disable=broad-except - LOG.error("Cannot setup GeoMapFishAccessControl", exc_info=True) + _LOG.error("Cannot setup GeoMapFishAccessControl", exc_info=True) server_iface.registerAccessControl(self, int(os.environ.get("GEOMAPFISH_POSITION", 100))) @@ -252,22 +252,22 @@ def layerFilterSubsetString(self, layer: QgsVectorLayer) -> Optional[str]: # py """Return an additional subset string (typically SQL) filter.""" try: if not self.initialized: - LOG.error("Call on uninitialized plugin") + _LOG.error("Call on uninitialized plugin") return "0" return self.get_ogcserver_accesscontrol().layerFilterSubsetString(layer) except Exception: - LOG.error("Unhandled error", exc_info=True) + _LOG.error("Unhandled error", exc_info=True) raise def layerFilterExpression(self, layer: QgsVectorLayer) -> Optional[str]: # pylint: disable=invalid-name """Return an additional expression filter.""" try: if not self.initialized: - LOG.error("Call on uninitialized plugin") + _LOG.error("Call on uninitialized plugin") return "0" return self.get_ogcserver_accesscontrol().layerFilterExpression(layer) except Exception: - LOG.error("Unhandled error", exc_info=True) + _LOG.error("Unhandled error", exc_info=True) raise def layerPermissions( # pylint: disable=invalid-name @@ -276,13 +276,13 @@ def layerPermissions( # pylint: disable=invalid-name """Return the layer rights.""" try: if not self.initialized: - LOG.error("Call on uninitialized plugin") + _LOG.error("Call on uninitialized plugin") no_rights = QgsAccessControlFilter.LayerPermissions() no_rights.canRead = no_rights.canInsert = no_rights.canUpdate = no_rights.canDelete = False return no_rights return self.get_ogcserver_accesscontrol().layerPermissions(layer) except Exception: - LOG.error("Unhandled error", exc_info=True) + _LOG.error("Unhandled error", exc_info=True) raise def authorizedLayerAttributes( # pylint: disable=invalid-name @@ -291,32 +291,32 @@ def authorizedLayerAttributes( # pylint: disable=invalid-name """Return the authorized layer attributes.""" try: if not self.initialized: - LOG.error("Call on uninitialized plugin") + _LOG.error("Call on uninitialized plugin") return [] return self.get_ogcserver_accesscontrol().authorizedLayerAttributes(layer, attributes) except Exception: - LOG.error("Unhandled error", exc_info=True) + _LOG.error("Unhandled error", exc_info=True) raise def allowToEdit(self, layer: QgsVectorLayer, feature: QgsFeature) -> bool: # pylint: disable=invalid-name """Are we authorize to modify the following geometry.""" try: if not self.initialized: - LOG.error("Call on uninitialized plugin") + _LOG.error("Call on uninitialized plugin") return False return self.get_ogcserver_accesscontrol().allowToEdit(layer, feature) except Exception: - LOG.error("Unhandled error", exc_info=True) + _LOG.error("Unhandled error", exc_info=True) raise def cacheKey(self) -> str: # pylint: disable=invalid-name try: if not self.initialized: - LOG.error("Call on uninitialized plugin") + _LOG.error("Call on uninitialized plugin") return str(random.randrange(1000000)) # nosec return self.get_ogcserver_accesscontrol().cacheKey() except Exception: - LOG.error("Unhandled error", exc_info=True) + _LOG.error("Unhandled error", exc_info=True) raise @@ -353,7 +353,7 @@ def __init__( @zope.event.classhandler.handler(InvalidateCacheEvent) def handle(_: InvalidateCacheEvent) -> None: - LOG.info("=== invalidate ===") + _LOG.info("=== invalidate ===") self._init(ogcserver_name) self._init(ogcserver_name) @@ -379,13 +379,13 @@ def _init(self, ogcserver_name: str) -> None: finally: session.close() if self.ogcserver is None: - LOG.error( + _LOG.error( "No OGC server found for '%s', project: '%s' => no rights", ogcserver_name, self.map_file, ) except Exception: # pylint: disable=broad-except - LOG.error("Cannot setup OGCServerAccessControl", exc_info=True) + _LOG.error("Cannot setup OGCServerAccessControl", exc_info=True) def ogc_layer_name(self, layer: QgsVectorLayer) -> str: use_layer_id, _ = self.project().readBoolEntry("WMSUseLayerIDs", "/", False) @@ -437,7 +437,7 @@ def browse(path: list[str], node: QgsLayerTreeNode) -> None: browse([], self.project().layerTreeRoot()) for ogc_layer_name, ancestors in nodes.items(): - LOG.debug("QGIS layer: %s", ogc_layer_name) + _LOG.debug("QGIS layer: %s", ogc_layer_name) # Transform ancestor names in LayerWMS instances layers: dict[str, list[LayerWMS]] = {} @@ -452,13 +452,13 @@ def browse(path: list[str], node: QgsLayerTreeNode) -> None: for ancestor in ancestors: if ancestor in layer.layer.split(","): found = True - LOG.debug("GeoMapFish layer: name: %s, layer: %s", layer.name, layer.layer) + _LOG.debug("GeoMapFish layer: name: %s, layer: %s", layer.name, layer.layer) layers.setdefault(ogc_layer_name, []).append(layer) if not found: - LOG.info("Rejected GeoMapFish layer: name: %s, layer: %s", layer.name, layer.layer) + _LOG.info("Rejected GeoMapFish layer: name: %s, layer: %s", layer.name, layer.layer) session.expunge_all() - LOG.debug( + _LOG.debug( "layers: %s", json.dumps( {k: [layer.name for layer in v] for k, v in layers.items()}, sort_keys=True, indent=4 @@ -485,7 +485,7 @@ def get_roles(self, session: Session) -> Union[str, list["main.Role"]]: roles = session.query(Role).filter(Role.id.in_(parameters.get("ROLE_IDS").split(","))).all() - LOG.debug("Roles: %s", ",".join([role.name for role in roles]) if roles else "-") + _LOG.debug("Roles: %s", ",".join([role.name for role in roles]) if roles else "-") return roles @staticmethod @@ -582,11 +582,11 @@ def layerFilterSubsetString(self, layer: QgsVectorLayer) -> Optional[str]: # py Returns an additional subset string (typically SQL) filter. """ - LOG.debug("layerFilterSubsetString %s %s", layer.name(), layer.dataProvider().storageType()) + _LOG.debug("layerFilterSubsetString %s %s", layer.name(), layer.dataProvider().storageType()) if self.ogcserver is None: parameters = self.serverInterface().requestHandler().parameterMap() - LOG.warning( + _LOG.warning( "Call on uninitialized plugin, map: %s", os.environ.get("QGIS_PROJECT_FILE", parameters.get("MAP")), ) @@ -594,7 +594,7 @@ def layerFilterSubsetString(self, layer: QgsVectorLayer) -> Optional[str]: # py try: if layer.dataProvider().storageType() not in self.SUBSETSTRING_TYPE: - LOG.debug("layerFilterSubsetString not in type") + _LOG.debug("layerFilterSubsetString not in type") return None session = self.DBSession() @@ -603,10 +603,10 @@ def layerFilterSubsetString(self, layer: QgsVectorLayer) -> Optional[str]: # py finally: session.close() if access is Access.FULL: - LOG.debug("layerFilterSubsetString no area") + _LOG.debug("layerFilterSubsetString no area") return None if access is Access.NO: - LOG.debug("layerFilterSubsetString not allowed") + _LOG.debug("layerFilterSubsetString not allowed") return "0" area = f"ST_GeomFromText('{area.wkt}', {self.srid})" @@ -616,10 +616,10 @@ def layerFilterSubsetString(self, layer: QgsVectorLayer) -> Optional[str]: # py "ST_intersects(" f"{QgsDataSourceUri(layer.dataProvider().dataSourceUri()).geometryColumn()}, {area})" ) - LOG.debug("layerFilterSubsetString filter: %s", result) + _LOG.debug("layerFilterSubsetString filter: %s", result) return result except Exception: - LOG.error("Cannot run layerFilterSubsetString", exc_info=True) + _LOG.error("Cannot run layerFilterSubsetString", exc_info=True) raise def layerFilterExpression(self, layer: QgsVectorLayer) -> Optional[str]: # pylint: disable=invalid-name @@ -627,11 +627,11 @@ def layerFilterExpression(self, layer: QgsVectorLayer) -> Optional[str]: # pyli Returns an additional expression filter. """ - LOG.debug("layerFilterExpression %s %s", layer.name(), layer.dataProvider().storageType()) + _LOG.debug("layerFilterExpression %s %s", layer.name(), layer.dataProvider().storageType()) if self.ogcserver is None: parameters = self.serverInterface().requestHandler().parameterMap() - LOG.warning( + _LOG.warning( "Call on uninitialized plugin, map: %s", os.environ.get("QGIS_PROJECT_FILE", parameters.get("MAP")), ) @@ -639,7 +639,7 @@ def layerFilterExpression(self, layer: QgsVectorLayer) -> Optional[str]: # pyli try: if layer.dataProvider().storageType() in self.SUBSETSTRING_TYPE: - LOG.debug("layerFilterExpression not in type") + _LOG.debug("layerFilterExpression not in type") return None session = self.DBSession() @@ -648,10 +648,10 @@ def layerFilterExpression(self, layer: QgsVectorLayer) -> Optional[str]: # pyli finally: session.close() if access is Access.FULL: - LOG.debug("layerFilterExpression no area") + _LOG.debug("layerFilterExpression no area") return None if access is Access.NO: - LOG.debug("layerFilterExpression not allowed") + _LOG.debug("layerFilterExpression not allowed") return "0" result = ( @@ -659,10 +659,10 @@ def layerFilterExpression(self, layer: QgsVectorLayer) -> Optional[str]: # pyli f"'{layer.crs().authid()}'))" ) - LOG.debug("layerFilterExpression filter: %s", result) + _LOG.debug("layerFilterExpression filter: %s", result) return result except Exception: - LOG.error("Cannot run layerFilterExpression", exc_info=True) + _LOG.error("Cannot run layerFilterExpression", exc_info=True) raise def layerPermissions( # pylint: disable=invalid-name @@ -672,7 +672,7 @@ def layerPermissions( # pylint: disable=invalid-name Returns the layer rights. """ - LOG.debug("layerPermissions %s", layer.name()) + _LOG.debug("layerPermissions %s", layer.name()) try: rights = QgsAccessControlFilter.LayerPermissions() @@ -680,7 +680,7 @@ def layerPermissions( # pylint: disable=invalid-name if self.ogcserver is None: parameters = self.serverInterface().requestHandler().parameterMap() - LOG.warning( + _LOG.warning( "Call on uninitialized plugin, map: %s", os.environ.get("QGIS_PROJECT_FILE", parameters.get("MAP")), ) @@ -709,7 +709,7 @@ def layerPermissions( # pylint: disable=invalid-name return rights except Exception: - LOG.error("Cannot run layerPermissions", exc_info=True) + _LOG.error("Cannot run layerPermissions", exc_info=True) raise def authorizedLayerAttributes( # pylint: disable=invalid-name @@ -722,7 +722,7 @@ def authorizedLayerAttributes( # pylint: disable=invalid-name if self.ogcserver is None: parameters = self.serverInterface().requestHandler().parameterMap() - LOG.warning( + _LOG.warning( "Call on uninitialized plugin, map: %s", os.environ.get("QGIS_PROJECT_FILE", parameters.get("MAP")), ) @@ -732,11 +732,11 @@ def authorizedLayerAttributes( # pylint: disable=invalid-name def allowToEdit(self, layer: QgsVectorLayer, feature: QgsFeature) -> bool: # pylint: disable=invalid-name """Are we authorize to modify the following geometry.""" - LOG.debug("allowToEdit") + _LOG.debug("allowToEdit") if self.ogcserver is None: parameters = self.serverInterface().requestHandler().parameterMap() - LOG.warning( + _LOG.warning( "Call on uninitialized plugin, map: %s", os.environ.get("QGIS_PROJECT_FILE", parameters.get("MAP")), ) @@ -749,15 +749,15 @@ def allowToEdit(self, layer: QgsVectorLayer, feature: QgsFeature) -> bool: # py finally: session.close() if access is Access.FULL: - LOG.debug("layerFilterExpression no area") + _LOG.debug("layerFilterExpression no area") return True if access is Access.NO: - LOG.debug("layerFilterExpression not allowed") + _LOG.debug("layerFilterExpression not allowed") return False return area.intersects(wkb.loads(feature.geometry().asWkb().data())) except Exception: - LOG.error("Cannot run allowToEdit", exc_info=True) + _LOG.error("Cannot run allowToEdit", exc_info=True) raise def cacheKey(self) -> str: # pylint: disable=invalid-name @@ -770,7 +770,7 @@ def cacheKey(self) -> str: # pylint: disable=invalid-name if roles == "ROOT": return f"{self.serverInterface().getEnv('HTTP_HOST')}-ROOT" if isinstance(roles, str): - LOG.error("Unknown values for roles: %s", roles) + _LOG.error("Unknown values for roles: %s", roles) return f"{self.serverInterface().getEnv('HTTP_HOST')}-ROOT" return ( f'{self.serverInterface().getEnv("HTTP_HOST")}-' diff --git a/geoportal/c2cgeoportal_geoportal/__init__.py b/geoportal/c2cgeoportal_geoportal/__init__.py index 167dd06b40..e64772180c 100644 --- a/geoportal/c2cgeoportal_geoportal/__init__.py +++ b/geoportal/c2cgeoportal_geoportal/__init__.py @@ -77,7 +77,7 @@ from c2cgeoportal_commons.models import static # pylint: disable=ungrouped-imports,useless-suppression -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) # Header predicate to accept only JSON content JSON_CONTENT_TYPE = "Content-Type:application/json" @@ -140,7 +140,7 @@ def add_interface( **kwargs, ) else: - LOG.error( + _LOG.error( "Unknown interface type '%s', should be '%s' or '%s'.", interface_type, INTERFACE_TYPE_NGEO, @@ -282,7 +282,7 @@ def is_valid_referrer(request: pyramid.request.Request, settings: Optional[dict[ authorized_referrers = settings.get("authorized_referers", []) referrer_hostname, ok = is_allowed_url(request, request.referrer, authorized_referrers) if not ok: - LOG.info( + _LOG.info( "Invalid referrer hostname '%s', " "is not the current host '%s' " "or part of authorized_referers: %s", @@ -319,7 +319,7 @@ def get_user_from_request( if not hasattr(request, "is_valid_referer"): request.is_valid_referer = is_valid_referrer(request, settings) if not request.is_valid_referer: - LOG.debug("Invalid referrer for %s: %s", request.path_qs, repr(request.referrer)) + _LOG.debug("Invalid referrer for %s: %s", request.path_qs, repr(request.referrer)) return None if not hasattr(request, "user_"): @@ -376,16 +376,16 @@ def default_user_validator(request: pyramid.request.Request, username: str, pass user = DBSession.query(User).filter_by(username=username).first() if user is None: - LOG.info('Unknown user "%s" tried to log in', username) + _LOG.info('Unknown user "%s" tried to log in', username) return None if user.deactivated: - LOG.info('Deactivated user "%s" tried to log in', username) + _LOG.info('Deactivated user "%s" tried to log in', username) return None if user.expired(): - LOG.info("Expired user %s tried to log in", username) + _LOG.info("Expired user %s tried to log in", username) return None if not user.validate_password(password): - LOG.info('User "%s" tried to log in with bad credentials', username) + _LOG.info('User "%s" tried to log in with bad credentials', username) return None return username @@ -431,7 +431,7 @@ def error_handler( http_exception: HTTPException, request: pyramid.request.Request ) -> pyramid.response.Response: """View callable for handling all the exceptions that are not already handled.""" - LOG.warning("%s returned status code %s", request.url, http_exception.status_code) + _LOG.warning("%s returned status code %s", request.url, http_exception.status_code) return set_common_headers(request, "error", Cache.PRIVATE_NO, http_exception) @@ -728,9 +728,9 @@ def add_static_route(name: str, attr: str, path: str, renderer: str) -> None: add_admin_interface(config) else: if not config.get_settings().get("enable_admin_interface", False): - LOG.info("Admin interface disabled by configuration") + _LOG.info("Admin interface disabled by configuration") else: - LOG.info("Admin interface disabled because c2cgeoportal_admin is not installed") + _LOG.info("Admin interface disabled because c2cgeoportal_admin is not installed") add_getitfixed(config) # Add the project static view with cache buster diff --git a/geoportal/c2cgeoportal_geoportal/lib/__init__.py b/geoportal/c2cgeoportal_geoportal/lib/__init__.py index f17713af90..f7523ef2c2 100644 --- a/geoportal/c2cgeoportal_geoportal/lib/__init__.py +++ b/geoportal/c2cgeoportal_geoportal/lib/__init__.py @@ -45,9 +45,9 @@ from c2cgeoportal_geoportal.lib.cacheversion import get_cache_version from c2cgeoportal_geoportal.lib.caching import get_region -LOG = logging.getLogger(__name__) -CACHE_REGION = get_region("std") -CACHE_REGION_OBJ = get_region("obj") +_LOG = logging.getLogger(__name__) +_CACHE_REGION = get_region("std") +_CACHE_REGION_OBJ = get_region("obj") def get_types_map(types_array: list[dict[str, Any]]) -> dict[str, dict[str, Any]]: @@ -142,7 +142,7 @@ def get_setting(settings: Any, path: Iterable[str], default: Any = None) -> Any: return value if value else default -@CACHE_REGION_OBJ.cache_on_arguments() +@_CACHE_REGION_OBJ.cache_on_arguments() def get_ogc_server_wms_url_ids(request: pyramid.request.Request) -> dict[str, list[int]]: """Get the OGCServer ids mapped on the WMS URL.""" from c2cgeoportal_commons.models import DBSession # pylint: disable=import-outside-toplevel @@ -159,7 +159,7 @@ def get_ogc_server_wms_url_ids(request: pyramid.request.Request) -> dict[str, li return servers -@CACHE_REGION_OBJ.cache_on_arguments() +@_CACHE_REGION_OBJ.cache_on_arguments() def get_ogc_server_wfs_url_ids(request: pyramid.request.Request) -> dict[str, list[int]]: """Get the OGCServer ids mapped on the WFS URL.""" from c2cgeoportal_commons.models import DBSession # pylint: disable=import-outside-toplevel @@ -203,7 +203,7 @@ def __call__(self, request: pyramid.request.Request, elements: Any, kw: Any) -> _formatter = Formatter() -@CACHE_REGION_OBJ.cache_on_arguments() +@_CACHE_REGION_OBJ.cache_on_arguments() def _get_intranet_networks( request: pyramid.request.Request, ) -> list[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]]: @@ -213,7 +213,7 @@ def _get_intranet_networks( ] -@CACHE_REGION.cache_on_arguments() +@_CACHE_REGION.cache_on_arguments() def get_role_id(name: str) -> int: """Get the role ID.""" from c2cgeoportal_commons.models import DBSession, main # pylint: disable=import-outside-toplevel diff --git a/geoportal/c2cgeoportal_geoportal/lib/authentication.py b/geoportal/c2cgeoportal_geoportal/lib/authentication.py index 150c135f38..22c822d440 100644 --- a/geoportal/c2cgeoportal_geoportal/lib/authentication.py +++ b/geoportal/c2cgeoportal_geoportal/lib/authentication.py @@ -1,4 +1,4 @@ -# Copyright (c) 2014-2023, Camptocamp SA +# Copyright (c) 2014-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -48,7 +48,7 @@ from c2cgeoportal_geoportal.lib import oauth2 from c2cgeoportal_geoportal.resources import defaultgroupsfinder -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) @implementer(IAuthenticationPolicy) @@ -88,7 +88,7 @@ def unauthenticated_userid(self, request: pyramid.request.Request) -> Optional[s return cast(str, auth["u"]) except Exception as e: - LOG.error("URL login error: %s.", e, exc_info=True) + _LOG.error("URL login error: %s.", e, exc_info=True) return None @@ -115,7 +115,7 @@ def unauthenticated_userid(request: pyramid.request.Request) -> Optional[str]: except ValueError: route_url = request.route_url("base", _query={**request.GET}) - LOG.debug( + _LOG.debug( "Call OAuth verify_request with:\nurl: %s\nmethod: %s\nbody:\n%s", route_url, request.method, @@ -128,7 +128,7 @@ def unauthenticated_userid(request: pyramid.request.Request) -> Optional[str]: request.headers, [], ) - LOG.debug("OAuth verify_request: %s", valid) + _LOG.debug("OAuth verify_request: %s", valid) if valid: request.user_ = oauth2_request.user @@ -206,7 +206,7 @@ def create_authentication(settings: dict[str, Any]) -> MultiAuthenticationPolicy if basicauth: if settings["authentication"].get("two_factor", False): - LOG.warning( + _LOG.warning( "Basic auth and two factor auth should not be enable together, " "you should use OAuth2 instead of Basic auth" ) diff --git a/geoportal/c2cgeoportal_geoportal/lib/caching.py b/geoportal/c2cgeoportal_geoportal/lib/caching.py index bc61b9f085..9e2327eb0a 100644 --- a/geoportal/c2cgeoportal_geoportal/lib/caching.py +++ b/geoportal/c2cgeoportal_geoportal/lib/caching.py @@ -47,7 +47,7 @@ else: SerializedReturnType = Any -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) _REGION: dict[str, CacheRegion] = {} MEMORY_CACHE_DICT: dict[str, Any] = {} diff --git a/geoportal/c2cgeoportal_geoportal/lib/check_collector.py b/geoportal/c2cgeoportal_geoportal/lib/check_collector.py index bca45bd061..d9ebd4d0cc 100644 --- a/geoportal/c2cgeoportal_geoportal/lib/check_collector.py +++ b/geoportal/c2cgeoportal_geoportal/lib/check_collector.py @@ -1,4 +1,4 @@ -# Copyright (c) 2011-2023, Camptocamp SA +# Copyright (c) 2011-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -34,7 +34,7 @@ from c2cgeoportal_geoportal.lib.checker import build_url -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) def init(config: pyramid.config.Configurator, health_check: c2cwsgiutils.health_check.HealthCheck) -> None: diff --git a/geoportal/c2cgeoportal_geoportal/lib/checker.py b/geoportal/c2cgeoportal_geoportal/lib/checker.py index 218305cee7..664ed829a6 100644 --- a/geoportal/c2cgeoportal_geoportal/lib/checker.py +++ b/geoportal/c2cgeoportal_geoportal/lib/checker.py @@ -39,7 +39,7 @@ import pyramid.request import requests -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) def build_url( @@ -54,7 +54,7 @@ def build_url( if forward_host: headers["Host"] = request.host - LOG.debug("%s, URL: %s", name, url) + _LOG.debug("%s, URL: %s", name, url) return {"url": url, "headers": headers} diff --git a/geoportal/c2cgeoportal_geoportal/lib/filter_capabilities.py b/geoportal/c2cgeoportal_geoportal/lib/filter_capabilities.py index e5175372a6..abef7b2974 100644 --- a/geoportal/c2cgeoportal_geoportal/lib/filter_capabilities.py +++ b/geoportal/c2cgeoportal_geoportal/lib/filter_capabilities.py @@ -1,4 +1,4 @@ -# Copyright (c) 2014-2023, Camptocamp SA +# Copyright (c) 2014-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -47,12 +47,12 @@ from c2cgeoportal_geoportal.lib import caching, get_ogc_server_wfs_url_ids, get_ogc_server_wms_url_ids from c2cgeoportal_geoportal.lib.layers import get_private_layers, get_protected_layers, get_writable_layers -CACHE_REGION = caching.get_region("std") -LOG = logging.getLogger(__name__) +_CACHE_REGION = caching.get_region("std") +_LOG = logging.getLogger(__name__) ContentMetadata = Union[ContentMetadata111, ContentMetadata130] -@CACHE_REGION.cache_on_arguments() +@_CACHE_REGION.cache_on_arguments() def wms_structure(wms_url: Url, host: str, request: pyramid.request.Request) -> dict[str, list[str]]: """Get a simple serializable structure of the WMS capabilities.""" url = wms_url.clone().add_query({"SERVICE": "WMS", "VERSION": "1.1.1", "REQUEST": "GetCapabilities"}) @@ -66,7 +66,7 @@ def wms_structure(wms_url: Url, host: str, request: pyramid.request.Request) -> url.url(), headers=headers, **request.registry.settings.get("http_options", {}) ) except Exception: - LOG.exception("Unable to GetCapabilities from wms_url '%s'", wms_url) + _LOG.exception("Unable to GetCapabilities from wms_url '%s'", wms_url) raise HTTPBadGateway( # pylint: disable=raise-missing-from "Unable to GetCapabilities, see logs for details" ) @@ -96,13 +96,13 @@ def _fill(name: str, parent: ContentMetadata) -> None: except AttributeError: error = "WARNING! an error occurred while trying to read the mapfile and recover the themes." error = f"{error}\nurl: {wms_url}\nxml:\n{response.text}" - LOG.exception(error) + _LOG.exception(error) raise HTTPBadGateway(error) # pylint: disable=raise-missing-from except SyntaxError: error = "WARNING! an error occurred while trying to read the mapfile and recover the themes." error = f"{error}\nurl: {wms_url}\nxml:\n{response.text}" - LOG.exception(error) + _LOG.exception(error) raise HTTPBadGateway(error) # pylint: disable=raise-missing-from @@ -128,7 +128,7 @@ def filter_capabilities( if ogc_layer in wms_structure_: private_layers.update(wms_structure_[ogc_layer]) - LOG.debug( + _LOG.debug( "Filter capabilities of OGC server %s\nprivate_layers: %s", ", ".join([str(e) for e in ogc_server_ids]), ", ".join(private_layers), @@ -154,13 +154,13 @@ def filter_wfst_capabilities(content: str, wfs_url: Url, request: pyramid.reques writable_layers: set[str] = set() ogc_server_ids = get_ogc_server_wfs_url_ids(request).get(wfs_url.url()) if ogc_server_ids is None: - LOG.error("No OGC server found for WFS URL %s", wfs_url) + _LOG.error("No OGC server found for WFS URL %s", wfs_url) raise pyramid.httpexceptions.HTTPInternalServerError("No OGC server found for WFS URL") for gmf_layer in list(get_writable_layers(request, ogc_server_ids).values()): writable_layers |= set(gmf_layer.layer.split(",")) - LOG.debug( + _LOG.debug( "Filter WFS-T capabilities of OGC server %s\nlayers: %s", ", ".join([str(e) for e in ogc_server_ids]), ", ".join(writable_layers), diff --git a/geoportal/c2cgeoportal_geoportal/lib/functionality.py b/geoportal/c2cgeoportal_geoportal/lib/functionality.py index bdeac69f3f..2c016d80b9 100644 --- a/geoportal/c2cgeoportal_geoportal/lib/functionality.py +++ b/geoportal/c2cgeoportal_geoportal/lib/functionality.py @@ -36,12 +36,12 @@ from c2cgeoportal_geoportal.lib import get_typed, get_types_map, is_intranet from c2cgeoportal_geoportal.lib.caching import get_region -LOG = logging.getLogger(__name__) -CACHE_REGION_OBJ = get_region("obj") -CACHE_REGION = get_region("std") +_LOG = logging.getLogger(__name__) +_CACHE_REGION_OBJ = get_region("obj") +_CACHE_REGION = get_region("std") -@CACHE_REGION_OBJ.cache_on_arguments() +@_CACHE_REGION_OBJ.cache_on_arguments() def _get_role(name: str) -> dict[str, Any]: from c2cgeoportal_commons.models import DBSession # pylint: disable=import-outside-toplevel @@ -96,7 +96,7 @@ def _get_db_functionality( return [r for r in values if r is not None] -@CACHE_REGION_OBJ.cache_on_arguments() +@_CACHE_REGION_OBJ.cache_on_arguments() def _get_functionalities_type(request: pyramid.request.Request) -> dict[str, dict[str, Any]]: return get_types_map( request.registry.settings.get("admin_interface", {}).get("available_functionalities", []) @@ -142,7 +142,7 @@ def get_functionality( ) if errors != set(): - LOG.error("\n".join(errors)) + _LOG.error("\n".join(errors)) return result @@ -162,5 +162,5 @@ def get_mapserver_substitution_params(request: pyramid.request.Request) -> dict[ else: params[attribute] = value else: - LOG.warning("Mapserver Substitution '%s' does not respect pattern: =", s) + _LOG.warning("Mapserver Substitution '%s' does not respect pattern: =", s) return params diff --git a/geoportal/c2cgeoportal_geoportal/lib/loader.py b/geoportal/c2cgeoportal_geoportal/lib/loader.py index fe0356070f..44d4d556ca 100644 --- a/geoportal/c2cgeoportal_geoportal/lib/loader.py +++ b/geoportal/c2cgeoportal_geoportal/lib/loader.py @@ -34,7 +34,7 @@ from c2cgeoportal_geoportal.lib.i18n import available_locale_names -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) class Loader(BaseLoader): diff --git a/geoportal/c2cgeoportal_geoportal/lib/oauth2.py b/geoportal/c2cgeoportal_geoportal/lib/oauth2.py index dffe2bac99..095d3357a0 100644 --- a/geoportal/c2cgeoportal_geoportal/lib/oauth2.py +++ b/geoportal/c2cgeoportal_geoportal/lib/oauth2.py @@ -38,8 +38,8 @@ import c2cgeoportal_commons from c2cgeoportal_geoportal.lib.caching import get_region -LOG = logging.getLogger(__name__) -OBJECT_CACHE_REGION = get_region("obj") +_LOG = logging.getLogger(__name__) +_OBJECT_CACHE_REGION = get_region("obj") class _Token(TypedDict): @@ -89,7 +89,7 @@ def authenticate_client( """ del args, kwargs - LOG.debug("authenticate_client => unimplemented") + _LOG.debug("authenticate_client => unimplemented") raise NotImplementedError("Not implemented, the method `authenticate_client_id` should be used.") @@ -115,7 +115,7 @@ def authenticate_client_id( """ del args, kwargs - LOG.debug("authenticate_client_id %s", client_id) + _LOG.debug("authenticate_client_id %s", client_id) from c2cgeoportal_commons.models import DBSession, static # pylint: disable=import-outside-toplevel @@ -140,7 +140,7 @@ def authenticate_client_id( .one_or_none() ) - LOG.debug("authenticate_client_id => %s", request.client is not None) + _LOG.debug("authenticate_client_id => %s", request.client is not None) return request.client is not None def client_authentication_required( @@ -180,7 +180,7 @@ def client_authentication_required( """ del request, args, kwargs - LOG.debug("client_authentication_required => False") + _LOG.debug("client_authentication_required => False") return False @@ -220,7 +220,7 @@ def confirm_redirect_uri( """ del args, kwargs - LOG.debug("confirm_redirect_uri %s %s", client_id, redirect_uri) + _LOG.debug("confirm_redirect_uri %s %s", client_id, redirect_uri) from c2cgeoportal_commons.models import DBSession, static # pylint: disable=import-outside-toplevel @@ -235,7 +235,7 @@ def confirm_redirect_uri( .filter(static.OAuth2AuthorizationCode.expire_at > datetime.now()) .one_or_none() ) - LOG.debug("confirm_redirect_uri => %s", authorization_code is not None) + _LOG.debug("confirm_redirect_uri => %s", authorization_code is not None) return authorization_code is not None def get_default_redirect_uri( @@ -261,7 +261,7 @@ def get_default_redirect_uri( """ del request, args, kwargs - LOG.debug("get_default_redirect_uri %s", client_id) + _LOG.debug("get_default_redirect_uri %s", client_id) raise NotImplementedError("Not implemented.") @@ -290,7 +290,7 @@ def get_default_scopes( """ del request, args, kwargs - LOG.debug("get_default_scopes %s", client_id) + _LOG.debug("get_default_scopes %s", client_id) return ["geomapfish"] @@ -316,7 +316,7 @@ def get_original_scopes( """ del refresh_token, request, args, kwargs - LOG.debug("get_original_scopes") + _LOG.debug("get_original_scopes") return [] @@ -366,7 +366,7 @@ def introspect_token( """ del token, request, args, kwargs - LOG.debug("introspect_token %s", token_type_hint) + _LOG.debug("introspect_token %s", token_type_hint) raise NotImplementedError("Not implemented.") @@ -392,7 +392,7 @@ def invalidate_authorization_code( """ del args, kwargs - LOG.debug("invalidate_authorization_code %s", client_id) + _LOG.debug("invalidate_authorization_code %s", client_id) from c2cgeoportal_commons.models import DBSession, static # pylint: disable=import-outside-toplevel @@ -437,7 +437,7 @@ def is_within_original_scope( """ del request, args, kwargs - LOG.debug("is_within_original_scope %s %s", request_scopes, refresh_token) + _LOG.debug("is_within_original_scope %s %s", request_scopes, refresh_token) return False @@ -463,7 +463,7 @@ def revoke_token( """ del token, request, args, kwargs - LOG.debug("revoke_token %s", token_type_hint) + _LOG.debug("revoke_token %s", token_type_hint) raise NotImplementedError("Not implemented.") @@ -484,7 +484,7 @@ def rotate_refresh_token(self, request: oauthlib.common.Request) -> bool: """ del request - LOG.debug("rotate_refresh_token") + _LOG.debug("rotate_refresh_token") return True @@ -531,7 +531,7 @@ def save_authorization_code( """ del args, kwargs - LOG.debug("save_authorization_code %s", client_id) + _LOG.debug("save_authorization_code %s", client_id) from c2cgeoportal_commons.models import DBSession, static # pylint: disable=import-outside-toplevel @@ -620,7 +620,7 @@ def save_bearer_token( """ del args, kwargs - LOG.debug("save_bearer_token") + _LOG.debug("save_bearer_token") from c2cgeoportal_commons.models import DBSession, static # pylint: disable=import-outside-toplevel @@ -703,7 +703,7 @@ def validate_bearer_token( - Client Credentials Grant """ - LOG.debug("validate_bearer_token %s", scopes) + _LOG.debug("validate_bearer_token %s", scopes) from c2cgeoportal_commons.models import DBSession, static # pylint: disable=import-outside-toplevel @@ -719,7 +719,7 @@ def validate_bearer_token( if bearer_token is not None: request.user = bearer_token.user - LOG.debug("validate_bearer_token => %s", bearer_token is not None) + _LOG.debug("validate_bearer_token => %s", bearer_token is not None) return bearer_token is not None def validate_client_id( @@ -747,7 +747,7 @@ def validate_client_id( """ del args, kwargs - LOG.debug("validate_client_id") + _LOG.debug("validate_client_id") from c2cgeoportal_commons.models import DBSession, static # pylint: disable=import-outside-toplevel @@ -796,7 +796,7 @@ def validate_code( """ del args, kwargs - LOG.debug("validate_code %s", client_id) + _LOG.debug("validate_code %s", client_id) from c2cgeoportal_commons.models import DBSession, static # pylint: disable=import-outside-toplevel @@ -816,7 +816,7 @@ def validate_code( authorization_code = authorization_code_query.one_or_none() if authorization_code is None: - LOG.debug("validate_code => KO, no authorization_code found") + _LOG.debug("validate_code => KO, no authorization_code found") return False if authorization_code.client.pkce_required: @@ -824,7 +824,7 @@ def validate_code( request.code_challenge_method = authorization_code.challenge_method request.user = authorization_code.user - LOG.debug("validate_code => OK") + _LOG.debug("validate_code => OK") return True def validate_grant_type( @@ -854,7 +854,7 @@ def validate_grant_type( """ del client, request, args, kwargs - LOG.debug( + _LOG.debug( "validate_grant_type %s %s => %s", client_id, grant_type, @@ -889,7 +889,7 @@ def validate_redirect_uri( """ del request, args, kwargs - LOG.debug("validate_redirect_uri %s %s", client_id, redirect_uri) + _LOG.debug("validate_redirect_uri %s %s", client_id, redirect_uri) from c2cgeoportal_commons.models import DBSession, static # pylint: disable=import-outside-toplevel @@ -901,7 +901,7 @@ def validate_redirect_uri( .filter(static.OAuth2Client.redirect_uri == redirect_uri) .one_or_none() ) - LOG.debug("validate_redirect_uri %s", client is not None) + _LOG.debug("validate_redirect_uri %s", client is not None) return client is not None def validate_refresh_token( @@ -931,7 +931,7 @@ def validate_refresh_token( """ del args, kwargs - LOG.debug("validate_refresh_token %s", client.client_id if client else None) + _LOG.debug("validate_refresh_token %s", client.client_id if client else None) from c2cgeoportal_commons.models import DBSession, static # pylint: disable=import-outside-toplevel @@ -974,7 +974,7 @@ def validate_response_type( """ del client, request, args, kwargs - LOG.debug("validate_response_type %s %s", client_id, response_type) + _LOG.debug("validate_response_type %s %s", client_id, response_type) return response_type == "code" @@ -1005,7 +1005,7 @@ def validate_scopes( """ del client, request, args, kwargs - LOG.debug("validate_scopes %s %s", client_id, scopes) + _LOG.debug("validate_scopes %s %s", client_id, scopes) return True @@ -1038,7 +1038,7 @@ def validate_user( """ del password, client, request, args, kwargs - LOG.debug("validate_user %s", username) + _LOG.debug("validate_user %s", username) raise NotImplementedError("Not implemented.") @@ -1113,7 +1113,7 @@ def get_code_challenge(self, code: str, request: oauthlib.common.Request) -> Opt assert DBSession is not None - LOG.debug("get_code_challenge") + _LOG.debug("get_code_challenge") authorization_code = ( DBSession.query(static.OAuth2AuthorizationCode) @@ -1122,7 +1122,7 @@ def get_code_challenge(self, code: str, request: oauthlib.common.Request) -> Opt ) if authorization_code: return authorization_code.challenge - LOG.debug("get_code_challenge authorization_code not found") + _LOG.debug("get_code_challenge authorization_code not found") return None def get_code_challenge_method(self, code: str, request: oauthlib.common.Request) -> Optional[str]: @@ -1153,7 +1153,7 @@ def get_code_challenge_method(self, code: str, request: oauthlib.common.Request) assert DBSession is not None - LOG.debug("get_code_challenge_method") + _LOG.debug("get_code_challenge_method") authorization_code = ( DBSession.query(static.OAuth2AuthorizationCode) @@ -1162,7 +1162,7 @@ def get_code_challenge_method(self, code: str, request: oauthlib.common.Request) ) if authorization_code: return authorization_code.challenge_method - LOG.debug("get_code_challenge_method authorization_code not found") + _LOG.debug("get_code_challenge_method authorization_code not found") return None @@ -1175,7 +1175,7 @@ def get_oauth_client(settings: dict[str, Any]) -> oauthlib.oauth2.WebApplication ) -@OBJECT_CACHE_REGION.cache_on_arguments() +@_OBJECT_CACHE_REGION.cache_on_arguments() def _get_oauth_client_cache( authorization_expire_minutes: int, token_expire_minutes: int ) -> oauthlib.oauth2.WebApplicationServer: diff --git a/geoportal/c2cgeoportal_geoportal/scaffolds/advance_create/{{cookiecutter.project}}/geoportal/{{cookiecutter.package}}_geoportal/models.py b/geoportal/c2cgeoportal_geoportal/scaffolds/advance_create/{{cookiecutter.project}}/geoportal/{{cookiecutter.package}}_geoportal/models.py index f393268852..1882e6726a 100644 --- a/geoportal/c2cgeoportal_geoportal/scaffolds/advance_create/{{cookiecutter.project}}/geoportal/{{cookiecutter.package}}_geoportal/models.py +++ b/geoportal/c2cgeoportal_geoportal/scaffolds/advance_create/{{cookiecutter.project}}/geoportal/{{cookiecutter.package}}_geoportal/models.py @@ -5,4 +5,4 @@ from c2cgeoportal_commons.models.main import * # noqa: ignore=F401, pylint: disable=unused-wildcard-import _ = TranslationStringFactory("{{cookiecutter.package}}_geoportal-server") -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) diff --git a/geoportal/c2cgeoportal_geoportal/scripts/create_demo_theme.py b/geoportal/c2cgeoportal_geoportal/scripts/create_demo_theme.py index 69f2b174eb..2f402c5568 100644 --- a/geoportal/c2cgeoportal_geoportal/scripts/create_demo_theme.py +++ b/geoportal/c2cgeoportal_geoportal/scripts/create_demo_theme.py @@ -1,4 +1,4 @@ -# Copyright (c) 2011-2021, Camptocamp SA +# Copyright (c) 2011-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -33,7 +33,7 @@ from c2cgeoportal_geoportal.scripts import fill_arguments, get_appsettings, get_session -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) def main() -> None: diff --git a/geoportal/c2cgeoportal_geoportal/views/entry.py b/geoportal/c2cgeoportal_geoportal/views/entry.py index 94e13f5800..854325f25d 100644 --- a/geoportal/c2cgeoportal_geoportal/views/entry.py +++ b/geoportal/c2cgeoportal_geoportal/views/entry.py @@ -1,4 +1,4 @@ -# Copyright (c) 2011-2023, Camptocamp SA +# Copyright (c) 2011-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -38,8 +38,8 @@ from c2cgeoportal_geoportal.lib.common_headers import Cache, set_common_headers _ = TranslationStringFactory("c2cgeoportal") -LOG = logging.getLogger(__name__) -CACHE_REGION = get_region("std") +_LOG = logging.getLogger(__name__) +_CACHE_REGION = get_region("std") class Entry: @@ -60,7 +60,7 @@ def get_ngeo_index_vars(self) -> dict[str, Any]: return {} @staticmethod - @CACHE_REGION.cache_on_arguments() + @_CACHE_REGION.cache_on_arguments() def get_apijs(api_name: Optional[str]) -> str: with open("/etc/static-ngeo/api.js", encoding="utf-8") as api_file: api = api_file.read().split("\n") diff --git a/geoportal/c2cgeoportal_geoportal/views/i18n.py b/geoportal/c2cgeoportal_geoportal/views/i18n.py index 8392661856..a675404a5c 100644 --- a/geoportal/c2cgeoportal_geoportal/views/i18n.py +++ b/geoportal/c2cgeoportal_geoportal/views/i18n.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2022, Camptocamp SA +# Copyright (c) 2019-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -48,7 +48,7 @@ from c2cgeoportal_geoportal.lib.cacheversion import get_cache_version from c2cgeoportal_geoportal.lib.common_headers import Cache, set_common_headers -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) _INITIALIZED = False @@ -101,11 +101,11 @@ def localepot(request: pyramid.request.Request) -> pyramid.response.Response: for filename in no_duplicates(list_files(None, sources)): real_filename = find_file(filename) if real_filename is None: - LOG.error("Can not find file %s", filename) + _LOG.error("Can not find file %s", filename) raise HTTPInternalServerError(f"Can not find file {filename}") extractor = get_extractor(real_filename) if extractor is None: - LOG.error("No extractor available for file %s", filename) + _LOG.error("No extractor available for file %s", filename) raise HTTPInternalServerError(f"No extractor available for file {filename}") extractor_options = ExtractorOptions( diff --git a/geoportal/c2cgeoportal_geoportal/views/layers.py b/geoportal/c2cgeoportal_geoportal/views/layers.py index 00cfca3d59..70b2dce365 100644 --- a/geoportal/c2cgeoportal_geoportal/views/layers.py +++ b/geoportal/c2cgeoportal_geoportal/views/layers.py @@ -68,8 +68,8 @@ if TYPE_CHECKING: from c2cgeoportal_commons.models import main # pylint: disable=ungrouped-imports.useless-suppression -LOG = logging.getLogger(__name__) -CACHE_REGION = get_region("std") +_LOG = logging.getLogger(__name__) +_CACHE_REGION = get_region("std") class Layers: @@ -320,7 +320,7 @@ def check_geometry(_: Any, feature: Feature, obj: Any) -> None: self.request.response.status_int = 400 return {"error_type": "validation_error", "message": str(e)} except exc.IntegrityError as e: - LOG.error(str(e)) + _LOG.error(str(e)) assert e.orig is not None self.request.response.status_int = 400 return {"error_type": "integrity_error", "message": str(e.orig.diag.message_primary)} # type: ignore[attr-defined] @@ -385,7 +385,7 @@ def check_geometry(_: Any, feature: Feature, obj: Any) -> None: self.request.response.status_int = 400 return {"error_type": "validation_error", "message": str(e)} except exc.IntegrityError as e: - LOG.error(str(e)) + _LOG.error(str(e)) assert e.orig is not None self.request.response.status_int = 400 return {"error_type": "integrity_error", "message": str(e.orig.diag.message_primary)} # type: ignore[attr-defined] @@ -488,7 +488,7 @@ def enumerate_attribute_values(self) -> dict[str, Any]: return cast(dict[str, Any], self._enumerate_attribute_values(layername, fieldname)) - @CACHE_REGION.cache_on_arguments() + @_CACHE_REGION.cache_on_arguments() def _enumerate_attribute_values(self, layername: str, fieldname: str) -> dict[str, Any]: if layername not in self.layers_enum_config: raise HTTPBadRequest(f"Unknown layer: {layername!s}") @@ -574,7 +574,7 @@ def get_layer_class(layer: "main.Layer", with_last_update_columns: bool = False) for attribute_name in attributes_order or []: if attribute_name not in column_properties: table = mapper.mapped_table - LOG.warning( + _LOG.warning( 'Attribute "%s" does not exists in table "%s.%s".\n' 'Please correct metadata "editingAttributesOrder" in layer "%s" (id=%s).\n' "Available attributes are: %s.", diff --git a/geoportal/c2cgeoportal_geoportal/views/login.py b/geoportal/c2cgeoportal_geoportal/views/login.py index 3b20bad43b..746faa087c 100644 --- a/geoportal/c2cgeoportal_geoportal/views/login.py +++ b/geoportal/c2cgeoportal_geoportal/views/login.py @@ -58,8 +58,8 @@ from c2cgeoportal_geoportal.lib.common_headers import Cache, set_common_headers from c2cgeoportal_geoportal.lib.functionality import get_functionality -LOG = logging.getLogger(__name__) -CACHE_REGION = get_region("std") +_LOG = logging.getLogger(__name__) +_CACHE_REGION = get_region("std") class Login: @@ -89,7 +89,7 @@ def _referer_log(self) -> None: if not hasattr(self.request, "is_valid_referer"): self.request.is_valid_referer = is_valid_referrer(self.request) if not self.request.is_valid_referer: - LOG.info("Invalid referrer for %s: %s", self.request.path_qs, repr(self.request.referrer)) + _LOG.info("Invalid referrer for %s: %s", self.request.path_qs, repr(self.request.referrer)) @forbidden_view_config(renderer="login.html") # type: ignore def loginform403(self) -> Union[dict[str, Any], pyramid.response.Response]: @@ -166,7 +166,7 @@ def login(self) -> pyramid.response.Response: if otp is None: raise HTTPBadRequest("The second factor is missing.") if not self._validate_2fa_totp(user, otp): - LOG.info("The second factor is wrong for user '%s'.", user.username) + _LOG.info("The second factor is wrong for user '%s'.", user.username) raise HTTPUnauthorized("See server logs for details") user.update_last_login() user.tech_data["consecutive_failed"] = "0" @@ -190,7 +190,7 @@ def login(self) -> pyramid.response.Response: ), ) - LOG.info("User '%s' logged in.", username) + _LOG.info("User '%s' logged in.", username) if self.request.GET.get("type") == "oauth2": self._oauth2_login(user) @@ -207,7 +207,7 @@ def login(self) -> pyramid.response.Response: f"is not the current host '{self.request.host}' " f"or part of allowed hosts: {', '.join(allowed_hosts)}" ) - LOG.debug(message) + _LOG.debug(message) return HTTPBadRequest(message) return HTTPFound(location=came_from, headers=headers) @@ -235,7 +235,7 @@ def login(self) -> pyramid.response.Response: def _oauth2_login(self, user: static.User) -> pyramid.response.Response: self.request.user_ = user - LOG.debug( + _LOG.debug( "Call OAuth create_authorization_response with:\nurl: %s\nmethod: %s\nbody:\n%s", self.request.current_route_url(_query=self.request.GET), self.request.method, @@ -251,7 +251,7 @@ def _oauth2_login(self, user: static.User) -> pyramid.response.Response: ) if hasattr(self.request, "tm"): self.request.tm.commit() - LOG.debug("OAuth create_authorization_response return\nstatus: %s\nbody:\n%s", status, body) + _LOG.debug("OAuth create_authorization_response return\nstatus: %s\nbody:\n%s", status, body) location = headers.get("Location") location_hostname = urllib.parse.urlparse(location).hostname @@ -263,7 +263,7 @@ def _oauth2_login(self, user: static.User) -> pyramid.response.Response: f"is not the current host '{self.request.host}' " f"or part of allowed_hosts: {', '.join(allowed_hosts)}" ) - LOG.debug(message) + _LOG.debug(message) return HTTPBadRequest(message) if status == 302: @@ -284,10 +284,10 @@ def logout(self) -> pyramid.response.Response: headers = forget(self.request) if not self.request.user: - LOG.info("Logout on non login user.") + _LOG.info("Logout on non login user.") raise HTTPUnauthorized("See server logs for details") - LOG.info("User '%s' (%s) logging out.", self.request.user.username, self.request.user.id) + _LOG.info("User '%s' (%s) logging out.", self.request.user.username, self.request.user.id) headers.append(("Content-Type", "text/json")) return set_common_headers( @@ -313,7 +313,7 @@ def _user(self, user: Optional[static.User] = None) -> dict[str, Any]: @view_config(route_name="loginuser", renderer="json") # type: ignore def loginuser(self) -> dict[str, Any]: - LOG.info("Client IP address: %s", self.request.client_addr) + _LOG.info("Client IP address: %s", self.request.client_addr) set_common_headers(self.request, "login", Cache.PRIVATE_NO) return self._user() @@ -343,25 +343,25 @@ def change_password(self) -> pyramid.response.Response: if login is not None: user = models.DBSession.query(static.User).filter_by(username=login).one_or_none() if user is None: - LOG.info("The login '%s' does not exist.", login) + _LOG.info("The login '%s' does not exist.", login) raise HTTPUnauthorized("See server logs for details") if self.two_factor_auth: if not self._validate_2fa_totp(user, otp): - LOG.info("The second factor is wrong for user '%s'.", login) + _LOG.info("The second factor is wrong for user '%s'.", login) raise HTTPUnauthorized("See server logs for details") else: user = self.request.user assert user is not None if self.request.registry.validate_user(self.request, user.username, old_password) is None: - LOG.info("The old password is wrong for user '%s'.", user.username) + _LOG.info("The old password is wrong for user '%s'.", user.username) raise HTTPUnauthorized("See server logs for details") user.password = new_password user.is_password_changed = True models.DBSession.flush() - LOG.info("Password changed for user '%s'", user.username) + _LOG.info("Password changed for user '%s'", user.username) headers = remember(self.request, user.username) headers.append(("Content-Type", "text/json")) @@ -410,15 +410,15 @@ def loginresetpassword(self) -> dict[str, Any]: user, username, password, error = self._loginresetpassword() if error is not None: - LOG.info(error) + _LOG.info(error) return {"success": True} if user is None: - LOG.info("The user is not found without any error.") + _LOG.info("The user is not found without any error.") return {"success": True} if user.deactivated: - LOG.info("The user '%s' is deactivated", username) + _LOG.info("The user '%s' is deactivated", username) return {"success": True} send_email_config( @@ -435,7 +435,7 @@ def loginresetpassword(self) -> dict[str, Any]: @view_config(route_name="oauth2introspect") # type: ignore def oauth2introspect(self) -> pyramid.response.Response: - LOG.debug( + _LOG.debug( "Call OAuth create_introspect_response with:\nurl: %s\nmethod: %s\nbody:\n%s", self.request.current_route_url(_query=self.request.GET), self.request.method, @@ -449,7 +449,7 @@ def oauth2introspect(self) -> pyramid.response.Response: self.request.body, self.request.headers, ) - LOG.debug("OAuth create_introspect_response return status: %s", status) + _LOG.debug("OAuth create_introspect_response return status: %s", status) # All requests to /token will return a json response, no redirection. if status != 200: @@ -465,7 +465,7 @@ def oauth2introspect(self) -> pyramid.response.Response: @view_config(route_name="oauth2token") # type: ignore def oauth2token(self) -> pyramid.response.Response: - LOG.debug( + _LOG.debug( "Call OAuth create_token_response with:\nurl: %s\nmethod: %s\nbody:\n%s", self.request.current_route_url(_query=self.request.GET), self.request.method, @@ -478,7 +478,7 @@ def oauth2token(self) -> pyramid.response.Response: self.request.headers, {}, ) - LOG.debug("OAuth create_token_response return status: %s", status) + _LOG.debug("OAuth create_token_response return status: %s", status) # All requests to /token will return a json response, no redirection. if status != 200: @@ -494,7 +494,7 @@ def oauth2token(self) -> pyramid.response.Response: @view_config(route_name="oauth2revoke_token") # type: ignore def oauth2revoke_token(self) -> pyramid.response.Response: - LOG.debug( + _LOG.debug( "Call OAuth create_revocation_response with:\nurl: %s\nmethod: %s\nbody:\n%s", self.request.create_revocation_response(_query=self.request.GET), self.request.method, diff --git a/geoportal/c2cgeoportal_geoportal/views/mapserverproxy.py b/geoportal/c2cgeoportal_geoportal/views/mapserverproxy.py index a197052484..2daff825b1 100644 --- a/geoportal/c2cgeoportal_geoportal/views/mapserverproxy.py +++ b/geoportal/c2cgeoportal_geoportal/views/mapserverproxy.py @@ -1,4 +1,4 @@ -# Copyright (c) 2011-2023, Camptocamp SA +# Copyright (c) 2011-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -43,8 +43,8 @@ from c2cgeoportal_geoportal.lib.functionality import get_mapserver_substitution_params from c2cgeoportal_geoportal.views.ogcproxy import OGCProxy -CACHE_REGION = get_region("std") -LOG = logging.getLogger(__name__) +_CACHE_REGION = get_region("std") +_LOG = logging.getLogger(__name__) class MapservProxy(OGCProxy): @@ -62,7 +62,7 @@ def __init__(self, request: Request) -> None: @view_config(route_name="mapserverproxy_post_path") # type: ignore def proxy(self) -> Response: if self.user is None and "authentication_required" in self.request.params: - LOG.debug("proxy() detected authentication_required") + _LOG.debug("proxy() detected authentication_required") if self.request.registry.settings.get("basicauth", "False").lower() == "true": raise HTTPUnauthorized( headers={"WWW-Authenticate": 'Basic realm="Access to restricted layers"'} @@ -83,7 +83,7 @@ def proxy(self) -> Response: # Do not allows direct variable substitution for k in list(self.params.keys()): if len(k) > 1 and k[:2].capitalize() == "S_": - LOG.warning("Direct substitution not allowed (%s=%s).", k, self.params[k]) + _LOG.warning("Direct substitution not allowed (%s=%s).", k, self.params[k]) del self.params[k] if ( @@ -125,7 +125,7 @@ def proxy(self) -> Response: _url = self._get_wfs_url(errors) if _url is None: - LOG.error("Error getting the URL:\n%s", "\n".join(errors)) + _LOG.error("Error getting the URL:\n%s", "\n".join(errors)) raise HTTPInternalServerError() cache_control = ( diff --git a/geoportal/c2cgeoportal_geoportal/views/memory.py b/geoportal/c2cgeoportal_geoportal/views/memory.py index 6a0938c430..d8d6c6ae72 100644 --- a/geoportal/c2cgeoportal_geoportal/views/memory.py +++ b/geoportal/c2cgeoportal_geoportal/views/memory.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018-2023, Camptocamp SA +# Copyright (c) 2018-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -39,7 +39,7 @@ from c2cgeoportal_geoportal.lib.caching import MEMORY_CACHE_DICT from c2cgeoportal_geoportal.views import raster -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) @view_config(route_name="memory", renderer="fast_json") # type: ignore diff --git a/geoportal/c2cgeoportal_geoportal/views/ogcproxy.py b/geoportal/c2cgeoportal_geoportal/views/ogcproxy.py index c31a23371d..c0f989e9ee 100644 --- a/geoportal/c2cgeoportal_geoportal/views/ogcproxy.py +++ b/geoportal/c2cgeoportal_geoportal/views/ogcproxy.py @@ -37,8 +37,8 @@ from c2cgeoportal_geoportal.lib.caching import get_region from c2cgeoportal_geoportal.views.proxy import Proxy -CACHE_REGION = get_region("std") -LOG = logging.getLogger(__name__) +_CACHE_REGION = get_region("std") +_LOG = logging.getLogger(__name__) class OGCProxy(Proxy): @@ -76,7 +76,7 @@ def __init__(self, request: pyramid.request.Request, has_default_ogc_server: boo elif not has_default_ogc_server: raise HTTPBadRequest("The querystring argument 'ogcserver' is required") - @CACHE_REGION.cache_on_arguments() + @_CACHE_REGION.cache_on_arguments() def _get_ogcserver_byname(self, name: str) -> main.OGCServer: assert DBSession is not None @@ -94,7 +94,7 @@ def _get_wms_url(self, errors: set[str]) -> Optional[Url]: ogc_server = self.ogc_server url = get_url2(f"The OGC server '{ogc_server.name}'", ogc_server.url, self.request, errors) if errors: - LOG.error("\n".join(errors)) + _LOG.error("\n".join(errors)) return url def _get_wfs_url(self, errors: set[str]) -> Optional[Url]: @@ -106,7 +106,7 @@ def _get_wfs_url(self, errors: set[str]) -> Optional[Url]: errors, ) if errors: - LOG.error("\n".join(errors)) + _LOG.error("\n".join(errors)) return url def get_headers(self) -> dict[str, str]: diff --git a/geoportal/c2cgeoportal_geoportal/views/pdfreport.py b/geoportal/c2cgeoportal_geoportal/views/pdfreport.py index 72f7e2a63e..c1c931516b 100644 --- a/geoportal/c2cgeoportal_geoportal/views/pdfreport.py +++ b/geoportal/c2cgeoportal_geoportal/views/pdfreport.py @@ -41,7 +41,7 @@ from c2cgeoportal_geoportal.lib.layers import get_private_layers, get_protected_layers from c2cgeoportal_geoportal.views.ogcproxy import OGCProxy -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) class PdfReport(OGCProxy): diff --git a/geoportal/c2cgeoportal_geoportal/views/printproxy.py b/geoportal/c2cgeoportal_geoportal/views/printproxy.py index 338edddcf2..c067826748 100644 --- a/geoportal/c2cgeoportal_geoportal/views/printproxy.py +++ b/geoportal/c2cgeoportal_geoportal/views/printproxy.py @@ -1,4 +1,4 @@ -# Copyright (c) 2011-2023, Camptocamp SA +# Copyright (c) 2011-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -43,8 +43,8 @@ from c2cgeoportal_geoportal.lib.functionality import get_functionality from c2cgeoportal_geoportal.views.proxy import Proxy -LOG = logging.getLogger(__name__) -CACHE_REGION = get_region("std") +_LOG = logging.getLogger(__name__) +_CACHE_REGION = get_region("std") class PrintProxy(Proxy): @@ -71,7 +71,7 @@ def capabilities(self) -> pyramid.response.Response: response.vary += ("Referrer", "Referer") return response - @CACHE_REGION.cache_on_arguments() + @_CACHE_REGION.cache_on_arguments() def _capabilities( self, templates: list[str], query_string: dict[str, str], method: str, referrer: str ) -> tuple[requests.Response, str]: @@ -88,7 +88,7 @@ def _capabilities( try: capabilities = response.json() except json.decoder.JSONDecodeError: - LOG.exception("Unable to parse capabilities: %s", response.text) + _LOG.exception("Unable to parse capabilities: %s", response.text) raise HTTPBadGateway(response.text) # pylint: disable=raise-missing-from capabilities["layouts"] = list( diff --git a/geoportal/c2cgeoportal_geoportal/views/proxy.py b/geoportal/c2cgeoportal_geoportal/views/proxy.py index 6b5a926600..d53f732af7 100644 --- a/geoportal/c2cgeoportal_geoportal/views/proxy.py +++ b/geoportal/c2cgeoportal_geoportal/views/proxy.py @@ -1,4 +1,4 @@ -# Copyright (c) 2011-2023, Camptocamp SA +# Copyright (c) 2011-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -40,8 +40,8 @@ from c2cgeoportal_geoportal.lib.common_headers import Cache, set_common_headers from c2cgeoportal_geoportal.views import restrict_headers -LOG = logging.getLogger(__name__) -CACHE_REGION = get_region("std") +_LOG = logging.getLogger(__name__) +_CACHE_REGION = get_region("std") class Proxy: @@ -67,7 +67,7 @@ def _proxy( params = dict(self.request.params) if params is None else params url = url.clone().add_query(params, True) - LOG.debug("Send query to URL:\n%s.", url) + _LOG.debug("Send query to URL:\n%s.", url) if method is None: method = self.request.method @@ -132,7 +132,7 @@ def _proxy( if method in ("POST", "PUT") and body is not None: errors += ["--- Query with body ---", "%s"] args1.append(body.decode("utf-8")) - LOG.exception("\n".join(errors), *args1) + _LOG.exception("\n".join(errors), *args1) raise HTTPBadGateway( # pylint: disable=raise-missing-from "Error on backend, See logs for detail" @@ -164,15 +164,15 @@ def _proxy( args2.append(body.decode("utf-8")) errors += ["--- Return content ---", "%s"] args2.append(response.text) - LOG.error("\n".join(errors), *args2) + _LOG.error("\n".join(errors), *args2) raise exception_response(response.status_code) if not response.headers.get("Content-Type", "").startswith("image/"): - LOG.debug("Get result for URL: %s:\n%s.", url, body) + _LOG.debug("Get result for URL: %s:\n%s.", url, body) return response - @CACHE_REGION.cache_on_arguments() + @_CACHE_REGION.cache_on_arguments() def _proxy_cache(self, method: str, *args: Any, **kwargs: Any) -> pyramid.response.Response: # Method is only for the cache del method diff --git a/geoportal/c2cgeoportal_geoportal/views/raster.py b/geoportal/c2cgeoportal_geoportal/views/raster.py index fe0842c436..560b839e9e 100644 --- a/geoportal/c2cgeoportal_geoportal/views/raster.py +++ b/geoportal/c2cgeoportal_geoportal/views/raster.py @@ -1,4 +1,4 @@ -# Copyright (c) 2012-2023, Camptocamp SA +# Copyright (c) 2012-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -46,7 +46,7 @@ if TYPE_CHECKING: import fiona.collection -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) class Raster: @@ -166,7 +166,7 @@ def get_index(index_: int) -> tuple[int, int]: result = dataset.read(1, window=(get_index(index[0]), get_index(index[1])))[0][0] result = None if result == layer.get("nodata", dataset.nodata) else result else: - LOG.debug( + _LOG.debug( "Out of index for layer: %s (%s), lon/lat: %dx%d, index: %dx%d, shape: %dx%d.", name, layer["file"], @@ -188,6 +188,6 @@ def _round(value: numpy.float32, round_to: float) -> Optional[decimal.Decimal]: try: return decimal_value.quantize(decimal.Decimal(str(round_to))) except decimal.InvalidOperation: - LOG.info("Error on rounding %s: %s", decimal_value, traceback.format_exc()) + _LOG.info("Error on rounding %s: %s", decimal_value, traceback.format_exc()) return decimal_value return None diff --git a/geoportal/c2cgeoportal_geoportal/views/resourceproxy.py b/geoportal/c2cgeoportal_geoportal/views/resourceproxy.py index fa15996c7a..3ccef3c067 100644 --- a/geoportal/c2cgeoportal_geoportal/views/resourceproxy.py +++ b/geoportal/c2cgeoportal_geoportal/views/resourceproxy.py @@ -1,4 +1,4 @@ -# Copyright (c) 2011-2023, Camptocamp SA +# Copyright (c) 2011-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -37,7 +37,7 @@ from c2cgeoportal_geoportal.lib.common_headers import Cache from c2cgeoportal_geoportal.views.proxy import Proxy -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) class ResourceProxy(Proxy): @@ -69,5 +69,5 @@ def proxy(self) -> pyramid.response.Response: if header not in self.settings["allowed_headers"]: response.headers.pop(header) return response - LOG.warning("Target URL not found: %s", target) + _LOG.warning("Target URL not found: %s", target) return HTTPBadRequest("URL not allowed") diff --git a/geoportal/c2cgeoportal_geoportal/views/theme.py b/geoportal/c2cgeoportal_geoportal/views/theme.py index 96d53535c9..4daf18e008 100644 --- a/geoportal/c2cgeoportal_geoportal/views/theme.py +++ b/geoportal/c2cgeoportal_geoportal/views/theme.py @@ -67,10 +67,10 @@ from c2cgeoportal_geoportal.lib.wmstparsing import TimeInformation, parse_extent from c2cgeoportal_geoportal.views.layers import get_layer_metadata -LOG = logging.getLogger(__name__) -CACHE_REGION = get_region("std") -CACHE_OGC_SERVER_REGION = get_region("ogc-server") -TIMEOUT = int(os.environ.get("C2CGEOPORTAL_THEME_TIMEOUT", "300")) +_LOG = logging.getLogger(__name__) +_CACHE_REGION = get_region("std") +_CACHE_OGC_SERVER_REGION = get_region("ogc-server") +_TIMEOUT = int(os.environ.get("C2CGEOPORTAL_THEME_TIMEOUT", "300")) Metadata = Union[str, int, float, bool, list[Any], dict[str, Any]] @@ -80,7 +80,7 @@ async def get_http_cached( ) -> tuple[bytes, str]: """Get the content of the URL with a cache (dogpile).""" - @CACHE_OGC_SERVER_REGION.cache_on_arguments() + @_CACHE_OGC_SERVER_REGION.cache_on_arguments() def do_get_http_cached(url: str) -> tuple[bytes, str]: # This function is just used to create a cache entry raise NotImplementedError() @@ -92,10 +92,10 @@ def do_get_http_cached(url: str) -> tuple[bytes, str]: return result response = await asyncio.to_thread( - requests.get, url.strip(), headers=headers, timeout=TIMEOUT, **http_options + requests.get, url.strip(), headers=headers, timeout=_TIMEOUT, **http_options ) response.raise_for_status() - LOG.info("Get url '%s' in %.1fs.", url, response.elapsed.total_seconds()) + _LOG.info("Get url '%s' in %.1fs.", url, response.elapsed.total_seconds()) result = (response.content, response.headers.get("Content-Type", "")) # Set the result in the cache do_get_http_cached.set(result, url) # type: ignore[attr-defined] @@ -195,9 +195,9 @@ def _get_metadata_list(self, item: main.TreeItem, errors: set[str]) -> dict[str, async def _wms_getcap( self, ogc_server: main.OGCServer, preload: bool = False, cache: bool = True ) -> tuple[Optional[dict[str, dict[str, Any]]], set[str]]: - LOG.debug("Get the WMS Capabilities of %s, preload: %s, cache: %s", ogc_server.name, preload, cache) + _LOG.debug("Get the WMS Capabilities of %s, preload: %s, cache: %s", ogc_server.name, preload, cache) - @CACHE_OGC_SERVER_REGION.cache_on_arguments() + @_CACHE_OGC_SERVER_REGION.cache_on_arguments() def build_web_map_service(ogc_server_id: int) -> tuple[Optional[dict[str, dict[str, Any]]], set[str]]: del ogc_server_id # Just for cache @@ -214,7 +214,7 @@ def build_web_map_service(ogc_server_id: int) -> tuple[Optional[dict[str, dict[s "recover the themes." f"\nURL: {url}\n{content.decode() if content else None}" ) - LOG.error(error, exc_info=True) + _LOG.error(error, exc_info=True) return None, {error} wms_layers_name = list(wms.contents) for layer_name in wms_layers_name: @@ -257,7 +257,7 @@ def build_web_map_service(ogc_server_id: int) -> tuple[Optional[dict[str, dict[s f"Unable to get the WMS Capabilities for OGC server '{ogc_server.name}', " f"return the error: {exception.response.status_code} {exception.response.reason}" ) - LOG.exception(error) + _LOG.exception(error) return None, {error} if errors or preload: return None, errors @@ -289,7 +289,7 @@ async def _wms_getcap_cached( }, ) - LOG.debug("Get WMS GetCapabilities for URL: %s", url) + _LOG.debug("Get WMS GetCapabilities for URL: %s", url) headers = {} @@ -303,7 +303,7 @@ async def _wms_getcap_cached( except Exception: error = f"Unable to GetCapabilities from URL {url}" errors.add(error) - LOG.error(error, exc_info=True) + _LOG.error(error, exc_info=True) return url, None, errors # With wms 1.3 it returns text/xml also in case of error :-( @@ -316,7 +316,7 @@ async def _wms_getcap_cached( f"{content.decode()}" ) errors.add(error) - LOG.error(error) + _LOG.error(error) return url, None, errors return url, content, errors @@ -502,7 +502,7 @@ def _fill_editable(self, layer_theme: dict[str, Any], layer: main.Layer) -> set[ layer_theme["edit_columns"] = get_layer_metadata(layer) layer_theme["editable"] = True except Exception as exception: - LOG.exception(str(exception)) + _LOG.exception(str(exception)) errors.add(str(exception)) return errors @@ -603,7 +603,7 @@ def _get_ogc_servers(self, group: main.LayerGroup, depth: int) -> set[Union[str, # escape loop if depth > 30: - LOG.error("Error: too many recursions with group '%s'", group.name) + _LOG.error("Error: too many recursions with group '%s'", group.name) return ogc_servers # recurse on children @@ -875,7 +875,7 @@ async def _get_children( children.append(layer_theme) return children, errors - @CACHE_REGION.cache_on_arguments() + @_CACHE_REGION.cache_on_arguments() def _get_layers_enum(self) -> dict[str, dict[str, str]]: layers_enum = {} if "enum" in self.settings.get("layers", {}): @@ -909,7 +909,7 @@ async def _wfs_get_features_type( } ) - LOG.debug("WFS DescribeFeatureType for the URL: %s", wfs_url.url()) + _LOG.debug("WFS DescribeFeatureType for the URL: %s", wfs_url.url()) headers = {} @@ -931,7 +931,7 @@ async def _wfs_get_features_type( ) ) errors.add(error) - LOG.exception(error) + _LOG.exception(error) return None, errors except Exception: error = ( @@ -939,7 +939,7 @@ async def _wfs_get_features_type( f"OGC server {ogc_server.name}" ) errors.add(error) - LOG.exception(error) + _LOG.exception(error) return None, errors if preload: @@ -997,9 +997,9 @@ async def _preload(self, errors: set[str]) -> None: .filter(main.LayerWMS.ogc_server_id == ogc_server.id) .one() ) - LOG.debug("%i layers for OGC server '%s'", nb_layers[0], ogc_server.name) + _LOG.debug("%i layers for OGC server '%s'", nb_layers[0], ogc_server.name) if nb_layers[0] > 0: - LOG.debug("Preload OGC server '%s'", ogc_server.name) + _LOG.debug("Preload OGC server '%s'", ogc_server.name) url_internal_wfs, _, _ = self.get_url_internal_wfs(ogc_server, errors) if url_internal_wfs is not None: tasks.add(self.preload_ogc_server(ogc_server, url_internal_wfs)) @@ -1016,7 +1016,7 @@ async def preload_ogc_server( async def _get_features_attributes( self, url_internal_wfs: Url, ogc_server: main.OGCServer, cache: bool = True ) -> tuple[Optional[dict[str, dict[Any, dict[str, Any]]]], Optional[str], set[str]]: - @CACHE_OGC_SERVER_REGION.cache_on_arguments() + @_CACHE_OGC_SERVER_REGION.cache_on_arguments() def _get_features_attributes_cache( url_internal_wfs: Url, ogc_server_name: str ) -> tuple[Optional[dict[str, dict[Any, dict[str, Any]]]], Optional[str], set[str]]: @@ -1034,7 +1034,7 @@ def _get_features_attributes_cache( name = child.attrib["name"] type_namespace, type_ = child.attrib["type"].split(":") if type_namespace not in child.nsmap: - LOG.info( + _LOG.info( "The namespace '%s' of the type '%s' is not found in the " "available namespaces: %s (OGC server: %s)", type_namespace, @@ -1043,7 +1043,7 @@ def _get_features_attributes_cache( ogc_server_name, ) elif child.nsmap[type_namespace] != namespace: - LOG.info( + _LOG.info( "The namespace '%s' of the type '%s' should be '%s' (OGC server: %s).", child.nsmap[type_namespace], name, @@ -1066,7 +1066,7 @@ def _get_features_attributes_cache( type_namespace = children.nsmap[type_namespace] attrib[name]["namespace"] = type_namespace else: - LOG.info( + _LOG.info( "The namespace '%s' of the type '%s' is not found in the " "available namespaces: %s (OGC server: %s)", type_namespace, @@ -1083,7 +1083,7 @@ def _get_features_attributes_cache( if type_ in types: attributes[name] = types[type_] elif (type_ == "Character") and (name + "Type") in types: - LOG.debug( + _LOG.debug( 'Due to MapServer weird behavior when using METADATA "gml_types" "auto"' "the type 'ms:Character' is returned as type '%sType' for feature '%s'.", name, @@ -1091,7 +1091,7 @@ def _get_features_attributes_cache( ) attributes[name] = types[name + "Type"] else: - LOG.warning( + _LOG.warning( "The provided type '%s' does not exist, available types are %s.", type_, ", ".join(types.keys()), @@ -1133,14 +1133,14 @@ async def get_theme() -> dict[str, Union[dict[str, Any], list[str]]]: result: dict[str, Union[dict[str, Any], list[Any]]] = {} all_errors: set[str] = set() - LOG.debug("Start preload") + _LOG.debug("Start preload") start_time = time.time() await self._preload(all_errors) - LOG.debug("End preload") + _LOG.debug("End preload") # Don't log if it looks to be already preloaded. if (time.time() - start_time) > 1: - LOG.info("Do preload in %.3fs.", time.time() - start_time) - LOG.debug("Run garbage collection: %s", ", ".join([str(gc.collect(n)) for n in range(3)])) + _LOG.info("Do preload in %.3fs.", time.time() - start_time) + _LOG.debug("Run garbage collection: %s", ", ".join([str(gc.collect(n)) for n in range(3)])) result["ogcServers"] = {} for ogc_server in models.DBSession.query(main.OGCServer).all(): nb_layers = ( @@ -1154,7 +1154,7 @@ async def get_theme() -> dict[str, Union[dict[str, Any], list[str]]]: # QGIS Server landing page requires an OGC server that can't be used here. continue - LOG.debug("Process OGC server '%s'", ogc_server.name) + _LOG.debug("Process OGC server '%s'", ogc_server.name) url_internal_wfs, url, url_wfs = self.get_url_internal_wfs(ogc_server, all_errors) @@ -1219,10 +1219,10 @@ async def get_theme() -> dict[str, Union[dict[str, Any], list[str]]]: result["errors"] = list(all_errors) if all_errors: - LOG.info("Theme errors:\n%s", "\n".join(all_errors)) + _LOG.info("Theme errors:\n%s", "\n".join(all_errors)) return result - @CACHE_REGION.cache_on_arguments() + @_CACHE_REGION.cache_on_arguments() def get_theme_anonymous( intranet: bool, interface: str, @@ -1286,7 +1286,7 @@ def ogc_server_clear_cache_view(self) -> dict[str, Any]: f"is not the current host '{self.request.host}' " f"or part of allowed hosts: {', '.join(allowed_hosts)}" ) - LOG.debug(message) + _LOG.debug(message) raise pyramid.httpexceptions.HTTPBadRequest(message) if came_from: raise pyramid.httpexceptions.HTTPFound(location=came_from) @@ -1296,7 +1296,7 @@ def _ogc_server_clear_cache(self, ogc_server: main.OGCServer) -> None: errors: set[str] = set() url_internal_wfs, _, _ = self.get_url_internal_wfs(ogc_server, errors) if errors: - LOG.error( + _LOG.error( "Error while getting the URL of the OGC Server %s:\n%s", ogc_server.id, "\n".join(errors) ) return diff --git a/geoportal/c2cgeoportal_geoportal/views/tinyowsproxy.py b/geoportal/c2cgeoportal_geoportal/views/tinyowsproxy.py index b6df04f8d0..9edd8efb1f 100644 --- a/geoportal/c2cgeoportal_geoportal/views/tinyowsproxy.py +++ b/geoportal/c2cgeoportal_geoportal/views/tinyowsproxy.py @@ -46,7 +46,7 @@ from c2cgeoportal_geoportal.lib.layers import get_writable_layers from c2cgeoportal_geoportal.views.ogcproxy import OGCProxy -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) class TinyOWSProxy(OGCProxy): @@ -87,7 +87,7 @@ def proxy(self) -> pyramid.response.Response: try: (operation, typenames_post) = self._parse_body(self.request.body) except Exception: - LOG.exception("Error while parsing POST request body") + _LOG.exception("Error while parsing POST request body") raise HTTPBadRequest( # pylint: disable=raise-missing-from "Error parsing the request (see logs for more details)" ) @@ -147,7 +147,7 @@ def _proxy_callback( errors: set[str] = set() url = super()._get_wfs_url(errors) if url is None: - LOG.error("Error getting the URL:\n%s", "\n".join(errors)) + _LOG.error("Error getting the URL:\n%s", "\n".join(errors)) raise HTTPInternalServerError() if operation == "getcapabilities": diff --git a/geoportal/c2cgeoportal_geoportal/views/vector_tiles.py b/geoportal/c2cgeoportal_geoportal/views/vector_tiles.py index a58cbf0f05..9fd929ba9e 100644 --- a/geoportal/c2cgeoportal_geoportal/views/vector_tiles.py +++ b/geoportal/c2cgeoportal_geoportal/views/vector_tiles.py @@ -38,7 +38,7 @@ from c2cgeoportal_commons.models import DBSession, main from c2cgeoportal_geoportal.lib.common_headers import Cache, set_common_headers -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) class VectorTilesViews: diff --git a/geoportal/tests/functional/__init__.py b/geoportal/tests/functional/__init__.py index 12b7bc69a0..1c20f4a195 100644 --- a/geoportal/tests/functional/__init__.py +++ b/geoportal/tests/functional/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2023, Camptocamp SA +# Copyright (c) 2013-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -48,7 +48,7 @@ from c2cgeoportal_commons import models from c2cgeoportal_geoportal.lib import caching -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) mapserv_url = "http://mapserver:8080/" config = None diff --git a/geoportal/tests/functional/test_authentication.py b/geoportal/tests/functional/test_authentication.py index c9eccd2e55..8e7bf01675 100644 --- a/geoportal/tests/functional/test_authentication.py +++ b/geoportal/tests/functional/test_authentication.py @@ -1,4 +1,4 @@ -# Copyright (c) 2017-2023, Camptocamp SA +# Copyright (c) 2017-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -108,7 +108,7 @@ def test_wrong_key(self): userid = policy.unauthenticated_userid(request) self.assertIsNone(userid) - @patch("c2cgeoportal_geoportal.lib.authentication.LOG.error", side_effect=Exception()) + @patch("c2cgeoportal_geoportal.lib.authentication._LOG.error", side_effect=Exception()) def test_wrong_method(self, log_mock): # pylint: disable=unused-argument """ POST requests with input named "auth" must not raise exceptions due to urllogin. diff --git a/geoportal/tests/functional/test_entry.py b/geoportal/tests/functional/test_entry.py index e0f0271a5a..8f33eb729f 100644 --- a/geoportal/tests/functional/test_entry.py +++ b/geoportal/tests/functional/test_entry.py @@ -40,7 +40,7 @@ from tests.functional import setup_db from tests.functional import teardown_common as teardown_module # noqa, pylint: disable=unused-import -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) class TestEntryView(TestCase): diff --git a/geoportal/tests/functional/test_login.py b/geoportal/tests/functional/test_login.py index 62f0867b42..6d3369ef10 100644 --- a/geoportal/tests/functional/test_login.py +++ b/geoportal/tests/functional/test_login.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2023, Camptocamp SA +# Copyright (c) 2013-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -41,7 +41,7 @@ from tests.functional import setup_db from tests.functional import teardown_common as teardown_module # noqa, pylint: disable=unused-import -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) class TestLoginView(TestCase): diff --git a/geoportal/tests/functional/test_login_2fa.py b/geoportal/tests/functional/test_login_2fa.py index 6b6e3b346c..9b3662852e 100644 --- a/geoportal/tests/functional/test_login_2fa.py +++ b/geoportal/tests/functional/test_login_2fa.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013-2023, Camptocamp SA +# Copyright (c) 2013-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -42,7 +42,7 @@ from tests.functional import setup_db from tests.functional import teardown_common as teardown_module # noqa, pylint: disable=unused-import -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) class Test2faView(TestCase): diff --git a/geoportal/tests/functional/test_oauth2.py b/geoportal/tests/functional/test_oauth2.py index 78574270fc..2e2016dd95 100644 --- a/geoportal/tests/functional/test_oauth2.py +++ b/geoportal/tests/functional/test_oauth2.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, Camptocamp SA +# Copyright (c) 2021-2024, Camptocamp SA # All rights reserved. # Redistribution and use in source and binary forms, with or without @@ -41,7 +41,7 @@ from tests.functional import setup_common as setup_module # pylint: disable=unused-import from tests.functional import teardown_common as teardown_module # pylint: disable=unused-import -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) class TestLoginView(TestCase): diff --git a/geoportal/tests/functional/test_themes_entry.py b/geoportal/tests/functional/test_themes_entry.py index fc63813b1b..207edd01d3 100644 --- a/geoportal/tests/functional/test_themes_entry.py +++ b/geoportal/tests/functional/test_themes_entry.py @@ -41,7 +41,7 @@ from c2cgeoportal_geoportal.lib.caching import invalidate_region -LOG = logging.getLogger(__name__) +_LOG = logging.getLogger(__name__) class TestThemeEntryView(TestCase):