# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
#             http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
#             https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================
import csv
import datetime
import hashlib
import io
import json
import os
import re
import sys
import time
import traceback
import warnings
from collections import defaultdict
from itertools import chain

import docutils.core
import jinja2
import pytest
import xlsxwriter
from more_itertools import partition
from six import string_types

import version  # project-local module that provides VERSION for the reports
__path__ = [os.path.dirname(os.path.abspath(__file__))]

DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])

RESOLUTION_STEPS_FILE = "resolution_steps.json"
HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
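# heat_requirements.json is the needs.json export from the VNFRQTS project.
# Based on how it is consumed below (load_current_requirements and
# parse_heat_requirements), it is assumed to look roughly like:
#
#   {
#     "current_version": "<version>",
#     "versions": {
#       "<version>": {
#         "needs": {
#           "R-12345": {                      # requirement ID (hypothetical)
#             "description": "...",
#             "keyword": "MUST",
#             "docname": "... Heat ...",
#             "section_name": "...",
#             "validation_mode": "..."
#           }
#         }
#       }
#     }
#   }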
REPORT_COLUMNS = [
    ("Input File", "file"),
    ("Test", "test_file"),
    ("Requirements", "req_description"),
    ("Resolution Steps", "resolution_steps"),
    ("Error Message", "message"),
    ("Raw Test Output", "raw_output"),
]
COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
while preparing to validate the input files. Some validations may not have been
executed. Please refer these issues to the VNF Validation Tool team.
"""
COLLECTION_FAILURES = []

# Captures the results of every test run
ALL_RESULTS = []
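# Note: each COLLECTION_FAILURES entry is a dict built in pytest_generate_tests
# with the keys "module", "test", "fixtures", "error", and "requirements", while
# ALL_RESULTS holds the TestResult instances appended by pytest_runtest_makereport.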
def get_output_dir(config):
    """
    Determine the output directory to use, creating it if necessary.
    """
    output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
    if not os.path.exists(output_dir):
        os.makedirs(output_dir, exist_ok=True)
    return output_dir
def extract_error_msg(rep):
    """
    If a custom error message was provided, then extract it; otherwise
    just show the pytest assert message.
    """
    if rep.outcome != "failed":
        return ""
    try:
        full_msg = str(rep.longrepr.reprcrash.message)
        match = re.match(
            "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
        )
        if match:  # custom message was provided
            # Extract everything between AssertionError and the start
            # of the assert statement expansion in the pytest report
            msg = match.group(1)
        else:
            msg = str(rep.longrepr.reprcrash)
            if "AssertionError:" in msg:
                msg = msg.split("AssertionError:")[1]
    except AttributeError:
        msg = str(rep)
    return msg
class TestResult:
    """
    Wraps the test case and result to extract necessary metadata for
    reporting purposes.
    """

    RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}

    def __init__(self, item, outcome):
        self.item = item
        self.result = outcome.get_result()
        self.files = [os.path.normpath(p) for p in self._get_files()]
        self.error_message = self._get_error_message()
    @property
    def requirement_ids(self):
        """
        Returns list of requirement IDs mapped to the test case.

        :return: Returns a list of string requirement IDs the test was
                 annotated with ``validates``, otherwise returns an empty list
        """
        is_mapped = hasattr(self.item.function, "requirement_ids")
        return self.item.function.requirement_ids if is_mapped else []

    @property
    def markers(self):
        """
        :return: Returns a set of pytest marker names for the test or an empty set
        """
        return set(m.name for m in self.item.iter_markers())

    @property
    def is_base_test(self):
        """
        :return: Returns True if the test is annotated with a pytest marker called base
        """
        return "base" in self.markers

    @property
    def is_failed(self):
        """
        :return: True if the test failed
        """
        return self.outcome == "FAIL"

    @property
    def outcome(self):
        """
        :return: Returns 'PASS', 'FAIL', or 'SKIP'
        """
        return self.RESULT_MAPPING[self.result.outcome]

    @property
    def test_case(self):
        """
        :return: Name of the test case method
        """
        return self.item.function.__name__

    @property
    def test_module(self):
        """
        :return: Name of the file containing the test case
        """
        return self.item.function.__module__.split(".")[-1]

    @property
    def raw_output(self):
        """
        :return: Full output from pytest for the given test case
        """
        return str(self.result.longrepr)
    def requirement_text(self, curr_reqs):
        """
        Creates a text summary for the requirement IDs mapped to the test case.
        If no requirements are mapped, then it returns the empty string.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS project's needs.json output
        :return: ID and text of the requirements mapped to the test case
        """
        text = (
            "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
            for r_id in self.requirement_ids
            if r_id in curr_reqs
        )
        return "".join(text)

    def requirements_metadata(self, curr_reqs):
        """
        Returns a list of dicts containing the following metadata for each
        requirement mapped to the test case:

        - id: Requirement ID
        - text: Full text of the requirement
        - keyword: MUST, MUST NOT, MAY, etc.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS project's needs.json output
        :return: List of requirement metadata
        """
        data = []
        for r_id in self.requirement_ids:
            if r_id not in curr_reqs:
                continue
            data.append(
                {
                    "id": r_id,
                    "text": curr_reqs[r_id]["description"],
                    "keyword": curr_reqs[r_id]["keyword"],
                }
            )
        return data
    def resolution_steps(self, resolutions):
        """
        :param resolutions: loaded contents of resolution_steps.json
        :return: Header and text for the resolution step associated with this
                 test case.  Returns empty string if no resolutions are
                 provided or match.
        """
        text = (
            "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
            for entry in resolutions
            if self._match(entry)
        )
        return "".join(text)

    def _match(self, resolution_entry):
        """
        Returns True if the test result maps to the given entry in
        the resolutions file.
        """
        return (
            self.test_case == resolution_entry["function"]
            and self.test_module == resolution_entry["module"]
        )
    def _get_files(self):
        """
        Extracts the list of files passed into the test case.
        :return: List of absolute paths to files
        """
        if "environment_pair" in self.item.fixturenames:
            return [
                "{} environment pair".format(
                    self.item.funcargs["environment_pair"]["name"]
                )
            ]
        elif "heat_volume_pair" in self.item.fixturenames:
            return [
                "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
            ]
        elif "heat_templates" in self.item.fixturenames:
            return self.item.funcargs["heat_templates"]
        elif "yaml_files" in self.item.fixturenames:
            return self.item.funcargs["yaml_files"]
        else:
            # fall back to the parametrized file name embedded in the test node ID
            return [self.result.nodeid.split("[")[1][:-1]]

    def _get_error_message(self):
        """
        :return: Error message or empty string if the test did not fail or error
        """
        if self.is_failed:
            return extract_error_msg(self.result)
        return ""
# noinspection PyUnusedLocal
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """
    Captures the test results for later reporting.  This will also halt testing
    if a base failure is encountered (can be overridden with continue-on-failure).
    """
    outcome = yield
    if outcome.get_result().when != "call":
        return  # only capture results of test cases themselves
    result = TestResult(item, outcome)
    ALL_RESULTS.append(result)
    if (
        not item.config.option.continue_on_failure
        and result.is_base_test
        and result.is_failed
    ):
        msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
            result.error_message
        )
        pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
def make_timestamp():
    """
    :return: timestamp string in the format:
             2019-01-19 10:18:49.865000 Central Standard Time
    """
    timezone = time.tzname[time.localtime().tm_isdst]
    return "{} {}".format(str(datetime.datetime.now()), timezone)
# noinspection PyUnusedLocal
def pytest_sessionstart(session):
    """
    Clears any results carried over from a previous session.
    """
    ALL_RESULTS.clear()
    COLLECTION_FAILURES.clear()
# noinspection PyUnusedLocal
def pytest_sessionfinish(session, exitstatus):
    """
    If not a self-test run, generate the output reports.
    """
    if not session.config.option.template_dir:
        return

    if session.config.option.template_source:
        template_source = session.config.option.template_source[0]
    else:
        template_source = os.path.abspath(session.config.option.template_dir[0])

    categories_selected = session.config.option.test_categories or ""
    generate_report(
        get_output_dir(session.config),
        template_source,
        categories_selected,
        session.config.option.report_format,
    )
# noinspection PyUnusedLocal
def pytest_collection_modifyitems(session, config, items):
    """
    Selects tests based on the categories requested.  Tests without
    categories will always be executed.
    """
    config.traceability_items = list(items)  # save all items for traceability
    if not config.option.self_test:
        for item in items:
            # checking if test belongs to a category
            if hasattr(item.function, "categories"):
                if config.option.test_categories:
                    test_categories = getattr(item.function, "categories")
                    passed_categories = config.option.test_categories
                    if not all(
                        category in passed_categories for category in test_categories
                    ):
                        item.add_marker(
                            pytest.mark.skip(
                                reason="Test categories do not match all the passed categories"
                            )
                        )
                else:
                    item.add_marker(
                        pytest.mark.skip(
                            reason="Test belongs to a category but no categories were passed"
                        )
                    )
    items.sort(
        key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
    )
def make_href(paths):
    """
    Create an anchor tag to link to the file paths provided.
    :param paths: string or list of file paths
    :return: String of hrefs - one for each path, each separated by a line break
    """
    paths = [paths] if isinstance(paths, string_types) else paths
    links = []
    for p in paths:
        abs_path = os.path.abspath(p)
        name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
        links.append(
            "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
                abs_path=abs_path, name=name
            )
        )
    return "<br/>".join(links)
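# Illustrative only (the path is hypothetical):
#   make_href("/tmp/base.yaml")
#   -> "<a href='file:///tmp/base.yaml' target='_blank'>base.yaml</a>"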
def load_resolutions_file():
    """
    :return: data loaded from resolution_steps.json, or an empty list if the
             file does not exist
    """
    resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
    if os.path.exists(resolution_steps):
        with open(resolution_steps, "r") as f:
            return json.loads(f.read())
    return []
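# Based on TestResult.resolution_steps() and TestResult._match(), each entry in
# resolution_steps.json is assumed to be a dict such as:
#   {"module": "<test module>", "function": "<test function>",
#    "header": "<short title>", "resolution_steps": "<steps text>"}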
def generate_report(outpath, template_path, categories, output_format="html"):
    """
    Generates the various output reports.

    :param outpath: destination directory for all reports
    :param template_path: directory containing the Heat templates validated
    :param categories: Optional categories selected
    :param output_format: One of "html", "excel", "json", or "csv".
                          Default is "html"
    :raises: ValueError if requested output format is unknown
    """
    failures = [r for r in ALL_RESULTS if r.is_failed]
    generate_failure_file(outpath)
    output_format = output_format.lower().strip() if output_format else "html"
    if output_format == "html":
        generate_html_report(outpath, categories, template_path, failures)
    elif output_format == "excel":
        generate_excel_report(outpath, categories, template_path, failures)
    elif output_format == "json":
        generate_json(outpath, template_path, categories)
    elif output_format == "csv":
        generate_csv_report(outpath, categories, template_path, failures)
    else:
        raise ValueError("Unsupported output format: " + output_format)
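# Illustrative only (paths and the category name are placeholders):
#   generate_report("/tmp/output", "/path/to/heat_templates", ["my_category"], "html")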
def write_json(data, path):
    """
    Pretty print data as JSON to the output path requested.

    :param data: Data structure to be converted to JSON
    :param path: Where to write output
    """
    with open(path, "w") as f:
        json.dump(data, f, indent=2)
def generate_failure_file(outpath):
    """
    Writes a summary of test failures to a file named failures.
    This is for backwards compatibility only.  The report.json offers a
    more comprehensive output.
    """
    failure_path = os.path.join(outpath, "failures")
    failures = [r for r in ALL_RESULTS if r.is_failed]
    data = {}
    for i, fail in enumerate(failures):
        data[str(i)] = {
            "file": fail.files[0] if len(fail.files) == 1 else fail.files,
            "vnfrqts": fail.requirement_ids,
            "test": fail.test_case,
            "test_file": fail.test_module,
            "raw_output": fail.raw_output,
            "message": fail.error_message,
        }
    write_json(data, failure_path)
def generate_csv_report(output_dir, categories, template_path, failures):
    rows = [["Validation Failures"]]
    headers = [
        ("Categories Selected:", categories),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    rows.append([])
    for header in headers:
        rows.append([header[0], header[1]])
    rows.append([])

    if COLLECTION_FAILURES:
        rows.append([COLLECTION_FAILURE_WARNING])
        rows.append(["Validation File", "Test", "Fixtures", "Error"])
        for failure in COLLECTION_FAILURES:
            rows.append(
                [
                    failure["module"],
                    failure["test"],
                    ";".join(failure["fixtures"]),
                    failure["error"],
                ]
            )
        rows.append([])

    # table header
    rows.append([col for col, _ in REPORT_COLUMNS])

    reqs = load_current_requirements()
    resolutions = load_resolutions_file()

    # table content
    for failure in failures:
        rows.append(
            [
                "\n".join(failure.files),
                failure.test_module,
                failure.requirement_text(reqs),
                failure.resolution_steps(resolutions),
                failure.error_message,
                failure.raw_output,
            ]
        )

    output_path = os.path.join(output_dir, "report.csv")
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        for row in rows:
            writer.writerow(row)
def generate_excel_report(output_dir, categories, template_path, failures):
    output_path = os.path.join(output_dir, "report.xlsx")
    workbook = xlsxwriter.Workbook(output_path)
    bold = workbook.add_format({"bold": True})
    code = workbook.add_format({"font_name": "Courier", "text_wrap": True})
    normal = workbook.add_format({"text_wrap": True})
    heading = workbook.add_format({"bold": True, "font_size": 18})
    worksheet = workbook.add_worksheet("failures")
    worksheet.write(0, 0, "Validation Failures", heading)

    headers = [
        ("Categories Selected:", ",".join(categories)),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    for row, (header, value) in enumerate(headers, start=2):
        worksheet.write(row, 0, header, bold)
        worksheet.write(row, 1, value)

    worksheet.set_column(0, len(headers) - 1, 40)
    worksheet.set_column(len(headers), len(headers), 80)

    if COLLECTION_FAILURES:
        collection_failures_start = 2 + len(headers) + 2
        worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
        collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
        for col_num, col_name in enumerate(collection_failure_headers):
            worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
        for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
            worksheet.write(row, 0, data["module"])
            worksheet.write(row, 1, data["test"])
            worksheet.write(row, 2, ",".join(data["fixtures"]))
            worksheet.write(row, 3, data["error"], code)

    # table header
    start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
    worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
    for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
        worksheet.write(start_error_table_row + 1, col_num, col_name, bold)

    reqs = load_current_requirements()
    resolutions = load_resolutions_file()

    # table content
    for row, failure in enumerate(failures, start=start_error_table_row + 2):
        worksheet.write(row, 0, "\n".join(failure.files), normal)
        worksheet.write(row, 1, failure.test_module, normal)
        worksheet.write(row, 2, failure.requirement_text(reqs), normal)
        worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
        worksheet.write(row, 4, failure.error_message, normal)
        worksheet.write(row, 5, failure.raw_output, code)

    workbook.close()
def make_iso_timestamp():
    """
    Creates a UTC timestamp in ISO 8601 format.  Used for JSON output.
    """
    now = datetime.datetime.utcnow()
    # datetime objects are immutable, so capture the value returned by replace()
    now = now.replace(tzinfo=datetime.timezone.utc)
    return now.isoformat()
def aggregate_requirement_adherence(r_id, collection_failures, test_results):
    """
    Examines all tests associated with a given requirement and determines
    the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.

    * ERROR - At least one ERROR occurred
    * PASS - At least one PASS and no FAIL or ERRORs.
    * FAIL - At least one FAIL occurred (no ERRORs)
    * SKIP - All tests were SKIP

    :param r_id: Requirement ID to examine
    :param collection_failures: Errors that occurred during test setup.
    :param test_results: List of TestResult
    :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
    """
    errors = any(r_id in f["requirements"] for f in collection_failures)
    outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
    return aggregate_results(errors, outcomes, r_id)
def aggregate_results(has_errors, outcomes, r_id=None):
    """
    Determines the aggregate result for the conditions provided.  Assumes the
    results have been filtered and collected for analysis.

    :param has_errors: True if collection failures occurred for the tests being
                       aggregated
    :param outcomes: set of outcomes from the TestResults
    :param r_id: Optional requirement ID if known
    :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
             (see aggregate_requirement_adherence for more detail)
    """
    if has_errors:
        return "ERROR"
    elif not outcomes:
        # no mapped tests were executed for this requirement
        return "PASS"
    elif "FAIL" in outcomes:
        return "FAIL"
    elif "PASS" in outcomes:
        return "PASS"
    elif {"SKIP"} == outcomes:
        return "SKIP"
    else:
        # unexpected combination of outcomes; surface it and treat as an error
        warnings.warn(
            "Unexpected error aggregating outcomes ({}) for requirement {}".format(
                outcomes, r_id
            )
        )
        return "ERROR"
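# A few illustrative cases (per the rules documented above):
#   aggregate_results(False, {"PASS", "SKIP"})  -> "PASS"
#   aggregate_results(False, {"FAIL", "PASS"})  -> "FAIL"
#   aggregate_results(False, {"SKIP"})          -> "SKIP"
#   aggregate_results(True, {"PASS"})           -> "ERROR"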
def aggregate_run_results(collection_failures, test_results):
    """
    Determines overall status of run based on all failures and results.

    * 'ERROR' - At least one collection failure occurred during the run.
    * 'FAIL' - Template failed at least one test
    * 'PASS' - All tests executed properly and no failures were detected

    :param collection_failures: failures occurring during test setup
    :param test_results: list of all test execution results
    :return: one of 'ERROR', 'FAIL', or 'PASS'
    """
    if collection_failures:
        return "ERROR"
    elif any(r.is_failed for r in test_results):
        return "FAIL"
    else:
        return "PASS"
def error(failure_or_result):
    """
    Extracts the error message from a collection failure or test result.

    :param failure_or_result: Entry from COLLECTION_FAILURES or a TestResult
    :return: Error message as string
    """
    if isinstance(failure_or_result, TestResult):
        return failure_or_result.error_message
    else:
        return failure_or_result["error"]
def req_ids(failure_or_result):
    """
    Extracts the requirement IDs from a collection failure or test result.

    :param failure_or_result: Entry from COLLECTION_FAILURES or a TestResult
    :return: set of requirement IDs. If no requirements mapped, then an empty set
    """
    if isinstance(failure_or_result, TestResult):
        return set(failure_or_result.requirement_ids)
    else:
        return set(failure_or_result["requirements"])
def collect_errors(r_id, collection_failures, test_result):
    """
    Creates a list of error messages from the collection failures and
    test results.  If r_id is provided, then it collects the error messages
    where the failure or test is associated with that requirement ID.  If
    r_id is None, then it collects all errors that occur on failures and
    results that are not mapped to requirements.

    :return: list of non-empty error messages
    """

    def selector(item):
        if r_id:
            return r_id in req_ids(item)
        else:
            return not req_ids(item)

    errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
    return [e for e in errors if e]
def generate_json(outpath, template_path, categories):
    """
    Creates a JSON summary of the entire test run.
    """
    reqs = load_current_requirements()
    data = {
        "template_directory": template_path,
        "timestamp": make_iso_timestamp(),
        "checksum": hash_directory(template_path),
        "categories": categories,
        "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
        "tests": [],
        "requirements": [],
    }

    results = data["tests"]
    for result in COLLECTION_FAILURES:
        results.append(
            {
                "files": [],  # no input files are known for collection-time failures
                "test_module": result["module"],
                "test_case": result["test"],
                "result": "ERROR",
                "error": result["error"],
                "requirements": result["requirements"],
            }
        )
    for result in ALL_RESULTS:
        results.append(
            {
                "files": result.files,
                "test_module": result.test_module,
                "test_case": result.test_case,
                "result": result.outcome,
                "error": result.error_message if result.is_failed else "",
                "requirements": result.requirements_metadata(reqs),
            }
        )

    requirements = data["requirements"]
    for r_id, r_data in reqs.items():
        result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
        requirements.append(
            {
                "id": r_id,
                "text": r_data["description"],
                "keyword": r_data["keyword"],
                "result": result,
                "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
            }
        )

    # If there are tests that aren't mapped to a requirement, then we'll
    # map them to a special entry so the results are coherent.
    unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
    has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
    if unmapped_outcomes or has_errors:
        requirements.append(
            {
                "text": "Tests not mapped to requirements (see tests)",
                "result": aggregate_results(has_errors, unmapped_outcomes),
                "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
            }
        )

    report_path = os.path.join(outpath, "report.json")
    write_json(data, report_path)
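# The resulting report.json therefore contains the top-level keys built above:
# "template_directory", "timestamp", "checksum", "categories", "outcome",
# "tests", and "requirements".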
def generate_html_report(outpath, categories, template_path, failures):
    reqs = load_current_requirements()
    resolutions = load_resolutions_file()
    fail_data = []
    for failure in failures:
        fail_data.append(
            {
                "file_links": make_href(failure.files),
                "test_id": failure.test_module,
                "error_message": failure.error_message,
                "raw_output": failure.raw_output,
                "requirements": docutils.core.publish_parts(
                    writer_name="html", source=failure.requirement_text(reqs)
                )["body"],
                "resolution_steps": failure.resolution_steps(resolutions),
            }
        )
    pkg_dir = os.path.split(__file__)[0]
    j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
    with open(j2_template_path, "r") as f:
        report_template = jinja2.Template(f.read())
        contents = report_template.render(
            version=version.VERSION,
            num_failures=len(failures) + len(COLLECTION_FAILURES),
            categories=categories,
            template_dir=make_href(template_path),
            checksum=hash_directory(template_path),
            timestamp=make_timestamp(),
            failures=fail_data,
            collection_failures=COLLECTION_FAILURES,
        )
    with open(os.path.join(outpath, "report.html"), "w") as f:
        f.write(contents)
def pytest_addoption(parser):
    """
    Add needed CLI arguments.
    """
    # Note: the long-option names for the template_source, report_format, and
    # test_categories options are inferred from their dest values.
    parser.addoption(
        "--template-directory", dest="template_dir", action="append",
        help="Directory which holds the templates for validation",
    )
    parser.addoption(
        "--template-source", dest="template_source", action="append",
        help="Source Directory which holds the templates for validation",
    )
    parser.addoption(
        "--self-test", dest="self_test", action="store_true",
        help="Test the unit tests against their fixtured data",
    )
    parser.addoption(
        "--report-format", dest="report_format", action="store",
        help="Format of output report (html, csv, excel, json)",
    )
    parser.addoption(
        "--continue-on-failure", dest="continue_on_failure", action="store_true",
        help="Continue validation even when structural errors exist in input files",
    )
    parser.addoption(
        "--output-directory", dest="output_dir", action="store",
        help="Alternate directory for report output",
    )
    parser.addoption(
        "--category", dest="test_categories", action="append",
        help="optional category of test to execute",
    )
def pytest_configure(config):
    """
    Ensure that we receive either `--self-test` or
    `--template-dir=<directory>` as CLI arguments.
    """
    if config.getoption("template_dir") and config.getoption("self_test"):
        raise Exception('"--template-dir" and "--self-test" are mutually exclusive')
    if not (
        config.getoption("template_dir")
        or config.getoption("self_test")
        or config.getoption("help")
    ):
        raise Exception('One of "--template-dir" or "--self-test" must be specified')
def pytest_generate_tests(metafunc):
    """
    If a unit test requires an argument named 'filename'
    we generate a test for the filenames selected. Either
    the files contained in `template_dir` or if `template_dir`
    is not specified on the CLI, the fixtures associated with this
    test name.
    """

    # noinspection PyBroadException
    try:
        if "filename" in metafunc.fixturenames:
            from .parametrizers import parametrize_filename

            parametrize_filename(metafunc)

        if "filenames" in metafunc.fixturenames:
            from .parametrizers import parametrize_filenames

            parametrize_filenames(metafunc)

        if "template_dir" in metafunc.fixturenames:
            from .parametrizers import parametrize_template_dir

            parametrize_template_dir(metafunc)

        if "environment_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_pair

            parametrize_environment_pair(metafunc)

        if "heat_volume_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_volume_pair

            parametrize_heat_volume_pair(metafunc)

        if "yaml_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_files

            parametrize_yaml_files(metafunc)

        if "env_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_files

            parametrize_environment_files(metafunc)

        if "yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_file

            parametrize_yaml_file(metafunc)

        if "env_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_file

            parametrize_environment_file(metafunc)

        if "parsed_yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_yaml_file

            parametrize_parsed_yaml_file(metafunc)

        if "parsed_environment_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_environment_file

            parametrize_parsed_environment_file(metafunc)

        if "heat_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_template

            parametrize_heat_template(metafunc)

        if "heat_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_templates

            parametrize_heat_templates(metafunc)

        if "volume_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_template

            parametrize_volume_template(metafunc)

        if "volume_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_templates

            parametrize_volume_templates(metafunc)

        if "template" in metafunc.fixturenames:
            from .parametrizers import parametrize_template

            parametrize_template(metafunc)

        if "templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_templates

            parametrize_templates(metafunc)
    except Exception as e:
        # If an error occurs in the collection phase, then it won't be logged as a
        # normal test failure.  This means that failures could occur, but not
        # be seen on the report resulting in a false positive success message.  These
        # errors will be stored and reported separately on the report.
        COLLECTION_FAILURES.append(
            {
                "module": metafunc.module.__name__,
                "test": metafunc.function.__name__,
                "fixtures": metafunc.fixturenames,
                "error": traceback.format_exc(),
                "requirements": getattr(metafunc.function, "requirement_ids", []),
            }
        )
        raise e
def hash_directory(path):
    """Creates an MD5 hash from the contents of all files under ``path``."""
    md5 = hashlib.md5()
    for dir_path, sub_dirs, filenames in os.walk(path):
        for filename in filenames:
            file_path = os.path.join(dir_path, filename)
            with open(file_path, "rb") as f:
                md5.update(f.read())
    return md5.hexdigest()
def load_current_requirements():
    """Loads the dict of current requirements from heat_requirements.json"""
    with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
        data = json.load(f)
        version = data["current_version"]
        return data["versions"][version]["needs"]
def compat_open(path):
    """Invokes open correctly depending on the Python version"""
    if sys.version_info.major < 3:
        return open(path, "wb")
    else:
        return open(path, "w", newline="")


def unicode_writerow(writer, row):
    """Writes a CSV row, encoding it to UTF-8 first on Python 2"""
    if sys.version_info.major < 3:
        row = [s.encode("utf8") for s in row]
    writer.writerow(row)
def parse_heat_requirements(reqs):
    """Takes requirements and returns only the Heat requirements"""
    data = json.loads(reqs)
    for key, values in list(data.items()):
        if "Heat" in (values["docname"]):
            if "MUST" not in (values["keyword"]):
                del data[key]
            else:
                if "none" in (values["validation_mode"]):
                    del data[key]
        else:
            del data[key]
    return data
# noinspection PyUnusedLocal
def pytest_report_collectionfinish(config, startdir, items):
    """Generates a simple traceability report to output/traceability.csv"""
    traceability_path = os.path.join(__path__[0], "../output/traceability.csv")
    output_dir = os.path.split(traceability_path)[0]
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    reqs = load_current_requirements()
    reqs = json.dumps(reqs)
    requirements = parse_heat_requirements(reqs)
    unmapped, mapped = partition(
        lambda i: hasattr(i.function, "requirement_ids"), items
    )

    req_to_test = defaultdict(set)
    mapping_errors = set()
    for item in mapped:
        for req_id in item.function.requirement_ids:
            if req_id not in req_to_test:
                req_to_test[req_id].add(item)
            if req_id not in requirements:
                mapping_errors.add(
                    (req_id, item.function.__module__, item.function.__name__)
                )

    mapping_error_path = os.path.join(__path__[0], "../output/mapping_errors.csv")
    with compat_open(mapping_error_path) as f:
        writer = csv.writer(f)
        for err in mapping_errors:
            unicode_writerow(writer, err)

    with compat_open(traceability_path) as f:
        out = csv.writer(f)
        unicode_writerow(
            out,
            ("Requirement ID", "Requirement", "Section", "Test Module", "Test Name"),
        )
        for req_id, metadata in requirements.items():
            if req_to_test[req_id]:
                for item in req_to_test[req_id]:
                    unicode_writerow(
                        out,
                        (
                            req_id,
                            metadata["description"],
                            metadata["section_name"],
                            item.function.__module__,
                            item.function.__name__,
                        ),
                    )
            else:
                unicode_writerow(
                    out,
                    (req_id, metadata["description"], metadata["section_name"], "", ""),
                )
        # now write out any test methods that weren't mapped to requirements
        for item in unmapped:
            unicode_writerow(
                out, ("", "", "", item.function.__module__, item.function.__name__)
            )