+DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
+
+HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
+TEST_SCRIPT_SITE = (
+ "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
+)
+VNFRQTS_ID_URL = (
+ "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
+)
+
+REPORT_COLUMNS = [
+ ("Error #", "err_num"),
+ ("Input File", "file"),
+ ("Requirements", "req_description"),
+ ("Error Message", "message"),
+ ("Test", "test_file"),
+]
+
+COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
+while preparing to validate the the input files. Some validations may not have been
+executed. Please refer these issue to the VNF Validation Tool team.
+"""
+
+COLLECTION_FAILURES = []
+
+# Captures the results of every test run
+ALL_RESULTS = []
+
+
+def extract_error_msg(rep):
+ """
+ If a custom error message was provided, then extract it otherwise
+ just show the pytest assert message
+ """
+ if rep.outcome != "failed":
+ return ""
+ try:
+ full_msg = str(rep.longrepr.reprcrash.message)
+ match = re.match(
+ "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
+ )
+ if match: # custom message was provided
+ # Extract everything between AssertionError and the start
+ # of the assert statement expansion in the pytest report
+ msg = match.group(1)
+ elif "AssertionError:" in full_msg:
+ msg = full_msg.split("AssertionError:")[1]
+ else:
+ msg = full_msg
+ except AttributeError:
+ msg = str(rep)
+
+ return msg
+
+
+class TestResult:
+ """
+ Wraps the test case and result to extract necessary metadata for
+ reporting purposes.
+ """
+
+ RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
+
+ def __init__(self, item, outcome):
+ self.item = item
+ self.result = outcome.get_result()
+ self.files = self._get_files()
+ self.error_message = self._get_error_message()
+
+ @property
+ def requirement_ids(self):
+ """
+ Returns list of requirement IDs mapped to the test case.
+
+ :return: Returns a list of string requirement IDs the test was
+ annotated with ``validates`` otherwise returns and empty list
+ """
+ is_mapped = hasattr(self.item.function, "requirement_ids")
+ return self.item.function.requirement_ids if is_mapped else []
+
+ @property
+ def markers(self):
+ """
+ :return: Returns a set of pytest marker names for the test or an empty set
+ """
+ return set(m.name for m in self.item.iter_markers())
+
+ @property
+ def is_base_test(self):
+ """
+ :return: Returns True if the test is annotated with a pytest marker called base
+ """
+ return "base" in self.markers
+
+ @property
+ def is_failed(self):
+ """
+ :return: True if the test failed
+ """
+ return self.outcome == "FAIL"
+
+ @property
+ def outcome(self):
+ """
+ :return: Returns 'PASS', 'FAIL', or 'SKIP'
+ """
+ return self.RESULT_MAPPING[self.result.outcome]
+
+ @property
+ def test_case(self):
+ """
+ :return: Name of the test case method
+ """
+ return self.item.function.__name__
+
+ @property
+ def test_module(self):
+ """
+ :return: Name of the file containing the test case
+ """
+ return self.item.function.__module__.split(".")[-1]
+
+ @property
+ def test_id(self):
+ """
+ :return: ID of the test (test_module + test_case)
+ """
+ return "{}::{}".format(self.test_module, self.test_case)
+
+ @property
+ def raw_output(self):
+ """
+ :return: Full output from pytest for the given test case
+ """
+ return str(self.result.longrepr)
+
+ def requirement_text(self, curr_reqs):
+ """
+ Creates a text summary for the requirement IDs mapped to the test case.
+ If no requirements are mapped, then it returns the empty string.
+
+ :param curr_reqs: mapping of requirement IDs to requirement metadata
+ loaded from the VNFRQTS projects needs.json output
+ :return: ID and text of the requirements mapped to the test case
+ """
+ text = (
+ "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
+ for r_id in self.requirement_ids
+ if r_id in curr_reqs
+ )
+ return "".join(text)
+
+ def requirements_metadata(self, curr_reqs):
+ """
+ Returns a list of dicts containing the following metadata for each
+ requirement mapped:
+
+ - id: Requirement ID
+ - text: Full text of the requirement
+ - keyword: MUST, MUST NOT, MAY, etc.
+
+ :param curr_reqs: mapping of requirement IDs to requirement metadata
+ loaded from the VNFRQTS projects needs.json output
+ :return: List of requirement metadata
+ """
+ data = []
+ for r_id in self.requirement_ids:
+ if r_id not in curr_reqs:
+ continue
+ data.append(
+ {
+ "id": r_id,
+ "text": curr_reqs[r_id]["description"],
+ "keyword": curr_reqs[r_id]["keyword"],
+ }
+ )
+ return data
+
+ def _get_files(self):
+ """
+ Extracts the list of files passed into the test case.
+ :return: List of absolute paths to files
+ """
+ if "environment_pair" in self.item.fixturenames:
+ return [
+ "{} environment pair".format(
+ self.item.funcargs["environment_pair"]["name"]
+ )
+ ]
+ elif "heat_volume_pair" in self.item.fixturenames:
+ return [
+ "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
+ ]
+ elif "heat_templates" in self.item.fixturenames:
+ return [os.path.basename(f) for f in self.item.funcargs["heat_templates"]]
+ elif "yaml_files" in self.item.fixturenames:
+ return [os.path.basename(f) for f in self.item.funcargs["yaml_files"]]
+ else:
+ parts = self.result.nodeid.split("[")
+ return [""] if len(parts) == 1 else [os.path.basename(parts[1][:-1])]
+
+ def _get_error_message(self):
+ """
+ :return: Error message or empty string if the test did not fail or error
+ """
+ if self.is_failed:
+ return extract_error_msg(self.result)
+ else:
+ return ""
+
+
+# noinspection PyUnusedLocal
+@pytest.hookimpl(tryfirst=True, hookwrapper=True)
+def pytest_runtest_makereport(item, call):
+ """
+ Captures the test results for later reporting. This will also halt testing
+ if a base failure is encountered (can be overridden with continue-on-failure)
+ """
+ outcome = yield
+ if outcome.get_result().when != "call":
+ return # only capture results of test cases themselves
+ result = TestResult(item, outcome)
+ if (
+ not item.config.option.continue_on_failure
+ and result.is_base_test
+ and result.is_failed
+ ):
+ msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
+ result.error_message
+ )
+ result.error_message = msg
+ ALL_RESULTS.append(result)
+ pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
+
+ ALL_RESULTS.append(result)
+
+
+def make_timestamp():
+ """
+ :return: String make_iso_timestamp in format:
+ 2019-01-19 10:18:49.865000 Central Standard Time
+ """
+ timezone = time.tzname[time.localtime().tm_isdst]
+ return "{} {}".format(str(datetime.datetime.now()), timezone)
+
+
+# noinspection PyUnusedLocal
+def pytest_sessionstart(session):
+ ALL_RESULTS.clear()
+ COLLECTION_FAILURES.clear()
+
+
+# noinspection PyUnusedLocal
+def pytest_sessionfinish(session, exitstatus):
+ """
+ If not a self-test run, generate the output reports
+ """
+ if not session.config.option.template_dir:
+ return
+
+ if session.config.option.template_source:
+ template_source = session.config.option.template_source[0]
+ else:
+ template_source = os.path.abspath(session.config.option.template_dir[0])
+
+ categories_selected = session.config.option.test_categories or ""
+ generate_report(
+ get_output_dir(session.config),
+ template_source,
+ categories_selected,
+ session.config.option.report_format,
+ )
+
+
+def pytest_terminal_summary(terminalreporter, exitstatus):
+ # Ensures all preload information and warnings appear after
+ # test results
+ create_preloads(terminalreporter.config, exitstatus)
+
+
+# noinspection PyUnusedLocal
+def pytest_collection_modifyitems(session, config, items):
+ """
+ Selects tests based on the categories requested. Tests without
+ categories will always be executed.
+ """
+ config.traceability_items = list(items) # save all items for traceability
+ if not config.option.self_test:
+ for item in items:
+ # checking if test belongs to a category
+ if hasattr(item.function, "categories"):
+ if config.option.test_categories:
+ test_categories = getattr(item.function, "categories")
+ passed_categories = config.option.test_categories
+ if not all(
+ category in passed_categories for category in test_categories
+ ):
+ item.add_marker(
+ pytest.mark.skip(
+ reason=(
+ "Test categories do not match "
+ "all the passed categories"
+ )
+ )
+ )
+ else:
+ item.add_marker(
+ pytest.mark.skip(
+ reason=(
+ "Test belongs to a category but "
+ "no categories were passed"
+ )
+ )
+ )
+
+ items.sort(
+ key=lambda x: (0, x.name)
+ if "base" in set(m.name for m in x.iter_markers())
+ else (1, x.name)
+ )
+
+
+def make_href(paths, base_dir=None):
+ """
+ Create an anchor tag to link to the file paths provided.
+ :param paths: string or list of file paths
+ :param base_dir: If specified this is pre-pended to each path
+ :return: String of hrefs - one for each path, each seperated by a line
+ break (<br/).
+ """
+ paths = [paths] if isinstance(paths, string_types) else paths
+ if base_dir:
+ paths = [os.path.join(base_dir, p) for p in paths]
+ links = []
+ for p in paths:
+ abs_path = os.path.abspath(p)
+ name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
+ links.append(
+ "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
+ abs_path=abs_path, name=name
+ )
+ )
+ return "<br/>".join(links)
+
+
+def generate_report(outpath, template_path, categories, output_format="html"):
+ """
+ Generates the various output reports.
+
+ :param outpath: destination directory for all reports
+ :param template_path: directory containing the Heat templates validated
+ :param categories: Optional categories selected
+ :param output_format: One of "html", "excel", or "csv". Default is "html"
+ :raises: ValueError if requested output format is unknown
+ """
+ failures = [r for r in ALL_RESULTS if r.is_failed]
+ generate_failure_file(outpath)
+ output_format = output_format.lower().strip() if output_format else "html"
+ generate_json(outpath, template_path, categories)
+ if output_format == "html":
+ generate_html_report(outpath, categories, template_path, failures)
+ elif output_format == "excel":
+ generate_excel_report(outpath, categories, template_path, failures)
+ elif output_format == "json":
+ return
+ elif output_format == "csv":
+ generate_csv_report(outpath, categories, template_path, failures)
+ else:
+ raise ValueError("Unsupported output format: " + output_format)
+
+
+def write_json(data, path):
+ """
+ Pretty print data as JSON to the output path requested
+
+ :param data: Data structure to be converted to JSON
+ :param path: Where to write output
+ """
+ with open(path, "w") as f:
+ json.dump(data, f, indent=2)
+
+
+def generate_failure_file(outpath):
+ """
+ Writes a summary of test failures to a file named failures.
+ This is for backwards compatibility only. The report.json offers a
+ more comprehensive output.
+ """
+ failure_path = os.path.join(outpath, "failures")
+ failures = [r for r in ALL_RESULTS if r.is_failed]
+ data = {}
+ for i, fail in enumerate(failures):
+ data[str(i)] = {
+ "file": fail.files[0] if len(fail.files) == 1 else fail.files,
+ "vnfrqts": fail.requirement_ids,
+ "test": fail.test_case,
+ "test_file": fail.test_module,
+ "raw_output": fail.raw_output,
+ "message": fail.error_message,
+ }
+ write_json(data, failure_path)
+
+
+def generate_csv_report(output_dir, categories, template_path, failures):
+ rows = [["Validation Failures"]]
+ headers = [
+ ("Categories Selected:", categories),
+ ("Tool Version:", version.VERSION),
+ ("Report Generated At:", make_timestamp()),
+ ("Directory Validated:", template_path),
+ ("Checksum:", hash_directory(template_path)),
+ ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
+ ]
+ rows.append([])
+ for header in headers:
+ rows.append(header)
+ rows.append([])
+
+ if COLLECTION_FAILURES:
+ rows.append([COLLECTION_FAILURE_WARNING])
+ rows.append(["Validation File", "Test", "Fixtures", "Error"])
+ for failure in COLLECTION_FAILURES:
+ rows.append(
+ [
+ failure["module"],
+ failure["test"],
+ ";".join(failure["fixtures"]),
+ failure["error"],
+ ]
+ )
+ rows.append([])
+
+ # table header
+ rows.append([col for col, _ in REPORT_COLUMNS])
+
+ reqs = load_current_requirements()
+
+ # table content
+ for i, failure in enumerate(failures, start=1):
+ rows.append(
+ [
+ i,
+ "\n".join(failure.files),
+ failure.requirement_text(reqs),
+ failure.error_message,
+ failure.test_id,
+ ]
+ )
+
+ output_path = os.path.join(output_dir, "report.csv")
+ with open(output_path, "w", newline="") as f:
+ writer = csv.writer(f)
+ for row in rows:
+ writer.writerow(row)
+
+
+def generate_excel_report(output_dir, categories, template_path, failures):
+ output_path = os.path.join(output_dir, "report.xlsx")
+ workbook = xlsxwriter.Workbook(output_path)
+ bold = workbook.add_format({"bold": True, "align": "top"})
+ code = workbook.add_format(
+ {"font_name": "Courier", "text_wrap": True, "align": "top"}
+ )
+ normal = workbook.add_format({"text_wrap": True, "align": "top"})
+ heading = workbook.add_format({"bold": True, "font_size": 18})
+ worksheet = workbook.add_worksheet("failures")
+ worksheet.write(0, 0, "Validation Failures", heading)
+
+ headers = [
+ ("Categories Selected:", ",".join(categories)),
+ ("Tool Version:", version.VERSION),
+ ("Report Generated At:", make_timestamp()),
+ ("Directory Validated:", template_path),
+ ("Checksum:", hash_directory(template_path)),
+ ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
+ ]
+ for row, (header, value) in enumerate(headers, start=2):
+ worksheet.write(row, 0, header, bold)
+ worksheet.write(row, 1, value)
+
+ worksheet.set_column(0, len(headers) - 1, 40)
+ worksheet.set_column(len(headers), len(headers), 80)
+
+ if COLLECTION_FAILURES:
+ collection_failures_start = 2 + len(headers) + 2
+ worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
+ collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
+ for col_num, col_name in enumerate(collection_failure_headers):
+ worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
+ for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
+ worksheet.write(row, 0, data["module"])
+ worksheet.write(row, 1, data["test"])
+ worksheet.write(row, 2, ",".join(data["fixtures"]))
+ worksheet.write(row, 3, data["error"], code)
+
+ # table header
+ start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
+ worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
+ for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
+ worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
+
+ reqs = load_current_requirements()
+
+ # table content
+ for col, width in enumerate((20, 30, 60, 60, 40)):
+ worksheet.set_column(col, col, width)
+ err_num = 1
+ for row, failure in enumerate(failures, start=start_error_table_row + 2):
+ worksheet.write(row, 0, str(err_num), normal)
+ worksheet.write(row, 1, "\n".join(failure.files), normal)
+ worksheet.write(row, 2, failure.requirement_text(reqs), normal)
+ worksheet.write(row, 3, failure.error_message.replace("\n", "\n\n"), normal)
+ worksheet.write(row, 4, failure.test_id, normal)
+ err_num += 1
+ worksheet.autofilter(
+ start_error_table_row + 1,
+ 0,
+ start_error_table_row + 1 + err_num,
+ len(REPORT_COLUMNS) - 1,
+ )
+ workbook.close()
+
+
+def make_iso_timestamp():
+ """
+ Creates a timestamp in ISO 8601 format in UTC format. Used for JSON output.
+ """
+ now = datetime.datetime.utcnow()
+ now.replace(tzinfo=datetime.timezone.utc)
+ return now.isoformat()
+
+
+def aggregate_results(outcomes, r_id=None):
+ """
+ Determines the aggregate result for the conditions provided. Assumes the
+ results have been filtered and collected for analysis.
+
+ :param outcomes: set of outcomes from the TestResults
+ :param r_id: Optional requirement ID if known
+ :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
+ (see aggregate_requirement_adherence for more detail)
+ """
+ if not outcomes:
+ return "PASS"
+ elif "ERROR" in outcomes:
+ return "ERROR"
+ elif "FAIL" in outcomes:
+ return "FAIL"
+ elif "PASS" in outcomes:
+ return "PASS"
+ elif {"SKIP"} == outcomes:
+ return "SKIP"
+ else:
+ pytest.warns(
+ "Unexpected error aggregating outcomes ({}) for requirement {}".format(
+ outcomes, r_id
+ )
+ )
+ return "ERROR"
+
+
+def aggregate_run_results(collection_failures, test_results):
+ """
+ Determines overall status of run based on all failures and results.
+
+ * 'ERROR' - At least one collection failure occurred during the run.
+ * 'FAIL' - Template failed at least one test
+ * 'PASS' - All tests executed properly and no failures were detected
+
+ :param collection_failures: failures occuring during test setup
+ :param test_results: list of all test executuion results
+ :return: one of 'ERROR', 'FAIL', or 'PASS'
+ """
+ if collection_failures:
+ return "ERROR"
+ elif any(r.is_failed for r in test_results):
+ return "FAIL"
+ else:
+ return "PASS"
+
+
+def relative_paths(base_dir, paths):
+ return [os.path.relpath(p, base_dir) for p in paths if p != ""]
+
+
+# noinspection PyTypeChecker
+def generate_json(outpath, template_path, categories):
+ """
+ Creates a JSON summary of the entire test run.
+ """
+ reqs = load_current_requirements()
+ data = {
+ "version": "dublin",
+ "template_directory": os.path.splitdrive(template_path)[1].replace(
+ os.path.sep, "/"
+ ),
+ "timestamp": make_iso_timestamp(),
+ "checksum": hash_directory(template_path),
+ "categories": categories,
+ "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
+ "tests": [],
+ "requirements": [],
+ }
+
+ results = data["tests"]
+ for result in COLLECTION_FAILURES:
+ results.append(
+ {
+ "files": [],
+ "test_module": result["module"],
+ "test_case": result["test"],
+ "result": "ERROR",
+ "error": result["error"],
+ "requirements": result["requirements"],
+ }
+ )
+ for result in ALL_RESULTS:
+ results.append(
+ {
+ "files": relative_paths(template_path, result.files),
+ "test_module": result.test_module,
+ "test_case": result.test_case,
+ "result": result.outcome,
+ "error": result.error_message if result.is_failed else "",
+ "requirements": result.requirements_metadata(reqs),
+ }
+ )
+
+ # Build a mapping of requirement ID to the results
+ r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
+ for test_result in results:
+ test_reqs = test_result["requirements"]
+ r_ids = (
+ [r["id"] if isinstance(r, dict) else r for r in test_reqs]
+ if test_reqs
+ else ("",)
+ )
+ for r_id in r_ids:
+ item = r_id_results[r_id]
+ item["outcomes"].add(test_result["result"])
+ if test_result["error"]:
+ item["errors"].add(test_result["error"])
+
+ requirements = data["requirements"]
+ for r_id, r_data in reqs.items():
+ requirements.append(
+ {
+ "id": r_id,
+ "text": r_data["description"],
+ "keyword": r_data["keyword"],
+ "result": aggregate_results(r_id_results[r_id]["outcomes"]),
+ "errors": list(r_id_results[r_id]["errors"]),
+ }
+ )
+
+ if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
+ requirements.append(
+ {
+ "id": "Unmapped",
+ "text": "Tests not mapped to requirements (see tests)",
+ "result": aggregate_results(r_id_results[""]["outcomes"]),
+ "errors": list(r_id_results[""]["errors"]),
+ }
+ )
+
+ report_path = os.path.join(outpath, "report.json")
+ write_json(data, report_path)
+
+
+def generate_html_report(outpath, categories, template_path, failures):
+ reqs = load_current_requirements()
+ fail_data = []
+ for failure in failures:
+ fail_data.append(
+ {
+ "file_links": make_href(failure.files, template_path),
+ "test_id": failure.test_id,
+ "error_message": escape(failure.error_message).replace(
+ "\n", "<br/><br/>"
+ ),
+ "raw_output": escape(failure.raw_output),
+ "requirements": docutils.core.publish_parts(
+ writer_name="html", source=failure.requirement_text(reqs)
+ )["body"],
+ }
+ )
+ pkg_dir = os.path.split(__file__)[0]
+ j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
+ with open(j2_template_path, "r") as f:
+ report_template = jinja2.Template(f.read())
+ contents = report_template.render(
+ version=version.VERSION,
+ num_failures=len(failures) + len(COLLECTION_FAILURES),
+ categories=categories,
+ template_dir=make_href(template_path),
+ checksum=hash_directory(template_path),
+ timestamp=make_timestamp(),
+ failures=fail_data,
+ collection_failures=COLLECTION_FAILURES,
+ )
+ with open(os.path.join(outpath, "report.html"), "w") as f:
+ f.write(contents)
+