import re
import time
from collections import defaultdict
-from itertools import chain
import traceback
import xlsxwriter
from six import string_types
+# noinspection PyUnresolvedReferences
import version
+import logging
+
+logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)
__path__ = [os.path.dirname(os.path.abspath(__file__))]
RESOLUTION_STEPS_FILE = "resolution_steps.json"
HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
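+# Base URLs used to build RST links to test scripts and requirement definitions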
+TEST_SCRIPT_SITE = (
+ "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
+)
+VNFRQTS_ID_URL = (
+ "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
+)
REPORT_COLUMNS = [
("Input File", "file"),
"""
return self.item.function.__module__.split(".")[-1]
+ @property
+ def test_id(self):
+ """
+ :return: ID of the test (test_module + test_case)
+ """
+ return "{}::{}".format(self.test_module, self.test_case)
+
@property
def raw_output(self):
"""
text = (
"\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
for r_id in self.requirement_ids
+ if r_id in curr_reqs
)
return "".join(text)
elif "yaml_files" in self.item.fixturenames:
return self.item.funcargs["yaml_files"]
else:
- return [self.result.nodeid.split("[")[1][:-1]]
+ parts = self.result.nodeid.split("[")
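+            # A nodeid without "[...]" carries no file parameter; report a single empty entry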
+ return [""] if len(parts) == 1 else [parts[1][:-1]]
def _get_error_message(self):
"""
if outcome.get_result().when != "call":
return # only capture results of test cases themselves
result = TestResult(item, outcome)
- ALL_RESULTS.append(result)
if (
not item.config.option.continue_on_failure
and result.is_base_test
msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
result.error_message
)
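+        # Attach the halt message to the result and record it before stopping the session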
+ result.error_message = msg
+ ALL_RESULTS.append(result)
pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
+ ALL_RESULTS.append(result)
+
def make_timestamp():
"""
):
item.add_marker(
pytest.mark.skip(
- reason="Test categories do not match all the passed categories"
+ reason=("Test categories do not match "
+ "all the passed categories")
)
)
else:
item.add_marker(
pytest.mark.skip(
- reason="Test belongs to a category but no categories were passed"
+ reason=("Test belongs to a category but "
+ "no categories were passed")
)
)
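+    # Sort so that base tests run first; a failing base test halts the remaining suite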
items.sort(
- key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
+ key=lambda x: 0 if "base" in set(m.name for m in x.iter_markers()) else 1
)
failures = [r for r in ALL_RESULTS if r.is_failed]
generate_failure_file(outpath)
output_format = output_format.lower().strip() if output_format else "html"
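+    # The JSON summary is always written; other formats are generated in addition to it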
+ generate_json(outpath, template_path, categories)
if output_format == "html":
generate_html_report(outpath, categories, template_path, failures)
elif output_format == "excel":
generate_excel_report(outpath, categories, template_path, failures)
elif output_format == "json":
- generate_json(outpath, template_path, categories)
+ return
elif output_format == "csv":
generate_csv_report(outpath, categories, template_path, failures)
else:
rows.append(
[
"\n".join(failure.files),
- failure.test_module,
+ failure.test_id,
failure.requirement_text(reqs),
failure.resolution_steps(resolutions),
failure.error_message,
# table content
for row, failure in enumerate(failures, start=start_error_table_row + 2):
worksheet.write(row, 0, "\n".join(failure.files), normal)
- worksheet.write(row, 1, failure.test_module, normal)
+ worksheet.write(row, 1, failure.test_id, normal)
worksheet.write(row, 2, failure.requirement_text(reqs), normal)
worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
worksheet.write(row, 4, failure.error_message, normal)
return now.isoformat()
-def aggregate_requirement_adherence(r_id, collection_failures, test_results):
- """
- Examines all tests associated with a given requirement and determines
- the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
-
- * ERROR - At least one ERROR occurred
- * PASS - At least one PASS and no FAIL or ERRORs.
- * FAIL - At least one FAIL occurred (no ERRORs)
- * SKIP - All tests were SKIP
-
-
- :param r_id: Requirement ID to examing
- :param collection_failures: Errors that occurred during test setup.
- :param test_results: List of TestResult
- :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
- """
- errors = any(r_id in f["requirements"] for f in collection_failures)
- outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
- return aggregate_results(errors, outcomes, r_id)
-
-
-def aggregate_results(has_errors, outcomes, r_id=None):
+def aggregate_results(outcomes, r_id=None):
"""
Determines the aggregate result for the conditions provided. Assumes the
results have been filtered and collected for analysis.
- :param has_errors: True if collection failures occurred for the tests being
- analyzed.
:param outcomes: set of outcomes from the TestResults
:param r_id: Optional requirement ID if known
:return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
-             (see aggregate_requirement_adherence for more detail)
"""
- if has_errors:
- return "ERROR"
-
if not outcomes:
return "PASS"
+ elif "ERROR" in outcomes:
+ return "ERROR"
elif "FAIL" in outcomes:
return "FAIL"
elif "PASS" in outcomes:
return "PASS"
-def error(failure_or_result):
- """
- Extracts the error message from a collection failure or test result
- :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
- :return: Error message as string
- """
- if isinstance(failure_or_result, TestResult):
- return failure_or_result.error_message
- else:
- return failure_or_result["error"]
-
-
-def req_ids(failure_or_result):
- """
- Extracts the requirement IDs from a collection failure or test result
- :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
- :return: set of Requirement IDs. If no requirements mapped, then an empty set
- """
- if isinstance(failure_or_result, TestResult):
- return set(failure_or_result.requirement_ids)
- else:
- return set(failure_or_result["requirements"])
-
-
-def collect_errors(r_id, collection_failures, test_result):
- """
- Creates a list of error messages from the collection failures and
- test results. If r_id is provided, then it collects the error messages
- where the failure or test is associated with that requirement ID. If
- r_id is None, then it collects all errors that occur on failures and
- results that are not mapped to requirements
- """
-
- def selector(item):
- if r_id:
- return r_id in req_ids(item)
- else:
- return not req_ids(item)
-
- errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
- return [e for e in errors if e]
+def relative_paths(base_dir, paths):
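+    """Returns each path in paths rewritten relative to base_dir."""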
+ return [os.path.relpath(p, base_dir) for p in paths]
+# noinspection PyTypeChecker
def generate_json(outpath, template_path, categories):
"""
Creates a JSON summary of the entire test run.
reqs = load_current_requirements()
data = {
"version": "dublin",
- "template_directory": template_path,
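+        # Normalized to forward slashes with any Windows drive letter stripped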
+ "template_directory": os.path.splitdrive(template_path)[1].replace(
+ os.path.sep, "/"
+ ),
"timestamp": make_iso_timestamp(),
"checksum": hash_directory(template_path),
"categories": categories,
for result in ALL_RESULTS:
results.append(
{
- "files": result.files,
+ "files": relative_paths(template_path, result.files),
"test_module": result.test_module,
"test_case": result.test_case,
"result": result.outcome,
}
)
+ # Build a mapping of requirement ID to the results
+ r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
+ for test_result in results:
+ test_reqs = test_result["requirements"]
+ r_ids = (
+ [r["id"] if isinstance(r, dict) else r for r in test_reqs]
+ if test_reqs
+ else ("",)
+ )
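+        # The empty-string key collects results from tests with no mapped requirement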
+ for r_id in r_ids:
+ item = r_id_results[r_id]
+ item["outcomes"].add(test_result["result"])
+ if test_result["error"]:
+ item["errors"].add(test_result["error"])
+
requirements = data["requirements"]
for r_id, r_data in reqs.items():
- result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
- if result:
- requirements.append(
- {
- "id": r_id,
- "text": r_data["description"],
- "keyword": r_data["keyword"],
- "result": result,
- "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
- }
- )
- # If there are tests that aren't mapped to a requirement, then we'll
- # map them to a special entry so the results are coherent.
- unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
- has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
- if unmapped_outcomes or has_errors:
+ requirements.append(
+ {
+ "id": r_id,
+ "text": r_data["description"],
+ "keyword": r_data["keyword"],
+ "result": aggregate_results(r_id_results[r_id]["outcomes"]),
+ "errors": list(r_id_results[r_id]["errors"]),
+ }
+ )
+
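+    # Tests not mapped to any requirement are reported under a special "Unmapped" entry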
+ if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
requirements.append(
{
"id": "Unmapped",
"text": "Tests not mapped to requirements (see tests)",
- "result": aggregate_results(has_errors, unmapped_outcomes),
- "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
+ "result": aggregate_results(r_id_results[""]["outcomes"]),
+ "errors": list(r_id_results[""]["errors"]),
}
)
fail_data.append(
{
"file_links": make_href(failure.files),
- "test_id": failure.test_module,
+ "test_id": failure.test_id,
"error_message": failure.error_message,
"raw_output": failure.raw_output,
"requirements": docutils.core.publish_parts(
def select_heat_requirements(reqs):
"""Filters dict requirements to only those requirements pertaining to Heat"""
- return {k: v for k, v in reqs.items() if "Heat" in v["docname"]}
+ return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}
+
+
+def is_testable(reqs):
+    """Marks each requirement with a boolean "testable" flag"""
+ for key, values in reqs.items():
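+        # "MUST" also matches "MUST NOT"; a validation_mode of "none" marks a requirement untestable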
+ if ("MUST" in values.get("keyword", "").upper()) and (
+ "none" not in values.get("validation_mode", "").lower()
+ ):
+ reqs[key]["testable"] = True
+ else:
+ reqs[key]["testable"] = False
+ return reqs
+
+
+def build_rst_json(reqs):
+    """Adds RST-formatted links to testable requirements and drops the non-testable ones"""
+ for key, values in list(reqs.items()):
+ if values["testable"]:
+ # Creates links in RST format to requirements and test cases
+        if values.get("test_case"):  # requirements without a mapped test have no test_case key
+ mod = values["test_case"].split(".")[-1]
+ val = TEST_SCRIPT_SITE + mod + ".py"
+ rst_value = "`" + mod + " <" + val + ">`_"
+ title = (
+ "`"
+ + values["id"]
+ + " <"
+ + VNFRQTS_ID_URL
+ + values["docname"].replace(" ", "%20")
+ + ".html#"
+ + values["id"]
+ + ">`_"
+ )
+ reqs[key].update({"full_title": title, "test_case": rst_value})
+ else:
+ title = (
+ "`"
+ + values["id"]
+ + " <"
+ + VNFRQTS_ID_URL
+ + values["docname"].replace(" ", "%20")
+ + ".html#"
+ + values["id"]
+ + ">`_"
+ )
+ reqs[key].update(
+ {
+ "full_title": title,
+ "test_case": "No test for requirement",
+ "validated_by": "static",
+ }
+ )
+ else:
+ del reqs[key]
+ return reqs
+
+
+def generate_rst_table(output_dir, data):
+    """Generates a CSV of requirement-to-test mappings for inclusion in the RST docs"""
+ rst_path = os.path.join(output_dir, "rst.csv")
+ with open(rst_path, "w", newline="") as f:
+ out = csv.writer(f)
+ out.writerow(("Requirement ID", "Requirement", "Test Module", "Test Name"))
+ for req_id, metadata in data.items():
+ out.writerow(
+ (
+ metadata["full_title"],
+ metadata["description"],
+ metadata["test_case"],
+ metadata["validated_by"],
+ )
+ )
# noinspection PyUnusedLocal
os.makedirs(output_dir)
reqs = load_current_requirements()
requirements = select_heat_requirements(reqs)
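+    # Flag which Heat requirements are testable before building the traceability data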
+ testable_requirements = is_testable(requirements)
unmapped, mapped = partition(
lambda i: hasattr(i.function, "requirement_ids"), items
)
for req_id in item.function.requirement_ids:
if req_id not in req_to_test:
req_to_test[req_id].add(item)
+ if req_id in requirements:
+ reqs[req_id].update(
+ {
+ "test_case": item.function.__module__,
+ "validated_by": item.function.__name__,
+ }
+ )
if req_id not in requirements:
mapping_errors.add(
(req_id, item.function.__module__, item.function.__name__)
with open(traceability_path, "w", newline="") as f:
out = csv.writer(f)
out.writerow(
- ("Requirement ID", "Requirement", "Section",
- "Keyword", "Validation Mode", "Is Testable",
- "Test Module", "Test Name"),
+ (
+ "Requirement ID",
+ "Requirement",
+ "Section",
+ "Keyword",
+ "Validation Mode",
+ "Is Testable",
+ "Test Module",
+ "Test Name",
+ )
)
- for req_id, metadata in requirements.items():
- keyword = metadata["keyword"].upper()
- mode = metadata["validation_mode"].lower()
- testable = keyword in {"MUST", "MUST NOT"} and mode != "none"
+ for req_id, metadata in testable_requirements.items():
if req_to_test[req_id]:
for item in req_to_test[req_id]:
out.writerow(
                        (
                            req_id,
metadata["description"],
metadata["section_name"],
- keyword,
- mode,
- "TRUE" if testable else "FALSE",
+ metadata["keyword"],
+ metadata["validation_mode"],
+ metadata["testable"],
item.function.__module__,
item.function.__name__,
- ),
+ )
)
else:
out.writerow(
- (req_id,
- metadata["description"],
- metadata["section_name"],
- keyword,
- mode,
- "TRUE" if testable else "FALSE",
- "", # test module
- ""), # test function
+ (
+ req_id,
+ metadata["description"],
+ metadata["section_name"],
+ metadata["keyword"],
+ metadata["validation_mode"],
+ metadata["testable"],
+ "", # test module
+                        "",  # test function
+                    )
)
# now write out any test methods that weren't mapped to requirements
- unmapped_tests = {(item.function.__module__, item.function.__name__)
- for item in unmapped}
+ unmapped_tests = {
+ (item.function.__module__, item.function.__name__) for item in unmapped
+ }
for test_module, test_name in unmapped_tests:
out.writerow(
- ("", # req ID
- "", # description
- "", # section name
- "", # keyword
- "static", # validation mode
- "TRUE", # testable
- test_module,
- test_name)
+ (
+ "", # req ID
+ "", # description
+ "", # section name
+ "", # keyword
+ "static", # validation mode
+ "TRUE", # testable
+ test_module,
+ test_name,
+ )
)
+
+ generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))