import json
import os
import re
-import sys
import time
from collections import defaultdict
from itertools import chain
-import requests
import traceback
-import warnings
import docutils.core
import jinja2
DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
RESOLUTION_STEPS_FILE = "resolution_steps.json"
-HEAT_REQUIREMENTS_FILE = "heat_requirements.json"
-
-# noinspection PyPep8
-NEEDS_JSON_URL = "https://onap.readthedocs.io/en/latest/_downloads/789ac64d223325488fb3f120f959d985/needs.json"
+HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
+TEST_SCRIPT_SITE = "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
+VNFRQTS_ID_URL = "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
REPORT_COLUMNS = [
("Input File", "file"),
def get_output_dir(config):
+ """
+ Retrieve the output directory for the reports and create it if necessary
+ :param config: pytest configuration
+ :return: output directory as string
+ """
output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
if not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
"""
text = (
"\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
- for r_id in self.requirement_ids
+ for r_id in self.requirement_ids if r_id in curr_reqs
)
return "".join(text)
elif "yaml_files" in self.item.fixturenames:
return self.item.funcargs["yaml_files"]
else:
- return [self.result.nodeid.split("[")[1][:-1]]
+ parts = self.result.nodeid.split("[")
+ return [""] if len(parts) == 1 else [parts[1][:-1]]
def _get_error_message(self):
"""
failures = [r for r in ALL_RESULTS if r.is_failed]
generate_failure_file(outpath)
output_format = output_format.lower().strip() if output_format else "html"
+ generate_json(outpath, template_path, categories)
if output_format == "html":
generate_html_report(outpath, categories, template_path, failures)
elif output_format == "excel":
generate_excel_report(outpath, categories, template_path, failures)
elif output_format == "json":
- generate_json(outpath, template_path, categories)
+ return
elif output_format == "csv":
generate_csv_report(outpath, categories, template_path, failures)
else:
return [e for e in errors if e]
+def relative_paths(base_dir, paths):
+ return [os.path.relpath(p, base_dir) for p in paths]
+
+
def generate_json(outpath, template_path, categories):
"""
Creates a JSON summary of the entire test run.
reqs = load_current_requirements()
data = {
"version": "dublin",
- "template_directory": template_path,
+ "template_directory": os.path.splitdrive(template_path)[1].replace(
+ os.path.sep, "/"
+ ),
"timestamp": make_iso_timestamp(),
"checksum": hash_directory(template_path),
"categories": categories,
for result in ALL_RESULTS:
results.append(
{
- "files": result.files,
+ "files": relative_paths(template_path, result.files),
"test_module": result.test_module,
"test_case": result.test_case,
"result": result.outcome,
def hash_directory(path):
+ """
+ Create md5 hash using the contents of all files under ``path``
+ :param path: string directory containing files
+ :return: string MD5 hash code (hex)
+ """
md5 = hashlib.md5()
for dir_path, sub_dirs, filenames in os.walk(path):
for filename in filenames:
def load_current_requirements():
"""Loads dict of current requirements or empty dict if file doesn't exist"""
- try:
- r = requests.get(NEEDS_JSON_URL)
- if r.headers.get("content-type") == "application/json":
- with open(HEAT_REQUIREMENTS_FILE, "wb") as needs:
- needs.write(r.content)
- else:
- warnings.warn(
- (
- "Unexpected content-type ({}) encountered downloading "
- + "requirements.json, using last saved copy"
- ).format(r.headers.get("content-type"))
- )
- except requests.exceptions.RequestException as e:
- warnings.warn("Error downloading latest JSON, using last saved copy.")
- warnings.warn(UserWarning(e))
- if not os.path.exists(HEAT_REQUIREMENTS_FILE):
- return {}
with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
data = json.load(f)
version = data["current_version"]
return data["versions"][version]["needs"]
-def compat_open(path):
- """Invokes open correctly depending on the Python version"""
- if sys.version_info.major < 3:
- return open(path, "wb")
- else:
- return open(path, "w", newline="")
-
-
-def unicode_writerow(writer, row):
- if sys.version_info.major < 3:
- row = [s.encode("utf8") for s in row]
- writer.writerow(row)
+def select_heat_requirements(reqs):
+ """Filters dict requirements to only those requirements pertaining to Heat"""
+ return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}
-def parse_heat_requirements(reqs):
+def build_rst_json(reqs):
"""Takes requirements and returns list of only Heat requirements"""
data = json.loads(reqs)
for key, values in list(data.items()):
if "Heat" in (values["docname"]):
- if "MUST" not in (values["keyword"]):
- del data[key]
- else:
+ if "MUST" in (values["keyword"]):
if "none" in (values["validation_mode"]):
del data[key]
+ else:
+ # Creates links in RST format to requirements and test cases
+ if values["test_case"]:
+ mod = values["test_case"].split(".")[-1]
+ val = TEST_SCRIPT_SITE + mod + ".py"
+ rst_value = ("`" + mod + " <" + val + ">`_")
+ title = "`" + values["id"] + " <" + VNFRQTS_ID_URL + values["docname"].replace(" ", "%20") + ".html#" + values["id"] + ">`_"
+ data[key].update({'full_title': title, 'test_case': rst_value})
+ else:
+ del data[key]
+ else:
+ del data[key]
else:
del data[key]
return data
+def generate_rst_table(output_dir, data):
+ """Generate a formatted csv to be used in RST"""
+ rst_path = os.path.join(output_dir, "rst.csv")
+ with open(rst_path, "w", newline="") as f:
+ out = csv.writer(f)
+ out.writerow(
+ ("Requirement ID", "Requirement", "Test Module", "Test Name"),
+ )
+ for req_id, metadata in data.items():
+ out.writerow(
+ (
+ metadata["full_title"],
+ metadata["description"],
+ metadata["test_case"],
+ metadata["validated_by"],
+ )
+ )
+
+
# noinspection PyUnusedLocal
def pytest_report_collectionfinish(config, startdir, items):
"""Generates a simple traceability report to output/traceability.csv"""
- traceability_path = os.path.join(__path__[0], "../output/traceability.csv")
+ traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
output_dir = os.path.split(traceability_path)[0]
if not os.path.exists(output_dir):
os.makedirs(output_dir)
reqs = load_current_requirements()
- reqs = json.dumps(reqs)
- requirements = parse_heat_requirements(reqs)
+ requirements = select_heat_requirements(reqs)
unmapped, mapped = partition(
lambda i: hasattr(i.function, "requirement_ids"), items
)
for req_id in item.function.requirement_ids:
if req_id not in req_to_test:
req_to_test[req_id].add(item)
+ if req_id in requirements:
+ reqs[req_id].update({'test_case': item.function.__module__,
+ 'validated_by': item.function.__name__})
if req_id not in requirements:
mapping_errors.add(
(req_id, item.function.__module__, item.function.__name__)
)
- mapping_error_path = os.path.join(__path__[0], "../output/mapping_errors.csv")
- with compat_open(mapping_error_path) as f:
+ mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
+ with open(mapping_error_path, "w", newline="") as f:
writer = csv.writer(f)
for err in mapping_errors:
- unicode_writerow(writer, err)
+ writer.writerow(err)
- with compat_open(traceability_path) as f:
+ with open(traceability_path, "w", newline="") as f:
out = csv.writer(f)
- unicode_writerow(
- out,
- ("Requirement ID", "Requirement", "Section", "Test Module", "Test Name"),
+ out.writerow(
+ ("Requirement ID", "Requirement", "Section",
+ "Keyword", "Validation Mode", "Is Testable",
+ "Test Module", "Test Name"),
)
for req_id, metadata in requirements.items():
+ keyword = metadata["keyword"].upper()
+ mode = metadata["validation_mode"].lower()
+ testable = keyword in {"MUST", "MUST NOT"} and mode != "none"
if req_to_test[req_id]:
for item in req_to_test[req_id]:
- unicode_writerow(
- out,
+ out.writerow(
(
req_id,
metadata["description"],
metadata["section_name"],
+ keyword,
+ mode,
+ "TRUE" if testable else "FALSE",
item.function.__module__,
item.function.__name__,
),
)
else:
- unicode_writerow(
- out,
- (req_id, metadata["description"], metadata["section_name"], "", ""),
+ out.writerow(
+ (req_id,
+ metadata["description"],
+ metadata["section_name"],
+ keyword,
+ mode,
+ "TRUE" if testable else "FALSE",
+ "", # test module
+ ""), # test function
)
# now write out any test methods that weren't mapped to requirements
- for item in unmapped:
- unicode_writerow(
- out, ("", "", "", item.function.__module__, item.function.__name__)
+ unmapped_tests = {(item.function.__module__, item.function.__name__) for item in
+ unmapped}
+ for test_module, test_name in unmapped_tests:
+ out.writerow(
+ ("", # req ID
+ "", # description
+ "", # section name
+ "", # keyword
+ "static", # validation mode
+ "TRUE", # testable
+ test_module,
+ test_name)
)
+
+ generate_rst_table(get_output_dir(config), build_rst_json(json.dumps(reqs)))