# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
#             http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
#             https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================
import csv
import datetime
import hashlib
import io
import json
import logging
import os
import re
import time
import traceback

from collections import defaultdict
from itertools import chain

import docutils.core
import jinja2
import pytest
import xlsxwriter
from more_itertools import partition
from six import string_types

import version  # local module providing the validation tool's version string

logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.ERROR)

__path__ = [os.path.dirname(os.path.abspath(__file__))]

DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])

RESOLUTION_STEPS_FILE = "resolution_steps.json"
HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
TEST_SCRIPT_SITE = (
    "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
)
# NOTE: the assignment target for the URL below was lost; VNFRQTS_ID_URL is an
# assumed name for the root of the published VNFRQTS requirements documentation.
VNFRQTS_ID_URL = (
    "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
)
REPORT_COLUMNS = [
    ("Input File", "file"),
    ("Test", "test_file"),
    ("Requirements", "req_description"),
    ("Resolution Steps", "resolution_steps"),
    ("Error Message", "message"),
    ("Raw Test Output", "raw_output"),
]

COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
while preparing to validate the input files. Some validations may not have been
executed. Please refer these issues to the VNF Validation Tool team.
"""

COLLECTION_FAILURES = []

# Captures the results of every test run
ALL_RESULTS = []
def get_output_dir(config):
    """
    Retrieve the output directory for the reports and create it if necessary
    :param config: pytest configuration
    :return: output directory as string
    """
    output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
    if not os.path.exists(output_dir):
        os.makedirs(output_dir, exist_ok=True)
    return output_dir
def extract_error_msg(rep):
    """
    If a custom error message was provided, then extract it, otherwise
    just show the pytest assert message
    """
    if rep.outcome != "failed":
        return ""
    try:
        full_msg = str(rep.longrepr.reprcrash.message)
        match = re.match(
            "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
        )
        if match:  # custom message was provided
            # Extract everything between AssertionError and the start
            # of the assert statement expansion in the pytest report
            msg = match.group(1)
        else:
            msg = str(rep.longrepr.reprcrash)
            if "AssertionError:" in msg:
                msg = msg.split("AssertionError:")[1]
    except AttributeError:
        msg = str(rep)
    return msg
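# Illustrative behaviour (assumed from the regex above): a report whose crash
# message reads
#   "AssertionError: image parameter missing from base template\nassert False"
# yields only the custom portion (" image parameter missing from base template\n"),
# while a plain "assert x == y" failure falls back to the raw reprcrash text.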
class TestResult:
    """
    Wraps the test case and result to extract necessary metadata for
    reporting purposes.
    """

    RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}

    def __init__(self, item, outcome):
        self.item = item
        self.result = outcome.get_result()
        self.files = [os.path.normpath(p) for p in self._get_files()]
        self.error_message = self._get_error_message()

    @property
    def requirement_ids(self):
        """
        Returns list of requirement IDs mapped to the test case.

        :return: Returns a list of string requirement IDs the test was
                 annotated with ``validates``, otherwise returns an empty list
        """
        is_mapped = hasattr(self.item.function, "requirement_ids")
        return self.item.function.requirement_ids if is_mapped else []

    @property
    def markers(self):
        """
        :return: Returns a set of pytest marker names for the test or an empty set
        """
        return set(m.name for m in self.item.iter_markers())

    @property
    def is_base_test(self):
        """
        :return: Returns True if the test is annotated with a pytest marker called base
        """
        return "base" in self.markers

    @property
    def is_failed(self):
        """
        :return: True if the test failed
        """
        return self.outcome == "FAIL"

    @property
    def outcome(self):
        """
        :return: Returns 'PASS', 'FAIL', or 'SKIP'
        """
        return self.RESULT_MAPPING[self.result.outcome]

    @property
    def test_case(self):
        """
        :return: Name of the test case method
        """
        return self.item.function.__name__

    @property
    def test_module(self):
        """
        :return: Name of the file containing the test case
        """
        return self.item.function.__module__.split(".")[-1]

    @property
    def test_id(self):
        """
        :return: ID of the test (test_module + test_case)
        """
        return "{}::{}".format(self.test_module, self.test_case)

    @property
    def raw_output(self):
        """
        :return: Full output from pytest for the given test case
        """
        return str(self.result.longrepr)
    def requirement_text(self, curr_reqs):
        """
        Creates a text summary for the requirement IDs mapped to the test case.
        If no requirements are mapped, then it returns the empty string.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS project's needs.json output
        :return: ID and text of the requirements mapped to the test case
        """
        text = (
            "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
            for r_id in self.requirement_ids
            if r_id in curr_reqs
        )
        return "".join(text)

    def requirements_metadata(self, curr_reqs):
        """
        Returns a list of dicts containing the following metadata for each
        requirement mapped to the test case:

        - id: Requirement ID
        - text: Full text of the requirement
        - keyword: MUST, MUST NOT, MAY, etc.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS project's needs.json output
        :return: List of requirement metadata
        """
        data = []
        for r_id in self.requirement_ids:
            if r_id not in curr_reqs:
                continue
            data.append(
                {
                    "id": r_id,
                    "text": curr_reqs[r_id]["description"],
                    "keyword": curr_reqs[r_id]["keyword"],
                }
            )
        return data

    def resolution_steps(self, resolutions):
        """
        :param resolutions: Loaded contents of resolution_steps.json
        :return: Header and text for the resolution steps associated with this
                 test case.  Returns empty string if no resolutions are
                 provided for this test case.
        """
        text = (
            "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
            for entry in resolutions
            if self._match(entry)
        )
        return "".join(text)

    def _match(self, resolution_entry):
        """
        Returns True if the test result maps to the given entry in
        the resolutions file.
        """
        return (
            self.test_case == resolution_entry["function"]
            and self.test_module == resolution_entry["module"]
        )
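    # Minimal sketch of a resolution_steps.json entry that resolution_steps()
    # and _match() expect; the field names come from the lookups above, while
    # the values are invented examples:
    # [
    #   {
    #     "module": "test_environment_file_parameters",
    #     "function": "test_environment_file_contains_required_sections",
    #     "header": "Missing Environment Parameters",
    #     "resolution_steps": "Add the missing sections to the .env file."
    #   }
    # ]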
    def _get_files(self):
        """
        Extracts the list of files passed into the test case.
        :return: List of absolute paths to files
        """
        if "environment_pair" in self.item.fixturenames:
            return [
                "{} environment pair".format(
                    self.item.funcargs["environment_pair"]["name"]
                )
            ]
        elif "heat_volume_pair" in self.item.fixturenames:
            return [
                "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
            ]
        elif "heat_templates" in self.item.fixturenames:
            return self.item.funcargs["heat_templates"]
        elif "yaml_files" in self.item.fixturenames:
            return self.item.funcargs["yaml_files"]
        else:
            parts = self.result.nodeid.split("[")
            return [""] if len(parts) == 1 else [parts[1][:-1]]

    def _get_error_message(self):
        """
        :return: Error message or empty string if the test did not fail or error
        """
        if self.is_failed:
            return extract_error_msg(self.result)
        else:
            return ""
# noinspection PyUnusedLocal
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """
    Captures the test results for later reporting.  This will also halt testing
    if a base failure is encountered (can be overridden with continue-on-failure)
    """
    outcome = yield
    if outcome.get_result().when != "call":
        return  # only capture results of test cases themselves
    result = TestResult(item, outcome)
    if (
        not item.config.option.continue_on_failure
        and result.is_base_test
        and result.is_failed
    ):
        msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
            result.error_message
        )
        result.error_message = msg
        ALL_RESULTS.append(result)
        pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))

    ALL_RESULTS.append(result)
def make_timestamp():
    """
    :return: String timestamp in the format:
             2019-01-19 10:18:49.865000 Central Standard Time
    """
    timezone = time.tzname[time.localtime().tm_isdst]
    return "{} {}".format(str(datetime.datetime.now()), timezone)


# noinspection PyUnusedLocal
def pytest_sessionstart(session):
    ALL_RESULTS.clear()
    COLLECTION_FAILURES.clear()
# noinspection PyUnusedLocal
def pytest_sessionfinish(session, exitstatus):
    """
    If not a self-test run, generate the output reports
    """
    if not session.config.option.template_dir:
        return

    if session.config.option.template_source:
        template_source = session.config.option.template_source[0]
    else:
        template_source = os.path.abspath(session.config.option.template_dir[0])

    categories_selected = session.config.option.test_categories or ""
    generate_report(
        get_output_dir(session.config),
        template_source,
        categories_selected,
        session.config.option.report_format,
    )
# noinspection PyUnusedLocal
def pytest_collection_modifyitems(session, config, items):
    """
    Selects tests based on the categories requested.  Tests without
    categories will always be executed.
    """
    config.traceability_items = list(items)  # save all items for traceability
    if not config.option.self_test:
        for item in items:
            # checking if test belongs to a category
            if hasattr(item.function, "categories"):
                if config.option.test_categories:
                    test_categories = getattr(item.function, "categories")
                    passed_categories = config.option.test_categories
                    if not all(
                        category in passed_categories for category in test_categories
                    ):
                        item.add_marker(
                            pytest.mark.skip(
                                reason="Test categories do not match all the passed categories"
                            )
                        )
                else:
                    item.add_marker(
                        pytest.mark.skip(
                            reason="Test belongs to a category but no categories were passed"
                        )
                    )
    items.sort(
        key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
    )
def make_href(paths):
    """
    Create an anchor tag to link to the file paths provided.
    :param paths: string or list of file paths
    :return: String of hrefs - one for each path, each separated by a line
             break
    """
    paths = [paths] if isinstance(paths, string_types) else paths
    links = []
    for p in paths:
        abs_path = os.path.abspath(p)
        name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
        links.append(
            "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
                abs_path=abs_path, name=name
            )
        )
    return "<br/>".join(links)
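# Illustrative output (assuming a file path): make_href("/tmp/templates/base.yaml")
# returns "<a href='file:///tmp/templates/base.yaml' target='_blank'>base.yaml</a>";
# directory paths keep the full absolute path as the link text, and a list of
# paths produces one anchor per path joined with "<br/>".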
def load_resolutions_file():
    """
    :return: dict of data loaded from resolution_steps.json
    """
    resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
    if os.path.exists(resolution_steps):
        with open(resolution_steps, "r") as f:
            return json.loads(f.read())
def generate_report(outpath, template_path, categories, output_format="html"):
    """
    Generates the various output reports.

    :param outpath: destination directory for all reports
    :param template_path: directory containing the Heat templates validated
    :param categories: Optional categories selected
    :param output_format: One of "html", "excel", or "csv". Default is "html"
    :raises: ValueError if requested output format is unknown
    """
    failures = [r for r in ALL_RESULTS if r.is_failed]
    generate_failure_file(outpath)
    output_format = output_format.lower().strip() if output_format else "html"
    generate_json(outpath, template_path, categories)
    if output_format == "html":
        generate_html_report(outpath, categories, template_path, failures)
    elif output_format == "excel":
        generate_excel_report(outpath, categories, template_path, failures)
    elif output_format == "json":
        return  # report.json is always generated above
    elif output_format == "csv":
        generate_csv_report(outpath, categories, template_path, failures)
    else:
        raise ValueError("Unsupported output format: " + output_format)
def write_json(data, path):
    """
    Pretty print data as JSON to the output path requested

    :param data: Data structure to be converted to JSON
    :param path: Where to write output
    """
    with open(path, "w") as f:
        json.dump(data, f, indent=2)


def generate_failure_file(outpath):
    """
    Writes a summary of test failures to a file named failures.
    This is for backwards compatibility only.  The report.json offers a
    more comprehensive output.
    """
    failure_path = os.path.join(outpath, "failures")
    failures = [r for r in ALL_RESULTS if r.is_failed]
    data = {}
    for i, fail in enumerate(failures):
        data[str(i)] = {
            "file": fail.files[0] if len(fail.files) == 1 else fail.files,
            "vnfrqts": fail.requirement_ids,
            "test": fail.test_case,
            "test_file": fail.test_module,
            "raw_output": fail.raw_output,
            "message": fail.error_message,
        }
    write_json(data, failure_path)
def generate_csv_report(output_dir, categories, template_path, failures):
    rows = [["Validation Failures"]]
    headers = [
        ("Categories Selected:", categories),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    rows.append([])
    for header in headers:
        rows.append([header[0], header[1]])
    rows.append([])

    if COLLECTION_FAILURES:
        rows.append([COLLECTION_FAILURE_WARNING])
        rows.append(["Validation File", "Test", "Fixtures", "Error"])
        for failure in COLLECTION_FAILURES:
            rows.append(
                [
                    failure["module"],
                    failure["test"],
                    ";".join(failure["fixtures"]),
                    failure["error"],
                ]
            )
        rows.append([])

    # table header
    rows.append([col for col, _ in REPORT_COLUMNS])

    reqs = load_current_requirements()
    resolutions = load_resolutions_file()

    # table content
    for failure in failures:
        rows.append(
            [
                "\n".join(failure.files),
                failure.test_id,
                failure.requirement_text(reqs),
                failure.resolution_steps(resolutions),
                failure.error_message,
                failure.raw_output,
            ]
        )

    output_path = os.path.join(output_dir, "report.csv")
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        for row in rows:
            writer.writerow(row)
def generate_excel_report(output_dir, categories, template_path, failures):
    output_path = os.path.join(output_dir, "report.xlsx")
    workbook = xlsxwriter.Workbook(output_path)
    bold = workbook.add_format({"bold": True})
    code = workbook.add_format({"font_name": "Courier", "text_wrap": True})
    normal = workbook.add_format({"text_wrap": True})
    heading = workbook.add_format({"bold": True, "font_size": 18})
    worksheet = workbook.add_worksheet("failures")
    worksheet.write(0, 0, "Validation Failures", heading)

    headers = [
        ("Categories Selected:", ",".join(categories)),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    for row, (header, value) in enumerate(headers, start=2):
        worksheet.write(row, 0, header, bold)
        worksheet.write(row, 1, value)

    worksheet.set_column(0, len(headers) - 1, 40)
    worksheet.set_column(len(headers), len(headers), 80)

    if COLLECTION_FAILURES:
        collection_failures_start = 2 + len(headers) + 2
        worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
        collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
        for col_num, col_name in enumerate(collection_failure_headers):
            worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
        for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
            worksheet.write(row, 0, data["module"])
            worksheet.write(row, 1, data["test"])
            worksheet.write(row, 2, ",".join(data["fixtures"]))
            worksheet.write(row, 3, data["error"], code)

    # table header
    start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
    worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
    for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
        worksheet.write(start_error_table_row + 1, col_num, col_name, bold)

    reqs = load_current_requirements()
    resolutions = load_resolutions_file()

    # table content
    for row, failure in enumerate(failures, start=start_error_table_row + 2):
        worksheet.write(row, 0, "\n".join(failure.files), normal)
        worksheet.write(row, 1, failure.test_id, normal)
        worksheet.write(row, 2, failure.requirement_text(reqs), normal)
        worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
        worksheet.write(row, 4, failure.error_message, normal)
        worksheet.write(row, 5, failure.raw_output, code)

    workbook.close()
def make_iso_timestamp():
    """
    Creates a timestamp in ISO 8601 format in UTC. Used for JSON output.
    """
    now = datetime.datetime.utcnow()
    now = now.replace(tzinfo=datetime.timezone.utc)  # replace() returns a new datetime
    return now.isoformat()
def aggregate_requirement_adherence(r_id, collection_failures, test_results):
    """
    Examines all tests associated with a given requirement and determines
    the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.

    * ERROR - At least one ERROR occurred
    * PASS - At least one PASS and no FAIL or ERRORs.
    * FAIL - At least one FAIL occurred (no ERRORs)
    * SKIP - All tests were SKIP

    :param r_id: Requirement ID to examine
    :param collection_failures: Errors that occurred during test setup.
    :param test_results: List of TestResult
    :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
    """
    errors = any(r_id in f["requirements"] for f in collection_failures)
    outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
    return aggregate_results(errors, outcomes, r_id)
def aggregate_results(has_errors, outcomes, r_id=None):
    """
    Determines the aggregate result for the conditions provided.  Assumes the
    results have been filtered and collected for analysis.

    :param has_errors: True if collection failures occurred for the tests being
                       analyzed.
    :param outcomes: set of outcomes from the TestResults
    :param r_id: Optional requirement ID if known
    :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
             (see aggregate_requirement_adherence for more detail)
    """
    if has_errors:
        return "ERROR"
    elif not outcomes:
        return "PASS"
    elif "FAIL" in outcomes:
        return "FAIL"
    elif "PASS" in outcomes:
        return "PASS"
    elif {"SKIP"} == outcomes:
        return "SKIP"
    else:
        logging.warning(
            "Unexpected error aggregating outcomes ({}) for requirement {}".format(
                outcomes, r_id
            )
        )
        return "ERROR"
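# Illustrative aggregation, following the precedence documented above:
#   aggregate_results(True,  {"PASS"})           -> "ERROR"
#   aggregate_results(False, {"PASS", "FAIL"})   -> "FAIL"
#   aggregate_results(False, {"PASS", "SKIP"})   -> "PASS"
#   aggregate_results(False, {"SKIP"})           -> "SKIP"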
def aggregate_run_results(collection_failures, test_results):
    """
    Determines overall status of run based on all failures and results.

    * 'ERROR' - At least one collection failure occurred during the run.
    * 'FAIL' - Template failed at least one test
    * 'PASS' - All tests executed properly and no failures were detected

    :param collection_failures: failures occurring during test setup
    :param test_results: list of all test execution results
    :return: one of 'ERROR', 'FAIL', or 'PASS'
    """
    if collection_failures:
        return "ERROR"
    elif any(r.is_failed for r in test_results):
        return "FAIL"
    else:
        return "PASS"
def error(failure_or_result):
    """
    Extracts the error message from a collection failure or test result
    :param failure_or_result: Entry from COLLECTION_FAILURES or a TestResult
    :return: Error message as string
    """
    if isinstance(failure_or_result, TestResult):
        return failure_or_result.error_message
    else:
        return failure_or_result["error"]


def req_ids(failure_or_result):
    """
    Extracts the requirement IDs from a collection failure or test result
    :param failure_or_result: Entry from COLLECTION_FAILURES or a TestResult
    :return: set of Requirement IDs.  If no requirements mapped, then an empty set
    """
    if isinstance(failure_or_result, TestResult):
        return set(failure_or_result.requirement_ids)
    else:
        return set(failure_or_result["requirements"])
def collect_errors(r_id, collection_failures, test_result):
    """
    Creates a list of error messages from the collection failures and
    test results.  If r_id is provided, then it collects the error messages
    where the failure or test is associated with that requirement ID.  If
    r_id is None, then it collects all errors that occur on failures and
    results that are not mapped to requirements
    """

    def selector(item):
        if r_id:
            return r_id in req_ids(item)
        else:
            return not req_ids(item)

    errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
    return [e for e in errors if e]


def relative_paths(base_dir, paths):
    return [os.path.relpath(p, base_dir) for p in paths]
def generate_json(outpath, template_path, categories):
    """
    Creates a JSON summary of the entire test run.
    """
    reqs = load_current_requirements()
    data = {
        "template_directory": os.path.splitdrive(template_path)[1].replace(
            os.path.sep, "/"
        ),
        "timestamp": make_iso_timestamp(),
        "checksum": hash_directory(template_path),
        "categories": categories,
        "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
        "tests": [],
        "requirements": [],
    }

    results = data["tests"]
    for result in COLLECTION_FAILURES:
        results.append(
            {
                "files": [],
                "test_module": result["module"],
                "test_case": result["test"],
                "result": "ERROR",
                "error": result["error"],
                "requirements": result["requirements"],
            }
        )
    for result in ALL_RESULTS:
        results.append(
            {
                "files": relative_paths(template_path, result.files),
                "test_module": result.test_module,
                "test_case": result.test_case,
                "result": result.outcome,
                "error": result.error_message if result.is_failed else "",
                "requirements": result.requirements_metadata(reqs),
            }
        )

    requirements = data["requirements"]
    for r_id, r_data in reqs.items():
        result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
        requirements.append(
            {
                "id": r_id,
                "text": r_data["description"],
                "keyword": r_data["keyword"],
                "result": result,
                "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
            }
        )

    # If there are tests that aren't mapped to a requirement, then we'll
    # map them to a special entry so the results are coherent.
    unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
    has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
    if unmapped_outcomes or has_errors:
        requirements.append(
            {
                "id": "Unmapped",  # placeholder ID for tests without a requirement
                "text": "Tests not mapped to requirements (see tests)",
                "result": aggregate_results(has_errors, unmapped_outcomes),
                "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
            }
        )

    report_path = os.path.join(outpath, "report.json")
    write_json(data, report_path)
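# Sketch of the report.json produced above (keys are the ones populated in
# generate_json; values are illustrative):
# {
#   "template_directory": "path/to/templates",
#   "timestamp": "2019-01-19T16:18:49.865000+00:00",
#   "checksum": "<md5 of the template directory>",
#   "categories": [...],
#   "outcome": "PASS" | "FAIL" | "ERROR",
#   "tests": [
#     {"files": [...], "test_module": "...", "test_case": "...",
#      "result": "...", "error": "...", "requirements": [...]}
#   ],
#   "requirements": [
#     {"id": "R-xxxxx", "text": "...", "keyword": "MUST",
#      "result": "...", "errors": [...]}
#   ]
# }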
def generate_html_report(outpath, categories, template_path, failures):
    reqs = load_current_requirements()
    resolutions = load_resolutions_file()
    fail_data = []
    for failure in failures:
        fail_data.append(
            {
                "file_links": make_href(failure.files),
                "test_id": failure.test_id,
                "error_message": failure.error_message,
                "raw_output": failure.raw_output,
                "requirements": docutils.core.publish_parts(
                    writer_name="html", source=failure.requirement_text(reqs)
                )["body"],
                "resolution_steps": failure.resolution_steps(resolutions),
            }
        )
    pkg_dir = os.path.split(__file__)[0]
    j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
    with open(j2_template_path, "r") as f:
        report_template = jinja2.Template(f.read())
        contents = report_template.render(
            version=version.VERSION,
            num_failures=len(failures) + len(COLLECTION_FAILURES),
            categories=categories,
            template_dir=make_href(template_path),
            checksum=hash_directory(template_path),
            timestamp=make_timestamp(),
            failures=fail_data,
            collection_failures=COLLECTION_FAILURES,
        )
    with open(os.path.join(outpath, "report.html"), "w") as f:
        f.write(contents)
def pytest_addoption(parser):
    """
    Add needed CLI arguments
    """
    # NOTE: flag spellings for --template-source, --report-format, and --category
    # are assumed from their dest names; the other flags appear verbatim below or
    # in the pytest_configure error messages.
    parser.addoption(
        "--template-directory",
        dest="template_dir",
        action="append",
        help="Directory which holds the templates for validation",
    )
    parser.addoption(
        "--template-source",
        dest="template_source",
        action="append",
        help="Source Directory which holds the templates for validation",
    )
    parser.addoption(
        "--self-test",
        dest="self_test",
        action="store_true",
        help="Test the unit tests against their fixtured data",
    )
    parser.addoption(
        "--report-format",
        dest="report_format",
        action="store",
        help="Format of output report (html, csv, excel, json)",
    )
    parser.addoption(
        "--continue-on-failure",
        dest="continue_on_failure",
        action="store_true",
        help="Continue validation even when structural errors exist in input files",
    )
    parser.addoption(
        "--output-directory",
        dest="output_dir",
        action="store",
    )
    parser.addoption(
        "--category",
        dest="test_categories",
        action="append",
        help="optional category of test to execute",
    )
def pytest_configure(config):
    """
    Ensure that we receive either `--self-test` or
    `--template-dir=<directory>` as CLI arguments
    """
    if config.getoption("template_dir") and config.getoption("self_test"):
        raise Exception('"--template-dir" and "--self-test" are mutually exclusive')
    if not (
        config.getoption("template_dir")
        or config.getoption("self_test")
        or config.getoption("help")
    ):
        raise Exception('One of "--template-dir" or "--self-test" must be specified')
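# Illustrative invocations (using the flag registrations in pytest_addoption above):
#   pytest --template-directory=/path/to/heat/templates
#   pytest --self-test
# Supplying both options together triggers the "mutually exclusive" exception above.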
def pytest_generate_tests(metafunc):
    """
    If a unit test requires an argument named 'filename'
    we generate a test for the filenames selected. Either
    the files contained in `template_dir` or, if `template_dir`
    is not specified on the CLI, the fixtures associated with this
    test name.
    """

    # noinspection PyBroadException
    try:
        if "filename" in metafunc.fixturenames:
            from .parametrizers import parametrize_filename

            parametrize_filename(metafunc)

        if "filenames" in metafunc.fixturenames:
            from .parametrizers import parametrize_filenames

            parametrize_filenames(metafunc)

        if "template_dir" in metafunc.fixturenames:
            from .parametrizers import parametrize_template_dir

            parametrize_template_dir(metafunc)

        if "environment_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_pair

            parametrize_environment_pair(metafunc)

        if "heat_volume_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_volume_pair

            parametrize_heat_volume_pair(metafunc)

        if "yaml_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_files

            parametrize_yaml_files(metafunc)

        if "env_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_files

            parametrize_environment_files(metafunc)

        if "yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_file

            parametrize_yaml_file(metafunc)

        if "env_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_file

            parametrize_environment_file(metafunc)

        if "parsed_yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_yaml_file

            parametrize_parsed_yaml_file(metafunc)

        if "parsed_environment_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_environment_file

            parametrize_parsed_environment_file(metafunc)

        if "heat_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_template

            parametrize_heat_template(metafunc)

        if "heat_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_templates

            parametrize_heat_templates(metafunc)

        if "volume_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_template

            parametrize_volume_template(metafunc)

        if "volume_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_templates

            parametrize_volume_templates(metafunc)

        if "template" in metafunc.fixturenames:
            from .parametrizers import parametrize_template

            parametrize_template(metafunc)

        if "templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_templates

            parametrize_templates(metafunc)
    except Exception as e:
        # If an error occurs in the collection phase, then it won't be logged as a
        # normal test failure.  This means that failures could occur, but not
        # be seen on the report resulting in a false positive success message.  These
        # errors will be stored and reported separately on the report
        COLLECTION_FAILURES.append(
            {
                "module": metafunc.module.__name__,
                "test": metafunc.function.__name__,
                "fixtures": metafunc.fixturenames,
                "error": traceback.format_exc(),
                "requirements": getattr(metafunc.function, "requirement_ids", []),
            }
        )
def hash_directory(path):
    """
    Create md5 hash using the contents of all files under ``path``
    :param path: string directory containing files
    :return: string MD5 hash code (hex)
    """
    md5 = hashlib.md5()
    for dir_path, sub_dirs, filenames in os.walk(path):
        for filename in filenames:
            file_path = os.path.join(dir_path, filename)
            with open(file_path, "rb") as f:
                md5.update(f.read())
    return md5.hexdigest()
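# Illustrative usage: hash_directory("/path/to/templates") reads every file under
# the directory tree and returns a hex digest string; an empty directory yields
# "d41d8cd98f00b204e9800998ecf8427e" (the MD5 of zero bytes).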
def load_current_requirements():
    """Loads dict of current requirements or empty dict if file doesn't exist"""
    with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
        data = json.load(f)
        version = data["current_version"]
        return data["versions"][version]["needs"]
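# Minimal sketch of the heat_requirements.json layout this loader expects
# (keys come from the lookups above and elsewhere in this module; the version
# name and requirement ID are invented examples):
# {
#   "current_version": "dublin",
#   "versions": {
#     "dublin": {
#       "needs": {
#         "R-123456": {
#           "description": "...",
#           "keyword": "MUST",
#           "docname": "Heat/...",
#           "validation_mode": "...",
#           "section_name": "..."
#         }
#       }
#     }
#   }
# }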
def select_heat_requirements(reqs):
    """Filters dict requirements to only those requirements pertaining to Heat"""
    return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}


def is_testable(reqs):
    """Marks each requirement as testable or not based on keyword and validation mode"""
    for key, values in reqs.items():
        if (("MUST" in values.get("keyword", "").upper()) and (
            "none" not in values.get("validation_mode", "").lower()
        )):
            reqs[key]["testable"] = True
        else:
            reqs[key]["testable"] = False
    return reqs
def build_rst_json(reqs):
    """Takes requirements and returns dict of only the testable Heat requirements"""
    for key, values in list(reqs.items()):
        if values["testable"]:
            # Creates links in RST format to requirements and test cases
            # (link target layout assumed: <docs root>/<docname>.html#<req id>)
            if values["test_case"]:
                mod = values["test_case"].split(".")[-1]
                val = TEST_SCRIPT_SITE + mod + ".py"
                rst_value = "`" + mod + " <" + val + ">`_"
                title = (
                    "`"
                    + key
                    + " <"
                    + VNFRQTS_ID_URL
                    + values["docname"].replace(" ", "%20")
                    + ".html#"
                    + key
                    + ">`_"
                )
                reqs[key].update({"full_title": title, "test_case": rst_value})
            else:
                title = (
                    "`"
                    + key
                    + " <"
                    + VNFRQTS_ID_URL
                    + values["docname"].replace(" ", "%20")
                    + ".html#"
                    + key
                    + ">`_"
                )
                reqs[key].update(
                    {
                        "full_title": title,
                        "test_case": "No test for requirement",
                        "validated_by": "static",
                    }
                )
        else:
            del reqs[key]
    return reqs
def generate_rst_table(output_dir, data):
    """Generate a formatted csv to be used in RST"""
    rst_path = os.path.join(output_dir, "rst.csv")
    with open(rst_path, "w", newline="") as f:
        out = csv.writer(f)
        out.writerow(("Requirement ID", "Requirement", "Test Module", "Test Name"))
        for req_id, metadata in data.items():
            out.writerow(
                (
                    metadata["full_title"],
                    metadata["description"],
                    metadata["test_case"],
                    metadata["validated_by"],
                )
            )
# noinspection PyUnusedLocal
def pytest_report_collectionfinish(config, startdir, items):
    """Generates a simple traceability report to output/traceability.csv"""
    traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
    output_dir = os.path.split(traceability_path)[0]
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    reqs = load_current_requirements()
    requirements = select_heat_requirements(reqs)
    testable_requirements = is_testable(requirements)
    unmapped, mapped = partition(
        lambda i: hasattr(i.function, "requirement_ids"), items
    )

    req_to_test = defaultdict(set)
    mapping_errors = set()
    for item in mapped:
        for req_id in item.function.requirement_ids:
            if req_id not in req_to_test:
                req_to_test[req_id].add(item)
                if req_id in requirements:
                    reqs[req_id].update(
                        {
                            "test_case": item.function.__module__,
                            "validated_by": item.function.__name__,
                        }
                    )
            if req_id not in requirements:
                mapping_errors.add(
                    (req_id, item.function.__module__, item.function.__name__)
                )

    mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
    with open(mapping_error_path, "w", newline="") as f:
        writer = csv.writer(f)
        for err in mapping_errors:
            writer.writerow(err)

    with open(traceability_path, "w", newline="") as f:
        out = csv.writer(f)
        # column headers inferred from the fields written in the rows below
        out.writerow(
            (
                "Requirement ID",
                "Requirement",
                "Section",
                "Keyword",
                "Validation Mode",
                "Is Testable",
                "Test Module",
                "Test Name",
            )
        )
        for req_id, metadata in testable_requirements.items():
            if req_to_test[req_id]:
                for item in req_to_test[req_id]:
                    out.writerow(
                        (
                            req_id,
                            metadata["description"],
                            metadata["section_name"],
                            metadata["keyword"],
                            metadata["validation_mode"],
                            metadata["testable"],
                            item.function.__module__,
                            item.function.__name__,
                        )
                    )
            else:
                out.writerow(
                    (
                        req_id,
                        metadata["description"],
                        metadata["section_name"],
                        metadata["keyword"],
                        metadata["validation_mode"],
                        metadata["testable"],
                        "",  # test module
                        "",  # test name
                    )
                )

        # now write out any test methods that weren't mapped to requirements
        unmapped_tests = {
            (item.function.__module__, item.function.__name__) for item in unmapped
        }
        for test_module, test_name in unmapped_tests:
            out.writerow(
                (
                    "",  # requirement id
                    "",  # requirement text
                    "",  # section
                    "",  # keyword
                    "static",  # validation mode
                    "",  # testable
                    test_module,
                    test_name,
                )
            )

    generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))