Skip to content

Commit

Permalink
[pre-commit.ci] auto fixes from pre-commit.com hooks
Browse files Browse the repository at this point in the history
for more information, see https://pre-commit.ci
  • Loading branch information
pre-commit-ci[bot] committed Jan 3, 2024
1 parent b56a736 commit 305d247
Showing 1 changed file with 44 additions and 38 deletions.
82 changes: 44 additions & 38 deletions jazzband/hookserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
from flask import request
from werkzeug.exceptions import BadRequest, Forbidden, ServiceUnavailable

__author__ = 'Nick Frost'
__version__ = '1.1.0'
__license__ = 'MIT'
__author__ = "Nick Frost"
__version__ = "1.1.0"
__license__ = "MIT"


class Hooks(object):
Expand All @@ -31,52 +31,52 @@ class Hooks(object):
:param url: the url that events will be posted to
"""

def __init__(self, app=None, url='/hooks'):
def __init__(self, app=None, url="/hooks"):
"""Initialize the extension."""
self._hooks = {}
if app is not None:
self.init_app(app, url=url)

def init_app(self, app, url='/hooks'):
def init_app(self, app, url="/hooks"):
"""Register the URL route to the application.
:param app: the optional :class:`~flask.Flask` instance to
register the extension
:param url: the url that events will be posted to
"""
app.config.setdefault('VALIDATE_IP', True)
app.config.setdefault('VALIDATE_SIGNATURE', True)
app.config.setdefault("VALIDATE_IP", True)
app.config.setdefault("VALIDATE_SIGNATURE", True)

@app.route(url, methods=['POST'])
@app.route(url, methods=["POST"])
def hook():
if app.config['VALIDATE_IP']:
if app.config["VALIDATE_IP"]:
if not is_github_ip(request.remote_addr):
raise Forbidden('Requests must originate from GitHub')
raise Forbidden("Requests must originate from GitHub")

if app.config['VALIDATE_SIGNATURE']:
key = app.config.get('GITHUB_WEBHOOKS_KEY', app.secret_key)
signature = request.headers.get('X-Hub-Signature')
if app.config["VALIDATE_SIGNATURE"]:
key = app.config.get("GITHUB_WEBHOOKS_KEY", app.secret_key)
signature = request.headers.get("X-Hub-Signature")

if hasattr(request, 'get_data'):
if hasattr(request, "get_data"):
# Werkzeug >= 0.9
payload = request.get_data()
else:
payload = request.data

if not signature:
raise BadRequest('Missing signature')
raise BadRequest("Missing signature")

if not check_signature(signature, key, payload):
raise BadRequest('Wrong signature')
raise BadRequest("Wrong signature")

event = request.headers.get('X-GitHub-Event')
guid = request.headers.get('X-GitHub-Delivery')
event = request.headers.get("X-GitHub-Event")
guid = request.headers.get("X-GitHub-Delivery")
if not event:
raise BadRequest('Missing header: X-GitHub-Event')
raise BadRequest("Missing header: X-GitHub-Event")
elif not guid:
raise BadRequest('Missing header: X-GitHub-Delivery')
raise BadRequest("Missing header: X-GitHub-Delivery")

if hasattr(request, 'get_json'):
if hasattr(request, "get_json"):
# Flask >= 0.10
data = request.get_json()
else:
Expand All @@ -85,23 +85,25 @@ def hook():
if event in self._hooks:
return self._hooks[event](data, guid)
else:
return 'Hook not used\n'
return "Hook not used\n"

def register_hook(self, hook_name, fn):
"""Register a function to be called on a GitHub event."""
if hook_name not in self._hooks:
self._hooks[hook_name] = fn
else:
raise Exception('%s hook already registered' % hook_name)
raise Exception("%s hook already registered" % hook_name)

def hook(self, hook_name):
"""A decorator that's used to register a new hook handler.
:param hook_name: the event to handle
"""

def wrapper(fn):
self.register_hook(hook_name, fn)
return fn

return wrapper


Expand All @@ -121,16 +123,18 @@ def __init__(self, timeout):

def __call__(self, fn):
"""Create the wrapped function."""

@wraps(fn)
def inner(*args, **kwargs):
if self.last is None or time.time() - self.last > self.timeout:
self.cache = fn(*args, **kwargs)
self.last = time.time()
return self.cache

return inner


def _load_github_hooks(github_url='https://api.github.com'):
def _load_github_hooks(github_url="https://api.github.com"):
"""Request GitHub's IP block from their API.
Return the IP network.
Expand All @@ -141,20 +145,22 @@ def _load_github_hooks(github_url='https://api.github.com'):
If something else goes wrong, raise a generic 503.
"""
try:
resp = requests.get(github_url + '/meta')
resp = requests.get(github_url + "/meta")
if resp.status_code == 200:
return resp.json()['hooks']
return resp.json()["hooks"]
else:
if resp.headers.get('X-RateLimit-Remaining') == '0':
reset_ts = int(resp.headers['X-RateLimit-Reset'])
reset_string = time.strftime('%a, %d %b %Y %H:%M:%S GMT',
time.gmtime(reset_ts))
raise ServiceUnavailable('Rate limited from GitHub until ' +
reset_string)
if resp.headers.get("X-RateLimit-Remaining") == "0":
reset_ts = int(resp.headers["X-RateLimit-Reset"])
reset_string = time.strftime(
"%a, %d %b %Y %H:%M:%S GMT", time.gmtime(reset_ts)
)
raise ServiceUnavailable(
"Rate limited from GitHub until " + reset_string
)
else:
raise ServiceUnavailable('Error reaching GitHub')
raise ServiceUnavailable("Error reaching GitHub")
except (KeyError, ValueError, requests.exceptions.ConnectionError):
raise ServiceUnavailable('Error reaching GitHub')
raise ServiceUnavailable("Error reaching GitHub")


# So we don't get rate limited
Expand All @@ -178,15 +184,15 @@ def is_github_ip(ip_str):

def check_signature(signature, key, data):
"""Compute the HMAC signature and test against a given hash."""
if isinstance(key, type(u'')):
if isinstance(key, type("")):
key = key.encode()

digest = 'sha1=' + hmac.new(key, data, hashlib.sha1).hexdigest()
digest = "sha1=" + hmac.new(key, data, hashlib.sha1).hexdigest()

# Covert everything to byte sequences
if isinstance(digest, type(u'')):
if isinstance(digest, type("")):
digest = digest.encode()
if isinstance(signature, type(u'')):
if isinstance(signature, type("")):
signature = signature.encode()

return werkzeug.security.safe_str_cmp(digest, signature)

0 comments on commit 305d247

Please sign in to comment.