# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
#             http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
#             https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================

import csv
import datetime
import hashlib
import io
import json
import logging
import os
import re
import time
import traceback
from collections import defaultdict

import docutils.core
import jinja2
import pytest
import xlsxwriter
from more_itertools import partition
from six import string_types

from preload.model import create_preloads
from config import get_generator_plugin_names
from tests.helpers import get_output_dir

try:
    from html import escape
except ImportError:  # Python 2 fallback
    from cgi import escape

# noinspection PyUnresolvedReferences
import version

logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)

__path__ = [os.path.dirname(os.path.abspath(__file__))]

DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])

HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
TEST_SCRIPT_SITE = (
    "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
)
VNFRQTS_ID_URL = (  # constant name assumed; base URL of the published VNFRQTS docs
    "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
)

REPORT_COLUMNS = [
    ("Error #", "err_num"),
    ("Input File", "file"),
    ("Requirements", "req_description"),
    ("Error Message", "message"),
    ("Test", "test_file"),
]

COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
while preparing to validate the input files. Some validations may not have been
executed. Please refer these issues to the VNF Validation Tool team.
"""

COLLECTION_FAILURES = []

# Captures the results of every test run
ALL_RESULTS = []

def extract_error_msg(rep):
    """
    If a custom error message was provided, then extract it; otherwise
    just show the pytest assert message.
    """
    if rep.outcome != "failed":
        return ""
    try:
        full_msg = str(rep.longrepr.reprcrash.message)
        match = re.match(
            "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
        )
        if match:  # custom message was provided
            # Extract everything between AssertionError and the start
            # of the assert statement expansion in the pytest report
            msg = match.group(1)
        elif "AssertionError:" in full_msg:
            msg = full_msg.split("AssertionError:")[1]
        else:
            msg = full_msg
    except AttributeError:
        msg = str(rep)
    return msg

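# Illustrative example (the message text is hypothetical): for a pytest crash message
# such as "AssertionError: base.yaml is missing parameter vm_role\nassert False", the
# regex above captures everything between "AssertionError:" and the "assert ..."
# expansion, so extract_error_msg() returns " base.yaml is missing parameter vm_role\n".
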
class TestResult:
    """
    Wraps the test case and result to extract necessary metadata for
    reporting purposes.
    """

    RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}

    def __init__(self, item, outcome):
        self.item = item
        self.result = outcome.get_result()
        self.files = self._get_files()
        self.error_message = self._get_error_message()

    @property
    def requirement_ids(self):
        """
        Returns list of requirement IDs mapped to the test case.

        :return: Returns a list of string requirement IDs the test was
                 annotated with ``validates``, otherwise returns an empty list
        """
        is_mapped = hasattr(self.item.function, "requirement_ids")
        return self.item.function.requirement_ids if is_mapped else []

    @property
    def markers(self):
        """
        :return: Returns a set of pytest marker names for the test or an empty set
        """
        return set(m.name for m in self.item.iter_markers())

    @property
    def is_base_test(self):
        """
        :return: Returns True if the test is annotated with a pytest marker called base
        """
        return "base" in self.markers

    @property
    def is_failed(self):
        """
        :return: True if the test failed
        """
        return self.outcome == "FAIL"

    @property
    def outcome(self):
        """
        :return: Returns 'PASS', 'FAIL', or 'SKIP'
        """
        return self.RESULT_MAPPING[self.result.outcome]

    @property
    def test_case(self):
        """
        :return: Name of the test case method
        """
        return self.item.function.__name__

    @property
    def test_module(self):
        """
        :return: Name of the file containing the test case
        """
        return self.item.function.__module__.split(".")[-1]

    @property
    def test_id(self):
        """
        :return: ID of the test (test_module + test_case)
        """
        return "{}::{}".format(self.test_module, self.test_case)

    @property
    def raw_output(self):
        """
        :return: Full output from pytest for the given test case
        """
        return str(self.result.longrepr)

    def requirement_text(self, curr_reqs):
        """
        Creates a text summary for the requirement IDs mapped to the test case.
        If no requirements are mapped, then it returns the empty string.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS project's needs.json output
        :return: ID and text of the requirements mapped to the test case
        """
        text = (
            "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
            for r_id in self.requirement_ids
            if r_id in curr_reqs
        )
        return "".join(text)

    def requirements_metadata(self, curr_reqs):
        """
        Returns a list of dicts containing the following metadata for each
        requirement mapped:

        - id: Requirement ID
        - text: Full text of the requirement
        - keyword: MUST, MUST NOT, MAY, etc.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS project's needs.json output
        :return: List of requirement metadata
        """
        data = []
        for r_id in self.requirement_ids:
            if r_id not in curr_reqs:
                continue
            data.append({
                "id": r_id,
                "text": curr_reqs[r_id]["description"],
                "keyword": curr_reqs[r_id]["keyword"],
            })
        return data

    def _get_files(self):
        """
        Extracts the list of files passed into the test case.
        :return: List of file names associated with the test case
        """
        if "environment_pair" in self.item.fixturenames:
            return [
                "{} environment pair".format(
                    self.item.funcargs["environment_pair"]["name"]
                )
            ]
        elif "heat_volume_pair" in self.item.fixturenames:
            return [
                "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
            ]
        elif "heat_templates" in self.item.fixturenames:
            return [os.path.basename(f) for f in self.item.funcargs["heat_templates"]]
        elif "yaml_files" in self.item.fixturenames:
            return [os.path.basename(f) for f in self.item.funcargs["yaml_files"]]
        else:
            parts = self.result.nodeid.split("[")
            return [""] if len(parts) == 1 else [os.path.basename(parts[1][:-1])]

    def _get_error_message(self):
        """
        :return: Error message or empty string if the test did not fail or error
        """
        if self.is_failed:
            return extract_error_msg(self.result)
        else:
            return ""


# noinspection PyUnusedLocal
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """
    Captures the test results for later reporting. This will also halt testing
    if a base failure is encountered (can be overridden with continue-on-failure).
    """
    outcome = yield
    if outcome.get_result().when != "call":
        return  # only capture results of test cases themselves
    result = TestResult(item, outcome)
    if (
        not item.config.option.continue_on_failure
        and result.is_base_test
        and result.is_failed
    ):
        msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
            result.error_message
        )
        result.error_message = msg
        ALL_RESULTS.append(result)
        pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))

    ALL_RESULTS.append(result)

def make_timestamp():
    """
    :return: String timestamp in the format:
             2019-01-19 10:18:49.865000 Central Standard Time
    """
    timezone = time.tzname[time.localtime().tm_isdst]
    return "{} {}".format(str(datetime.datetime.now()), timezone)

# noinspection PyUnusedLocal
def pytest_sessionstart(session):
    COLLECTION_FAILURES.clear()

# noinspection PyUnusedLocal
def pytest_sessionfinish(session, exitstatus):
    """
    If not a self-test run, generate the output reports.
    """
    if not session.config.option.template_dir:
        return

    if session.config.option.template_source:
        template_source = session.config.option.template_source[0]
    else:
        template_source = os.path.abspath(session.config.option.template_dir[0])

    categories_selected = session.config.option.test_categories or ""
    generate_report(
        get_output_dir(session.config),
        template_source,
        categories_selected,
        session.config.option.report_format,
    )

def pytest_terminal_summary(terminalreporter, exitstatus):
    # Ensures all preload information and warnings appear after
    # the test results
    create_preloads(terminalreporter.config, exitstatus)

# noinspection PyUnusedLocal
def pytest_collection_modifyitems(session, config, items):
    """
    Selects tests based on the categories requested. Tests without
    categories will always be executed.
    """
    config.traceability_items = list(items)  # save all items for traceability
    if not config.option.self_test:
        for item in items:
            # checking if test belongs to a category
            if hasattr(item.function, "categories"):
                if config.option.test_categories:
                    test_categories = getattr(item.function, "categories")
                    passed_categories = config.option.test_categories
                    if not all(
                        category in passed_categories for category in test_categories
                    ):
                        item.add_marker(
                            pytest.mark.skip(
                                reason=(
                                    "Test categories do not match "
                                    "all the passed categories"
                                )
                            )
                        )
                else:
                    item.add_marker(
                        pytest.mark.skip(
                            reason=(
                                "Test belongs to a category but "
                                "no categories were passed"
                            )
                        )
                    )

    items.sort(
        key=lambda x: (0, x.name)
        if "base" in set(m.name for m in x.iter_markers())
        else (1, x.name)
    )

def make_href(paths, base_dir=None):
    """
    Create an anchor tag to link to the file paths provided.
    :param paths: string or list of file paths
    :param base_dir: If specified this is prepended to each path
    :return: String of hrefs - one for each path, each separated by a line break
    """
    paths = [paths] if isinstance(paths, string_types) else paths
    if base_dir:
        paths = [os.path.join(base_dir, p) for p in paths]
    links = []
    for p in paths:
        abs_path = os.path.abspath(p)
        name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
        links.append(
            "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
                abs_path=abs_path, name=name
            )
        )
    return "<br/>".join(links)

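# Illustrative output (paths are hypothetical): make_href(["base.yaml", "base.env"],
# "/work/templates") returns two anchors joined by "<br/>", e.g.
# "<a href='file:///work/templates/base.yaml' target='_blank'>base.yaml</a><br/>...".
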
def generate_report(outpath, template_path, categories, output_format="html"):
    """
    Generates the various output reports.

    :param outpath: destination directory for all reports
    :param template_path: directory containing the Heat templates validated
    :param categories: Optional categories selected
    :param output_format: One of "html", "excel", "csv", or "json". Default is "html"
    :raises: ValueError if requested output format is unknown
    """
    failures = [r for r in ALL_RESULTS if r.is_failed]
    generate_failure_file(outpath)
    output_format = output_format.lower().strip() if output_format else "html"
    generate_json(outpath, template_path, categories)
    if output_format == "html":
        generate_html_report(outpath, categories, template_path, failures)
    elif output_format == "excel":
        generate_excel_report(outpath, categories, template_path, failures)
    elif output_format == "json":
        pass  # report.json is always generated above
    elif output_format == "csv":
        generate_csv_report(outpath, categories, template_path, failures)
    else:
        raise ValueError("Unsupported output format: " + output_format)

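# Illustrative call (paths and category are hypothetical): generate_report("/tmp/out",
# "/work/templates", "environment", output_format="csv") always writes the legacy
# "failures" file and report.json, and then adds report.csv for the requested format.
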
def write_json(data, path):
    """
    Pretty print data as JSON to the output path requested.

    :param data: Data structure to be converted to JSON
    :param path: Where to write output
    """
    with open(path, "w") as f:
        json.dump(data, f, indent=2)

def generate_failure_file(outpath):
    """
    Writes a summary of test failures to a file named failures.
    This is for backwards compatibility only. The report.json offers a
    more comprehensive output.
    """
    failure_path = os.path.join(outpath, "failures")
    failures = [r for r in ALL_RESULTS if r.is_failed]
    data = {}
    for i, fail in enumerate(failures):
        data[str(i)] = {
            "file": fail.files[0] if len(fail.files) == 1 else fail.files,
            "vnfrqts": fail.requirement_ids,
            "test": fail.test_case,
            "test_file": fail.test_module,
            "raw_output": fail.raw_output,
            "message": fail.error_message,
        }
    write_json(data, failure_path)

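# Sketch of the resulting "failures" file (field values are hypothetical); each entry
# is keyed by its position in the failure list:
# {"0": {"file": "base.yaml", "vnfrqts": ["R-12345"], "test": "test_example",
#        "test_file": "test_module", "raw_output": "...", "message": "..."}}
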
def generate_csv_report(output_dir, categories, template_path, failures):
    rows = [["Validation Failures"]]
    headers = [
        ("Categories Selected:", categories),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    rows.append([])
    for header in headers:
        rows.append([header[0], header[1]])
    rows.append([])

    if COLLECTION_FAILURES:
        rows.append([COLLECTION_FAILURE_WARNING])
        rows.append(["Validation File", "Test", "Fixtures", "Error"])
        for failure in COLLECTION_FAILURES:
            rows.append([
                failure["module"],
                failure["test"],
                ";".join(failure["fixtures"]),
                failure["error"],
            ])
        rows.append([])

    # table header
    rows.append([col for col, _ in REPORT_COLUMNS])

    reqs = load_current_requirements()

    # table content
    for i, failure in enumerate(failures, start=1):
        rows.append([
            i,
            "\n".join(failure.files),
            failure.requirement_text(reqs),
            failure.error_message,
            failure.test_id,
        ])

    output_path = os.path.join(output_dir, "report.csv")
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerows(rows)

def generate_excel_report(output_dir, categories, template_path, failures):
    output_path = os.path.join(output_dir, "report.xlsx")
    workbook = xlsxwriter.Workbook(output_path)
    bold = workbook.add_format({"bold": True, "align": "top"})
    code = workbook.add_format(
        {"font_name": "Courier", "text_wrap": True, "align": "top"}
    )
    normal = workbook.add_format({"text_wrap": True, "align": "top"})
    heading = workbook.add_format({"bold": True, "font_size": 18})
    worksheet = workbook.add_worksheet("failures")
    worksheet.write(0, 0, "Validation Failures", heading)

    headers = [
        ("Categories Selected:", ",".join(categories)),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    for row, (header, value) in enumerate(headers, start=2):
        worksheet.write(row, 0, header, bold)
        worksheet.write(row, 1, value)

    worksheet.set_column(0, len(headers) - 1, 40)
    worksheet.set_column(len(headers), len(headers), 80)

    if COLLECTION_FAILURES:
        collection_failures_start = 2 + len(headers) + 2
        worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
        collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
        for col_num, col_name in enumerate(collection_failure_headers):
            worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
        for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
            worksheet.write(row, 0, data["module"])
            worksheet.write(row, 1, data["test"])
            worksheet.write(row, 2, ",".join(data["fixtures"]))
            worksheet.write(row, 3, data["error"], code)

    # table header
    start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
    worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
    for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
        worksheet.write(start_error_table_row + 1, col_num, col_name, bold)

    reqs = load_current_requirements()

    # table content
    for col, width in enumerate((20, 30, 60, 60, 40)):
        worksheet.set_column(col, col, width)
    err_num = 1
    for row, failure in enumerate(failures, start=start_error_table_row + 2):
        worksheet.write(row, 0, str(err_num), normal)
        worksheet.write(row, 1, "\n".join(failure.files), normal)
        worksheet.write(row, 2, failure.requirement_text(reqs), normal)
        worksheet.write(row, 3, failure.error_message.replace("\n", "\n\n"), normal)
        worksheet.write(row, 4, failure.test_id, normal)
        err_num += 1
    worksheet.autofilter(
        start_error_table_row + 1,
        0,
        start_error_table_row + 1 + err_num,
        len(REPORT_COLUMNS) - 1,
    )
    workbook.close()

def make_iso_timestamp():
    """
    Creates a timestamp in ISO 8601 format in UTC. Used for JSON output.
    """
    now = datetime.datetime.utcnow()
    now = now.replace(tzinfo=datetime.timezone.utc)  # replace() returns a new datetime
    return now.isoformat()

def aggregate_results(outcomes, r_id=None):
    """
    Determines the aggregate result for the conditions provided. Assumes the
    results have been filtered and collected for analysis.

    :param outcomes: set of outcomes from the TestResults
    :param r_id: Optional requirement ID if known
    :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
             (see aggregate_requirement_adherence for more detail)
    """
    if not outcomes:
        return "PASS"
    elif "ERROR" in outcomes:
        return "ERROR"
    elif "FAIL" in outcomes:
        return "FAIL"
    elif "PASS" in outcomes:
        return "PASS"
    elif {"SKIP"} == outcomes:
        return "SKIP"
    else:
        pytest.warns(
            "Unexpected error aggregating outcomes ({}) for requirement {}".format(
                outcomes, r_id
            )
        )
        return "ERROR"

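# Example of the precedence implemented above: outcomes {"PASS", "FAIL"} aggregate to
# "FAIL", {"PASS", "SKIP"} aggregate to "PASS", and only an all-"SKIP" set yields "SKIP".
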
def aggregate_run_results(collection_failures, test_results):
    """
    Determines overall status of run based on all failures and results.

    * 'ERROR' - At least one collection failure occurred during the run.
    * 'FAIL' - Template failed at least one test
    * 'PASS' - All tests executed properly and no failures were detected

    :param collection_failures: failures occurring during test setup
    :param test_results: list of all test execution results
    :return: one of 'ERROR', 'FAIL', or 'PASS'
    """
    if collection_failures:
        return "ERROR"
    elif any(r.is_failed for r in test_results):
        return "FAIL"
    else:
        return "PASS"

def relative_paths(base_dir, paths):
    return [os.path.relpath(p, base_dir) for p in paths if p != ""]

# noinspection PyTypeChecker
def generate_json(outpath, template_path, categories):
    """
    Creates a JSON summary of the entire test run.
    """
    reqs = load_current_requirements()
    data = {
        "template_directory": os.path.splitdrive(template_path)[1].replace(
            os.path.sep, "/"
        ),
        "timestamp": make_iso_timestamp(),
        "checksum": hash_directory(template_path),
        "categories": categories,
        "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
        "tests": [],
        "requirements": [],
    }

    results = data["tests"]
    for result in COLLECTION_FAILURES:
        results.append({
            "files": [],
            "test_module": result["module"],
            "test_case": result["test"],
            "result": "ERROR",
            "error": result["error"],
            "requirements": result["requirements"],
        })
    for result in ALL_RESULTS:
        results.append({
            "files": relative_paths(template_path, result.files),
            "test_module": result.test_module,
            "test_case": result.test_case,
            "result": result.outcome,
            "error": result.error_message if result.is_failed else "",
            "requirements": result.requirements_metadata(reqs),
        })

    # Build a mapping of requirement ID to the results
    r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
    for test_result in results:
        test_reqs = test_result["requirements"]
        r_ids = (
            [r["id"] if isinstance(r, dict) else r for r in test_reqs]
            if test_reqs
            else ("",)
        )
        for r_id in r_ids:
            item = r_id_results[r_id]
            item["outcomes"].add(test_result["result"])
            if test_result["error"]:
                item["errors"].add(test_result["error"])

    requirements = data["requirements"]
    for r_id, r_data in reqs.items():
        requirements.append({
            "id": r_id,
            "text": r_data["description"],
            "keyword": r_data["keyword"],
            "result": aggregate_results(r_id_results[r_id]["outcomes"]),
            "errors": list(r_id_results[r_id]["errors"]),
        })

    if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
        requirements.append({
            "id": "Unmapped",
            "text": "Tests not mapped to requirements (see tests)",
            "result": aggregate_results(r_id_results[""]["outcomes"]),
            "errors": list(r_id_results[""]["errors"]),
        })

    report_path = os.path.join(outpath, "report.json")
    write_json(data, report_path)

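# Sketch of the report.json layout produced above (values are illustrative): top-level
# metadata ("template_directory", "timestamp", "checksum", "categories", "outcome"),
# a "tests" list with one entry per executed test or collection failure, and a
# "requirements" list holding the aggregated result and errors for each requirement ID.
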
def generate_html_report(outpath, categories, template_path, failures):
    reqs = load_current_requirements()
    fail_data = []
    for failure in failures:
        fail_data.append({
            "file_links": make_href(failure.files, template_path),
            "test_id": failure.test_id,
            "error_message": escape(failure.error_message).replace(
                "\n", "<br/><br/>"
            ),
            "raw_output": escape(failure.raw_output),
            "requirements": docutils.core.publish_parts(
                writer_name="html", source=failure.requirement_text(reqs)
            )["body"],
        })
    pkg_dir = os.path.split(__file__)[0]
    j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
    with open(j2_template_path, "r") as f:
        report_template = jinja2.Template(f.read())
        contents = report_template.render(
            version=version.VERSION,
            num_failures=len(failures) + len(COLLECTION_FAILURES),
            categories=categories,
            template_dir=make_href(template_path),
            checksum=hash_directory(template_path),
            timestamp=make_timestamp(),
            failures=fail_data,
            collection_failures=COLLECTION_FAILURES,
        )
    with open(os.path.join(outpath, "report.html"), "w") as f:
        f.write(contents)

def pytest_addoption(parser):
    """
    Add needed CLI arguments
    """
        "--template-directory",
        help="Directory which holds the templates for validation",

        dest="template_source",
        help="Source Directory which holds the templates for validation",

        help="Test the unit tests against their fixtured data",

        dest="report_format",
        help="Format of output report (html, csv, excel, json)",

        "--continue-on-failure",
        dest="continue_on_failure",
        help="Continue validation even when structural errors exist in input files",

        "--output-directory",

        dest="test_categories",
        help="optional category of test to execute",

        help="optional directory of .env files for preload generation",

        dest="preload_formats",
        "Preload format to create (multiple allowed). If not provided "
        "then all available formats will be created: {}"
        ).format(", ".join(get_generator_plugin_names())),

def pytest_configure(config):
    """
    Ensure that we receive either `--self-test` or
    `--template-dir=<directory>` as CLI arguments.
    """
    if config.getoption("template_dir") and config.getoption("self_test"):
        raise Exception('"--template-dir" and "--self-test" are mutually exclusive')
    if not (
        config.getoption("template_dir")
        or config.getoption("self_test")
        or config.getoption("help")
    ):
        raise Exception('One of "--template-dir" or "--self-test" must be specified')

def pytest_generate_tests(metafunc):
    """
    If a unit test requires an argument named 'filename'
    we generate a test for the filenames selected. Either
    the files contained in `template_dir` or if `template_dir`
    is not specified on the CLI, the fixtures associated with this
    test name.
    """
    # noinspection PyBroadException
    try:
        if "filename" in metafunc.fixturenames:
            from .parametrizers import parametrize_filename

            parametrize_filename(metafunc)

        if "filenames" in metafunc.fixturenames:
            from .parametrizers import parametrize_filenames

            parametrize_filenames(metafunc)

        if "template_dir" in metafunc.fixturenames:
            from .parametrizers import parametrize_template_dir

            parametrize_template_dir(metafunc)

        if "environment_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_pair

            parametrize_environment_pair(metafunc)

        if "heat_volume_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_volume_pair

            parametrize_heat_volume_pair(metafunc)

        if "yaml_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_files

            parametrize_yaml_files(metafunc)

        if "env_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_files

            parametrize_environment_files(metafunc)

        if "yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_file

            parametrize_yaml_file(metafunc)

        if "env_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_file

            parametrize_environment_file(metafunc)

        if "parsed_yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_yaml_file

            parametrize_parsed_yaml_file(metafunc)

        if "parsed_environment_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_environment_file

            parametrize_parsed_environment_file(metafunc)

        if "heat_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_template

            parametrize_heat_template(metafunc)

        if "heat_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_templates

            parametrize_heat_templates(metafunc)

        if "volume_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_template

            parametrize_volume_template(metafunc)

        if "volume_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_templates

            parametrize_volume_templates(metafunc)

        if "template" in metafunc.fixturenames:
            from .parametrizers import parametrize_template

            parametrize_template(metafunc)

        if "templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_templates

            parametrize_templates(metafunc)
    except Exception as e:
        # If an error occurs in the collection phase, then it won't be logged as a
        # normal test failure. This means that failures could occur, but not
        # be seen on the report resulting in a false positive success message. These
        # errors will be stored and reported separately on the report
        COLLECTION_FAILURES.append(
            {
                "module": metafunc.module.__name__,
                "test": metafunc.function.__name__,
                "fixtures": metafunc.fixturenames,
                "error": traceback.format_exc(),
                "requirements": getattr(metafunc.function, "requirement_ids", []),
            }
        )

def hash_directory(path):
    """
    Create md5 hash using the contents of all files under ``path``
    :param path: string directory containing files
    :return: string MD5 hash code (hex)
    """
    md5 = hashlib.md5()  # nosec
    for dir_path, sub_dirs, filenames in os.walk(path):
        for filename in filenames:
            file_path = os.path.join(dir_path, filename)
            with open(file_path, "rb") as f:
                md5.update(f.read())
    return md5.hexdigest()

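# The digest above is used only as a change-detection checksum for the validated
# template directory (see the "Checksum:" fields in the reports), not for security,
# which is why the hashlib.md5() call carries the "# nosec" marker. For example,
# hash_directory(template_path) yields a 32-character hex string such as
# "d41d8cd98f00b204e9800998ecf8427e" (value shown is illustrative).
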
def load_current_requirements():
    """Loads dict of current requirements or empty dict if file doesn't exist"""
    with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
        data = json.load(f)
        version = data["current_version"]
        return data["versions"][version]["needs"]

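# Sketch of the expected heat_requirements.json layout (abbreviated; the version name
# and requirement ID are illustrative):
# {"current_version": "casablanca",
#  "versions": {"casablanca": {"needs": {"R-12345": {
#      "description": "...", "keyword": "MUST", "docname": "Heat/...",
#      "validation_mode": "static", "section_name": "..."}}}}}
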
def select_heat_requirements(reqs):
    """Filters dict requirements to only those requirements pertaining to Heat"""
    return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}

def is_testable(reqs):
    """Flags each requirement as testable or not based on its keyword and validation mode"""
    for key, values in reqs.items():
        if ("MUST" in values.get("keyword", "").upper()) and (
            "none" not in values.get("validation_mode", "").lower()
        ):
            reqs[key]["testable"] = True
        else:
            reqs[key]["testable"] = False
    return reqs

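# Example: a requirement with keyword "MUST" and validation_mode "static" is flagged
# testable; one with keyword "MAY" or validation_mode "none" is not.
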
def build_rst_json(reqs):
    """Builds RST-formatted links to the requirement document and test case for each requirement"""
    for key, values in list(reqs.items()):
        if values["testable"]:
            # Creates links in RST format to requirements and test cases
            if values["test_case"]:
                mod = values["test_case"].split(".")[-1]
                val = TEST_SCRIPT_SITE + mod + ".py"
                rst_value = "`" + mod + " <" + val + ">`_"
                    + values["docname"].replace(" ", "%20")
                reqs[key].update({"full_title": title, "test_case": rst_value})
                    + values["docname"].replace(" ", "%20")
                        "full_title": title,
                        "test_case": "No test for requirement",
                        "validated_by": "static",

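# Example of an RST test-case link built above (module name is hypothetical):
# "`test_base_template <" + TEST_SCRIPT_SITE + "test_base_template.py>`_"
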
def generate_rst_table(output_dir, data):
    """Generate a formatted csv to be used in RST"""
    rst_path = os.path.join(output_dir, "rst.csv")
    with open(rst_path, "w", newline="") as f:
        out = csv.writer(f)
        out.writerow(("Requirement ID", "Test Module", "Test Name"))
        for req_id, metadata in data.items():
            out.writerow(
                (
                    metadata["full_title"],
                    metadata["test_case"],
                    metadata["validated_by"],
                )
            )

# noinspection PyUnusedLocal
def pytest_report_collectionfinish(config, startdir, items):
    """Generates a simple traceability report to output/traceability.csv"""
    traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
    output_dir = os.path.split(traceability_path)[0]
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    reqs = load_current_requirements()
    requirements = select_heat_requirements(reqs)
    testable_requirements = is_testable(requirements)
    unmapped, mapped = partition(
        lambda i: hasattr(i.function, "requirement_ids"), items
    )

    req_to_test = defaultdict(set)
    mapping_errors = set()
    for item in mapped:
        for req_id in item.function.requirement_ids:
            if req_id not in req_to_test:
                req_to_test[req_id].add(item)
                if req_id in requirements:
                    reqs[req_id].update(
                        {
                            "test_case": item.function.__module__,
                            "validated_by": item.function.__name__,
                        }
                    )
            if req_id not in requirements:
                mapping_errors.add(
                    (req_id, item.function.__module__, item.function.__name__)
                )

    mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
    with open(mapping_error_path, "w", newline="") as f:
        writer = csv.writer(f)
        for err in mapping_errors:
            writer.writerow(err)

    with open(traceability_path, "w", newline="") as f:
        for req_id, metadata in testable_requirements.items():
            if req_to_test[req_id]:
                for item in req_to_test[req_id]:
                        metadata["description"],
                        metadata["section_name"],
                        metadata["keyword"],
                        metadata["validation_mode"],
                        metadata["testable"],
                        item.function.__module__,
                        item.function.__name__,
                        metadata["description"],
                        metadata["section_name"],
                        metadata["keyword"],
                        metadata["validation_mode"],
                        metadata["testable"],

        # now write out any test methods that weren't mapped to requirements
        unmapped_tests = {
            (item.function.__module__, item.function.__name__) for item in unmapped
        }
        for test_module, test_name in unmapped_tests:
                "static",  # validation mode

    generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))