import re
import time
from collections import defaultdict
-from itertools import chain
import traceback
import xlsxwriter
from six import string_types
+# noinspection PyUnresolvedReferences
import version
+import logging
+
+logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)
__path__ = [os.path.dirname(os.path.abspath(__file__))]
DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
-RESOLUTION_STEPS_FILE = "resolution_steps.json"
HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
TEST_SCRIPT_SITE = (
"https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
)
REPORT_COLUMNS = [
+ ("Error #", "err_num"),
("Input File", "file"),
- ("Test", "test_file"),
("Requirements", "req_description"),
- ("Resolution Steps", "resolution_steps"),
("Error Message", "message"),
- ("Raw Test Output", "raw_output"),
+ ("Test", "test_file"),
]
COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
# Extract everything between AssertionError and the start
# of the assert statement expansion in the pytest report
msg = match.group(1)
+ elif "AssertionError:" in full_msg:
+ msg = full_msg.split("AssertionError:")[1]
else:
- msg = str(rep.longrepr.reprcrash)
- if "AssertionError:" in msg:
- msg = msg.split("AssertionError:")[1]
+ msg = full_msg
except AttributeError:
msg = str(rep)
def __init__(self, item, outcome):
self.item = item
self.result = outcome.get_result()
- self.files = [os.path.normpath(p) for p in self._get_files()]
+ self.files = self._get_files()
self.error_message = self._get_error_message()
@property
)
return data
- def resolution_steps(self, resolutions):
- """
- :param resolutions: Loaded from contents for resolution_steps.json
- :return: Header and text for the resolution step associated with this
- test case. Returns empty string if no resolutions are
- provided.
- """
- text = (
- "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
- for entry in resolutions
- if self._match(entry)
- )
- return "".join(text)
-
- def _match(self, resolution_entry):
- """
- Returns True if the test result maps to the given entry in
- the resolutions file
- """
- return (
- self.test_case == resolution_entry["function"]
- and self.test_module == resolution_entry["module"]
- )
-
def _get_files(self):
"""
Extracts the list of files passed into the test case.
"{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
]
elif "heat_templates" in self.item.fixturenames:
- return self.item.funcargs["heat_templates"]
+ return [os.path.basename(f) for f in self.item.funcargs["heat_templates"]]
elif "yaml_files" in self.item.fixturenames:
- return self.item.funcargs["yaml_files"]
+ return [os.path.basename(f) for f in self.item.funcargs["yaml_files"]]
else:
parts = self.result.nodeid.split("[")
- return [""] if len(parts) == 1 else [parts[1][:-1]]
+ return [""] if len(parts) == 1 else [os.path.basename(parts[1][:-1])]
def _get_error_message(self):
"""
):
item.add_marker(
pytest.mark.skip(
- reason="Test categories do not match all the passed categories"
+ reason=(
+ "Test categories do not match "
+ "all the passed categories"
+ )
)
)
else:
item.add_marker(
pytest.mark.skip(
- reason="Test belongs to a category but no categories were passed"
+ reason=(
+ "Test belongs to a category but "
+ "no categories were passed"
+ )
)
)
+
items.sort(
- key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
+ key=lambda x: (0, x.name)
+ if "base" in set(m.name for m in x.iter_markers())
+ else (1, x.name)
)
-def make_href(paths):
+def make_href(paths, base_dir=None):
"""
Create an anchor tag to link to the file paths provided.
:param paths: string or list of file paths
+ :param base_dir: If specified this is pre-pended to each path
:return: String of hrefs - one for each path, each seperated by a line
break (<br/).
"""
paths = [paths] if isinstance(paths, string_types) else paths
+ if base_dir:
+ paths = [os.path.join(base_dir, p) for p in paths]
links = []
for p in paths:
abs_path = os.path.abspath(p)
return "<br/>".join(links)
-def load_resolutions_file():
- """
- :return: dict of data loaded from resolutions_steps.json
- """
- resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
- if os.path.exists(resolution_steps):
- with open(resolution_steps, "r") as f:
- return json.loads(f.read())
-
-
def generate_report(outpath, template_path, categories, output_format="html"):
"""
Generates the various output reports.
rows.append([col for col, _ in REPORT_COLUMNS])
reqs = load_current_requirements()
- resolutions = load_resolutions_file()
# table content
- for failure in failures:
+ for i, failure in enumerate(failures, start=1):
rows.append(
[
+ i,
"\n".join(failure.files),
- failure.test_id,
failure.requirement_text(reqs),
- failure.resolution_steps(resolutions),
failure.error_message,
- failure.raw_output,
+ failure.test_id,
]
)
def generate_excel_report(output_dir, categories, template_path, failures):
output_path = os.path.join(output_dir, "report.xlsx")
workbook = xlsxwriter.Workbook(output_path)
- bold = workbook.add_format({"bold": True})
- code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
- normal = workbook.add_format({"text_wrap": True})
+ bold = workbook.add_format({"bold": True, "align": "top"})
+ code = workbook.add_format(
+ {"font_name": "Courier", "text_wrap": True, "align": "top"}
+ )
+ normal = workbook.add_format({"text_wrap": True, "align": "top"})
heading = workbook.add_format({"bold": True, "font_size": 18})
worksheet = workbook.add_worksheet("failures")
worksheet.write(0, 0, "Validation Failures", heading)
worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
reqs = load_current_requirements()
- resolutions = load_resolutions_file()
# table content
+ for col, width in enumerate((20, 30, 60, 60, 40)):
+ worksheet.set_column(col, col, width)
+ err_num = 1
for row, failure in enumerate(failures, start=start_error_table_row + 2):
- worksheet.write(row, 0, "\n".join(failure.files), normal)
- worksheet.write(row, 1, failure.test_id, normal)
+ worksheet.write(row, 0, str(err_num), normal)
+ worksheet.write(row, 1, "\n".join(failure.files), normal)
worksheet.write(row, 2, failure.requirement_text(reqs), normal)
- worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
- worksheet.write(row, 4, failure.error_message, normal)
- worksheet.write(row, 5, failure.raw_output, code)
-
+ worksheet.write(row, 3, failure.error_message.replace("\n", "\n\n"), normal)
+ worksheet.write(row, 4, failure.test_id, normal)
+ err_num += 1
+ worksheet.autofilter(
+ start_error_table_row + 1,
+ 0,
+ start_error_table_row + 1 + err_num,
+ len(REPORT_COLUMNS) - 1,
+ )
workbook.close()
return now.isoformat()
-def aggregate_requirement_adherence(r_id, collection_failures, test_results):
- """
- Examines all tests associated with a given requirement and determines
- the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
-
- * ERROR - At least one ERROR occurred
- * PASS - At least one PASS and no FAIL or ERRORs.
- * FAIL - At least one FAIL occurred (no ERRORs)
- * SKIP - All tests were SKIP
-
-
- :param r_id: Requirement ID to examing
- :param collection_failures: Errors that occurred during test setup.
- :param test_results: List of TestResult
- :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
- """
- errors = any(r_id in f["requirements"] for f in collection_failures)
- outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
- return aggregate_results(errors, outcomes, r_id)
-
-
-def aggregate_results(has_errors, outcomes, r_id=None):
+def aggregate_results(outcomes, r_id=None):
"""
Determines the aggregate result for the conditions provided. Assumes the
results have been filtered and collected for analysis.
- :param has_errors: True if collection failures occurred for the tests being
- analyzed.
:param outcomes: set of outcomes from the TestResults
:param r_id: Optional requirement ID if known
:return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
(see aggregate_requirement_adherence for more detail)
"""
- if has_errors:
- return "ERROR"
-
if not outcomes:
return "PASS"
+ elif "ERROR" in outcomes:
+ return "ERROR"
elif "FAIL" in outcomes:
return "FAIL"
elif "PASS" in outcomes:
return "PASS"
-def error(failure_or_result):
- """
- Extracts the error message from a collection failure or test result
- :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
- :return: Error message as string
- """
- if isinstance(failure_or_result, TestResult):
- return failure_or_result.error_message
- else:
- return failure_or_result["error"]
-
-
-def req_ids(failure_or_result):
- """
- Extracts the requirement IDs from a collection failure or test result
- :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
- :return: set of Requirement IDs. If no requirements mapped, then an empty set
- """
- if isinstance(failure_or_result, TestResult):
- return set(failure_or_result.requirement_ids)
- else:
- return set(failure_or_result["requirements"])
-
-
-def collect_errors(r_id, collection_failures, test_result):
- """
- Creates a list of error messages from the collection failures and
- test results. If r_id is provided, then it collects the error messages
- where the failure or test is associated with that requirement ID. If
- r_id is None, then it collects all errors that occur on failures and
- results that are not mapped to requirements
- """
-
- def selector(item):
- if r_id:
- return r_id in req_ids(item)
- else:
- return not req_ids(item)
-
- errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
- return [e for e in errors if e]
-
-
def relative_paths(base_dir, paths):
return [os.path.relpath(p, base_dir) for p in paths]
+# noinspection PyTypeChecker
def generate_json(outpath, template_path, categories):
"""
Creates a JSON summary of the entire test run.
}
)
+ # Build a mapping of requirement ID to the results
+ r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
+ for test_result in results:
+ test_reqs = test_result["requirements"]
+ r_ids = (
+ [r["id"] if isinstance(r, dict) else r for r in test_reqs]
+ if test_reqs
+ else ("",)
+ )
+ for r_id in r_ids:
+ item = r_id_results[r_id]
+ item["outcomes"].add(test_result["result"])
+ if test_result["error"]:
+ item["errors"].add(test_result["error"])
+
requirements = data["requirements"]
for r_id, r_data in reqs.items():
- result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
- if result:
- requirements.append(
- {
- "id": r_id,
- "text": r_data["description"],
- "keyword": r_data["keyword"],
- "result": result,
- "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
- }
- )
- # If there are tests that aren't mapped to a requirement, then we'll
- # map them to a special entry so the results are coherent.
- unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
- has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
- if unmapped_outcomes or has_errors:
+ requirements.append(
+ {
+ "id": r_id,
+ "text": r_data["description"],
+ "keyword": r_data["keyword"],
+ "result": aggregate_results(r_id_results[r_id]["outcomes"]),
+ "errors": list(r_id_results[r_id]["errors"]),
+ }
+ )
+
+ if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
requirements.append(
{
"id": "Unmapped",
"text": "Tests not mapped to requirements (see tests)",
- "result": aggregate_results(has_errors, unmapped_outcomes),
- "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
+ "result": aggregate_results(r_id_results[""]["outcomes"]),
+ "errors": list(r_id_results[""]["errors"]),
}
)
def generate_html_report(outpath, categories, template_path, failures):
reqs = load_current_requirements()
- resolutions = load_resolutions_file()
fail_data = []
for failure in failures:
fail_data.append(
{
- "file_links": make_href(failure.files),
+ "file_links": make_href(failure.files, template_path),
"test_id": failure.test_id,
- "error_message": failure.error_message,
+ "error_message": failure.error_message.replace("\n", "<br/><br/>"),
"raw_output": failure.raw_output,
"requirements": docutils.core.publish_parts(
writer_name="html", source=failure.requirement_text(reqs)
)["body"],
- "resolution_steps": failure.resolution_steps(resolutions),
}
)
pkg_dir = os.path.split(__file__)[0]
def is_testable(reqs):
"""Filters dict requirements to only those which are testable"""
for key, values in reqs.items():
- if (("MUST" in values.get("keyword", "").upper()) and (
+ if ("MUST" in values.get("keyword", "").upper()) and (
"none" not in values.get("validation_mode", "").lower()
- )):
+ ):
reqs[key]["testable"] = True
else:
reqs[key]["testable"] = False