X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=ice_validator%2Ftests%2Fconftest.py;h=439cdd7314271f6ff9864d22b1bb69b12a2af451;hb=2f5bed51e4b120e2efffe6a6298c24270e862ad4;hp=5184fb6484703f23a51f838b5fbdb74dc93f50e5;hpb=3fee7ccbfd465eb82508229306cd69d4f0340239;p=vvp%2Fvalidation-scripts.git diff --git a/ice_validator/tests/conftest.py b/ice_validator/tests/conftest.py index 5184fb6..439cdd7 100644 --- a/ice_validator/tests/conftest.py +++ b/ice_validator/tests/conftest.py @@ -42,14 +42,10 @@ import io import json import os import re -import sys import time from collections import defaultdict -from itertools import chain -import requests import traceback -import warnings import docutils.core import jinja2 @@ -58,23 +54,30 @@ from more_itertools import partition import xlsxwriter from six import string_types +# noinspection PyUnresolvedReferences +import version +import logging + +logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR) + __path__ = [os.path.dirname(os.path.abspath(__file__))] DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0]) -RESOLUTION_STEPS_FILE = "resolution_steps.json" -HEAT_REQUIREMENTS_FILE = "heat_requirements.json" - -# noinspection PyPep8 -NEEDS_JSON_URL = "https://onap.readthedocs.io/en/latest/_downloads/789ac64d223325488fb3f120f959d985/needs.json" +HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json") +TEST_SCRIPT_SITE = ( + "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/" +) +VNFRQTS_ID_URL = ( + "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/" +) REPORT_COLUMNS = [ + ("Error #", "err_num"), ("Input File", "file"), - ("Test", "test_file"), ("Requirements", "req_description"), - ("Resolution Steps", "resolution_steps"), ("Error Message", "message"), - ("Raw Test Output", "raw_output"), + ("Test", "test_file"), ] COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred @@ -89,6 +92,11 @@ ALL_RESULTS = [] def get_output_dir(config): + """ + Retrieve the output directory for the reports and create it if necessary + :param config: pytest configuration + :return: output directory as string + """ output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR if not os.path.exists(output_dir): os.makedirs(output_dir, exist_ok=True) @@ -132,7 +140,7 @@ class TestResult: def __init__(self, item, outcome): self.item = item self.result = outcome.get_result() - self.files = [os.path.normpath(p) for p in self._get_files()] + self.files = self._get_files() self.error_message = self._get_error_message() @property @@ -188,6 +196,13 @@ class TestResult: """ return self.item.function.__module__.split(".")[-1] + @property + def test_id(self): + """ + :return: ID of the test (test_module + test_case) + """ + return "{}::{}".format(self.test_module, self.test_case) + @property def raw_output(self): """ @@ -207,6 +222,7 @@ class TestResult: text = ( "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"]) for r_id in self.requirement_ids + if r_id in curr_reqs ) return "".join(text) @@ -236,30 +252,6 @@ class TestResult: ) return data - def resolution_steps(self, resolutions): - """ - :param resolutions: Loaded from contents for resolution_steps.json - :return: Header and text for the resolution step associated with this - test case. Returns empty string if no resolutions are - provided. - """ - text = ( - "\n{}: \n{}".format(entry["header"], entry["resolution_steps"]) - for entry in resolutions - if self._match(entry) - ) - return "".join(text) - - def _match(self, resolution_entry): - """ - Returns True if the test result maps to the given entry in - the resolutions file - """ - return ( - self.test_case == resolution_entry["function"] - and self.test_module == resolution_entry["module"] - ) - def _get_files(self): """ Extracts the list of files passed into the test case. @@ -276,11 +268,12 @@ class TestResult: "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"]) ] elif "heat_templates" in self.item.fixturenames: - return self.item.funcargs["heat_templates"] + return [os.path.basename(f) for f in self.item.funcargs["heat_templates"]] elif "yaml_files" in self.item.fixturenames: - return self.item.funcargs["yaml_files"] + return [os.path.basename(f) for f in self.item.funcargs["yaml_files"]] else: - return [self.result.nodeid.split("[")[1][:-1]] + parts = self.result.nodeid.split("[") + return [""] if len(parts) == 1 else [os.path.basename(parts[1][:-1])] def _get_error_message(self): """ @@ -303,7 +296,6 @@ def pytest_runtest_makereport(item, call): if outcome.get_result().when != "call": return # only capture results of test cases themselves result = TestResult(item, outcome) - ALL_RESULTS.append(result) if ( not item.config.option.continue_on_failure and result.is_base_test @@ -312,9 +304,11 @@ def pytest_runtest_makereport(item, call): msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format( result.error_message ) - pytest.exit( - "{}\n{}\n{}".format(msg, result.files, result.test_case) - ) + result.error_message = msg + ALL_RESULTS.append(result) + pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case)) + + ALL_RESULTS.append(result) def make_timestamp(): @@ -339,12 +333,17 @@ def pytest_sessionfinish(session, exitstatus): """ if not session.config.option.template_dir: return - template_path = os.path.abspath(session.config.option.template_dir[0]) - profile_name = session.config.option.validation_profile_name or "" + + if session.config.option.template_source: + template_source = session.config.option.template_source[0] + else: + template_source = os.path.abspath(session.config.option.template_dir[0]) + + categories_selected = session.config.option.test_categories or "" generate_report( get_output_dir(session.config), - template_path, - profile_name, + template_source, + categories_selected, session.config.option.report_format, ) @@ -352,44 +351,56 @@ def pytest_sessionfinish(session, exitstatus): # noinspection PyUnusedLocal def pytest_collection_modifyitems(session, config, items): """ - Selects tests based on the validation profile requested. Tests without - pytest markers will always be executed. + Selects tests based on the categories requested. Tests without + categories will always be executed. """ - allowed_marks = ["xfail", "base"] - profile = config.option.validation_profile - - for item in items: - markers = set(m.name for m in item.iter_markers()) - if not profile and markers and set(markers).isdisjoint(allowed_marks): - item.add_marker( - pytest.mark.skip( - reason="No validation profile selected. " - "Skipping tests with marks." - ) - ) - if ( - profile - and markers - and profile not in markers - and set(markers).isdisjoint(allowed_marks) - ): - item.add_marker( - pytest.mark.skip(reason="Doesn't match selection " "validation profile") - ) + config.traceability_items = list(items) # save all items for traceability + if not config.option.self_test: + for item in items: + # checking if test belongs to a category + if hasattr(item.function, "categories"): + if config.option.test_categories: + test_categories = getattr(item.function, "categories") + passed_categories = config.option.test_categories + if not all( + category in passed_categories for category in test_categories + ): + item.add_marker( + pytest.mark.skip( + reason=( + "Test categories do not match " + "all the passed categories" + ) + ) + ) + else: + item.add_marker( + pytest.mark.skip( + reason=( + "Test belongs to a category but " + "no categories were passed" + ) + ) + ) items.sort( - key=lambda i: 0 if "base" in set(m.name for m in i.iter_markers()) else 1 + key=lambda x: (0, x.name) + if "base" in set(m.name for m in x.iter_markers()) + else (1, x.name) ) -def make_href(paths): +def make_href(paths, base_dir=None): """ Create an anchor tag to link to the file paths provided. :param paths: string or list of file paths + :param base_dir: If specified this is pre-pended to each path :return: String of hrefs - one for each path, each seperated by a line break (
".join(links) -def load_resolutions_file(): - """ - :return: dict of data loaded from resolutions_steps.json - """ - resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE) - if os.path.exists(resolution_steps): - with open(resolution_steps, "r") as f: - return json.loads(f.read()) - - -def generate_report(outpath, template_path, profile_name, output_format="html"): +def generate_report(outpath, template_path, categories, output_format="html"): """ Generates the various output reports. :param outpath: destination directory for all reports :param template_path: directory containing the Heat templates validated - :param profile_name: Optional validation profile selected + :param categories: Optional categories selected :param output_format: One of "html", "excel", or "csv". Default is "html" :raises: ValueError if requested output format is unknown """ failures = [r for r in ALL_RESULTS if r.is_failed] generate_failure_file(outpath) output_format = output_format.lower().strip() if output_format else "html" + generate_json(outpath, template_path, categories) if output_format == "html": - generate_html_report(outpath, profile_name, template_path, failures) + generate_html_report(outpath, categories, template_path, failures) elif output_format == "excel": - generate_excel_report(outpath, profile_name, template_path, failures) + generate_excel_report(outpath, categories, template_path, failures) elif output_format == "json": - generate_json(outpath, template_path, profile_name) + return elif output_format == "csv": - generate_csv_report(outpath, profile_name, template_path, failures) + generate_csv_report(outpath, categories, template_path, failures) else: raise ValueError("Unsupported output format: " + output_format) @@ -469,10 +471,11 @@ def generate_failure_file(outpath): write_json(data, failure_path) -def generate_csv_report(output_dir, profile_name, template_path, failures): +def generate_csv_report(output_dir, categories, template_path, failures): rows = [["Validation Failures"]] headers = [ - ("Profile Selected:", profile_name), + ("Categories Selected:", categories), + ("Tool Version:", version.VERSION), ("Report Generated At:", make_timestamp()), ("Directory Validated:", template_path), ("Checksum:", hash_directory(template_path)), @@ -501,18 +504,16 @@ def generate_csv_report(output_dir, profile_name, template_path, failures): rows.append([col for col, _ in REPORT_COLUMNS]) reqs = load_current_requirements() - resolutions = load_resolutions_file() # table content - for failure in failures: + for i, failure in enumerate(failures, start=1): rows.append( [ + i, "\n".join(failure.files), - failure.test_module, failure.requirement_text(reqs), - failure.resolution_steps(resolutions), failure.error_message, - failure.raw_output, + failure.test_id, ] ) @@ -523,18 +524,21 @@ def generate_csv_report(output_dir, profile_name, template_path, failures): writer.writerow(row) -def generate_excel_report(output_dir, profile_name, template_path, failures): +def generate_excel_report(output_dir, categories, template_path, failures): output_path = os.path.join(output_dir, "report.xlsx") workbook = xlsxwriter.Workbook(output_path) - bold = workbook.add_format({"bold": True}) - code = workbook.add_format(({"font_name": "Courier", "text_wrap": True})) - normal = workbook.add_format({"text_wrap": True}) + bold = workbook.add_format({"bold": True, "align": "top"}) + code = workbook.add_format( + {"font_name": "Courier", "text_wrap": True, "align": "top"} + ) + normal = workbook.add_format({"text_wrap": True, "align": "top"}) heading = workbook.add_format({"bold": True, "font_size": 18}) worksheet = workbook.add_worksheet("failures") worksheet.write(0, 0, "Validation Failures", heading) headers = [ - ("Profile Selected:", profile_name), + ("Categories Selected:", ",".join(categories)), + ("Tool Version:", version.VERSION), ("Report Generated At:", make_timestamp()), ("Directory Validated:", template_path), ("Checksum:", hash_directory(template_path)), @@ -566,17 +570,24 @@ def generate_excel_report(output_dir, profile_name, template_path, failures): worksheet.write(start_error_table_row + 1, col_num, col_name, bold) reqs = load_current_requirements() - resolutions = load_resolutions_file() # table content + for col, width in enumerate((20, 30, 60, 60, 40)): + worksheet.set_column(col, col, width) + err_num = 1 for row, failure in enumerate(failures, start=start_error_table_row + 2): - worksheet.write(row, 0, "\n".join(failure.files), normal) - worksheet.write(row, 1, failure.test_module, normal) + worksheet.write(row, 0, str(err_num), normal) + worksheet.write(row, 1, "\n".join(failure.files), normal) worksheet.write(row, 2, failure.requirement_text(reqs), normal) - worksheet.write(row, 3, failure.resolution_steps(resolutions), normal) - worksheet.write(row, 4, failure.error_message, normal) - worksheet.write(row, 5, failure.raw_output, code) - + worksheet.write(row, 3, failure.error_message, normal) + worksheet.write(row, 4, failure.test_id, normal) + err_num += 1 + worksheet.autofilter( + start_error_table_row + 1, + 0, + start_error_table_row + 1 + err_num, + len(REPORT_COLUMNS) - 1, + ) workbook.close() @@ -589,44 +600,20 @@ def make_iso_timestamp(): return now.isoformat() -def aggregate_requirement_adherence(r_id, collection_failures, test_results): - """ - Examines all tests associated with a given requirement and determines - the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement. - - * ERROR - At least one ERROR occurred - * PASS - At least one PASS and no FAIL or ERRORs. - * FAIL - At least one FAIL occurred (no ERRORs) - * SKIP - All tests were SKIP - - - :param r_id: Requirement ID to examing - :param collection_failures: Errors that occurred during test setup. - :param test_results: List of TestResult - :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR' - """ - errors = any(r_id in f["requirements"] for f in collection_failures) - outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids) - return aggregate_results(errors, outcomes, r_id) - - -def aggregate_results(has_errors, outcomes, r_id=None): +def aggregate_results(outcomes, r_id=None): """ Determines the aggregate result for the conditions provided. Assumes the results have been filtered and collected for analysis. - :param has_errors: True if collection failures occurred for the tests being - analyzed. :param outcomes: set of outcomes from the TestResults :param r_id: Optional requirement ID if known :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP' (see aggregate_requirement_adherence for more detail) """ - if has_errors: - return "ERROR" - if not outcomes: return "PASS" + elif "ERROR" in outcomes: + return "ERROR" elif "FAIL" in outcomes: return "FAIL" elif "PASS" in outcomes: @@ -636,7 +623,8 @@ def aggregate_results(has_errors, outcomes, r_id=None): else: pytest.warns( "Unexpected error aggregating outcomes ({}) for requirement {}".format( - outcomes, r_id) + outcomes, r_id + ) ) return "ERROR" @@ -661,60 +649,24 @@ def aggregate_run_results(collection_failures, test_results): return "PASS" -def error(failure_or_result): - """ - Extracts the error message from a collection failure or test result - :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult - :return: Error message as string - """ - if isinstance(failure_or_result, TestResult): - return failure_or_result.error_message - else: - return failure_or_result["error"] - - -def req_ids(failure_or_result): - """ - Extracts the requirement IDs from a collection failure or test result - :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult - :return: set of Requirement IDs. If no requirements mapped, then an empty set - """ - if isinstance(failure_or_result, TestResult): - return set(failure_or_result.requirement_ids) - else: - return set(failure_or_result["requirements"]) +def relative_paths(base_dir, paths): + return [os.path.relpath(p, base_dir) for p in paths] -def collect_errors(r_id, collection_failures, test_result): - """ - Creates a list of error messages from the collection failures and - test results. If r_id is provided, then it collects the error messages - where the failure or test is associated with that requirement ID. If - r_id is None, then it collects all errors that occur on failures and - results that are not mapped to requirements - """ - def selector(item): - if r_id: - return r_id in req_ids(item) - else: - return not req_ids(item) - - errors = (error(x) for x in chain(collection_failures, test_result) - if selector(x)) - return [e for e in errors if e] - - -def generate_json(outpath, template_path, profile_name): +# noinspection PyTypeChecker +def generate_json(outpath, template_path, categories): """ Creates a JSON summary of the entire test run. """ reqs = load_current_requirements() data = { "version": "dublin", - "template_directory": template_path, + "template_directory": os.path.splitdrive(template_path)[1].replace( + os.path.sep, "/" + ), "timestamp": make_iso_timestamp(), "checksum": hash_directory(template_path), - "profile": profile_name, + "categories": categories, "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS), "tests": [], "requirements": [], @@ -735,7 +687,7 @@ def generate_json(outpath, template_path, profile_name): for result in ALL_RESULTS: results.append( { - "files": result.files, + "files": relative_paths(template_path, result.files), "test_module": result.test_module, "test_case": result.test_case, "result": result.outcome, @@ -744,30 +696,40 @@ def generate_json(outpath, template_path, profile_name): } ) + # Build a mapping of requirement ID to the results + r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()}) + for test_result in results: + test_reqs = test_result["requirements"] + r_ids = ( + [r["id"] if isinstance(r, dict) else r for r in test_reqs] + if test_reqs + else ("",) + ) + for r_id in r_ids: + item = r_id_results[r_id] + item["outcomes"].add(test_result["result"]) + if test_result["error"]: + item["errors"].add(test_result["error"]) + requirements = data["requirements"] for r_id, r_data in reqs.items(): - result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS) - if result: - requirements.append( - { - "id": r_id, - "text": r_data["description"], - "keyword": r_data["keyword"], - "result": result, - "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS) - } - ) - # If there are tests that aren't mapped to a requirement, then we'll - # map them to a special entry so the results are coherent. - unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids} - has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES) - if unmapped_outcomes or has_errors: + requirements.append( + { + "id": r_id, + "text": r_data["description"], + "keyword": r_data["keyword"], + "result": aggregate_results(r_id_results[r_id]["outcomes"]), + "errors": list(r_id_results[r_id]["errors"]), + } + ) + + if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]: requirements.append( { "id": "Unmapped", "text": "Tests not mapped to requirements (see tests)", - "result": aggregate_results(has_errors, unmapped_outcomes), - "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS) + "result": aggregate_results(r_id_results[""]["outcomes"]), + "errors": list(r_id_results[""]["errors"]), } ) @@ -775,21 +737,19 @@ def generate_json(outpath, template_path, profile_name): write_json(data, report_path) -def generate_html_report(outpath, profile_name, template_path, failures): +def generate_html_report(outpath, categories, template_path, failures): reqs = load_current_requirements() - resolutions = load_resolutions_file() fail_data = [] for failure in failures: fail_data.append( { - "file_links": make_href(failure.files), - "test_id": failure.test_module, + "file_links": make_href(failure.files, template_path), + "test_id": failure.test_id, "error_message": failure.error_message, "raw_output": failure.raw_output, "requirements": docutils.core.publish_parts( writer_name="html", source=failure.requirement_text(reqs) )["body"], - "resolution_steps": failure.resolution_steps(resolutions), } ) pkg_dir = os.path.split(__file__)[0] @@ -797,8 +757,9 @@ def generate_html_report(outpath, profile_name, template_path, failures): with open(j2_template_path, "r") as f: report_template = jinja2.Template(f.read()) contents = report_template.render( + version=version.VERSION, num_failures=len(failures) + len(COLLECTION_FAILURES), - profile_name=profile_name, + categories=categories, template_dir=make_href(template_path), checksum=hash_directory(template_path), timestamp=make_timestamp(), @@ -820,6 +781,13 @@ def pytest_addoption(parser): help="Directory which holds the templates for validation", ) + parser.addoption( + "--template-source", + dest="template_source", + action="append", + help="Source Directory which holds the templates for validation", + ) + parser.addoption( "--self-test", dest="self_test", @@ -827,20 +795,6 @@ def pytest_addoption(parser): help="Test the unit tests against their fixtured data", ) - parser.addoption( - "--validation-profile", - dest="validation_profile", - action="store", - help="Runs all unmarked tests plus test with a matching marker", - ) - - parser.addoption( - "--validation-profile-name", - dest="validation_profile_name", - action="store", - help="Friendly name of the validation profile used in reports", - ) - parser.addoption( "--report-format", dest="report_format", @@ -860,7 +814,14 @@ def pytest_addoption(parser): dest="output_dir", action="store", default=None, - help="Alternate " + help="Alternate ", + ) + + parser.addoption( + "--category", + dest="test_categories", + action="append", + help="optional category of test to execute", ) @@ -992,6 +953,11 @@ def pytest_generate_tests(metafunc): def hash_directory(path): + """ + Create md5 hash using the contents of all files under ``path`` + :param path: string directory containing files + :return: string MD5 hash code (hex) + """ md5 = hashlib.md5() for dir_path, sub_dirs, filenames in os.walk(path): for filename in filenames: @@ -1003,51 +969,99 @@ def hash_directory(path): def load_current_requirements(): """Loads dict of current requirements or empty dict if file doesn't exist""" - try: - r = requests.get(NEEDS_JSON_URL) - if r.headers.get("content-type") == "application/json": - with open(HEAT_REQUIREMENTS_FILE, "wb") as needs: - needs.write(r.content) - else: - warnings.warn( - ( - "Unexpected content-type ({}) encountered downloading " - + "requirements.json, using last saved copy" - ).format(r.headers.get("content-type")) - ) - except requests.exceptions.RequestException as e: - warnings.warn("Error downloading latest JSON, using last saved copy.") - warnings.warn(UserWarning(e)) - if not os.path.exists(HEAT_REQUIREMENTS_FILE): - return {} with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f: data = json.load(f) version = data["current_version"] return data["versions"][version]["needs"] -def compat_open(path): - """Invokes open correctly depending on the Python version""" - if sys.version_info.major < 3: - return open(path, "wb") - else: - return open(path, "w", newline="") +def select_heat_requirements(reqs): + """Filters dict requirements to only those requirements pertaining to Heat""" + return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()} + + +def is_testable(reqs): + """Filters dict requirements to only those which are testable""" + for key, values in reqs.items(): + if ("MUST" in values.get("keyword", "").upper()) and ( + "none" not in values.get("validation_mode", "").lower() + ): + reqs[key]["testable"] = True + else: + reqs[key]["testable"] = False + return reqs + + +def build_rst_json(reqs): + """Takes requirements and returns list of only Heat requirements""" + for key, values in list(reqs.items()): + if values["testable"]: + # Creates links in RST format to requirements and test cases + if values["test_case"]: + mod = values["test_case"].split(".")[-1] + val = TEST_SCRIPT_SITE + mod + ".py" + rst_value = "`" + mod + " <" + val + ">`_" + title = ( + "`" + + values["id"] + + " <" + + VNFRQTS_ID_URL + + values["docname"].replace(" ", "%20") + + ".html#" + + values["id"] + + ">`_" + ) + reqs[key].update({"full_title": title, "test_case": rst_value}) + else: + title = ( + "`" + + values["id"] + + " <" + + VNFRQTS_ID_URL + + values["docname"].replace(" ", "%20") + + ".html#" + + values["id"] + + ">`_" + ) + reqs[key].update( + { + "full_title": title, + "test_case": "No test for requirement", + "validated_by": "static", + } + ) + else: + del reqs[key] + return reqs -def unicode_writerow(writer, row): - if sys.version_info.major < 3: - row = [s.encode("utf8") for s in row] - writer.writerow(row) +def generate_rst_table(output_dir, data): + """Generate a formatted csv to be used in RST""" + rst_path = os.path.join(output_dir, "rst.csv") + with open(rst_path, "w", newline="") as f: + out = csv.writer(f) + out.writerow(("Requirement ID", "Requirement", "Test Module", "Test Name")) + for req_id, metadata in data.items(): + out.writerow( + ( + metadata["full_title"], + metadata["description"], + metadata["test_case"], + metadata["validated_by"], + ) + ) # noinspection PyUnusedLocal def pytest_report_collectionfinish(config, startdir, items): """Generates a simple traceability report to output/traceability.csv""" - traceability_path = os.path.join(__path__[0], "../output/traceability.csv") + traceability_path = os.path.join(get_output_dir(config), "traceability.csv") output_dir = os.path.split(traceability_path)[0] if not os.path.exists(output_dir): os.makedirs(output_dir) - requirements = load_current_requirements() + reqs = load_current_requirements() + requirements = select_heat_requirements(reqs) + testable_requirements = is_testable(requirements) unmapped, mapped = partition( lambda i: hasattr(i.function, "requirement_ids"), items ) @@ -1058,43 +1072,82 @@ def pytest_report_collectionfinish(config, startdir, items): for req_id in item.function.requirement_ids: if req_id not in req_to_test: req_to_test[req_id].add(item) + if req_id in requirements: + reqs[req_id].update( + { + "test_case": item.function.__module__, + "validated_by": item.function.__name__, + } + ) if req_id not in requirements: mapping_errors.add( (req_id, item.function.__module__, item.function.__name__) ) - mapping_error_path = os.path.join(__path__[0], "../output/mapping_errors.csv") - with compat_open(mapping_error_path) as f: + mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv") + with open(mapping_error_path, "w", newline="") as f: writer = csv.writer(f) for err in mapping_errors: - unicode_writerow(writer, err) + writer.writerow(err) - with compat_open(traceability_path) as f: + with open(traceability_path, "w", newline="") as f: out = csv.writer(f) - unicode_writerow( - out, - ("Requirement ID", "Requirement", "Section", "Test Module", "Test Name"), + out.writerow( + ( + "Requirement ID", + "Requirement", + "Section", + "Keyword", + "Validation Mode", + "Is Testable", + "Test Module", + "Test Name", + ) ) - for req_id, metadata in requirements.items(): + for req_id, metadata in testable_requirements.items(): if req_to_test[req_id]: for item in req_to_test[req_id]: - unicode_writerow( - out, + out.writerow( ( req_id, metadata["description"], metadata["section_name"], + metadata["keyword"], + metadata["validation_mode"], + metadata["testable"], item.function.__module__, item.function.__name__, - ), + ) ) else: - unicode_writerow( - out, - (req_id, metadata["description"], metadata["section_name"], "", ""), + out.writerow( + ( + req_id, + metadata["description"], + metadata["section_name"], + metadata["keyword"], + metadata["validation_mode"], + metadata["testable"], + "", # test module + "", + ) # test function ) # now write out any test methods that weren't mapped to requirements - for item in unmapped: - unicode_writerow( - out, ("", "", "", item.function.__module__, item.function.__name__) + unmapped_tests = { + (item.function.__module__, item.function.__name__) for item in unmapped + } + for test_module, test_name in unmapped_tests: + out.writerow( + ( + "", # req ID + "", # description + "", # section name + "", # keyword + "static", # validation mode + "TRUE", # testable + test_module, + test_name, + ) ) + + generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))