# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
#             http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
#             https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================

import csv
import datetime
import hashlib
import io
import json
import os
import re
import time
import traceback
from collections import defaultdict
from itertools import chain

import docutils.core
import jinja2
import pytest
import xlsxwriter
from more_itertools import partition
from six import string_types

__path__ = [os.path.dirname(os.path.abspath(__file__))]

DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])

RESOLUTION_STEPS_FILE = "resolution_steps.json"
HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")

TEST_SCRIPT_SITE = (
    "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
)
"https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"

REPORT_COLUMNS = [
    ("Input File", "file"),
    ("Test", "test_file"),
    ("Requirements", "req_description"),
    ("Resolution Steps", "resolution_steps"),
    ("Error Message", "message"),
    ("Raw Test Output", "raw_output"),
]

COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
while preparing to validate the input files. Some validations may not have been
executed. Please refer these issues to the VNF Validation Tool team.
"""

COLLECTION_FAILURES = []

# Captures the results of every test run
ALL_RESULTS = []


def get_output_dir(config):
    """
    Retrieve the output directory for the reports and create it if necessary
    :param config: pytest configuration
    :return: output directory as string
    """
    output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
    if not os.path.exists(output_dir):
        os.makedirs(output_dir, exist_ok=True)
    return output_dir


def extract_error_msg(rep):
    """
    If a custom error message was provided, then extract it; otherwise
    just show the pytest assert message
    """
    if rep.outcome != "failed":
        return ""
    try:
        full_msg = str(rep.longrepr.reprcrash.message)
        match = re.match(
            "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
        )
        if match:  # custom message was provided
            # Extract everything between AssertionError and the start
            # of the assert statement expansion in the pytest report
            msg = match.group(1)
        else:
            msg = str(rep.longrepr.reprcrash)
            if "AssertionError:" in msg:
                msg = msg.split("AssertionError:")[1]
    except AttributeError:
        msg = str(rep)
    return msg
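
# extract_error_msg() expects pytest's crash message to look roughly like
# "AssertionError: <custom text> ... assert <expression>"; when that pattern matches,
# only the custom text is kept for the report, otherwise the raw AssertionError text
# (or the whole report as a fallback) is used.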


class TestResult:
    """
    Wraps the test case and result to extract necessary metadata for
    reporting.
    """

    RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}

    def __init__(self, item, outcome):
        self.item = item
        self.result = outcome.get_result()
        self.files = [os.path.normpath(p) for p in self._get_files()]
        self.error_message = self._get_error_message()

    @property
    def requirement_ids(self):
        """
        Returns list of requirement IDs mapped to the test case.

        :return: Returns a list of string requirement IDs the test was
                 annotated with ``validates``; otherwise returns an empty list
        """
        is_mapped = hasattr(self.item.function, "requirement_ids")
        return self.item.function.requirement_ids if is_mapped else []

    @property
    def markers(self):
        """:return: Returns a set of pytest marker names for the test or an empty set"""
        return set(m.name for m in self.item.iter_markers())

    @property
    def is_base_test(self):
        """:return: Returns True if the test is annotated with a pytest marker called base"""
        return "base" in self.markers

    @property
    def is_failed(self):
        """:return: True if the test failed"""
        return self.outcome == "FAIL"

    @property
    def outcome(self):
        """:return: Returns 'PASS', 'FAIL', or 'SKIP'"""
        return self.RESULT_MAPPING[self.result.outcome]

    @property
    def test_case(self):
        """:return: Name of the test case method"""
        return self.item.function.__name__

    @property
    def test_module(self):
        """:return: Name of the file containing the test case"""
        return self.item.function.__module__.split(".")[-1]

    @property
    def test_id(self):
        """:return: ID of the test (test_module + test_case)"""
        return "{}::{}".format(self.test_module, self.test_case)

    @property
    def raw_output(self):
        """:return: Full output from pytest for the given test case"""
        return str(self.result.longrepr)

    def requirement_text(self, curr_reqs):
        """
        Creates a text summary for the requirement IDs mapped to the test case.
        If no requirements are mapped, then it returns the empty string.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS projects needs.json output
        :return: ID and text of the requirements mapped to the test case
        """
        return "".join(
            "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
            for r_id in self.requirement_ids
            if r_id in curr_reqs
        )

    def requirements_metadata(self, curr_reqs):
        """
        Returns a list of dicts containing the following metadata for each
        requirement mapped:

        - id: Requirement ID
        - text: Full text of the requirement
        - keyword: MUST, MUST NOT, MAY, etc.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS projects needs.json output
        :return: List of requirement metadata
        """
        data = []
        for r_id in self.requirement_ids:
            if r_id not in curr_reqs:
                continue
            data.append({
                "id": r_id,
                "text": curr_reqs[r_id]["description"],
                "keyword": curr_reqs[r_id]["keyword"],
            })
        return data

    def resolution_steps(self, resolutions):
        """
        :param resolutions: Loaded contents of resolution_steps.json
        :return: Header and text for the resolution step associated with this
                 test case. Returns empty string if no resolutions are
                 provided for the test case.
        """
        return "".join(
            "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
            for entry in resolutions
            if self._match(entry)
        )

    def _match(self, resolution_entry):
        """
        Returns True if the test result maps to the given entry in
        the resolutions file
        """
        return (
            self.test_case == resolution_entry["function"]
            and self.test_module == resolution_entry["module"]
        )

    def _get_files(self):
        """
        Extracts the list of files passed into the test case.
        :return: List of absolute paths to files
        """
        if "environment_pair" in self.item.fixturenames:
            return [
                "{} environment pair".format(
                    self.item.funcargs["environment_pair"]["name"]
                )
            ]
        elif "heat_volume_pair" in self.item.fixturenames:
            return [
                "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
            ]
        elif "heat_templates" in self.item.fixturenames:
            return self.item.funcargs["heat_templates"]
        elif "yaml_files" in self.item.fixturenames:
            return self.item.funcargs["yaml_files"]
        else:
            parts = self.result.nodeid.split("[")
            return [""] if len(parts) == 1 else [parts[1][:-1]]

    def _get_error_message(self):
        """
        :return: Error message or empty string if the test did not fail or error
        """
        return extract_error_msg(self.result)


# noinspection PyUnusedLocal
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """
    Captures the test results for later reporting. This will also halt testing
    if a base failure is encountered (can be overridden with continue-on-failure)
    """
    outcome = yield
    if outcome.get_result().when != "call":
        return  # only capture results of test cases themselves
    result = TestResult(item, outcome)
    if (
        not item.config.option.continue_on_failure
        and result.is_base_test
        and result.is_failed
    ):
        msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
            result.error_message
        )
        result.error_message = msg
        ALL_RESULTS.append(result)
        pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))

    ALL_RESULTS.append(result)
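
# pytest_runtest_makereport is a hookwrapper: `outcome = yield` lets pytest build the
# report object first, so the TestResult created afterwards wraps the finished report.
# Only the "call" phase of each test is captured; setup and teardown phases are ignored.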


def make_timestamp():
    """
    :return: String timestamp in the format:
             2019-01-19 10:18:49.865000 Central Standard Time
    """
    timezone = time.tzname[time.localtime().tm_isdst]
    return "{} {}".format(str(datetime.datetime.now()), timezone)


# noinspection PyUnusedLocal
def pytest_sessionstart(session):
    COLLECTION_FAILURES.clear()


# noinspection PyUnusedLocal
def pytest_sessionfinish(session, exitstatus):
    """
    If not a self-test run, generate the output reports
    """
    if not session.config.option.template_dir:
        return

    if session.config.option.template_source:
        template_source = session.config.option.template_source[0]
    else:
        template_source = os.path.abspath(session.config.option.template_dir[0])

    categories_selected = session.config.option.test_categories or ""
    generate_report(
        get_output_dir(session.config),
        template_source,
        categories_selected,
        session.config.option.report_format,
    )


# noinspection PyUnusedLocal
def pytest_collection_modifyitems(session, config, items):
    """
    Selects tests based on the categories requested. Tests without
    categories will always be executed.
    """
    config.traceability_items = list(items)  # save all items for traceability
    if not config.option.self_test:
        for item in items:
            # checking if test belongs to a category
            if hasattr(item.function, "categories"):
                if config.option.test_categories:
                    test_categories = getattr(item.function, "categories")
                    passed_categories = config.option.test_categories
                    if not all(
                        category in passed_categories for category in test_categories
                    ):
                        item.add_marker(
                            pytest.mark.skip(
                                reason="Test categories do not match all the passed categories"
                            )
                        )
                else:
                    item.add_marker(
                        pytest.mark.skip(
                            reason="Test belongs to a category but no categories were passed"
                        )
                    )
    items.sort(
        key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
    )


def make_href(paths):
    """
    Create an anchor tag to link to the file paths provided.
    :param paths: string or list of file paths
    :return: String of hrefs - one for each path, each separated by a line
             break (<br/>)
    """
    paths = [paths] if isinstance(paths, string_types) else paths
    links = []
    for p in paths:
        abs_path = os.path.abspath(p)
        name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
        links.append(
            "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
                abs_path=abs_path, name=name
            )
        )
    return "<br/>".join(links)
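
# Illustrative example (hypothetical paths): make_href(["/work/base.yaml", "/work/base.env"])
# returns two "<a href='file://...' target='_blank'>" links joined by "<br/>".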


def load_resolutions_file():
    """
    :return: dict of data loaded from resolution_steps.json
    """
    resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
    if os.path.exists(resolution_steps):
        with open(resolution_steps, "r") as f:
            return json.loads(f.read())


def generate_report(outpath, template_path, categories, output_format="html"):
    """
    Generates the various output reports.

    :param outpath: destination directory for all reports
    :param template_path: directory containing the Heat templates validated
    :param categories: Optional categories selected
    :param output_format: One of "html", "excel", "json", or "csv". Default is "html"
    :raises: ValueError if requested output format is unknown
    """
    failures = [r for r in ALL_RESULTS if r.is_failed]
    generate_failure_file(outpath)
    output_format = output_format.lower().strip() if output_format else "html"
    generate_json(outpath, template_path, categories)
    if output_format == "html":
        generate_html_report(outpath, categories, template_path, failures)
    elif output_format == "excel":
        generate_excel_report(outpath, categories, template_path, failures)
    elif output_format == "json":
        return
    elif output_format == "csv":
        generate_csv_report(outpath, categories, template_path, failures)
    else:
        raise ValueError("Unsupported output format: " + output_format)


def write_json(data, path):
    """
    Pretty print data as JSON to the output path requested

    :param data: Data structure to be converted to JSON
    :param path: Where to write output
    """
    with open(path, "w") as f:
        json.dump(data, f, indent=2)


def generate_failure_file(outpath):
    """
    Writes a summary of test failures to a file named failures.
    This is for backwards compatibility only. The report.json offers a
    more comprehensive output.
    """
    failure_path = os.path.join(outpath, "failures")
    failures = [r for r in ALL_RESULTS if r.is_failed]
    data = {}
    for i, fail in enumerate(failures):
        data[str(i)] = {
            "file": fail.files[0] if len(fail.files) == 1 else fail.files,
            "vnfrqts": fail.requirement_ids,
            "test": fail.test_case,
            "test_file": fail.test_module,
            "raw_output": fail.raw_output,
            "message": fail.error_message,
        }
    write_json(data, failure_path)


def generate_csv_report(output_dir, categories, template_path, failures):
    rows = [["Validation Failures"]]
    headers = [
        ("Categories Selected:", categories),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    for header in headers:
        rows.append([header[0], header[1]])

    if COLLECTION_FAILURES:
        rows.append([COLLECTION_FAILURE_WARNING])
        rows.append(["Validation File", "Test", "Fixtures", "Error"])
        for failure in COLLECTION_FAILURES:
            rows.append(
                [
                    failure["module"],
                    failure["test"],
                    ";".join(failure["fixtures"]),
                    failure["error"],
                ]
            )

    # table header
    rows.append([col for col, _ in REPORT_COLUMNS])

    reqs = load_current_requirements()
    resolutions = load_resolutions_file()

    # table content
    for failure in failures:
        rows.append(
            [
                "\n".join(failure.files),
                failure.test_id,
                failure.requirement_text(reqs),
                failure.resolution_steps(resolutions),
                failure.error_message,
                failure.raw_output,
            ]
        )

    output_path = os.path.join(output_dir, "report.csv")
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        for row in rows:
            writer.writerow(row)


def generate_excel_report(output_dir, categories, template_path, failures):
    output_path = os.path.join(output_dir, "report.xlsx")
    workbook = xlsxwriter.Workbook(output_path)
    bold = workbook.add_format({"bold": True})
    code = workbook.add_format({"font_name": "Courier", "text_wrap": True})
    normal = workbook.add_format({"text_wrap": True})
    heading = workbook.add_format({"bold": True, "font_size": 18})
    worksheet = workbook.add_worksheet("failures")
    worksheet.write(0, 0, "Validation Failures", heading)

    headers = [
        ("Categories Selected:", ",".join(categories)),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    for row, (header, value) in enumerate(headers, start=2):
        worksheet.write(row, 0, header, bold)
        worksheet.write(row, 1, value)

    worksheet.set_column(0, len(headers) - 1, 40)
    worksheet.set_column(len(headers), len(headers), 80)

    if COLLECTION_FAILURES:
        collection_failures_start = 2 + len(headers) + 2
        worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
        collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
        for col_num, col_name in enumerate(collection_failure_headers):
            worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
        for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
            worksheet.write(row, 0, data["module"])
            worksheet.write(row, 1, data["test"])
            worksheet.write(row, 2, ",".join(data["fixtures"]))
            worksheet.write(row, 3, data["error"], code)

    # table header
    start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
    worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
    for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
        worksheet.write(start_error_table_row + 1, col_num, col_name, bold)

    reqs = load_current_requirements()
    resolutions = load_resolutions_file()

    # table content
    for row, failure in enumerate(failures, start=start_error_table_row + 2):
        worksheet.write(row, 0, "\n".join(failure.files), normal)
        worksheet.write(row, 1, failure.test_id, normal)
        worksheet.write(row, 2, failure.requirement_text(reqs), normal)
        worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
        worksheet.write(row, 4, failure.error_message, normal)
        worksheet.write(row, 5, failure.raw_output, code)

    workbook.close()


def make_iso_timestamp():
    """
    Creates a timestamp in ISO 8601 format in UTC. Used for JSON output.
    """
    now = datetime.datetime.utcnow()
    now = now.replace(tzinfo=datetime.timezone.utc)
    return now.isoformat()
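
# Illustrative output: a UTC datetime of 2019-01-19 16:18:49.865000 renders as
# "2019-01-19T16:18:49.865000+00:00".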


def aggregate_requirement_adherence(r_id, collection_failures, test_results):
    """
    Examines all tests associated with a given requirement and determines
    the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.

    * ERROR - At least one ERROR occurred
    * PASS - At least one PASS and no FAIL or ERRORs.
    * FAIL - At least one FAIL occurred (no ERRORs)
    * SKIP - All tests were SKIP

    :param r_id: Requirement ID to examine
    :param collection_failures: Errors that occurred during test setup.
    :param test_results: List of TestResult
    :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
    """
    errors = any(r_id in f["requirements"] for f in collection_failures)
    outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
    return aggregate_results(errors, outcomes, r_id)


def aggregate_results(has_errors, outcomes, r_id=None):
    """
    Determines the aggregate result for the conditions provided. Assumes the
    results have been filtered and collected for analysis.

    :param has_errors: True if collection failures occurred for the tests being
                       aggregated
    :param outcomes: set of outcomes from the TestResults
    :param r_id: Optional requirement ID if known
    :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
             (see aggregate_requirement_adherence for more detail)
    """
    if has_errors:
        return "ERROR"
    elif not outcomes:
        return "PASS"
    elif "FAIL" in outcomes:
        return "FAIL"
    elif "PASS" in outcomes:
        return "PASS"
    elif {"SKIP"} == outcomes:
        return "SKIP"
    else:
        # Guard (assumption): an unexpected combination of outcomes is surfaced as an error
        raise RuntimeError(
            "Unexpected error aggregating outcomes ({}) for requirement {}".format(
                outcomes, r_id
            )
        )
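
# A few illustrative aggregations, following the rules documented above:
#   aggregate_results(True, {"PASS"})           -> "ERROR"
#   aggregate_results(False, {"PASS", "SKIP"})  -> "PASS"
#   aggregate_results(False, {"FAIL", "PASS"})  -> "FAIL"
#   aggregate_results(False, {"SKIP"})          -> "SKIP"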


def aggregate_run_results(collection_failures, test_results):
    """
    Determines overall status of run based on all failures and results.

    * 'ERROR' - At least one collection failure occurred during the run.
    * 'FAIL' - Template failed at least one test
    * 'PASS' - All tests executed properly and no failures were detected

    :param collection_failures: failures occurring during test setup
    :param test_results: list of all test execution results
    :return: one of 'ERROR', 'FAIL', or 'PASS'
    """
    if collection_failures:
        return "ERROR"
    elif any(r.is_failed for r in test_results):
        return "FAIL"
    else:
        return "PASS"


def error(failure_or_result):
    """
    Extracts the error message from a collection failure or test result
    :param failure_or_result: Entry from COLLECTION_FAILURES or a TestResult
    :return: Error message as string
    """
    if isinstance(failure_or_result, TestResult):
        return failure_or_result.error_message
    else:
        return failure_or_result["error"]


def req_ids(failure_or_result):
    """
    Extracts the requirement IDs from a collection failure or test result
    :param failure_or_result: Entry from COLLECTION_FAILURES or a TestResult
    :return: set of Requirement IDs. If no requirements mapped, then an empty set
    """
    if isinstance(failure_or_result, TestResult):
        return set(failure_or_result.requirement_ids)
    else:
        return set(failure_or_result["requirements"])


def collect_errors(r_id, collection_failures, test_result):
    """
    Creates a list of error messages from the collection failures and
    test results. If r_id is provided, then it collects the error messages
    where the failure or test is associated with that requirement ID. If
    r_id is None, then it collects all errors that occur on failures and
    results that are not mapped to requirements
    """

    def selector(item):
        if r_id:
            return r_id in req_ids(item)
        else:
            return not req_ids(item)

    errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
    return [e for e in errors if e]
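
# Passing r_id=None therefore collects only the errors that are not tied to any
# requirement, which is how the "unmapped" entry in generate_json() gathers its errors.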


def relative_paths(base_dir, paths):
    return [os.path.relpath(p, base_dir) for p in paths]


def generate_json(outpath, template_path, categories):
    """
    Creates a JSON summary of the entire test run.
    """
    reqs = load_current_requirements()
    data = {
        "template_directory": os.path.splitdrive(template_path)[1].replace(
            os.path.sep, "/"
        ),
        "timestamp": make_iso_timestamp(),
        "checksum": hash_directory(template_path),
        "categories": categories,
        "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
        "tests": [],
        "requirements": [],
    }

    results = data["tests"]
    for result in COLLECTION_FAILURES:
        results.append(
            {
                "files": [],
                "test_module": result["module"],
                "test_case": result["test"],
                "result": "ERROR",
                "error": result["error"],
                "requirements": result["requirements"],
            }
        )
    for result in ALL_RESULTS:
        results.append(
            {
                "files": relative_paths(template_path, result.files),
                "test_module": result.test_module,
                "test_case": result.test_case,
                "result": result.outcome,
                "error": result.error_message if result.is_failed else "",
                "requirements": result.requirements_metadata(reqs),
            }
        )

    requirements = data["requirements"]
    for r_id, r_data in reqs.items():
        result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
        requirements.append(
            {
                "id": r_id,
                "text": r_data["description"],
                "keyword": r_data["keyword"],
                "result": result,
                "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
            }
        )

    # If there are tests that aren't mapped to a requirement, then we'll
    # map them to a special entry so the results are coherent.
    unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
    has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
    if unmapped_outcomes or has_errors:
        requirements.append(
            {
                "text": "Tests not mapped to requirements (see tests)",
                "result": aggregate_results(has_errors, unmapped_outcomes),
                "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
            }
        )

    report_path = os.path.join(outpath, "report.json")
    write_json(data, report_path)
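
# The resulting report.json is shaped roughly as follows (illustrative values only):
# {
#     "template_directory": "vnf/heat",
#     "timestamp": "2019-01-19T16:18:49+00:00",
#     "outcome": "FAIL",
#     "tests": [{"files": ["base.yaml"], "test_case": "...", "result": "FAIL", ...}],
#     "requirements": [{"id": "R-00000", "result": "FAIL", "errors": ["..."]}, ...]
# }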


def generate_html_report(outpath, categories, template_path, failures):
    reqs = load_current_requirements()
    resolutions = load_resolutions_file()
    fail_data = []
    for failure in failures:
        fail_data.append(
            {
                "file_links": make_href(failure.files),
                "test_id": failure.test_id,
                "error_message": failure.error_message,
                "raw_output": failure.raw_output,
                "requirements": docutils.core.publish_parts(
                    writer_name="html", source=failure.requirement_text(reqs)
                )["body"],
                "resolution_steps": failure.resolution_steps(resolutions),
            }
        )
    pkg_dir = os.path.split(__file__)[0]
    j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
    with open(j2_template_path, "r") as f:
        report_template = jinja2.Template(f.read())
        contents = report_template.render(
            version=version.VERSION,
            num_failures=len(failures) + len(COLLECTION_FAILURES),
            categories=categories,
            template_dir=make_href(template_path),
            checksum=hash_directory(template_path),
            timestamp=make_timestamp(),
            failures=fail_data,
            collection_failures=COLLECTION_FAILURES,
        )
    with open(os.path.join(outpath, "report.html"), "w") as f:
        f.write(contents)


def pytest_addoption(parser):
    """
    Add needed CLI arguments
    """
    parser.addoption(
        "--template-directory",
        dest="template_dir",
        action="append",
        help="Directory which holds the templates for validation",
    )
    parser.addoption(
        "--template-source",
        dest="template_source",
        action="append",
        help="Source Directory which holds the templates for validation",
    )
    parser.addoption(
        "--self-test",
        dest="self_test",
        action="store_true",
        help="Test the unit tests against their fixtured data",
    )
    parser.addoption(
        "--report-format",
        dest="report_format",
        action="store",
        help="Format of output report (html, csv, excel, json)",
    )
    parser.addoption(
        "--continue-on-failure",
        dest="continue_on_failure",
        action="store_true",
        help="Continue validation even when structural errors exist in input files",
    )
    parser.addoption(
        "--output-directory",
        dest="output_dir",
        action="store",
    )
    parser.addoption(
        "--category",
        dest="test_categories",
        action="append",
        help="optional category of test to execute",
    )


def pytest_configure(config):
    """
    Ensure that we receive either `--self-test` or
    `--template-dir=<directory>` as CLI arguments
    """
    if config.getoption("template_dir") and config.getoption("self_test"):
        raise Exception('"--template-dir" and "--self-test" are mutually exclusive')
    if not (
        config.getoption("template_dir")
        or config.getoption("self_test")
        or config.getoption("help")
    ):
        raise Exception('One of "--template-dir" or "--self-test" must be specified')
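
# Example invocations (illustrative): "pytest --template-directory=<heat dir>" validates a
# directory of Heat templates, while "pytest --self-test" runs the suite against its own
# fixtured data.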


def pytest_generate_tests(metafunc):
    """
    If a unit test requires an argument named 'filename'
    we generate a test for the filenames selected. Either
    the files contained in `template_dir` or, if `template_dir`
    is not specified on the CLI, the fixtures associated with this
    test case.
    """
    # noinspection PyBroadException
    try:
        if "filename" in metafunc.fixturenames:
            from .parametrizers import parametrize_filename

            parametrize_filename(metafunc)

        if "filenames" in metafunc.fixturenames:
            from .parametrizers import parametrize_filenames

            parametrize_filenames(metafunc)

        if "template_dir" in metafunc.fixturenames:
            from .parametrizers import parametrize_template_dir

            parametrize_template_dir(metafunc)

        if "environment_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_pair

            parametrize_environment_pair(metafunc)

        if "heat_volume_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_volume_pair

            parametrize_heat_volume_pair(metafunc)

        if "yaml_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_files

            parametrize_yaml_files(metafunc)

        if "env_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_files

            parametrize_environment_files(metafunc)

        if "yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_file

            parametrize_yaml_file(metafunc)

        if "env_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_file

            parametrize_environment_file(metafunc)

        if "parsed_yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_yaml_file

            parametrize_parsed_yaml_file(metafunc)

        if "parsed_environment_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_environment_file

            parametrize_parsed_environment_file(metafunc)

        if "heat_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_template

            parametrize_heat_template(metafunc)

        if "heat_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_templates

            parametrize_heat_templates(metafunc)

        if "volume_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_template

            parametrize_volume_template(metafunc)

        if "volume_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_templates

            parametrize_volume_templates(metafunc)

        if "template" in metafunc.fixturenames:
            from .parametrizers import parametrize_template

            parametrize_template(metafunc)

        if "templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_templates

            parametrize_templates(metafunc)
    except Exception as e:
        # If an error occurs in the collection phase, then it won't be logged as a
        # normal test failure. This means that failures could occur, but not
        # be seen on the report resulting in a false positive success message. These
        # errors will be stored and reported separately on the report
        COLLECTION_FAILURES.append(
            {
                "module": metafunc.module.__name__,
                "test": metafunc.function.__name__,
                "fixtures": metafunc.fixturenames,
                "error": traceback.format_exc(),
                "requirements": getattr(metafunc.function, "requirement_ids", []),
            }
        )


def hash_directory(path):
    """
    Create md5 hash using the contents of all files under ``path``
    :param path: string directory containing files
    :return: string MD5 hash code (hex)
    """
    md5 = hashlib.md5()
    for dir_path, sub_dirs, filenames in os.walk(path):
        for filename in filenames:
            file_path = os.path.join(dir_path, filename)
            with open(file_path, "rb") as f:
                md5.update(f.read())
    return md5.hexdigest()
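
# hash_directory() folds the contents of every file under the directory into one digest,
# so the checksum written into each report changes whenever any validated template changes.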


def load_current_requirements():
    """Loads dict of current requirements or empty dict if file doesn't exist"""
    with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
        data = json.load(f)
        version = data["current_version"]
        return data["versions"][version]["needs"]


def select_heat_requirements(reqs):
    """Filters dict requirements to only those requirements pertaining to Heat"""
    return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}


def is_testable(reqs):
    """Annotates each requirement with a 'testable' flag and returns the dict"""
    for key, values in reqs.items():
        if ("MUST" in values.get("keyword", "").upper()) and (
            "none" not in values.get("validation_mode", "").lower()
        ):
            reqs[key]["testable"] = True
        else:
            reqs[key]["testable"] = False
    return reqs
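
# For example, a requirement whose keyword is "MUST" and whose validation_mode is "code"
# is marked testable, while one whose validation_mode is "none" is not.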


def build_rst_json(reqs):
    """Takes requirements and returns list of only Heat requirements"""
    for key, values in list(reqs.items()):
        if values["testable"]:
            # Creates links in RST format to requirements and test cases
            if values["test_case"]:
                mod = values["test_case"].split(".")[-1]
                val = TEST_SCRIPT_SITE + mod + ".py"
                rst_value = "`" + mod + " <" + val + ">`_"
                # Link back to the requirement in the VNFRQTS documentation
                title = (
                    "`" + key + " <"
                    + "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
                    + values["docname"].replace(" ", "%20")
                    + ">`_"
                )
                reqs[key].update({"full_title": title, "test_case": rst_value})
            else:
                title = (
                    "`" + key + " <"
                    + "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
                    + values["docname"].replace(" ", "%20")
                    + ">`_"
                )
                reqs[key].update(
                    {
                        "full_title": title,
                        "test_case": "No test for requirement",
                        "validated_by": "static",
                    }
                )
        else:
            del reqs[key]
    return reqs


def generate_rst_table(output_dir, data):
    """Generate a formatted csv to be used in RST"""
    rst_path = os.path.join(output_dir, "rst.csv")
    with open(rst_path, "w", newline="") as f:
        out = csv.writer(f)
        out.writerow(("Requirement ID", "Requirement", "Test Module", "Test Name"))
        for req_id, metadata in data.items():
            out.writerow(
                (
                    metadata["full_title"],
                    metadata["description"],
                    metadata["test_case"],
                    metadata["validated_by"],
                )
            )


# noinspection PyUnusedLocal
def pytest_report_collectionfinish(config, startdir, items):
    """Generates a simple traceability report to output/traceability.csv"""
    traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
    output_dir = os.path.split(traceability_path)[0]
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    reqs = load_current_requirements()
    requirements = select_heat_requirements(reqs)
    testable_requirements = is_testable(requirements)
    unmapped, mapped = partition(
        lambda i: hasattr(i.function, "requirement_ids"), items
    )

    req_to_test = defaultdict(set)
    mapping_errors = set()
    for item in mapped:
        for req_id in item.function.requirement_ids:
            if req_id not in req_to_test:
                req_to_test[req_id].add(item)
                if req_id in requirements:
                    reqs[req_id].update(
                        {
                            "test_case": item.function.__module__,
                            "validated_by": item.function.__name__,
                        }
                    )
            if req_id not in requirements:
                mapping_errors.add(
                    (req_id, item.function.__module__, item.function.__name__)
                )

    mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
    with open(mapping_error_path, "w", newline="") as f:
        writer = csv.writer(f)
        for err in mapping_errors:
            writer.writerow(err)

    with open(traceability_path, "w", newline="") as f:
        writer = csv.writer(f)
        for req_id, metadata in testable_requirements.items():
            if req_to_test[req_id]:
                for item in req_to_test[req_id]:
                    writer.writerow(
                        (
                            req_id,
                            metadata["description"],
                            metadata["section_name"],
                            metadata["keyword"],
                            metadata["validation_mode"],
                            metadata["testable"],
                            item.function.__module__,
                            item.function.__name__,
                        )
                    )
            else:
                writer.writerow(
                    (
                        req_id,
                        metadata["description"],
                        metadata["section_name"],
                        metadata["keyword"],
                        metadata["validation_mode"],
                        metadata["testable"],
                        "",
                        "",
                    )
                )

        # now write out any test methods that weren't mapped to requirements
        unmapped_tests = {
            (item.function.__module__, item.function.__name__) for item in unmapped
        }
        for test_module, test_name in unmapped_tests:
            writer.writerow(
                (
                    "",
                    "",
                    "",
                    "",
                    "static",  # validation mode
                    "",
                    test_module,
                    test_name,
                )
            )

    generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))