2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
28 # https://creativecommons.org/licenses/by/4.0/
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
36 # ============LICENSE_END============================================
46 from collections import defaultdict
53 from more_itertools import partition
55 from six import string_types
57 # noinspection PyUnresolvedReferences
61 logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)
63 __path__ = [os.path.dirname(os.path.abspath(__file__))]
65 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
67 RESOLUTION_STEPS_FILE = "resolution_steps.json"
68 HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
70 "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
73 "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
77 ("Input File", "file"),
78 ("Test", "test_file"),
79 ("Requirements", "req_description"),
80 ("Resolution Steps", "resolution_steps"),
81 ("Error Message", "message"),
82 ("Raw Test Output", "raw_output"),
85 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
86 while preparing to validate the the input files. Some validations may not have been
87 executed. Please refer these issue to the VNF Validation Tool team.
90 COLLECTION_FAILURES = []
92 # Captures the results of every test run
96 def get_output_dir(config):
98 Retrieve the output directory for the reports and create it if necessary
99 :param config: pytest configuration
100 :return: output directory as string
102 output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
103 if not os.path.exists(output_dir):
104 os.makedirs(output_dir, exist_ok=True)
108 def extract_error_msg(rep):
110 If a custom error message was provided, then extract it otherwise
111 just show the pytest assert message
113 if rep.outcome != "failed":
116 full_msg = str(rep.longrepr.reprcrash.message)
118 "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
120 if match: # custom message was provided
121 # Extract everything between AssertionError and the start
122 # of the assert statement expansion in the pytest report
125 msg = str(rep.longrepr.reprcrash)
126 if "AssertionError:" in msg:
127 msg = msg.split("AssertionError:")[1]
128 except AttributeError:
136 Wraps the test case and result to extract necessary metadata for
140 RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
142 def __init__(self, item, outcome):
144 self.result = outcome.get_result()
145 self.files = [os.path.normpath(p) for p in self._get_files()]
146 self.error_message = self._get_error_message()
149 def requirement_ids(self):
151 Returns list of requirement IDs mapped to the test case.
153 :return: Returns a list of string requirement IDs the test was
154 annotated with ``validates`` otherwise returns and empty list
156 is_mapped = hasattr(self.item.function, "requirement_ids")
157 return self.item.function.requirement_ids if is_mapped else []
162 :return: Returns a set of pytest marker names for the test or an empty set
164 return set(m.name for m in self.item.iter_markers())
167 def is_base_test(self):
169 :return: Returns True if the test is annotated with a pytest marker called base
171 return "base" in self.markers
176 :return: True if the test failed
178 return self.outcome == "FAIL"
183 :return: Returns 'PASS', 'FAIL', or 'SKIP'
185 return self.RESULT_MAPPING[self.result.outcome]
190 :return: Name of the test case method
192 return self.item.function.__name__
195 def test_module(self):
197 :return: Name of the file containing the test case
199 return self.item.function.__module__.split(".")[-1]
204 :return: ID of the test (test_module + test_case)
206 return "{}::{}".format(self.test_module, self.test_case)
209 def raw_output(self):
211 :return: Full output from pytest for the given test case
213 return str(self.result.longrepr)
215 def requirement_text(self, curr_reqs):
217 Creates a text summary for the requirement IDs mapped to the test case.
218 If no requirements are mapped, then it returns the empty string.
220 :param curr_reqs: mapping of requirement IDs to requirement metadata
221 loaded from the VNFRQTS projects needs.json output
222 :return: ID and text of the requirements mapped to the test case
225 "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
226 for r_id in self.requirement_ids
231 def requirements_metadata(self, curr_reqs):
233 Returns a list of dicts containing the following metadata for each
237 - text: Full text of the requirement
238 - keyword: MUST, MUST NOT, MAY, etc.
240 :param curr_reqs: mapping of requirement IDs to requirement metadata
241 loaded from the VNFRQTS projects needs.json output
242 :return: List of requirement metadata
245 for r_id in self.requirement_ids:
246 if r_id not in curr_reqs:
251 "text": curr_reqs[r_id]["description"],
252 "keyword": curr_reqs[r_id]["keyword"],
257 def resolution_steps(self, resolutions):
259 :param resolutions: Loaded from contents for resolution_steps.json
260 :return: Header and text for the resolution step associated with this
261 test case. Returns empty string if no resolutions are
265 "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
266 for entry in resolutions
267 if self._match(entry)
271 def _match(self, resolution_entry):
273 Returns True if the test result maps to the given entry in
277 self.test_case == resolution_entry["function"]
278 and self.test_module == resolution_entry["module"]
281 def _get_files(self):
283 Extracts the list of files passed into the test case.
284 :return: List of absolute paths to files
286 if "environment_pair" in self.item.fixturenames:
288 "{} environment pair".format(
289 self.item.funcargs["environment_pair"]["name"]
292 elif "heat_volume_pair" in self.item.fixturenames:
294 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
296 elif "heat_templates" in self.item.fixturenames:
297 return self.item.funcargs["heat_templates"]
298 elif "yaml_files" in self.item.fixturenames:
299 return self.item.funcargs["yaml_files"]
301 parts = self.result.nodeid.split("[")
302 return [""] if len(parts) == 1 else [parts[1][:-1]]
304 def _get_error_message(self):
306 :return: Error message or empty string if the test did not fail or error
309 return extract_error_msg(self.result)
314 # noinspection PyUnusedLocal
315 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
316 def pytest_runtest_makereport(item, call):
318 Captures the test results for later reporting. This will also halt testing
319 if a base failure is encountered (can be overridden with continue-on-failure)
322 if outcome.get_result().when != "call":
323 return # only capture results of test cases themselves
324 result = TestResult(item, outcome)
326 not item.config.option.continue_on_failure
327 and result.is_base_test
330 msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
333 result.error_message = msg
334 ALL_RESULTS.append(result)
335 pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
337 ALL_RESULTS.append(result)
340 def make_timestamp():
342 :return: String make_iso_timestamp in format:
343 2019-01-19 10:18:49.865000 Central Standard Time
345 timezone = time.tzname[time.localtime().tm_isdst]
346 return "{} {}".format(str(datetime.datetime.now()), timezone)
349 # noinspection PyUnusedLocal
350 def pytest_sessionstart(session):
352 COLLECTION_FAILURES.clear()
355 # noinspection PyUnusedLocal
356 def pytest_sessionfinish(session, exitstatus):
358 If not a self-test run, generate the output reports
360 if not session.config.option.template_dir:
363 if session.config.option.template_source:
364 template_source = session.config.option.template_source[0]
366 template_source = os.path.abspath(session.config.option.template_dir[0])
368 categories_selected = session.config.option.test_categories or ""
370 get_output_dir(session.config),
373 session.config.option.report_format,
377 # noinspection PyUnusedLocal
378 def pytest_collection_modifyitems(session, config, items):
380 Selects tests based on the categories requested. Tests without
381 categories will always be executed.
383 config.traceability_items = list(items) # save all items for traceability
384 if not config.option.self_test:
386 # checking if test belongs to a category
387 if hasattr(item.function, "categories"):
388 if config.option.test_categories:
389 test_categories = getattr(item.function, "categories")
390 passed_categories = config.option.test_categories
392 category in passed_categories for category in test_categories
396 reason=("Test categories do not match "
397 "all the passed categories")
403 reason=("Test belongs to a category but "
404 "no categories were passed")
408 key=lambda x: 0 if "base" in set(m.name for m in x.iter_markers()) else 1
412 def make_href(paths):
414 Create an anchor tag to link to the file paths provided.
415 :param paths: string or list of file paths
416 :return: String of hrefs - one for each path, each seperated by a line
419 paths = [paths] if isinstance(paths, string_types) else paths
422 abs_path = os.path.abspath(p)
423 name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
425 "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
426 abs_path=abs_path, name=name
429 return "<br/>".join(links)
432 def load_resolutions_file():
434 :return: dict of data loaded from resolutions_steps.json
436 resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
437 if os.path.exists(resolution_steps):
438 with open(resolution_steps, "r") as f:
439 return json.loads(f.read())
442 def generate_report(outpath, template_path, categories, output_format="html"):
444 Generates the various output reports.
446 :param outpath: destination directory for all reports
447 :param template_path: directory containing the Heat templates validated
448 :param categories: Optional categories selected
449 :param output_format: One of "html", "excel", or "csv". Default is "html"
450 :raises: ValueError if requested output format is unknown
452 failures = [r for r in ALL_RESULTS if r.is_failed]
453 generate_failure_file(outpath)
454 output_format = output_format.lower().strip() if output_format else "html"
455 generate_json(outpath, template_path, categories)
456 if output_format == "html":
457 generate_html_report(outpath, categories, template_path, failures)
458 elif output_format == "excel":
459 generate_excel_report(outpath, categories, template_path, failures)
460 elif output_format == "json":
462 elif output_format == "csv":
463 generate_csv_report(outpath, categories, template_path, failures)
465 raise ValueError("Unsupported output format: " + output_format)
468 def write_json(data, path):
470 Pretty print data as JSON to the output path requested
472 :param data: Data structure to be converted to JSON
473 :param path: Where to write output
475 with open(path, "w") as f:
476 json.dump(data, f, indent=2)
479 def generate_failure_file(outpath):
481 Writes a summary of test failures to a file named failures.
482 This is for backwards compatibility only. The report.json offers a
483 more comprehensive output.
485 failure_path = os.path.join(outpath, "failures")
486 failures = [r for r in ALL_RESULTS if r.is_failed]
488 for i, fail in enumerate(failures):
490 "file": fail.files[0] if len(fail.files) == 1 else fail.files,
491 "vnfrqts": fail.requirement_ids,
492 "test": fail.test_case,
493 "test_file": fail.test_module,
494 "raw_output": fail.raw_output,
495 "message": fail.error_message,
497 write_json(data, failure_path)
500 def generate_csv_report(output_dir, categories, template_path, failures):
501 rows = [["Validation Failures"]]
503 ("Categories Selected:", categories),
504 ("Tool Version:", version.VERSION),
505 ("Report Generated At:", make_timestamp()),
506 ("Directory Validated:", template_path),
507 ("Checksum:", hash_directory(template_path)),
508 ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
511 for header in headers:
515 if COLLECTION_FAILURES:
516 rows.append([COLLECTION_FAILURE_WARNING])
517 rows.append(["Validation File", "Test", "Fixtures", "Error"])
518 for failure in COLLECTION_FAILURES:
523 ";".join(failure["fixtures"]),
530 rows.append([col for col, _ in REPORT_COLUMNS])
532 reqs = load_current_requirements()
533 resolutions = load_resolutions_file()
536 for failure in failures:
539 "\n".join(failure.files),
541 failure.requirement_text(reqs),
542 failure.resolution_steps(resolutions),
543 failure.error_message,
548 output_path = os.path.join(output_dir, "report.csv")
549 with open(output_path, "w", newline="") as f:
550 writer = csv.writer(f)
555 def generate_excel_report(output_dir, categories, template_path, failures):
556 output_path = os.path.join(output_dir, "report.xlsx")
557 workbook = xlsxwriter.Workbook(output_path)
558 bold = workbook.add_format({"bold": True})
559 code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
560 normal = workbook.add_format({"text_wrap": True})
561 heading = workbook.add_format({"bold": True, "font_size": 18})
562 worksheet = workbook.add_worksheet("failures")
563 worksheet.write(0, 0, "Validation Failures", heading)
566 ("Categories Selected:", ",".join(categories)),
567 ("Tool Version:", version.VERSION),
568 ("Report Generated At:", make_timestamp()),
569 ("Directory Validated:", template_path),
570 ("Checksum:", hash_directory(template_path)),
571 ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
573 for row, (header, value) in enumerate(headers, start=2):
574 worksheet.write(row, 0, header, bold)
575 worksheet.write(row, 1, value)
577 worksheet.set_column(0, len(headers) - 1, 40)
578 worksheet.set_column(len(headers), len(headers), 80)
580 if COLLECTION_FAILURES:
581 collection_failures_start = 2 + len(headers) + 2
582 worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
583 collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
584 for col_num, col_name in enumerate(collection_failure_headers):
585 worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
586 for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
587 worksheet.write(row, 0, data["module"])
588 worksheet.write(row, 1, data["test"])
589 worksheet.write(row, 2, ",".join(data["fixtures"]))
590 worksheet.write(row, 3, data["error"], code)
593 start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
594 worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
595 for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
596 worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
598 reqs = load_current_requirements()
599 resolutions = load_resolutions_file()
602 for row, failure in enumerate(failures, start=start_error_table_row + 2):
603 worksheet.write(row, 0, "\n".join(failure.files), normal)
604 worksheet.write(row, 1, failure.test_id, normal)
605 worksheet.write(row, 2, failure.requirement_text(reqs), normal)
606 worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
607 worksheet.write(row, 4, failure.error_message, normal)
608 worksheet.write(row, 5, failure.raw_output, code)
613 def make_iso_timestamp():
615 Creates a timestamp in ISO 8601 format in UTC format. Used for JSON output.
617 now = datetime.datetime.utcnow()
618 now.replace(tzinfo=datetime.timezone.utc)
619 return now.isoformat()
622 def aggregate_results(outcomes, r_id=None):
624 Determines the aggregate result for the conditions provided. Assumes the
625 results have been filtered and collected for analysis.
627 :param outcomes: set of outcomes from the TestResults
628 :param r_id: Optional requirement ID if known
629 :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
630 (see aggregate_requirement_adherence for more detail)
634 elif "ERROR" in outcomes:
636 elif "FAIL" in outcomes:
638 elif "PASS" in outcomes:
640 elif {"SKIP"} == outcomes:
644 "Unexpected error aggregating outcomes ({}) for requirement {}".format(
651 def aggregate_run_results(collection_failures, test_results):
653 Determines overall status of run based on all failures and results.
655 * 'ERROR' - At least one collection failure occurred during the run.
656 * 'FAIL' - Template failed at least one test
657 * 'PASS' - All tests executed properly and no failures were detected
659 :param collection_failures: failures occuring during test setup
660 :param test_results: list of all test executuion results
661 :return: one of 'ERROR', 'FAIL', or 'PASS'
663 if collection_failures:
665 elif any(r.is_failed for r in test_results):
671 def relative_paths(base_dir, paths):
672 return [os.path.relpath(p, base_dir) for p in paths]
675 # noinspection PyTypeChecker
676 def generate_json(outpath, template_path, categories):
678 Creates a JSON summary of the entire test run.
680 reqs = load_current_requirements()
683 "template_directory": os.path.splitdrive(template_path)[1].replace(
686 "timestamp": make_iso_timestamp(),
687 "checksum": hash_directory(template_path),
688 "categories": categories,
689 "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
694 results = data["tests"]
695 for result in COLLECTION_FAILURES:
699 "test_module": result["module"],
700 "test_case": result["test"],
702 "error": result["error"],
703 "requirements": result["requirements"],
706 for result in ALL_RESULTS:
709 "files": relative_paths(template_path, result.files),
710 "test_module": result.test_module,
711 "test_case": result.test_case,
712 "result": result.outcome,
713 "error": result.error_message if result.is_failed else "",
714 "requirements": result.requirements_metadata(reqs),
718 # Build a mapping of requirement ID to the results
719 r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
720 for test_result in results:
721 test_reqs = test_result["requirements"]
723 [r["id"] if isinstance(r, dict) else r for r in test_reqs]
728 item = r_id_results[r_id]
729 item["outcomes"].add(test_result["result"])
730 if test_result["error"]:
731 item["errors"].add(test_result["error"])
733 requirements = data["requirements"]
734 for r_id, r_data in reqs.items():
738 "text": r_data["description"],
739 "keyword": r_data["keyword"],
740 "result": aggregate_results(r_id_results[r_id]["outcomes"]),
741 "errors": list(r_id_results[r_id]["errors"]),
745 if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
749 "text": "Tests not mapped to requirements (see tests)",
750 "result": aggregate_results(r_id_results[""]["outcomes"]),
751 "errors": list(r_id_results[""]["errors"]),
755 report_path = os.path.join(outpath, "report.json")
756 write_json(data, report_path)
759 def generate_html_report(outpath, categories, template_path, failures):
760 reqs = load_current_requirements()
761 resolutions = load_resolutions_file()
763 for failure in failures:
766 "file_links": make_href(failure.files),
767 "test_id": failure.test_id,
768 "error_message": failure.error_message,
769 "raw_output": failure.raw_output,
770 "requirements": docutils.core.publish_parts(
771 writer_name="html", source=failure.requirement_text(reqs)
773 "resolution_steps": failure.resolution_steps(resolutions),
776 pkg_dir = os.path.split(__file__)[0]
777 j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
778 with open(j2_template_path, "r") as f:
779 report_template = jinja2.Template(f.read())
780 contents = report_template.render(
781 version=version.VERSION,
782 num_failures=len(failures) + len(COLLECTION_FAILURES),
783 categories=categories,
784 template_dir=make_href(template_path),
785 checksum=hash_directory(template_path),
786 timestamp=make_timestamp(),
788 collection_failures=COLLECTION_FAILURES,
790 with open(os.path.join(outpath, "report.html"), "w") as f:
794 def pytest_addoption(parser):
796 Add needed CLI arguments
799 "--template-directory",
802 help="Directory which holds the templates for validation",
807 dest="template_source",
809 help="Source Directory which holds the templates for validation",
816 help="Test the unit tests against their fixtured data",
821 dest="report_format",
823 help="Format of output report (html, csv, excel, json)",
827 "--continue-on-failure",
828 dest="continue_on_failure",
830 help="Continue validation even when structural errors exist in input files",
834 "--output-directory",
843 dest="test_categories",
845 help="optional category of test to execute",
849 def pytest_configure(config):
851 Ensure that we are receive either `--self-test` or
852 `--template-dir=<directory` as CLI arguments
854 if config.getoption("template_dir") and config.getoption("self_test"):
855 raise Exception('"--template-dir", and "--self-test"' " are mutually exclusive")
857 config.getoption("template_dir")
858 or config.getoption("self_test")
859 or config.getoption("help")
861 raise Exception('One of "--template-dir" or' ' "--self-test" must be specified')
864 def pytest_generate_tests(metafunc):
866 If a unit test requires an argument named 'filename'
867 we generate a test for the filenames selected. Either
868 the files contained in `template_dir` or if `template_dir`
869 is not specified on the CLI, the fixtures associated with this
873 # noinspection PyBroadException
875 if "filename" in metafunc.fixturenames:
876 from .parametrizers import parametrize_filename
878 parametrize_filename(metafunc)
880 if "filenames" in metafunc.fixturenames:
881 from .parametrizers import parametrize_filenames
883 parametrize_filenames(metafunc)
885 if "template_dir" in metafunc.fixturenames:
886 from .parametrizers import parametrize_template_dir
888 parametrize_template_dir(metafunc)
890 if "environment_pair" in metafunc.fixturenames:
891 from .parametrizers import parametrize_environment_pair
893 parametrize_environment_pair(metafunc)
895 if "heat_volume_pair" in metafunc.fixturenames:
896 from .parametrizers import parametrize_heat_volume_pair
898 parametrize_heat_volume_pair(metafunc)
900 if "yaml_files" in metafunc.fixturenames:
901 from .parametrizers import parametrize_yaml_files
903 parametrize_yaml_files(metafunc)
905 if "env_files" in metafunc.fixturenames:
906 from .parametrizers import parametrize_environment_files
908 parametrize_environment_files(metafunc)
910 if "yaml_file" in metafunc.fixturenames:
911 from .parametrizers import parametrize_yaml_file
913 parametrize_yaml_file(metafunc)
915 if "env_file" in metafunc.fixturenames:
916 from .parametrizers import parametrize_environment_file
918 parametrize_environment_file(metafunc)
920 if "parsed_yaml_file" in metafunc.fixturenames:
921 from .parametrizers import parametrize_parsed_yaml_file
923 parametrize_parsed_yaml_file(metafunc)
925 if "parsed_environment_file" in metafunc.fixturenames:
926 from .parametrizers import parametrize_parsed_environment_file
928 parametrize_parsed_environment_file(metafunc)
930 if "heat_template" in metafunc.fixturenames:
931 from .parametrizers import parametrize_heat_template
933 parametrize_heat_template(metafunc)
935 if "heat_templates" in metafunc.fixturenames:
936 from .parametrizers import parametrize_heat_templates
938 parametrize_heat_templates(metafunc)
940 if "volume_template" in metafunc.fixturenames:
941 from .parametrizers import parametrize_volume_template
943 parametrize_volume_template(metafunc)
945 if "volume_templates" in metafunc.fixturenames:
946 from .parametrizers import parametrize_volume_templates
948 parametrize_volume_templates(metafunc)
950 if "template" in metafunc.fixturenames:
951 from .parametrizers import parametrize_template
953 parametrize_template(metafunc)
955 if "templates" in metafunc.fixturenames:
956 from .parametrizers import parametrize_templates
958 parametrize_templates(metafunc)
959 except Exception as e:
960 # If an error occurs in the collection phase, then it won't be logged as a
961 # normal test failure. This means that failures could occur, but not
962 # be seen on the report resulting in a false positive success message. These
963 # errors will be stored and reported separately on the report
964 COLLECTION_FAILURES.append(
966 "module": metafunc.module.__name__,
967 "test": metafunc.function.__name__,
968 "fixtures": metafunc.fixturenames,
969 "error": traceback.format_exc(),
970 "requirements": getattr(metafunc.function, "requirement_ids", []),
976 def hash_directory(path):
978 Create md5 hash using the contents of all files under ``path``
979 :param path: string directory containing files
980 :return: string MD5 hash code (hex)
983 for dir_path, sub_dirs, filenames in os.walk(path):
984 for filename in filenames:
985 file_path = os.path.join(dir_path, filename)
986 with open(file_path, "rb") as f:
988 return md5.hexdigest()
991 def load_current_requirements():
992 """Loads dict of current requirements or empty dict if file doesn't exist"""
993 with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
995 version = data["current_version"]
996 return data["versions"][version]["needs"]
999 def select_heat_requirements(reqs):
1000 """Filters dict requirements to only those requirements pertaining to Heat"""
1001 return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}
1004 def is_testable(reqs):
1005 """Filters dict requirements to only those which are testable"""
1006 for key, values in reqs.items():
1007 if ("MUST" in values.get("keyword", "").upper()) and (
1008 "none" not in values.get("validation_mode", "").lower()
1010 reqs[key]["testable"] = True
1012 reqs[key]["testable"] = False
1016 def build_rst_json(reqs):
1017 """Takes requirements and returns list of only Heat requirements"""
1018 for key, values in list(reqs.items()):
1019 if values["testable"]:
1020 # Creates links in RST format to requirements and test cases
1021 if values["test_case"]:
1022 mod = values["test_case"].split(".")[-1]
1023 val = TEST_SCRIPT_SITE + mod + ".py"
1024 rst_value = "`" + mod + " <" + val + ">`_"
1030 + values["docname"].replace(" ", "%20")
1035 reqs[key].update({"full_title": title, "test_case": rst_value})
1042 + values["docname"].replace(" ", "%20")
1049 "full_title": title,
1050 "test_case": "No test for requirement",
1051 "validated_by": "static",
1059 def generate_rst_table(output_dir, data):
1060 """Generate a formatted csv to be used in RST"""
1061 rst_path = os.path.join(output_dir, "rst.csv")
1062 with open(rst_path, "w", newline="") as f:
1064 out.writerow(("Requirement ID", "Requirement", "Test Module", "Test Name"))
1065 for req_id, metadata in data.items():
1068 metadata["full_title"],
1069 metadata["description"],
1070 metadata["test_case"],
1071 metadata["validated_by"],
1076 # noinspection PyUnusedLocal
1077 def pytest_report_collectionfinish(config, startdir, items):
1078 """Generates a simple traceability report to output/traceability.csv"""
1079 traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
1080 output_dir = os.path.split(traceability_path)[0]
1081 if not os.path.exists(output_dir):
1082 os.makedirs(output_dir)
1083 reqs = load_current_requirements()
1084 requirements = select_heat_requirements(reqs)
1085 testable_requirements = is_testable(requirements)
1086 unmapped, mapped = partition(
1087 lambda i: hasattr(i.function, "requirement_ids"), items
1090 req_to_test = defaultdict(set)
1091 mapping_errors = set()
1093 for req_id in item.function.requirement_ids:
1094 if req_id not in req_to_test:
1095 req_to_test[req_id].add(item)
1096 if req_id in requirements:
1097 reqs[req_id].update(
1099 "test_case": item.function.__module__,
1100 "validated_by": item.function.__name__,
1103 if req_id not in requirements:
1105 (req_id, item.function.__module__, item.function.__name__)
1108 mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
1109 with open(mapping_error_path, "w", newline="") as f:
1110 writer = csv.writer(f)
1111 for err in mapping_errors:
1112 writer.writerow(err)
1114 with open(traceability_path, "w", newline="") as f:
1128 for req_id, metadata in testable_requirements.items():
1129 if req_to_test[req_id]:
1130 for item in req_to_test[req_id]:
1134 metadata["description"],
1135 metadata["section_name"],
1136 metadata["keyword"],
1137 metadata["validation_mode"],
1138 metadata["testable"],
1139 item.function.__module__,
1140 item.function.__name__,
1147 metadata["description"],
1148 metadata["section_name"],
1149 metadata["keyword"],
1150 metadata["validation_mode"],
1151 metadata["testable"],
1156 # now write out any test methods that weren't mapped to requirements
1158 (item.function.__module__, item.function.__name__) for item in unmapped
1160 for test_module, test_name in unmapped_tests:
1167 "static", # validation mode
1174 generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))