Date filter, logging and types #26

Merged: 6 commits merged on Jul 10, 2024
4 changes: 3 additions & 1 deletion .github/workflows/unit_test.yml
@@ -1,6 +1,8 @@
name: Run pytest on Pull Request

on: [push]
on:
push:
pull_request:

jobs:
test:
49 changes: 49 additions & 0 deletions fmatch/logrus.py
@@ -0,0 +1,49 @@
"""
Logger as a common package
"""

import logging
import sys


class SingletonLogger:
@vishnuchalla (Contributor) commented on Jul 10, 2024:

I don't think we need a singleton class here. It's just as simple as updating a config.

The PR author (Collaborator) replied:
Thank you for the suggestion. The thought behind this was as follows: setting up the logger inside the Matcher class created duplicate copies of the same log output, especially in daemon mode, where a new Matcher object is created for each incoming request and each one attached another formatter/handler to the logger. As a temporary mitigation we checked whether the logger had already been configured before attaching anything. I moved that logic into a class so it can also be reused in Orion. The SingletonLogger additionally updates the configuration through its methods and ensures only a single logger is kept per name.
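
To illustrate the behaviour described above, here is a minimal, hypothetical sketch (not from this repository) of how attaching a handler on every Matcher construction duplicates output, and why a `hasHandlers()`-style guard avoids it:

```python
import logging
import sys

def naive_matcher_logger() -> logging.Logger:
    # Hypothetical stand-in for the old Matcher.__init__ logging setup:
    # logging.getLogger("Matcher") always returns the same object, so every
    # call stacks one more StreamHandler onto it.
    logger = logging.getLogger("Matcher")
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger

logger = naive_matcher_logger()   # first Matcher in daemon mode
logger = naive_matcher_logger()   # second Matcher: same logger, extra handler
logger.info("hello")              # emitted twice, once per handler

# Guarding the setup avoids the duplication:
if not logger.hasHandlers():
    logger.addHandler(logging.StreamHandler(sys.stdout))
```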

@vishnuchalla (Contributor) replied on Jul 10, 2024:

logging is a singleton by default. All I am saying is that I don't think we need to write a custom logging class instead of just using the existing one.
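
For comparison, a minimal sketch (not part of this PR) of the standard-library-only approach this comment points at; the `setup_logging` helper name is hypothetical:

```python
import logging
import sys

def setup_logging(level: int = logging.INFO) -> None:
    # Hypothetical one-time configuration. The logging module already keeps a
    # single registry of named loggers, so every later
    # logging.getLogger("Matcher") call returns this same configured object.
    logger = logging.getLogger("Matcher")
    if not logger.handlers:  # avoid stacking handlers on repeated calls
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(logging.Formatter(
            "%(asctime)s - %(name)-10s - %(levelname)s - %(message)s"
        ))
        logger.addHandler(handler)
    logger.setLevel(level)
    logger.propagate = False

setup_logging(logging.DEBUG)
logging.getLogger("Matcher").debug("configured once, fetched anywhere")
```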

"""Singleton logger to set logging at one single place

Returns:
    logging.Logger: the configured logger registered under the given name
"""

instance = {}

def __new__(cls, debug: int, name: str):
if (not cls.instance) or name not in cls.instance:
cls.instance[name] = cls._initialize_logger(debug,name)
return cls.instance[name]

@staticmethod
def _initialize_logger(debug: int, name: str) -> logging.Logger:
level = debug # if debug else logging.INFO
logger = logging.getLogger(name)
logger.propagate=False
if not logger.hasHandlers():
logger.setLevel(level)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(level)
formatter = logging.Formatter(
"%(asctime)s - %(name)-10s - %(levelname)s - file: %(filename)s - line: %(lineno)d - %(message)s" # pylint: disable = line-too-long
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger

@classmethod
def getLogger(cls, name:str) -> logging.Logger:
"""Return logger in instance

Args:
name (str): name of the logger

Returns:
logging.Logger: logger
"""
return cls.instance.get(name, None)
106 changes: 65 additions & 41 deletions fmatch/matcher.py
@@ -1,10 +1,11 @@
""" metadata matcher
"""

# pylint: disable = invalid-name, invalid-unary-operand-type
# pylint: disable = invalid-name, invalid-unary-operand-type, no-member
import os
import sys
import logging
from datetime import datetime
from typing import List, Dict, Any

# pylint: disable=import-error
from elasticsearch import Elasticsearch
@@ -13,35 +14,28 @@
# pylint: disable=import-error
import pandas as pd
from elasticsearch_dsl import Search, Q
from elasticsearch_dsl.response import Response
from fmatch.logrus import SingletonLogger


class Matcher:
"""Matcher"""

def __init__(
self, index="ospst-perf-scale-ci",
level=logging.INFO,
ES_URL=os.getenv("ES_SERVER"),
verify_certs=True
self,
index: str ="ospst-perf-scale-ci",
level: int =logging.INFO,
ES_URL: str =os.getenv("ES_SERVER"),
verify_certs: bool =True,
):
self.index = index
self.es_url = ES_URL
self.search_size = 10000
self.logger = logging.getLogger("Matcher")
self.logger.setLevel(level)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(level)
formatter = logging.Formatter(
"%(asctime)s [%(name)s:%(filename)s:%(lineno)d] %(levelname)s: %(message)s"
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
# We can set the ES logging higher if we want additional debugging
logging.getLogger("elasticsearch").setLevel(logging.WARN)
self.logger = SingletonLogger(debug=level, name="Matcher")
self.es = Elasticsearch([self.es_url], timeout=30, verify_certs=verify_certs)
self.data = None

def get_metadata_by_uuid(self, uuid, index=None):
def get_metadata_by_uuid(self, uuid: str, index: str = None) -> dict:
"""Returns back metadata when uuid is given

Args:
@@ -62,7 +56,7 @@ def get_metadata_by_uuid(self, uuid, index=None):
result = dict(hits[0].to_dict()["_source"])
return result

def query_index(self, index, search):
def query_index(self, index: str, search: Search) -> Response:
"""generic query function

Args:
@@ -73,33 +67,50 @@ def query_index(self, index, search):
self.logger.debug("Executing query \r\n%s", search.to_dict())
return search.execute()

def get_uuid_by_metadata(self, meta, index=None):
def get_uuid_by_metadata(
self, meta: Dict[str, Any], index: str = None, lookback_date: datetime = None
) -> List[Dict[str, str]]:
"""get_uuid_by_metadata"""
if index is None:
index = self.index
version = meta["ocpVersion"][:4]

must_clause = [
(
Q("match", **{field: str(value)})
if isinstance(value, str)
else Q("match", **{field: value})
)
for field, value in meta.items()
if field not in "ocpVersion"
]

filter_clause = [
Q("wildcard", ocpVersion=f"{version}*"),
Q("match", jobStatus="success"),
]
if isinstance(lookback_date, datetime):
lookback_date = lookback_date.strftime("%Y-%m-%dT%H:%M:%SZ")
if lookback_date:
filter_clause.append(Q("range", timestamp={"gt": lookback_date}))
query = Q(
"bool",
must=[
Q(
"match", **{field: str(value)}
) if isinstance(value, str) else Q('match', **{field: value})
for field, value in meta.items()
if field not in "ocpVersion"
],
filter=[
Q("wildcard", ocpVersion=f"{version}*"),
Q("match", jobStatus="success"),
],
must=must_clause,
filter=filter_clause,
)
s = Search(using=self.es, index=index).query(query).extra(size=self.search_size)
result = self.query_index(index, s)
hits = result.hits.hits
uuids_docs = [{ "uuid":hit.to_dict()["_source"]["uuid"],
"buildUrl":hit.to_dict()["_source"]["buildUrl"]} for hit in hits]
uuids_docs = [
{
"uuid": hit.to_dict()["_source"]["uuid"],
"buildUrl": hit.to_dict()["_source"]["buildUrl"],
}
for hit in hits
]
return uuids_docs

def match_kube_burner(self, uuids, index):
def match_kube_burner(self, uuids: List[str], index: str) -> List[Dict[str, Any]]:
"""match kube burner runs
Args:
uuids (list): list of uuids
@@ -121,7 +132,7 @@ def match_kube_burner(self, uuids, index):
runs = [item.to_dict()["_source"] for item in result.hits.hits]
return runs

def filter_runs(self, pdata, data):
def filter_runs(self, pdata: Dict[Any, Any], data: Dict[Any, Any]) -> List[str]:
"""filter out runs with different jobIterations
Args:
pdata (dict): metadata of the primary run whose jobIterations value is used as the reference
@@ -138,7 +149,9 @@ def filter_runs(self, pdata, data):
ids_df = ndf.loc[df["jobConfig.jobIterations"] == iterations]
return ids_df["uuid"].to_list()

def getResults(self, uuid: str, uuids: list, index_str: str, metrics: dict):
def getResults(
self, uuid: str, uuids: List[str], index_str: str, metrics: Dict[str, Any]
) -> Dict[Any, Any]:
"""
Get results of elasticsearch data query based on uuid(s) and defined metrics

@@ -156,7 +169,7 @@ def getResults(self, uuid: str, uuids: list, index_str: str, metrics: dict):
metric_queries = []
not_queries = [
~Q("match", **{not_item_key: not_item_value})
for not_item_key, not_item_value in metrics.get("not",{}).items()
for not_item_key, not_item_value in metrics.get("not", {}).items()
]
metric_queries = [
Q("match", **{metric_key: metric_value})
@@ -180,7 +193,9 @@ def getResults(self, uuid: str, uuids: list, index_str: str, metrics: dict):
runs = [item.to_dict()["_source"] for item in result.hits.hits]
return runs

def get_agg_metric_query(self, uuids, index, metrics):
def get_agg_metric_query(
self, uuids: List[str], index: str, metrics: Dict[str, Any]
):
"""burner_metric_query will query for specific metrics data.

Args:
@@ -222,7 +237,9 @@ def get_agg_metric_query(self, uuids, index, metrics):
data = self.parse_agg_results(result, agg_value, agg_type)
return data

def parse_agg_results(self, data: dict, agg_value, agg_type):
def parse_agg_results(
self, data: Dict[Any, Any], agg_value: str, agg_type: str
) -> List[Dict[Any, Any]]:
"""parse out CPU data from kube-burner query
Args:
data (dict): Aggregated data from Elasticsearch DSL query
@@ -249,7 +266,9 @@ def parse_agg_results(self, data: dict, agg_value, agg_type):
res.append(dat)
return res

def convert_to_df(self, data, columns=None):
def convert_to_df(
self, data: Dict[Any, Any], columns: List[str] = None
) -> pd.DataFrame:
"""convert to a dataframe
Args:
data (dict): queried Elasticsearch results to convert into a dataframe
@@ -263,7 +282,12 @@ def convert_to_df(self, data, columns=None):
odf = pd.DataFrame(odf, columns=columns)
return odf

def save_results(self, df, csv_file_path="output.csv", columns=None):
def save_results(
self,
df: pd.DataFrame,
csv_file_path: str = "output.csv",
columns: List[str] = None,
) -> None:
"""write results to CSV
Args:
df (pd.DataFrame): dataframe to write out
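For reference, a minimal usage sketch of the new lookback_date parameter introduced in this file; it is not part of the diff, the metadata values are placeholders, and ES_SERVER is assumed to be set in the environment:

```python
from datetime import datetime, timedelta

from fmatch.matcher import Matcher

# Placeholder metadata; callers pass whatever fields they want to match on.
meta = {"ocpVersion": "4.15", "platform": "AWS"}

matcher = Matcher(index="ospst-perf-scale-ci")
# Only runs newer than 30 days survive the new `range` filter on `timestamp`.
uuids_docs = matcher.get_uuid_by_metadata(
    meta, lookback_date=datetime.utcnow() - timedelta(days=30)
)
print(uuids_docs)  # e.g. [{"uuid": "...", "buildUrl": "..."}, ...]
```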
24 changes: 24 additions & 0 deletions fmatch/tests/test_matcher.py
@@ -7,6 +7,7 @@
# pylint: disable = import-error, duplicate-code
import os
from unittest.mock import patch
import datetime

from elasticsearch_dsl import Search
from elasticsearch_dsl.response import Response
@@ -76,6 +77,29 @@ def test_get_uuid_by_metadata(matcher_instance):
"buildUrl":"buildUrl1"}]
assert result == expected

def test_get_uuid_by_metadata_lookback(matcher_instance):
matcher_instance.es.search = lambda *args, **kwargs: {
"hits": {
"hits": [{"_source": {"uuid": "uuid1",
"buildUrl":"buildUrl1",
"timestamp":"2024-07-10T13:46:24Z"}},
{"_source": {"uuid": "uuid2",
"buildUrl":"buildUrl1",
"timestamp":"2024-07-08T13:46:24Z"}}]
}
}
meta = {
"field1": "value1",
"ocpVersion": "4.15",
}
date= datetime.datetime.strptime("2024-07-07T13:46:24Z","%Y-%m-%dT%H:%M:%SZ")
result = matcher_instance.get_uuid_by_metadata(meta=meta, lookback_date=date)
expected= [{"uuid": "uuid1",
"buildUrl":"buildUrl1"},
{"uuid": "uuid2",
"buildUrl":"buildUrl1"}]
assert result == expected


def test_match_kube_burner(matcher_instance):
result = matcher_instance.match_kube_burner(["uuid1"],index="ospst-*")
2 changes: 1 addition & 1 deletion setup.py
@@ -4,7 +4,7 @@
from setuptools import setup, find_packages


VERSION = '0.0.7'
VERSION = '0.0.8'
DESCRIPTION = 'Common package for matching runs with provided metadata'
# pylint: disable= line-too-long
LONG_DESCRIPTION = "A package that allows to match metadata and get runs and create csv files with queried metrics"