Merge "[VVP] Adding preload generation functionality"
[vvp/validation-scripts.git] ice_validator/tests/conftest.py
index 2c88ece..2507753 100644
@@ -43,8 +43,15 @@ import json
 import os
 import re
 import time
+
+from preload import create_preloads
+from tests.helpers import get_output_dir
+
+try:
+    from html import escape
+except ImportError:
+    from cgi import escape
 from collections import defaultdict
-from itertools import chain
 
 import traceback
 
@@ -55,24 +62,30 @@ from more_itertools import partition
 import xlsxwriter
 from six import string_types
 
+# noinspection PyUnresolvedReferences
 import version
+import logging
+
+logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)
 
 __path__ = [os.path.dirname(os.path.abspath(__file__))]
 
 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
 
-RESOLUTION_STEPS_FILE = "resolution_steps.json"
 HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
-TEST_SCRIPT_SITE = "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
-VNFRQTS_ID_URL = "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
+TEST_SCRIPT_SITE = (
+    "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
+)
+VNFRQTS_ID_URL = (
+    "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
+)
 
 REPORT_COLUMNS = [
+    ("Error #", "err_num"),
     ("Input File", "file"),
-    ("Test", "test_file"),
     ("Requirements", "req_description"),
-    ("Resolution Steps", "resolution_steps"),
     ("Error Message", "message"),
-    ("Raw Test Output", "raw_output"),
+    ("Test", "test_file"),
 ]
 
 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
@@ -86,18 +99,6 @@ COLLECTION_FAILURES = []
 ALL_RESULTS = []
 
 
-def get_output_dir(config):
-    """
-    Retrieve the output directory for the reports and create it if necessary
-    :param config: pytest configuration
-    :return: output directory as string
-    """
-    output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
-    if not os.path.exists(output_dir):
-        os.makedirs(output_dir, exist_ok=True)
-    return output_dir
-
-
 def extract_error_msg(rep):
     """
     If a custom error message was provided, then extract it otherwise
@@ -114,10 +115,10 @@ def extract_error_msg(rep):
             # Extract everything between AssertionError and the start
             # of the assert statement expansion in the pytest report
             msg = match.group(1)
+        elif "AssertionError:" in full_msg:
+            msg = full_msg.split("AssertionError:")[1]
         else:
-            msg = str(rep.longrepr.reprcrash)
-            if "AssertionError:" in msg:
-                msg = msg.split("AssertionError:")[1]
+            msg = full_msg
     except AttributeError:
         msg = str(rep)
 
@@ -135,7 +136,7 @@ class TestResult:
     def __init__(self, item, outcome):
         self.item = item
         self.result = outcome.get_result()
-        self.files = [os.path.normpath(p) for p in self._get_files()]
+        self.files = self._get_files()
         self.error_message = self._get_error_message()
 
     @property
@@ -191,6 +192,13 @@ class TestResult:
         """
         return self.item.function.__module__.split(".")[-1]
 
+    @property
+    def test_id(self):
+        """
+        :return: ID of the test (test_module + test_case)
+        """
+        return "{}::{}".format(self.test_module, self.test_case)
+
     @property
     def raw_output(self):
         """
@@ -210,6 +218,7 @@ class TestResult:
         text = (
             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
             for r_id in self.requirement_ids
+            if r_id in curr_reqs
         )
         return "".join(text)
 
@@ -239,30 +248,6 @@ class TestResult:
             )
         return data
 
-    def resolution_steps(self, resolutions):
-        """
-        :param resolutions: Loaded from contents for resolution_steps.json
-        :return: Header and text for the resolution step associated with this
-                 test case.  Returns empty string if no resolutions are
-                 provided.
-        """
-        text = (
-            "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
-            for entry in resolutions
-            if self._match(entry)
-        )
-        return "".join(text)
-
-    def _match(self, resolution_entry):
-        """
-        Returns True if the test result maps to the given entry in
-        the resolutions file
-        """
-        return (
-            self.test_case == resolution_entry["function"]
-            and self.test_module == resolution_entry["module"]
-        )
-
     def _get_files(self):
         """
         Extracts the list of files passed into the test case.
@@ -279,12 +264,12 @@ class TestResult:
                 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
             ]
         elif "heat_templates" in self.item.fixturenames:
-            return self.item.funcargs["heat_templates"]
+            return [os.path.basename(f) for f in self.item.funcargs["heat_templates"]]
         elif "yaml_files" in self.item.fixturenames:
-            return self.item.funcargs["yaml_files"]
+            return [os.path.basename(f) for f in self.item.funcargs["yaml_files"]]
         else:
             parts = self.result.nodeid.split("[")
-            return [""] if len(parts) == 1 else [parts[1][:-1]]
+            return [""] if len(parts) == 1 else [os.path.basename(parts[1][:-1])]
 
     def _get_error_message(self):
         """
@@ -307,7 +292,6 @@ def pytest_runtest_makereport(item, call):
     if outcome.get_result().when != "call":
         return  # only capture results of test cases themselves
     result = TestResult(item, outcome)
-    ALL_RESULTS.append(result)
     if (
         not item.config.option.continue_on_failure
         and result.is_base_test
@@ -316,8 +300,12 @@ def pytest_runtest_makereport(item, call):
         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
             result.error_message
         )
+        result.error_message = msg
+        ALL_RESULTS.append(result)
         pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
 
+    ALL_RESULTS.append(result)
+
 
 def make_timestamp():
     """
@@ -356,6 +344,12 @@ def pytest_sessionfinish(session, exitstatus):
     )
 
 
+def pytest_terminal_summary(terminalreporter, exitstatus):
+    # Ensures all preload information and warnings appear after
+    # test results
+    create_preloads(terminalreporter.config, exitstatus)
+
+
 # noinspection PyUnusedLocal
 def pytest_collection_modifyitems(session, config, items):
     """
@@ -375,28 +369,40 @@ def pytest_collection_modifyitems(session, config, items):
                     ):
                         item.add_marker(
                             pytest.mark.skip(
-                                reason="Test categories do not match all the passed categories"
+                                reason=(
+                                    "Test categories do not match "
+                                    "all the passed categories"
+                                )
                             )
                         )
                 else:
                     item.add_marker(
                         pytest.mark.skip(
-                            reason="Test belongs to a category but no categories were passed"
+                            reason=(
+                                "Test belongs to a category but "
+                                "no categories were passed"
+                            )
                         )
                     )
+
     items.sort(
-        key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
+        key=lambda x: (0, x.name)
+        if "base" in set(m.name for m in x.iter_markers())
+        else (1, x.name)
     )
 
 
-def make_href(paths):
+def make_href(paths, base_dir=None):
     """
     Create an anchor tag to link to the file paths provided.
     :param paths: string or list of file paths
+    :param base_dir: If specified this is pre-pended to each path
     :return: String of hrefs - one for each path, each seperated by a line
              break (<br/).
     """
     paths = [paths] if isinstance(paths, string_types) else paths
+    if base_dir:
+        paths = [os.path.join(base_dir, p) for p in paths]
     links = []
     for p in paths:
         abs_path = os.path.abspath(p)
@@ -409,16 +415,6 @@ def make_href(paths):
     return "<br/>".join(links)
 
 
-def load_resolutions_file():
-    """
-    :return: dict of data loaded from resolutions_steps.json
-    """
-    resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
-    if os.path.exists(resolution_steps):
-        with open(resolution_steps, "r") as f:
-            return json.loads(f.read())
-
-
 def generate_report(outpath, template_path, categories, output_format="html"):
     """
     Generates the various output reports.
@@ -432,12 +428,13 @@ def generate_report(outpath, template_path, categories, output_format="html"):
     failures = [r for r in ALL_RESULTS if r.is_failed]
     generate_failure_file(outpath)
     output_format = output_format.lower().strip() if output_format else "html"
+    generate_json(outpath, template_path, categories)
     if output_format == "html":
         generate_html_report(outpath, categories, template_path, failures)
     elif output_format == "excel":
         generate_excel_report(outpath, categories, template_path, failures)
     elif output_format == "json":
-        generate_json(outpath, template_path, categories)
+        return
     elif output_format == "csv":
         generate_csv_report(outpath, categories, template_path, failures)
     else:
@@ -509,18 +506,16 @@ def generate_csv_report(output_dir, categories, template_path, failures):
     rows.append([col for col, _ in REPORT_COLUMNS])
 
     reqs = load_current_requirements()
-    resolutions = load_resolutions_file()
 
     # table content
-    for failure in failures:
+    for i, failure in enumerate(failures, start=1):
         rows.append(
             [
+                i,
                 "\n".join(failure.files),
-                failure.test_module,
                 failure.requirement_text(reqs),
-                failure.resolution_steps(resolutions),
                 failure.error_message,
-                failure.raw_output,
+                failure.test_id,
             ]
         )
 
@@ -534,9 +529,11 @@ def generate_csv_report(output_dir, categories, template_path, failures):
 def generate_excel_report(output_dir, categories, template_path, failures):
     output_path = os.path.join(output_dir, "report.xlsx")
     workbook = xlsxwriter.Workbook(output_path)
-    bold = workbook.add_format({"bold": True})
-    code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
-    normal = workbook.add_format({"text_wrap": True})
+    bold = workbook.add_format({"bold": True, "align": "top"})
+    code = workbook.add_format(
+        {"font_name": "Courier", "text_wrap": True, "align": "top"}
+    )
+    normal = workbook.add_format({"text_wrap": True, "align": "top"})
     heading = workbook.add_format({"bold": True, "font_size": 18})
     worksheet = workbook.add_worksheet("failures")
     worksheet.write(0, 0, "Validation Failures", heading)
@@ -575,17 +572,24 @@ def generate_excel_report(output_dir, categories, template_path, failures):
         worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
 
     reqs = load_current_requirements()
-    resolutions = load_resolutions_file()
 
     # table content
+    for col, width in enumerate((20, 30, 60, 60, 40)):
+        worksheet.set_column(col, col, width)
+    err_num = 1
     for row, failure in enumerate(failures, start=start_error_table_row + 2):
-        worksheet.write(row, 0, "\n".join(failure.files), normal)
-        worksheet.write(row, 1, failure.test_module, normal)
+        worksheet.write(row, 0, str(err_num), normal)
+        worksheet.write(row, 1, "\n".join(failure.files), normal)
         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
-        worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
-        worksheet.write(row, 4, failure.error_message, normal)
-        worksheet.write(row, 5, failure.raw_output, code)
-
+        worksheet.write(row, 3, failure.error_message.replace("\n", "\n\n"), normal)
+        worksheet.write(row, 4, failure.test_id, normal)
+        err_num += 1
+    worksheet.autofilter(
+        start_error_table_row + 1,
+        0,
+        start_error_table_row + 1 + err_num,
+        len(REPORT_COLUMNS) - 1,
+    )
     workbook.close()
 
 
@@ -598,44 +602,20 @@ def make_iso_timestamp():
     return now.isoformat()
 
 
-def aggregate_requirement_adherence(r_id, collection_failures, test_results):
-    """
-    Examines all tests associated with a given requirement and determines
-    the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
-
-    * ERROR - At least one ERROR occurred
-    * PASS -  At least one PASS and no FAIL or ERRORs.
-    * FAIL -  At least one FAIL occurred (no ERRORs)
-    * SKIP - All tests were SKIP
-
-
-    :param r_id: Requirement ID to examing
-    :param collection_failures: Errors that occurred during test setup.
-    :param test_results: List of TestResult
-    :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
-    """
-    errors = any(r_id in f["requirements"] for f in collection_failures)
-    outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
-    return aggregate_results(errors, outcomes, r_id)
-
-
-def aggregate_results(has_errors, outcomes, r_id=None):
+def aggregate_results(outcomes, r_id=None):
     """
     Determines the aggregate result for the conditions provided.  Assumes the
     results have been filtered and collected for analysis.
 
-    :param has_errors: True if collection failures occurred for the tests being
-                       analyzed.
     :param outcomes: set of outcomes from the TestResults
     :param r_id: Optional requirement ID if known
     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
              (see aggregate_requirement_adherence for more detail)
     """
-    if has_errors:
-        return "ERROR"
-
     if not outcomes:
         return "PASS"
+    elif "ERROR" in outcomes:
+        return "ERROR"
     elif "FAIL" in outcomes:
         return "FAIL"
     elif "PASS" in outcomes:
@@ -671,53 +651,11 @@ def aggregate_run_results(collection_failures, test_results):
         return "PASS"
 
 
-def error(failure_or_result):
-    """
-    Extracts the error message from a collection failure or test result
-    :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
-    :return: Error message as string
-    """
-    if isinstance(failure_or_result, TestResult):
-        return failure_or_result.error_message
-    else:
-        return failure_or_result["error"]
-
-
-def req_ids(failure_or_result):
-    """
-    Extracts the requirement IDs from a collection failure or test result
-    :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
-    :return: set of Requirement IDs.  If no requirements mapped, then an empty set
-    """
-    if isinstance(failure_or_result, TestResult):
-        return set(failure_or_result.requirement_ids)
-    else:
-        return set(failure_or_result["requirements"])
-
-
-def collect_errors(r_id, collection_failures, test_result):
-    """
-    Creates a list of error messages from the collection failures and
-    test results.  If r_id is provided, then it collects the error messages
-    where the failure or test is associated with that requirement ID.  If
-    r_id is None, then it collects all errors that occur on failures and
-    results that are not mapped to requirements
-    """
-
-    def selector(item):
-        if r_id:
-            return r_id in req_ids(item)
-        else:
-            return not req_ids(item)
-
-    errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
-    return [e for e in errors if e]
-
-
 def relative_paths(base_dir, paths):
-    return [os.path.relpath(p, base_dir) for p in paths]
+    return [os.path.relpath(p, base_dir) for p in paths if p != ""]
 
 
+# noinspection PyTypeChecker
 def generate_json(outpath, template_path, categories):
     """
     Creates a JSON summary of the entire test run.
@@ -760,30 +698,40 @@ def generate_json(outpath, template_path, categories):
             }
         )
 
+    # Build a mapping of requirement ID to the results
+    r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
+    for test_result in results:
+        test_reqs = test_result["requirements"]
+        r_ids = (
+            [r["id"] if isinstance(r, dict) else r for r in test_reqs]
+            if test_reqs
+            else ("",)
+        )
+        for r_id in r_ids:
+            item = r_id_results[r_id]
+            item["outcomes"].add(test_result["result"])
+            if test_result["error"]:
+                item["errors"].add(test_result["error"])
+
     requirements = data["requirements"]
     for r_id, r_data in reqs.items():
-        result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
-        if result:
-            requirements.append(
-                {
-                    "id": r_id,
-                    "text": r_data["description"],
-                    "keyword": r_data["keyword"],
-                    "result": result,
-                    "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
-                }
-            )
-    # If there are tests that aren't mapped to a requirement, then we'll
-    # map them to a special entry so the results are coherent.
-    unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
-    has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
-    if unmapped_outcomes or has_errors:
+        requirements.append(
+            {
+                "id": r_id,
+                "text": r_data["description"],
+                "keyword": r_data["keyword"],
+                "result": aggregate_results(r_id_results[r_id]["outcomes"]),
+                "errors": list(r_id_results[r_id]["errors"]),
+            }
+        )
+
+    if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
         requirements.append(
             {
                 "id": "Unmapped",
                 "text": "Tests not mapped to requirements (see tests)",
-                "result": aggregate_results(has_errors, unmapped_outcomes),
-                "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
+                "result": aggregate_results(r_id_results[""]["outcomes"]),
+                "errors": list(r_id_results[""]["errors"]),
             }
         )
 
@@ -793,19 +741,19 @@ def generate_json(outpath, template_path, categories):
 
 def generate_html_report(outpath, categories, template_path, failures):
     reqs = load_current_requirements()
-    resolutions = load_resolutions_file()
     fail_data = []
     for failure in failures:
         fail_data.append(
             {
-                "file_links": make_href(failure.files),
-                "test_id": failure.test_module,
-                "error_message": failure.error_message,
-                "raw_output": failure.raw_output,
+                "file_links": make_href(failure.files, template_path),
+                "test_id": failure.test_id,
+                "error_message": escape(failure.error_message).replace(
+                    "\n", "<br/><br/>"
+                ),
+                "raw_output": escape(failure.raw_output),
                 "requirements": docutils.core.publish_parts(
                     writer_name="html", source=failure.requirement_text(reqs)
                 )["body"],
-                "resolution_steps": failure.resolution_steps(resolutions),
             }
         )
     pkg_dir = os.path.split(__file__)[0]
@@ -1014,7 +962,7 @@ def hash_directory(path):
     :param path: string directory containing files
     :return: string MD5 hash code (hex)
     """
-    md5 = hashlib.md5()
+    md5 = hashlib.md5()  # nosec
     for dir_path, sub_dirs, filenames in os.walk(path):
         for filename in filenames:
             file_path = os.path.join(dir_path, filename)
@@ -1033,47 +981,74 @@ def load_current_requirements():
 
 def select_heat_requirements(reqs):
     """Filters dict requirements to only those requirements pertaining to Heat"""
-    return {k: v for k, v in reqs.items() if "Heat" in v["docname"]}
+    return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}
+
+
+def is_testable(reqs):
+    """Filters dict requirements to only those which are testable"""
+    for key, values in reqs.items():
+        if ("MUST" in values.get("keyword", "").upper()) and (
+            "none" not in values.get("validation_mode", "").lower()
+        ):
+            reqs[key]["testable"] = True
+        else:
+            reqs[key]["testable"] = False
+    return reqs
 
 
 def build_rst_json(reqs):
     """Takes requirements and returns list of only Heat requirements"""
-    data = json.loads(reqs)
-    for key, values in list(data.items()):
-        if "Heat" in (values["docname"]):
-            if "MUST" in (values["keyword"]):
-                if "none" in (values["validation_mode"]):
-                    del data[key]
-                else:
-                    # Creates links in RST format to requirements and test cases
-                    if values["test_case"]:
-                        mod = values["test_case"].split(".")[-1]
-                        val = TEST_SCRIPT_SITE + mod + ".py"
-                        rst_value = ("`" + mod + " <" + val + ">`_")
-                        title = "`" + values["id"] + " <" + VNFRQTS_ID_URL + values["docname"].replace(" ", "%20") + ".html#" + values["id"] + ">`_"
-                        data[key].update({'full_title': title, 'test_case': rst_value})
-                    else:
-                        del data[key]
+    for key, values in list(reqs.items()):
+        if values["testable"]:
+            # Creates links in RST format to requirements and test cases
+            if values["test_case"]:
+                mod = values["test_case"].split(".")[-1]
+                val = TEST_SCRIPT_SITE + mod + ".py"
+                rst_value = "`" + mod + " <" + val + ">`_"
+                title = (
+                    "`"
+                    + values["id"]
+                    + " <"
+                    + VNFRQTS_ID_URL
+                    + values["docname"].replace(" ", "%20")
+                    + ".html#"
+                    + values["id"]
+                    + ">`_"
+                )
+                reqs[key].update({"full_title": title, "test_case": rst_value})
             else:
-                del data[key]
+                title = (
+                    "`"
+                    + values["id"]
+                    + " <"
+                    + VNFRQTS_ID_URL
+                    + values["docname"].replace(" ", "%20")
+                    + ".html#"
+                    + values["id"]
+                    + ">`_"
+                )
+                reqs[key].update(
+                    {
+                        "full_title": title,
+                        "test_case": "No test for requirement",
+                        "validated_by": "static",
+                    }
+                )
         else:
-            del data[key]
-    return data
+            del reqs[key]
+    return reqs
 
 
-def generate_rst_table(data):
+def generate_rst_table(output_dir, data):
     """Generate a formatted csv to be used in RST"""
-    rst_path = os.path.join(__path__[0], "../output/rst.csv")
+    rst_path = os.path.join(output_dir, "rst.csv")
     with open(rst_path, "w", newline="") as f:
         out = csv.writer(f)
-        out.writerow(
-            ("Requirement ID", "Requirement", "Test Module", "Test Name"),
-        )
+        out.writerow(("Requirement ID", "Test Module", "Test Name"))
         for req_id, metadata in data.items():
             out.writerow(
                 (
                     metadata["full_title"],
-                    metadata["description"],
                     metadata["test_case"],
                     metadata["validated_by"],
                 )
@@ -1089,6 +1064,7 @@ def pytest_report_collectionfinish(config, startdir, items):
         os.makedirs(output_dir)
     reqs = load_current_requirements()
     requirements = select_heat_requirements(reqs)
+    testable_requirements = is_testable(requirements)
     unmapped, mapped = partition(
         lambda i: hasattr(i.function, "requirement_ids"), items
     )
@@ -1100,8 +1076,12 @@ def pytest_report_collectionfinish(config, startdir, items):
             if req_id not in req_to_test:
                 req_to_test[req_id].add(item)
                 if req_id in requirements:
-                    reqs[req_id].update({'test_case': item.function.__module__,
-                                         'validated_by': item.function.__name__})
+                    reqs[req_id].update(
+                        {
+                            "test_case": item.function.__module__,
+                            "validated_by": item.function.__name__,
+                        }
+                    )
             if req_id not in requirements:
                 mapping_errors.add(
                     (req_id, item.function.__module__, item.function.__name__)
@@ -1116,14 +1096,18 @@ def pytest_report_collectionfinish(config, startdir, items):
     with open(traceability_path, "w", newline="") as f:
         out = csv.writer(f)
         out.writerow(
-            ("Requirement ID", "Requirement", "Section",
-             "Keyword", "Validation Mode", "Is Testable",
-             "Test Module", "Test Name"),
+            (
+                "Requirement ID",
+                "Requirement",
+                "Section",
+                "Keyword",
+                "Validation Mode",
+                "Is Testable",
+                "Test Module",
+                "Test Name",
+            )
         )
-        for req_id, metadata in requirements.items():
-            keyword = metadata["keyword"].upper()
-            mode = metadata["validation_mode"].lower()
-            testable = keyword in {"MUST", "MUST NOT"} and mode != "none"
+        for req_id, metadata in testable_requirements.items():
             if req_to_test[req_id]:
                 for item in req_to_test[req_id]:
                     out.writerow(
@@ -1131,37 +1115,42 @@ def pytest_report_collectionfinish(config, startdir, items):
                             req_id,
                             metadata["description"],
                             metadata["section_name"],
-                            keyword,
-                            mode,
-                            "TRUE" if testable else "FALSE",
+                            metadata["keyword"],
+                            metadata["validation_mode"],
+                            metadata["testable"],
                             item.function.__module__,
                             item.function.__name__,
-                        ),
+                        )
                     )
             else:
                 out.writerow(
-                    (req_id,
-                     metadata["description"],
-                     metadata["section_name"],
-                     keyword,
-                     mode,
-                     "TRUE" if testable else "FALSE",
-                     "",   # test module
-                     ""),  # test function
+                    (
+                        req_id,
+                        metadata["description"],
+                        metadata["section_name"],
+                        metadata["keyword"],
+                        metadata["validation_mode"],
+                        metadata["testable"],
+                        "",  # test module
+                        "",
+                    )  # test function
                 )
         # now write out any test methods that weren't mapped to requirements
-        unmapped_tests = {(item.function.__module__, item.function.__name__) for item in
-                          unmapped}
+        unmapped_tests = {
+            (item.function.__module__, item.function.__name__) for item in unmapped
+        }
         for test_module, test_name in unmapped_tests:
             out.writerow(
-                ("",        # req ID
-                 "",        # description
-                 "",        # section name
-                 "",        # keyword
-                 "static",  # validation mode
-                 "TRUE",    # testable
-                 test_module,
-                 test_name)
+                (
+                    "",  # req ID
+                    "",  # description
+                    "",  # section name
+                    "",  # keyword
+                    "static",  # validation mode
+                    "TRUE",  # testable
+                    test_module,
+                    test_name,
+                )
             )
 
-    generate_rst_table(build_rst_json(json.dumps(reqs)))
+    generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))