X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=ice_validator%2Ftests%2Fconftest.py;h=1a8b9c1129740273deca892fa188b980aba0d257;hb=f40ff1bf5c95696f78ec6ff1862e6954f360a04f;hp=598fc01c95afd567bcb1f05c093d73677161c353;hpb=5cb6317e84178beb054230ae88d3af1a882920cf;p=vvp%2Fvalidation-scripts.git
diff --git a/ice_validator/tests/conftest.py b/ice_validator/tests/conftest.py
index 598fc01..1a8b9c1 100644
--- a/ice_validator/tests/conftest.py
+++ b/ice_validator/tests/conftest.py
@@ -2,7 +2,7 @@
# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
-# Copyright © 2018 AT&T Intellectual Property. All rights reserved.
+# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
@@ -35,219 +35,485 @@
#
# ============LICENSE_END============================================
-import collections
import csv
import datetime
import hashlib
import io
import json
import os
-import sys
+import re
import time
+from collections import defaultdict
+
+import traceback
import docutils.core
+import jinja2
import pytest
from more_itertools import partition
-from six import string_types
import xlsxwriter
+from six import string_types
+
+# noinspection PyUnresolvedReferences
+import version
+import logging
+
+logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)
__path__ = [os.path.dirname(os.path.abspath(__file__))]
-resolution_steps_file = "resolution_steps.json"
-requirements_file = "requirements.json"
+DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
-FAILURE_DATA = {}
+HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
+TEST_SCRIPT_SITE = (
+ "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
+)
+VNFRQTS_ID_URL = (
+ "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
+)
-report_columns = [
+REPORT_COLUMNS = [
+ ("Error #", "err_num"),
("Input File", "file"),
- ("Test", "test_file"),
("Requirements", "req_description"),
- ("Resolution Steps", "resolution_steps"),
("Error Message", "message"),
- ("Raw Test Output", "raw_output"),
+ ("Test", "test_file"),
]
-report = collections.OrderedDict(report_columns)
+
+COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
+while preparing to validate the input files. Some validations may not have been
+executed. Please refer these issues to the VNF Validation Tool team.
+"""
+
+COLLECTION_FAILURES = []
+
+# Captures the results of every test run
+ALL_RESULTS = []
+
+
+def get_output_dir(config):
+ """
+ Retrieve the output directory for the reports and create it if necessary
+ :param config: pytest configuration
+ :return: output directory as string
+ """
+ output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
+ if not os.path.exists(output_dir):
+ os.makedirs(output_dir, exist_ok=True)
+ return output_dir
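
A minimal sketch of how get_output_dir() resolves the report directory, using a SimpleNamespace stub in place of the real pytest config object (the stub only mimics the .option.output_dir attribute read above and is not part of this module):

    # Illustrative only: the real pytest Config exposes parsed CLI options
    # under .option; a stub stands in for it here.
    from types import SimpleNamespace

    stub_config = SimpleNamespace(option=SimpleNamespace(output_dir=None))
    # With no explicit output directory, DEFAULT_OUTPUT_DIR ("../output"
    # relative to this file) is created if needed and returned.
    print(get_output_dir(stub_config))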
def extract_error_msg(rep):
+ """
+    If a custom error message was provided, then extract it; otherwise
+    just show the pytest assert message.
+ """
+ if rep.outcome != "failed":
+ return ""
try:
- msg = str(rep.longrepr.reprcrash)
+ full_msg = str(rep.longrepr.reprcrash.message)
+ match = re.match(
+ "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
+ )
+ if match: # custom message was provided
+ # Extract everything between AssertionError and the start
+ # of the assert statement expansion in the pytest report
+ msg = match.group(1)
+ elif "AssertionError:" in full_msg:
+ msg = full_msg.split("AssertionError:")[1]
+ else:
+ msg = full_msg
except AttributeError:
msg = str(rep)
- if "AssertionError:" in msg:
- return msg.split("AssertionError:")[1]
- else:
- return msg
+ return msg
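
For illustration, this is how the regex above separates a custom assertion message from pytest's assert expansion; the sample message below is invented, not taken from a real run:

    # Hypothetical reprcrash message: custom text first, then pytest's
    # expansion of the failing assert expression.
    sample = (
        "AssertionError: image property missing from server resource\n"
        "assert 'image' in {'flavor': 'm1.small'}"
    )
    match = re.match(
        "AssertionError:(.*)^assert.*", sample, re.MULTILINE | re.DOTALL
    )
    print(match.group(1).strip())  # -> image property missing from server resource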
-@pytest.hookimpl(tryfirst=True, hookwrapper=True)
-def pytest_runtest_makereport(item, call):
-
- outcome = yield
- rep = outcome.get_result()
-
- output_dir = "{}/../output".format(__path__[0])
- if rep.outcome == "failed":
- if not os.path.exists(output_dir):
- os.mkdir(output_dir)
-
- if hasattr(item.function, "requirement_ids"):
- requirement_ids = item.function.requirement_ids
- else:
- requirement_ids = ""
+class TestResult:
+ """
+ Wraps the test case and result to extract necessary metadata for
+ reporting purposes.
+ """
- if "environment_pair" in item.fixturenames:
- resolved_pair = "{} environment pair".format(
- item.funcargs["environment_pair"]["name"]
- )
- elif "heat_volume_pair" in item.fixturenames:
- resolved_pair = "{} volume pair".format(
- item.funcargs["heat_volume_pair"]["name"]
+ RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
+
+ def __init__(self, item, outcome):
+ self.item = item
+ self.result = outcome.get_result()
+ self.files = self._get_files()
+ self.error_message = self._get_error_message()
+
+ @property
+ def requirement_ids(self):
+ """
+ Returns list of requirement IDs mapped to the test case.
+
+ :return: Returns a list of string requirement IDs the test was
+            annotated with ``validates``; otherwise returns an empty list
+ """
+ is_mapped = hasattr(self.item.function, "requirement_ids")
+ return self.item.function.requirement_ids if is_mapped else []
+
+ @property
+ def markers(self):
+ """
+ :return: Returns a set of pytest marker names for the test or an empty set
+ """
+ return set(m.name for m in self.item.iter_markers())
+
+ @property
+ def is_base_test(self):
+ """
+ :return: Returns True if the test is annotated with a pytest marker called base
+ """
+ return "base" in self.markers
+
+ @property
+ def is_failed(self):
+ """
+ :return: True if the test failed
+ """
+ return self.outcome == "FAIL"
+
+ @property
+ def outcome(self):
+ """
+ :return: Returns 'PASS', 'FAIL', or 'SKIP'
+ """
+ return self.RESULT_MAPPING[self.result.outcome]
+
+ @property
+ def test_case(self):
+ """
+ :return: Name of the test case method
+ """
+ return self.item.function.__name__
+
+ @property
+ def test_module(self):
+ """
+ :return: Name of the file containing the test case
+ """
+ return self.item.function.__module__.split(".")[-1]
+
+ @property
+ def test_id(self):
+ """
+ :return: ID of the test (test_module + test_case)
+ """
+ return "{}::{}".format(self.test_module, self.test_case)
+
+ @property
+ def raw_output(self):
+ """
+ :return: Full output from pytest for the given test case
+ """
+ return str(self.result.longrepr)
+
+ def requirement_text(self, curr_reqs):
+ """
+ Creates a text summary for the requirement IDs mapped to the test case.
+ If no requirements are mapped, then it returns the empty string.
+
+ :param curr_reqs: mapping of requirement IDs to requirement metadata
+ loaded from the VNFRQTS projects needs.json output
+ :return: ID and text of the requirements mapped to the test case
+ """
+ text = (
+ "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
+ for r_id in self.requirement_ids
+ if r_id in curr_reqs
+ )
+ return "".join(text)
+
+ def requirements_metadata(self, curr_reqs):
+ """
+ Returns a list of dicts containing the following metadata for each
+ requirement mapped:
+
+ - id: Requirement ID
+ - text: Full text of the requirement
+ - keyword: MUST, MUST NOT, MAY, etc.
+
+ :param curr_reqs: mapping of requirement IDs to requirement metadata
+ loaded from the VNFRQTS projects needs.json output
+ :return: List of requirement metadata
+ """
+ data = []
+ for r_id in self.requirement_ids:
+ if r_id not in curr_reqs:
+ continue
+ data.append(
+ {
+ "id": r_id,
+ "text": curr_reqs[r_id]["description"],
+ "keyword": curr_reqs[r_id]["keyword"],
+ }
)
- elif "heat_templates" in item.fixturenames:
- resolved_pair = item.funcargs["heat_templates"]
- elif "yaml_files" in item.fixturenames:
- resolved_pair = item.funcargs["yaml_files"]
+ return data
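
A sketch of the requirement data these helpers expect: curr_reqs is keyed by requirement ID, and the single entry below is fabricated purely to show the fields read above ("description" and "keyword"):

    # Hypothetical requirement record; real entries come from
    # heat_requirements.json / the VNFRQTS needs.json output.
    curr_reqs = {
        "R-12345": {
            "description": "The VNF Heat template MUST declare an image for each server.",
            "keyword": "MUST",
        }
    }
    # For a TestResult whose test maps to "R-12345", requirements_metadata()
    # would return:
    # [{"id": "R-12345",
    #   "text": "The VNF Heat template MUST declare an image for each server.",
    #   "keyword": "MUST"}]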
+
+ def _get_files(self):
+ """
+ Extracts the list of files passed into the test case.
+ :return: List of absolute paths to files
+ """
+ if "environment_pair" in self.item.fixturenames:
+ return [
+ "{} environment pair".format(
+ self.item.funcargs["environment_pair"]["name"]
+ )
+ ]
+ elif "heat_volume_pair" in self.item.fixturenames:
+ return [
+ "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
+ ]
+ elif "heat_templates" in self.item.fixturenames:
+ return [os.path.basename(f) for f in self.item.funcargs["heat_templates"]]
+ elif "yaml_files" in self.item.fixturenames:
+ return [os.path.basename(f) for f in self.item.funcargs["yaml_files"]]
else:
- resolved_pair = rep.nodeid.split("[")[1][:-1]
-
- FAILURE_DATA[len(FAILURE_DATA)] = {
- "file": resolved_pair,
- "vnfrqts": requirement_ids,
- "test": item.function.__name__,
- "test_file": item.function.__module__.split(".")[-1],
- "raw_output": str(rep.longrepr),
- "message": extract_error_msg(rep),
- }
+ parts = self.result.nodeid.split("[")
+ return [""] if len(parts) == 1 else [os.path.basename(parts[1][:-1])]
+
+ def _get_error_message(self):
+ """
+ :return: Error message or empty string if the test did not fail or error
+ """
+ if self.is_failed:
+ return extract_error_msg(self.result)
+ else:
+ return ""
- with open("{}/failures".format(output_dir), "w") as f:
- json.dump(FAILURE_DATA, f, indent=4)
+
+# noinspection PyUnusedLocal
+@pytest.hookimpl(tryfirst=True, hookwrapper=True)
+def pytest_runtest_makereport(item, call):
+ """
+ Captures the test results for later reporting. This will also halt testing
+ if a base failure is encountered (can be overridden with continue-on-failure)
+ """
+ outcome = yield
+ if outcome.get_result().when != "call":
+ return # only capture results of test cases themselves
+ result = TestResult(item, outcome)
+ if (
+ not item.config.option.continue_on_failure
+ and result.is_base_test
+ and result.is_failed
+ ):
+ msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
+ result.error_message
+ )
+ result.error_message = msg
+ ALL_RESULTS.append(result)
+ pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
+
+ ALL_RESULTS.append(result)
def make_timestamp():
+ """
+    :return: String timestamp in the format:
+ 2019-01-19 10:18:49.865000 Central Standard Time
+ """
timezone = time.tzname[time.localtime().tm_isdst]
return "{} {}".format(str(datetime.datetime.now()), timezone)
+# noinspection PyUnusedLocal
+def pytest_sessionstart(session):
+ ALL_RESULTS.clear()
+ COLLECTION_FAILURES.clear()
+
+
+# noinspection PyUnusedLocal
def pytest_sessionfinish(session, exitstatus):
+ """
+ If not a self-test run, generate the output reports
+ """
if not session.config.option.template_dir:
return
- template_path = os.path.abspath(session.config.option.template_dir[0])
- profile_name = session.config.option.validation_profile_name
+
+ if session.config.option.template_source:
+ template_source = session.config.option.template_source[0]
+ else:
+ template_source = os.path.abspath(session.config.option.template_dir[0])
+
+ categories_selected = session.config.option.test_categories or ""
generate_report(
- "{}/../output".format(__path__[0]),
- template_path,
- profile_name,
+ get_output_dir(session.config),
+ template_source,
+ categories_selected,
session.config.option.report_format,
)
-def pytest_runtest_setup(item):
- profile = item.session.config.option.validation_profile
- markers = set(m.name for m in item.iter_markers())
- if not profile and markers and "xfail" not in markers:
- pytest.skip("No validation profile selected. Skipping tests with marks.")
- if profile and markers and profile not in markers and "xfail" not in markers:
- pytest.skip("Doesn't match selection validation profile")
+# noinspection PyUnusedLocal
+def pytest_collection_modifyitems(session, config, items):
+ """
+ Selects tests based on the categories requested. Tests without
+ categories will always be executed.
+ """
+ config.traceability_items = list(items) # save all items for traceability
+ if not config.option.self_test:
+ for item in items:
+ # checking if test belongs to a category
+ if hasattr(item.function, "categories"):
+ if config.option.test_categories:
+ test_categories = getattr(item.function, "categories")
+ passed_categories = config.option.test_categories
+ if not all(
+ category in passed_categories for category in test_categories
+ ):
+ item.add_marker(
+ pytest.mark.skip(
+ reason=(
+ "Test categories do not match "
+ "all the passed categories"
+ )
+ )
+ )
+ else:
+ item.add_marker(
+ pytest.mark.skip(
+ reason=(
+ "Test belongs to a category but "
+ "no categories were passed"
+ )
+ )
+ )
+
+ items.sort(
+ key=lambda x: (0, x.name)
+ if "base" in set(m.name for m in x.iter_markers())
+ else (1, x.name)
+ )
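
The sort key above simply buckets tests into two groups (base-marked tests first, everything else second), alphabetically within each bucket; a stand-alone illustration with plain tuples instead of collected pytest items:

    # Each pair is (test name, set of marker names); not real pytest items.
    collected = [
        ("test_environment_file_structure", {"environment"}),
        ("test_base_template_names", {"base"}),
        ("test_nova_servers_have_flavors", set()),
    ]
    collected.sort(key=lambda x: (0, x[0]) if "base" in x[1] else (1, x[0]))
    print([name for name, _ in collected])
    # base-marked tests come first; the rest follow in alphabetical order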
-def make_href(path):
-    paths = [path] if isinstance(path, string_types) else path
+def make_href(paths, base_dir=None):
+    """
+    Create an anchor tag to link to the file paths provided.
+    :param paths: string or list of file paths
+    :param base_dir: If specified this is prepended to each path
+    :return: String of hrefs - one for each path, each separated by a line
+             break (<br/>)
+    """
+    paths = [paths] if isinstance(paths, string_types) else paths
+    base_dir = base_dir or "."
     links = []
     for p in paths:
-        abs_path = os.path.abspath(p)
-        filename = os.path.split(abs_path)[1]
+        abs_path = os.path.abspath(os.path.join(base_dir, p))
+        name = os.path.split(abs_path)[1]
         links.append(
-            "<a href='file://{abs_path}' target='_blank'>{filename}</a>".format(
-                abs_path=abs_path, filename=filename
+            "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
+                abs_path=abs_path, name=name
             )
         )
     return "<br/>".join(links)
-def generate_report(outpath, template_path, profile_name, output_format):
- failures = "{}/failures".format(outpath)
- faildata = None
- rdata = None
- hdata = None
+def generate_report(outpath, template_path, categories, output_format="html"):
+ """
+ Generates the various output reports.
- if os.path.exists(failures):
- with open(failures, "r") as f:
- faildata = json.loads(f.read())
- else:
- faildata = {}
-
- resolution_steps = "{}/../{}".format(__path__[0], resolution_steps_file)
- if os.path.exists(resolution_steps):
- with open(resolution_steps, "r") as f:
- rdata = json.loads(f.read())
-
- heat_requirements = "{}/../{}".format(__path__[0], requirements_file)
- if os.path.exists(heat_requirements):
- with open(heat_requirements, "r") as f:
- hdata = json.loads(f.read())
-
- # point requirements at the most recent version
- current_version = hdata["current_version"]
- hdata = hdata["versions"][current_version]["needs"]
- # mapping requirement IDs from failures to requirement descriptions
- for k, v in faildata.items():
- req_text = ""
- if v["vnfrqts"] != "":
- for req in v["vnfrqts"]:
- if req in hdata:
- req_text += "\n\n{}: \n{}".format(req, hdata[req]["description"])
- faildata[k]["req_description"] = req_text
-
- # mapping resolution steps to module and test name
- for k, v in faildata.items():
- faildata[k]["resolution_steps"] = ""
- for rs in rdata:
- if v["test_file"] == rs["module"] and v["test"] == rs["function"]:
- faildata[k]["resolution_steps"] = "\n{}: \n{}".format(
- rs["header"], rs["resolution_steps"]
- )
+ :param outpath: destination directory for all reports
+ :param template_path: directory containing the Heat templates validated
+ :param categories: Optional categories selected
+ :param output_format: One of "html", "excel", or "csv". Default is "html"
+ :raises: ValueError if requested output format is unknown
+ """
+ failures = [r for r in ALL_RESULTS if r.is_failed]
+ generate_failure_file(outpath)
output_format = output_format.lower().strip() if output_format else "html"
+ generate_json(outpath, template_path, categories)
if output_format == "html":
- generate_html_report(outpath, profile_name, template_path, faildata)
+ generate_html_report(outpath, categories, template_path, failures)
elif output_format == "excel":
- generate_excel_report(outpath, profile_name, template_path, faildata)
+ generate_excel_report(outpath, categories, template_path, failures)
+ elif output_format == "json":
+ return
elif output_format == "csv":
- generate_csv_report(outpath, profile_name, template_path, faildata)
+ generate_csv_report(outpath, categories, template_path, failures)
else:
raise ValueError("Unsupported output format: " + output_format)
-def generate_csv_report(output_dir, profile_name, template_path, faildata):
- rows = []
- rows.append(["Validation Failures"])
+def write_json(data, path):
+ """
+ Pretty print data as JSON to the output path requested
+
+ :param data: Data structure to be converted to JSON
+ :param path: Where to write output
+ """
+ with open(path, "w") as f:
+ json.dump(data, f, indent=2)
+
+
+def generate_failure_file(outpath):
+ """
+ Writes a summary of test failures to a file named failures.
+ This is for backwards compatibility only. The report.json offers a
+ more comprehensive output.
+ """
+ failure_path = os.path.join(outpath, "failures")
+ failures = [r for r in ALL_RESULTS if r.is_failed]
+ data = {}
+ for i, fail in enumerate(failures):
+ data[str(i)] = {
+ "file": fail.files[0] if len(fail.files) == 1 else fail.files,
+ "vnfrqts": fail.requirement_ids,
+ "test": fail.test_case,
+ "test_file": fail.test_module,
+ "raw_output": fail.raw_output,
+ "message": fail.error_message,
+ }
+ write_json(data, failure_path)
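
For reference, the failures file written above is a JSON object keyed by error index; the entry below is fabricated to show the field names populated from each failed TestResult:

    # Hypothetical contents of the "failures" file for one failing test;
    # every value here is invented for illustration.
    example_failures = {
        "0": {
            "file": "base.yaml",
            "vnfrqts": ["R-12345"],
            "test": "test_base_template_names",
            "test_file": "test_initial_configuration",
            "raw_output": "...full pytest longrepr text...",
            "message": "image property missing from server resource",
        }
    }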
+
+
+def generate_csv_report(output_dir, categories, template_path, failures):
+ rows = [["Validation Failures"]]
headers = [
- ("Profile Selected:", profile_name),
+ ("Categories Selected:", categories),
+ ("Tool Version:", version.VERSION),
("Report Generated At:", make_timestamp()),
("Directory Validated:", template_path),
("Checksum:", hash_directory(template_path)),
- ("Total Errors:", len(faildata)),
+ ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
]
-
rows.append([])
for header in headers:
rows.append(header)
rows.append([])
+ if COLLECTION_FAILURES:
+ rows.append([COLLECTION_FAILURE_WARNING])
+ rows.append(["Validation File", "Test", "Fixtures", "Error"])
+ for failure in COLLECTION_FAILURES:
+ rows.append(
+ [
+ failure["module"],
+ failure["test"],
+ ";".join(failure["fixtures"]),
+ failure["error"],
+ ]
+ )
+ rows.append([])
+
# table header
- rows.append([col for col, _ in report_columns])
+ rows.append([col for col, _ in REPORT_COLUMNS])
+
+ reqs = load_current_requirements()
# table content
- for data in faildata.values():
+ for i, failure in enumerate(failures, start=1):
rows.append(
[
- data.get("file", ""),
- data.get("test_file", ""),
- data.get("req_description", ""),
- data.get("resolution_steps", ""),
- data.get("message", ""),
- data.get("raw_output", ""),
+ i,
+ "\n".join(failure.files),
+ failure.requirement_text(reqs),
+ failure.error_message,
+ failure.test_id,
]
)
@@ -258,22 +524,25 @@ def generate_csv_report(output_dir, profile_name, template_path, faildata):
writer.writerow(row)
-def generate_excel_report(output_dir, profile_name, template_path, faildata):
+def generate_excel_report(output_dir, categories, template_path, failures):
output_path = os.path.join(output_dir, "report.xlsx")
workbook = xlsxwriter.Workbook(output_path)
- bold = workbook.add_format({"bold": True})
- code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
- normal = workbook.add_format({"text_wrap": True})
+ bold = workbook.add_format({"bold": True, "align": "top"})
+ code = workbook.add_format(
+ {"font_name": "Courier", "text_wrap": True, "align": "top"}
+ )
+ normal = workbook.add_format({"text_wrap": True, "align": "top"})
heading = workbook.add_format({"bold": True, "font_size": 18})
worksheet = workbook.add_worksheet("failures")
worksheet.write(0, 0, "Validation Failures", heading)
headers = [
- ("Profile Selected:", profile_name),
+ ("Categories Selected:", ",".join(categories)),
+ ("Tool Version:", version.VERSION),
("Report Generated At:", make_timestamp()),
("Directory Validated:", template_path),
("Checksum:", hash_directory(template_path)),
- ("Total Errors:", len(faildata)),
+ ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
]
for row, (header, value) in enumerate(headers, start=2):
worksheet.write(row, 0, header, bold)
@@ -282,93 +551,223 @@ def generate_excel_report(output_dir, profile_name, template_path, faildata):
worksheet.set_column(0, len(headers) - 1, 40)
worksheet.set_column(len(headers), len(headers), 80)
+ if COLLECTION_FAILURES:
+ collection_failures_start = 2 + len(headers) + 2
+ worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
+ collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
+ for col_num, col_name in enumerate(collection_failure_headers):
+ worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
+ for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
+ worksheet.write(row, 0, data["module"])
+ worksheet.write(row, 1, data["test"])
+ worksheet.write(row, 2, ",".join(data["fixtures"]))
+ worksheet.write(row, 3, data["error"], code)
+
# table header
- start_error_table_row = 2 + len(headers) + 2
- for col_num, (col_name, _) in enumerate(report_columns):
- worksheet.write(start_error_table_row, col_num, col_name, bold)
+ start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
+ worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
+ for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
+ worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
- # table content
- for row, data in enumerate(faildata.values(), start=start_error_table_row + 1):
- for col, key in enumerate(report.values()):
- if key == "file":
- paths = (
- [data[key]] if isinstance(data[key], string_types) else data[key]
- )
- contents = "\n".join(paths)
- worksheet.write(row, col, contents, normal)
- elif key == "raw_output":
- worksheet.write_string(row, col, data[key], code)
- else:
- worksheet.write(row, col, data[key], normal)
+ reqs = load_current_requirements()
+ # table content
+ for col, width in enumerate((20, 30, 60, 60, 40)):
+ worksheet.set_column(col, col, width)
+ err_num = 1
+ for row, failure in enumerate(failures, start=start_error_table_row + 2):
+ worksheet.write(row, 0, str(err_num), normal)
+ worksheet.write(row, 1, "\n".join(failure.files), normal)
+ worksheet.write(row, 2, failure.requirement_text(reqs), normal)
+ worksheet.write(row, 3, failure.error_message.replace("\n", "\n\n"), normal)
+ worksheet.write(row, 4, failure.test_id, normal)
+ err_num += 1
+ worksheet.autofilter(
+ start_error_table_row + 1,
+ 0,
+ start_error_table_row + 1 + err_num,
+ len(REPORT_COLUMNS) - 1,
+ )
workbook.close()
-def generate_html_report(outpath, profile_name, template_path, faildata):
- with open("{}/report.html".format(outpath), "w") as of:
- body_begin = """
-        [hand-written HTML/CSS report template elided]
-        """
-        [remaining removed lines: when no failures were recorded the old report
-        wrote "Success! No validation failures detected." and returned;
-        otherwise it built the failure table cell by cell with of.write()
-        calls such as "<th>{}</th>".format(k) for the header columns and
-        "<td>{}</td>".format(value) for the row values]