Skip to content

Commit

Permalink
Merge pull request #232 from emilejq/docstring
Browse files Browse the repository at this point in the history
Update docstring; Update error messages
  • Loading branch information
mrjonstrong authored Apr 13, 2022
2 parents 99855d6 + e276280 commit 547d082
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 136 deletions.
23 changes: 9 additions & 14 deletions drheader/cli_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-

"""Utils for drheader console script."""
"""Utility functions for cli module."""

import json
import os
Expand All @@ -10,14 +9,12 @@


def echo_bulk_report(audit, json_output=False):
"""
Output bulk report.
"""Prints a report from a bulk scan.
:param audit: audit from core
:param json_output: json output flag
:return: None
Args:
audit (list): The report generated from the scan.
json_output (bool): (optional) A flag to format the output as JSON. Default is false.
"""

if json_output:
click.echo(json.dumps(audit))
else:
Expand All @@ -33,14 +30,12 @@ def echo_bulk_report(audit, json_output=False):


def file_junit_report(rules, report):
"""
Output file Junit xml report
"""Generates a JUnit XML report from a scan result.
:param rules: set of rules to verify
:param report: report generated by drheader
:return: None
Args:
rules (dict): The rules used to perform the scan.
report (list): The report generated from the scan.
"""

test_cases = []

for header in rules:
Expand Down
80 changes: 7 additions & 73 deletions drheader/core.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Entry point for analysis."""
"""Main module and entry point for analysis."""
import json
import os

Expand All @@ -23,6 +23,7 @@ class Drheader:
cookies (CaseInsensitiveDict): The cookies to analyse.
reporter (Reporter): Reporter instance that generates and holds the final report.
"""

def __init__(self, headers=None, url=None, method='get', params=None, request_headers=None, verify=True):
"""Initialises a Drheader instance.
Expand All @@ -38,7 +39,7 @@ def __init__(self, headers=None, url=None, method='get', params=None, request_he
verify (bool): (optional) A flag to verify the server's TLS certificate. Default is True.
Raises:
ValueError: If neither headers nor url is provided.
ValueError: If neither headers nor url is provided, or if url is not a valid URL.
"""
if not headers:
if not url:
Expand Down Expand Up @@ -69,12 +70,11 @@ def analyze(self, rules=None, cross_origin_isolated=False):
and its associated severity, and, if applicable, the observed value of the header, any expected, disallowed
or anomalous values, and the correct delimiter. For example:
{
'rule': 'Cache-Control',
'message': 'Value does not match security policy. All of the expected items were expected',
'rule': 'Referrer-Policy',
'message': 'Value does not match security policy. Exactly one of the expected items was expected',
'severity': 'high',
'value': 'no-cache'
'expected': ['no-store', 'max-age=0'],
'delimiter': ',',
'value': 'origin-when-cross-origin'
'expected': ['same-origin', 'strict-origin-when-cross-origin']
}
"""
if not rules:
Expand All @@ -98,50 +98,21 @@ def analyze(self, rules=None, cross_origin_isolated=False):
return self.reporter.report

def _analyze_header(self, config, validator, header):
"""Analyses a single header for validation.
Args:
config (dict): The rules against which to assess the header.
validator (HeaderValidator): The header validator to run the validations.
header (str): The header to validate.
"""
if header.lower() != 'set-cookie':
self._validate_rules(config, validator, header)
elif header in self.headers:
for cookie in self.cookies:
self._validate_rules(config, validator, header, cookie=cookie)

def _analyze_directives(self, config, validator, header):
"""Analyses the directives in a single header for validation.
Args:
config (dict): The configuration of the rules against which to assess the directives.
validator (DirectiveValidator): The directive validator to run the validations.
header (str): The header to validate.
"""
for directive, config in config['directives'].items():
self._validate_rules(config, validator, header, directive=directive)

def _analyze_cookies(self, config, validator):
"""Analyses the cookies for validation.
Args:
config (dict): The configuration of the rules against which to assess the cookies.
validator (CookieValidator): The cookie validator to run the validations.
"""
for cookie, config in config['cookies'].items():
self._validate_rules(config, validator, header='Set-Cookie', cookie=cookie)

def _validate_rules(self, config, validator, header, directive=None, cookie=None):
"""Starts the validation of a single header, directive or cookie.
Args:
config (dict): The rules against which to assess the header, directive or cookie
validator (ValidatorBase): The validator to run the validations.
header (str): The header to validate.
directive (str): (optional) The directive to validate.
cookie (str): (optional) The cookie to validate.
"""
if header in _DELIMITERS:
config['delimiters'] = _DELIMITERS[header]

Expand All @@ -157,15 +128,6 @@ def _validate_rules(self, config, validator, header, directive=None, cookie=None
self._validate_avoid_and_contain_values(config, validator, header, directive, cookie)

def _validate_exists(self, is_required, config, validator, header, directive, cookie):
"""Run an exists validation for a single header, directive or cookie.
If a validation failure occurs, it will generate a finding in the report.
Returns:
A boolean flag indicating whether the exists validation was successful. If is_required is 'optional', the
flag will only indicate whether the given header, directive or cookie is present in the headers, without
running any validation.
"""
if is_required == 'true':
report_item = validator.validate_exists(config, header, directive=directive, cookie=cookie)
self._add_to_report_if_exists(report_item)
Expand All @@ -178,11 +140,6 @@ def _validate_exists(self, is_required, config, validator, header, directive, co
return header in self.headers

def _validate_enforced_value(self, config, validator, header, directive):
"""Runs an enforced value validation for a single header, directive or cookie.
This method is applicable only when a 'value', 'value-one-of' or 'value-any-of' rule is defined. If a
validation failure occurs, it will generate a finding in the report.
"""
if 'value' in config:
report_item = validator.validate_value(config, header, directive=directive)
self._add_to_report_if_exists(report_item)
Expand All @@ -194,11 +151,6 @@ def _validate_enforced_value(self, config, validator, header, directive):
self._add_to_report_if_exists(report_item)

def _validate_avoid_and_contain_values(self, config, validator, header, directive, cookie):
"""Runs avoid and contain validations for a single header, directive or cookie.
This method is applicable only when a 'must-avoid', 'must-contain' or 'must-contain-one' rule is defined. If
any validation failures occur, they will generate a finding the report.
"""
if 'must-avoid' in config:
report_item = validator.validate_must_avoid(config, header, directive=directive, cookie=cookie)
self._add_to_report_if_exists(report_item)
Expand All @@ -210,7 +162,6 @@ def _validate_avoid_and_contain_values(self, config, validator, header, directiv
self._add_to_report_if_exists(report_item)

def _add_to_report_if_exists(self, report_item):
"""Adds a validation failure, or a list of validation failures, to the report."""
if report_item:
try:
self.reporter.add_item(report_item)
Expand All @@ -220,23 +171,6 @@ def _add_to_report_if_exists(self, report_item):


def _get_headers_from_url(url, method, params, headers, verify):
"""Retrieves headers from a URL.
Args:
url (str): The URL from which to retrieve the headers.
method (str): The HTTP verb to use when retrieving the headers.
params (dict): Any request parameters to send when retrieving the headers.
headers (dict): Any request headers to send when retrieving the headers.
verify (bool): A flag to verify the server's TLS certificate.
Returns:
A dict containing the response headers returned from the HTTP request. Header names are given in the dict
keys, and header values in the dict values. Cookies are aggregated into a list, with one entry in the list
per cookie. The list of cookies is returned in the value corresponding to the dict key 'set-cookie'.
Raises:
ValueError: If url is not a valid URL.
"""
if not validators.url(url):
raise ValueError(f"Cannot retrieve headers from '{url}'. The URL is malformed")

Expand Down
15 changes: 11 additions & 4 deletions drheader/report.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
"""Primary module for report generation and storage."""
import enum
from typing import NamedTuple


class ErrorType(enum.Enum):
AVOID = 'Must-Avoid directive included'
CONTAIN = 'Must-Contain directive missed. All of the expected items were expected'
CONTAIN = 'Must-Contain directive missed'
CONTAIN_ONE = 'Must-Contain-One directive missed. At least one of the expected items was expected'
DISALLOWED = '{} should not be returned'
REQUIRED = '{} not included in response'
VALUE = 'Value does not match security policy. All of the expected items were expected'
VALUE = 'Value does not match security policy'
VALUE_ANY = 'Value does not match security policy. At least one of the expected items was expected'
VALUE_ONE = 'Value does not match security policy. Exactly one of the expected items was expected'

Expand All @@ -27,15 +28,21 @@ class ReportItem(NamedTuple):


class Reporter:
"""Class to generate and store reports from a scan.
Attributes:
report (list): The report detailing validation failures encountered during a scan.
"""

def __init__(self):
"""Initialises a Reporter instance with an empty report."""
self.report = []

def add_item(self, item):
"""Adds a validation failure to the final report.
"""Adds a validation failure to the report.
Args:
item (ReportItem): ReportItem instance describing the validation failure to add to the report.
item (ReportItem): The validation failure to be added.
"""
finding = {}
if item.directive:
Expand Down
53 changes: 20 additions & 33 deletions drheader/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# -*- coding: utf-8 -*-

"""Utils for drheader."""

"""Utility functions for core module."""
import io
import logging
import os
Expand All @@ -26,8 +24,11 @@ def parse_policy(policy, item_delimiter=None, key_delimiter=None, value_delimite
item_delimiter (str): (optional) The character that delimits individual directives.
key_delimiter (str): (optional) The character that delimits keys and values in key-value directives.
value_delimiter (str): (optional) The character that delimits individual values in key-value directives.
strip (str): (optional) A string of characters to strip from policy values.
strip (str): (optional) A string of characters to strip from directive values.
keys_only (bool): (optional) A flag to return only keys from key-value directives. Default is False.
Returns:
A list of directives.
"""
if not item_delimiter:
return [policy.strip(strip)]
Expand All @@ -49,18 +50,20 @@ def parse_policy(policy, item_delimiter=None, key_delimiter=None, value_delimite
return directives


def load_rules(rule_file=None, merge=None):
"""
Loads drheader ruleset. Will load local defaults unless overridden.
If merge flag is present, result file will be a merge between local defaults and custom file
:param rule_file: file object of rules.
:type rule_file: file
:param merge: flag indicating to merge file_rule with default rules
:type merge: boolean
:return: drheader rules
:rtype: dict
"""
def load_rules(rule_file=None, merge=False):
"""Returns a drHEADer ruleset from a file.
The loaded ruleset can be configured to be merged with the default drHEADer rules. If a rule exists in both the
custom rules and default rules, the custom one will take priority and override the default one. Otherwise, the new
custom rule will be appended to the default rules. If no file is provided, the default rules will be returned.
Args:
rule_file (file): (optional) The YAML file containing the ruleset.
merge (bool): (optional) A flag to merge the loaded rules with the drHEADer default rules. Default is False.
Returns:
A dict containing the loaded rules.
"""
if rule_file:
logging.debug('')
rules = yaml.safe_load(rule_file.read())
Expand All @@ -76,13 +79,7 @@ def load_rules(rule_file=None, merge=None):


def get_rules_from_uri(uri):
"""
Retrieves custom rule set from URL
:param uri: URL to your custom rules file
:type uri: uri
:return: rules file
:rtype: file
"""
"""Retrieves a rules file from a URL."""
download = requests.get(uri)
if not download.content:
raise Exception('No content retrieved from {}'.format(uri))
Expand All @@ -91,6 +88,7 @@ def get_rules_from_uri(uri):


def translate_to_case_insensitive_dict(dict_to_translate):
"""Recursively transforms a dict into a case-insensitive dict."""
for key, value in dict_to_translate.items():
if isinstance(value, dict):
dict_to_translate[key] = translate_to_case_insensitive_dict(value)
Expand All @@ -107,17 +105,6 @@ def _extract_key_value_directive(directive, value_delimiter, strip):


def _merge_rules(default_rules, custom_rules):
"""
Merge both rule set. Rules defined in 'custom_rules', also present in 'default_rules', will be overridden.
If a new rule is present in custom_rules, not present in default_rules, it will be added.
:param default_rules: base file object of rules.
:type default_rules: dict
:param custom_rules: override file object of rules.
:type custom_rules: dict
:return: final rule
:rtype: dict
"""

for rule in custom_rules['Headers']:
default_rules['Headers'][rule] = custom_rules['Headers'][rule]

Expand Down
8 changes: 8 additions & 0 deletions drheader/validators/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Base module for validators."""
import abc


class ValidatorBase:
"""Base class for validators."""

@abc.abstractmethod
def validate_exists(self, config, header, directive=None, cookie=None):
Expand Down Expand Up @@ -90,8 +92,14 @@ def validate_must_contain_one(self, config, header, directive=None, cookie=None)


class UnsupportedValidationError(Exception):
"""Exception to be raised when an unsupported validation is called.
Attributes:
message (string): A message describing the error.
"""

def __init__(self, message):
"""Initialises an UnsupportedValidationError instance with a message."""
self.message = message


Expand Down
Loading

0 comments on commit 547d082

Please sign in to comment.