Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
talltechy committed Jul 18, 2023
1 parent 68d5358 commit 7cfc202
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 77 deletions.
18 changes: 4 additions & 14 deletions src/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@
# BEGIN
"""
This package provides functions for interacting with the Rapid7 InsightVM API.
This package provides functions for interacting with different security tools and their APIs.
"""

from .rapid7 import *

__all__ = [
"load_r7_platform_api_credentials",
"get_platform_api_headers",
"load_r7_isvm_api_credentials",
"get_isvm_access_token",
"search_asset_isvm",
"get_asset_isvm",
"get_assets_isvm"
]
# END
import src.rapid7
import src.paloalto
import src.client
6 changes: 6 additions & 0 deletions src/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""
TODO: Update Summary Docstring
"""

from src.rapid7 import Rapid7
from src.paloalto import PaloAlto
1 change: 1 addition & 0 deletions src/paloalto/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
import src.paloalto.cortex_xdr
33 changes: 2 additions & 31 deletions src/paloalto/cortex_xdr/__init__.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,7 @@
"""
This module provides functions for authenticating and working with the Palo Alto Cortex XDR API.
Functions:
- load_xdr_api_credentials: loads the XDR API credentials from a configuration file
- generate_advanced_authentication: generates an advanced authentication token for the XDR API
- unisolate_endpoint: unisolates an endpoint in the XDR API
- get_endpoint_quarantine_status: gets the quarantine status of an endpoint in the XDR API
- quarantine_endpoint: quarantines an endpoint in the XDR API
- unquarantine_endpoint: unquarantines an endpoint in the XDR API
- get_endpoint_network_details: gets the network details of an endpoint in the XDR API
"""

from .api_pa_xdr_auth import (
load_xdr_api_credentials,
generate_advanced_authentication
)

from .api_pa_xdr import (
unisolate_endpoint,
get_endpoint_quarantine_status,
quarantine_endpoint,
unquarantine_endpoint,
get_endpoint_network_details,
)
from .api_pa_xdr_auth import *

__all__ = [
"load_xdr_api_credentials",
"generate_advanced_authentication",
"unisolate_endpoint",
"get_endpoint_quarantine_status",
"quarantine_endpoint",
"unquarantine_endpoint",
"get_endpoint_network_details",
"api_pa_xdr"
]
from .api_pa_xdr import *
127 changes: 127 additions & 0 deletions src/rapid7/api_r7_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""
TODO: Add docstring
"""

import collections
from typing import Any, Tuple
import logging
import urllib3
import requests
from src.rapid7.api_r7_auth_class import R7_ISVM_Auth

# Set up logging
logging.basicConfig(filename="api_r7_api.log", level=logging.ERROR)


class R7_ISVM_Api:
def __init__(
self, auth: R7_ISVM_Auth, fqdn: str, api_name: str, timeout: Tuple[int, int]
) -> None:
self.auth = auth
self.fqdn = fqdn
self.api_name = api_name
self.timeout = timeout

def _get_api_url(self, call_name: str) -> str:
"""
Returns the API URL.
Returns:
A string containing the API URL.
"""
return f"{self.auth.isvm_base_url}/api/3/{self.api_name}/{call_name}"

def _call(
self,
call_name: str,
method: str = "post",
params: dict = None,
json_value: object = None,
header_params=None,
) -> requests.Response:
"""
Calls the API with the specified parameters.
Args:
call_name: A string containing the name of the API call to make.
method: A string containing the HTTP method to use for the API call (default is "post").
params: A dictionary containing the query parameters to include in the API call (default is None).
json_value: An object containing the JSON data to include in the API call (default is None).
header_params: A dictionary containing additional headers to include in the API call (default is None).
Returns:
A requests.Response object containing the API response.
"""
if header_params is None:
header_params = {}
if params is None:
params = {}
if json_value is None:
json_value = {}
url = self._get_api_url(call_name)
headers = self.auth.get_isvm_encoded_auth_header()
self.extend_dict(headers, header_params)

return self._execute_call(
url=url,
method=method,
params=params,
json_value=json_value,
headers=headers,
)

def _execute_call(
self,
url: str,
method: str,
params: dict = None,
json_value: object = None,
headers: dict = None,
) -> requests.Response:
"""
Executes the API call.
Returns:
A requests.Response object containing the API response.
"""
response = None
if method == "get":
response = requests.get(
url, headers=headers, params=params, timeout=self.timeout
)
elif method == "post":
response = requests.post(
url, headers=headers, json=json_value, timeout=self.timeout
)
elif method == "put":
response = requests.put(
url, headers=headers, json=json_value, timeout=self.timeout
)
elif method == "delete":
response = requests.delete(url, headers=headers, timeout=self.timeout)
if response is not None:
response.raise_for_status()
else:
response = requests.Response()
return response

@staticmethod
def extend_dict(*args):
"""
Extends a dictionary with the key-value pairs from one or more dictionaries.
Args:
*args: One or more dictionaries to extend.
Returns:
A dictionary containing the extended key-value pairs.
"""
if args is not None:
if type(args[0]) is collections.OrderedDict:
result = collections.OrderedDict()
else:
result = {}
for arg in args:
result.update(arg)
return result
return {}
32 changes: 0 additions & 32 deletions src/rapid7/api_r7_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ def get_platform_api_headers():

return platform_headers


def load_r7_isvm_api_credentials():
"""
Loads the Rapid7 InsightVM API credentials from environment variables.
Expand All @@ -89,37 +88,6 @@ def load_r7_isvm_api_credentials():

return isvm_api_username, isvm_api_password, isvm_base_url

def get_isvm_2fa_access_token():
"""
Generates an access token for the Rapid7 InsightVM API.
Returns:
The access token as a string.
"""
isvm_api_username, isvm_api_password, _ = load_r7_isvm_api_credentials()

# Construct the URL for the access token endpoint
token_url = f"{os.getenv('INSIGHTVM_BASE_URL')}/api/3/access-tokens"

# Make a POST request to the access token endpoint to generate a new access token
response = requests.post(
token_url,
auth=HTTPBasicAuth(isvm_api_username, isvm_api_password),
headers={"Accept": "application/json"},
timeout=10, # Add a timeout argument to avoid hanging indefinitely
verify=False # Ignore SSL errors
)

# Check if the request was successful
if response.status_code != 201:
logging.error("Failed to generate access token.")
raise ValueError("Failed to generate access token.")

# Extract the access token from the response
access_token = response.json()["token"]

return access_token

def get_isvm_basic_auth_header():
"""
Returns the Authorization header with the Base64 encoded hash of the username and password.
Expand Down
47 changes: 47 additions & 0 deletions src/rapid7/api_r7_auth_class.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""
This file contains the R7_ISVM_Auth class for handling authentication with the InsightVM API.
"""
import os
import logging
import base64
import dotenv

# Load environment variables from .env file
dotenv.load_dotenv()

# Set up logging
logging.basicConfig(filename='api_r7_auth.log', level=logging.INFO)

class R7_ISVM_Auth:
"""
A class for handling authentication with the InsightVM API.
Attributes:
isvm_api_username (str): The InsightVM API username.
isvm_api_password (str): The InsightVM API password.
isvm_base_url (str): The InsightVM API base URL.
"""

def __init__(self) -> None:
"""
Initializes the R7Auth class by loading the necessary environment variables and checking for missing credentials.
"""
self.isvm_api_username = os.environ.get('INSIGHTVM_API_USERNAME')
self.isvm_api_password = os.environ.get('INSIGHTVM_API_PASSWORD')
self.isvm_base_url = os.environ.get('INSIGHTVM_BASE_URL')

if not self.isvm_api_username or not self.isvm_api_password or not self.isvm_base_url:
logging.error("Missing ISVM API credentials or BASE URL. Please check .env file.")
raise ValueError("Missing ISVM API credentials or BASE URL. Please check .env file.")

def get_isvm_encoded_auth_header(self) -> dict[str, str]:
"""
Returns the Authorization header with the Base64 encoded hash of the username and password.
Returns:
A dictionary containing the Authorization header.
"""
auth_string = f"{self.isvm_api_username}:{self.isvm_api_password}"
encoded_auth_string = base64.b64encode(auth_string.encode()).decode()
auth_headers = {"Authorization": f"Basic {encoded_auth_string}"}
return auth_headers

0 comments on commit 7cfc202

Please sign in to comment.