# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
#             http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
#             https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================

import csv
import datetime
import hashlib
import io
import json
import logging
import os
import re
import time
import traceback
from collections import defaultdict

import docutils.core
import jinja2
import pytest
import xlsxwriter
from more_itertools import partition
from six import string_types

from preload.model import create_preloads
from config import get_generator_plugin_names
from tests.helpers import get_output_dir

try:
    from html import escape  # Python 3
except ImportError:
    from cgi import escape  # Python 2 fallback

# noinspection PyUnresolvedReferences
import version

logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)

__path__ = [os.path.dirname(os.path.abspath(__file__))]

DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])

HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
TEST_SCRIPT_SITE = (
    "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
)
VNFRQTS_ID_URL = (
    "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
)

REPORT_COLUMNS = [
    ("Error #", "err_num"),
    ("Input File", "file"),
    ("Requirements", "req_description"),
    ("Error Message", "message"),
    ("Test", "test_file"),
]

COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
while preparing to validate the input files. Some validations may not have been
executed. Please refer these issues to the VNF Validation Tool team.
"""

COLLECTION_FAILURES = []

# Captures the results of every test run
ALL_RESULTS = []


def extract_error_msg(rep):
    """
    If a custom error message was provided, then extract it; otherwise
    just show the pytest assert message.
    """
    if rep.outcome != "failed":
        return ""
    try:
        full_msg = str(rep.longrepr.reprcrash.message)
        match = re.match(
            "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
        )
        if match:  # custom message was provided
            # Extract everything between AssertionError and the start
            # of the assert statement expansion in the pytest report
            msg = match.group(1)
        elif "AssertionError:" in full_msg:
            msg = full_msg.split("AssertionError:")[1]
        else:
            msg = full_msg
    except AttributeError:
        msg = str(rep)

    return msg
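
# Illustrative sketch (hypothetical failure message, not produced here): for a
# crash message such as
#     "AssertionError: Property 'flavor' is missing\nassert False"
# the regex above captures " Property 'flavor' is missing\n", so the report
# shows the human-written message rather than pytest's assert expansion.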


class TestResult:
    """
    Wraps the test case and result to extract necessary metadata for
    reporting purposes.
    """

    RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}

    def __init__(self, item, outcome):
        self.item = item
        self.result = outcome.get_result()
        self.files = self._get_files()
        self.error_message = self._get_error_message()

    @property
    def requirement_ids(self):
        """
        Returns list of requirement IDs mapped to the test case.

        :return: Returns a list of string requirement IDs the test was
                 annotated with ``validates``; otherwise returns an empty list
        """
        is_mapped = hasattr(self.item.function, "requirement_ids")
        return self.item.function.requirement_ids if is_mapped else []

    @property
    def markers(self):
        """
        :return: Returns a set of pytest marker names for the test or an empty set
        """
        return set(m.name for m in self.item.iter_markers())

    @property
    def is_base_test(self):
        """
        :return: Returns True if the test is annotated with a pytest marker called base
        """
        return "base" in self.markers

    @property
    def is_failed(self):
        """
        :return: True if the test failed
        """
        return self.outcome == "FAIL"

    @property
    def outcome(self):
        """
        :return: Returns 'PASS', 'FAIL', or 'SKIP'
        """
        return self.RESULT_MAPPING[self.result.outcome]

    @property
    def test_case(self):
        """
        :return: Name of the test case method
        """
        return self.item.function.__name__

    @property
    def test_module(self):
        """
        :return: Name of the file containing the test case
        """
        return self.item.function.__module__.split(".")[-1]

    @property
    def test_id(self):
        """
        :return: ID of the test (test_module + test_case)
        """
        return "{}::{}".format(self.test_module, self.test_case)

    @property
    def raw_output(self):
        """
        :return: Full output from pytest for the given test case
        """
        return str(self.result.longrepr)

    def requirement_text(self, curr_reqs):
        """
        Creates a text summary for the requirement IDs mapped to the test case.
        If no requirements are mapped, then it returns the empty string.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS project's needs.json output
        :return: ID and text of the requirements mapped to the test case
        """
        return "".join(
            "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
            for r_id in self.requirement_ids
            if r_id in curr_reqs
        )

    def requirements_metadata(self, curr_reqs):
        """
        Returns a list of dicts containing the following metadata for each
        requirement mapped to the test case:

        - id: Requirement ID
        - text: Full text of the requirement
        - keyword: MUST, MUST NOT, MAY, etc.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS project's needs.json output
        :return: List of requirement metadata
        """
        data = []
        for r_id in self.requirement_ids:
            if r_id not in curr_reqs:
                continue
            data.append(
                {
                    "id": r_id,
                    "text": curr_reqs[r_id]["description"],
                    "keyword": curr_reqs[r_id]["keyword"],
                }
            )
        return data

    def _get_files(self):
        """
        Extracts the list of files passed into the test case.

        :return: List of absolute paths to files
        """
        if "environment_pair" in self.item.fixturenames:
            return [
                "{} environment pair".format(
                    self.item.funcargs["environment_pair"]["name"]
                )
            ]
        elif "heat_volume_pair" in self.item.fixturenames:
            return [
                "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
            ]
        elif "heat_templates" in self.item.fixturenames:
            return [os.path.basename(f) for f in self.item.funcargs["heat_templates"]]
        elif "yaml_files" in self.item.fixturenames:
            return [os.path.basename(f) for f in self.item.funcargs["yaml_files"]]
        else:
            parts = self.result.nodeid.split("[")
            return [""] if len(parts) == 1 else [os.path.basename(parts[1][:-1])]

    def _get_error_message(self):
        """
        :return: Error message or empty string if the test did not fail or error
        """
        if self.is_failed:
            return extract_error_msg(self.result)
        else:
            return ""


# noinspection PyUnusedLocal
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """
    Captures the test results for later reporting. This will also halt testing
    if a base failure is encountered (can be overridden with continue-on-failure).
    """
    outcome = yield
    if outcome.get_result().when != "call":
        return  # only capture results of test cases themselves
    result = TestResult(item, outcome)
    if (
        not item.config.option.continue_on_failure
        and result.is_base_test
        and result.is_failed
    ):
        msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
            result.error_message
        )
        result.error_message = msg
        ALL_RESULTS.append(result)
        pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))

    ALL_RESULTS.append(result)


def make_timestamp():
    """
    :return: String timestamp in the format:
             2019-01-19 10:18:49.865000 Central Standard Time
    """
    timezone = time.tzname[time.localtime().tm_isdst]
    return "{} {}".format(str(datetime.datetime.now()), timezone)


# noinspection PyUnusedLocal
def pytest_sessionstart(session):
    ALL_RESULTS.clear()
    COLLECTION_FAILURES.clear()


# noinspection PyUnusedLocal
def pytest_sessionfinish(session, exitstatus):
    """
    If not a self-test run, generate the output reports.
    """
    if not session.config.option.template_dir:
        return

    if session.config.option.template_source:
        template_source = session.config.option.template_source[0]
    else:
        template_source = os.path.abspath(session.config.option.template_dir[0])

    categories_selected = session.config.option.test_categories or ""
    generate_report(
        get_output_dir(session.config),
        template_source,
        categories_selected,
        session.config.option.report_format,
    )


def pytest_terminal_summary(terminalreporter, exitstatus):
    # Ensures all preload information and warnings appear after
    # the summary
    create_preloads(terminalreporter.config, exitstatus)


# noinspection PyUnusedLocal
def pytest_collection_modifyitems(session, config, items):
    """
    Selects tests based on the categories requested. Tests without
    categories will always be executed.
    """
    config.traceability_items = list(items)  # save all items for traceability
    if not config.option.self_test:
        for item in items:
            passed_categories = set(config.option.test_categories or [])
            all_of_categories = getattr(item.function, "all_categories", set())
            any_of_categories = getattr(item.function, "any_categories", set())
            if all_of_categories and not all_of_categories.issubset(passed_categories):
                item.add_marker(
                    pytest.mark.skip(
                        reason=(
                            "Test categories do not match all the passed categories"
                        )
                    )
                )
            if any_of_categories and not passed_categories.intersection(
                any_of_categories
            ):
                item.add_marker(
                    pytest.mark.skip(
                        reason=(
                            "Test categories do not match any of the passed categories"
                        )
                    )
                )

    items.sort(
        key=lambda x: (0, x.name)
        if "base" in set(m.name for m in x.iter_markers())
        else (1, x.name)
    )
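
# Example (hypothetical invocation): running pytest with --category=some_category
# skips any test whose category annotations are not satisfied by the passed
# categories, while tests with no categories always run. Sorting base tests
# first lets a structural failure halt the run before dependent tests execute.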


def make_href(paths, base_dir=None):
    """
    Create an anchor tag to link to the file paths provided.

    :param paths: string or list of file paths
    :param base_dir: If specified this is pre-pended to each path
    :return: String of hrefs - one for each path, each separated by a line break
    """
    paths = [paths] if isinstance(paths, string_types) else paths
    if base_dir:
        paths = [os.path.join(base_dir, p) for p in paths]
    links = []
    for p in paths:
        abs_path = os.path.abspath(p)
        name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
        links.append(
            "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
                abs_path=abs_path, name=name
            )
        )
    return "<br/>".join(links)
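
# Illustrative sketch (paths hypothetical): on a POSIX system,
# make_href("base.yaml", "/work/templates") returns
#     <a href='file:///work/templates/base.yaml' target='_blank'>base.yaml</a>
# Directories keep their full path as the link text; files show only the name.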


def generate_report(outpath, template_path, categories, output_format="html"):
    """
    Generates the various output reports.

    :param outpath: destination directory for all reports
    :param template_path: directory containing the Heat templates validated
    :param categories: Optional categories selected
    :param output_format: One of "html", "excel", "csv", or "json". Default is "html"
    :raises: ValueError if requested output format is unknown
    """
    failures = [r for r in ALL_RESULTS if r.is_failed]
    generate_failure_file(outpath)
    output_format = output_format.lower().strip() if output_format else "html"
    generate_json(outpath, template_path, categories)
    if output_format == "html":
        generate_html_report(outpath, categories, template_path, failures)
    elif output_format == "excel":
        generate_excel_report(outpath, categories, template_path, failures)
    elif output_format == "json":
        return  # report.json is always generated above
    elif output_format == "csv":
        generate_csv_report(outpath, categories, template_path, failures)
    else:
        raise ValueError("Unsupported output format: " + output_format)


def write_json(data, path):
    """
    Pretty print data as JSON to the output path requested.

    :param data: Data structure to be converted to JSON
    :param path: Where to write output
    """
    with open(path, "w") as f:
        json.dump(data, f, indent=2)


def generate_failure_file(outpath):
    """
    Writes a summary of test failures to a file named failures.
    This is for backwards compatibility only. The report.json offers a
    more comprehensive output.
    """
    failure_path = os.path.join(outpath, "failures")
    failures = [r for r in ALL_RESULTS if r.is_failed]
    data = {}
    for i, fail in enumerate(failures):
        data[str(i)] = {
            "file": fail.files[0] if len(fail.files) == 1 else fail.files,
            "vnfrqts": fail.requirement_ids,
            "test": fail.test_case,
            "test_file": fail.test_module,
            "raw_output": fail.raw_output,
            "message": fail.error_message,
        }
    write_json(data, failure_path)
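
# Sketch of the resulting "failures" file (values illustrative):
# {
#   "0": {
#     "file": "base.yaml",
#     "vnfrqts": ["R-12345"],
#     "test": "test_example",
#     "test_file": "test_module",
#     "raw_output": "...",
#     "message": "..."
#   }
# }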


def generate_csv_report(output_dir, categories, template_path, failures):
    rows = [["Validation Failures"]]
    headers = [
        ("Categories Selected:", categories),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    rows.append([])
    for header in headers:
        rows.append([header[0], header[1]])
    rows.append([])

    if COLLECTION_FAILURES:
        rows.append([COLLECTION_FAILURE_WARNING])
        rows.append(["Validation File", "Test", "Fixtures", "Error"])
        for failure in COLLECTION_FAILURES:
            rows.append(
                [
                    failure["module"],
                    failure["test"],
                    ";".join(failure["fixtures"]),
                    failure["error"],
                ]
            )
        rows.append([])

    # table header
    rows.append([col for col, _ in REPORT_COLUMNS])

    reqs = load_current_requirements()

    # table content
    for i, failure in enumerate(failures, start=1):
        rows.append(
            [
                str(i),
                "\n".join(failure.files),
                failure.requirement_text(reqs),
                failure.error_message,
                failure.test_id,
            ]
        )

    output_path = os.path.join(output_dir, "report.csv")
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        for row in rows:
            writer.writerow(row)


def generate_excel_report(output_dir, categories, template_path, failures):
    output_path = os.path.join(output_dir, "report.xlsx")
    workbook = xlsxwriter.Workbook(output_path)
    bold = workbook.add_format({"bold": True, "align": "top"})
    code = workbook.add_format(
        {"font_name": "Courier", "text_wrap": True, "align": "top"}
    )
    normal = workbook.add_format({"text_wrap": True, "align": "top"})
    heading = workbook.add_format({"bold": True, "font_size": 18})
    worksheet = workbook.add_worksheet("failures")
    worksheet.write(0, 0, "Validation Failures", heading)

    headers = [
        ("Categories Selected:", ",".join(categories)),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    for row, (header, value) in enumerate(headers, start=2):
        worksheet.write(row, 0, header, bold)
        worksheet.write(row, 1, value)

    worksheet.set_column(0, len(headers) - 1, 40)
    worksheet.set_column(len(headers), len(headers), 80)

    if COLLECTION_FAILURES:
        collection_failures_start = 2 + len(headers) + 2
        worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
        collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
        for col_num, col_name in enumerate(collection_failure_headers):
            worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
        for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
            worksheet.write(row, 0, data["module"])
            worksheet.write(row, 1, data["test"])
            worksheet.write(row, 2, ",".join(data["fixtures"]))
            worksheet.write(row, 3, data["error"], code)

    # table header
    start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
    worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
    for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
        worksheet.write(start_error_table_row + 1, col_num, col_name, bold)

    reqs = load_current_requirements()

    # table content
    for col, width in enumerate((20, 30, 60, 60, 40)):
        worksheet.set_column(col, col, width)

    err_num = 0
    for row, failure in enumerate(failures, start=start_error_table_row + 2):
        err_num += 1
        worksheet.write(row, 0, str(err_num), normal)
        worksheet.write(row, 1, "\n".join(failure.files), normal)
        worksheet.write(row, 2, failure.requirement_text(reqs), normal)
        worksheet.write(row, 3, failure.error_message.replace("\n", "\n\n"), normal)
        worksheet.write(row, 4, failure.test_id, normal)

    worksheet.autofilter(
        start_error_table_row + 1,
        0,
        start_error_table_row + 1 + err_num,
        len(REPORT_COLUMNS) - 1,
    )
    workbook.close()


def make_iso_timestamp():
    """
    Creates a timestamp in ISO 8601 format in UTC. Used for JSON output.
    """
    now = datetime.datetime.utcnow()
    now = now.replace(tzinfo=datetime.timezone.utc)  # attach the UTC offset
    return now.isoformat()
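
# Example output (illustrative): '2019-01-19T16:18:49.865000+00:00'. Note that
# datetime.replace returns a new object; without assigning the result back to
# ``now`` the timestamp would be emitted without its +00:00 offset.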


def aggregate_results(outcomes, r_id=None):
    """
    Determines the aggregate result for the conditions provided. Assumes the
    results have been filtered and collected for analysis.

    :param outcomes: set of outcomes from the TestResults
    :param r_id: Optional requirement ID if known
    :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
             (see aggregate_requirement_adherence for more detail)
    """
    if not outcomes:
        return "PASS"
    elif "ERROR" in outcomes:
        return "ERROR"
    elif "FAIL" in outcomes:
        return "FAIL"
    elif "PASS" in outcomes:
        return "PASS"
    elif {"SKIP"} == outcomes:
        return "SKIP"
    else:
        raise RuntimeError(
            "Unexpected error aggregating outcomes ({}) for requirement {}".format(
                outcomes, r_id
            )
        )
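
# Precedence sketch: ERROR outranks FAIL, which outranks PASS, and SKIP only
# results when every collected outcome was a skip. For example:
#     >>> aggregate_results({"PASS", "FAIL", "SKIP"})
#     'FAIL'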


def aggregate_run_results(collection_failures, test_results):
    """
    Determines overall status of run based on all failures and results.

    * 'ERROR' - At least one collection failure occurred during the run.
    * 'FAIL' - Template failed at least one test
    * 'PASS' - All tests executed properly and no failures were detected

    :param collection_failures: failures occurring during test setup
    :param test_results: list of all test execution results
    :return: one of 'ERROR', 'FAIL', or 'PASS'
    """
    if collection_failures:
        return "ERROR"
    elif any(r.is_failed for r in test_results):
        return "FAIL"
    else:
        return "PASS"


def relative_paths(base_dir, paths):
    return [os.path.relpath(p, base_dir) for p in paths if p != ""]


# noinspection PyTypeChecker
def generate_json(outpath, template_path, categories):
    """
    Creates a JSON summary of the entire test run.
    """
    reqs = load_current_requirements()
    data = {
        "template_directory": os.path.splitdrive(template_path)[1].replace(
            os.path.sep, "/"
        ),
        "timestamp": make_iso_timestamp(),
        "checksum": hash_directory(template_path),
        "categories": categories,
        "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
        "tests": [],
        "requirements": [],
    }

    results = data["tests"]
    for result in COLLECTION_FAILURES:
        results.append(
            {
                "files": [],
                "test_module": result["module"],
                "test_case": result["test"],
                "result": "ERROR",
                "error": result["error"],
                "requirements": result["requirements"],
            }
        )
    for result in ALL_RESULTS:
        results.append(
            {
                "files": relative_paths(template_path, result.files),
                "test_module": result.test_module,
                "test_case": result.test_case,
                "result": result.outcome,
                "error": result.error_message if result.is_failed else "",
                "requirements": result.requirements_metadata(reqs),
            }
        )

    # Build a mapping of requirement ID to the results
    r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
    for test_result in results:
        test_reqs = test_result["requirements"]
        r_ids = (
            [r["id"] if isinstance(r, dict) else r for r in test_reqs]
            if test_reqs
            else ("",)
        )
        for r_id in r_ids:
            item = r_id_results[r_id]
            item["outcomes"].add(test_result["result"])
            if test_result["error"]:
                item["errors"].add(test_result["error"])

    requirements = data["requirements"]
    for r_id, r_data in reqs.items():
        requirements.append(
            {
                "id": r_id,
                "text": r_data["description"],
                "keyword": r_data["keyword"],
                "result": aggregate_results(r_id_results[r_id]["outcomes"]),
                "errors": list(r_id_results[r_id]["errors"]),
            }
        )

    if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
        requirements.append(
            {
                "id": "Unmapped",
                "text": "Tests not mapped to requirements (see tests)",
                "result": aggregate_results(r_id_results[""]["outcomes"]),
                "errors": list(r_id_results[""]["errors"]),
            }
        )

    report_path = os.path.join(outpath, "report.json")
    write_json(data, report_path)
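
# Sketch of the report.json layout produced above (values illustrative):
# {
#   "template_directory": "/work/templates",
#   "timestamp": "2019-01-19T16:18:49.865000+00:00",
#   "checksum": "<md5 of the validated directory>",
#   "categories": [],
#   "outcome": "FAIL",
#   "tests": [{"files": [...], "test_module": "...", "test_case": "...",
#              "result": "FAIL", "error": "...", "requirements": [...]}],
#   "requirements": [{"id": "R-...", "text": "...", "keyword": "MUST",
#                     "result": "FAIL", "errors": [...]}]
# }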


def generate_html_report(outpath, categories, template_path, failures):
    reqs = load_current_requirements()
    fail_data = []
    for failure in failures:
        fail_data.append(
            {
                "file_links": make_href(failure.files, template_path),
                "test_id": failure.test_id,
                "error_message": escape(failure.error_message).replace(
                    "\n", "<br/><br/>"
                ),
                "raw_output": escape(failure.raw_output),
                "requirements": docutils.core.publish_parts(
                    writer_name="html", source=failure.requirement_text(reqs)
                )["body"],
            }
        )
    pkg_dir = os.path.split(__file__)[0]
    j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
    with open(j2_template_path, "r") as f:
        report_template = jinja2.Template(f.read())
        contents = report_template.render(
            version=version.VERSION,
            num_failures=len(failures) + len(COLLECTION_FAILURES),
            categories=categories,
            template_dir=make_href(template_path),
            checksum=hash_directory(template_path),
            timestamp=make_timestamp(),
            failures=fail_data,
            collection_failures=COLLECTION_FAILURES,
        )
    with open(os.path.join(outpath, "report.html"), "w") as f:
        f.write(contents)


def pytest_addoption(parser):
    """
    Add needed CLI arguments
    """
    parser.addoption(
        "--template-directory",
        dest="template_dir",
        action="append",
        help="Directory which holds the templates for validation",
    )
    parser.addoption(
        "--template-source",
        dest="template_source",
        action="append",
        help="Source Directory which holds the templates for validation",
    )
    parser.addoption(
        "--self-test",
        dest="self_test",
        action="store_true",
        help="Test the unit tests against their fixtured data",
    )
    parser.addoption(
        "--report-format",
        dest="report_format",
        action="store",
        help="Format of output report (html, csv, excel, json)",
    )
    parser.addoption(
        "--continue-on-failure",
        dest="continue_on_failure",
        action="store_true",
        help="Continue validation even when structural errors exist in input files",
    )
    parser.addoption(
        "--output-directory",
        dest="output_dir",
        action="store",
        default=None,
        help="Alternate directory for report output",
    )
    parser.addoption(
        "--category",
        dest="test_categories",
        action="append",
        help="optional category of test to execute",
    )
    parser.addoption(
        "--env-directory",
        dest="env_dir",
        action="store",
        help="optional directory of .env files for preload generation",
    )
    parser.addoption(
        "--preload-format",
        dest="preload_formats",
        action="append",
        help=(
            "Preload format to create (multiple allowed). If not provided "
            "then all available formats will be created: {}"
        ).format(", ".join(get_generator_plugin_names())),
    )


def pytest_configure(config):
    """
    Ensure that we receive either ``--self-test`` or
    ``--template-dir=<directory>`` as CLI arguments.
    """
    if config.getoption("template_dir") and config.getoption("self_test"):
        raise Exception('"--template-dir" and "--self-test" are mutually exclusive')
    if not (
        config.getoption("template_dir")
        or config.getoption("self_test")
        or config.getoption("help")
    ):
        raise Exception('One of "--template-dir" or "--self-test" must be specified')
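
# Typical invocation (paths hypothetical):
#     pytest ice_validator/tests --template-directory=/work/templates \
#         --report-format=html --output-directory=/work/output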


def pytest_generate_tests(metafunc):
    """
    If a unit test requires an argument named 'filename'
    we generate a test for the filenames selected. Either
    the files contained in `template_dir` or if `template_dir`
    is not specified on the CLI, the fixtures associated with this
    test name.
    """
    # noinspection PyBroadException
    try:
        if "filename" in metafunc.fixturenames:
            from .parametrizers import parametrize_filename

            parametrize_filename(metafunc)

        if "filenames" in metafunc.fixturenames:
            from .parametrizers import parametrize_filenames

            parametrize_filenames(metafunc)

        if "template_dir" in metafunc.fixturenames:
            from .parametrizers import parametrize_template_dir

            parametrize_template_dir(metafunc)

        if "environment_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_pair

            parametrize_environment_pair(metafunc)

        if "heat_volume_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_volume_pair

            parametrize_heat_volume_pair(metafunc)

        if "yaml_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_files

            parametrize_yaml_files(metafunc)

        if "env_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_files

            parametrize_environment_files(metafunc)

        if "yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_file

            parametrize_yaml_file(metafunc)

        if "env_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_file

            parametrize_environment_file(metafunc)

        if "parsed_yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_yaml_file

            parametrize_parsed_yaml_file(metafunc)

        if "parsed_environment_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_environment_file

            parametrize_parsed_environment_file(metafunc)

        if "heat_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_template

            parametrize_heat_template(metafunc)

        if "heat_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_templates

            parametrize_heat_templates(metafunc)

        if "volume_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_template

            parametrize_volume_template(metafunc)

        if "volume_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_templates

            parametrize_volume_templates(metafunc)

        if "template" in metafunc.fixturenames:
            from .parametrizers import parametrize_template

            parametrize_template(metafunc)

        if "templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_templates

            parametrize_templates(metafunc)
    except Exception as e:
        # If an error occurs in the collection phase, then it won't be logged as a
        # normal test failure. This means that failures could occur, but not
        # be seen on the report resulting in a false positive success message. These
        # errors will be stored and reported separately on the report.
        COLLECTION_FAILURES.append(
            {
                "module": metafunc.module.__name__,
                "test": metafunc.function.__name__,
                "fixtures": metafunc.fixturenames,
                "error": traceback.format_exc(),
                "requirements": getattr(metafunc.function, "requirement_ids", []),
            }
        )
        raise e


def hash_directory(path):
    """
    Create md5 hash using the contents of all files under ``path``.

    :param path: string directory containing files
    :return: string MD5 hash code (hex)
    """
    md5 = hashlib.md5()  # nosec
    for dir_path, sub_dirs, filenames in os.walk(path):
        for filename in filenames:
            file_path = os.path.join(dir_path, filename)
            with open(file_path, "rb") as f:
                md5.update(f.read())
    return md5.hexdigest()
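
# Usage sketch (directory hypothetical):
#     >>> hash_directory("/work/templates")  # doctest: +SKIP
#     'd41d8cd98f00b204e9800998ecf8427e'
# The digest is built from file contents in os.walk order; file names
# themselves are not mixed into the hash.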


def load_current_requirements():
    """Loads dict of current requirements or empty dict if file doesn't exist"""
    with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
        data = json.load(f)
        version = data["current_version"]
        return data["versions"][version]["needs"]
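
# Expected shape of heat_requirements.json, inferred from the keys read in this
# module (identifiers illustrative):
# {
#   "current_version": "<version name>",
#   "versions": {
#     "<version name>": {
#       "needs": {
#         "R-12345": {"description": "...", "keyword": "MUST",
#                     "docname": "Heat/...", "validation_mode": "...",
#                     "section_name": "..."}
#       }
#     }
#   }
# }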


def select_heat_requirements(reqs):
    """Filters dict requirements to only those requirements pertaining to Heat"""
    return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}


def is_testable(reqs):
    """Marks each requirement as testable (or not) and returns the updated dict"""
    for key, values in reqs.items():
        if ("MUST" in values.get("keyword", "").upper()) and (
            "none" not in values.get("validation_mode", "").lower()
        ):
            reqs[key]["testable"] = True
        else:
            reqs[key]["testable"] = False
    return reqs
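
# Example (hypothetical requirement records): a need with keyword "MUST" and
# validation_mode "static" is marked testable=True; a keyword of "MAY" or a
# validation_mode of "none" yields testable=False.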


def build_rst_json(reqs):
    """Adds RST-formatted links for each testable requirement and its mapped
    test case; non-testable requirements are dropped"""
    for key, values in list(reqs.items()):
        if values["testable"]:
            # Creates links in RST format to requirements and test cases
            if values["test_case"]:
                mod = values["test_case"].split(".")[-1]
                val = TEST_SCRIPT_SITE + mod + ".py"
                rst_value = "`" + mod + " <" + val + ">`_"
                title = (
                    "`"
                    + key
                    + " <"
                    + VNFRQTS_ID_URL
                    + values["docname"].replace(" ", "%20")
                    + ".html#"
                    + key
                    + ">`_"
                )
                reqs[key].update({"full_title": title, "test_case": rst_value})
            else:
                title = (
                    "`"
                    + key
                    + " <"
                    + VNFRQTS_ID_URL
                    + values["docname"].replace(" ", "%20")
                    + ".html#"
                    + key
                    + ">`_"
                )
                reqs[key].update(
                    {
                        "full_title": title,
                        "test_case": "No test for requirement",
                        "validated_by": "static",
                    }
                )
        else:
            del reqs[key]
    return reqs


def generate_rst_table(output_dir, data):
    """Generate a formatted csv to be used in RST"""
    rst_path = os.path.join(output_dir, "rst.csv")
    with open(rst_path, "w", newline="") as f:
        out = csv.writer(f)
        out.writerow(("Requirement ID", "Test Module", "Test Name"))
        for req_id, metadata in data.items():
            out.writerow(
                (
                    metadata["full_title"],
                    metadata["test_case"],
                    metadata["validated_by"],
                )
            )


# noinspection PyUnusedLocal
def pytest_report_collectionfinish(config, startdir, items):
    """Generates a simple traceability report to output/traceability.csv"""
    traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
    output_dir = os.path.split(traceability_path)[0]
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    reqs = load_current_requirements()
    requirements = select_heat_requirements(reqs)
    testable_requirements = is_testable(requirements)
    unmapped, mapped = partition(
        lambda i: hasattr(i.function, "requirement_ids"), items
    )

    req_to_test = defaultdict(set)
    mapping_errors = set()
    for item in mapped:
        for req_id in item.function.requirement_ids:
            if req_id not in req_to_test:
                req_to_test[req_id].add(item)
                if req_id in requirements:
                    reqs[req_id].update(
                        {
                            "test_case": item.function.__module__,
                            "validated_by": item.function.__name__,
                        }
                    )
            if req_id not in requirements:
                mapping_errors.add(
                    (req_id, item.function.__module__, item.function.__name__)
                )

    mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
    with open(mapping_error_path, "w", newline="") as f:
        writer = csv.writer(f)
        for err in mapping_errors:
            writer.writerow(err)

    with open(traceability_path, "w", newline="") as f:
        out = csv.writer(f)
        out.writerow(
            (
                "Requirement ID",
                "Requirement",
                "Section",
                "Keyword",
                "Validation Mode",
                "Is Testable",
                "Test Module",
                "Test Name",
            )
        )
        for req_id, metadata in testable_requirements.items():
            if req_to_test[req_id]:
                for item in req_to_test[req_id]:
                    out.writerow(
                        (
                            req_id,
                            metadata["description"],
                            metadata["section_name"],
                            metadata["keyword"],
                            metadata["validation_mode"],
                            metadata["testable"],
                            item.function.__module__,
                            item.function.__name__,
                        )
                    )
            else:
                out.writerow(
                    (
                        req_id,
                        metadata["description"],
                        metadata["section_name"],
                        metadata["keyword"],
                        metadata["validation_mode"],
                        metadata["testable"],
                        "",
                        "",
                    )
                )

        # now write out any test methods that weren't mapped to requirements
        unmapped_tests = {
            (item.function.__module__, item.function.__name__) for item in unmapped
        }
        for test_module, test_name in unmapped_tests:
            out.writerow(
                (
                    "",
                    "",
                    "",
                    "",
                    "static",  # validation mode
                    "",  # testable
                    test_module,
                    test_name,
                )
            )

    generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))