# -*- coding: utf8 -*-
# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
# Copyright © 2018 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
#             http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
#             https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================

import collections
import csv
import datetime
import hashlib
import io
import json
import os
import sys
import time
import requests
import traceback
import warnings

import docutils.core
import jinja2
import pytest
from more_itertools import partition
from six import string_types
import xlsxwriter

__path__ = [os.path.dirname(os.path.abspath(__file__))]

resolution_steps_file = "resolution_steps.json"
heat_requirements_file = "heat_requirements.json"

report_columns = [
    ("Input File", "file"),
    ("Test", "test_file"),
    ("Requirements", "req_description"),
    ("Resolution Steps", "resolution_steps"),
    ("Error Message", "message"),
    ("Raw Test Output", "raw_output"),
]
report = collections.OrderedDict(report_columns)

COLLECTION_FAILURES = []
COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
while preparing to validate the input files. Some validations may not have been
executed. Please refer these issues to the VNF Validation Tool team.
"""
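
# The report builders below read failure records from "<output>/failures", a JSON
# object keyed by insertion order. A minimal sketch of one record, based on the
# fields written by pytest_runtest_makereport() and enriched by generate_report()
# further down (the values shown are purely illustrative):
#
#   {
#       "0": {
#           "file": "base_module.yaml",          # resolved input file / fixture pair
#           "vnfrqts": ["R-00000"],               # requirement IDs attached to the test
#           "test": "test_example",               # failing test function name
#           "test_file": "test_example_module",   # module containing the test
#           "raw_output": "...",                  # full pytest failure representation
#           "message": "..."                      # extracted assertion message
#       }
#   }
#
# generate_report() later adds "req_description" and "resolution_steps" to each
# record before rendering the HTML, CSV, or Excel report.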
""" def extract_error_msg(rep): try: msg = str(rep.longrepr.reprcrash) except AttributeError: msg = str(rep) if "AssertionError:" in msg: return msg.split("AssertionError:")[1] else: return msg @pytest.hookimpl(tryfirst=True, hookwrapper=True) def pytest_runtest_makereport(item, call): outcome = yield rep = outcome.get_result() output_dir = "{}/../output".format(__path__[0]) if rep.outcome == "failed": if not os.path.exists(output_dir): os.mkdir(output_dir) if os.path.exists("{}/failures".format(output_dir)): with open("{}/failures".format(output_dir), "r") as o: jdata = json.loads(o.read()) else: jdata = {} if hasattr(item.function, "requirement_ids"): requirement_ids = item.function.requirement_ids else: requirement_ids = "" if "environment_pair" in item.fixturenames: resolved_pair = "{} environment pair".format( item.funcargs["environment_pair"]["name"] ) elif "heat_volume_pair" in item.fixturenames: resolved_pair = "{} volume pair".format( item.funcargs["heat_volume_pair"]["name"] ) elif "heat_templates" in item.fixturenames: resolved_pair = item.funcargs["heat_templates"] elif "yaml_files" in item.fixturenames: resolved_pair = item.funcargs["yaml_files"] else: resolved_pair = rep.nodeid.split("[")[1][:-1] markers = set(m.name for m in item.iter_markers()) base_test = "base" in markers msg = extract_error_msg(rep) if base_test: msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format( msg ) jdata[len(jdata)] = { "file": resolved_pair, "vnfrqts": requirement_ids, "test": item.function.__name__, "test_file": item.function.__module__.split(".")[-1], "raw_output": str(rep.longrepr), "message": msg, } with open("{}/failures".format(output_dir), "w") as f: json.dump(jdata, f, indent=4) if not item.config.option.continue_on_failure and base_test: pytest.exit( "{}\n{}\n{}".format(msg, resolved_pair, item.function.__name__) ) def make_timestamp(): timezone = time.tzname[time.localtime().tm_isdst] return "{} {}".format(str(datetime.datetime.now()), timezone) def pytest_sessionfinish(session, exitstatus): if not session.config.option.template_dir: return template_path = os.path.abspath(session.config.option.template_dir[0]) profile_name = session.config.option.validation_profile_name generate_report( "{}/../output".format(__path__[0]), template_path, profile_name, session.config.option.report_format, ) def pytest_collection_modifyitems(session, config, items): allowed_marks = ["xfail", "base"] profile = config.option.validation_profile for item in items: markers = set(m.name for m in item.iter_markers()) if not profile and markers and set(markers).isdisjoint(allowed_marks): item.add_marker( pytest.mark.skip( reason="No validation profile selected. Skipping tests with marks." ) ) if ( profile and markers and profile not in markers and set(markers).isdisjoint(allowed_marks) ): item.add_marker( pytest.mark.skip(reason="Doesn't match selection validation profile") ) items.sort( key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1 ) def make_href(path): paths = [path] if isinstance(path, string_types) else path links = [] for p in paths: abs_path = os.path.abspath(p) name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1] links.append( "{name}".format( abs_path=abs_path, name=name ) ) return "
".join(links) def generate_report(outpath, template_path, profile_name, output_format): failures = "{}/failures".format(outpath) faildata = None rdata = None hdata = None if os.path.exists(failures): with open(failures, "r") as f: faildata = json.loads(f.read()) else: faildata = {} resolution_steps = "{}/../{}".format(__path__[0], resolution_steps_file) if os.path.exists(resolution_steps): with open(resolution_steps, "r") as f: rdata = json.loads(f.read()) heat_requirements = "{}/../{}".format(__path__[0], heat_requirements_file) if os.path.exists(heat_requirements): with open(heat_requirements, "r") as f: hdata = json.loads(f.read()) # point requirements at the most recent version current_version = hdata["current_version"] hdata = hdata["versions"][current_version]["needs"] # mapping requirement IDs from failures to requirement descriptions for k, v in faildata.items(): req_text = "" if v["vnfrqts"] != "": for req in v["vnfrqts"]: if req in hdata: req_text += "\n\n{}: \n{}".format(req, hdata[req]["description"]) faildata[k]["req_description"] = req_text # mapping resolution steps to module and test name for k, v in faildata.items(): # resolution_step = "" faildata[k]["resolution_steps"] = "" for rs in rdata: if v["test_file"] == rs["module"] and v["test"] == rs["function"]: faildata[k]["resolution_steps"] = "\n{}: \n{}".format( rs["header"], rs["resolution_steps"] ) output_format = output_format.lower().strip() if output_format else "html" if output_format == "html": generate_html_report(outpath, profile_name, template_path, faildata) elif output_format == "excel": generate_excel_report(outpath, profile_name, template_path, faildata) elif output_format == "csv": generate_csv_report(outpath, profile_name, template_path, faildata) else: raise ValueError("Unsupported output format: " + output_format) def generate_csv_report(output_dir, profile_name, template_path, faildata): rows = [["Validation Failures"]] headers = [ ("Profile Selected:", profile_name), ("Report Generated At:", make_timestamp()), ("Directory Validated:", template_path), ("Checksum:", hash_directory(template_path)), ("Total Errors:", len(faildata) + len(COLLECTION_FAILURES)), ] rows.append([]) for header in headers: rows.append(header) rows.append([]) if COLLECTION_FAILURES: rows.append([COLLECTION_FAILURE_WARNING]) rows.append(["Validation File", "Test", "Fixtures", "Error"]) for failure in COLLECTION_FAILURES: rows.append( [ failure["module"], failure["test"], ";".join(failure["fixtures"]), failure["error"], ] ) rows.append([]) # table header rows.append([col for col, _ in report_columns]) # table content for data in faildata.values(): rows.append( [ data.get("file", ""), data.get("test_file", ""), data.get("req_description", ""), data.get("resolution_steps", ""), data.get("message", ""), data.get("raw_output", ""), ] ) output_path = os.path.join(output_dir, "report.csv") with open(output_path, "w", newline="") as f: writer = csv.writer(f) for row in rows: writer.writerow(row) def generate_excel_report(output_dir, profile_name, template_path, faildata): output_path = os.path.join(output_dir, "report.xlsx") workbook = xlsxwriter.Workbook(output_path) bold = workbook.add_format({"bold": True}) code = workbook.add_format(({"font_name": "Courier", "text_wrap": True})) normal = workbook.add_format({"text_wrap": True}) heading = workbook.add_format({"bold": True, "font_size": 18}) worksheet = workbook.add_worksheet("failures") worksheet.write(0, 0, "Validation Failures", heading) headers = [ ("Profile Selected:", profile_name), 
("Report Generated At:", make_timestamp()), ("Directory Validated:", template_path), ("Checksum:", hash_directory(template_path)), ("Total Errors:", len(faildata) + len(COLLECTION_FAILURES)), ] for row, (header, value) in enumerate(headers, start=2): worksheet.write(row, 0, header, bold) worksheet.write(row, 1, value) worksheet.set_column(0, len(headers) - 1, 40) worksheet.set_column(len(headers), len(headers), 80) if COLLECTION_FAILURES: collection_failures_start = 2 + len(headers) + 2 worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold) collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"] for col_num, col_name in enumerate(collection_failure_headers): worksheet.write(collection_failures_start + 1, col_num, col_name, bold) for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2): worksheet.write(row, 0, data["module"]) worksheet.write(row, 1, data["test"]) worksheet.write(row, 2, ",".join(data["fixtures"])) worksheet.write(row, 3, data["error"], code) # table header start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4 worksheet.write(start_error_table_row, 0, "Validation Failures", bold) for col_num, (col_name, _) in enumerate(report_columns): worksheet.write(start_error_table_row + 1, col_num, col_name, bold) # table content for row, data in enumerate(faildata.values(), start=start_error_table_row + 2): for col, key in enumerate(report.values()): if key == "file": paths = ( [data[key]] if isinstance(data[key], string_types) else data[key] ) contents = "\n".join(paths) worksheet.write(row, col, contents, normal) elif key == "raw_output": worksheet.write_string(row, col, data[key], code) else: worksheet.write(row, col, data[key], normal) workbook.close() def generate_html_report(outpath, profile_name, template_path, faildata): failures = [] for data in faildata.values(): failures.append( { "file_links": make_href(data["file"]), "test_id": data["test_file"], "error_message": data["message"], "raw_output": data["raw_output"], "requirements": docutils.core.publish_parts( writer_name="html", source=data["req_description"] )["body"], "resolution_steps": data["resolution_steps"], } ) pkg_dir = os.path.split(__file__)[0] j2_template_path = os.path.join(pkg_dir, "report.html.jinja2") with open(j2_template_path, "r") as f: report_template = jinja2.Template(f.read()) contents = report_template.render( num_failures=len(failures) + len(COLLECTION_FAILURES), profile_name=profile_name, template_dir=make_href(template_path), checksum=hash_directory(template_path), timestamp=make_timestamp(), failures=failures, collection_failures=COLLECTION_FAILURES, ) with open(os.path.join(outpath, "report.html"), "w") as f: f.write(contents) def pytest_addoption(parser): """ Add needed CLI arguments """ parser.addoption( "--template-directory", dest="template_dir", action="append", help="Directory which holds the templates for validation", ) parser.addoption( "--self-test", dest="self_test", action="store_true", help="Test the unit tests against their fixtured data", ) parser.addoption( "--validation-profile", dest="validation_profile", action="store", help="Runs all unmarked tests plus test with a matching marker", ) parser.addoption( "--validation-profile-name", dest="validation_profile_name", action="store", help="Friendly name of the validation profile used in reports", ) parser.addoption( "--report-format", dest="report_format", action="store", help="Format of output report (html, csv, excel)", ) parser.addoption( 
"--continue-on-failure", dest="continue_on_failure", action="store_true", help="Continue validation even when structural errors exist in input files", ) def pytest_configure(config): """ Ensure that we are receive either `--self-test` or `--template-dir=