# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
#             http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
#             https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================

import csv
import datetime
import hashlib
import io
import json
import os
import re
import time
import traceback
from collections import defaultdict
from itertools import chain

import docutils.core
import jinja2
import pytest
import xlsxwriter
from more_itertools import partition
from six import string_types

import version

__path__ = [os.path.dirname(os.path.abspath(__file__))]

DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])

RESOLUTION_STEPS_FILE = "resolution_steps.json"
HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
68 ("Input File", "file"),
69 ("Test", "test_file"),
70 ("Requirements", "req_description"),
71 ("Resolution Steps", "resolution_steps"),
72 ("Error Message", "message"),
73 ("Raw Test Output", "raw_output"),

COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
while preparing to validate the input files. Some validations may not have been
executed. Please report these issues to the VNF Validation Tool team.
"""

COLLECTION_FAILURES = []

# Captures the results of every test run
ALL_RESULTS = []


def get_output_dir(config):
    """
    Retrieve the output directory for the reports and create it if necessary

    :param config: pytest configuration
    :return: output directory as string
    """
    output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
    if not os.path.exists(output_dir):
        os.makedirs(output_dir, exist_ok=True)
    return output_dir


def extract_error_msg(rep):
    """
    If a custom error message was provided, then extract it; otherwise
    just show the pytest assert message
    """
    if rep.outcome != "failed":
        return ""
    try:
        full_msg = str(rep.longrepr.reprcrash.message)
        match = re.match(
            "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
        )
        if match:  # custom message was provided
            # Extract everything between AssertionError and the start
            # of the assert statement expansion in the pytest report
            msg = match.group(1)
        else:
            msg = str(rep.longrepr.reprcrash)
            if "AssertionError:" in msg:
                msg = msg.split("AssertionError:")[1]
    except AttributeError:
        msg = str(rep)
    return msg
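

# Illustrative sketch of the extraction above (message contents hypothetical):
# for a pytest crash message such as
#
#   AssertionError: env file missing parameter: vm_role
#   assert False
#
# the regex captures everything between "AssertionError:" and the "assert"
# expansion, so extract_error_msg() would return
# " env file missing parameter: vm_role\n" for that report.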


class TestResult:
    """
    Wraps the test case and result to extract necessary metadata for
    reporting purposes.
    """

    RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}

    def __init__(self, item, outcome):
        self.item = item
        self.result = outcome.get_result()
        self.files = [os.path.normpath(p) for p in self._get_files()]
        self.error_message = self._get_error_message()

    @property
    def requirement_ids(self):
        """
        Returns list of requirement IDs mapped to the test case.

        :return: Returns a list of string requirement IDs the test was
                 annotated with ``validates``, otherwise returns an empty list
        """
        is_mapped = hasattr(self.item.function, "requirement_ids")
        return self.item.function.requirement_ids if is_mapped else []

    @property
    def markers(self):
        """
        :return: Returns a set of pytest marker names for the test or an empty set
        """
        return set(m.name for m in self.item.iter_markers())

    @property
    def is_base_test(self):
        """
        :return: Returns True if the test is annotated with a pytest marker called base
        """
        return "base" in self.markers

    @property
    def is_failed(self):
        """
        :return: True if the test failed
        """
        return self.outcome == "FAIL"

    @property
    def outcome(self):
        """
        :return: Returns 'PASS', 'FAIL', or 'SKIP'
        """
        return self.RESULT_MAPPING[self.result.outcome]

    @property
    def test_case(self):
        """
        :return: Name of the test case method
        """
        return self.item.function.__name__

    @property
    def test_module(self):
        """
        :return: Name of the file containing the test case
        """
        return self.item.function.__module__.split(".")[-1]

    @property
    def raw_output(self):
        """
        :return: Full output from pytest for the given test case
        """
        return str(self.result.longrepr)

    def requirement_text(self, curr_reqs):
        """
        Creates a text summary for the requirement IDs mapped to the test case.
        If no requirements are mapped, then it returns the empty string.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS projects needs.json output
        :return: ID and text of the requirements mapped to the test case
        """
        text = (
            "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
            for r_id in self.requirement_ids
            if r_id in curr_reqs
        )
        return "".join(text)

    def requirements_metadata(self, curr_reqs):
        """
        Returns a list of dicts containing the following metadata for each
        requirement mapped:

        - id: Requirement ID
        - text: Full text of the requirement
        - keyword: MUST, MUST NOT, MAY, etc.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS projects needs.json output
        :return: List of requirement metadata
        """
        data = []
        for r_id in self.requirement_ids:
            if r_id not in curr_reqs:
                continue
            data.append(
                {
                    "id": r_id,
                    "text": curr_reqs[r_id]["description"],
                    "keyword": curr_reqs[r_id]["keyword"],
                }
            )
        return data

    def resolution_steps(self, resolutions):
        """
        :param resolutions: Loaded contents of resolution_steps.json
        :return: Header and text for the resolution step associated with this
                 test case. Returns empty string if no resolutions are
                 found.
        """
        text = (
            "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
            for entry in resolutions
            if self._match(entry)
        )
        return "".join(text)

    def _match(self, resolution_entry):
        """
        Returns True if the test result maps to the given entry in
        the resolutions file
        """
        return (
            self.test_case == resolution_entry["function"]
            and self.test_module == resolution_entry["module"]
        )

    def _get_files(self):
        """
        Extracts the list of files passed into the test case.

        :return: List of absolute paths to files
        """
        if "environment_pair" in self.item.fixturenames:
            return [
                "{} environment pair".format(
                    self.item.funcargs["environment_pair"]["name"]
                )
            ]
        elif "heat_volume_pair" in self.item.fixturenames:
            return [
                "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
            ]
        elif "heat_templates" in self.item.fixturenames:
            return self.item.funcargs["heat_templates"]
        elif "yaml_files" in self.item.fixturenames:
            return self.item.funcargs["yaml_files"]
        else:
            return [self.result.nodeid.split("[")[1][:-1]]

    def _get_error_message(self):
        """
        :return: Error message or empty string if the test did not fail or error
        """
        if self.is_failed:
            return extract_error_msg(self.result)
        return ""


# noinspection PyUnusedLocal
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """
    Captures the test results for later reporting. This will also halt testing
    if a base failure is encountered (can be overridden with continue-on-failure)
    """
    outcome = yield
    if outcome.get_result().when != "call":
        return  # only capture results of test cases themselves
    result = TestResult(item, outcome)
    ALL_RESULTS.append(result)
    if (
        not item.config.option.continue_on_failure
        and result.is_base_test
        and result.is_failed
    ):
        msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
            result.error_message
        )
        pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))


def make_timestamp():
    """
    :return: String timestamp in the format:
             2019-01-19 10:18:49.865000 Central Standard Time
    """
    timezone = time.tzname[time.localtime().tm_isdst]
    return "{} {}".format(str(datetime.datetime.now()), timezone)


# noinspection PyUnusedLocal
def pytest_sessionstart(session):
    ALL_RESULTS.clear()
    COLLECTION_FAILURES.clear()


# noinspection PyUnusedLocal
def pytest_sessionfinish(session, exitstatus):
    """
    If not a self-test run, generate the output reports
    """
    if not session.config.option.template_dir:
        return

    if session.config.option.template_source:
        template_source = session.config.option.template_source[0]
    else:
        template_source = os.path.abspath(session.config.option.template_dir[0])

    categories_selected = session.config.option.test_categories or ""
    generate_report(
        get_output_dir(session.config),
        template_source,
        categories_selected,
        session.config.option.report_format,
    )


# noinspection PyUnusedLocal
def pytest_collection_modifyitems(session, config, items):
    """
    Selects tests based on the categories requested. Tests without
    categories will always be executed.
    """
    config.traceability_items = list(items)  # save all items for traceability
    if not config.option.self_test:
        for item in items:
            # checking if test belongs to a category
            if hasattr(item.function, "categories"):
                if config.option.test_categories:
                    test_categories = getattr(item.function, "categories")
                    passed_categories = config.option.test_categories
                    if not all(
                        category in passed_categories for category in test_categories
                    ):
                        item.add_marker(
                            pytest.mark.skip(
                                reason="Test categories do not match all the passed categories"
                            )
                        )
                else:
                    item.add_marker(
                        pytest.mark.skip(
                            reason="Test belongs to a category but no categories were passed"
                        )
                    )
    items.sort(
        key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
    )


def make_href(paths):
    """
    Create an anchor tag to link to the file paths provided.

    :param paths: string or list of file paths
    :return: String of hrefs - one for each path, each separated by a line
             break
    """
    paths = [paths] if isinstance(paths, string_types) else paths
    links = []
    for p in paths:
        abs_path = os.path.abspath(p)
        name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
        links.append(
            "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
                abs_path=abs_path, name=name
            )
        )
    return "<br/>".join(links)


def load_resolutions_file():
    """
    :return: list of resolution entries loaded from resolution_steps.json,
             or an empty list if the file does not exist
    """
    resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
    if os.path.exists(resolution_steps):
        with open(resolution_steps, "r") as f:
            return json.loads(f.read())
    return []
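

# The resolution steps file is expected to be a JSON array whose entries carry
# the four keys read by TestResult._match and TestResult.resolution_steps.
# A minimal sketch (all values hypothetical):
#
#   [
#     {
#       "function": "test_some_validation",
#       "module": "test_some_module",
#       "header": "Fix the Environment File",
#       "resolution_steps": "Add the missing parameter to the .env file."
#     }
#   ]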


def generate_report(outpath, template_path, categories, output_format="html"):
    """
    Generates the various output reports.

    :param outpath: destination directory for all reports
    :param template_path: directory containing the Heat templates validated
    :param categories: Optional categories selected
    :param output_format: One of "html", "excel", "json", or "csv".
                          Default is "html"
    :raises: ValueError if requested output format is unknown
    """
    failures = [r for r in ALL_RESULTS if r.is_failed]
    generate_failure_file(outpath)
    output_format = output_format.lower().strip() if output_format else "html"
    if output_format == "html":
        generate_html_report(outpath, categories, template_path, failures)
    elif output_format == "excel":
        generate_excel_report(outpath, categories, template_path, failures)
    elif output_format == "json":
        generate_json(outpath, template_path, categories)
    elif output_format == "csv":
        generate_csv_report(outpath, categories, template_path, failures)
    else:
        raise ValueError("Unsupported output format: " + output_format)


def write_json(data, path):
    """
    Pretty print data as JSON to the output path requested

    :param data: Data structure to be converted to JSON
    :param path: Where to write output
    """
    with open(path, "w") as f:
        json.dump(data, f, indent=2)


def generate_failure_file(outpath):
    """
    Writes a summary of test failures to a file named failures.
    This is for backwards compatibility only. The report.json offers a
    more comprehensive output.
    """
    failure_path = os.path.join(outpath, "failures")
    failures = [r for r in ALL_RESULTS if r.is_failed]
    data = {}
    for i, fail in enumerate(failures):
        data[str(i)] = {
            "file": fail.files[0] if len(fail.files) == 1 else fail.files,
            "vnfrqts": fail.requirement_ids,
            "test": fail.test_case,
            "test_file": fail.test_module,
            "raw_output": fail.raw_output,
            "message": fail.error_message,
        }
    write_json(data, failure_path)


def generate_csv_report(output_dir, categories, template_path, failures):
    rows = [["Validation Failures"]]
    headers = [
        ("Categories Selected:", categories),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    rows.append([])
    for header in headers:
        rows.append([header[0], header[1]])
    rows.append([])

    if COLLECTION_FAILURES:
        rows.append([COLLECTION_FAILURE_WARNING])
        rows.append(["Validation File", "Test", "Fixtures", "Error"])
        for failure in COLLECTION_FAILURES:
            rows.append(
                [
                    failure["module"],
                    failure["test"],
                    ";".join(failure["fixtures"]),
                    failure["error"],
                ]
            )
        rows.append([])

    # table header
    rows.append([col for col, _ in REPORT_COLUMNS])

    reqs = load_current_requirements()
    resolutions = load_resolutions_file()

    # table content
    for failure in failures:
        rows.append(
            [
                "\n".join(failure.files),
                failure.test_module,
                failure.requirement_text(reqs),
                failure.resolution_steps(resolutions),
                failure.error_message,
                failure.raw_output,
            ]
        )

    output_path = os.path.join(output_dir, "report.csv")
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        for row in rows:
            writer.writerow(row)


def generate_excel_report(output_dir, categories, template_path, failures):
    output_path = os.path.join(output_dir, "report.xlsx")
    workbook = xlsxwriter.Workbook(output_path)
    bold = workbook.add_format({"bold": True})
    code = workbook.add_format({"font_name": "Courier", "text_wrap": True})
    normal = workbook.add_format({"text_wrap": True})
    heading = workbook.add_format({"bold": True, "font_size": 18})
    worksheet = workbook.add_worksheet("failures")
    worksheet.write(0, 0, "Validation Failures", heading)

    headers = [
        ("Categories Selected:", ",".join(categories)),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    for row, (header, value) in enumerate(headers, start=2):
        worksheet.write(row, 0, header, bold)
        worksheet.write(row, 1, value)

    worksheet.set_column(0, len(headers) - 1, 40)
    worksheet.set_column(len(headers), len(headers), 80)

    if COLLECTION_FAILURES:
        collection_failures_start = 2 + len(headers) + 2
        worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
        collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
        for col_num, col_name in enumerate(collection_failure_headers):
            worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
        for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
            worksheet.write(row, 0, data["module"])
            worksheet.write(row, 1, data["test"])
            worksheet.write(row, 2, ",".join(data["fixtures"]))
            worksheet.write(row, 3, data["error"], code)

    # table header
    start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
    worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
    for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
        worksheet.write(start_error_table_row + 1, col_num, col_name, bold)

    reqs = load_current_requirements()
    resolutions = load_resolutions_file()

    # table content
    for row, failure in enumerate(failures, start=start_error_table_row + 2):
        worksheet.write(row, 0, "\n".join(failure.files), normal)
        worksheet.write(row, 1, failure.test_module, normal)
        worksheet.write(row, 2, failure.requirement_text(reqs), normal)
        worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
        worksheet.write(row, 4, failure.error_message, normal)
        worksheet.write(row, 5, failure.raw_output, code)

    workbook.close()


def make_iso_timestamp():
    """
    Creates a timestamp in ISO 8601 format in UTC. Used for JSON output.
    """
    now = datetime.datetime.utcnow()
    now = now.replace(tzinfo=datetime.timezone.utc)
    return now.isoformat()
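

# Illustrative output: with the tzinfo fix above, make_iso_timestamp() returns
# a string such as "2019-01-19T16:18:49.865000+00:00" (note the explicit UTC
# offset).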


def aggregate_requirement_adherence(r_id, collection_failures, test_results):
    """
    Examines all tests associated with a given requirement and determines
    the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.

    * ERROR - At least one ERROR occurred
    * PASS - At least one PASS and no FAIL or ERRORs.
    * FAIL - At least one FAIL occurred (no ERRORs)
    * SKIP - All tests were SKIP

    :param r_id: Requirement ID to examine
    :param collection_failures: Errors that occurred during test setup.
    :param test_results: List of TestResult
    :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
    """
    errors = any(r_id in f["requirements"] for f in collection_failures)
    outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
    return aggregate_results(errors, outcomes, r_id)


def aggregate_results(has_errors, outcomes, r_id=None):
    """
    Determines the aggregate result for the conditions provided. Assumes the
    results have been filtered and collected for analysis.

    :param has_errors: True if collection failures occurred for the tests being
                       analyzed
    :param outcomes: set of outcomes from the TestResults
    :param r_id: Optional requirement ID if known
    :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
             (see aggregate_requirement_adherence for more detail)
    """
    if has_errors:
        return "ERROR"
    if not outcomes:
        return "PASS"
    elif "FAIL" in outcomes:
        return "FAIL"
    elif "PASS" in outcomes:
        return "PASS"
    elif {"SKIP"} == outcomes:
        return "SKIP"
    else:
        raise ValueError(
            "Unexpected error aggregating outcomes ({}) for requirement {}".format(
                outcomes, r_id
            )
        )


def aggregate_run_results(collection_failures, test_results):
    """
    Determines overall status of run based on all failures and results.

    * 'ERROR' - At least one collection failure occurred during the run.
    * 'FAIL' - Template failed at least one test
    * 'PASS' - All tests executed properly and no failures were detected

    :param collection_failures: failures occurring during test setup
    :param test_results: list of all test execution results
    :return: one of 'ERROR', 'FAIL', or 'PASS'
    """
    if collection_failures:
        return "ERROR"
    elif any(r.is_failed for r in test_results):
        return "FAIL"
    else:
        return "PASS"


def error(failure_or_result):
    """
    Extracts the error message from a collection failure or test result

    :param failure_or_result: Entry from COLLECTION_FAILURES or a TestResult
    :return: Error message as string
    """
    if isinstance(failure_or_result, TestResult):
        return failure_or_result.error_message
    else:
        return failure_or_result["error"]


def req_ids(failure_or_result):
    """
    Extracts the requirement IDs from a collection failure or test result

    :param failure_or_result: Entry from COLLECTION_FAILURES or a TestResult
    :return: set of Requirement IDs. If no requirements mapped, then an empty set
    """
    if isinstance(failure_or_result, TestResult):
        return set(failure_or_result.requirement_ids)
    else:
        return set(failure_or_result["requirements"])
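

# Both helpers accept either shape. For a collection failure entry (values
# hypothetical):
#
#   error({"error": "Traceback ...", "requirements": ["R-123456"]})
#       -> "Traceback ..."
#   req_ids({"error": "Traceback ...", "requirements": ["R-123456"]})
#       -> {"R-123456"}
#
# whereas a TestResult instance routes through its .error_message and
# .requirement_ids attributes instead.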


def collect_errors(r_id, collection_failures, test_result):
    """
    Creates a list of error messages from the collection failures and
    test results. If r_id is provided, then it collects the error messages
    where the failure or test is associated with that requirement ID. If
    r_id is None, then it collects all errors that occur on failures and
    results that are not mapped to requirements
    """

    def selector(item):
        if r_id:
            return r_id in req_ids(item)
        return not req_ids(item)

    errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
    return [e for e in errors if e]
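

# Illustrative behavior (entries hypothetical): with a collection failure
# mapped to "R-123456", collect_errors("R-123456", failures, results) returns
# only the messages tied to that requirement, while
# collect_errors(None, failures, results) returns only the messages from
# failures and results that have no requirement mapping at all.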


def generate_json(outpath, template_path, categories):
    """
    Creates a JSON summary of the entire test run.
    """
    reqs = load_current_requirements()
    data = {
        "template_directory": template_path,
        "timestamp": make_iso_timestamp(),
        "checksum": hash_directory(template_path),
        "categories": categories,
        "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
        "tests": [],
        "requirements": [],
    }

    results = data["tests"]
    for result in COLLECTION_FAILURES:
        results.append(
            {
                "files": [],
                "test_module": result["module"],
                "test_case": result["test"],
                "result": "ERROR",
                "error": result["error"],
                "requirements": result["requirements"],
            }
        )
    for result in ALL_RESULTS:
        results.append(
            {
                "files": result.files,
                "test_module": result.test_module,
                "test_case": result.test_case,
                "result": result.outcome,
                "error": result.error_message if result.is_failed else "",
                "requirements": result.requirements_metadata(reqs),
            }
        )

    requirements = data["requirements"]
    for r_id, r_data in reqs.items():
        result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
        requirements.append(
            {
                "id": r_id,
                "text": r_data["description"],
                "keyword": r_data["keyword"],
                "result": result,
                "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
            }
        )
    # If there are tests that aren't mapped to a requirement, then we'll
    # map them to a special entry so the results are coherent.
    unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
    has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
    if unmapped_outcomes or has_errors:
        requirements.append(
            {
                "id": "Unmapped",
                "text": "Tests not mapped to requirements (see tests)",
                "result": aggregate_results(has_errors, unmapped_outcomes),
                "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
            }
        )

    report_path = os.path.join(outpath, "report.json")
    write_json(data, report_path)


def generate_html_report(outpath, categories, template_path, failures):
    reqs = load_current_requirements()
    resolutions = load_resolutions_file()
    fail_data = []
    for failure in failures:
        fail_data.append(
            {
                "file_links": make_href(failure.files),
                "test_id": failure.test_module,
                "error_message": failure.error_message,
                "raw_output": failure.raw_output,
                "requirements": docutils.core.publish_parts(
                    writer_name="html", source=failure.requirement_text(reqs)
                )["body"],
                "resolution_steps": failure.resolution_steps(resolutions),
            }
        )
    pkg_dir = os.path.split(__file__)[0]
    j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
    with open(j2_template_path, "r") as f:
        report_template = jinja2.Template(f.read())
        contents = report_template.render(
            version=version.VERSION,
            num_failures=len(failures) + len(COLLECTION_FAILURES),
            categories=categories,
            template_dir=make_href(template_path),
            checksum=hash_directory(template_path),
            timestamp=make_timestamp(),
            failures=fail_data,
            collection_failures=COLLECTION_FAILURES,
        )
    with open(os.path.join(outpath, "report.html"), "w") as f:
        f.write(contents)


def pytest_addoption(parser):
    """
    Add needed CLI arguments
    """
    parser.addoption(
        "--template-directory",
        dest="template_dir",
        action="append",
        help="Directory which holds the templates for validation",
    )
    parser.addoption(
        "--template-source",
        dest="template_source",
        action="append",
        help="Source Directory which holds the templates for validation",
    )
    parser.addoption(
        "--self-test",
        dest="self_test",
        action="store_true",
        help="Test the unit tests against their fixtured data",
    )
    parser.addoption(
        "--report-format",
        dest="report_format",
        action="store",
        help="Format of output report (html, csv, excel, json)",
    )
    parser.addoption(
        "--continue-on-failure",
        dest="continue_on_failure",
        action="store_true",
        help="Continue validation even when structural errors exist in input files",
    )
    parser.addoption(
        "--output-directory",
        dest="output_dir",
        action="store",
        default=None,
        help="Alternate directory for report output",
    )
    parser.addoption(
        "--category",
        dest="test_categories",
        action="append",
        help="optional category of test to execute",
    )


def pytest_configure(config):
    """
    Ensure that we receive either `--self-test` or
    `--template-directory=<directory>` as CLI arguments
    """
    if config.getoption("template_dir") and config.getoption("self_test"):
        raise Exception('"--template-directory" and "--self-test" are mutually exclusive')
    if not (
        config.getoption("template_dir")
        or config.getoption("self_test")
        or config.getoption("help")
    ):
        raise Exception('One of "--template-directory" or "--self-test" must be specified')


def pytest_generate_tests(metafunc):
    """
    If a unit test requires an argument named 'filename'
    we generate a test for the filenames selected. Either
    the files contained in `template_dir` or, if `template_dir`
    is not specified on the CLI, the fixtures associated with this
    test name.
    """

    # noinspection PyBroadException
    try:
        if "filename" in metafunc.fixturenames:
            from .parametrizers import parametrize_filename

            parametrize_filename(metafunc)

        if "filenames" in metafunc.fixturenames:
            from .parametrizers import parametrize_filenames

            parametrize_filenames(metafunc)

        if "template_dir" in metafunc.fixturenames:
            from .parametrizers import parametrize_template_dir

            parametrize_template_dir(metafunc)

        if "environment_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_pair

            parametrize_environment_pair(metafunc)

        if "heat_volume_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_volume_pair

            parametrize_heat_volume_pair(metafunc)

        if "yaml_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_files

            parametrize_yaml_files(metafunc)

        if "env_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_files

            parametrize_environment_files(metafunc)

        if "yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_file

            parametrize_yaml_file(metafunc)

        if "env_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_file

            parametrize_environment_file(metafunc)

        if "parsed_yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_yaml_file

            parametrize_parsed_yaml_file(metafunc)

        if "parsed_environment_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_environment_file

            parametrize_parsed_environment_file(metafunc)

        if "heat_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_template

            parametrize_heat_template(metafunc)

        if "heat_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_templates

            parametrize_heat_templates(metafunc)

        if "volume_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_template

            parametrize_volume_template(metafunc)

        if "volume_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_templates

            parametrize_volume_templates(metafunc)

        if "template" in metafunc.fixturenames:
            from .parametrizers import parametrize_template

            parametrize_template(metafunc)

        if "templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_templates

            parametrize_templates(metafunc)
    except Exception as e:
        # If an error occurs in the collection phase, then it won't be logged as a
        # normal test failure. This means that failures could occur, but not
        # be seen on the report, resulting in a false positive success message. These
        # errors will be stored and reported separately on the report
        COLLECTION_FAILURES.append(
            {
                "module": metafunc.module.__name__,
                "test": metafunc.function.__name__,
                "fixtures": metafunc.fixturenames,
                "error": traceback.format_exc(),
                "requirements": getattr(metafunc.function, "requirement_ids", []),
            }
        )
        raise e


def hash_directory(path):
    """
    Create md5 hash using the contents of all files under ``path``

    :param path: string directory containing files
    :return: string MD5 hash code (hex)
    """
    md5 = hashlib.md5()
    for dir_path, sub_dirs, filenames in os.walk(path):
        for filename in filenames:
            file_path = os.path.join(dir_path, filename)
            with open(file_path, "rb") as f:
                md5.update(f.read())
    return md5.hexdigest()
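

# Illustrative usage (directory hypothetical):
#
#   checksum = hash_directory("/tmp/templates")  # e.g. "9e107d9d372bb6826bd81d3542a419d6"
#
# Every file under the directory feeds a single MD5 digest, so any change to
# file contents changes the checksum (file names themselves are not hashed).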


def load_current_requirements():
    """Loads dict of current requirements or empty dict if file doesn't exist"""
    with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
        data = json.load(f)
        version = data["current_version"]
        return data["versions"][version]["needs"]
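

# The needs.json file produced by the VNFRQTS project is expected to look
# roughly like this sketch (only the keys read by this module are shown;
# the requirement content and version name are hypothetical):
#
#   {
#     "current_version": "dublin",
#     "versions": {
#       "dublin": {
#         "needs": {
#           "R-123456": {
#             "description": "The VNF Heat Orchestration Template **MUST** ...",
#             "keyword": "MUST",
#             "docname": "Heat/ONAP Heat Template Constructs",
#             "section_name": "ONAP Heat Template Constructs",
#             "validation_mode": "static"
#           }
#         }
#       }
#     }
#   }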


def select_heat_requirements(reqs):
    """Filters dict requirements to only those requirements pertaining to Heat"""
    return {k: v for k, v in reqs.items() if "Heat" in v["docname"]}
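

# With the sketch above, "R-123456" would be kept because "Heat" appears in
# its docname; requirements from other documents are filtered out.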


# noinspection PyUnusedLocal
def pytest_report_collectionfinish(config, startdir, items):
    """Generates a simple traceability report to output/traceability.csv"""
    traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
    output_dir = os.path.split(traceability_path)[0]
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    reqs = load_current_requirements()
    requirements = select_heat_requirements(reqs)
    unmapped, mapped = partition(
        lambda i: hasattr(i.function, "requirement_ids"), items
    )

    req_to_test = defaultdict(set)
    mapping_errors = set()
    for item in mapped:
        for req_id in item.function.requirement_ids:
            if req_id not in req_to_test:
                req_to_test[req_id].add(item)
            if req_id not in requirements:
                mapping_errors.add(
                    (req_id, item.function.__module__, item.function.__name__)
                )

    mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
    with open(mapping_error_path, "w", newline="") as f:
        writer = csv.writer(f)
        for err in mapping_errors:
            writer.writerow(err)

    with open(traceability_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(
            ("Requirement ID", "Requirement", "Section",
             "Keyword", "Validation Mode", "Is Testable",
             "Test Module", "Test Name")
        )
        for req_id, metadata in requirements.items():
            keyword = metadata["keyword"].upper()
            mode = metadata["validation_mode"].lower()
            testable = keyword in {"MUST", "MUST NOT"} and mode != "none"
            if req_to_test[req_id]:
                for item in req_to_test[req_id]:
                    writer.writerow(
                        (req_id,
                         metadata["description"],
                         metadata["section_name"],
                         keyword,
                         mode,
                         "TRUE" if testable else "FALSE",
                         item.function.__module__,
                         item.function.__name__)
                    )
            else:
                writer.writerow(
                    (req_id,
                     metadata["description"],
                     metadata["section_name"],
                     keyword,
                     mode,
                     "TRUE" if testable else "FALSE",
                     "",   # test module
                     "")   # test function
                )
        # now write out any test methods that weren't mapped to requirements
        unmapped_tests = {(item.function.__module__, item.function.__name__)
                          for item in unmapped}
        for test_module, test_name in unmapped_tests:
            writer.writerow(
                ("",        # requirement id
                 "",        # requirement text
                 "",        # section
                 "",        # keyword
                 "static",  # validation mode
                 "TRUE",    # testable
                 test_module,
                 test_name)
            )