[VVP] Performance Enhancements (report generation and test collection)
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
index 5184fb6..d07669c 100644
@@ -42,14 +42,10 @@ import io
 import json
 import os
 import re
-import sys
 import time
 from collections import defaultdict
-from itertools import chain
 
-import requests
 import traceback
-import warnings
 
 import docutils.core
 import jinja2
@@ -58,15 +54,24 @@ from more_itertools import partition
 import xlsxwriter
 from six import string_types
 
+# noinspection PyUnresolvedReferences
+import version
+import logging
+
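+# Only messages at ERROR level or above are emitted by default.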
+logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)
+
 __path__ = [os.path.dirname(os.path.abspath(__file__))]
 
 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
 
 RESOLUTION_STEPS_FILE = "resolution_steps.json"
-HEAT_REQUIREMENTS_FILE = "heat_requirements.json"
-
-# noinspection PyPep8
-NEEDS_JSON_URL = "https://onap.readthedocs.io/en/latest/_downloads/789ac64d223325488fb3f120f959d985/needs.json"
+HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
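+# Used when generating RST links: test cases link to their script on GitHub
+# and requirement IDs link to the published VNFRQTS documentation.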
+TEST_SCRIPT_SITE = (
+    "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
+)
+VNFRQTS_ID_URL = (
+    "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
+)
 
 REPORT_COLUMNS = [
     ("Input File", "file"),
@@ -89,6 +94,11 @@ ALL_RESULTS = []
 
 
 def get_output_dir(config):
+    """
+    Retrieve the output directory for the reports and create it if necessary
+    :param config: pytest configuration
+    :return: output directory as string
+    """
     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
     if not os.path.exists(output_dir):
         os.makedirs(output_dir, exist_ok=True)
@@ -188,6 +198,13 @@ class TestResult:
         """
         return self.item.function.__module__.split(".")[-1]
 
+    @property
+    def test_id(self):
+        """
+        :return: ID of the test (test_module + test_case)
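+                 in the form "<test_module>::<test_case>"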
+        """
+        return "{}::{}".format(self.test_module, self.test_case)
+
     @property
     def raw_output(self):
         """
@@ -207,6 +224,7 @@ class TestResult:
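+        # Skip any requirement IDs that are missing from the requirements file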
         text = (
             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
             for r_id in self.requirement_ids
+            if r_id in curr_reqs
         )
         return "".join(text)
 
@@ -280,7 +298,8 @@ class TestResult:
         elif "yaml_files" in self.item.fixturenames:
             return self.item.funcargs["yaml_files"]
         else:
-            return [self.result.nodeid.split("[")[1][:-1]]
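+            # Parametrized tests have node IDs like "module::test[file.yaml]";
+            # fall back to an empty file name when no parameter is present.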
+            parts = self.result.nodeid.split("[")
+            return [""] if len(parts) == 1 else [parts[1][:-1]]
 
     def _get_error_message(self):
         """
@@ -303,7 +322,6 @@ def pytest_runtest_makereport(item, call):
     if outcome.get_result().when != "call":
         return  # only capture results of test cases themselves
     result = TestResult(item, outcome)
-    ALL_RESULTS.append(result)
     if (
         not item.config.option.continue_on_failure
         and result.is_base_test
@@ -312,9 +330,11 @@ def pytest_runtest_makereport(item, call):
         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
             result.error_message
         )
-        pytest.exit(
-            "{}\n{}\n{}".format(msg, result.files, result.test_case)
-        )
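+        # Save the failure before exiting so it still appears in the reports.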
+        result.error_message = msg
+        ALL_RESULTS.append(result)
+        pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
+
+    ALL_RESULTS.append(result)
 
 
 def make_timestamp():
@@ -339,12 +359,17 @@ def pytest_sessionfinish(session, exitstatus):
     """
     if not session.config.option.template_dir:
         return
-    template_path = os.path.abspath(session.config.option.template_dir[0])
-    profile_name = session.config.option.validation_profile_name or ""
+
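+    # Report against the original template source when one was provided;
+    # otherwise fall back to the directory that was validated.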
+    if session.config.option.template_source:
+        template_source = session.config.option.template_source[0]
+    else:
+        template_source = os.path.abspath(session.config.option.template_dir[0])
+
+    categories_selected = session.config.option.test_categories or ""
     generate_report(
         get_output_dir(session.config),
-        template_path,
-        profile_name,
+        template_source,
+        categories_selected,
         session.config.option.report_format,
     )
 
@@ -352,33 +377,35 @@ def pytest_sessionfinish(session, exitstatus):
 # noinspection PyUnusedLocal
 def pytest_collection_modifyitems(session, config, items):
     """
-    Selects tests based on the validation profile requested.  Tests without
-    pytest markers will always be executed.
+    Selects tests based on the categories requested.  Tests without
+    categories will always be executed.
     """
-    allowed_marks = ["xfail", "base"]
-    profile = config.option.validation_profile
-
-    for item in items:
-        markers = set(m.name for m in item.iter_markers())
-        if not profile and markers and set(markers).isdisjoint(allowed_marks):
-            item.add_marker(
-                pytest.mark.skip(
-                    reason="No validation profile selected. "
-                    "Skipping tests with marks."
-                )
-            )
-        if (
-            profile
-            and markers
-            and profile not in markers
-            and set(markers).isdisjoint(allowed_marks)
-        ):
-            item.add_marker(
-                pytest.mark.skip(reason="Doesn't match selection " "validation profile")
-            )
-
+    config.traceability_items = list(items)  # save all items for traceability
+    if not config.option.self_test:
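+        # A categorized test runs only when every one of its categories was
+        # requested via --category; tests without categories always run.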
+        for item in items:
+            # checking if test belongs to a category
+            if hasattr(item.function, "categories"):
+                if config.option.test_categories:
+                    test_categories = getattr(item.function, "categories")
+                    passed_categories = config.option.test_categories
+                    if not all(
+                        category in passed_categories for category in test_categories
+                    ):
+                        item.add_marker(
+                            pytest.mark.skip(
+                                reason=("Test categories do not match "
+                                        "all the passed categories")
+                            )
+                        )
+                else:
+                    item.add_marker(
+                        pytest.mark.skip(
+                            reason=("Test belongs to a category but "
+                                    "no categories were passed")
+                        )
+                    )
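+    # Run tests marked "base" first; a base test failure can halt the run.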
     items.sort(
-        key=lambda i: 0 if "base" in set(m.name for m in i.iter_markers()) else 1
+        key=lambda x: 0 if "base" in set(m.name for m in x.iter_markers()) else 1
     )
 
 
@@ -412,27 +439,28 @@ def load_resolutions_file():
             return json.loads(f.read())
 
 
-def generate_report(outpath, template_path, profile_name, output_format="html"):
+def generate_report(outpath, template_path, categories, output_format="html"):
     """
     Generates the various output reports.
 
     :param outpath: destination directory for all reports
     :param template_path: directory containing the Heat templates validated
-    :param profile_name: Optional validation profile selected
+    :param categories: Optional categories selected
     :param output_format: One of "html", "excel", or "csv". Default is "html"
     :raises: ValueError if requested output format is unknown
     """
     failures = [r for r in ALL_RESULTS if r.is_failed]
     generate_failure_file(outpath)
     output_format = output_format.lower().strip() if output_format else "html"
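+    # The JSON summary is always written, regardless of the requested format.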
+    generate_json(outpath, template_path, categories)
     if output_format == "html":
-        generate_html_report(outpath, profile_name, template_path, failures)
+        generate_html_report(outpath, categories, template_path, failures)
     elif output_format == "excel":
-        generate_excel_report(outpath, profile_name, template_path, failures)
+        generate_excel_report(outpath, categories, template_path, failures)
     elif output_format == "json":
-        generate_json(outpath, template_path, profile_name)
+        return
     elif output_format == "csv":
-        generate_csv_report(outpath, profile_name, template_path, failures)
+        generate_csv_report(outpath, categories, template_path, failures)
     else:
         raise ValueError("Unsupported output format: " + output_format)
 
@@ -469,10 +497,11 @@ def generate_failure_file(outpath):
     write_json(data, failure_path)
 
 
-def generate_csv_report(output_dir, profile_name, template_path, failures):
+def generate_csv_report(output_dir, categories, template_path, failures):
     rows = [["Validation Failures"]]
     headers = [
-        ("Profile Selected:", profile_name),
+        ("Categories Selected:", categories),
+        ("Tool Version:", version.VERSION),
         ("Report Generated At:", make_timestamp()),
         ("Directory Validated:", template_path),
         ("Checksum:", hash_directory(template_path)),
@@ -508,7 +537,7 @@ def generate_csv_report(output_dir, profile_name, template_path, failures):
         rows.append(
             [
                 "\n".join(failure.files),
-                failure.test_module,
+                failure.test_id,
                 failure.requirement_text(reqs),
                 failure.resolution_steps(resolutions),
                 failure.error_message,
@@ -523,7 +552,7 @@ def generate_csv_report(output_dir, profile_name, template_path, failures):
             writer.writerow(row)
 
 
-def generate_excel_report(output_dir, profile_name, template_path, failures):
+def generate_excel_report(output_dir, categories, template_path, failures):
     output_path = os.path.join(output_dir, "report.xlsx")
     workbook = xlsxwriter.Workbook(output_path)
     bold = workbook.add_format({"bold": True})
@@ -534,7 +563,8 @@ def generate_excel_report(output_dir, profile_name, template_path, failures):
     worksheet.write(0, 0, "Validation Failures", heading)
 
     headers = [
-        ("Profile Selected:", profile_name),
+        ("Categories Selected:", ",".join(categories)),
+        ("Tool Version:", version.VERSION),
         ("Report Generated At:", make_timestamp()),
         ("Directory Validated:", template_path),
         ("Checksum:", hash_directory(template_path)),
@@ -571,7 +601,7 @@ def generate_excel_report(output_dir, profile_name, template_path, failures):
     # table content
     for row, failure in enumerate(failures, start=start_error_table_row + 2):
         worksheet.write(row, 0, "\n".join(failure.files), normal)
-        worksheet.write(row, 1, failure.test_module, normal)
+        worksheet.write(row, 1, failure.test_id, normal)
         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
         worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
         worksheet.write(row, 4, failure.error_message, normal)
@@ -589,44 +619,20 @@ def make_iso_timestamp():
     return now.isoformat()
 
 
-def aggregate_requirement_adherence(r_id, collection_failures, test_results):
-    """
-    Examines all tests associated with a given requirement and determines
-    the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
-
-    * ERROR - At least one ERROR occurred
-    * PASS -  At least one PASS and no FAIL or ERRORs.
-    * FAIL -  At least one FAIL occurred (no ERRORs)
-    * SKIP - All tests were SKIP
-
-
-    :param r_id: Requirement ID to examing
-    :param collection_failures: Errors that occurred during test setup.
-    :param test_results: List of TestResult
-    :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
-    """
-    errors = any(r_id in f["requirements"] for f in collection_failures)
-    outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
-    return aggregate_results(errors, outcomes, r_id)
-
-
-def aggregate_results(has_errors, outcomes, r_id=None):
+def aggregate_results(outcomes, r_id=None):
     """
     Determines the aggregate result for the conditions provided.  Assumes the
     results have been filtered and collected for analysis.
 
-    :param has_errors: True if collection failures occurred for the tests being
-                       analyzed.
     :param outcomes: set of outcomes from the TestResults
     :param r_id: Optional requirement ID if known
     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
-             (see aggregate_requirement_adherence for more detail)
+             based on precedence: ERROR > FAIL > PASS > SKIP
     """
-    if has_errors:
-        return "ERROR"
-
     if not outcomes:
         return "PASS"
+    elif "ERROR" in outcomes:
+        return "ERROR"
     elif "FAIL" in outcomes:
         return "FAIL"
     elif "PASS" in outcomes:
@@ -636,7 +642,8 @@ def aggregate_results(has_errors, outcomes, r_id=None):
     else:
-        pytest.warns(
-            "Unexpected error aggregating outcomes ({}) for requirement {}".format(
-                outcomes, r_id)
-        )
+        # pytest.warns asserts that a warning is raised; it does not emit one.
+        # Use the logging module introduced by this change instead.
+        logging.error(
+            "Unexpected error aggregating outcomes (%s) for requirement %s",
+            outcomes,
+            r_id,
+        )
         return "ERROR"
 
@@ -661,60 +668,24 @@ def aggregate_run_results(collection_failures, test_results):
         return "PASS"
 
 
-def error(failure_or_result):
-    """
-    Extracts the error message from a collection failure or test result
-    :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
-    :return: Error message as string
-    """
-    if isinstance(failure_or_result, TestResult):
-        return failure_or_result.error_message
-    else:
-        return failure_or_result["error"]
+def relative_paths(base_dir, paths):
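+    """Converts each path in ``paths`` to a path relative to ``base_dir``"""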
+    return [os.path.relpath(p, base_dir) for p in paths]
 
 
-def req_ids(failure_or_result):
-    """
-    Extracts the requirement IDs from a collection failure or test result
-    :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
-    :return: set of Requirement IDs.  If no requirements mapped, then an empty set
-    """
-    if isinstance(failure_or_result, TestResult):
-        return set(failure_or_result.requirement_ids)
-    else:
-        return set(failure_or_result["requirements"])
-
-
-def collect_errors(r_id, collection_failures, test_result):
-    """
-    Creates a list of error messages from the collection failures and
-    test results.  If r_id is provided, then it collects the error messages
-    where the failure or test is associated with that requirement ID.  If
-    r_id is None, then it collects all errors that occur on failures and
-    results that are not mapped to requirements
-    """
-    def selector(item):
-        if r_id:
-            return r_id in req_ids(item)
-        else:
-            return not req_ids(item)
-
-    errors = (error(x) for x in chain(collection_failures, test_result)
-              if selector(x))
-    return [e for e in errors if e]
-
-
-def generate_json(outpath, template_path, profile_name):
+# noinspection PyTypeChecker
+def generate_json(outpath, template_path, categories):
     """
     Creates a JSON summary of the entire test run.
     """
     reqs = load_current_requirements()
     data = {
         "version": "dublin",
-        "template_directory": template_path,
+        "template_directory": os.path.splitdrive(template_path)[1].replace(
+            os.path.sep, "/"
+        ),
         "timestamp": make_iso_timestamp(),
         "checksum": hash_directory(template_path),
-        "profile": profile_name,
+        "categories": categories,
         "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
         "tests": [],
         "requirements": [],
@@ -735,7 +706,7 @@ def generate_json(outpath, template_path, profile_name):
     for result in ALL_RESULTS:
         results.append(
             {
-                "files": result.files,
+                "files": relative_paths(template_path, result.files),
                 "test_module": result.test_module,
                 "test_case": result.test_case,
                 "result": result.outcome,
@@ -744,30 +715,40 @@ def generate_json(outpath, template_path, profile_name):
             }
         )
 
+    # Build a mapping of requirement ID to the results
+    r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
+    for test_result in results:
+        test_reqs = test_result["requirements"]
+        r_ids = (
+            [r["id"] if isinstance(r, dict) else r for r in test_reqs]
+            if test_reqs
+            else ("",)
+        )
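+        # Tests without requirement IDs fall under the "" key and are
+        # reported later as "Unmapped".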
+        for r_id in r_ids:
+            item = r_id_results[r_id]
+            item["outcomes"].add(test_result["result"])
+            if test_result["error"]:
+                item["errors"].add(test_result["error"])
+
     requirements = data["requirements"]
     for r_id, r_data in reqs.items():
-        result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
-        if result:
-            requirements.append(
-                {
-                    "id": r_id,
-                    "text": r_data["description"],
-                    "keyword": r_data["keyword"],
-                    "result": result,
-                    "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS)
-                }
-            )
-    # If there are tests that aren't mapped to a requirement, then we'll
-    # map them to a special entry so the results are coherent.
-    unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
-    has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
-    if unmapped_outcomes or has_errors:
+        requirements.append(
+            {
+                "id": r_id,
+                "text": r_data["description"],
+                "keyword": r_data["keyword"],
+                "result": aggregate_results(r_id_results[r_id]["outcomes"]),
+                "errors": list(r_id_results[r_id]["errors"]),
+            }
+        )
+
+    if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
         requirements.append(
             {
                 "id": "Unmapped",
                 "text": "Tests not mapped to requirements (see tests)",
-                "result": aggregate_results(has_errors, unmapped_outcomes),
-                "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS)
+                "result": aggregate_results(r_id_results[""]["outcomes"]),
+                "errors": list(r_id_results[""]["errors"]),
             }
         )
 
@@ -775,7 +756,7 @@ def generate_json(outpath, template_path, profile_name):
     write_json(data, report_path)
 
 
-def generate_html_report(outpath, profile_name, template_path, failures):
+def generate_html_report(outpath, categories, template_path, failures):
     reqs = load_current_requirements()
     resolutions = load_resolutions_file()
     fail_data = []
@@ -783,7 +764,7 @@ def generate_html_report(outpath, profile_name, template_path, failures):
         fail_data.append(
             {
                 "file_links": make_href(failure.files),
-                "test_id": failure.test_module,
+                "test_id": failure.test_id,
                 "error_message": failure.error_message,
                 "raw_output": failure.raw_output,
                 "requirements": docutils.core.publish_parts(
@@ -797,8 +778,9 @@ def generate_html_report(outpath, profile_name, template_path, failures):
     with open(j2_template_path, "r") as f:
         report_template = jinja2.Template(f.read())
         contents = report_template.render(
+            version=version.VERSION,
             num_failures=len(failures) + len(COLLECTION_FAILURES),
-            profile_name=profile_name,
+            categories=categories,
             template_dir=make_href(template_path),
             checksum=hash_directory(template_path),
             timestamp=make_timestamp(),
@@ -820,6 +802,13 @@ def pytest_addoption(parser):
         help="Directory which holds the templates for validation",
     )
 
+    parser.addoption(
+        "--template-source",
+        dest="template_source",
+        action="append",
+        help="Source Directory which holds the templates for validation",
+    )
+
     parser.addoption(
         "--self-test",
         dest="self_test",
@@ -827,20 +816,6 @@ def pytest_addoption(parser):
         help="Test the unit tests against their fixtured data",
     )
 
-    parser.addoption(
-        "--validation-profile",
-        dest="validation_profile",
-        action="store",
-        help="Runs all unmarked tests plus test with a matching marker",
-    )
-
-    parser.addoption(
-        "--validation-profile-name",
-        dest="validation_profile_name",
-        action="store",
-        help="Friendly name of the validation profile used in reports",
-    )
-
     parser.addoption(
         "--report-format",
         dest="report_format",
@@ -860,7 +835,14 @@ def pytest_addoption(parser):
         dest="output_dir",
         action="store",
         default=None,
-        help="Alternate "
+        help="Alternate ",
+    )
+
+    parser.addoption(
+        "--category",
+        dest="test_categories",
+        action="append",
+        help="optional category of test to execute",
     )
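+
+    # Hypothetical usage (the template directory flag name is assumed):
+    #   pytest --template-directory=/path/to/templates \
+    #          --category=environment_file --report-format=csv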
 
 
@@ -992,6 +974,11 @@ def pytest_generate_tests(metafunc):
 
 
 def hash_directory(path):
+    """
+    Create an MD5 hash from the contents of all files under ``path``
+    :param path: string directory containing files
+    :return: string MD5 hash code (hex)
+    """
     md5 = hashlib.md5()
     for dir_path, sub_dirs, filenames in os.walk(path):
         for filename in filenames:
@@ -1003,51 +990,99 @@ def hash_directory(path):
 
 def load_current_requirements():
     """Loads dict of current requirements or empty dict if file doesn't exist"""
-    try:
-        r = requests.get(NEEDS_JSON_URL)
-        if r.headers.get("content-type") == "application/json":
-            with open(HEAT_REQUIREMENTS_FILE, "wb") as needs:
-                needs.write(r.content)
-        else:
-            warnings.warn(
-                (
-                    "Unexpected content-type ({}) encountered downloading "
-                    + "requirements.json, using last saved copy"
-                ).format(r.headers.get("content-type"))
-            )
-    except requests.exceptions.RequestException as e:
-        warnings.warn("Error downloading latest JSON, using last saved copy.")
-        warnings.warn(UserWarning(e))
-    if not os.path.exists(HEAT_REQUIREMENTS_FILE):
-        return {}
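+    # The requirements file now ships with the tool; no network access needed.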
     with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
         data = json.load(f)
         version = data["current_version"]
         return data["versions"][version]["needs"]
 
 
-def compat_open(path):
-    """Invokes open correctly depending on the Python version"""
-    if sys.version_info.major < 3:
-        return open(path, "wb")
-    else:
-        return open(path, "w", newline="")
+def select_heat_requirements(reqs):
+    """Filters dict requirements to only those requirements pertaining to Heat"""
+    return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}
 
 
-def unicode_writerow(writer, row):
-    if sys.version_info.major < 3:
-        row = [s.encode("utf8") for s in row]
-    writer.writerow(row)
+def is_testable(reqs):
+    """Filters dict requirements to only those which are testable"""
+    for key, values in reqs.items():
+        if ("MUST" in values.get("keyword", "").upper()) and (
+            "none" not in values.get("validation_mode", "").lower()
+        ):
+            reqs[key]["testable"] = True
+        else:
+            reqs[key]["testable"] = False
+    return reqs
+
+
+def build_rst_json(reqs):
+    """Takes requirements and returns list of only Heat requirements"""
+    for key, values in list(reqs.items()):
+        if values["testable"]:
+            # Create an RST link from the requirement ID to its docs page
+            title = (
+                "`"
+                + values["id"]
+                + " <"
+                + VNFRQTS_ID_URL
+                + values["docname"].replace(" ", "%20")
+                + ".html#"
+                + values["id"]
+                + ">`_"
+            )
+            # Use .get(): requirements with no mapped test lack "test_case"
+            if values.get("test_case"):
+                # Link the test case to its script in the VVP repository
+                mod = values["test_case"].split(".")[-1]
+                val = TEST_SCRIPT_SITE + mod + ".py"
+                rst_value = "`" + mod + " <" + val + ">`_"
+                reqs[key].update({"full_title": title, "test_case": rst_value})
+            else:
+                reqs[key].update(
+                    {
+                        "full_title": title,
+                        "test_case": "No test for requirement",
+                        "validated_by": "static",
+                    }
+                )
+        else:
+            del reqs[key]
+    return reqs
+
+
+def generate_rst_table(output_dir, data):
+    """Generate a formatted csv to be used in RST"""
+    rst_path = os.path.join(output_dir, "rst.csv")
+    with open(rst_path, "w", newline="") as f:
+        out = csv.writer(f)
+        out.writerow(("Requirement ID", "Requirement", "Test Module", "Test Name"))
+        for metadata in data.values():
+            out.writerow(
+                (
+                    metadata["full_title"],
+                    metadata["description"],
+                    metadata["test_case"],
+                    metadata["validated_by"],
+                )
+            )
 
 
 # noinspection PyUnusedLocal
 def pytest_report_collectionfinish(config, startdir, items):
     """Generates a simple traceability report to output/traceability.csv"""
-    traceability_path = os.path.join(__path__[0], "../output/traceability.csv")
+    traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
     output_dir = os.path.split(traceability_path)[0]
     if not os.path.exists(output_dir):
         os.makedirs(output_dir)
-    requirements = load_current_requirements()
+    reqs = load_current_requirements()
+    requirements = select_heat_requirements(reqs)
+    testable_requirements = is_testable(requirements)
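+    # is_testable annotates in place, so requirements and
+    # testable_requirements reference the same underlying dicts.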
     unmapped, mapped = partition(
         lambda i: hasattr(i.function, "requirement_ids"), items
     )
@@ -1058,43 +1093,82 @@ def pytest_report_collectionfinish(config, startdir, items):
         for req_id in item.function.requirement_ids:
             if req_id not in req_to_test:
                 req_to_test[req_id].add(item)
+                if req_id in requirements:
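+                    # Record the validating test for the RST traceability table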
+                    reqs[req_id].update(
+                        {
+                            "test_case": item.function.__module__,
+                            "validated_by": item.function.__name__,
+                        }
+                    )
             if req_id not in requirements:
                 mapping_errors.add(
                     (req_id, item.function.__module__, item.function.__name__)
                 )
 
-    mapping_error_path = os.path.join(__path__[0], "../output/mapping_errors.csv")
-    with compat_open(mapping_error_path) as f:
+    mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
+    with open(mapping_error_path, "w", newline="") as f:
         writer = csv.writer(f)
         for err in mapping_errors:
-            unicode_writerow(writer, err)
+            writer.writerow(err)
 
-    with compat_open(traceability_path) as f:
+    with open(traceability_path, "w", newline="") as f:
         out = csv.writer(f)
-        unicode_writerow(
-            out,
-            ("Requirement ID", "Requirement", "Section", "Test Module", "Test Name"),
+        out.writerow(
+            (
+                "Requirement ID",
+                "Requirement",
+                "Section",
+                "Keyword",
+                "Validation Mode",
+                "Is Testable",
+                "Test Module",
+                "Test Name",
+            )
         )
-        for req_id, metadata in requirements.items():
+        for req_id, metadata in testable_requirements.items():
             if req_to_test[req_id]:
                 for item in req_to_test[req_id]:
-                    unicode_writerow(
-                        out,
+                    out.writerow(
                         (
                             req_id,
                             metadata["description"],
                             metadata["section_name"],
+                            metadata["keyword"],
+                            metadata["validation_mode"],
+                            metadata["testable"],
                             item.function.__module__,
                             item.function.__name__,
-                        ),
+                        )
                     )
             else:
-                unicode_writerow(
-                    out,
-                    (req_id, metadata["description"], metadata["section_name"], "", ""),
+                out.writerow(
+                    (
+                        req_id,
+                        metadata["description"],
+                        metadata["section_name"],
+                        metadata["keyword"],
+                        metadata["validation_mode"],
+                        metadata["testable"],
+                        "",  # test module
+                        "",
+                    )  # test function
                 )
         # now write out any test methods that weren't mapped to requirements
-        for item in unmapped:
-            unicode_writerow(
-                out, ("", "", "", item.function.__module__, item.function.__name__)
+        unmapped_tests = {
+            (item.function.__module__, item.function.__name__) for item in unmapped
+        }
+        for test_module, test_name in unmapped_tests:
+            out.writerow(
+                (
+                    "",  # req ID
+                    "",  # description
+                    "",  # section name
+                    "",  # keyword
+                    "static",  # validation mode
+                    "TRUE",  # testable
+                    test_module,
+                    test_name,
+                )
             )
+
+    generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))