# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
#             http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
#             https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================

import csv
import datetime
import hashlib
import io
import json
import os
import re
import time
import traceback
import warnings
from collections import defaultdict
from itertools import chain

import docutils.core
import jinja2
import pytest
import xlsxwriter
from more_itertools import partition
from six import string_types

import version

__path__ = [os.path.dirname(os.path.abspath(__file__))]

DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])

RESOLUTION_STEPS_FILE = "resolution_steps.json"
HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
TEST_SCRIPT_SITE = (
    "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
)
VNFRQTS_ID_URL = (
    "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
)

REPORT_COLUMNS = [
    ("Input File", "file"),
    ("Test", "test_file"),
    ("Requirements", "req_description"),
    ("Resolution Steps", "resolution_steps"),
    ("Error Message", "message"),
    ("Raw Test Output", "raw_output"),
]

COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
while preparing to validate the input files. Some validations may not have been
executed. Please refer these issues to the VNF Validation Tool team.
"""

COLLECTION_FAILURES = []

# Captures the results of every test run
ALL_RESULTS = []

def get_output_dir(config):
    """
    Retrieve the output directory for the reports and create it if necessary
    :param config: pytest configuration
    :return: output directory as string
    """
    output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
    if not os.path.exists(output_dir):
        os.makedirs(output_dir, exist_ok=True)
    return output_dir

def extract_error_msg(rep):
    """
    If a custom error message was provided, then extract it; otherwise
    just show the pytest assert message
    """
    if rep.outcome != "failed":
        return ""
    try:
        full_msg = str(rep.longrepr.reprcrash.message)
        match = re.match(
            "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
        )
        if match:  # custom message was provided
            # Extract everything between AssertionError and the start
            # of the assert statement expansion in the pytest report
            msg = match.group(1)
        else:
            msg = str(rep.longrepr.reprcrash)
            if "AssertionError:" in msg:
                msg = msg.split("AssertionError:")[1]
    except AttributeError:
        msg = str(rep)
    return msg

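# Illustrative sketch only (hypothetical report object, kept in comments so it is
# never executed at import time): a failing test with a custom assertion message
# yields just the custom text, while a bare assert falls back to the crash summary.
#
#   >>> class _Rep:  # minimal stand-in for a pytest TestReport
#   ...     outcome = "failed"
#   ...     class longrepr:
#   ...         class reprcrash:
#   ...             message = "AssertionError: Missing parameter\nassert 'x' in {}"
#   >>> extract_error_msg(_Rep())
#   ' Missing parameter\n'
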
class TestResult:
    """
    Wraps the test case and result to extract necessary metadata for
    reporting purposes.
    """

    RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}

    def __init__(self, item, outcome):
        self.item = item
        self.result = outcome.get_result()
        self.files = [os.path.normpath(p) for p in self._get_files()]
        self.error_message = self._get_error_message()

    @property
    def requirement_ids(self):
        """
        Returns list of requirement IDs mapped to the test case.

        :return: Returns a list of string requirement IDs the test was
                 annotated with ``validates``, otherwise returns an empty list
        """
        is_mapped = hasattr(self.item.function, "requirement_ids")
        return self.item.function.requirement_ids if is_mapped else []

    @property
    def markers(self):
        """
        :return: Returns a set of pytest marker names for the test or an empty set
        """
        return set(m.name for m in self.item.iter_markers())

    @property
    def is_base_test(self):
        """
        :return: Returns True if the test is annotated with a pytest marker called base
        """
        return "base" in self.markers

    @property
    def is_failed(self):
        """
        :return: True if the test failed
        """
        return self.outcome == "FAIL"

    @property
    def outcome(self):
        """
        :return: Returns 'PASS', 'FAIL', or 'SKIP'
        """
        return self.RESULT_MAPPING[self.result.outcome]

    @property
    def test_case(self):
        """
        :return: Name of the test case method
        """
        return self.item.function.__name__

    @property
    def test_module(self):
        """
        :return: Name of the file containing the test case
        """
        return self.item.function.__module__.split(".")[-1]

    @property
    def raw_output(self):
        """
        :return: Full output from pytest for the given test case
        """
        return str(self.result.longrepr)

    def requirement_text(self, curr_reqs):
        """
        Creates a text summary for the requirement IDs mapped to the test case.
        If no requirements are mapped, then it returns the empty string.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS projects needs.json output
        :return: ID and text of the requirements mapped to the test case
        """
        text = (
            "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
            for r_id in self.requirement_ids
            if r_id in curr_reqs
        )
        return "".join(text)

    def requirements_metadata(self, curr_reqs):
        """
        Returns a list of dicts containing the following metadata for each
        requirement mapped:

        - id: Requirement ID
        - text: Full text of the requirement
        - keyword: MUST, MUST NOT, MAY, etc.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS projects needs.json output
        :return: List of requirement metadata
        """
        data = []
        for r_id in self.requirement_ids:
            if r_id not in curr_reqs:
                continue
            data.append(
                {
                    "id": r_id,
                    "text": curr_reqs[r_id]["description"],
                    "keyword": curr_reqs[r_id]["keyword"],
                }
            )
        return data

    def resolution_steps(self, resolutions):
        """
        :param resolutions: Loaded from contents of resolution_steps.json
        :return: Header and text for the resolution step associated with this
                 test case. Returns empty string if no resolutions are
                 provided for this test case.
        """
        text = (
            "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
            for entry in resolutions
            if self._match(entry)
        )
        return "".join(text)

    def _match(self, resolution_entry):
        """
        Returns True if the test result maps to the given entry in
        the resolution steps file
        """
        return (
            self.test_case == resolution_entry["function"]
            and self.test_module == resolution_entry["module"]
        )

    def _get_files(self):
        """
        Extracts the list of files passed into the test case.
        :return: List of absolute paths to files
        """
        if "environment_pair" in self.item.fixturenames:
            return [
                "{} environment pair".format(
                    self.item.funcargs["environment_pair"]["name"]
                )
            ]
        elif "heat_volume_pair" in self.item.fixturenames:
            return [
                "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
            ]
        elif "heat_templates" in self.item.fixturenames:
            return self.item.funcargs["heat_templates"]
        elif "yaml_files" in self.item.fixturenames:
            return self.item.funcargs["yaml_files"]
        else:
            parts = self.result.nodeid.split("[")
            return [""] if len(parts) == 1 else [parts[1][:-1]]

    def _get_error_message(self):
        """
        :return: Error message or empty string if the test did not fail or error
        """
        if self.is_failed:
            return extract_error_msg(self.result)
        else:
            return ""

# noinspection PyUnusedLocal
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """
    Captures the test results for later reporting. This will also halt testing
    if a base failure is encountered (can be overridden with continue-on-failure)
    """
    outcome = yield
    if outcome.get_result().when != "call":
        return  # only capture results of test cases themselves
    result = TestResult(item, outcome)
    ALL_RESULTS.append(result)
    if (
        not item.config.option.continue_on_failure
        and result.is_base_test
        and result.is_failed
    ):
        msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
            result.error_message
        )
        pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))

def make_timestamp():
    """
    :return: String timestamp in the format:
             2019-01-19 10:18:49.865000 Central Standard Time
    """
    timezone = time.tzname[time.localtime().tm_isdst]
    return "{} {}".format(str(datetime.datetime.now()), timezone)

# noinspection PyUnusedLocal
def pytest_sessionstart(session):
    ALL_RESULTS.clear()
    COLLECTION_FAILURES.clear()

# noinspection PyUnusedLocal
def pytest_sessionfinish(session, exitstatus):
    """
    If not a self-test run, generate the output reports
    """
    if not session.config.option.template_dir:
        return

    if session.config.option.template_source:
        template_source = session.config.option.template_source[0]
    else:
        template_source = os.path.abspath(session.config.option.template_dir[0])

    categories_selected = session.config.option.test_categories or ""
    generate_report(
        get_output_dir(session.config),
        template_source,
        categories_selected,
        session.config.option.report_format,
    )

# noinspection PyUnusedLocal
def pytest_collection_modifyitems(session, config, items):
    """
    Selects tests based on the categories requested. Tests without
    categories will always be executed.
    """
    config.traceability_items = list(items)  # save all items for traceability
    if not config.option.self_test:
        for item in items:
            # checking if test belongs to a category
            if hasattr(item.function, "categories"):
                if config.option.test_categories:
                    test_categories = getattr(item.function, "categories")
                    passed_categories = config.option.test_categories
                    if not all(
                        category in passed_categories for category in test_categories
                    ):
                        item.add_marker(
                            pytest.mark.skip(
                                reason="Test categories do not match all the passed categories"
                            )
                        )
                else:
                    item.add_marker(
                        pytest.mark.skip(
                            reason="Test belongs to a category but no categories were passed"
                        )
                    )
    items.sort(
        key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
    )

def make_href(paths):
    """
    Create an anchor tag to link to the file paths provided.
    :param paths: string or list of file paths
    :return: String of hrefs - one for each path, each separated by a line
             break (<br/>)
    """
    paths = [paths] if isinstance(paths, string_types) else paths
    links = []
    for p in paths:
        abs_path = os.path.abspath(p)
        name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
        links.append(
            "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
                abs_path=abs_path, name=name
            )
        )
    return "<br/>".join(links)

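# Illustrative example (hypothetical paths; output abridged):
#
#   >>> make_href(["/tmp/base.yaml", "/tmp/base.env"])  # doctest: +SKIP
#   "<a href='file:///tmp/base.yaml' ...>base.yaml</a><br/><a ...>base.env</a>"
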
def load_resolutions_file():
    """
    :return: list of resolution step entries loaded from resolution_steps.json,
             or an empty list if the file does not exist
    """
    resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
    if os.path.exists(resolution_steps):
        with open(resolution_steps, "r") as f:
            return json.loads(f.read())
    return []

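# Minimal sketch of the resolution_steps.json structure this module assumes,
# inferred from how entries are consumed in TestResult.resolution_steps and
# TestResult._match (module/function/header/steps values below are illustrative):
#
#   [
#     {
#       "module": "test_environment_file_parameters",
#       "function": "test_parameter_defined_in_heat",
#       "header": "Define the Parameter",
#       "resolution_steps": "Add the missing parameter to the Heat template."
#     }
#   ]
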
def generate_report(outpath, template_path, categories, output_format="html"):
    """
    Generates the various output reports.

    :param outpath: destination directory for all reports
    :param template_path: directory containing the Heat templates validated
    :param categories: Optional categories selected
    :param output_format: One of "html", "excel", "json", or "csv".
                          Default is "html"
    :raises: ValueError if requested output format is unknown
    """
    failures = [r for r in ALL_RESULTS if r.is_failed]
    generate_failure_file(outpath)
    output_format = output_format.lower().strip() if output_format else "html"
    generate_json(outpath, template_path, categories)
    if output_format == "html":
        generate_html_report(outpath, categories, template_path, failures)
    elif output_format == "excel":
        generate_excel_report(outpath, categories, template_path, failures)
    elif output_format == "json":
        return  # the JSON report is always generated above
    elif output_format == "csv":
        generate_csv_report(outpath, categories, template_path, failures)
    else:
        raise ValueError("Unsupported output format: " + output_format)

def write_json(data, path):
    """
    Pretty print data as JSON to the output path requested

    :param data: Data structure to be converted to JSON
    :param path: Where to write output
    """
    with open(path, "w") as f:
        json.dump(data, f, indent=2)

def generate_failure_file(outpath):
    """
    Writes a summary of test failures to a file named failures.
    This is for backwards compatibility only. The report.json offers a
    more comprehensive output.
    """
    failure_path = os.path.join(outpath, "failures")
    failures = [r for r in ALL_RESULTS if r.is_failed]
    data = {}
    for i, fail in enumerate(failures):
        data[str(i)] = {
            "file": fail.files[0] if len(fail.files) == 1 else fail.files,
            "vnfrqts": fail.requirement_ids,
            "test": fail.test_case,
            "test_file": fail.test_module,
            "raw_output": fail.raw_output,
            "message": fail.error_message,
        }
    write_json(data, failure_path)

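# Shape of the legacy "failures" file written above (values are illustrative;
# keys mirror the dict constructed in generate_failure_file):
#
#   {
#     "0": {
#       "file": "base.yaml",
#       "vnfrqts": ["R-123456"],
#       "test": "test_some_validation",
#       "test_file": "test_module_name",
#       "raw_output": "...",
#       "message": "..."
#     }
#   }
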
def generate_csv_report(output_dir, categories, template_path, failures):
    rows = [["Validation Failures"]]
    headers = [
        ("Categories Selected:", categories),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    rows.append([])
    for header in headers:
        rows.append([header[0], header[1]])
    rows.append([])

    if COLLECTION_FAILURES:
        rows.append([COLLECTION_FAILURE_WARNING])
        rows.append(["Validation File", "Test", "Fixtures", "Error"])
        for failure in COLLECTION_FAILURES:
            rows.append(
                [
                    failure["module"],
                    failure["test"],
                    ";".join(failure["fixtures"]),
                    failure["error"],
                ]
            )
        rows.append([])

    # table header
    rows.append([col for col, _ in REPORT_COLUMNS])

    reqs = load_current_requirements()
    resolutions = load_resolutions_file()

    # table content
    for failure in failures:
        rows.append(
            [
                "\n".join(failure.files),
                failure.test_module,
                failure.requirement_text(reqs),
                failure.resolution_steps(resolutions),
                failure.error_message,
                failure.raw_output,
            ]
        )

    output_path = os.path.join(output_dir, "report.csv")
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        for row in rows:
            writer.writerow(row)

def generate_excel_report(output_dir, categories, template_path, failures):
    output_path = os.path.join(output_dir, "report.xlsx")
    workbook = xlsxwriter.Workbook(output_path)
    bold = workbook.add_format({"bold": True})
    code = workbook.add_format({"font_name": "Courier", "text_wrap": True})
    normal = workbook.add_format({"text_wrap": True})
    heading = workbook.add_format({"bold": True, "font_size": 18})
    worksheet = workbook.add_worksheet("failures")
    worksheet.write(0, 0, "Validation Failures", heading)

    headers = [
        ("Categories Selected:", ",".join(categories)),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    for row, (header, value) in enumerate(headers, start=2):
        worksheet.write(row, 0, header, bold)
        worksheet.write(row, 1, value)

    worksheet.set_column(0, len(headers) - 1, 40)
    worksheet.set_column(len(headers), len(headers), 80)

    if COLLECTION_FAILURES:
        collection_failures_start = 2 + len(headers) + 2
        worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
        collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
        for col_num, col_name in enumerate(collection_failure_headers):
            worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
        for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
            worksheet.write(row, 0, data["module"])
            worksheet.write(row, 1, data["test"])
            worksheet.write(row, 2, ",".join(data["fixtures"]))
            worksheet.write(row, 3, data["error"], code)

    # table header
    start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
    worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
    for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
        worksheet.write(start_error_table_row + 1, col_num, col_name, bold)

    reqs = load_current_requirements()
    resolutions = load_resolutions_file()

    # table content
    for row, failure in enumerate(failures, start=start_error_table_row + 2):
        worksheet.write(row, 0, "\n".join(failure.files), normal)
        worksheet.write(row, 1, failure.test_module, normal)
        worksheet.write(row, 2, failure.requirement_text(reqs), normal)
        worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
        worksheet.write(row, 4, failure.error_message, normal)
        worksheet.write(row, 5, failure.raw_output, code)

    workbook.close()

def make_iso_timestamp():
    """
    Creates a timestamp in ISO 8601 format in UTC. Used for JSON output.
    """
    now = datetime.datetime.utcnow()
    now = now.replace(tzinfo=datetime.timezone.utc)
    return now.isoformat()

def aggregate_requirement_adherence(r_id, collection_failures, test_results):
    """
    Examines all tests associated with a given requirement and determines
    the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.

    * ERROR - At least one ERROR occurred
    * PASS  - At least one PASS and no FAIL or ERRORs.
    * FAIL  - At least one FAIL occurred (no ERRORs)
    * SKIP  - All tests were SKIP

    :param r_id: Requirement ID to examine
    :param collection_failures: Errors that occurred during test setup.
    :param test_results: List of TestResult
    :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
    """
    errors = any(r_id in f["requirements"] for f in collection_failures)
    outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
    return aggregate_results(errors, outcomes, r_id)

def aggregate_results(has_errors, outcomes, r_id=None):
    """
    Determines the aggregate result for the conditions provided. Assumes the
    results have been filtered and collected for analysis.

    :param has_errors: True if collection failures occurred for the tests being
                       analyzed
    :param outcomes: set of outcomes from the TestResults
    :param r_id: Optional requirement ID if known
    :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
             (see aggregate_requirement_adherence for more detail)
    """
    if has_errors:
        return "ERROR"

    if not outcomes:
        return "PASS"
    elif "FAIL" in outcomes:
        return "FAIL"
    elif "PASS" in outcomes:
        return "PASS"
    elif {"SKIP"} == outcomes:
        return "SKIP"
    else:
        warnings.warn(
            "Unexpected error aggregating outcomes ({}) for requirement {}".format(
                outcomes, r_id
            )
        )
        return "ERROR"

def aggregate_run_results(collection_failures, test_results):
    """
    Determines overall status of run based on all failures and results.

    * 'ERROR' - At least one collection failure occurred during the run.
    * 'FAIL'  - Template failed at least one test
    * 'PASS'  - All tests executed properly and no failures were detected

    :param collection_failures: failures occurring during test setup
    :param test_results: list of all test execution results
    :return: one of 'ERROR', 'FAIL', or 'PASS'
    """
    if collection_failures:
        return "ERROR"
    elif any(r.is_failed for r in test_results):
        return "FAIL"
    else:
        return "PASS"

def error(failure_or_result):
    """
    Extracts the error message from a collection failure or test result
    :param failure_or_result: Entry from COLLECTION_FAILURES or a TestResult
    :return: Error message as string
    """
    if isinstance(failure_or_result, TestResult):
        return failure_or_result.error_message
    else:
        return failure_or_result["error"]

def req_ids(failure_or_result):
    """
    Extracts the requirement IDs from a collection failure or test result
    :param failure_or_result: Entry from COLLECTION_FAILURES or a TestResult
    :return: set of Requirement IDs. If no requirements mapped, then an empty set
    """
    if isinstance(failure_or_result, TestResult):
        return set(failure_or_result.requirement_ids)
    else:
        return set(failure_or_result["requirements"])

def collect_errors(r_id, collection_failures, test_result):
    """
    Creates a list of error messages from the collection failures and
    test results. If r_id is provided, then it collects the error messages
    where the failure or test is associated with that requirement ID. If
    r_id is None, then it collects all errors that occur on failures and
    results that are not mapped to requirements
    """

    def selector(item):
        if r_id:
            return r_id in req_ids(item)
        else:
            return not req_ids(item)

    errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
    return [e for e in errors if e]

def relative_paths(base_dir, paths):
    return [os.path.relpath(p, base_dir) for p in paths]

def generate_json(outpath, template_path, categories):
    """
    Creates a JSON summary of the entire test run.
    """
    reqs = load_current_requirements()
    data = {
        "template_directory": os.path.splitdrive(template_path)[1].replace(
            os.path.sep, "/"
        ),
        "timestamp": make_iso_timestamp(),
        "checksum": hash_directory(template_path),
        "categories": categories,
        "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
        "tests": [],
        "requirements": [],
    }

    results = data["tests"]
    for result in COLLECTION_FAILURES:
        results.append(
            {
                "files": [],
                "test_module": result["module"],
                "test_case": result["test"],
                "result": "ERROR",
                "error": result["error"],
                "requirements": result["requirements"],
            }
        )
    for result in ALL_RESULTS:
        results.append(
            {
                "files": relative_paths(template_path, result.files),
                "test_module": result.test_module,
                "test_case": result.test_case,
                "result": result.outcome,
                "error": result.error_message if result.is_failed else "",
                "requirements": result.requirements_metadata(reqs),
            }
        )

    requirements = data["requirements"]
    for r_id, r_data in reqs.items():
        result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
        requirements.append(
            {
                "id": r_id,
                "text": r_data["description"],
                "keyword": r_data["keyword"],
                "result": result,
                "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
            }
        )

    # If there are tests that aren't mapped to a requirement, then we'll
    # map them to a special entry so the results are coherent.
    unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
    has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
    if unmapped_outcomes or has_errors:
        requirements.append(
            {
                "id": "Unmapped",
                "text": "Tests not mapped to requirements (see tests)",
                "result": aggregate_results(has_errors, unmapped_outcomes),
                "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
            }
        )

    report_path = os.path.join(outpath, "report.json")
    write_json(data, report_path)

def generate_html_report(outpath, categories, template_path, failures):
    reqs = load_current_requirements()
    resolutions = load_resolutions_file()
    fail_data = []
    for failure in failures:
        fail_data.append(
            {
                "file_links": make_href(failure.files),
                "test_id": failure.test_module,
                "error_message": failure.error_message,
                "raw_output": failure.raw_output,
                "requirements": docutils.core.publish_parts(
                    writer_name="html", source=failure.requirement_text(reqs)
                )["body"],
                "resolution_steps": failure.resolution_steps(resolutions),
            }
        )
    pkg_dir = os.path.split(__file__)[0]
    j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
    with open(j2_template_path, "r") as f:
        report_template = jinja2.Template(f.read())
        contents = report_template.render(
            version=version.VERSION,
            num_failures=len(failures) + len(COLLECTION_FAILURES),
            categories=categories,
            template_dir=make_href(template_path),
            checksum=hash_directory(template_path),
            timestamp=make_timestamp(),
            failures=fail_data,
            collection_failures=COLLECTION_FAILURES,
        )
    with open(os.path.join(outpath, "report.html"), "w") as f:
        f.write(contents)

def pytest_addoption(parser):
    """
    Add needed CLI arguments
    """
    parser.addoption(
        "--template-directory",
        dest="template_dir",
        action="append",
        help="Directory which holds the templates for validation",
    )

    parser.addoption(
        "--template-source",
        dest="template_source",
        action="append",
        help="Source Directory which holds the templates for validation",
    )

    parser.addoption(
        "--self-test",
        dest="self_test",
        action="store_true",
        help="Test the unit tests against their fixtured data",
    )

    parser.addoption(
        "--report-format",
        dest="report_format",
        action="store",
        help="Format of output report (html, csv, excel, json)",
    )

    parser.addoption(
        "--continue-on-failure",
        dest="continue_on_failure",
        action="store_true",
        help="Continue validation even when structural errors exist in input files",
    )

    parser.addoption(
        "--output-directory",
        dest="output_dir",
        action="store",
        help="Alternate directory for report output",
    )

    parser.addoption(
        "--category",
        dest="test_categories",
        action="append",
        help="optional category of test to execute",
    )

def pytest_configure(config):
    """
    Ensure that we receive either `--self-test` or
    `--template-dir=<directory>` as CLI arguments
    """
    if config.getoption("template_dir") and config.getoption("self_test"):
        raise Exception('"--template-dir" and "--self-test" are mutually exclusive')
    if not (
        config.getoption("template_dir")
        or config.getoption("self_test")
        or config.getoption("help")
    ):
        raise Exception('One of "--template-dir" or "--self-test" must be specified')

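# Typical invocations, for reference (paths are illustrative):
#
#   pytest --template-directory=/path/to/heat_templates --report-format=html
#   pytest --self-test
#
# Supplying both options, or neither, triggers the exceptions raised above.
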
def pytest_generate_tests(metafunc):
    """
    If a unit test requires an argument named 'filename'
    we generate a test for the filenames selected. Either
    the files contained in `template_dir` or if `template_dir`
    is not specified on the CLI, the fixtures associated with this
    test name.
    """

    # noinspection PyBroadException
    try:
        if "filename" in metafunc.fixturenames:
            from .parametrizers import parametrize_filename

            parametrize_filename(metafunc)

        if "filenames" in metafunc.fixturenames:
            from .parametrizers import parametrize_filenames

            parametrize_filenames(metafunc)

        if "template_dir" in metafunc.fixturenames:
            from .parametrizers import parametrize_template_dir

            parametrize_template_dir(metafunc)

        if "environment_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_pair

            parametrize_environment_pair(metafunc)

        if "heat_volume_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_volume_pair

            parametrize_heat_volume_pair(metafunc)

        if "yaml_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_files

            parametrize_yaml_files(metafunc)

        if "env_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_files

            parametrize_environment_files(metafunc)

        if "yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_file

            parametrize_yaml_file(metafunc)

        if "env_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_file

            parametrize_environment_file(metafunc)

        if "parsed_yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_yaml_file

            parametrize_parsed_yaml_file(metafunc)

        if "parsed_environment_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_environment_file

            parametrize_parsed_environment_file(metafunc)

        if "heat_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_template

            parametrize_heat_template(metafunc)

        if "heat_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_templates

            parametrize_heat_templates(metafunc)

        if "volume_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_template

            parametrize_volume_template(metafunc)

        if "volume_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_templates

            parametrize_volume_templates(metafunc)

        if "template" in metafunc.fixturenames:
            from .parametrizers import parametrize_template

            parametrize_template(metafunc)

        if "templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_templates

            parametrize_templates(metafunc)
    except Exception as e:
        # If an error occurs in the collection phase, then it won't be logged as a
        # normal test failure. This means that failures could occur, but not
        # be seen on the report resulting in a false positive success message. These
        # errors will be stored and reported separately on the report
        COLLECTION_FAILURES.append(
            {
                "module": metafunc.module.__name__,
                "test": metafunc.function.__name__,
                "fixtures": metafunc.fixturenames,
                "error": traceback.format_exc(),
                "requirements": getattr(metafunc.function, "requirement_ids", []),
            }
        )
        raise e

def hash_directory(path):
    """
    Create md5 hash using the contents of all files under ``path``
    :param path: string directory containing files
    :return: string MD5 hash code (hex)
    """
    md5 = hashlib.md5()
    for dir_path, sub_dirs, filenames in os.walk(path):
        for filename in filenames:
            file_path = os.path.join(dir_path, filename)
            with open(file_path, "rb") as f:
                md5.update(f.read())
    return md5.hexdigest()

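# Usage sketch (directory path is illustrative); the digest reflects the contents
# of every file under the directory, so editing any file changes the value:
#
#   >>> hash_directory("/path/to/heat_templates")  # doctest: +SKIP
#   '<32-character hex digest>'
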
def load_current_requirements():
    """Loads dict of current requirements or empty dict if file doesn't exist"""
    if not os.path.exists(HEAT_REQUIREMENTS_FILE):
        return {}
    with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
        data = json.load(f)
        version = data["current_version"]
        return data["versions"][version]["needs"]

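# Minimal sketch of the heat_requirements.json structure this reader assumes
# (keys inferred from the accesses elsewhere in this module; all values below
# are illustrative only):
#
#   {
#     "current_version": "dublin",
#     "versions": {
#       "dublin": {
#         "needs": {
#           "R-123456": {
#             "description": "The VNF Heat Orchestration Template MUST ...",
#             "keyword": "MUST",
#             "docname": "Heat",
#             "section_name": "ONAP Heat Orchestration Templates Overview",
#             "validation_mode": "static"
#           }
#         }
#       }
#     }
#   }
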
def select_heat_requirements(reqs):
    """Filters dict requirements to only those requirements pertaining to Heat"""
    return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}

def is_testable(reqs):
    """Flags each requirement as testable or not and returns the updated dict"""
    for key, values in reqs.items():
        if ("MUST" in values.get("keyword", "").upper()) and (
            "none" not in values.get("validation_mode", "").lower()
        ):
            reqs[key]["testable"] = True
        else:
            reqs[key]["testable"] = False
    return reqs

def build_rst_json(reqs):
    """Takes requirements and returns a dict of only the testable Heat
    requirements, with RST-formatted link fields added"""
    for key, values in list(reqs.items()):
        if values["testable"]:
            # Creates links in RST format to requirements and test cases
            if values["test_case"]:
                mod = values["test_case"].split(".")[-1]
                val = TEST_SCRIPT_SITE + mod + ".py"
                rst_value = "`" + mod + " <" + val + ">`_"
                title = (
                    "`" + key + " <" + VNFRQTS_ID_URL
                    + values["docname"].replace(" ", "%20")
                    + ".html#" + key + ">`_"
                )
                reqs[key].update({"full_title": title, "test_case": rst_value})
            else:
                title = (
                    "`" + key + " <" + VNFRQTS_ID_URL
                    + values["docname"].replace(" ", "%20")
                    + ".html#" + key + ">`_"
                )
                reqs[key].update(
                    {
                        "full_title": title,
                        "test_case": "No test for requirement",
                        "validated_by": "static",
                    }
                )
        else:
            del reqs[key]
    return reqs

def generate_rst_table(output_dir, data):
    """Generate a formatted csv to be used in RST"""
    rst_path = os.path.join(output_dir, "rst.csv")
    with open(rst_path, "w", newline="") as f:
        out = csv.writer(f)
        out.writerow(("Requirement ID", "Requirement", "Test Module", "Test Name"))
        for req_id, metadata in data.items():
            out.writerow(
                (
                    metadata["full_title"],
                    metadata["description"],
                    metadata["test_case"],
                    metadata["validated_by"],
                )
            )

# noinspection PyUnusedLocal
def pytest_report_collectionfinish(config, startdir, items):
    """Generates a simple traceability report to output/traceability.csv"""
    traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
    output_dir = os.path.split(traceability_path)[0]
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    reqs = load_current_requirements()
    requirements = select_heat_requirements(reqs)
    testable_requirements = is_testable(requirements)
    unmapped, mapped = partition(
        lambda i: hasattr(i.function, "requirement_ids"), items
    )

    req_to_test = defaultdict(set)
    mapping_errors = set()
    for item in mapped:
        for req_id in item.function.requirement_ids:
            if req_id not in req_to_test:
                req_to_test[req_id].add(item)
                if req_id in requirements:
                    reqs[req_id].update(
                        {
                            "test_case": item.function.__module__,
                            "validated_by": item.function.__name__,
                        }
                    )
            if req_id not in requirements:
                mapping_errors.add(
                    (req_id, item.function.__module__, item.function.__name__)
                )

    mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
    with open(mapping_error_path, "w", newline="") as f:
        writer = csv.writer(f)
        for err in mapping_errors:
            writer.writerow(err)

    with open(traceability_path, "w", newline="") as f:
        out = csv.writer(f)
        out.writerow(
            (
                "Requirement ID",
                "Requirement",
                "Section",
                "Keyword",
                "Validation Mode",
                "Is Testable",
                "Test Module",
                "Test Name",
            )
        )
        for req_id, metadata in testable_requirements.items():
            if req_to_test[req_id]:
                for item in req_to_test[req_id]:
                    out.writerow(
                        (
                            req_id,
                            metadata["description"],
                            metadata["section_name"],
                            metadata["keyword"],
                            metadata["validation_mode"],
                            metadata["testable"],
                            item.function.__module__,
                            item.function.__name__,
                        )
                    )
            else:
                out.writerow(
                    (
                        req_id,
                        metadata["description"],
                        metadata["section_name"],
                        metadata["keyword"],
                        metadata["validation_mode"],
                        metadata["testable"],
                        "",  # test module
                        "",  # test name
                    )
                )
        # now write out any test methods that weren't mapped to requirements
        unmapped_tests = {
            (item.function.__module__, item.function.__name__) for item in unmapped
        }
        for test_module, test_name in unmapped_tests:
            out.writerow(
                (
                    "",  # requirement id
                    "",  # requirement description
                    "",  # section name
                    "",  # keyword
                    "static",  # validation mode
                    "",  # testable
                    test_module,
                    test_name,
                )
            )

    generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))