diff --git a/ice_validator/tests/conftest.py b/ice_validator/tests/conftest.py
index 5184fb6..9a839b5 100644
--- a/ice_validator/tests/conftest.py
+++ b/ice_validator/tests/conftest.py
@@ -42,14 +42,19 @@ import io
import json
import os
import re
-import sys
import time
+
+from preload.model import create_preloads
+from config import get_generator_plugin_names
+from tests.helpers import get_output_dir
+
+try:
+ from html import escape
+except ImportError:
+ from cgi import escape
from collections import defaultdict
-from itertools import chain
-import requests
import traceback
-import warnings
import docutils.core
import jinja2
@@ -58,23 +63,30 @@ from more_itertools import partition
import xlsxwriter
from six import string_types
+# noinspection PyUnresolvedReferences
+import version
+import logging
+
+logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)
+
__path__ = [os.path.dirname(os.path.abspath(__file__))]
DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
-RESOLUTION_STEPS_FILE = "resolution_steps.json"
-HEAT_REQUIREMENTS_FILE = "heat_requirements.json"
-
-# noinspection PyPep8
-NEEDS_JSON_URL = "https://onap.readthedocs.io/en/latest/_downloads/789ac64d223325488fb3f120f959d985/needs.json"
+HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
+TEST_SCRIPT_SITE = (
+ "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
+)
+VNFRQTS_ID_URL = (
+ "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
+)
REPORT_COLUMNS = [
+ ("Error #", "err_num"),
("Input File", "file"),
- ("Test", "test_file"),
("Requirements", "req_description"),
- ("Resolution Steps", "resolution_steps"),
("Error Message", "message"),
- ("Raw Test Output", "raw_output"),
+ ("Test", "test_file"),
]
COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
@@ -88,13 +100,6 @@ COLLECTION_FAILURES = []
ALL_RESULTS = []
-def get_output_dir(config):
- output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
- if not os.path.exists(output_dir):
- os.makedirs(output_dir, exist_ok=True)
- return output_dir
-
-
def extract_error_msg(rep):
"""
If a custom error message was provided, then extract it otherwise
@@ -111,10 +116,10 @@ def extract_error_msg(rep):
# Extract everything between AssertionError and the start
# of the assert statement expansion in the pytest report
msg = match.group(1)
+ elif "AssertionError:" in full_msg:
+ msg = full_msg.split("AssertionError:")[1]
else:
- msg = str(rep.longrepr.reprcrash)
- if "AssertionError:" in msg:
- msg = msg.split("AssertionError:")[1]
+ msg = full_msg
except AttributeError:
msg = str(rep)
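
The new elif branch above covers reports where pytest prints "AssertionError:" without the assert-expansion block that the regex targets. A minimal sketch of that extraction, using a hypothetical failure string:

    # Hypothetical report text: pull out the custom message a test attaches
    # via ``assert cond, "message"``.
    full_msg = "E   AssertionError: env file missing parameter: vnf_name"
    if "AssertionError:" in full_msg:
        msg = full_msg.split("AssertionError:")[1]
    else:
        msg = full_msg
    print(msg.strip())  # env file missing parameter: vnf_name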
@@ -132,7 +137,7 @@ class TestResult:
def __init__(self, item, outcome):
self.item = item
self.result = outcome.get_result()
- self.files = [os.path.normpath(p) for p in self._get_files()]
+ self.files = self._get_files()
self.error_message = self._get_error_message()
@property
@@ -188,6 +193,13 @@ class TestResult:
"""
return self.item.function.__module__.split(".")[-1]
+ @property
+ def test_id(self):
+ """
+ :return: ID of the test (test_module + test_case)
+ """
+ return "{}::{}".format(self.test_module, self.test_case)
+
@property
def raw_output(self):
"""
@@ -207,6 +219,7 @@ class TestResult:
text = (
"\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
for r_id in self.requirement_ids
+ if r_id in curr_reqs
)
return "".join(text)
@@ -236,30 +249,6 @@ class TestResult:
)
return data
- def resolution_steps(self, resolutions):
- """
- :param resolutions: Loaded from contents for resolution_steps.json
- :return: Header and text for the resolution step associated with this
- test case. Returns empty string if no resolutions are
- provided.
- """
- text = (
- "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
- for entry in resolutions
- if self._match(entry)
- )
- return "".join(text)
-
- def _match(self, resolution_entry):
- """
- Returns True if the test result maps to the given entry in
- the resolutions file
- """
- return (
- self.test_case == resolution_entry["function"]
- and self.test_module == resolution_entry["module"]
- )
-
def _get_files(self):
"""
Extracts the list of files passed into the test case.
@@ -276,11 +265,12 @@ class TestResult:
"{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
]
elif "heat_templates" in self.item.fixturenames:
- return self.item.funcargs["heat_templates"]
+ return [os.path.basename(f) for f in self.item.funcargs["heat_templates"]]
elif "yaml_files" in self.item.fixturenames:
- return self.item.funcargs["yaml_files"]
+ return [os.path.basename(f) for f in self.item.funcargs["yaml_files"]]
else:
- return [self.result.nodeid.split("[")[1][:-1]]
+ parts = self.result.nodeid.split("[")
+ return [""] if len(parts) == 1 else [os.path.basename(parts[1][:-1])]
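
The fallback above now pulls a file name out of the parametrized portion of the pytest node ID instead of returning the raw parameter. A short illustration with a hypothetical node ID:

    import os

    # Hypothetical node ID for a parametrized test case.
    nodeid = "tests/test_environment_file.py::test_envs[/work/templates/base.env]"
    parts = nodeid.split("[")
    files = [""] if len(parts) == 1 else [os.path.basename(parts[1][:-1])]
    assert files == ["base.env"]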
def _get_error_message(self):
"""
@@ -303,7 +293,6 @@ def pytest_runtest_makereport(item, call):
if outcome.get_result().when != "call":
return # only capture results of test cases themselves
result = TestResult(item, outcome)
- ALL_RESULTS.append(result)
if (
not item.config.option.continue_on_failure
and result.is_base_test
@@ -312,9 +301,11 @@ def pytest_runtest_makereport(item, call):
msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
result.error_message
)
- pytest.exit(
- "{}\n{}\n{}".format(msg, result.files, result.test_case)
- )
+ result.error_message = msg
+ ALL_RESULTS.append(result)
+ pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
+
+ ALL_RESULTS.append(result)
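
For readers unfamiliar with this hook: it uses pytest's hookwrapper protocol, yielding control so pytest builds the report first and then inspecting the finished result. A stripped-down sketch of the pattern, with the bookkeeping above omitted:

    import pytest

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_makereport(item, call):
        outcome = yield              # pytest builds the report first
        report = outcome.get_result()
        if report.when != "call":    # ignore setup/teardown phases
            return
        # inspect report.outcome, report.longrepr, etc. here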
def make_timestamp():
@@ -339,57 +330,80 @@ def pytest_sessionfinish(session, exitstatus):
"""
if not session.config.option.template_dir:
return
- template_path = os.path.abspath(session.config.option.template_dir[0])
- profile_name = session.config.option.validation_profile_name or ""
+
+ if session.config.option.template_source:
+ template_source = session.config.option.template_source[0]
+ else:
+ template_source = os.path.abspath(session.config.option.template_dir[0])
+
+ categories_selected = session.config.option.test_categories or ""
generate_report(
get_output_dir(session.config),
- template_path,
- profile_name,
+ template_source,
+ categories_selected,
session.config.option.report_format,
)
+def pytest_terminal_summary(terminalreporter, exitstatus):
+ # Ensures all preload information and warnings appear after
+ # test results
+ try:
+ create_preloads(terminalreporter.config, exitstatus)
+ except Exception:
+ print("Error creating preloads, skipping preload generation")
+ traceback.print_exc()
+
+
# noinspection PyUnusedLocal
def pytest_collection_modifyitems(session, config, items):
"""
- Selects tests based on the validation profile requested. Tests without
- pytest markers will always be executed.
+ Selects tests based on the categories requested. Tests without
+ categories will always be executed.
"""
- allowed_marks = ["xfail", "base"]
- profile = config.option.validation_profile
-
- for item in items:
- markers = set(m.name for m in item.iter_markers())
- if not profile and markers and set(markers).isdisjoint(allowed_marks):
- item.add_marker(
- pytest.mark.skip(
- reason="No validation profile selected. "
- "Skipping tests with marks."
+ config.traceability_items = list(items) # save all items for traceability
+ if not config.option.self_test:
+ for item in items:
+ passed_categories = set(config.option.test_categories or [])
+ all_of_categories = getattr(item.function, "all_categories", set())
+ any_of_categories = getattr(item.function, "any_categories", set())
+ if all_of_categories and not all_of_categories.issubset(passed_categories):
+ item.add_marker(
+ pytest.mark.skip(
+ reason=(
+ "Test categories do not match " "all of the passed categories"
+ )
+ )
+ )
+ if any_of_categories and not passed_categories.intersection(
+ any_of_categories
+ ):
+ item.add_marker(
+ pytest.mark.skip(
+ reason=(
+ "Test categories do not match " "any of the passed categories"
+ )
+ )
)
- )
- if (
- profile
- and markers
- and profile not in markers
- and set(markers).isdisjoint(allowed_marks)
- ):
- item.add_marker(
- pytest.mark.skip(reason="Doesn't match selection " "validation profile")
- )
items.sort(
- key=lambda i: 0 if "base" in set(m.name for m in i.iter_markers()) else 1
+ key=lambda x: (0, x.name)
+ if "base" in set(m.name for m in x.iter_markers())
+ else (1, x.name)
)
-def make_href(paths):
+def make_href(paths, base_dir=None):
"""
Create an anchor tag to link to the file paths provided.
:param paths: string or list of file paths
+ :param base_dir: If specified, this is prepended to each path
:return: String of hrefs - one for each path, each separated by a line
break (<br/>)
"""
return "<br/>".join(links)
-def load_resolutions_file():
- """
- :return: dict of data loaded from resolutions_steps.json
- """
- resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
- if os.path.exists(resolution_steps):
- with open(resolution_steps, "r") as f:
- return json.loads(f.read())
-
-
-def generate_report(outpath, template_path, profile_name, output_format="html"):
+def generate_report(outpath, template_path, categories, output_format="html"):
"""
Generates the various output reports.
:param outpath: destination directory for all reports
:param template_path: directory containing the Heat templates validated
- :param profile_name: Optional validation profile selected
+ :param categories: Optional categories selected
:param output_format: One of "html", "excel", or "csv". Default is "html"
:raises: ValueError if requested output format is unknown
"""
failures = [r for r in ALL_RESULTS if r.is_failed]
generate_failure_file(outpath)
output_format = output_format.lower().strip() if output_format else "html"
+ generate_json(outpath, template_path, categories)
if output_format == "html":
- generate_html_report(outpath, profile_name, template_path, failures)
+ generate_html_report(outpath, categories, template_path, failures)
elif output_format == "excel":
- generate_excel_report(outpath, profile_name, template_path, failures)
+ generate_excel_report(outpath, categories, template_path, failures)
elif output_format == "json":
- generate_json(outpath, template_path, profile_name)
+ return
elif output_format == "csv":
- generate_csv_report(outpath, profile_name, template_path, failures)
+ generate_csv_report(outpath, categories, template_path, failures)
else:
raise ValueError("Unsupported output format: " + output_format)
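
A hypothetical invocation of the dispatcher above. Note that report.json is now written unconditionally before the format check, so --report-format=json simply skips the extra renderers:

    # Hypothetical call: writes report.json plus report.xlsx under /tmp/out.
    generate_report(
        outpath="/tmp/out",
        template_path="/tmp/heat_templates",
        categories=["openstack"],
        output_format="excel",
    )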
@@ -469,10 +474,11 @@ def generate_failure_file(outpath):
write_json(data, failure_path)
-def generate_csv_report(output_dir, profile_name, template_path, failures):
+def generate_csv_report(output_dir, categories, template_path, failures):
rows = [["Validation Failures"]]
headers = [
- ("Profile Selected:", profile_name),
+ ("Categories Selected:", categories),
+ ("Tool Version:", version.VERSION),
("Report Generated At:", make_timestamp()),
("Directory Validated:", template_path),
("Checksum:", hash_directory(template_path)),
@@ -501,18 +507,16 @@ def generate_csv_report(output_dir, profile_name, template_path, failures):
rows.append([col for col, _ in REPORT_COLUMNS])
reqs = load_current_requirements()
- resolutions = load_resolutions_file()
# table content
- for failure in failures:
+ for i, failure in enumerate(failures, start=1):
rows.append(
[
+ i,
"\n".join(failure.files),
- failure.test_module,
failure.requirement_text(reqs),
- failure.resolution_steps(resolutions),
failure.error_message,
- failure.raw_output,
+ failure.test_id,
]
)
@@ -523,18 +527,21 @@ def generate_csv_report(output_dir, profile_name, template_path, failures):
writer.writerow(row)
-def generate_excel_report(output_dir, profile_name, template_path, failures):
+def generate_excel_report(output_dir, categories, template_path, failures):
output_path = os.path.join(output_dir, "report.xlsx")
workbook = xlsxwriter.Workbook(output_path)
- bold = workbook.add_format({"bold": True})
- code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
- normal = workbook.add_format({"text_wrap": True})
+ bold = workbook.add_format({"bold": True, "align": "top"})
+ code = workbook.add_format(
+ {"font_name": "Courier", "text_wrap": True, "align": "top"}
+ )
+ normal = workbook.add_format({"text_wrap": True, "align": "top"})
heading = workbook.add_format({"bold": True, "font_size": 18})
worksheet = workbook.add_worksheet("failures")
worksheet.write(0, 0, "Validation Failures", heading)
headers = [
- ("Profile Selected:", profile_name),
+ ("Categories Selected:", ",".join(categories)),
+ ("Tool Version:", version.VERSION),
("Report Generated At:", make_timestamp()),
("Directory Validated:", template_path),
("Checksum:", hash_directory(template_path)),
@@ -566,17 +573,24 @@ def generate_excel_report(output_dir, profile_name, template_path, failures):
worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
reqs = load_current_requirements()
- resolutions = load_resolutions_file()
# table content
+ for col, width in enumerate((20, 30, 60, 60, 40)):
+ worksheet.set_column(col, col, width)
+ err_num = 1
for row, failure in enumerate(failures, start=start_error_table_row + 2):
- worksheet.write(row, 0, "\n".join(failure.files), normal)
- worksheet.write(row, 1, failure.test_module, normal)
+ worksheet.write(row, 0, str(err_num), normal)
+ worksheet.write(row, 1, "\n".join(failure.files), normal)
worksheet.write(row, 2, failure.requirement_text(reqs), normal)
- worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
- worksheet.write(row, 4, failure.error_message, normal)
- worksheet.write(row, 5, failure.raw_output, code)
-
+ worksheet.write(row, 3, failure.error_message.replace("\n", "\n\n"), normal)
+ worksheet.write(row, 4, failure.test_id, normal)
+ err_num += 1
+ worksheet.autofilter(
+ start_error_table_row + 1,
+ 0,
+ start_error_table_row + 1 + err_num,
+ len(REPORT_COLUMNS) - 1,
+ )
workbook.close()
@@ -589,44 +603,20 @@ def make_iso_timestamp():
return now.isoformat()
-def aggregate_requirement_adherence(r_id, collection_failures, test_results):
- """
- Examines all tests associated with a given requirement and determines
- the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
-
- * ERROR - At least one ERROR occurred
- * PASS - At least one PASS and no FAIL or ERRORs.
- * FAIL - At least one FAIL occurred (no ERRORs)
- * SKIP - All tests were SKIP
-
-
- :param r_id: Requirement ID to examing
- :param collection_failures: Errors that occurred during test setup.
- :param test_results: List of TestResult
- :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
- """
- errors = any(r_id in f["requirements"] for f in collection_failures)
- outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
- return aggregate_results(errors, outcomes, r_id)
-
-
-def aggregate_results(has_errors, outcomes, r_id=None):
+def aggregate_results(outcomes, r_id=None):
"""
Determines the aggregate result for the conditions provided. Assumes the
results have been filtered and collected for analysis.
- :param has_errors: True if collection failures occurred for the tests being
- analyzed.
:param outcomes: set of outcomes from the TestResults
:param r_id: Optional requirement ID if known
:return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
(see aggregate_requirement_adherence for more detail)
"""
- if has_errors:
- return "ERROR"
-
if not outcomes:
return "PASS"
+ elif "ERROR" in outcomes:
+ return "ERROR"
elif "FAIL" in outcomes:
return "FAIL"
elif "PASS" in outcomes:
@@ -636,7 +626,8 @@ def aggregate_results(has_errors, outcomes, r_id=None):
else:
pytest.warns(
"Unexpected error aggregating outcomes ({}) for requirement {}".format(
- outcomes, r_id)
+ outcomes, r_id
+ )
)
return "ERROR"
@@ -661,60 +652,24 @@ def aggregate_run_results(collection_failures, test_results):
return "PASS"
-def error(failure_or_result):
- """
- Extracts the error message from a collection failure or test result
- :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
- :return: Error message as string
- """
- if isinstance(failure_or_result, TestResult):
- return failure_or_result.error_message
- else:
- return failure_or_result["error"]
-
-
-def req_ids(failure_or_result):
- """
- Extracts the requirement IDs from a collection failure or test result
- :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
- :return: set of Requirement IDs. If no requirements mapped, then an empty set
- """
- if isinstance(failure_or_result, TestResult):
- return set(failure_or_result.requirement_ids)
- else:
- return set(failure_or_result["requirements"])
-
-
-def collect_errors(r_id, collection_failures, test_result):
- """
- Creates a list of error messages from the collection failures and
- test results. If r_id is provided, then it collects the error messages
- where the failure or test is associated with that requirement ID. If
- r_id is None, then it collects all errors that occur on failures and
- results that are not mapped to requirements
- """
- def selector(item):
- if r_id:
- return r_id in req_ids(item)
- else:
- return not req_ids(item)
+def relative_paths(base_dir, paths):
+ return [os.path.relpath(p, base_dir) for p in paths if p != ""]
- errors = (error(x) for x in chain(collection_failures, test_result)
- if selector(x))
- return [e for e in errors if e]
-
-def generate_json(outpath, template_path, profile_name):
+# noinspection PyTypeChecker
+def generate_json(outpath, template_path, categories):
"""
Creates a JSON summary of the entire test run.
"""
reqs = load_current_requirements()
data = {
"version": "dublin",
- "template_directory": template_path,
+ "template_directory": os.path.splitdrive(template_path)[1].replace(
+ os.path.sep, "/"
+ ),
"timestamp": make_iso_timestamp(),
"checksum": hash_directory(template_path),
- "profile": profile_name,
+ "categories": categories,
"outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
"tests": [],
"requirements": [],
@@ -735,7 +690,7 @@ def generate_json(outpath, template_path, profile_name):
for result in ALL_RESULTS:
results.append(
{
- "files": result.files,
+ "files": relative_paths(template_path, result.files),
"test_module": result.test_module,
"test_case": result.test_case,
"result": result.outcome,
@@ -744,30 +699,40 @@ def generate_json(outpath, template_path, profile_name):
}
)
+ # Build a mapping of requirement ID to the results
+ r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
+ for test_result in results:
+ test_reqs = test_result["requirements"]
+ r_ids = (
+ [r["id"] if isinstance(r, dict) else r for r in test_reqs]
+ if test_reqs
+ else ("",)
+ )
+ for r_id in r_ids:
+ item = r_id_results[r_id]
+ item["outcomes"].add(test_result["result"])
+ if test_result["error"]:
+ item["errors"].add(test_result["error"])
+
requirements = data["requirements"]
for r_id, r_data in reqs.items():
- result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
- if result:
- requirements.append(
- {
- "id": r_id,
- "text": r_data["description"],
- "keyword": r_data["keyword"],
- "result": result,
- "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS)
- }
- )
- # If there are tests that aren't mapped to a requirement, then we'll
- # map them to a special entry so the results are coherent.
- unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
- has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
- if unmapped_outcomes or has_errors:
+ requirements.append(
+ {
+ "id": r_id,
+ "text": r_data["description"],
+ "keyword": r_data["keyword"],
+ "result": aggregate_results(r_id_results[r_id]["outcomes"]),
+ "errors": list(r_id_results[r_id]["errors"]),
+ }
+ )
+
+ if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
requirements.append(
{
"id": "Unmapped",
"text": "Tests not mapped to requirements (see tests)",
- "result": aggregate_results(has_errors, unmapped_outcomes),
- "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS)
+ "result": aggregate_results(r_id_results[""]["outcomes"]),
+ "errors": list(r_id_results[""]["errors"]),
}
)
@@ -775,21 +740,21 @@ def generate_json(outpath, template_path, profile_name):
write_json(data, report_path)
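
The r_id_results mapping built inside generate_json replaces the deleted aggregate_requirement_adherence/collect_errors pair with a single pass over the results. A self-contained illustration of the structure, using hypothetical outcomes (the empty key collects tests not mapped to any requirement):

    from collections import defaultdict

    r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
    for r_id, outcome, error in [
        ("R-123456", "FAIL", "missing parameter: vnf_name"),
        ("R-123456", "PASS", None),
        ("", "SKIP", None),
    ]:
        r_id_results[r_id]["outcomes"].add(outcome)
        if error:
            r_id_results[r_id]["errors"].add(error)

    assert r_id_results["R-123456"]["outcomes"] == {"PASS", "FAIL"}
    assert r_id_results[""]["outcomes"] == {"SKIP"}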
-def generate_html_report(outpath, profile_name, template_path, failures):
+def generate_html_report(outpath, categories, template_path, failures):
reqs = load_current_requirements()
- resolutions = load_resolutions_file()
fail_data = []
for failure in failures:
fail_data.append(
{
- "file_links": make_href(failure.files),
- "test_id": failure.test_module,
- "error_message": failure.error_message,
- "raw_output": failure.raw_output,
+ "file_links": make_href(failure.files, template_path),
+ "test_id": failure.test_id,
+ "error_message": escape(failure.error_message).replace(
+ "\n", "
"
+ ),
+ "raw_output": escape(failure.raw_output),
"requirements": docutils.core.publish_parts(
writer_name="html", source=failure.requirement_text(reqs)
)["body"],
- "resolution_steps": failure.resolution_steps(resolutions),
}
)
pkg_dir = os.path.split(__file__)[0]
@@ -797,8 +762,9 @@ def generate_html_report(outpath, profile_name, template_path, failures):
with open(j2_template_path, "r") as f:
report_template = jinja2.Template(f.read())
contents = report_template.render(
+ version=version.VERSION,
num_failures=len(failures) + len(COLLECTION_FAILURES),
- profile_name=profile_name,
+ categories=categories,
template_dir=make_href(template_path),
checksum=hash_directory(template_path),
timestamp=make_timestamp(),
@@ -820,6 +786,13 @@ def pytest_addoption(parser):
help="Directory which holds the templates for validation",
)
+ parser.addoption(
+ "--template-source",
+ dest="template_source",
+ action="append",
+ help="Source directory which holds the templates for validation",
+ )
+
parser.addoption(
"--self-test",
dest="self_test",
@@ -827,20 +800,6 @@ def pytest_addoption(parser):
help="Test the unit tests against their fixtured data",
)
- parser.addoption(
- "--validation-profile",
- dest="validation_profile",
- action="store",
- help="Runs all unmarked tests plus test with a matching marker",
- )
-
- parser.addoption(
- "--validation-profile-name",
- dest="validation_profile_name",
- action="store",
- help="Friendly name of the validation profile used in reports",
- )
-
parser.addoption(
"--report-format",
dest="report_format",
@@ -860,7 +819,31 @@ def pytest_addoption(parser):
dest="output_dir",
action="store",
default=None,
- help="Alternate "
+ help="Alternate ",
+ )
+
+ parser.addoption(
+ "--category",
+ dest="test_categories",
+ action="append",
+ help="Optional category of tests to execute (may be given multiple times)",
+ )
+
+ parser.addoption(
+ "--env-directory",
+ dest="env_dir",
+ action="store",
+ help="Optional directory of .env files for preload generation",
+ )
+
+ parser.addoption(
+ "--preload-format",
+ dest="preload_formats",
+ action="append",
+ help=(
+ "Preload format to create (multiple allowed). If not provided "
+ "then all available formats will be created: {}"
+ ).format(", ".join(get_generator_plugin_names())),
)
@@ -992,7 +975,12 @@ def pytest_generate_tests(metafunc):
def hash_directory(path):
- md5 = hashlib.md5()
+ """
+ Create md5 hash using the contents of all files under ``path``
+ :param path: string directory containing files
+ :return: string MD5 hash code (hex)
+ """
+ md5 = hashlib.md5() # nosec
for dir_path, sub_dirs, filenames in os.walk(path):
for filename in filenames:
file_path = os.path.join(dir_path, filename)
@@ -1003,51 +991,98 @@ def hash_directory(path):
def load_current_requirements():
"""Loads dict of current requirements or empty dict if file doesn't exist"""
- try:
- r = requests.get(NEEDS_JSON_URL)
- if r.headers.get("content-type") == "application/json":
- with open(HEAT_REQUIREMENTS_FILE, "wb") as needs:
- needs.write(r.content)
- else:
- warnings.warn(
- (
- "Unexpected content-type ({}) encountered downloading "
- + "requirements.json, using last saved copy"
- ).format(r.headers.get("content-type"))
- )
- except requests.exceptions.RequestException as e:
- warnings.warn("Error downloading latest JSON, using last saved copy.")
- warnings.warn(UserWarning(e))
- if not os.path.exists(HEAT_REQUIREMENTS_FILE):
- return {}
with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
data = json.load(f)
version = data["current_version"]
return data["versions"][version]["needs"]
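
With the download logic removed, heat_requirements.json must already exist next to the package. A hedged sketch of the shape this loader expects; the keys come from how the rest of this module reads the data, and all values are illustrative:

    needs_json = {
        "current_version": "dublin",
        "versions": {
            "dublin": {
                "needs": {
                    "R-123456": {
                        "id": "R-123456",
                        "description": "The VNF Heat template MUST ...",
                        "keyword": "MUST",
                        "docname": "Chapter5/Heat",
                        "section_name": "Heat",
                        "validation_mode": "static",
                    }
                }
            }
        },
    }
    version = needs_json["current_version"]
    reqs = needs_json["versions"][version]["needs"]
    assert "R-123456" in reqs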
-def compat_open(path):
- """Invokes open correctly depending on the Python version"""
- if sys.version_info.major < 3:
- return open(path, "wb")
- else:
- return open(path, "w", newline="")
+def select_heat_requirements(reqs):
+ """Filters dict requirements to only those requirements pertaining to Heat"""
+ return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}
+
+
+def is_testable(reqs):
+ """Flags each requirement with a "testable" boolean: MUST keyword and validation_mode other than none"""
+ for key, values in reqs.items():
+ if ("MUST" in values.get("keyword", "").upper()) and (
+ "none" not in values.get("validation_mode", "").lower()
+ ):
+ reqs[key]["testable"] = True
+ else:
+ reqs[key]["testable"] = False
+ return reqs
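
A quick illustration of the two filters working together, with hypothetical requirement entries:

    reqs = {
        "R-1": {"keyword": "MUST", "validation_mode": "static", "docname": "Heat"},
        "R-2": {"keyword": "SHOULD", "validation_mode": "static", "docname": "Heat"},
        "R-3": {"keyword": "MUST", "validation_mode": "none", "docname": "Heat"},
        "R-4": {"keyword": "MUST", "validation_mode": "static", "docname": "Security"},
    }
    flagged = is_testable(select_heat_requirements(reqs))
    assert "R-4" not in flagged                 # not a Heat requirement
    assert flagged["R-1"]["testable"] is True
    assert flagged["R-2"]["testable"] is False  # not a MUST
    assert flagged["R-3"]["testable"] is False  # validation_mode is none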
+
+
+def build_rst_json(reqs):
+ """Adds RST-formatted requirement and test case links to testable requirements and drops the rest"""
+ for key, values in list(reqs.items()):
+ if values["testable"]:
+ # Creates links in RST format to requirements and test cases
+ if values["test_case"]:
+ mod = values["test_case"].split(".")[-1]
+ val = TEST_SCRIPT_SITE + mod + ".py"
+ rst_value = "`" + mod + " <" + val + ">`_"
+ title = (
+ "`"
+ + values["id"]
+ + " <"
+ + VNFRQTS_ID_URL
+ + values["docname"].replace(" ", "%20")
+ + ".html#"
+ + values["id"]
+ + ">`_"
+ )
+ reqs[key].update({"full_title": title, "test_case": rst_value})
+ else:
+ title = (
+ "`"
+ + values["id"]
+ + " <"
+ + VNFRQTS_ID_URL
+ + values["docname"].replace(" ", "%20")
+ + ".html#"
+ + values["id"]
+ + ">`_"
+ )
+ reqs[key].update(
+ {
+ "full_title": title,
+ "test_case": "No test for requirement",
+ "validated_by": "static",
+ }
+ )
+ else:
+ del reqs[key]
+ return reqs
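
The strings assembled above are RST hyperlink targets. For a hypothetical test module name, the generated link looks like this:

    mod = "tests.test_environment_file".split(".")[-1]
    rst_value = "`" + mod + " <" + TEST_SCRIPT_SITE + mod + ".py" + ">`_"
    # `test_environment_file <https://github.com/onap/vvp-validation-scripts/
    #  blob/master/ice_validator/tests/test_environment_file.py>`_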
-def unicode_writerow(writer, row):
- if sys.version_info.major < 3:
- row = [s.encode("utf8") for s in row]
- writer.writerow(row)
+def generate_rst_table(output_dir, data):
+ """Generate a formatted CSV to be used in RST"""
+ rst_path = os.path.join(output_dir, "rst.csv")
+ with open(rst_path, "w", newline="") as f:
+ out = csv.writer(f)
+ out.writerow(("Requirement ID", "Test Module", "Test Name"))
+ for req_id, metadata in data.items():
+ out.writerow(
+ (
+ metadata["full_title"],
+ metadata["test_case"],
+ metadata["validated_by"],
+ )
+ )
# noinspection PyUnusedLocal
def pytest_report_collectionfinish(config, startdir, items):
"""Generates a simple traceability report to output/traceability.csv"""
- traceability_path = os.path.join(__path__[0], "../output/traceability.csv")
+ traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
output_dir = os.path.split(traceability_path)[0]
if not os.path.exists(output_dir):
os.makedirs(output_dir)
- requirements = load_current_requirements()
+ reqs = load_current_requirements()
+ requirements = select_heat_requirements(reqs)
+ testable_requirements = is_testable(requirements)
unmapped, mapped = partition(
lambda i: hasattr(i.function, "requirement_ids"), items
)
@@ -1058,43 +1093,82 @@ def pytest_report_collectionfinish(config, startdir, items):
for req_id in item.function.requirement_ids:
if req_id not in req_to_test:
req_to_test[req_id].add(item)
+ if req_id in requirements:
+ reqs[req_id].update(
+ {
+ "test_case": item.function.__module__,
+ "validated_by": item.function.__name__,
+ }
+ )
if req_id not in requirements:
mapping_errors.add(
(req_id, item.function.__module__, item.function.__name__)
)
- mapping_error_path = os.path.join(__path__[0], "../output/mapping_errors.csv")
- with compat_open(mapping_error_path) as f:
+ mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
+ with open(mapping_error_path, "w", newline="") as f:
writer = csv.writer(f)
for err in mapping_errors:
- unicode_writerow(writer, err)
+ writer.writerow(err)
- with compat_open(traceability_path) as f:
+ with open(traceability_path, "w", newline="") as f:
out = csv.writer(f)
- unicode_writerow(
- out,
- ("Requirement ID", "Requirement", "Section", "Test Module", "Test Name"),
+ out.writerow(
+ (
+ "Requirement ID",
+ "Requirement",
+ "Section",
+ "Keyword",
+ "Validation Mode",
+ "Is Testable",
+ "Test Module",
+ "Test Name",
+ )
)
- for req_id, metadata in requirements.items():
+ for req_id, metadata in testable_requirements.items():
if req_to_test[req_id]:
for item in req_to_test[req_id]:
- unicode_writerow(
- out,
+ out.writerow(
(
req_id,
metadata["description"],
metadata["section_name"],
+ metadata["keyword"],
+ metadata["validation_mode"],
+ metadata["testable"],
item.function.__module__,
item.function.__name__,
- ),
+ )
)
else:
- unicode_writerow(
- out,
- (req_id, metadata["description"], metadata["section_name"], "", ""),
+ out.writerow(
+ (
+ req_id,
+ metadata["description"],
+ metadata["section_name"],
+ metadata["keyword"],
+ metadata["validation_mode"],
+ metadata["testable"],
+ "", # test module
+ "",
+ ) # test function
)
# now write out any test methods that weren't mapped to requirements
- for item in unmapped:
- unicode_writerow(
- out, ("", "", "", item.function.__module__, item.function.__name__)
+ unmapped_tests = {
+ (item.function.__module__, item.function.__name__) for item in unmapped
+ }
+ for test_module, test_name in unmapped_tests:
+ out.writerow(
+ (
+ "", # req ID
+ "", # description
+ "", # section name
+ "", # keyword
+ "static", # validation mode
+ "TRUE", # testable
+ test_module,
+ test_name,
+ )
)
+
+ generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))