# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
#             http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
#             https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================

import csv
import datetime
import hashlib
import io
import json
import logging
import os
import re
import time
import traceback
from collections import defaultdict

import docutils.core
import jinja2
import pytest
import xlsxwriter
from more_itertools import partition
from six import string_types

# noinspection PyUnresolvedReferences
import version

logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)

__path__ = [os.path.dirname(os.path.abspath(__file__))]

DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])

RESOLUTION_STEPS_FILE = "resolution_steps.json"
HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
70 "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
73 "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
77 ("Input File", "file"),
78 ("Test", "test_file"),
79 ("Requirements", "req_description"),
80 ("Resolution Steps", "resolution_steps"),
81 ("Error Message", "message"),
82 ("Raw Test Output", "raw_output"),

COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
while preparing to validate the input files. Some validations may not have been
executed. Please refer these issues to the VNF Validation Tool team.
"""

COLLECTION_FAILURES = []

# Captures the results of every test run
ALL_RESULTS = []


def get_output_dir(config):
    """
    Retrieve the output directory for the reports and create it if necessary
    :param config: pytest configuration
    :return: output directory as string
    """
    output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
    if not os.path.exists(output_dir):
        os.makedirs(output_dir, exist_ok=True)
    return output_dir


def extract_error_msg(rep):
    """
    If a custom error message was provided, then extract it; otherwise
    just show the pytest assert message
    """
    if rep.outcome != "failed":
        return ""
    try:
        full_msg = str(rep.longrepr.reprcrash.message)
        match = re.match(
            "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
        )
        if match:  # custom message was provided
            # Extract everything between AssertionError and the start
            # of the assert statement expansion in the pytest report
            msg = match.group(1)
        else:
            msg = str(rep.longrepr.reprcrash)
            if "AssertionError:" in msg:
                msg = msg.split("AssertionError:")[1]
    except AttributeError:
        msg = str(rep)

    return msg
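
# Illustrative example (not executed): if rep.longrepr.reprcrash.message is
# "AssertionError: vm_role parameter is missing\nassert 'vm_role' in {...}",
# extract_error_msg returns roughly " vm_role parameter is missing" -- the
# custom message only, without pytest's assert expansion.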


class TestResult:
    """
    Wraps the test case and result to extract necessary metadata for
    reporting purposes.
    """

    RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}

    def __init__(self, item, outcome):
        self.item = item
        self.result = outcome.get_result()
        self.files = [os.path.normpath(p) for p in self._get_files()]
        self.error_message = self._get_error_message()

    @property
    def requirement_ids(self):
        """
        Returns list of requirement IDs mapped to the test case.

        :return: Returns a list of string requirement IDs the test was
                 annotated with ``validates``; otherwise returns an empty list
        """
        is_mapped = hasattr(self.item.function, "requirement_ids")
        return self.item.function.requirement_ids if is_mapped else []

    @property
    def markers(self):
        """
        :return: Returns a set of pytest marker names for the test or an empty set
        """
        return set(m.name for m in self.item.iter_markers())

    @property
    def is_base_test(self):
        """
        :return: Returns True if the test is annotated with a pytest marker called base
        """
        return "base" in self.markers

    @property
    def is_failed(self):
        """
        :return: True if the test failed
        """
        return self.outcome == "FAIL"

    @property
    def outcome(self):
        """
        :return: Returns 'PASS', 'FAIL', or 'SKIP'
        """
        return self.RESULT_MAPPING[self.result.outcome]

    @property
    def test_case(self):
        """
        :return: Name of the test case method
        """
        return self.item.function.__name__

    @property
    def test_module(self):
        """
        :return: Name of the file containing the test case
        """
        return self.item.function.__module__.split(".")[-1]

    @property
    def test_id(self):
        """
        :return: ID of the test (test_module + test_case)
        """
        return "{}::{}".format(self.test_module, self.test_case)

    @property
    def raw_output(self):
        """
        :return: Full output from pytest for the given test case
        """
        return str(self.result.longrepr)

    def requirement_text(self, curr_reqs):
        """
        Creates a text summary for the requirement IDs mapped to the test case.
        If no requirements are mapped, then it returns the empty string.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS project's needs.json output
        :return: ID and text of the requirements mapped to the test case
        """
        text = (
            "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
            for r_id in self.requirement_ids
            if r_id in curr_reqs
        )
        return "".join(text)

    def requirements_metadata(self, curr_reqs):
        """
        Returns a list of dicts containing the following metadata for each
        requirement mapped:

        - id: Requirement ID
        - text: Full text of the requirement
        - keyword: MUST, MUST NOT, MAY, etc.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS project's needs.json output
        :return: List of requirement metadata
        """
        data = []
        for r_id in self.requirement_ids:
            if r_id not in curr_reqs:
                continue
            data.append(
                {
                    "id": r_id,
                    "text": curr_reqs[r_id]["description"],
                    "keyword": curr_reqs[r_id]["keyword"],
                }
            )
        return data

    def resolution_steps(self, resolutions):
        """
        :param resolutions: contents loaded from resolution_steps.json
        :return: Header and text for the resolution step associated with this
                 test case. Returns empty string if no resolutions are
                 provided.
        """
        text = (
            "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
            for entry in resolutions
            if self._match(entry)
        )
        return "".join(text)

    def _match(self, resolution_entry):
        """
        Returns True if the test result maps to the given entry in
        the resolutions file
        """
        return (
            self.test_case == resolution_entry["function"]
            and self.test_module == resolution_entry["module"]
        )

    def _get_files(self):
        """
        Extracts the list of files passed into the test case.
        :return: List of absolute paths to files
        """
        if "environment_pair" in self.item.fixturenames:
            return [
                "{} environment pair".format(
                    self.item.funcargs["environment_pair"]["name"]
                )
            ]
        elif "heat_volume_pair" in self.item.fixturenames:
            return [
                "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
            ]
        elif "heat_templates" in self.item.fixturenames:
            return self.item.funcargs["heat_templates"]
        elif "yaml_files" in self.item.fixturenames:
            return self.item.funcargs["yaml_files"]
        else:
            parts = self.result.nodeid.split("[")
            return [""] if len(parts) == 1 else [parts[1][:-1]]
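
    # Illustrative example (not executed): for a parametrized test with nodeid
    # "tests/test_heat.py::test_parameters[base.yaml]", the fallback branch
    # returns ["base.yaml"]; a test with no parametrization returns [""].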

    def _get_error_message(self):
        """
        :return: Error message or empty string if the test did not fail or error
        """
        if self.is_failed:
            return extract_error_msg(self.result)
        else:
            return ""


# noinspection PyUnusedLocal
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """
    Captures the test results for later reporting. This will also halt testing
    if a base failure is encountered (can be overridden with continue-on-failure)
    """
    outcome = yield
    if outcome.get_result().when != "call":
        return  # only capture results of test cases themselves
    result = TestResult(item, outcome)
    if (
        not item.config.option.continue_on_failure
        and result.is_base_test
        and result.is_failed
    ):
        msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
            result.error_message
        )
        result.error_message = msg
        ALL_RESULTS.append(result)
        pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))

    ALL_RESULTS.append(result)


def make_timestamp():
    """
    :return: Timestamp string in the format:
             2019-01-19 10:18:49.865000 Central Standard Time
    """
    timezone = time.tzname[time.localtime().tm_isdst]
    return "{} {}".format(str(datetime.datetime.now()), timezone)


# noinspection PyUnusedLocal
def pytest_sessionstart(session):
    ALL_RESULTS.clear()
    COLLECTION_FAILURES.clear()


# noinspection PyUnusedLocal
def pytest_sessionfinish(session, exitstatus):
    """
    If not a self-test run, generate the output reports
    """
    if not session.config.option.template_dir:
        return

    if session.config.option.template_source:
        template_source = session.config.option.template_source[0]
    else:
        template_source = os.path.abspath(session.config.option.template_dir[0])

    categories_selected = session.config.option.test_categories or ""
    generate_report(
        get_output_dir(session.config),
        template_source,
        categories_selected,
        session.config.option.report_format,
    )


# noinspection PyUnusedLocal
def pytest_collection_modifyitems(session, config, items):
    """
    Selects tests based on the categories requested. Tests without
    categories will always be executed.
    """
    config.traceability_items = list(items)  # save all items for traceability
    if not config.option.self_test:
        for item in items:
            # checking if test belongs to a category
            if hasattr(item.function, "categories"):
                if config.option.test_categories:
                    test_categories = getattr(item.function, "categories")
                    passed_categories = config.option.test_categories
                    if not all(
                        category in passed_categories for category in test_categories
                    ):
                        item.add_marker(
                            pytest.mark.skip(
                                reason=(
                                    "Test categories do not match "
                                    "all the passed categories"
                                )
                            )
                        )
                else:
                    item.add_marker(
                        pytest.mark.skip(
                            reason=(
                                "Test belongs to a category but "
                                "no categories were passed"
                            )
                        )
                    )
    items.sort(
        key=lambda x: (0, x.name)
        if "base" in set(m.name for m in x.iter_markers())
        else (1, x.name)
    )


def make_href(paths):
    """
    Create an anchor tag to link to the file paths provided.
    :param paths: string or list of file paths
    :return: String of hrefs - one for each path, each separated by a line
             break (<br/>)
    """
    paths = [paths] if isinstance(paths, string_types) else paths
    links = []
    for p in paths:
        abs_path = os.path.abspath(p)
        name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
        links.append(
            "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
                abs_path=abs_path, name=name
            )
        )
    return "<br/>".join(links)
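
# Illustrative example (not executed):
#   make_href("/work/templates/base.yaml")
#   -> "<a href='file:///work/templates/base.yaml' target='_blank'>base.yaml</a>"
# Directories are linked with their full path as the link text.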


def load_resolutions_file():
    """
    :return: dict of data loaded from resolution_steps.json
    """
    resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
    if os.path.exists(resolution_steps):
        with open(resolution_steps, "r") as f:
            return json.loads(f.read())


def generate_report(outpath, template_path, categories, output_format="html"):
    """
    Generates the various output reports.

    :param outpath: destination directory for all reports
    :param template_path: directory containing the Heat templates validated
    :param categories: Optional categories selected
    :param output_format: One of "html", "excel", "csv", or "json". Default is "html"
    :raises: ValueError if requested output format is unknown
    """
    failures = [r for r in ALL_RESULTS if r.is_failed]
    generate_failure_file(outpath)
    output_format = output_format.lower().strip() if output_format else "html"
    generate_json(outpath, template_path, categories)
    if output_format == "html":
        generate_html_report(outpath, categories, template_path, failures)
    elif output_format == "excel":
        generate_excel_report(outpath, categories, template_path, failures)
    elif output_format == "json":
        return  # the JSON report was already generated above
    elif output_format == "csv":
        generate_csv_report(outpath, categories, template_path, failures)
    else:
        raise ValueError("Unsupported output format: " + output_format)


def write_json(data, path):
    """
    Pretty print data as JSON to the output path requested

    :param data: Data structure to be converted to JSON
    :param path: Where to write output
    """
    with open(path, "w") as f:
        json.dump(data, f, indent=2)


def generate_failure_file(outpath):
    """
    Writes a summary of test failures to a file named failures.
    This is for backwards compatibility only. The report.json offers a
    more comprehensive output.
    """
    failure_path = os.path.join(outpath, "failures")
    failures = [r for r in ALL_RESULTS if r.is_failed]
    data = {}
    for i, fail in enumerate(failures):
        data[str(i)] = {
            "file": fail.files[0] if len(fail.files) == 1 else fail.files,
            "vnfrqts": fail.requirement_ids,
            "test": fail.test_case,
            "test_file": fail.test_module,
            "raw_output": fail.raw_output,
            "message": fail.error_message,
        }
    write_json(data, failure_path)


def generate_csv_report(output_dir, categories, template_path, failures):
    rows = [["Validation Failures"]]
    headers = [
        ("Categories Selected:", categories),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    rows.append([])
    for header in headers:
        rows.append([header[0], header[1]])
    rows.append([])

    if COLLECTION_FAILURES:
        rows.append([COLLECTION_FAILURE_WARNING])
        rows.append(["Validation File", "Test", "Fixtures", "Error"])
        for failure in COLLECTION_FAILURES:
            rows.append(
                [
                    failure["module"],
                    failure["test"],
                    ";".join(failure["fixtures"]),
                    failure["error"],
                ]
            )
        rows.append([])

    # table header
    rows.append([col for col, _ in REPORT_COLUMNS])

    reqs = load_current_requirements()
    resolutions = load_resolutions_file()

    # table content
    for failure in failures:
        rows.append(
            [
                "\n".join(failure.files),
                failure.test_id,
                failure.requirement_text(reqs),
                failure.resolution_steps(resolutions),
                failure.error_message,
                failure.raw_output,
            ]
        )

    output_path = os.path.join(output_dir, "report.csv")
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        for row in rows:
            writer.writerow(row)


def generate_excel_report(output_dir, categories, template_path, failures):
    output_path = os.path.join(output_dir, "report.xlsx")
    workbook = xlsxwriter.Workbook(output_path)
    bold = workbook.add_format({"bold": True})
    code = workbook.add_format({"font_name": "Courier", "text_wrap": True})
    normal = workbook.add_format({"text_wrap": True})
    heading = workbook.add_format({"bold": True, "font_size": 18})
    worksheet = workbook.add_worksheet("failures")
    worksheet.write(0, 0, "Validation Failures", heading)

    headers = [
        ("Categories Selected:", ",".join(categories)),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    for row, (header, value) in enumerate(headers, start=2):
        worksheet.write(row, 0, header, bold)
        worksheet.write(row, 1, value)

    worksheet.set_column(0, len(headers) - 1, 40)
    worksheet.set_column(len(headers), len(headers), 80)

    if COLLECTION_FAILURES:
        collection_failures_start = 2 + len(headers) + 2
        worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
        collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
        for col_num, col_name in enumerate(collection_failure_headers):
            worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
        for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
            worksheet.write(row, 0, data["module"])
            worksheet.write(row, 1, data["test"])
            worksheet.write(row, 2, ",".join(data["fixtures"]))
            worksheet.write(row, 3, data["error"], code)

    # table header
    start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
    worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
    for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
        worksheet.write(start_error_table_row + 1, col_num, col_name, bold)

    reqs = load_current_requirements()
    resolutions = load_resolutions_file()

    # table content
    for row, failure in enumerate(failures, start=start_error_table_row + 2):
        worksheet.write(row, 0, "\n".join(failure.files), normal)
        worksheet.write(row, 1, failure.test_id, normal)
        worksheet.write(row, 2, failure.requirement_text(reqs), normal)
        worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
        worksheet.write(row, 4, failure.error_message, normal)
        worksheet.write(row, 5, failure.raw_output, code)

    workbook.close()


def make_iso_timestamp():
    """
    Creates a timestamp in ISO 8601 format in UTC. Used for JSON output.
    """
    now = datetime.datetime.utcnow()
    now = now.replace(tzinfo=datetime.timezone.utc)
    return now.isoformat()
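
# Illustrative example (not executed): make_iso_timestamp() returns a string
# such as "2019-01-19T16:18:49.865123+00:00".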


def aggregate_results(outcomes, r_id=None):
    """
    Determines the aggregate result for the conditions provided. Assumes the
    results have been filtered and collected for analysis.

    :param outcomes: set of outcomes from the TestResults
    :param r_id: Optional requirement ID if known
    :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
             (see aggregate_requirement_adherence for more detail)
    """
    if not outcomes:
        return "PASS"
    elif "ERROR" in outcomes:
        return "ERROR"
    elif "FAIL" in outcomes:
        return "FAIL"
    elif "PASS" in outcomes:
        return "PASS"
    elif {"SKIP"} == outcomes:
        return "SKIP"
    else:
        logging.warning(
            "Unexpected error aggregating outcomes ({}) for requirement {}".format(
                outcomes, r_id
            )
        )
        return "ERROR"


def aggregate_run_results(collection_failures, test_results):
    """
    Determines overall status of run based on all failures and results.

    * 'ERROR' - At least one collection failure occurred during the run.
    * 'FAIL' - Template failed at least one test
    * 'PASS' - All tests executed properly and no failures were detected

    :param collection_failures: failures occurring during test setup
    :param test_results: list of all test execution results
    :return: one of 'ERROR', 'FAIL', or 'PASS'
    """
    if collection_failures:
        return "ERROR"
    elif any(r.is_failed for r in test_results):
        return "FAIL"
    else:
        return "PASS"


def relative_paths(base_dir, paths):
    return [os.path.relpath(p, base_dir) for p in paths]
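
# Illustrative example (not executed):
#   relative_paths("/work/templates", ["/work/templates/base.yaml"])
#   -> ["base.yaml"]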


# noinspection PyTypeChecker
def generate_json(outpath, template_path, categories):
    """
    Creates a JSON summary of the entire test run.
    """
    reqs = load_current_requirements()
    data = {
        "template_directory": os.path.splitdrive(template_path)[1].replace(
            os.path.sep, "/"
        ),
        "timestamp": make_iso_timestamp(),
        "checksum": hash_directory(template_path),
        "categories": categories,
        "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
        "tests": [],
        "requirements": [],
    }

    results = data["tests"]
    for result in COLLECTION_FAILURES:
        results.append(
            {
                "files": [],
                "test_module": result["module"],
                "test_case": result["test"],
                "result": "ERROR",
                "error": result["error"],
                "requirements": result["requirements"],
            }
        )
    for result in ALL_RESULTS:
        results.append(
            {
                "files": relative_paths(template_path, result.files),
                "test_module": result.test_module,
                "test_case": result.test_case,
                "result": result.outcome,
                "error": result.error_message if result.is_failed else "",
                "requirements": result.requirements_metadata(reqs),
            }
        )

    # Build a mapping of requirement ID to the results
    r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
    for test_result in results:
        test_reqs = test_result["requirements"]
        r_ids = (
            [r["id"] if isinstance(r, dict) else r for r in test_reqs]
            if test_reqs
            else ("",)
        )
        for r_id in r_ids:
            item = r_id_results[r_id]
            item["outcomes"].add(test_result["result"])
            if test_result["error"]:
                item["errors"].add(test_result["error"])

    requirements = data["requirements"]
    for r_id, r_data in reqs.items():
        requirements.append(
            {
                "id": r_id,
                "text": r_data["description"],
                "keyword": r_data["keyword"],
                "result": aggregate_results(r_id_results[r_id]["outcomes"]),
                "errors": list(r_id_results[r_id]["errors"]),
            }
        )

    if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
        requirements.append(
            {
                "id": "Unmapped",
                "text": "Tests not mapped to requirements (see tests)",
                "result": aggregate_results(r_id_results[""]["outcomes"]),
                "errors": list(r_id_results[""]["errors"]),
            }
        )

    report_path = os.path.join(outpath, "report.json")
    write_json(data, report_path)


def generate_html_report(outpath, categories, template_path, failures):
    reqs = load_current_requirements()
    resolutions = load_resolutions_file()
    fail_data = []
    for failure in failures:
        fail_data.append(
            {
                "file_links": make_href(failure.files),
                "test_id": failure.test_id,
                "error_message": failure.error_message,
                "raw_output": failure.raw_output,
                "requirements": docutils.core.publish_parts(
                    writer_name="html", source=failure.requirement_text(reqs)
                )["body"],
                "resolution_steps": failure.resolution_steps(resolutions),
            }
        )
    pkg_dir = os.path.split(__file__)[0]
    j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
    with open(j2_template_path, "r") as f:
        report_template = jinja2.Template(f.read())
        contents = report_template.render(
            version=version.VERSION,
            num_failures=len(failures) + len(COLLECTION_FAILURES),
            categories=categories,
            template_dir=make_href(template_path),
            checksum=hash_directory(template_path),
            timestamp=make_timestamp(),
            failures=fail_data,
            collection_failures=COLLECTION_FAILURES,
        )
    with open(os.path.join(outpath, "report.html"), "w") as f:
        f.write(contents)


def pytest_addoption(parser):
    """
    Add needed CLI arguments
    """
    parser.addoption(
        "--template-directory",
        dest="template_dir",
        action="append",
        help="Directory which holds the templates for validation",
    )
    parser.addoption(
        "--template-source",
        dest="template_source",
        action="append",
        help="Source Directory which holds the templates for validation",
    )
    parser.addoption(
        "--self-test",
        dest="self_test",
        action="store_true",
        help="Test the unit tests against their fixtured data",
    )
    parser.addoption(
        "--report-format",
        dest="report_format",
        action="store",
        help="Format of output report (html, csv, excel, json)",
    )
    parser.addoption(
        "--continue-on-failure",
        dest="continue_on_failure",
        action="store_true",
        help="Continue validation even when structural errors exist in input files",
    )
    parser.addoption(
        "--output-directory",
        dest="output_dir",
        action="store",
        default=None,
        help="Alternate directory for report output",
    )
    parser.addoption(
        "--category",
        dest="test_categories",
        action="append",
        help="optional category of test to execute",
    )


def pytest_configure(config):
    """
    Ensure that we receive either `--self-test` or
    `--template-dir=<directory>` as CLI arguments
    """
    if config.getoption("template_dir") and config.getoption("self_test"):
        raise Exception('"--template-dir" and "--self-test" are mutually exclusive')
    if not (
        config.getoption("template_dir")
        or config.getoption("self_test")
        or config.getoption("help")
    ):
        raise Exception('One of "--template-dir" or "--self-test" must be specified')


def pytest_generate_tests(metafunc):
    """
    If a unit test requires an argument named 'filename'
    we generate a test for the filenames selected. Either
    the files contained in `template_dir` or, if `template_dir`
    is not specified on the CLI, the fixtures associated with this
    test name.
    """
    # noinspection PyBroadException
    try:
882 if "filename" in metafunc.fixturenames:
883 from .parametrizers import parametrize_filename
885 parametrize_filename(metafunc)
887 if "filenames" in metafunc.fixturenames:
888 from .parametrizers import parametrize_filenames
890 parametrize_filenames(metafunc)
892 if "template_dir" in metafunc.fixturenames:
893 from .parametrizers import parametrize_template_dir
895 parametrize_template_dir(metafunc)
897 if "environment_pair" in metafunc.fixturenames:
898 from .parametrizers import parametrize_environment_pair
900 parametrize_environment_pair(metafunc)
902 if "heat_volume_pair" in metafunc.fixturenames:
903 from .parametrizers import parametrize_heat_volume_pair
905 parametrize_heat_volume_pair(metafunc)
907 if "yaml_files" in metafunc.fixturenames:
908 from .parametrizers import parametrize_yaml_files
910 parametrize_yaml_files(metafunc)
912 if "env_files" in metafunc.fixturenames:
913 from .parametrizers import parametrize_environment_files
915 parametrize_environment_files(metafunc)
917 if "yaml_file" in metafunc.fixturenames:
918 from .parametrizers import parametrize_yaml_file
920 parametrize_yaml_file(metafunc)
922 if "env_file" in metafunc.fixturenames:
923 from .parametrizers import parametrize_environment_file
925 parametrize_environment_file(metafunc)
927 if "parsed_yaml_file" in metafunc.fixturenames:
928 from .parametrizers import parametrize_parsed_yaml_file
930 parametrize_parsed_yaml_file(metafunc)
932 if "parsed_environment_file" in metafunc.fixturenames:
933 from .parametrizers import parametrize_parsed_environment_file
935 parametrize_parsed_environment_file(metafunc)
937 if "heat_template" in metafunc.fixturenames:
938 from .parametrizers import parametrize_heat_template
940 parametrize_heat_template(metafunc)
942 if "heat_templates" in metafunc.fixturenames:
943 from .parametrizers import parametrize_heat_templates
945 parametrize_heat_templates(metafunc)
947 if "volume_template" in metafunc.fixturenames:
948 from .parametrizers import parametrize_volume_template
950 parametrize_volume_template(metafunc)
952 if "volume_templates" in metafunc.fixturenames:
953 from .parametrizers import parametrize_volume_templates
955 parametrize_volume_templates(metafunc)
957 if "template" in metafunc.fixturenames:
958 from .parametrizers import parametrize_template
960 parametrize_template(metafunc)
962 if "templates" in metafunc.fixturenames:
963 from .parametrizers import parametrize_templates
965 parametrize_templates(metafunc)
    except Exception as e:
        # If an error occurs in the collection phase, then it won't be logged as a
        # normal test failure. This means that failures could occur, but not
        # be seen on the report, resulting in a false positive success message. These
        # errors will be stored and reported separately on the report
        COLLECTION_FAILURES.append(
            {
                "module": metafunc.module.__name__,
                "test": metafunc.function.__name__,
                "fixtures": metafunc.fixturenames,
                "error": traceback.format_exc(),
                "requirements": getattr(metafunc.function, "requirement_ids", []),
            }
        )
        raise e


def hash_directory(path):
    """
    Create md5 hash using the contents of all files under ``path``
    :param path: string directory containing files
    :return: string MD5 hash code (hex)
    """
    md5 = hashlib.md5()
    for dir_path, sub_dirs, filenames in os.walk(path):
        for filename in filenames:
            file_path = os.path.join(dir_path, filename)
            with open(file_path, "rb") as f:
                md5.update(f.read())
    return md5.hexdigest()
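
# Illustrative example (not executed): hashing an empty directory yields the
# MD5 of no input, "d41d8cd98f00b204e9800998ecf8427e"; the digest changes if
# any file's contents change, and depends on os.walk traversal order.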


def load_current_requirements():
    """Loads dict of current requirements from heat_requirements.json"""
    with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
        data = json.load(f)
        version = data["current_version"]
        return data["versions"][version]["needs"]


def select_heat_requirements(reqs):
    """Filters dict requirements to only those requirements pertaining to Heat"""
    return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}
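
# Illustrative example (not executed): given
#   {"R-1": {"docname": "Heat Orchestration"}, "R-2": {"docname": "TOSCA"}}
# select_heat_requirements keeps only the "R-1" entry.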


def is_testable(reqs):
    """Flags each requirement with a boolean ``testable`` field and returns the dict"""
    for key, values in reqs.items():
        if ("MUST" in values.get("keyword", "").upper()) and (
            "none" not in values.get("validation_mode", "").lower()
        ):
            reqs[key]["testable"] = True
        else:
            reqs[key]["testable"] = False
    return reqs
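
# Illustrative example (not executed): a requirement with keyword "MUST" and
# validation_mode "" is flagged testable=True; one whose validation_mode is
# "none" (or whose keyword lacks "MUST") is flagged testable=False.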


def build_rst_json(reqs):
    """Returns only the testable requirements, with RST-formatted links to the
    requirement docs and test cases added"""
    for key, values in list(reqs.items()):
        if not values["testable"]:
            del reqs[key]
            continue
        # Creates links in RST format to requirements and test cases
        # (the ".html#<id>" anchor form is assumed from the VNFRQTS docs layout)
        title = "`{0} <{1}{2}.html#{0}>`_".format(
            values["id"], VNFRQTS_ID_URL, values["docname"].replace(" ", "%20")
        )
        if values["test_case"]:
            mod = values["test_case"].split(".")[-1]
            val = TEST_SCRIPT_SITE + mod + ".py"
            rst_value = "`" + mod + " <" + val + ">`_"
            reqs[key].update({"full_title": title, "test_case": rst_value})
        else:
            reqs[key].update(
                {
                    "full_title": title,
                    "test_case": "No test for requirement",
                    "validated_by": "static",
                }
            )
    return reqs


def generate_rst_table(output_dir, data):
    """Generate a formatted csv to be used in RST"""
    rst_path = os.path.join(output_dir, "rst.csv")
    with open(rst_path, "w", newline="") as f:
        out = csv.writer(f)
        out.writerow(("Requirement ID", "Requirement", "Test Module", "Test Name"))
        for req_id, metadata in data.items():
            out.writerow(
                (
                    metadata["full_title"],
                    metadata["description"],
                    metadata["test_case"],
                    metadata["validated_by"],
                )
            )


# noinspection PyUnusedLocal
def pytest_report_collectionfinish(config, startdir, items):
    """Generates a simple traceability report to output/traceability.csv"""
    traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
    output_dir = os.path.split(traceability_path)[0]
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    reqs = load_current_requirements()
    requirements = select_heat_requirements(reqs)
    testable_requirements = is_testable(requirements)
    unmapped, mapped = partition(
        lambda i: hasattr(i.function, "requirement_ids"), items
    )

    req_to_test = defaultdict(set)
    mapping_errors = set()
    for item in mapped:
        for req_id in item.function.requirement_ids:
            if req_id not in req_to_test:
                req_to_test[req_id].add(item)
                if req_id in requirements:
                    reqs[req_id].update(
                        {
                            "test_case": item.function.__module__,
                            "validated_by": item.function.__name__,
                        }
                    )
            if req_id not in requirements:
                mapping_errors.add(
                    (req_id, item.function.__module__, item.function.__name__)
                )

    mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
    with open(mapping_error_path, "w", newline="") as f:
        writer = csv.writer(f)
        for err in mapping_errors:
            writer.writerow(err)

    with open(traceability_path, "w", newline="") as f:
        out = csv.writer(f)
        out.writerow(
            (
                "Requirement ID",
                "Requirement",
                "Section",
                "Keyword",
                "Validation Mode",
                "Is Testable",
                "Test Module",
                "Test Name",
            )
        )
        for req_id, metadata in testable_requirements.items():
            if req_to_test[req_id]:
                for item in req_to_test[req_id]:
                    out.writerow(
                        (
                            req_id,
                            metadata["description"],
                            metadata["section_name"],
                            metadata["keyword"],
                            metadata["validation_mode"],
                            metadata["testable"],
                            item.function.__module__,
                            item.function.__name__,
                        )
                    )
            else:
                out.writerow(
                    (
                        req_id,
                        metadata["description"],
                        metadata["section_name"],
                        metadata["keyword"],
                        metadata["validation_mode"],
                        metadata["testable"],
                        "",  # test module
                        "",  # test name
                    )
                )

        # now write out any test methods that weren't mapped to requirements
        unmapped_tests = {
            (item.function.__module__, item.function.__name__) for item in unmapped
        }
        for test_module, test_name in unmapped_tests:
            out.writerow(
                (
                    "",  # requirement id
                    "",  # requirement
                    "",  # section
                    "",  # keyword
                    "static",  # validation mode
                    "",  # testable
                    test_module,
                    test_name,
                )
            )

    generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))