From 78f8e7cf7e4c515fb1696621bf6c6e95faa85d5b Mon Sep 17 00:00:00 2001 From: Abhineet Deshpande Date: Fri, 17 Feb 2023 10:10:13 -0800 Subject: [PATCH] Fixing arbitrary file reads in nested include policies. Restricting including policies from files in the base directory and its children only. PiperOrigin-RevId: 510460461 --- README.md | 6 ++-- capirca/lib/policy.py | 28 +++++++++++++++- tests/lib/policy_test.py | 72 +++++++++++++++++++++++++++++++++++++--- 3 files changed, 98 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index ab84a8ef..4820de8a 100644 --- a/README.md +++ b/README.md @@ -350,10 +350,12 @@ injected into the current policy file in the exact location of the `#include` directive. An example include: ``` -#include 'policies/includes/untrusted-networks-blocking.inc' +#include 'includes/untrusted-networks-blocking.inc' ``` +NOTE: Includes are only read from the subdirectories of your base_directory, +all other directories will error out. -The `.inc` file extension and the `includes` folder is not required but it is +The `.inc` file extension and the `includes/` folder is not required but it is recommended to be used as a best practice and for easier readability. ### Example diff --git a/capirca/lib/policy.py b/capirca/lib/policy.py index b306fc7e..ac9a035c 100644 --- a/capirca/lib/policy.py +++ b/capirca/lib/policy.py @@ -62,6 +62,8 @@ class FileReadError(Error): class RecursionTooDeepError(Error): """Included files exceed maximum recursion depth.""" +class InvalidIncludeDirectoryError(Error): + """Included files are from invalid directories.""" class ParseError(Error): """ParseError in the input.""" @@ -2632,6 +2634,23 @@ def _ReadFile(filename): raise FileNotFoundError('Unable to open policy file %s' % filename) +def _SubDirectory(child, parent): + """Returns if the child is a subdirectory of the parent. + + Resolves relative paths, but does not resolve symbolic links. + + Args: + child: A presumed child file path string. + parent: Base parent path string. + + Returns: + A boolean, true if the child is a subdirectory of the parent. + """ + child_path = os.path.abspath(child) + parent_path = os.path.abspath(parent) + return os.path.commonpath([parent_path, child_path]) == os.path.commonpath([ + parent_path]) + def _Preprocess(data, max_depth=5, base_dir=''): """Search input for include statements and import specified include file. @@ -2648,6 +2667,7 @@ def _Preprocess(data, max_depth=5, base_dir=''): Raises: RecursionTooDeepError: nested include files exceed maximum + InvalidIncludeDirectoryError: nested include files from invalid directories """ if not max_depth: raise RecursionTooDeepError('%s' % ( @@ -2658,7 +2678,13 @@ def _Preprocess(data, max_depth=5, base_dir=''): if len(words) > 1 and words[0] == '#include': # remove any quotes around included filename include_file = words[1].strip('\'"') - data = _ReadFile(os.path.join(base_dir, include_file)) + include_file_path = os.path.join(base_dir, include_file) + if not _SubDirectory(include_file_path, base_dir): + raise InvalidIncludeDirectoryError( + '%s' + % ('Included file is from invalid directory: %s.' % include_file) + ) + data = _ReadFile(include_file_path) # recursively handle includes in included data inc_data = _Preprocess(data, max_depth - 1, base_dir=base_dir) rval.extend(inc_data) diff --git a/tests/lib/policy_test.py b/tests/lib/policy_test.py index a77f4d05..699ca0b3 100644 --- a/tests/lib/policy_test.py +++ b/tests/lib/policy_test.py @@ -91,15 +91,40 @@ """ INCLUDE_STATEMENT = """ -#include "/tmp/y.inc" +#include "includes/y.inc" """ INCLUDED_Y_FILE = """ term included-term-1 { protocol:: tcp action:: accept } +#include "includes/z.inc" +""" + +BAD_INCLUDED_FILE = """ +term included-term-1 { + protocol:: tcp + action:: accept +} #include "/tmp/z.inc" """ + +BAD_INCLUDED_FILE_1 = """ +term included-term-1 { + protocol:: tcp + action:: accept +} +#include "includes/../../etc/passwd.inc" +""" + +GOOD_INCLUDED_FILE_1 = """ +term good-included-term-1 { + protocol:: tcp + action:: accept +} +#include "includes/../pol/z.inc" +""" + GOOD_TERM_0 = """ term good-term-0 { protocol:: icmp @@ -625,9 +650,46 @@ def testIncludes(self, mock_file): # ensure good-term-1 shows up as the second term self.assertEqual(terms[2].name, 'good-term-1') - mock_file.assert_has_calls([ - mock.call('/tmp/y.inc'), - mock.call('/tmp/z.inc')]) + mock_file.assert_has_calls( + [mock.call('includes/y.inc'), mock.call('includes/z.inc')] + ) + + @mock.patch.object(policy, '_ReadFile') + def testBadIncludes(self, mock_file): + """Ensure nested includes error handling works.""" + mock_file.side_effect = [BAD_INCLUDED_FILE, GOOD_TERM_5] + + # contents of our base policy (which has a bad included file) + pol = HEADER + INCLUDE_STATEMENT + GOOD_TERM_1 + self.assertRaises( + policy.InvalidIncludeDirectoryError, + policy.ParsePolicy, + pol, + self.naming, + ) + # Ensuring relative paths don't bypass invalid directory checks + mock_file.side_effect = [BAD_INCLUDED_FILE_1, GOOD_TERM_5] + pol = HEADER + BAD_INCLUDED_FILE_1 + GOOD_TERM_1 + self.assertRaises( + policy.InvalidIncludeDirectoryError, + policy.ParsePolicy, + pol, + self.naming, + ) + + @mock.patch.object(policy, '_ReadFile') + def testGoodIncludesWithRelativePaths(self, mock_file): + """Ensure nested includes error handling works for valid files.""" + mock_file.side_effect = [GOOD_TERM_5] + # base policy has a good included file, with relative paths + pol = HEADER + GOOD_INCLUDED_FILE_1 + GOOD_TERM_1 + p = policy.ParsePolicy(pol, self.naming) + _, terms = p.filters[0] + # ensure include worked and we now have 3 terms in this policy + self.assertEqual(len(terms), 3) + self.assertEqual(terms[0].name, 'good-included-term-1') + self.assertEqual(terms[1].name, 'good-term-5') + self.assertEqual(terms[2].name, 'good-term-1') def testGoodPol(self): pol = HEADER + GOOD_TERM_1 + GOOD_TERM_2 @@ -1211,7 +1273,7 @@ def testTermAddressByteLength(self): self.assertEqual(8, term.AddressesByteLength([6])) self.assertEqual(10, term.AddressesByteLength()) -# pylint: enable=maybe-no-member + # pylint: enable=maybe-no-member def testICMPCodes(self): pol = HEADER + GOOD_TERM_42