From dd24d835f2b21089fb90ca598cd20e0a527bf256 Mon Sep 17 00:00:00 2001
From: Ian Chen
Date: Fri, 2 Dec 2022 20:19:01 -0800
Subject: [PATCH] Add scripts for running the mcdc_checker and converting the output to sarif.

Updated Earthfile to call the new run_mcdc_checker.py script

Signed-off-by: Ian Chen
---
 mcdc/Earthfile                     |   2 +
 mcdc/mcdc-workspace.sh             |   3 +-
 mcdc/mcdc_checker_output_parser.py | 307 ++++++++++++++++++++++++++
 mcdc/run_mcdc_checker.py           | 332 +++++++++++++++++++++++++++++
 4 files changed, 643 insertions(+), 1 deletion(-)
 create mode 100644 mcdc/mcdc_checker_output_parser.py
 create mode 100644 mcdc/run_mcdc_checker.py

diff --git a/mcdc/Earthfile b/mcdc/Earthfile
index bb9c6d9..2932dcc 100644
--- a/mcdc/Earthfile
+++ b/mcdc/Earthfile
@@ -20,6 +20,8 @@ mcdc-run:
     COPY ../spaceros+workspace/src src
     COPY +package-list/workspace-packages.txt workspace-packages.txt
     COPY mcdc-workspace.sh mcdc-workspace.sh
+    COPY run_mcdc_checker.py run_mcdc_checker.py
+    COPY mcdc_checker_output_parser.py mcdc_checker_output_parser.py
     RUN bash mcdc-workspace.sh $(cat workspace-packages.txt | head -1)
     SAVE ARTIFACT src/**/mcdc-results.txt AS LOCAL src

diff --git a/mcdc/mcdc-workspace.sh b/mcdc/mcdc-workspace.sh
index b7d5853..26510ae 100644
--- a/mcdc/mcdc-workspace.sh
+++ b/mcdc/mcdc-workspace.sh
@@ -1,8 +1,9 @@
 #!/usr/bin/bash

+mcdc_checker_script=`pwd`/run_mcdc_checker.py
 for packagedir in $@; do
     pushd $package
-    mcdc_checker -a 2>&1 | tee mcdc-results.txt
+    python3 $mcdc_checker_script `pwd` --sarif_file mcdc-results.txt --verbose
     popd
 done

diff --git a/mcdc/mcdc_checker_output_parser.py b/mcdc/mcdc_checker_output_parser.py
new file mode 100644
index 0000000..c6a4ec7
--- /dev/null
+++ b/mcdc/mcdc_checker_output_parser.py
@@ -0,0 +1,307 @@
+#!/usr/bin/env python3
+
+# Copyright 2022 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import json
+import os
+import re
+import sys
+
+# String indicating the line where the summary report starts
+report_start_str = 'The following errors were found'
+
+# A list of known MC/DC checker error codes
+checker_codes = [
+    'clang_parse_failed',
+    'failed_to_create_bdd',
+    'invalid_operator_nesting',
+    'unexpected_node',
+    'bdd_is_not_tree_like',
+    'bdd_is_not_tree_like_and_has_too_many_nodes'
+    ]
+
+
+def format_result(rule_id, level, message, location_uri, line_no, column_no):
+    """
+    Format a single MC/DC checker finding as a SARIF result object.
+
+    :rule_id: Identifier of the rule that was evaluated to produce the result
+    :level: Severity level
+    :message: A string describing the result
+    :location_uri: Location in code where the tool detects a result
+    :line_no: Line number
+    :column_no: Column number
+    """
+    output = {}
+    output['ruleId'] = rule_id
+    output['level'] = level
+    output['message'] = {'text': message}
+    physical_location = {
+        'physicalLocation': {
+            'artifactLocation': {
+                'uri': location_uri,
+            },
+        }
+    }
+
+    if line_no and column_no and line_no.isdigit() and column_no.isdigit():
+        # SARIF expects the region inside physicalLocation, with integer
+        # startLine / startColumn values
+        physical_location['physicalLocation']['region'] = {
+            'startLine': int(line_no),
+            'startColumn': int(column_no)
+        }
+
+    locations = []
+    locations.append(physical_location)
+    output['locations'] = locations
+    return output
+
+
+def convert_summary_to_sarif_output(data):
+    """
+    Convert MC/DC checker summary output to SARIF format.
+
+    :data: Dict mapping each checker error code to its summary output lines
+    """
+    results = []
+
+    for code in checker_codes:
+        # if errors exist for a particular error code
+        if code in data.keys() and len(lines := data[code]) > 0:
+            # produce one result (in SARIF terms) per line
+            for line in lines:
+                # All error outputs start with 'file '
+                if line.startswith('file '):
+                    rule_id = code
+                    level = 'error'
+                    message = f'{code}'
+                    uri = ''
+                    line_no = None
+                    column_no = None
+                    # run regex to get filename, line no and column no;
+                    # line no and column no are optional so they are placed in
+                    # a non-capturing group (?:...) in the regex search str
+                    pattern = 'file (.+?)(?: in line ([0-9]+) column ([0-9]+))?$'
+                    m = re.search(pattern, line)
+                    if m:
+                        uri = m.group(1)
+                        line_no = m.group(2)
+                        column_no = m.group(3)
+                    result = format_result(rule_id,
+                                           level,
+                                           message,
+                                           uri,
+                                           line_no,
+                                           column_no)
+                    results.append(result)
+                # if it's a solution, it should be for the previous error,
+                # so append it to the message field of the previous result
+                elif line.startswith('Found solution'):
+                    results[-1]['message']['text'] += f'. {line}'
+
+    return results
+
+
+def convert_pre_summary_to_sarif_output(lines):
+    """
+    Convert MC/DC checker pre-summary output to SARIF format.
+
+    Pre-summary output refers to all output lines produced by the checker
+    before the summary.
+
+    :lines: Lines to convert to SARIF format
+    """
+    results = []
+    for line in lines:
+        line_no = None
+        column_no = None
+        l = line.lstrip()
+        if l.startswith('ERROR') and 'Clang' in l:
+            pattern = 'file (.+)'
+            m = re.search(pattern, line)
+            if m:
+                uri = m.group(1)
+                rule_id = 'clang_preprocessor_error'
+                level = 'error'
+                message = l
+                result = format_result(rule_id,
+                                       level,
+                                       message,
+                                       uri,
+                                       line_no,
+                                       column_no)
+                results.append(result)
+
+        else:
+            pattern = 'file (.+?)(?: at line ([0-9]+), column ([0-9]+))'
+            m = re.search(pattern, line)
+            if m:
+                uri = m.group(1)
+                line_no = m.group(2)
+                column_no = m.group(3)
+
+                rule_id = 'non-tree-like_decision'
+                level = 'error'
+                message = l
+                result = format_result(rule_id,
+                                       level,
+                                       message,
+                                       uri,
+                                       line_no,
+                                       column_no)
+                results.append(result)
+
+    return results
+
+
+def parse_summary_for_error(lines, error):
+    """
+    Parse and filter the summary output to contain only lines related to the
+    specified MC/DC checker error code.
+
+    :lines: Lines from the summary output
+    :error: Error code to look for
+    """
+    output = {error: []}
+    start = False
+    for line in lines:
+        if start:
+            l = line.lstrip()
+            # valid error pointing to a file
+            if l.startswith('file '):
+                output[error].append(l)
+            # if the line is another error code, exit
+            elif any(c in l for c in checker_codes):
+                break
+            # other output produced for the current error code,
+            # e.g. 'Found solution' lines
+            else:
+                output[error].append(l)
+        # found the start of the section for this error code
+        elif error in line:
+            start = True
+    return output
+
+
+def main():
+    """
+    Main parse function that reads the MC/DC checker output text and converts
+    it to SARIF format.
+
+    :file_path: Path to raw MC/DC checker output file
+    :parse_all: True to parse all output. False to parse only the summary
+    :file_output: Path to save the SARIF output to
+    :results_only: True to output only the results section of the SARIF output
+    """
+    parser = argparse.ArgumentParser(description='MC/DC checker output parser')
+    parser.add_argument(
+        'file',
+        type=str,
+        nargs='?',
+        default=None,
+        help='Path to MC/DC checker output text file',
+    )
+    parser.add_argument(
+        "-a",
+        "--all",
+        action="store_true",
+        required=False,
+        help="Parse all MC/DC checker output, including output before the summary",
+    )
+    parser.add_argument(
+        "-o",
+        "--out",
+        type=str,
+        required=False,
+        help="Path to save the SARIF output to. Prints to console if not specified",
+    )
+    parser.add_argument(
+        "-r",
+        "--results-only",
+        action="store_true",
+        required=False,
+        help="Print only the results section of the SARIF output.",
+    )
+
+    args = parser.parse_args()
+    if not args.file:
+        parser.print_usage()
+        sys.exit(1)
+
+    file_path = args.file
+    parse_all = args.all
+    file_output = args.out
+    results_only = args.results_only
+
+    # parse the raw text mcdc_checker output
+    with open(file_path) as f:
+        # parse file into lines
+        lines = f.read().splitlines()
+
+    line_idx = 0
+    # find the line where the report summary starts
+    while line_idx < len(lines):
+        line = lines[line_idx]
+        line_idx += 1
+        if report_start_str in line:
+            break
+
+    # if the report summary start line is found
+    if line_idx < len(lines):
+        # get all output before the summary report
+        pre_summary_lines = lines[:line_idx - 1]
+
+        # get summary report output
+        summary_lines = lines[line_idx:]
+
+        data = {}
+        results = []
+
+        # convert the pre-summary output to SARIF results if requested
+        if parse_all:
+            results = convert_pre_summary_to_sarif_output(pre_summary_lines)
+
+        # parse the summary for all error types
+        for code in checker_codes:
+            out = parse_summary_for_error(summary_lines, code)
+            data.update(out)
+
+        # convert the parsed summary data to SARIF results
+        summary_results = convert_summary_to_sarif_output(data)
+        results.extend(summary_results)
+
+        output = {}
+        if results_only:
+            output = results
+        else:
+            # Output follows version 2.1.0 of the SARIF spec
+            output['version'] = '2.1.0'
+            output['runs'] = [{
+                'tool': {
+                    'driver': {
+                        'name': 'mcdc_checker'
+                    }
+                },
+                'results': results
+            }]
+
+        out = json.dumps(output, indent=2)
+        if not file_output:
+            print(out)
+        else:
+            out_f = open(file_output, 'w')
+            out_f.write(out)
+            out_f.close()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/mcdc/run_mcdc_checker.py b/mcdc/run_mcdc_checker.py
new file mode 100644
index 0000000..d702414
--- /dev/null
+++ b/mcdc/run_mcdc_checker.py
@@ -0,0 +1,332 @@
+#!/usr/bin/env python3
+
+# Copyright 2022 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import glob
+import json
+import os
+import re
+import shutil
+import subprocess
+import sys
+import time
+
+
+def get_file_groups(paths, extensions, exclude_patterns):
+    excludes = []
+    for exclude_pattern in exclude_patterns:
+        excludes.extend(glob.glob(exclude_pattern))
+    excludes = {os.path.realpath(x) for x in excludes}
+
+    # dict mapping root path to files
+    groups = {}
+    for path in paths:
+        if os.path.isdir(path):
+            for dirpath, dirnames, filenames in os.walk(path):
+                if 'AMENT_IGNORE' in dirnames + filenames:
+                    dirnames[:] = []
+                    continue
+                # ignore folders starting with . or _
+                dirnames[:] = [d for d in dirnames if d[0] not in ['.', '_']]
+                dirnames.sort()
+
+                # select files by extension
+                for filename in sorted(filenames):
+                    _, ext = os.path.splitext(filename)
+                    if ext in ('.%s' % e for e in extensions):
+                        filepath = os.path.join(dirpath, filename)
+                        if os.path.realpath(filepath) not in excludes:
+                            append_file_to_group(groups, filepath)
+
+        if os.path.isfile(path):
+            if os.path.realpath(path) not in excludes:
+                append_file_to_group(groups, path)
+
+    return groups
+
+
+def append_file_to_group(groups, path):
+    path = os.path.abspath(path)
+
+    root = ''
+
+    # try to determine root from path
+    base_path = os.path.dirname(path)
+    # find longest subpath which ends with one of the following subfolder names
+    subfolder_names = ['include', 'src', 'test']
+    matches = [
+        re.search(
+            '^(.+%s%s)%s' %
+            (re.escape(os.sep), re.escape(subfolder_name), re.escape(os.sep)), path)
+        for subfolder_name in subfolder_names]
+    match_groups = [match.group(1) for match in matches if match]
+    if match_groups:
+        match_groups = [{'group_len': len(x), 'group': x} for x in match_groups]
+        sorted_groups = sorted(match_groups, key=lambda k: k['group_len'])
+        base_path = sorted_groups[-1]['group']
+        root = base_path
+
+    # try to find repository root
+    repo_root = None
+    p = path
+    while p and repo_root is None:
+        # abort if root is reached
+        if os.path.dirname(p) == p:
+            break
+        p = os.path.dirname(p)
+        for marker in ['.git', '.hg', '.svn']:
+            if os.path.exists(os.path.join(p, marker)):
+                repo_root = p
+                break
+
+    # compute relative --root argument
+    if repo_root and repo_root > base_path:
+        root = os.path.relpath(base_path, repo_root)
+
+    # add the path to the appropriate group
+    if root not in groups:
+        groups[root] = []
+    groups[root].append(path)
+
+
+def write_sarif_file(output_filename, input_file_path, verbose):
+    """Invoke MC/DC checker output parser and output result to sarif."""
+    # assume the mcdc_checker_output_parser.py is in the same dir as this script
+    dir_path = os.path.dirname(os.path.realpath(__file__))
+    parser_script = os.path.join(dir_path, 'mcdc_checker_output_parser.py')
+
+    # loop through all mcdc_checker output files, parse and merge them into
+    # one single sarif output
+    all_results = []
+    for filename in os.listdir(input_file_path):
+        # this tells the parser to parse all output and generate sarif for the
+        # results section only. We will combine all results and put them into
+        # one 'run' in the sarif output
+        arguments = ['python3', parser_script, '-a', '-r']
+        cmd_output = ""
+        try:
+            arguments.append(os.path.join(input_file_path, filename))
+            if verbose:
+                print(' '.join(arguments))
+            p = subprocess.Popen(arguments,
+                                 stdout=subprocess.PIPE,
+                                 stderr=subprocess.STDOUT)
+            cmd_output = p.communicate()[0]
+        except subprocess.CalledProcessError as e:
+            print("The invocation of 'mcdc_checker_output_parser' failed with error code %d: %s" %
+                  (e.returncode, e), file=sys.stderr)
+            return False
+
+        out = cmd_output.decode('utf-8')
+        results = json.loads(out)
+        all_results.extend(results)
+
+    # generate sarif output with all results combined
+    output = {}
+    output['version'] = '2.1.0'
+    output['runs'] = [{
+        'tool': {
+            'driver': {
+                'name': 'mcdc_checker'
+            }
+        },
+        'results': all_results
+    }]
+
+    out = json.dumps(output, indent=2)
+    f = open(output_filename, 'w')
+    f.write(out)
+    f.close()
+    return True
+
+
+def find_executable(file_name, additional_paths=None):
+    path = None
+    if additional_paths:
+        path = os.getenv('PATH', os.defpath)
+        path += os.path.pathsep + os.path.pathsep.join(additional_paths)
+    return shutil.which(file_name, path=path)
+
+
+def invoke_mcdc_checker(arguments, output_path, file_path, verbose):
+    """Invoke MC/DC checker and write output to file."""
+    try:
+        if verbose:
+            print(' '.join(arguments))
+        p = subprocess.Popen(arguments,
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.STDOUT)
+        cmd_output = p.communicate()[0]
+    except subprocess.CalledProcessError as e:
+        print("The invocation of 'mcdc_checker' failed with error code %d: %s" %
+              (e.returncode, e), file=sys.stderr)
+        return False
+
+    output = cmd_output.decode('utf-8')
+
+    # save output to a tmp file
+    # this will be read by mcdc_checker_output_parser later if the user asks
+    # for sarif output
+    output_file = ""
+    filename = file_path.replace(os.sep, '_')
+    if filename[0] == '_':
+        filename = filename[1:]
+    output_file = os.path.join(output_path, filename + ".mcdc")
+
+    f = open(output_file, 'w')
+    f.write(output)
+    f.close()
+    return True
+
+
+def main(argv=sys.argv[1:]):
+    rc = 0
+    extensions = ['c', 'cc', 'cpp', 'cxx']
+
+    # Define and parse the command-line options
+    parser = argparse.ArgumentParser(
+        description='Run the MC/DC checker tool.')
+    parser.add_argument(
+        'paths',
+        nargs='*',
+        default=[os.curdir],
+        help='Files and/or directories to be checked. Directories are searched recursively for '
+             'files ending in one of %s.' %
+             ', '.join(["'.%s'" % e for e in extensions]))
+    parser.add_argument(
+        '--include_dirs',
+        nargs='*',
+        help='Include directories for C/C++ files being checked. '
+ "Each directory is passed to cobra as '-I'") + parser.add_argument( + '--exclude', default=[], + nargs='*', + help='Exclude C/C++ files from being checked.') + parser.add_argument( + '--compile_cmds', + help='The compile_commands.json file from which to gather preprocessor directives.') + parser.add_argument( + '--sarif_file', + help='Generate a SARIF file') + parser.add_argument( + '--verbose', + action='store_true', + help='Display verbose output') + + args = parser.parse_args(argv) + + target_binary = 'mcdc_checker' + mcdc_checker_bin = find_executable(target_binary) + if not mcdc_checker_bin: + print(f"Error: Could not find the '{target_binary}' executable", file=sys.stderr) + return 1 + + groups = get_file_groups(args.paths, extensions, args.exclude) + if not groups: + print('No files found', file=sys.stderr) + return 1 + + cmd = [mcdc_checker_bin] + + # Get the preprocessor options to use for each file from the + # input compile_commands.json file + options_map = {} + if args.compile_cmds: + f = open(args.compile_cmds) + compile_data = json.load(f) + + for item in compile_data: + compile_options = item['command'].split() + + preprocessor_options = [] + options = iter(compile_options) + for option in options: + if option in ['-D', '-I']: + preprocessor_options.extend([option, options.__next__()]) + elif option == '-isystem': + preprocessor_options.extend(['-I' + options.__next__()]) + elif option.startswith(('-D', '-I', '-U')): + preprocessor_options.extend([option]) + + options_map[item['file']] = { + 'directory': item['directory'], + 'options': preprocessor_options + } + + # create tmp dir for storing mcdc_chekcer output + mcdc_output_path = 'mcdc_raw_output' + if os.path.exists(mcdc_output_path): + shutil.rmtree(mcdc_output_path) + os.mkdir(mcdc_output_path) + + if args.verbose: + print('Invoking mcdc_checker. 
+        print('Invoking mcdc_checker. This may take a while')
+
+    # For each group of files, run the mcdc_checker
+    success = True
+    for group_name in sorted(groups.keys()):
+        files_in_group = groups[group_name]
+
+        # If a compile_commands.json is provided, process each source file
+        # separately, with its associated preprocessor directives
+        if args.compile_cmds:
+            for filename in files_in_group:
+                if filename in options_map and options_map[filename]['options']:
+                    arguments = cmd + options_map[filename]['options'] + [filename]
+                else:
+                    arguments = cmd + [filename]
+
+                success = invoke_mcdc_checker(arguments, mcdc_output_path, filename, args.verbose)
+                if not success:
+                    rc = 1
+                    print(f'There were errors running mcdc_checker on {filename}.')
+        # Otherwise, run mcdc_checker on this group of files
+        else:
+            includes = []
+            for include_dir in (args.include_dirs or []):
+                includes.extend(['-I' + include_dir])
+
+            # mcdc_checker takes either a path to a single directory or a file,
+            # but not multiple files, so run it once for each file in the group
+            for filename in files_in_group:
+                arguments = cmd + includes
+                arguments.extend([filename])
+                success = invoke_mcdc_checker(arguments, mcdc_output_path, filename, args.verbose)
+                if not success:
+                    rc = 1
+                    print(f'There were errors running mcdc_checker on {filename}.')
+
+    if args.verbose:
+        print('Done running mcdc_checker.')
+
+    # run mcdc_checker_output_parser to convert the output to SARIF format
+    if args.sarif_file:
+        if args.verbose:
+            print('Converting output to SARIF format.')
+        success = write_sarif_file(args.sarif_file, mcdc_output_path, args.verbose)
+        if not success:
+            rc = 1
+            print('There were errors running mcdc_checker_output_parser.py.')
+        if args.verbose:
+            print(f'SARIF file saved to {args.sarif_file}.')
+
+    return rc
+
+
+if __name__ == '__main__':
+    sys.exit(main())
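
Addendum (not part of the patch): run_mcdc_checker.py --sarif_file emits SARIF 2.1.0 with a single run whose driver is named 'mcdc_checker' and whose results carry ruleId, level, message.text and one location each; mcdc-workspace.sh saves that file as mcdc-results.txt next to each package. The sketch below is only an illustration of how such a file could be consumed downstream, assuming that layout; the script itself and the way it takes the file path from the command line are not part of the change.

#!/usr/bin/env python3
# Illustrative only: load a SARIF file produced by run_mcdc_checker.py
# (for example the mcdc-results.txt written by mcdc-workspace.sh) and
# print one line per finding.
import json
import sys

with open(sys.argv[1]) as f:
    sarif = json.load(f)

for run in sarif.get('runs', []):
    for result in run.get('results', []):
        # each result produced by format_result() has one location
        loc = (result.get('locations') or [{}])[0].get('physicalLocation', {})
        uri = loc.get('artifactLocation', {}).get('uri', '<unknown>')
        line = loc.get('region', {}).get('startLine', '?')
        text = result.get('message', {}).get('text', '')
        print(f"{result.get('level')}: {result.get('ruleId')} {uri}:{line} {text}")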