feat(gcp): updated + enhanced gcp estimation script (#45)
* feat(gcp): updated + enhanced gcp estimation script

* chore(gcp): lint updates

* chore(gcp): update requirements file for new changes

* feat(gcp): add support for cloud run jobs

This commit adds the logic and support for Cloud Run jobs. It also ensures
that the failed service-call messages are reduced to a unique set before
being written to file, preventing duplicate entries.
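
For reference, the dedupe happens at write time via set() (see the
gcp-exceptions.txt block in the diff below). A minimal standalone sketch of
that pattern; the data here is illustrative, not from the commit:

    # Sketch: write each project's collected messages once, mirroring the
    # set(messages) call in the diff below. Placeholder data.
    service_disabled_calls = {
        'demo-project': ['Cloud Run API disabled', 'Cloud Run API disabled'],
    }
    with open('gcp-exceptions.txt', 'w', encoding='utf-8') as f:
        for project_id, messages in service_disabled_calls.items():
            f.write(f"Project ID: {project_id}\n")
            for msg in set(messages):  # unique messages only
                f.write(f"- {msg}\n")
            f.write('\n')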

* feat(gcp): get count of active autopilot nodes

Clean up messaging, logging, and output such as table headers, and add a
count of active nodes.
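
The count comes from the currentNodeCount field on each cluster returned by
the GKE container v1 API (see get_autopilot_active_nodes in the diff below).
A hedged sketch of the same lookup, assuming Application Default Credentials
are configured and using a placeholder project ID:

    # Sketch: sum active nodes across Autopilot clusters in one project.
    # 'demo-project' is a placeholder; credentials are assumed configured.
    from googleapiclient import discovery

    service = discovery.build('container', 'v1')
    clusters = service.projects().zones().clusters().list(
        projectId='demo-project', zone='-').execute().get('clusters', [])
    autopilot_nodes = sum(
        c.get('currentNodeCount', 0)
        for c in clusters
        if c.get('autopilot', {}).get('enabled', False)  # Autopilot only
    )
    print(autopilot_nodes)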

* chore(gcp): update warning message for disabled API calls

* chore(gcp): reword the exception file to be in line with other file naming
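
The disabled-API handling keys off 403 HttpError responses whose error
details include a google.rpc.ErrorInfo entry naming the disabled service. A
minimal sketch of that classification, assuming exc was caught from a
googleapiclient list call; the helper name here is illustrative:

    # Sketch: distinguish "API disabled" 403s from other errors, as
    # fail_safe() and get_service_disabled_name() do in the diff below.
    from googleapiclient.errors import HttpError

    def disabled_service(exc: HttpError) -> str:
        if exc.status_code == 403 and 'SERVICE_DISABLED' in str(exc):
            for detail in exc.error_details:
                if detail.get('@type') == 'type.googleapis.com/google.rpc.ErrorInfo':
                    return detail['metadata']['service']
        return ''  # not a disabled-API error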
carlosmmatos committed Jul 29, 2024
1 parent e5d9de4 commit 441a924
Showing 3 changed files with 175 additions and 47 deletions.
213 changes: 166 additions & 47 deletions GCP/gcp_cspm_benchmark.py
@@ -7,17 +7,24 @@

import csv
import logging
+import os
 from functools import cached_property
+from typing import List, Dict, Any
from tabulate import tabulate
import google.api_core.exceptions
from google.cloud.resourcemanager import ProjectsClient
from google.cloud.resourcemanager_v3.types import Project
from google.cloud import compute
from googleapiclient import discovery
+from googleapiclient.errors import HttpError

+# Suppress gRPC and absl logs
+os.environ['GRPC_VERBOSITY'] = 'ERROR'
+os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

-LOG_LEVEL = logging.INFO
+# Configuration for logging
+LOG_LEVEL = logging.DEBUG
-log = logging.getLogger('azure')
+log = logging.getLogger('gcp')
log.setLevel(LOG_LEVEL)
ch = logging.StreamHandler()
ch.setLevel(LOG_LEVEL)
@@ -27,94 +34,206 @@


class GCP:
-    def projects(self):
+    def projects(self) -> List[Project]:
return ProjectsClient().search_projects()

-    def vms(self, project_id):
-        pass

-    def list_instances(self, project_id):
+    def list_instances(self, project_id: str):
request = compute.AggregatedListInstancesRequest(max_results=50, project=project_id)
return self.instances_client.aggregated_list(request=request)

-    def clusters(self, project_id):
+    def clusters(self, project_id: str) -> List[Dict[str, Any]]:
service = discovery.build('container', 'v1')
endpoint = service.projects().zones().clusters() # pylint: disable=no-member
request = endpoint.list(projectId=project_id, zone='-')
response = request.execute()
return response.get('clusters', [])

+    def list_cloud_run_services(self, project_id: str) -> List[Dict[str, Any]]:
+        service = discovery.build('run', 'v1')
+        parent = f"projects/{project_id}/locations/-"
+        request = service.projects().locations().services().list(parent=parent)  # pylint: disable=no-member
+        response = request.execute()
+        return response.get('items', [])
+
+    def list_cloud_run_jobs(self, project_id: str) -> List[Dict[str, Any]]:
+        service = discovery.build('run', 'v1')
+        parent = f'namespaces/{project_id}'
+        request = service.namespaces().jobs().list(parent=parent)  # pylint: disable=no-member
+        response = request.execute()
+        return response.get('items', [])

@cached_property
-    def instances_client(self):
+    def instances_client(self) -> compute.InstancesClient:
return compute.InstancesClient()

@classmethod
-    def is_vm_kubenode(cls, instance):
-        return any(k.key for k in instance.metadata.items if k.key == 'kubeconfig')
+    def is_vm_kubenode(cls, instance: compute.Instance) -> bool:
+        return any(k.key == 'kubeconfig' for k in instance.metadata.items)

@classmethod
-    def is_vm_running(cls, instance):
+    def is_vm_running(cls, instance: compute.Instance) -> bool:
return instance.status != 'TERMINATED'

@classmethod
-    def is_cluster_autopilot(cls, cluster):
+    def is_cluster_autopilot(cls, cluster: Dict[str, Any]) -> bool:
return cluster.get('autopilot', {}).get('enabled', False)

+    @classmethod
+    def get_autopilot_active_nodes(cls, cluster: Dict[str, Any]) -> int:
+        return cluster.get('currentNodeCount', 0)


-def process_gcp_project(project):  # pylint: disable=redefined-outer-name
-    if project.state == Project.State.DELETE_REQUESTED:
-        log.debug("Skipping GCP project %s (project pending deletion)", project.display_name)
+def process_gcp_project(gcp_project: Project) -> Dict[str, Any]:
+    if gcp_project.state == Project.State.DELETE_REQUESTED:
+        log.info("Skipping GCP project %s (project pending deletion)", gcp_project.display_name)
return {}

-    result = {'project_id': project.project_id,
-              'kubenodes_running': 0, 'kubenodes_terminated': 0,
-              'vms_running': 0, 'vms_terminated': 0}
-    log.info("Exploring GCP project: %s", project.display_name)
+    result = {
+        'project_id': gcp_project.project_id,
+        'kubenodes_running': 0, 'kubenodes_terminated': 0,
+        'vms_running': 0, 'vms_terminated': 0,
+        'autopilot_clusters': 0, 'autopilot_nodes': 0,
+        'cloud_run_services': 0, 'cloud_run_jobs': 0
+    }

-    try:
-        # (1) Process GKE clusters
-        for cluster in gcp.clusters(project.project_id):
-            if GCP.is_cluster_autopilot(cluster):
-                log.error("Skipping GKE Autopilot cluster %s in project: %s", cluster['name'], project.display_name)
-
-        # (2) Process instances
-        for _zone, response in gcp.list_instances(project.project_id):
-            if response.instances:
-                for instance in response.instances:
-                    typ = 'kubenode' if GCP.is_vm_kubenode(instance) else 'vm'
-                    state = 'running' if GCP.is_vm_running(instance) else 'terminated'
-                    key = f"{typ}s_{state}"
-                    result[key] += 1
+    log.info("Processing GCP project: %s", gcp_project.display_name)

-    except google.api_core.exceptions.Forbidden as exc:
-        log.error("ERROR: cannot explore project: %s: %s", project.display_name, exc)
+    fail_safe(count_instances, gcp_project, result)
+    fail_safe(count_autopilot_clusters, gcp_project, result)
+    fail_safe(count_cloud_run_services, gcp_project, result)
+    fail_safe(count_cloud_run_jobs, gcp_project, result)

return result


+def fail_safe(count_func, *args) -> None:
+    try:
+        count_func(*args)
+    except google.api_core.exceptions.Forbidden as exc:
+        if 'Compute Engine API has not been used' in str(exc):
+            log_warning('compute.googleapis.com', project.display_name)
+            add_message(project.project_id, exc.errors[0]['message'])
+        else:
+            log.error("Unexpected error for project: %s: %s", project.display_name, exc)
+    except HttpError as exc:
+        if exc.status_code == 403 and 'SERVICE_DISABLED' in str(exc):
+            log_warning(get_service_disabled_name(exc), project.display_name)
+            add_message(project.project_id, exc.reason)
+        else:
+            log.error("Unexpected error for project: %s: %s", project.display_name, exc)
+    except Exception as exc:  # pylint: disable=broad-except
+        log.error("Unexpected error for project: %s: %s", project.display_name, exc)
+
+
+def log_warning(api: str, project_name: str) -> None:
+    api_names = {
+        'compute.googleapis.com': 'Compute Engine',
+        'container.googleapis.com': 'Kubernetes Engine',
+        'run.googleapis.com': 'Cloud Run',
+    }
+    message = f"Unable to process {api_names[api]} API for project: {project_name}."
+    log.warning(message)
+
+
+def add_message(project_id: str, message: str) -> None:
+    if project_id not in service_disabled_calls:
+        service_disabled_calls[project_id] = []
+    service_disabled_calls[project_id].append(message)
+
+
+def get_service_disabled_name(exc: HttpError) -> str:
+    for detail in exc.error_details:
+        if detail.get('@type') == 'type.googleapis.com/google.rpc.ErrorInfo':
+            return detail['metadata']['service']
+    return None
+
+
+def count_autopilot_clusters(gcp_project: Project, result: Dict[str, int]):
+    for cluster in gcp.clusters(gcp_project.project_id):
+        if GCP.is_cluster_autopilot(cluster):
+            result['autopilot_clusters'] += 1
+            result['autopilot_nodes'] += GCP.get_autopilot_active_nodes(cluster)
+
+
+def count_instances(gcp_project: Project, result: Dict[str, int]):
+    for _zone, response in gcp.list_instances(gcp_project.project_id):
+        if response.instances:
+            for instance in response.instances:
+                typ = 'kubenode' if GCP.is_vm_kubenode(instance) else 'vm'
+                state = 'running' if GCP.is_vm_running(instance) else 'terminated'
+                key = f"{typ}s_{state}"
+                result[key] += 1
+
+
+def count_cloud_run_services(gcp_project: Project, result: Dict[str, int]):
+    services = gcp.list_cloud_run_services(gcp_project.project_id)
+    result['cloud_run_services'] = len(services)
+
+
+def count_cloud_run_jobs(gcp_project: Project, result: Dict[str, int]):
+    jobs = gcp.list_cloud_run_jobs(gcp_project.project_id)
+    result['cloud_run_jobs'] = len(jobs)


data = []
-totals = {'project_id': 'totals',
-          'kubenodes_running': 0, 'kubenodes_terminated': 0,
-          'vms_running': 0, 'vms_terminated': 0}
+service_disabled_calls = {}
+headers = {
+    'project_id': 'Project ID',
+    'kubenodes_running': 'K8s Nodes (Running)',
+    'kubenodes_terminated': 'K8s Nodes (Terminated)',
+    'vms_running': 'VMs (Running)',
+    'vms_terminated': 'VMs (Terminated)',
+    'autopilot_clusters': 'Autopilot Clusters',
+    'autopilot_nodes': 'Autopilot Nodes (Running)',
+    'cloud_run_services': 'Cloud Run Services',
+    'cloud_run_jobs': 'Cloud Run Jobs'
+}
+totals = {
+    'project_id': 'totals',
+    'kubenodes_running': 0, 'kubenodes_terminated': 0,
+    'vms_running': 0, 'vms_terminated': 0,
+    'autopilot_clusters': 0, 'autopilot_nodes': 0,
+    'cloud_run_services': 0, 'cloud_run_jobs': 0
+}

gcp = GCP()

+projects = gcp.projects()
+if not projects:
+    log.error("No GCP projects found")
+    exit(1)  # pylint: disable=consider-using-sys-exit

for project in gcp.projects():
row = process_gcp_project(project)
if row:
data.append(row)
for k in totals:
-            if k == 'project_id':
-                continue
-
-            totals[k] += row[k]

+            if k != 'project_id':
+                totals[k] += row[k]

data.append(totals)

-headers = ['project_id', 'kubenodes_running', 'kubenodes_terminated', 'vms_running', 'vms_terminated']
+# Output our results
+print(tabulate(data, headers=headers, tablefmt="grid", maxheadercolwidths=[10, 15, 15, 10, 15, 15, 15, 15, 12]))

with open('gcp-benchmark.csv', 'w', newline='', encoding='utf-8') as csv_file:
-    csv_writer = csv.DictWriter(csv_file, fieldnames=headers)
+    csv_writer = csv.DictWriter(csv_file, fieldnames=headers.keys())
csv_writer.writeheader()
csv_writer.writerows(data)

log.info("CSV summary has been exported to ./gcp-benchmark.csv file")
log.info("CSV file saved to: ./gcp-benchmark.csv")

+if service_disabled_calls:
+    MSG = (
+        "Some API service calls were disabled in certain projects, preventing data processing. "
+        "These APIs might be intentionally disabled in your environment. "
+        "Details have been captured and saved to: ./gcp-exceptions.txt for your review."
+    )
+    log.warning(MSG)
+
+    with open('gcp-exceptions.txt', 'w', encoding='utf-8') as f:
+        for project, messages in service_disabled_calls.items():
+            f.write(f"Project ID: {project}\n")
+            for msg in set(messages):
+                f.write(f"- {msg}\n")
+            f.write('\n')
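
A note on the design above: fail_safe() centralizes the GCP exception
handling so each counter stays a plain function, and new resource types
(like the Cloud Run jobs in this commit) plug in as a single call. A toy
standalone sketch of that dispatch pattern; the counter and exception type
here are placeholders, not from the commit:

    # Sketch: the fail_safe dispatch pattern in isolation. PermissionError
    # stands in for google.api_core.exceptions.Forbidden; the counter is a toy.
    from typing import Callable, Dict

    def fail_safe(count_func: Callable, *args) -> None:
        try:
            count_func(*args)
        except PermissionError as exc:
            print(f"skipped: {exc}")

    def count_widgets(result: Dict[str, int]) -> None:
        result['widgets'] += 1

    result = {'widgets': 0}
    fail_safe(count_widgets, result)  # one failing counter won't abort the run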
2 changes: 2 additions & 0 deletions GCP/requirements.txt
@@ -1,4 +1,6 @@
 google-cloud-compute
+google-cloud-run
 google-cloud-resource-manager
+google-api-python-client
 oauth2client
tabulate
7 changes: 7 additions & 0 deletions setup.cfg
@@ -0,0 +1,7 @@
+# General flake8, pylint settings
+[flake8]
+max-line-length = 120
+max-complexity = 10
+
+[pylint.MASTER]
+disable=C0301,C0116,C0115,C0114
