# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
#             http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
#             https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================

import csv
import datetime
import hashlib
import io
import json
import logging
import os
import re
import time
import traceback
from collections import defaultdict

try:
    from html import escape  # Python 3
except ImportError:
    from cgi import escape  # noqa: F401 -- Python 2 fallback

import docutils.core
import jinja2
import pytest
import xlsxwriter
from more_itertools import partition
from six import string_types

# noinspection PyUnresolvedReferences
import version

logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)

__path__ = [os.path.dirname(os.path.abspath(__file__))]

DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])

HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
TEST_SCRIPT_SITE = (
    "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
)
VNFRQTS_ID_URL = (
    "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
)

REPORT_COLUMNS = [
    ("Error #", "err_num"),
    ("Input File", "file"),
    ("Requirements", "req_description"),
    ("Error Message", "message"),
    ("Test", "test_file"),
]

COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
while preparing to validate the input files. Some validations may not have been
executed. Please refer these issues to the VNF Validation Tool team.
"""

COLLECTION_FAILURES = []

# Captures the results of every test run
ALL_RESULTS = []


def get_output_dir(config):
    """
    Retrieve the output directory for the reports and create it if necessary
    :param config: pytest configuration
    :return: output directory as string
    """
    output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
    if not os.path.exists(output_dir):
        os.makedirs(output_dir, exist_ok=True)
    return output_dir


def extract_error_msg(rep):
    """
    If a custom error message was provided, then extract it; otherwise
    just show the pytest assert message.
    """
    if rep.outcome != "failed":
        return ""
    try:
        full_msg = str(rep.longrepr.reprcrash.message)
        match = re.match(
            "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
        )
        if match:  # custom message was provided
            # Extract everything between AssertionError and the start
            # of the assert statement expansion in the pytest report
            msg = match.group(1)
        elif "AssertionError:" in full_msg:
            msg = full_msg.split("AssertionError:")[1]
        else:
            msg = full_msg
    except AttributeError:
        msg = str(rep)
    return msg
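
# Illustrative note (example values, not from the test suite): given a pytest
# crash message such as
#     "AssertionError: File must contain a description\nassert False"
# the regex above captures " File must contain a description" and discards the
# expanded "assert ..." detail that pytest appends to the report.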


class TestResult:
    """
    Wraps the test case and result to extract necessary metadata for
    reporting purposes.
    """

    RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}

    def __init__(self, item, outcome):
        self.item = item
        self.result = outcome.get_result()
        self.files = self._get_files()
        self.error_message = self._get_error_message()

    @property
    def requirement_ids(self):
        """
        Returns list of requirement IDs mapped to the test case.

        :return: Returns a list of string requirement IDs the test was
                 annotated with ``validates``; otherwise returns an empty list
        """
        is_mapped = hasattr(self.item.function, "requirement_ids")
        return self.item.function.requirement_ids if is_mapped else []

    @property
    def markers(self):
        """
        :return: Returns a set of pytest marker names for the test or an empty set
        """
        return set(m.name for m in self.item.iter_markers())

    @property
    def is_base_test(self):
        """
        :return: Returns True if the test is annotated with a pytest marker called base
        """
        return "base" in self.markers

    @property
    def is_failed(self):
        """
        :return: True if the test failed
        """
        return self.outcome == "FAIL"

    @property
    def outcome(self):
        """
        :return: Returns 'PASS', 'FAIL', or 'SKIP'
        """
        return self.RESULT_MAPPING[self.result.outcome]

    @property
    def test_case(self):
        """
        :return: Name of the test case method
        """
        return self.item.function.__name__

    @property
    def test_module(self):
        """
        :return: Name of the file containing the test case
        """
        return self.item.function.__module__.split(".")[-1]

    @property
    def test_id(self):
        """
        :return: ID of the test (test_module + test_case)
        """
        return "{}::{}".format(self.test_module, self.test_case)

    @property
    def raw_output(self):
        """
        :return: Full output from pytest for the given test case
        """
        return str(self.result.longrepr)

    def requirement_text(self, curr_reqs):
        """
        Creates a text summary for the requirement IDs mapped to the test case.
        If no requirements are mapped, then it returns the empty string.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS project's needs.json output
        :return: ID and text of the requirements mapped to the test case
        """
        text = (
            "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
            for r_id in self.requirement_ids
            if r_id in curr_reqs
        )
        return "".join(text)

    def requirements_metadata(self, curr_reqs):
        """
        Returns a list of dicts containing the following metadata for each
        requirement mapped:

        - id: Requirement ID
        - text: Full text of the requirement
        - keyword: MUST, MUST NOT, MAY, etc.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS project's needs.json output
        :return: List of requirement metadata
        """
        data = []
        for r_id in self.requirement_ids:
            if r_id not in curr_reqs:
                continue
            data.append(
                {
                    "id": r_id,
                    "text": curr_reqs[r_id]["description"],
                    "keyword": curr_reqs[r_id]["keyword"],
                }
            )
        return data

    def _get_files(self):
        """
        Extracts the list of files passed into the test case.
        :return: List of absolute paths to files
        """
        if "environment_pair" in self.item.fixturenames:
            return [
                "{} environment pair".format(
                    self.item.funcargs["environment_pair"]["name"]
                )
            ]
        elif "heat_volume_pair" in self.item.fixturenames:
            return [
                "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
            ]
        elif "heat_templates" in self.item.fixturenames:
            return [os.path.basename(f) for f in self.item.funcargs["heat_templates"]]
        elif "yaml_files" in self.item.fixturenames:
            return [os.path.basename(f) for f in self.item.funcargs["yaml_files"]]
        else:
            parts = self.result.nodeid.split("[")
            return [""] if len(parts) == 1 else [os.path.basename(parts[1][:-1])]

    def _get_error_message(self):
        """
        :return: Error message or empty string if the test did not fail or error
        """
        if self.is_failed:
            return extract_error_msg(self.result)
        else:
            return ""
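
# Illustrative usage sketch (hypothetical names): the hooks below construct a
# TestResult for each executed test, and the report generators read it, e.g.
#
#     result = TestResult(item, outcome)
#     result.test_id          # e.g. "test_base_template::test_nested_files"
#     result.outcome          # "PASS", "FAIL", or "SKIP"
#     result.requirement_ids  # IDs attached via the ``validates`` decorator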


# noinspection PyUnusedLocal
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """
    Captures the test results for later reporting. This will also halt testing
    if a base failure is encountered (can be overridden with continue-on-failure).
    """
    outcome = yield
    if outcome.get_result().when != "call":
        return  # only capture results of test cases themselves
    result = TestResult(item, outcome)
    if (
        not item.config.option.continue_on_failure
        and result.is_base_test
        and result.is_failed
    ):
        msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
            result.error_message
        )
        result.error_message = msg
        ALL_RESULTS.append(result)
        pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))

    ALL_RESULTS.append(result)


def make_timestamp():
    """
    :return: String timestamp in a format such as:
             2019-01-19 10:18:49.865000 Central Standard Time
    """
    timezone = time.tzname[time.localtime().tm_isdst]
    return "{} {}".format(str(datetime.datetime.now()), timezone)


# noinspection PyUnusedLocal
def pytest_sessionstart(session):
    ALL_RESULTS.clear()
    COLLECTION_FAILURES.clear()


# noinspection PyUnusedLocal
def pytest_sessionfinish(session, exitstatus):
    """
    If not a self-test run, generate the output reports
    """
    if not session.config.option.template_dir:
        return

    if session.config.option.template_source:
        template_source = session.config.option.template_source[0]
    else:
        template_source = os.path.abspath(session.config.option.template_dir[0])

    categories_selected = session.config.option.test_categories or ""
    generate_report(
        get_output_dir(session.config),
        template_source,
        categories_selected,
        session.config.option.report_format,
    )


# noinspection PyUnusedLocal
def pytest_collection_modifyitems(session, config, items):
    """
    Selects tests based on the categories requested. Tests without
    categories will always be executed.
    """
    config.traceability_items = list(items)  # save all items for traceability
    if not config.option.self_test:
        for item in items:
            # checking if test belongs to a category
            if hasattr(item.function, "categories"):
                if config.option.test_categories:
                    test_categories = getattr(item.function, "categories")
                    passed_categories = config.option.test_categories
                    if not all(
                        category in passed_categories for category in test_categories
                    ):
                        item.add_marker(
                            pytest.mark.skip(
                                reason=(
                                    "Test categories do not match "
                                    "all the passed categories"
                                )
                            )
                        )
                else:
                    item.add_marker(
                        pytest.mark.skip(
                            reason=(
                                "Test belongs to a category but "
                                "no categories were passed"
                            )
                        )
                    )
    items.sort(
        key=lambda x: (0, x.name)
        if "base" in set(m.name for m in x.iter_markers())
        else (1, x.name)
    )


def make_href(paths, base_dir=None):
    """
    Create an anchor tag to link to the file paths provided.
    :param paths: string or list of file paths
    :param base_dir: If specified this is pre-pended to each path
    :return: String of hrefs - one for each path, each separated by a line
             break (<br/>)
    """
    paths = [paths] if isinstance(paths, string_types) else paths
    if base_dir:
        paths = [os.path.join(base_dir, p) for p in paths]
    links = []
    for p in paths:
        abs_path = os.path.abspath(p)
        name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
        links.append(
            "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
                abs_path=abs_path, name=name
            )
        )
    return "<br/>".join(links)
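
# Example (illustrative path): make_href("base.yaml") yields a single anchor,
#     <a href='file:///abs/path/base.yaml' target='_blank'>base.yaml</a>
# and a list of paths yields one anchor per path joined with "<br/>".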


def generate_report(outpath, template_path, categories, output_format="html"):
    """
    Generates the various output reports.

    :param outpath: destination directory for all reports
    :param template_path: directory containing the Heat templates validated
    :param categories: Optional categories selected
    :param output_format: One of "html", "excel", or "csv". Default is "html"
    :raises: ValueError if requested output format is unknown
    """
    failures = [r for r in ALL_RESULTS if r.is_failed]
    generate_failure_file(outpath)
    output_format = output_format.lower().strip() if output_format else "html"
    generate_json(outpath, template_path, categories)
    if output_format == "html":
        generate_html_report(outpath, categories, template_path, failures)
    elif output_format == "excel":
        generate_excel_report(outpath, categories, template_path, failures)
    elif output_format == "json":
        return
    elif output_format == "csv":
        generate_csv_report(outpath, categories, template_path, failures)
    else:
        raise ValueError("Unsupported output format: " + output_format)


def write_json(data, path):
    """
    Pretty print data as JSON to the output path requested

    :param data: Data structure to be converted to JSON
    :param path: Where to write output
    """
    with open(path, "w") as f:
        json.dump(data, f, indent=2)


def generate_failure_file(outpath):
    """
    Writes a summary of test failures to a file named failures.
    This is for backwards compatibility only. The report.json offers a
    more comprehensive output.
    """
    failure_path = os.path.join(outpath, "failures")
    failures = [r for r in ALL_RESULTS if r.is_failed]
    data = {}
    for i, fail in enumerate(failures):
        data[str(i)] = {
            "file": fail.files[0] if len(fail.files) == 1 else fail.files,
            "vnfrqts": fail.requirement_ids,
            "test": fail.test_case,
            "test_file": fail.test_module,
            "raw_output": fail.raw_output,
            "message": fail.error_message,
        }
    write_json(data, failure_path)
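
# The failures file written above is a JSON object keyed by error number; an
# illustrative entry (placeholder values) looks like:
#
#     {"0": {"file": "base.yaml", "vnfrqts": ["R-XXXXX"], "test": "test_name",
#            "test_file": "test_module", "raw_output": "...", "message": "..."}}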


def generate_csv_report(output_dir, categories, template_path, failures):
    rows = [["Validation Failures"]]
    headers = [
        ("Categories Selected:", categories),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    rows.append([])
    for header in headers:
        rows.append(header)
    rows.append([])

    if COLLECTION_FAILURES:
        rows.append([COLLECTION_FAILURE_WARNING])
        rows.append(["Validation File", "Test", "Fixtures", "Error"])
        for failure in COLLECTION_FAILURES:
            rows.append(
                [
                    failure["module"],
                    failure["test"],
                    ";".join(failure["fixtures"]),
                    failure["error"],
                ]
            )
        rows.append([])

    # table header
    rows.append([col for col, _ in REPORT_COLUMNS])

    reqs = load_current_requirements()

    # table content
    for i, failure in enumerate(failures, start=1):
        rows.append(
            [
                i,
                "\n".join(failure.files),
                failure.requirement_text(reqs),
                failure.error_message,
                failure.test_id,
            ]
        )

    output_path = os.path.join(output_dir, "report.csv")
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        for row in rows:
            writer.writerow(row)


def generate_excel_report(output_dir, categories, template_path, failures):
    output_path = os.path.join(output_dir, "report.xlsx")
    workbook = xlsxwriter.Workbook(output_path)
    bold = workbook.add_format({"bold": True, "align": "top"})
    code = workbook.add_format(
        {"font_name": "Courier", "text_wrap": True, "align": "top"}
    )
    normal = workbook.add_format({"text_wrap": True, "align": "top"})
    heading = workbook.add_format({"bold": True, "font_size": 18})
    worksheet = workbook.add_worksheet("failures")
    worksheet.write(0, 0, "Validation Failures", heading)

    headers = [
        ("Categories Selected:", ",".join(categories)),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    for row, (header, value) in enumerate(headers, start=2):
        worksheet.write(row, 0, header, bold)
        worksheet.write(row, 1, value)

    worksheet.set_column(0, len(headers) - 1, 40)
    worksheet.set_column(len(headers), len(headers), 80)

    if COLLECTION_FAILURES:
        collection_failures_start = 2 + len(headers) + 2
        worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
        collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
        for col_num, col_name in enumerate(collection_failure_headers):
            worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
        for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
            worksheet.write(row, 0, data["module"])
            worksheet.write(row, 1, data["test"])
            worksheet.write(row, 2, ",".join(data["fixtures"]))
            worksheet.write(row, 3, data["error"], code)

    # table header
    start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
    worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
    for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
        worksheet.write(start_error_table_row + 1, col_num, col_name, bold)

    reqs = load_current_requirements()

    # table content
    for col, width in enumerate((20, 30, 60, 60, 40)):
        worksheet.set_column(col, col, width)
    err_num = 1
    for row, failure in enumerate(failures, start=start_error_table_row + 2):
        worksheet.write(row, 0, str(err_num), normal)
        worksheet.write(row, 1, "\n".join(failure.files), normal)
        worksheet.write(row, 2, failure.requirement_text(reqs), normal)
        worksheet.write(row, 3, failure.error_message.replace("\n", "\n\n"), normal)
        worksheet.write(row, 4, failure.test_id, normal)
        err_num += 1
    worksheet.autofilter(
        start_error_table_row + 1,
        0,
        start_error_table_row + 1 + err_num,
        len(REPORT_COLUMNS) - 1,
    )
    workbook.close()


def make_iso_timestamp():
    """
    Creates a timestamp in ISO 8601 format in UTC. Used for JSON output.
    """
    now = datetime.datetime.utcnow()
    # datetime.replace returns a new object; assign it or the tzinfo is lost
    now = now.replace(tzinfo=datetime.timezone.utc)
    return now.isoformat()
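
# Example (illustrative): returns a string such as
# "2019-01-19T16:18:49.865000+00:00" once the UTC tzinfo has been attached.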


def aggregate_results(outcomes, r_id=None):
    """
    Determines the aggregate result for the conditions provided. Assumes the
    results have been filtered and collected for analysis.

    :param outcomes: set of outcomes from the TestResults
    :param r_id: Optional requirement ID if known
    :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
             (see aggregate_requirement_adherence for more detail)
    """
    if not outcomes:
        return "PASS"
    elif "ERROR" in outcomes:
        return "ERROR"
    elif "FAIL" in outcomes:
        return "FAIL"
    elif "PASS" in outcomes:
        return "PASS"
    elif {"SKIP"} == outcomes:
        return "SKIP"
    else:
        logging.warning(
            "Unexpected error aggregating outcomes ({}) for requirement {}".format(
                outcomes, r_id
            )
        )
        return "ERROR"


def aggregate_run_results(collection_failures, test_results):
    """
    Determines overall status of run based on all failures and results.

    * 'ERROR' - At least one collection failure occurred during the run.
    * 'FAIL' - Template failed at least one test
    * 'PASS' - All tests executed properly and no failures were detected

    :param collection_failures: failures occurring during test setup
    :param test_results: list of all test execution results
    :return: one of 'ERROR', 'FAIL', or 'PASS'
    """
    if collection_failures:
        return "ERROR"
    elif any(r.is_failed for r in test_results):
        return "FAIL"
    else:
        return "PASS"


def relative_paths(base_dir, paths):
    return [os.path.relpath(p, base_dir) for p in paths if p != ""]


# noinspection PyTypeChecker
def generate_json(outpath, template_path, categories):
    """
    Creates a JSON summary of the entire test run.
    """
    reqs = load_current_requirements()
    data = {
        "version": "dublin",
        "template_directory": os.path.splitdrive(template_path)[1].replace(
            os.path.sep, "/"
        ),
        "timestamp": make_iso_timestamp(),
        "checksum": hash_directory(template_path),
        "categories": categories,
        "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
        "tests": [],
        "requirements": [],
    }

    results = data["tests"]
    for result in COLLECTION_FAILURES:
        results.append(
            {
                "files": [],
                "test_module": result["module"],
                "test_case": result["test"],
                "result": "ERROR",
                "error": result["error"],
                "requirements": result["requirements"],
            }
        )
    for result in ALL_RESULTS:
        results.append(
            {
                "files": relative_paths(template_path, result.files),
                "test_module": result.test_module,
                "test_case": result.test_case,
                "result": result.outcome,
                "error": result.error_message if result.is_failed else "",
                "requirements": result.requirements_metadata(reqs),
            }
        )

    # Build a mapping of requirement ID to the results
    r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
    for test_result in results:
        test_reqs = test_result["requirements"]
        r_ids = (
            [r["id"] if isinstance(r, dict) else r for r in test_reqs]
            if test_reqs
            else ("",)
        )
        for r_id in r_ids:
            item = r_id_results[r_id]
            item["outcomes"].add(test_result["result"])
            if test_result["error"]:
                item["errors"].add(test_result["error"])

    requirements = data["requirements"]
    for r_id, r_data in reqs.items():
        requirements.append(
            {
                "id": r_id,
                "text": r_data["description"],
                "keyword": r_data["keyword"],
                "result": aggregate_results(r_id_results[r_id]["outcomes"]),
                "errors": list(r_id_results[r_id]["errors"]),
            }
        )

    if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
        requirements.append(
            {
                "id": "Unmapped",
                "text": "Tests not mapped to requirements (see tests)",
                "result": aggregate_results(r_id_results[""]["outcomes"]),
                "errors": list(r_id_results[""]["errors"]),
            }
        )

    report_path = os.path.join(outpath, "report.json")
    write_json(data, report_path)
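
# Illustrative excerpt of the report.json structure produced above
# (placeholder values):
#
#     {
#       "version": "dublin",
#       "template_directory": "path/to/templates",
#       "timestamp": "2019-01-19T16:18:49+00:00",
#       "outcome": "FAIL",
#       "tests": [...],
#       "requirements": [...]
#     }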


def generate_html_report(outpath, categories, template_path, failures):
    reqs = load_current_requirements()
    fail_data = []
    for failure in failures:
        fail_data.append(
            {
                "file_links": make_href(failure.files, template_path),
                "test_id": failure.test_id,
                "error_message": escape(failure.error_message).replace(
                    "\n", "<br/><br/>"
                ),
                "raw_output": escape(failure.raw_output),
                "requirements": docutils.core.publish_parts(
                    writer_name="html", source=failure.requirement_text(reqs)
                )["body"],
            }
        )
    pkg_dir = os.path.split(__file__)[0]
    j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
    with open(j2_template_path, "r") as f:
        report_template = jinja2.Template(f.read())
        contents = report_template.render(
            version=version.VERSION,
            num_failures=len(failures) + len(COLLECTION_FAILURES),
            categories=categories,
            template_dir=make_href(template_path),
            checksum=hash_directory(template_path),
            timestamp=make_timestamp(),
            failures=fail_data,
            collection_failures=COLLECTION_FAILURES,
        )
    with open(os.path.join(outpath, "report.html"), "w") as f:
        f.write(contents)


def pytest_addoption(parser):
    """
    Add needed CLI arguments
    """
    parser.addoption(
        "--template-directory",
        dest="template_dir",
        action="append",
        help="Directory which holds the templates for validation",
    )

    parser.addoption(
        "--template-source",
        dest="template_source",
        action="append",
        help="Source Directory which holds the templates for validation",
    )

    parser.addoption(
        "--self-test",
        dest="self_test",
        action="store_true",
        help="Test the unit tests against their fixtured data",
    )

    parser.addoption(
        "--report-format",
        dest="report_format",
        action="store",
        help="Format of output report (html, csv, excel, json)",
    )

    parser.addoption(
        "--continue-on-failure",
        dest="continue_on_failure",
        action="store_true",
        help="Continue validation even when structural errors exist in input files",
    )

    parser.addoption(
        "--output-directory",
        dest="output_dir",
        action="store",
        default=None,
        help="Alternate directory for report output",
    )

    parser.addoption(
        "--category",
        dest="test_categories",
        action="append",
        help="optional category of test to execute",
    )


def pytest_configure(config):
    """
    Ensure that we receive either `--self-test` or
    `--template-dir=<directory>` as CLI arguments
    """
    if config.getoption("template_dir") and config.getoption("self_test"):
        raise Exception('"--template-dir" and "--self-test" are mutually exclusive')
    if not (
        config.getoption("template_dir")
        or config.getoption("self_test")
        or config.getoption("help")
    ):
        raise Exception('One of "--template-dir" or "--self-test" must be specified')


def pytest_generate_tests(metafunc):
    """
    If a unit test requires an argument named 'filename'
    we generate a test for the filenames selected. Either
    the files contained in `template_dir` or, if `template_dir`
    is not specified on the CLI, the fixtures associated with this
    test name.
    """

    # noinspection PyBroadException
    try:
        if "filename" in metafunc.fixturenames:
            from .parametrizers import parametrize_filename

            parametrize_filename(metafunc)

        if "filenames" in metafunc.fixturenames:
            from .parametrizers import parametrize_filenames

            parametrize_filenames(metafunc)

        if "template_dir" in metafunc.fixturenames:
            from .parametrizers import parametrize_template_dir

            parametrize_template_dir(metafunc)

        if "environment_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_pair

            parametrize_environment_pair(metafunc)

        if "heat_volume_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_volume_pair

            parametrize_heat_volume_pair(metafunc)

        if "yaml_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_files

            parametrize_yaml_files(metafunc)

        if "env_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_files

            parametrize_environment_files(metafunc)

        if "yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_file

            parametrize_yaml_file(metafunc)

        if "env_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_file

            parametrize_environment_file(metafunc)

        if "parsed_yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_yaml_file

            parametrize_parsed_yaml_file(metafunc)

        if "parsed_environment_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_environment_file

            parametrize_parsed_environment_file(metafunc)

        if "heat_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_template

            parametrize_heat_template(metafunc)

        if "heat_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_templates

            parametrize_heat_templates(metafunc)

        if "volume_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_template

            parametrize_volume_template(metafunc)

        if "volume_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_templates

            parametrize_volume_templates(metafunc)

        if "template" in metafunc.fixturenames:
            from .parametrizers import parametrize_template

            parametrize_template(metafunc)

        if "templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_templates

            parametrize_templates(metafunc)
    except Exception as e:
        # If an error occurs in the collection phase, then it won't be logged as a
        # normal test failure. This means that failures could occur, but not
        # be seen on the report resulting in a false positive success message. These
        # errors will be stored and reported separately on the report
        COLLECTION_FAILURES.append(
            {
                "module": metafunc.module.__name__,
                "test": metafunc.function.__name__,
                "fixtures": metafunc.fixturenames,
                "error": traceback.format_exc(),
                "requirements": getattr(metafunc.function, "requirement_ids", []),
            }
        )
        raise e


def hash_directory(path):
    """
    Create md5 hash using the contents of all files under ``path``
    :param path: string directory containing files
    :return: string MD5 hash code (hex)
    """
    md5 = hashlib.md5()  # nosec
    for dir_path, sub_dirs, filenames in os.walk(path):
        for filename in filenames:
            file_path = os.path.join(dir_path, filename)
            with open(file_path, "rb") as f:
                md5.update(f.read())
    return md5.hexdigest()
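
# Behavioral note: only file contents are hashed (names are not), and files
# are visited in os.walk order, so the digest for an empty directory is the
# MD5 of empty input: "d41d8cd98f00b204e9800998ecf8427e".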


def load_current_requirements():
    """Loads dict of current requirements or empty dict if file doesn't exist"""
    with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
        data = json.load(f)
        version = data["current_version"]
        return data["versions"][version]["needs"]


def select_heat_requirements(reqs):
    """Filters dict requirements to only those requirements pertaining to Heat"""
    return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}


def is_testable(reqs):
    """Marks each requirement's 'testable' flag and returns the updated dict"""
    for key, values in reqs.items():
        if ("MUST" in values.get("keyword", "").upper()) and (
            "none" not in values.get("validation_mode", "").lower()
        ):
            reqs[key]["testable"] = True
        else:
            reqs[key]["testable"] = False
    return reqs
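
# Illustrative example (hypothetical requirement ID):
#
#     reqs = {"R-00001": {"keyword": "MUST", "validation_mode": "code"}}
#     is_testable(reqs)["R-00001"]["testable"]  # -> True; a validation_mode
#     of "none" or a keyword without "MUST" would mark it False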


def build_rst_json(reqs):
    """Takes requirements and returns a dict of only the testable Heat
    requirements, with RST-formatted links added"""
    for key, values in list(reqs.items()):
        if values["testable"]:
            # Creates links in RST format to requirements and test cases
            if values["test_case"]:
                mod = values["test_case"].split(".")[-1]
                val = TEST_SCRIPT_SITE + mod + ".py"
                rst_value = "`" + mod + " <" + val + ">`_"
                title = (
                    "`"
                    + values["id"]
                    + " <"
                    + VNFRQTS_ID_URL
                    + values["docname"].replace(" ", "%20")
                    + ".html#"
                    + values["id"]
                    + ">`_"
                )
                reqs[key].update({"full_title": title, "test_case": rst_value})
            else:
                title = (
                    "`"
                    + values["id"]
                    + " <"
                    + VNFRQTS_ID_URL
                    + values["docname"].replace(" ", "%20")
                    + ".html#"
                    + values["id"]
                    + ">`_"
                )
                reqs[key].update(
                    {
                        "full_title": title,
                        "test_case": "No test for requirement",
                        "validated_by": "static",
                    }
                )
        else:
            del reqs[key]
    return reqs


def generate_rst_table(output_dir, data):
    """Generate a formatted csv to be used in RST"""
    rst_path = os.path.join(output_dir, "rst.csv")
    with open(rst_path, "w", newline="") as f:
        out = csv.writer(f)
        out.writerow(("Requirement ID", "Requirement", "Test Module", "Test Name"))
        for req_id, metadata in data.items():
            out.writerow(
                (
                    metadata["full_title"],
                    metadata["description"],
                    metadata["test_case"],
                    metadata["validated_by"],
                )
            )


# noinspection PyUnusedLocal
def pytest_report_collectionfinish(config, startdir, items):
    """Generates a simple traceability report to output/traceability.csv"""
    traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
    output_dir = os.path.split(traceability_path)[0]
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    reqs = load_current_requirements()
    requirements = select_heat_requirements(reqs)
    testable_requirements = is_testable(requirements)
    unmapped, mapped = partition(
        lambda i: hasattr(i.function, "requirement_ids"), items
    )

    req_to_test = defaultdict(set)
    mapping_errors = set()
    for item in mapped:
        for req_id in item.function.requirement_ids:
            if req_id not in req_to_test:
                req_to_test[req_id].add(item)
                if req_id in requirements:
                    reqs[req_id].update(
                        {
                            "test_case": item.function.__module__,
                            "validated_by": item.function.__name__,
                        }
                    )
            if req_id not in requirements:
                mapping_errors.add(
                    (req_id, item.function.__module__, item.function.__name__)
                )

    mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
    with open(mapping_error_path, "w", newline="") as f:
        writer = csv.writer(f)
        for err in mapping_errors:
            writer.writerow(err)

    with open(traceability_path, "w", newline="") as f:
        out = csv.writer(f)
        out.writerow(
            (
                "Requirement ID",
                "Requirement",
                "Section",
                "Keyword",
                "Validation Mode",
                "Is Testable",
                "Test Module",
                "Test Name",
            )
        )
        for req_id, metadata in testable_requirements.items():
            if req_to_test[req_id]:
                for item in req_to_test[req_id]:
                    out.writerow(
                        (
                            req_id,
                            metadata["description"],
                            metadata["section_name"],
                            metadata["keyword"],
                            metadata["validation_mode"],
                            metadata["testable"],
                            item.function.__module__,
                            item.function.__name__,
                        )
                    )
            else:
                out.writerow(
                    (
                        req_id,
                        metadata["description"],
                        metadata["section_name"],
                        metadata["keyword"],
                        metadata["validation_mode"],
                        metadata["testable"],
                        "",  # test module
                        "",  # test name
                    )
                )

        # now write out any test methods that weren't mapped to requirements
        unmapped_tests = {
            (item.function.__module__, item.function.__name__) for item in unmapped
        }
        for test_module, test_name in unmapped_tests:
            out.writerow(
                (
                    "",  # requirement id
                    "",  # requirement
                    "",  # section
                    "",  # keyword
                    "static",  # validation mode
                    "",  # testable
                    test_module,
                    test_name,
                )
            )

    generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))