Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Digest-Authentication to wfs and wms probes by using Authentica… #475

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions GeoHealthCheck/plugins/probe/wfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
from GeoHealthCheck.util import transform_bbox
from owslib.wfs import WebFeatureService

from owslib.util import Authentication
from requests.auth import HTTPDigestAuth


class WfsGetFeatureBbox(Probe):
"""
Expand Down Expand Up @@ -127,9 +130,22 @@ def get_metadata(self, resource, version='1.1.0'):
:param version:
:return: Metadata object
"""
return WebFeatureService(resource.url,
if resource.auth and resource.auth['type'] == 'Digest':
headers = self.get_request_headers()
if headers and 'auth' in headers:
del headers['auth']
username = resource.auth['data']['username']
password = resource.auth['data']['password']
auth = Authentication(auth_delegate=HTTPDigestAuth(username, password))
return WebFeatureService(resource.url,
version=version,
headers=self.get_request_headers())
headers=headers,
auth=auth)
else:
return WebFeatureService(resource.url,
version=version,
headers=self.get_request_headers())


# Overridden: expand param-ranges from WFS metadata
def expand_params(self, resource):
Expand Down
17 changes: 16 additions & 1 deletion GeoHealthCheck/plugins/probe/wms.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
from GeoHealthCheck.plugin import Plugin
from owslib.wms import WebMapService

from owslib.util import Authentication
from requests.auth import HTTPDigestAuth


class WmsGetMapV1(Probe):
"""
Expand Down Expand Up @@ -109,9 +112,21 @@ def get_metadata(self, resource, version='1.1.1'):
:param version:
:return: Metadata object
"""
return WebMapService(resource.url, version=version,
if resource.auth and resource.auth['type'] == 'Digest':
headers = self.get_request_headers()
if headers and 'auth' in headers:
del headers['auth']
username = resource.auth['data']['username']
password = resource.auth['data']['password']
auth = Authentication(auth_delegate=HTTPDigestAuth(username, password))
return WebMapService(resource.url, version=version,
headers=headers,
auth=auth)
else:
return WebMapService(resource.url, version=version,
headers=self.get_request_headers())


# Overridden: expand param-ranges from WMS metadata
def expand_params(self, resource):

Expand Down
60 changes: 60 additions & 0 deletions GeoHealthCheck/plugins/resourceauth/resourceauths.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,63 @@ def encode_auth_header_val(self):

# Bearer Type, see eg. https://tools.ietf.org/html/rfc6750
return "Bearer %s" % self.auth_dict['data']['token']


class DigestAuth(ResourceAuth):
"""
Digest authentication.
"""

NAME = 'Digest'
DESCRIPTION = 'Default class for digest auth'

PARAM_DEFS = {
'username': {
'type': 'string',
'description': 'Username',
'default': None,
'required': True,
'range': None
},
'password': {
'type': 'password',
'description': 'Password',
'default': None,
'required': True,
'range': None
}
}
"""Param defs"""

def __init__(self):
ResourceAuth.__init__(self)

def verify(self):
if self.auth_dict is None:
return False

if 'data' not in self.auth_dict:
return False

auth_data = self.auth_dict['data']

if auth_data.get('username', None) is None:
return False

if len(auth_data.get('username', '')) == 0:
return False

if auth_data.get('password', None) is None:
return False

if len(auth_data.get('password', '')) == 0:
return False

return True

def encode_auth_header_val(self):
"""
Get None!
HTTPDigestAuth is used directly within Probes
"""
return None
54 changes: 43 additions & 11 deletions GeoHealthCheck/probe.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
from util import create_requests_retry_session
from GeoHealthCheck import __version__

from requests.auth import HTTPDigestAuth


LOGGER = logging.getLogger(__name__)


Expand Down Expand Up @@ -303,20 +306,49 @@ def perform_request(self):

def perform_get_request(self, url):
""" Perform actual HTTP GET request to service"""
return self._session.get(
url,
timeout=App.get_config()['GHC_PROBE_HTTP_TIMEOUT_SECS'],
verify=App.get_config()['GHC_VERIFY_SSL'],
headers=self.get_request_headers())
if self._resource.auth and self._resource.auth['type'] == 'Digest':
headers = self.get_request_headers()
if headers and 'auth' in headers:
del headers['auth']
username = self._resource.auth['data']['username']
password = self._resource.auth['data']['password']
auth = HTTPDigestAuth(username, password)
response = self._session.get(
url,
timeout=App.get_config()['GHC_PROBE_HTTP_TIMEOUT_SECS'],
headers=headers,
auth=auth)
else:
response = self._session.get(
url,
timeout=App.get_config()['GHC_PROBE_HTTP_TIMEOUT_SECS'],
headers=self.get_request_headers(),
)
return response


def perform_post_request(self, url_base, request_string):
""" Perform actual HTTP POST request to service"""
return self._session.post(
url_base,
timeout=App.get_config()['GHC_PROBE_HTTP_TIMEOUT_SECS'],
verify=App.get_config()['GHC_VERIFY_SSL'],
data=request_string,
headers=self.get_request_headers())
if self._resource.auth and self._resource.auth['type'] == 'Digest':
headers = self.get_request_headers()
if headers and 'auth' in headers:
del headers['auth']
username = self._resource.auth['data']['username']
password = self._resource.auth['data']['password']
auth = HTTPDigestAuth(username, password)
response = self._session.post(
url_base,
timeout=App.get_config()['GHC_PROBE_HTTP_TIMEOUT_SECS'],
data=request_string,
auth=auth)
else:
response = self._session.post(
url_base,
timeout=App.get_config()['GHC_PROBE_HTTP_TIMEOUT_SECS'],
data=request_string,
headers=self.get_request_headers())
return response


def run_request(self):
""" Run actual request to service"""
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Flask-SQLAlchemy==2.4.0
itsdangerous==1.1.0
pyproj >=2.6.1
lxml >= 4.8.0, <= 4.9.2
OWSLib==0.20.0
OWSLib==0.24.0
jsonschema==3.0.2 # downgrade from 3.2.0 on sept 29, 2020, issue 331, consider better fix
openapi-spec-validator==0.2.8
Sphinx==5.3.0
Expand Down