-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Alert - Support for authentication when calling receiver endpoint. #560
base: develop
Are you sure you want to change the base?
Changes from 9 commits
c39273f
1fa0693
bd88ef3
1c3d61e
3de5ef9
51ea8a4
9a10f8f
d771c5e
7613ba3
3d383a5
7957567
f96b907
be8c718
cec1a0f
8050102
c04c902
d98c6bd
a34506d
14f58cd
dffbd88
b3a54c0
0f26f6d
0a8b73b
b207360
826c78e
f4d02bb
6aa67ab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,3 @@ | ||
from abc import abstractmethod | ||
import json | ||
import logging | ||
import os | ||
|
@@ -58,48 +57,49 @@ class AlertReceiverAuthentication: | |
authentication_scheme: str = None | ||
|
||
class AlertReceiverAuthenticationInterface: | ||
def __init__(self, alert_receiver_config: dict, authentication_key: str): | ||
self.authentication_config = alert_receiver_config.get(authentication_key) | ||
|
||
if self.authentication_config is None: | ||
raise ConfigurationError( | ||
f"No authentication configuration found ({authentication_key})." | ||
def __init__(self, alert_receiver_config: dict, authentication_config_key: str): | ||
if authentication_config_key is not None: | ||
self.authentication_config = alert_receiver_config.get( | ||
authentication_config_key | ||
) | ||
|
||
self.authentication_scheme = self.authentication_config.get( | ||
"authentication_scheme", self.authentication_scheme | ||
) | ||
self._validate_authentication_scheme() | ||
if self.authentication_config is None: | ||
raise ConfigurationError( | ||
"No authentication configuration found for dictionary key:" | ||
f"{authentication_config_key}." | ||
) | ||
|
||
self.authentication_scheme = self.authentication_config.get( | ||
"authentication_scheme", self.authentication_scheme | ||
) | ||
Comment on lines
+72
to
+74
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the second argument here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually it should take the default value set in the classes that inherited the base. This is a very bad coded concept, but it should work: class Base:
def __init__(self):
self.surname = self.name if self.name is not None else 'Base'
class A(Base):
name = "A"
def __init__(self):
super().__init__()
class B(Base):
name = "B"
def __init__(self):
super().__init__()
a = A()
print(a.surname)
b = B()
print(b.surname) Which prints: A
B A proper nit-picking review is a great opportunity to improve my coding skills ;) |
||
self._validate_authentication_scheme() | ||
|
||
def _validate_authentication_scheme(self) -> None: | ||
if not self.authentication_scheme: | ||
raise ConfigurationError( | ||
"The authentication scheme cannot be null or empty." | ||
) | ||
|
||
if " " in self.authentication_scheme: | ||
# check if self.authentication_scheme contains only letters | ||
if not self.authentication_scheme.isalpha(): | ||
raise ConfigurationError( | ||
"The authentication scheme cannot contain any space." | ||
"The authentication scheme must contain only letters." | ||
) | ||
|
||
@abstractmethod | ||
def get_header(self) -> dict: | ||
pass | ||
return {} | ||
|
||
class AlertReceiverNoneAuthentication(AlertReceiverAuthenticationInterface): | ||
""" | ||
Placeholder class for AlertReceiver without authentication. | ||
""" | ||
|
||
def __init__(self, alert_receiver_config: dict): | ||
pass | ||
|
||
def get_header(self) -> dict: | ||
return {} | ||
super().__init__(alert_receiver_config, None) | ||
|
||
class AlertReceiverBasicAuthentication(AlertReceiverAuthenticationInterface): | ||
""" | ||
Class to store authentication information for basic authentication type with username and password. | ||
Class to store authentication information for basic authentication type | ||
with username and password. | ||
""" | ||
|
||
username: str | ||
|
@@ -124,7 +124,8 @@ def __init__(self, alert_receiver_config: dict): | |
|
||
if self.username is None or self.password is None: | ||
raise ConfigurationError( | ||
f"No username or password found from environmental variables {username_env} and {password_env}." | ||
"No username or password found from environmental variables " | ||
f"{username_env} and {password_env}." | ||
) | ||
|
||
def get_header(self) -> dict: | ||
|
@@ -169,14 +170,16 @@ def __init__(self, alert_receiver_config: dict): | |
) | ||
else: | ||
try: | ||
with open(token_file, "r") as token_file: | ||
with open(token_file, "r", encoding="utf-8") as token_file: | ||
self.token = token_file.read() | ||
except FileNotFoundError: | ||
raise ConfigurationError(f"No token file found at {token_file}.") | ||
except FileNotFoundError as err: | ||
raise ConfigurationError( | ||
f"No token file found at {token_file}." | ||
) from err | ||
except Exception as err: | ||
raise ConfigurationError( | ||
f"An error occurred while loading the token file {token_file}: {str(err)}" | ||
) | ||
) from err | ||
|
||
def get_header(self) -> dict: | ||
return {"Authorization": f"{self.authentication_scheme} {self.token}"} | ||
|
@@ -196,16 +199,15 @@ def __init__(self, alert_receiver_config: dict): | |
self.__init_authentication_instance(alert_receiver_config) | ||
|
||
def __init_authentication_instance(self, alert_receiver_config: dict): | ||
authentication_class = self.__get_authentication_class() | ||
self._authentication_instance = authentication_class(alert_receiver_config) | ||
|
||
def __get_authentication_class(self): | ||
if self.authentication_type not in AlertReceiverAuthentication.init_map.keys(): | ||
try: | ||
self._authentication_instance = self.init_map[self.authentication_type]( | ||
alert_receiver_config | ||
) | ||
except KeyError as err: | ||
raise ConfigurationError( | ||
f"No authentication type found. Valid values are {list(AlertReceiverAuthentication.init_map.keys())}" | ||
) # hopefully this never happens | ||
|
||
return self.init_map.get(self.authentication_type) | ||
"No authentication type found. Valid values are " | ||
f"{list(AlertReceiverAuthentication.init_map.keys())}" | ||
) from err # hopefully this never happens | ||
|
||
def get_auth_header(self) -> dict: | ||
return self._authentication_instance.get_header() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here, you can also
try: self.authentication_config = alert_receiver_config[authentication_config_key]
, and then raise the error in theexcept:
block.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed also other places where EAFP can be applied.