2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
28 # https://creativecommons.org/licenses/by/4.0/
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
36 # ============LICENSE_END============================================
47 from preload.engine import PLUGIN_MGR, create_preloads
48 from tests.helpers import get_output_dir
51 from html import escape
53 from cgi import escape
54 from collections import defaultdict
61 from more_itertools import partition
63 from six import string_types
65 # noinspection PyUnresolvedReferences
# Configure root logging at import time; only errors are emitted by default.
logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)
# Directory containing this module; used to resolve sibling resources.
__path__ = [os.path.dirname(os.path.abspath(__file__))]
DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
# needs.json-style requirements file published by the VNFRQTS project.
HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
# NOTE(review): the assignment lines for the two URL constants below are not
# visible in this view; these strings are presumably the values of the
# test-script and requirements-document base URL constants - confirm against
# the full source.
"https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
"https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
# NOTE(review): the list literal these (header, attribute) pairs belong to is
# not visible in this view; they drive the failure-report column layout.
("Error #", "err_num"),
("Input File", "file"),
("Requirements", "req_description"),
("Error Message", "message"),
("Test", "test_file"),
# Warning text shown on reports when collection-phase errors occurred.
COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
while preparing to validate the the input files. Some validations may not have been
executed. Please refer these issue to the VNF Validation Tool team.
COLLECTION_FAILURES = []
# Captures the results of every test run
def extract_error_msg(rep):
    """
    If a custom error message was provided, then extract it otherwise
    just show the pytest assert message
    """
    # NOTE(review): several statements of this function (the early return for
    # non-failed outcomes, the re.search call that binds ``match``, the
    # match.group extraction, and the try/except bodies) are not visible in
    # this view - confirm against the full source.
    if rep.outcome != "failed":
        full_msg = str(rep.longrepr.reprcrash.message)
        # Custom assertion messages look like "AssertionError: <msg>\nassert ..."
        "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
        if match:  # custom message was provided
            # Extract everything between AssertionError and the start
            # of the assert statement expansion in the pytest report
        elif "AssertionError:" in full_msg:
            msg = full_msg.split("AssertionError:")[1]
    except AttributeError:
    # NOTE(review): this view of the TestResult class is incomplete - the
    # class statement, several @property decorators and ``def`` lines, and
    # portions of some method bodies are not visible here.
    """
    Wraps the test case and result to extract necessary metadata for
    """

    # Maps pytest outcome names onto the report's PASS/FAIL/SKIP vocabulary.
    RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}

    def __init__(self, item, outcome):
        # NOTE(review): the ``self.item = item`` assignment is presumably on a
        # line not visible here - later methods read ``self.item``.
        self.result = outcome.get_result()
        self.files = self._get_files()
        self.error_message = self._get_error_message()

    def requirement_ids(self):
        """
        Returns list of requirement IDs mapped to the test case.

        :return: Returns a list of string requirement IDs the test was
                 annotated with ``validates`` otherwise returns and empty list
        """
        # NOTE(review): presumably decorated with @property - confirm.
        is_mapped = hasattr(self.item.function, "requirement_ids")
        return self.item.function.requirement_ids if is_mapped else []

    # NOTE(review): the ``markers`` property's decorator and ``def`` line are
    # not visible here.
        :return: Returns a set of pytest marker names for the test or an empty set
        return set(m.name for m in self.item.iter_markers())

    def is_base_test(self):
        """
        :return: Returns True if the test is annotated with a pytest marker called base
        """
        return "base" in self.markers

    # NOTE(review): the ``is_failed`` property header is not visible here.
        :return: True if the test failed
        return self.outcome == "FAIL"

    # NOTE(review): the ``outcome`` property header is not visible here.
        :return: Returns 'PASS', 'FAIL', or 'SKIP'
        return self.RESULT_MAPPING[self.result.outcome]

    # NOTE(review): the ``test_case`` property header is not visible here.
        :return: Name of the test case method
        return self.item.function.__name__

    def test_module(self):
        """
        :return: Name of the file containing the test case
        """
        return self.item.function.__module__.split(".")[-1]

    # NOTE(review): the ``test_id`` property header is not visible here.
        :return: ID of the test (test_module + test_case)
        return "{}::{}".format(self.test_module, self.test_case)

    def raw_output(self):
        """
        :return: Full output from pytest for the given test case
        """
        return str(self.result.longrepr)

    def requirement_text(self, curr_reqs):
        """
        Creates a text summary for the requirement IDs mapped to the test case.
        If no requirements are mapped, then it returns the empty string.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS projects needs.json output
        :return: ID and text of the requirements mapped to the test case
        """
        # NOTE(review): the join/generator wrapper around the expression below
        # is not visible here.
            "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
            for r_id in self.requirement_ids

    def requirements_metadata(self, curr_reqs):
        """
        Returns a list of dicts containing the following metadata for each
        requirement mapped:

        - text: Full text of the requirement
        - keyword: MUST, MUST NOT, MAY, etc.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS projects needs.json output
        :return: List of requirement metadata
        """
        # NOTE(review): the accumulator initialization and the dict literal
        # built per requirement are only partially visible here.
        for r_id in self.requirement_ids:
            if r_id not in curr_reqs:
                "text": curr_reqs[r_id]["description"],
                "keyword": curr_reqs[r_id]["keyword"],

    def _get_files(self):
        """
        Extracts the list of files passed into the test case.

        :return: List of absolute paths to files
        """
        # NOTE(review): the ``return [`` wrappers for the first two branches
        # are not visible here.
        if "environment_pair" in self.item.fixturenames:
                "{} environment pair".format(
                    self.item.funcargs["environment_pair"]["name"]
        elif "heat_volume_pair" in self.item.fixturenames:
                "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
        elif "heat_templates" in self.item.fixturenames:
            return [os.path.basename(f) for f in self.item.funcargs["heat_templates"]]
        elif "yaml_files" in self.item.fixturenames:
            return [os.path.basename(f) for f in self.item.funcargs["yaml_files"]]
        # Fall back to the parametrized portion of the pytest node ID.
        parts = self.result.nodeid.split("[")
        return [""] if len(parts) == 1 else [os.path.basename(parts[1][:-1])]

    def _get_error_message(self):
        """
        :return: Error message or empty string if the test did not fail or error
        """
        return extract_error_msg(self.result)
# noinspection PyUnusedLocal
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """
    Captures the test results for later reporting. This will also halt testing
    if a base failure is encountered (can be overridden with continue-on-failure)
    """
    # NOTE(review): the ``outcome = yield`` statement and the ``if (`` header
    # wrapping the base-failure condition are not visible in this view.
    if outcome.get_result().when != "call":
        return  # only capture results of test cases themselves
    result = TestResult(item, outcome)
        not item.config.option.continue_on_failure
        and result.is_base_test
        # A failing base test invalidates the rest of the suite: record the
        # result and stop the session immediately.
        msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
        result.error_message = msg
        ALL_RESULTS.append(result)
        pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
    ALL_RESULTS.append(result)
def make_timestamp():
    """
    :return: String make_iso_timestamp in format:
        2019-01-19 10:18:49.865000 Central Standard Time
    """
    # Pick the DST or standard timezone name to match the local clock reading.
    tz_name = time.tzname[time.localtime().tm_isdst]
    local_now = datetime.datetime.now()
    return "{} {}".format(local_now, tz_name)
# noinspection PyUnusedLocal
def pytest_sessionstart(session):
    # Reset module-level accumulators so back-to-back sessions don't bleed
    # state into each other.
    # NOTE(review): additional reset statements (e.g. clearing ALL_RESULTS)
    # may exist in the full source but are not visible in this view.
    COLLECTION_FAILURES.clear()
# noinspection PyUnusedLocal
def pytest_sessionfinish(session, exitstatus):
    """
    If not a self-test run, generate the output reports
    """
    # NOTE(review): the early ``return``, the if/else keywords around the
    # template_source selection, and the ``generate_report(`` call wrapping
    # the trailing arguments are not visible in this view.
    if not session.config.option.template_dir:
    if session.config.option.template_source:
        template_source = session.config.option.template_source[0]
        template_source = os.path.abspath(session.config.option.template_dir[0])
    categories_selected = session.config.option.test_categories or ""
        get_output_dir(session.config),
        session.config.option.report_format,
def pytest_terminal_summary(terminalreporter, exitstatus):
    # Ensures all preload information and warnings appear after
    # NOTE(review): the rest of this comment and the try/except wrapper lines
    # are not visible in this view; a preload-generation failure is reported
    # but does not fail the run.
    create_preloads(terminalreporter.config, exitstatus)
    print("Error creating preloads, skipping preload generation")
    traceback.print_exc()
# noinspection PyUnusedLocal
def pytest_collection_modifyitems(session, config, items):
    """
    Selects tests based on the categories requested. Tests without
    categories will always be executed.
    """
    config.traceability_items = list(items)  # save all items for traceability
    if not config.option.self_test:
        # NOTE(review): the ``for item in items:`` loop header and the
        # skip-marker application calls are not visible in this view.
        passed_categories = set(config.option.test_categories or [])
        all_of_categories = getattr(item.function, "all_categories", set())
        any_of_categories = getattr(item.function, "any_categories", set())
        if all_of_categories and not all_of_categories.issubset(passed_categories):
                "Test categories do not match " "all the passed categories"
        if any_of_categories and not passed_categories.intersection(
                "Test categories do not match " "any the passed categories"
    # Sort so that base-marked tests are ordered before all other tests.
    # NOTE(review): the ``items.sort(`` wrapper and the non-base branch of
    # this key function are not visible in this view.
        key=lambda x: (0, x.name)
        if "base" in set(m.name for m in x.iter_markers())
def make_href(paths, base_dir=None):
    """
    Create an anchor tag to link to the file paths provided.

    :param paths: string or list of file paths
    :param base_dir: If specified this is pre-pended to each path
    :return: String of hrefs - one for each path, each seperated by a line
    """
    # Normalize a single path into a one-element list.
    paths = [paths] if isinstance(paths, string_types) else paths
    # NOTE(review): the ``if base_dir:`` guard, the ``links = []``
    # initialization, the loop header, and the ``links.append(`` wrapper are
    # not visible in this view.
    paths = [os.path.join(base_dir, p) for p in paths]
    abs_path = os.path.abspath(p)
    # Directories keep their full path as link text; files use the basename.
    name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
    "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
        abs_path=abs_path, name=name
    return "<br/>".join(links)
def generate_report(outpath, template_path, categories, output_format="html"):
    """
    Generates the various output reports.

    :param outpath: destination directory for all reports
    :param template_path: directory containing the Heat templates validated
    :param categories: Optional categories selected
    :param output_format: One of "html", "excel", or "csv". Default is "html"
    :raises: ValueError if requested output format is unknown
    """
    failures = [r for r in ALL_RESULTS if r.is_failed]
    generate_failure_file(outpath)
    output_format = output_format.lower().strip() if output_format else "html"
    # report.json is always produced regardless of the requested format.
    generate_json(outpath, template_path, categories)
    if output_format == "html":
        generate_html_report(outpath, categories, template_path, failures)
    elif output_format == "excel":
        generate_excel_report(outpath, categories, template_path, failures)
    elif output_format == "json":
        # NOTE(review): this branch's body and the ``else:`` keyword ahead of
        # the ValueError are not visible in this view.
    elif output_format == "csv":
        generate_csv_report(outpath, categories, template_path, failures)
        raise ValueError("Unsupported output format: " + output_format)
def write_json(data, path):
    """
    Pretty print data as JSON to the output path requested

    :param data: Data structure to be converted to JSON
    :param path: Where to write output
    """
    serialized = json.dumps(data, indent=2)
    with open(path, "w") as out_file:
        out_file.write(serialized)
def generate_failure_file(outpath):
    """
    Writes a summary of test failures to a file named failures.
    This is for backwards compatibility only. The report.json offers a
    more comprehensive output.
    """
    failure_path = os.path.join(outpath, "failures")
    failures = [r for r in ALL_RESULTS if r.is_failed]
    # NOTE(review): the ``data = {}`` initialization and the dict-literal
    # wrapper assigned per failure inside the loop are not visible in this
    # view.
    for i, fail in enumerate(failures):
            "file": fail.files[0] if len(fail.files) == 1 else fail.files,
            "vnfrqts": fail.requirement_ids,
            "test": fail.test_case,
            "test_file": fail.test_module,
            "raw_output": fail.raw_output,
            "message": fail.error_message,
    write_json(data, failure_path)
def generate_csv_report(output_dir, categories, template_path, failures):
    # Writes the failure report as report.csv in output_dir.
    # NOTE(review): the ``headers = [`` assignment, several ``rows.append``
    # wrappers, and the final ``writer.writerows(rows)`` call are not visible
    # in this view.
    rows = [["Validation Failures"]]
        ("Categories Selected:", categories),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    for header in headers:
    # Collection failures are reported ahead of the validation failures.
    if COLLECTION_FAILURES:
        rows.append([COLLECTION_FAILURE_WARNING])
        rows.append(["Validation File", "Test", "Fixtures", "Error"])
        for failure in COLLECTION_FAILURES:
                    ";".join(failure["fixtures"]),
    rows.append([col for col, _ in REPORT_COLUMNS])
    reqs = load_current_requirements()
    for i, failure in enumerate(failures, start=1):
                "\n".join(failure.files),
                failure.requirement_text(reqs),
                failure.error_message,
    output_path = os.path.join(output_dir, "report.csv")
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
def generate_excel_report(output_dir, categories, template_path, failures):
    # Writes the failure report as report.xlsx in output_dir.
    # NOTE(review): the ``headers = [`` assignment, the ``err_num``
    # initialization/increment, and the ``workbook.close()`` call are not
    # visible in this view.
    output_path = os.path.join(output_dir, "report.xlsx")
    workbook = xlsxwriter.Workbook(output_path)
    bold = workbook.add_format({"bold": True, "align": "top"})
    code = workbook.add_format(
        {"font_name": "Courier", "text_wrap": True, "align": "top"}
    normal = workbook.add_format({"text_wrap": True, "align": "top"})
    heading = workbook.add_format({"bold": True, "font_size": 18})
    worksheet = workbook.add_worksheet("failures")
    worksheet.write(0, 0, "Validation Failures", heading)
        ("Categories Selected:", ",".join(categories)),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    for row, (header, value) in enumerate(headers, start=2):
        worksheet.write(row, 0, header, bold)
        worksheet.write(row, 1, value)
    worksheet.set_column(0, len(headers) - 1, 40)
    worksheet.set_column(len(headers), len(headers), 80)
    # Collection failures get their own table ahead of the validation table.
    if COLLECTION_FAILURES:
        collection_failures_start = 2 + len(headers) + 2
        worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
        collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
        for col_num, col_name in enumerate(collection_failure_headers):
            worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
        for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
            worksheet.write(row, 0, data["module"])
            worksheet.write(row, 1, data["test"])
            worksheet.write(row, 2, ",".join(data["fixtures"]))
            worksheet.write(row, 3, data["error"], code)
    # Validation failure table starts after the headers and any collection
    # failures, with padding rows between the sections.
    start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
    worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
    for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
        worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
    reqs = load_current_requirements()
    for col, width in enumerate((20, 30, 60, 60, 40)):
        worksheet.set_column(col, col, width)
    for row, failure in enumerate(failures, start=start_error_table_row + 2):
        worksheet.write(row, 0, str(err_num), normal)
        worksheet.write(row, 1, "\n".join(failure.files), normal)
        worksheet.write(row, 2, failure.requirement_text(reqs), normal)
        worksheet.write(row, 3, failure.error_message.replace("\n", "\n\n"), normal)
        worksheet.write(row, 4, failure.test_id, normal)
    worksheet.autofilter(
        start_error_table_row + 1,
        start_error_table_row + 1 + err_num,
        len(REPORT_COLUMNS) - 1,
def make_iso_timestamp():
    """
    Creates a timestamp in ISO 8601 format in UTC format. Used for JSON output.

    :return: ISO 8601 timestamp string carrying an explicit UTC offset
    """
    # Bug fix: ``datetime.replace`` returns a NEW datetime rather than
    # mutating in place; the original discarded that return value, so the
    # emitted timestamp was naive (no "+00:00" offset).
    now = datetime.datetime.utcnow()
    now = now.replace(tzinfo=datetime.timezone.utc)
    return now.isoformat()
def aggregate_results(outcomes, r_id=None):
    """
    Determines the aggregate result for the conditions provided. Assumes the
    results have been filtered and collected for analysis.

    :param outcomes: set of outcomes from the TestResults
    :param r_id: Optional requirement ID if known
    :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
        (see aggregate_requirement_adherence for more detail)
    """
    # NOTE(review): the leading branch, every ``return`` statement, and the
    # raise/format wrapper for the fall-through case are not visible in this
    # view. Branch order visible here: ERROR beats FAIL beats PASS beats an
    # all-SKIP set.
    elif "ERROR" in outcomes:
    elif "FAIL" in outcomes:
    elif "PASS" in outcomes:
    elif {"SKIP"} == outcomes:
        "Unexpected error aggregating outcomes ({}) for requirement {}".format(
def aggregate_run_results(collection_failures, test_results):
    """
    Determines overall status of run based on all failures and results.

    * 'ERROR' - At least one collection failure occurred during the run.
    * 'FAIL' - Template failed at least one test
    * 'PASS' - All tests executed properly and no failures were detected

    :param collection_failures: failures occuring during test setup
    :param test_results: list of all test executuion results
    :return: one of 'ERROR', 'FAIL', or 'PASS'
    """
    # NOTE(review): the ``return`` statements for each branch are not visible
    # in this view; per the docstring, collection failures dominate, then any
    # test failure, then PASS.
    if collection_failures:
    elif any(r.is_failed for r in test_results):
def relative_paths(base_dir, paths):
    """Return each non-empty entry of ``paths`` expressed relative to ``base_dir``."""
    rel = []
    for path in paths:
        # Empty strings act as "no file" placeholders and are dropped.
        if path != "":
            rel.append(os.path.relpath(path, base_dir))
    return rel
# noinspection PyTypeChecker
def generate_json(outpath, template_path, categories):
    """
    Creates a JSON summary of the entire test run.
    """
    # NOTE(review): the ``data = {`` literal wrapper, several dict-literal
    # braces, and the ``results.append(``/``requirements.append(`` wrappers
    # are not visible in this view.
    reqs = load_current_requirements()
        # Drive letter is stripped so paths are stable across platforms.
        "template_directory": os.path.splitdrive(template_path)[1].replace(
        "timestamp": make_iso_timestamp(),
        "checksum": hash_directory(template_path),
        "categories": categories,
        "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
    results = data["tests"]
    # Collection failures are folded into the test list as ERROR entries.
    for result in COLLECTION_FAILURES:
            "test_module": result["module"],
            "test_case": result["test"],
            "error": result["error"],
            "requirements": result["requirements"],
    for result in ALL_RESULTS:
            "files": relative_paths(template_path, result.files),
            "test_module": result.test_module,
            "test_case": result.test_case,
            "result": result.outcome,
            "error": result.error_message if result.is_failed else "",
            "requirements": result.requirements_metadata(reqs),
    # Build a mapping of requirement ID to the results
    r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
    for test_result in results:
        test_reqs = test_result["requirements"]
        # Requirements may be either metadata dicts or bare ID strings.
            [r["id"] if isinstance(r, dict) else r for r in test_reqs]
            item = r_id_results[r_id]
            item["outcomes"].add(test_result["result"])
            if test_result["error"]:
                item["errors"].add(test_result["error"])
    requirements = data["requirements"]
    for r_id, r_data in reqs.items():
            "text": r_data["description"],
            "keyword": r_data["keyword"],
            "result": aggregate_results(r_id_results[r_id]["outcomes"]),
            "errors": list(r_id_results[r_id]["errors"]),
    # Tests with no requirement mapping are keyed under the empty string.
    if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
            "text": "Tests not mapped to requirements (see tests)",
            "result": aggregate_results(r_id_results[""]["outcomes"]),
            "errors": list(r_id_results[""]["errors"]),
    report_path = os.path.join(outpath, "report.json")
    write_json(data, report_path)
def generate_html_report(outpath, categories, template_path, failures):
    # Renders report.html from the Jinja2 template shipped next to this file.
    # NOTE(review): the ``fails = []`` accumulator, the ``fails.append(``/dict
    # wrappers, and parts of the render keyword list are not visible in this
    # view.
    reqs = load_current_requirements()
    for failure in failures:
            "file_links": make_href(failure.files, template_path),
            "test_id": failure.test_id,
            # Escape error text so raw output cannot inject markup.
            "error_message": escape(failure.error_message).replace(
            "raw_output": escape(failure.raw_output),
            # Requirement text is reST; render it to an HTML fragment.
            "requirements": docutils.core.publish_parts(
                writer_name="html", source=failure.requirement_text(reqs)
    pkg_dir = os.path.split(__file__)[0]
    j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
    with open(j2_template_path, "r") as f:
        report_template = jinja2.Template(f.read())
        contents = report_template.render(
            version=version.VERSION,
            num_failures=len(failures) + len(COLLECTION_FAILURES),
            categories=categories,
            template_dir=make_href(template_path),
            checksum=hash_directory(template_path),
            timestamp=make_timestamp(),
            collection_failures=COLLECTION_FAILURES,
    with open(os.path.join(outpath, "report.html"), "w") as f:
def pytest_addoption(parser):
    """
    Add needed CLI arguments
    """
    # NOTE(review): the ``parser.addoption(`` wrappers, dest/action/default
    # keywords for several options, and the closing parentheses are not
    # visible in this view.
        "--template-directory",
        help="Directory which holds the templates for validation",
        dest="template_source",
        help="Source Directory which holds the templates for validation",
        help="Test the unit tests against their fixtured data",
        dest="report_format",
        help="Format of output report (html, csv, excel, json)",
        "--continue-on-failure",
        dest="continue_on_failure",
        help="Continue validation even when structural errors exist in input files",
        "--output-directory",
        dest="test_categories",
        help="optional category of test to execute",
        dest="preload_formats",
            "Preload format to create (multiple allowed). If not provided "
            "then all available formats will be created: {}"
        ).format(", ".join(g.format_name() for g in PLUGIN_MGR.preload_generators)),
        "--preload-source-type",
        dest="preload_source_type",
            "Preload source type to create (multiple allowed): {}"
        ).format(", ".join(s.get_identifier() for s in PLUGIN_MGR.preload_sources)),
        dest="preload_source",
        help="File or directory containing the source dat for the preloads",
def pytest_configure(config):
    """
    Ensure that we are receive either `--self-test` or
    `--template-dir=<directory` as CLI arguments
    """
    if config.getoption("template_dir") and config.getoption("self_test"):
        raise Exception('"--template-dir", and "--self-test"' " are mutually exclusive")
    # NOTE(review): the ``if not (`` header and closing ``):`` wrapping the
    # three-way condition below are not visible in this view.
        config.getoption("template_dir")
        or config.getoption("self_test")
        or config.getoption("help")
        raise Exception('One of "--template-directory" or'
                        ' "--self-test" must be specified')
def pytest_generate_tests(metafunc):
    """
    If a unit test requires an argument named 'filename'
    we generate a test for the filenames selected. Either
    the files contained in `template_dir` or if `template_dir`
    is not specified on the CLI, the fixtures associated with this
    test name.
    """
    # NOTE(review): the ``try:`` header and the dict-literal braces inside the
    # COLLECTION_FAILURES.append call are not visible in this view.
    # noinspection PyBroadException
    if "filename" in metafunc.fixturenames:
        from .parametrizers import parametrize_filename

        parametrize_filename(metafunc)

    if "filenames" in metafunc.fixturenames:
        from .parametrizers import parametrize_filenames

        parametrize_filenames(metafunc)

    if "template_dir" in metafunc.fixturenames:
        from .parametrizers import parametrize_template_dir

        parametrize_template_dir(metafunc)

    if "environment_pair" in metafunc.fixturenames:
        from .parametrizers import parametrize_environment_pair

        parametrize_environment_pair(metafunc)

    if "heat_volume_pair" in metafunc.fixturenames:
        from .parametrizers import parametrize_heat_volume_pair

        parametrize_heat_volume_pair(metafunc)

    if "yaml_files" in metafunc.fixturenames:
        from .parametrizers import parametrize_yaml_files

        parametrize_yaml_files(metafunc)

    if "env_files" in metafunc.fixturenames:
        from .parametrizers import parametrize_environment_files

        parametrize_environment_files(metafunc)

    if "yaml_file" in metafunc.fixturenames:
        from .parametrizers import parametrize_yaml_file

        parametrize_yaml_file(metafunc)

    if "env_file" in metafunc.fixturenames:
        from .parametrizers import parametrize_environment_file

        parametrize_environment_file(metafunc)

    if "parsed_yaml_file" in metafunc.fixturenames:
        from .parametrizers import parametrize_parsed_yaml_file

        parametrize_parsed_yaml_file(metafunc)

    if "parsed_environment_file" in metafunc.fixturenames:
        from .parametrizers import parametrize_parsed_environment_file

        parametrize_parsed_environment_file(metafunc)

    if "heat_template" in metafunc.fixturenames:
        from .parametrizers import parametrize_heat_template

        parametrize_heat_template(metafunc)

    if "heat_templates" in metafunc.fixturenames:
        from .parametrizers import parametrize_heat_templates

        parametrize_heat_templates(metafunc)

    if "volume_template" in metafunc.fixturenames:
        from .parametrizers import parametrize_volume_template

        parametrize_volume_template(metafunc)

    if "volume_templates" in metafunc.fixturenames:
        from .parametrizers import parametrize_volume_templates

        parametrize_volume_templates(metafunc)

    if "template" in metafunc.fixturenames:
        from .parametrizers import parametrize_template

        parametrize_template(metafunc)

    if "templates" in metafunc.fixturenames:
        from .parametrizers import parametrize_templates

        parametrize_templates(metafunc)
    except Exception as e:
        # If an error occurs in the collection phase, then it won't be logged as a
        # normal test failure. This means that failures could occur, but not
        # be seen on the report resulting in a false positive success message. These
        # errors will be stored and reported separately on the report
        COLLECTION_FAILURES.append(
                "module": metafunc.module.__name__,
                "test": metafunc.function.__name__,
                "fixtures": metafunc.fixturenames,
                "error": traceback.format_exc(),
                "requirements": getattr(metafunc.function, "requirement_ids", []),
def hash_directory(path):
    """
    Create md5 hash using the contents of all files under ``path``

    :param path: string directory containing files
    :return: string MD5 hash code (hex)
    """
    # md5 is used as a change-detection checksum here, not for security.
    md5 = hashlib.md5()  # nosec
    for dir_path, sub_dirs, filenames in os.walk(path):
        for filename in filenames:
            file_path = os.path.join(dir_path, filename)
            with open(file_path, "rb") as f:
                # Stream in fixed-size chunks so large template files are not
                # loaded into memory all at once.
                for chunk in iter(lambda: f.read(65536), b""):
                    md5.update(chunk)
    return md5.hexdigest()
def load_current_requirements():
    """Loads dict of current requirements or empty dict if file doesn't exist"""
    # Bug fix: honor the documented contract - a missing needs file should
    # yield an empty dict rather than raising FileNotFoundError.
    if not os.path.exists(HEAT_REQUIREMENTS_FILE):
        return {}
    with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
        data = json.load(f)
    # needs.json carries every published version; select the one flagged
    # as current.
    version = data["current_version"]
    return data["versions"][version]["needs"]
def select_heat_requirements(reqs):
    """Filters dict requirements to only those requirements pertaining to Heat"""
    heat_reqs = {}
    for req_id, req in reqs.items():
        # A requirement applies to Heat when its source document mentions it.
        if "heat" in req["docname"].lower():
            heat_reqs[req_id] = req
    return heat_reqs
def is_testable(reqs):
    """Filters dict requirements to only those which are testable"""
    # A requirement counts as testable when its keyword is a MUST variant and
    # its validation_mode is not "none". The flag is written back onto each
    # requirement entry in place.
    # NOTE(review): the closing ``):`` of the condition, the ``else:`` branch
    # keyword, and the ``return reqs`` statement are not visible in this view.
    for key, values in reqs.items():
        if ("MUST" in values.get("keyword", "").upper()) and (
            "none" not in values.get("validation_mode", "").lower()
            reqs[key]["testable"] = True
            reqs[key]["testable"] = False
def build_rst_json(reqs):
    """Takes requirements and returns list of only Heat requirements"""
    # NOTE(review): large portions of this function (title construction, the
    # non-test-case branches, deletion of non-testable entries, and the
    # ``return reqs`` statement) are not visible in this view.
    for key, values in list(reqs.items()):
        if values["testable"]:
            # Creates links in RST format to requirements and test cases
            if values["test_case"]:
                mod = values["test_case"].split(".")[-1]
                val = TEST_SCRIPT_SITE + mod + ".py"
                rst_value = "`" + mod + " <" + val + ">`_"
                # Spaces are percent-encoded so the docname works inside a URL.
                + values["docname"].replace(" ", "%20")
                reqs[key].update({"full_title": title, "test_case": rst_value})
                + values["docname"].replace(" ", "%20")
                    "full_title": title,
                    "test_case": "No test for requirement",
                    "validated_by": "static",
def generate_rst_table(output_dir, data):
    """Generate a formatted csv to be used in RST"""
    rst_path = os.path.join(output_dir, "rst.csv")
    with open(rst_path, "w", newline="") as f:
        # NOTE(review): the csv.writer construction binding ``out`` and the
        # ``out.writerow((`` wrapper inside the loop are not visible in this
        # view.
        out.writerow(("Requirement ID", "Test Module", "Test Name"))
        for req_id, metadata in data.items():
                    metadata["full_title"],
                    metadata["test_case"],
                    metadata["validated_by"],
# noinspection PyUnusedLocal
def pytest_report_collectionfinish(config, startdir, items):
    """Generates a simple traceability report to output/traceability.csv"""
    # NOTE(review): multiple lines of this function (loop headers, csv writer
    # construction, ``writer.writerow((`` wrappers, and the unmapped-test
    # rows) are not visible in this view.
    traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
    output_dir = os.path.split(traceability_path)[0]
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    reqs = load_current_requirements()
    requirements = select_heat_requirements(reqs)
    testable_requirements = is_testable(requirements)
    # Split collected tests into those annotated with requirement IDs and
    # those that are not.
    unmapped, mapped = partition(
        lambda i: hasattr(i.function, "requirement_ids"), items
    req_to_test = defaultdict(set)
    mapping_errors = set()
        for req_id in item.function.requirement_ids:
            if req_id not in req_to_test:
                req_to_test[req_id].add(item)
                if req_id in requirements:
                    reqs[req_id].update(
                            "test_case": item.function.__module__,
                            "validated_by": item.function.__name__,
            # Tests annotated with unknown requirement IDs are recorded as
            # mapping errors rather than silently dropped.
            if req_id not in requirements:
                    (req_id, item.function.__module__, item.function.__name__)
    mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
    with open(mapping_error_path, "w", newline="") as f:
        writer = csv.writer(f)
        for err in mapping_errors:
            writer.writerow(err)
    with open(traceability_path, "w", newline="") as f:
        for req_id, metadata in testable_requirements.items():
            if req_to_test[req_id]:
                for item in req_to_test[req_id]:
                            metadata["description"],
                            metadata["section_name"],
                            metadata["keyword"],
                            metadata["validation_mode"],
                            metadata["testable"],
                            item.function.__module__,
                            item.function.__name__,
                        metadata["description"],
                        metadata["section_name"],
                        metadata["keyword"],
                        metadata["validation_mode"],
                        metadata["testable"],
        # now write out any test methods that weren't mapped to requirements
            (item.function.__module__, item.function.__name__) for item in unmapped
        for test_module, test_name in unmapped_tests:
                    "static",  # validation mode
    generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))