Merge "[VVP] Add new columns to traceability report"
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37
38 import csv
39 import datetime
40 import hashlib
41 import io
42 import json
43 import os
44 import re
45 import time
46 from collections import defaultdict
47 from itertools import chain
48
49 import traceback
50
51 import docutils.core
52 import jinja2
53 import pytest
54 from more_itertools import partition
55 import xlsxwriter
56 from six import string_types
57
58 import version
59
60 __path__ = [os.path.dirname(os.path.abspath(__file__))]
61
62 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
63
64 RESOLUTION_STEPS_FILE = "resolution_steps.json"
65 HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
66
67 REPORT_COLUMNS = [
68     ("Input File", "file"),
69     ("Test", "test_file"),
70     ("Requirements", "req_description"),
71     ("Resolution Steps", "resolution_steps"),
72     ("Error Message", "message"),
73     ("Raw Test Output", "raw_output"),
74 ]
75
76 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
77 while preparing to validate the the input files. Some validations may not have been
78 executed. Please refer these issue to the VNF Validation Tool team.
79 """
80
81 COLLECTION_FAILURES = []
82
83 # Captures the results of every test run
84 ALL_RESULTS = []
85
86
87 def get_output_dir(config):
88     """
89     Retrieve the output directory for the reports and create it if necessary
90     :param config: pytest configuration
91     :return: output directory as string
92     """
93     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
94     if not os.path.exists(output_dir):
95         os.makedirs(output_dir, exist_ok=True)
96     return output_dir
97
98
99 def extract_error_msg(rep):
100     """
101     If a custom error message was provided, then extract it otherwise
102     just show the pytest assert message
103     """
104     if rep.outcome != "failed":
105         return ""
106     try:
107         full_msg = str(rep.longrepr.reprcrash.message)
108         match = re.match(
109             "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
110         )
111         if match:  # custom message was provided
112             # Extract everything between AssertionError and the start
113             # of the assert statement expansion in the pytest report
114             msg = match.group(1)
115         else:
116             msg = str(rep.longrepr.reprcrash)
117             if "AssertionError:" in msg:
118                 msg = msg.split("AssertionError:")[1]
119     except AttributeError:
120         msg = str(rep)
121
122     return msg
123
124
125 class TestResult:
126     """
127     Wraps the test case and result to extract necessary metadata for
128     reporting purposes.
129     """
130
131     RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
132
133     def __init__(self, item, outcome):
134         self.item = item
135         self.result = outcome.get_result()
136         self.files = [os.path.normpath(p) for p in self._get_files()]
137         self.error_message = self._get_error_message()
138
139     @property
140     def requirement_ids(self):
141         """
142         Returns list of requirement IDs mapped to the test case.
143
144         :return: Returns a list of string requirement IDs the test was
145                  annotated with ``validates`` otherwise returns and empty list
146         """
147         is_mapped = hasattr(self.item.function, "requirement_ids")
148         return self.item.function.requirement_ids if is_mapped else []
149
150     @property
151     def markers(self):
152         """
153         :return: Returns a set of pytest marker names for the test or an empty set
154         """
155         return set(m.name for m in self.item.iter_markers())
156
157     @property
158     def is_base_test(self):
159         """
160         :return: Returns True if the test is annotated with a pytest marker called base
161         """
162         return "base" in self.markers
163
164     @property
165     def is_failed(self):
166         """
167         :return: True if the test failed
168         """
169         return self.outcome == "FAIL"
170
171     @property
172     def outcome(self):
173         """
174         :return: Returns 'PASS', 'FAIL', or 'SKIP'
175         """
176         return self.RESULT_MAPPING[self.result.outcome]
177
178     @property
179     def test_case(self):
180         """
181         :return: Name of the test case method
182         """
183         return self.item.function.__name__
184
185     @property
186     def test_module(self):
187         """
188         :return: Name of the file containing the test case
189         """
190         return self.item.function.__module__.split(".")[-1]
191
192     @property
193     def raw_output(self):
194         """
195         :return: Full output from pytest for the given test case
196         """
197         return str(self.result.longrepr)
198
199     def requirement_text(self, curr_reqs):
200         """
201         Creates a text summary for the requirement IDs mapped to the test case.
202         If no requirements are mapped, then it returns the empty string.
203
204         :param curr_reqs: mapping of requirement IDs to requirement metadata
205                           loaded from the VNFRQTS projects needs.json output
206         :return: ID and text of the requirements mapped to the test case
207         """
208         text = (
209             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
210             for r_id in self.requirement_ids
211         )
212         return "".join(text)
213
214     def requirements_metadata(self, curr_reqs):
215         """
216         Returns a list of dicts containing the following metadata for each
217         requirement mapped:
218
219         - id: Requirement ID
220         - text: Full text of the requirement
221         - keyword: MUST, MUST NOT, MAY, etc.
222
223         :param curr_reqs: mapping of requirement IDs to requirement metadata
224                           loaded from the VNFRQTS projects needs.json output
225         :return: List of requirement metadata
226         """
227         data = []
228         for r_id in self.requirement_ids:
229             if r_id not in curr_reqs:
230                 continue
231             data.append(
232                 {
233                     "id": r_id,
234                     "text": curr_reqs[r_id]["description"],
235                     "keyword": curr_reqs[r_id]["keyword"],
236                 }
237             )
238         return data
239
240     def resolution_steps(self, resolutions):
241         """
242         :param resolutions: Loaded from contents for resolution_steps.json
243         :return: Header and text for the resolution step associated with this
244                  test case.  Returns empty string if no resolutions are
245                  provided.
246         """
247         text = (
248             "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
249             for entry in resolutions
250             if self._match(entry)
251         )
252         return "".join(text)
253
254     def _match(self, resolution_entry):
255         """
256         Returns True if the test result maps to the given entry in
257         the resolutions file
258         """
259         return (
260             self.test_case == resolution_entry["function"]
261             and self.test_module == resolution_entry["module"]
262         )
263
264     def _get_files(self):
265         """
266         Extracts the list of files passed into the test case.
267         :return: List of absolute paths to files
268         """
269         if "environment_pair" in self.item.fixturenames:
270             return [
271                 "{} environment pair".format(
272                     self.item.funcargs["environment_pair"]["name"]
273                 )
274             ]
275         elif "heat_volume_pair" in self.item.fixturenames:
276             return [
277                 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
278             ]
279         elif "heat_templates" in self.item.fixturenames:
280             return self.item.funcargs["heat_templates"]
281         elif "yaml_files" in self.item.fixturenames:
282             return self.item.funcargs["yaml_files"]
283         else:
284             return [self.result.nodeid.split("[")[1][:-1]]
285
286     def _get_error_message(self):
287         """
288         :return: Error message or empty string if the test did not fail or error
289         """
290         if self.is_failed:
291             return extract_error_msg(self.result)
292         else:
293             return ""
294
295
296 # noinspection PyUnusedLocal
297 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
298 def pytest_runtest_makereport(item, call):
299     """
300     Captures the test results for later reporting.  This will also halt testing
301     if a base failure is encountered (can be overridden with continue-on-failure)
302     """
303     outcome = yield
304     if outcome.get_result().when != "call":
305         return  # only capture results of test cases themselves
306     result = TestResult(item, outcome)
307     ALL_RESULTS.append(result)
308     if (
309         not item.config.option.continue_on_failure
310         and result.is_base_test
311         and result.is_failed
312     ):
313         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
314             result.error_message
315         )
316         pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
317
318
319 def make_timestamp():
320     """
321     :return: String make_iso_timestamp in format:
322              2019-01-19 10:18:49.865000 Central Standard Time
323     """
324     timezone = time.tzname[time.localtime().tm_isdst]
325     return "{} {}".format(str(datetime.datetime.now()), timezone)
326
327
328 # noinspection PyUnusedLocal
329 def pytest_sessionstart(session):
330     ALL_RESULTS.clear()
331     COLLECTION_FAILURES.clear()
332
333
334 # noinspection PyUnusedLocal
335 def pytest_sessionfinish(session, exitstatus):
336     """
337     If not a self-test run, generate the output reports
338     """
339     if not session.config.option.template_dir:
340         return
341
342     if session.config.option.template_source:
343         template_source = session.config.option.template_source[0]
344     else:
345         template_source = os.path.abspath(session.config.option.template_dir[0])
346
347     categories_selected = session.config.option.test_categories or ""
348     generate_report(
349         get_output_dir(session.config),
350         template_source,
351         categories_selected,
352         session.config.option.report_format,
353     )
354
355
356 # noinspection PyUnusedLocal
357 def pytest_collection_modifyitems(session, config, items):
358     """
359     Selects tests based on the categories requested.  Tests without
360     categories will always be executed.
361     """
362     config.traceability_items = list(items)  # save all items for traceability
363     if not config.option.self_test:
364         for item in items:
365             # checking if test belongs to a category
366             if hasattr(item.function, "categories"):
367                 if config.option.test_categories:
368                     test_categories = getattr(item.function, "categories")
369                     passed_categories = config.option.test_categories
370                     if not all(
371                         category in passed_categories for category in test_categories
372                     ):
373                         item.add_marker(
374                             pytest.mark.skip(
375                                 reason="Test categories do not match all the passed categories"
376                             )
377                         )
378                 else:
379                     item.add_marker(
380                         pytest.mark.skip(
381                             reason="Test belongs to a category but no categories were passed"
382                         )
383                     )
384     items.sort(
385         key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
386     )
387
388
389 def make_href(paths):
390     """
391     Create an anchor tag to link to the file paths provided.
392     :param paths: string or list of file paths
393     :return: String of hrefs - one for each path, each seperated by a line
394              break (<br/).
395     """
396     paths = [paths] if isinstance(paths, string_types) else paths
397     links = []
398     for p in paths:
399         abs_path = os.path.abspath(p)
400         name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
401         links.append(
402             "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
403                 abs_path=abs_path, name=name
404             )
405         )
406     return "<br/>".join(links)
407
408
409 def load_resolutions_file():
410     """
411     :return: dict of data loaded from resolutions_steps.json
412     """
413     resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
414     if os.path.exists(resolution_steps):
415         with open(resolution_steps, "r") as f:
416             return json.loads(f.read())
417
418
419 def generate_report(outpath, template_path, categories, output_format="html"):
420     """
421     Generates the various output reports.
422
423     :param outpath: destination directory for all reports
424     :param template_path: directory containing the Heat templates validated
425     :param categories: Optional categories selected
426     :param output_format: One of "html", "excel", or "csv". Default is "html"
427     :raises: ValueError if requested output format is unknown
428     """
429     failures = [r for r in ALL_RESULTS if r.is_failed]
430     generate_failure_file(outpath)
431     output_format = output_format.lower().strip() if output_format else "html"
432     if output_format == "html":
433         generate_html_report(outpath, categories, template_path, failures)
434     elif output_format == "excel":
435         generate_excel_report(outpath, categories, template_path, failures)
436     elif output_format == "json":
437         generate_json(outpath, template_path, categories)
438     elif output_format == "csv":
439         generate_csv_report(outpath, categories, template_path, failures)
440     else:
441         raise ValueError("Unsupported output format: " + output_format)
442
443
444 def write_json(data, path):
445     """
446     Pretty print data as JSON to the output path requested
447
448     :param data: Data structure to be converted to JSON
449     :param path: Where to write output
450     """
451     with open(path, "w") as f:
452         json.dump(data, f, indent=2)
453
454
455 def generate_failure_file(outpath):
456     """
457     Writes a summary of test failures to a file named failures.
458     This is for backwards compatibility only.  The report.json offers a
459     more comprehensive output.
460     """
461     failure_path = os.path.join(outpath, "failures")
462     failures = [r for r in ALL_RESULTS if r.is_failed]
463     data = {}
464     for i, fail in enumerate(failures):
465         data[str(i)] = {
466             "file": fail.files[0] if len(fail.files) == 1 else fail.files,
467             "vnfrqts": fail.requirement_ids,
468             "test": fail.test_case,
469             "test_file": fail.test_module,
470             "raw_output": fail.raw_output,
471             "message": fail.error_message,
472         }
473     write_json(data, failure_path)
474
475
476 def generate_csv_report(output_dir, categories, template_path, failures):
477     rows = [["Validation Failures"]]
478     headers = [
479         ("Categories Selected:", categories),
480         ("Tool Version:", version.VERSION),
481         ("Report Generated At:", make_timestamp()),
482         ("Directory Validated:", template_path),
483         ("Checksum:", hash_directory(template_path)),
484         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
485     ]
486     rows.append([])
487     for header in headers:
488         rows.append(header)
489     rows.append([])
490
491     if COLLECTION_FAILURES:
492         rows.append([COLLECTION_FAILURE_WARNING])
493         rows.append(["Validation File", "Test", "Fixtures", "Error"])
494         for failure in COLLECTION_FAILURES:
495             rows.append(
496                 [
497                     failure["module"],
498                     failure["test"],
499                     ";".join(failure["fixtures"]),
500                     failure["error"],
501                 ]
502             )
503         rows.append([])
504
505     # table header
506     rows.append([col for col, _ in REPORT_COLUMNS])
507
508     reqs = load_current_requirements()
509     resolutions = load_resolutions_file()
510
511     # table content
512     for failure in failures:
513         rows.append(
514             [
515                 "\n".join(failure.files),
516                 failure.test_module,
517                 failure.requirement_text(reqs),
518                 failure.resolution_steps(resolutions),
519                 failure.error_message,
520                 failure.raw_output,
521             ]
522         )
523
524     output_path = os.path.join(output_dir, "report.csv")
525     with open(output_path, "w", newline="") as f:
526         writer = csv.writer(f)
527         for row in rows:
528             writer.writerow(row)
529
530
531 def generate_excel_report(output_dir, categories, template_path, failures):
532     output_path = os.path.join(output_dir, "report.xlsx")
533     workbook = xlsxwriter.Workbook(output_path)
534     bold = workbook.add_format({"bold": True})
535     code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
536     normal = workbook.add_format({"text_wrap": True})
537     heading = workbook.add_format({"bold": True, "font_size": 18})
538     worksheet = workbook.add_worksheet("failures")
539     worksheet.write(0, 0, "Validation Failures", heading)
540
541     headers = [
542         ("Categories Selected:", ",".join(categories)),
543         ("Tool Version:", version.VERSION),
544         ("Report Generated At:", make_timestamp()),
545         ("Directory Validated:", template_path),
546         ("Checksum:", hash_directory(template_path)),
547         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
548     ]
549     for row, (header, value) in enumerate(headers, start=2):
550         worksheet.write(row, 0, header, bold)
551         worksheet.write(row, 1, value)
552
553     worksheet.set_column(0, len(headers) - 1, 40)
554     worksheet.set_column(len(headers), len(headers), 80)
555
556     if COLLECTION_FAILURES:
557         collection_failures_start = 2 + len(headers) + 2
558         worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
559         collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
560         for col_num, col_name in enumerate(collection_failure_headers):
561             worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
562         for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
563             worksheet.write(row, 0, data["module"])
564             worksheet.write(row, 1, data["test"])
565             worksheet.write(row, 2, ",".join(data["fixtures"]))
566             worksheet.write(row, 3, data["error"], code)
567
568     # table header
569     start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
570     worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
571     for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
572         worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
573
574     reqs = load_current_requirements()
575     resolutions = load_resolutions_file()
576
577     # table content
578     for row, failure in enumerate(failures, start=start_error_table_row + 2):
579         worksheet.write(row, 0, "\n".join(failure.files), normal)
580         worksheet.write(row, 1, failure.test_module, normal)
581         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
582         worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
583         worksheet.write(row, 4, failure.error_message, normal)
584         worksheet.write(row, 5, failure.raw_output, code)
585
586     workbook.close()
587
588
589 def make_iso_timestamp():
590     """
591     Creates a timestamp in ISO 8601 format in UTC format.  Used for JSON output.
592     """
593     now = datetime.datetime.utcnow()
594     now.replace(tzinfo=datetime.timezone.utc)
595     return now.isoformat()
596
597
598 def aggregate_requirement_adherence(r_id, collection_failures, test_results):
599     """
600     Examines all tests associated with a given requirement and determines
601     the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
602
603     * ERROR - At least one ERROR occurred
604     * PASS -  At least one PASS and no FAIL or ERRORs.
605     * FAIL -  At least one FAIL occurred (no ERRORs)
606     * SKIP - All tests were SKIP
607
608
609     :param r_id: Requirement ID to examing
610     :param collection_failures: Errors that occurred during test setup.
611     :param test_results: List of TestResult
612     :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
613     """
614     errors = any(r_id in f["requirements"] for f in collection_failures)
615     outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
616     return aggregate_results(errors, outcomes, r_id)
617
618
619 def aggregate_results(has_errors, outcomes, r_id=None):
620     """
621     Determines the aggregate result for the conditions provided.  Assumes the
622     results have been filtered and collected for analysis.
623
624     :param has_errors: True if collection failures occurred for the tests being
625                        analyzed.
626     :param outcomes: set of outcomes from the TestResults
627     :param r_id: Optional requirement ID if known
628     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
629              (see aggregate_requirement_adherence for more detail)
630     """
631     if has_errors:
632         return "ERROR"
633
634     if not outcomes:
635         return "PASS"
636     elif "FAIL" in outcomes:
637         return "FAIL"
638     elif "PASS" in outcomes:
639         return "PASS"
640     elif {"SKIP"} == outcomes:
641         return "SKIP"
642     else:
643         pytest.warns(
644             "Unexpected error aggregating outcomes ({}) for requirement {}".format(
645                 outcomes, r_id
646             )
647         )
648         return "ERROR"
649
650
651 def aggregate_run_results(collection_failures, test_results):
652     """
653     Determines overall status of run based on all failures and results.
654
655     * 'ERROR' - At least one collection failure occurred during the run.
656     * 'FAIL' - Template failed at least one test
657     * 'PASS' - All tests executed properly and no failures were detected
658
659     :param collection_failures: failures occuring during test setup
660     :param test_results: list of all test executuion results
661     :return: one of 'ERROR', 'FAIL', or 'PASS'
662     """
663     if collection_failures:
664         return "ERROR"
665     elif any(r.is_failed for r in test_results):
666         return "FAIL"
667     else:
668         return "PASS"
669
670
671 def error(failure_or_result):
672     """
673     Extracts the error message from a collection failure or test result
674     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
675     :return: Error message as string
676     """
677     if isinstance(failure_or_result, TestResult):
678         return failure_or_result.error_message
679     else:
680         return failure_or_result["error"]
681
682
683 def req_ids(failure_or_result):
684     """
685     Extracts the requirement IDs from a collection failure or test result
686     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
687     :return: set of Requirement IDs.  If no requirements mapped, then an empty set
688     """
689     if isinstance(failure_or_result, TestResult):
690         return set(failure_or_result.requirement_ids)
691     else:
692         return set(failure_or_result["requirements"])
693
694
695 def collect_errors(r_id, collection_failures, test_result):
696     """
697     Creates a list of error messages from the collection failures and
698     test results.  If r_id is provided, then it collects the error messages
699     where the failure or test is associated with that requirement ID.  If
700     r_id is None, then it collects all errors that occur on failures and
701     results that are not mapped to requirements
702     """
703
704     def selector(item):
705         if r_id:
706             return r_id in req_ids(item)
707         else:
708             return not req_ids(item)
709
710     errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
711     return [e for e in errors if e]
712
713
714 def generate_json(outpath, template_path, categories):
715     """
716     Creates a JSON summary of the entire test run.
717     """
718     reqs = load_current_requirements()
719     data = {
720         "version": "dublin",
721         "template_directory": template_path,
722         "timestamp": make_iso_timestamp(),
723         "checksum": hash_directory(template_path),
724         "categories": categories,
725         "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
726         "tests": [],
727         "requirements": [],
728     }
729
730     results = data["tests"]
731     for result in COLLECTION_FAILURES:
732         results.append(
733             {
734                 "files": [],
735                 "test_module": result["module"],
736                 "test_case": result["test"],
737                 "result": "ERROR",
738                 "error": result["error"],
739                 "requirements": result["requirements"],
740             }
741         )
742     for result in ALL_RESULTS:
743         results.append(
744             {
745                 "files": result.files,
746                 "test_module": result.test_module,
747                 "test_case": result.test_case,
748                 "result": result.outcome,
749                 "error": result.error_message if result.is_failed else "",
750                 "requirements": result.requirements_metadata(reqs),
751             }
752         )
753
754     requirements = data["requirements"]
755     for r_id, r_data in reqs.items():
756         result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
757         if result:
758             requirements.append(
759                 {
760                     "id": r_id,
761                     "text": r_data["description"],
762                     "keyword": r_data["keyword"],
763                     "result": result,
764                     "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
765                 }
766             )
767     # If there are tests that aren't mapped to a requirement, then we'll
768     # map them to a special entry so the results are coherent.
769     unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
770     has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
771     if unmapped_outcomes or has_errors:
772         requirements.append(
773             {
774                 "id": "Unmapped",
775                 "text": "Tests not mapped to requirements (see tests)",
776                 "result": aggregate_results(has_errors, unmapped_outcomes),
777                 "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
778             }
779         )
780
781     report_path = os.path.join(outpath, "report.json")
782     write_json(data, report_path)
783
784
785 def generate_html_report(outpath, categories, template_path, failures):
786     reqs = load_current_requirements()
787     resolutions = load_resolutions_file()
788     fail_data = []
789     for failure in failures:
790         fail_data.append(
791             {
792                 "file_links": make_href(failure.files),
793                 "test_id": failure.test_module,
794                 "error_message": failure.error_message,
795                 "raw_output": failure.raw_output,
796                 "requirements": docutils.core.publish_parts(
797                     writer_name="html", source=failure.requirement_text(reqs)
798                 )["body"],
799                 "resolution_steps": failure.resolution_steps(resolutions),
800             }
801         )
802     pkg_dir = os.path.split(__file__)[0]
803     j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
804     with open(j2_template_path, "r") as f:
805         report_template = jinja2.Template(f.read())
806         contents = report_template.render(
807             version=version.VERSION,
808             num_failures=len(failures) + len(COLLECTION_FAILURES),
809             categories=categories,
810             template_dir=make_href(template_path),
811             checksum=hash_directory(template_path),
812             timestamp=make_timestamp(),
813             failures=fail_data,
814             collection_failures=COLLECTION_FAILURES,
815         )
816     with open(os.path.join(outpath, "report.html"), "w") as f:
817         f.write(contents)
818
819
820 def pytest_addoption(parser):
821     """
822     Add needed CLI arguments
823     """
824     parser.addoption(
825         "--template-directory",
826         dest="template_dir",
827         action="append",
828         help="Directory which holds the templates for validation",
829     )
830
831     parser.addoption(
832         "--template-source",
833         dest="template_source",
834         action="append",
835         help="Source Directory which holds the templates for validation",
836     )
837
838     parser.addoption(
839         "--self-test",
840         dest="self_test",
841         action="store_true",
842         help="Test the unit tests against their fixtured data",
843     )
844
845     parser.addoption(
846         "--report-format",
847         dest="report_format",
848         action="store",
849         help="Format of output report (html, csv, excel, json)",
850     )
851
852     parser.addoption(
853         "--continue-on-failure",
854         dest="continue_on_failure",
855         action="store_true",
856         help="Continue validation even when structural errors exist in input files",
857     )
858
859     parser.addoption(
860         "--output-directory",
861         dest="output_dir",
862         action="store",
863         default=None,
864         help="Alternate ",
865     )
866
867     parser.addoption(
868         "--category",
869         dest="test_categories",
870         action="append",
871         help="optional category of test to execute",
872     )
873
874
875 def pytest_configure(config):
876     """
877     Ensure that we are receive either `--self-test` or
878     `--template-dir=<directory` as CLI arguments
879     """
880     if config.getoption("template_dir") and config.getoption("self_test"):
881         raise Exception('"--template-dir", and "--self-test"' " are mutually exclusive")
882     if not (
883         config.getoption("template_dir")
884         or config.getoption("self_test")
885         or config.getoption("help")
886     ):
887         raise Exception('One of "--template-dir" or' ' "--self-test" must be specified')
888
889
890 def pytest_generate_tests(metafunc):
891     """
892     If a unit test requires an argument named 'filename'
893     we generate a test for the filenames selected. Either
894     the files contained in `template_dir` or if `template_dir`
895     is not specified on the CLI, the fixtures associated with this
896     test name.
897     """
898
899     # noinspection PyBroadException
900     try:
901         if "filename" in metafunc.fixturenames:
902             from .parametrizers import parametrize_filename
903
904             parametrize_filename(metafunc)
905
906         if "filenames" in metafunc.fixturenames:
907             from .parametrizers import parametrize_filenames
908
909             parametrize_filenames(metafunc)
910
911         if "template_dir" in metafunc.fixturenames:
912             from .parametrizers import parametrize_template_dir
913
914             parametrize_template_dir(metafunc)
915
916         if "environment_pair" in metafunc.fixturenames:
917             from .parametrizers import parametrize_environment_pair
918
919             parametrize_environment_pair(metafunc)
920
921         if "heat_volume_pair" in metafunc.fixturenames:
922             from .parametrizers import parametrize_heat_volume_pair
923
924             parametrize_heat_volume_pair(metafunc)
925
926         if "yaml_files" in metafunc.fixturenames:
927             from .parametrizers import parametrize_yaml_files
928
929             parametrize_yaml_files(metafunc)
930
931         if "env_files" in metafunc.fixturenames:
932             from .parametrizers import parametrize_environment_files
933
934             parametrize_environment_files(metafunc)
935
936         if "yaml_file" in metafunc.fixturenames:
937             from .parametrizers import parametrize_yaml_file
938
939             parametrize_yaml_file(metafunc)
940
941         if "env_file" in metafunc.fixturenames:
942             from .parametrizers import parametrize_environment_file
943
944             parametrize_environment_file(metafunc)
945
946         if "parsed_yaml_file" in metafunc.fixturenames:
947             from .parametrizers import parametrize_parsed_yaml_file
948
949             parametrize_parsed_yaml_file(metafunc)
950
951         if "parsed_environment_file" in metafunc.fixturenames:
952             from .parametrizers import parametrize_parsed_environment_file
953
954             parametrize_parsed_environment_file(metafunc)
955
956         if "heat_template" in metafunc.fixturenames:
957             from .parametrizers import parametrize_heat_template
958
959             parametrize_heat_template(metafunc)
960
961         if "heat_templates" in metafunc.fixturenames:
962             from .parametrizers import parametrize_heat_templates
963
964             parametrize_heat_templates(metafunc)
965
966         if "volume_template" in metafunc.fixturenames:
967             from .parametrizers import parametrize_volume_template
968
969             parametrize_volume_template(metafunc)
970
971         if "volume_templates" in metafunc.fixturenames:
972             from .parametrizers import parametrize_volume_templates
973
974             parametrize_volume_templates(metafunc)
975
976         if "template" in metafunc.fixturenames:
977             from .parametrizers import parametrize_template
978
979             parametrize_template(metafunc)
980
981         if "templates" in metafunc.fixturenames:
982             from .parametrizers import parametrize_templates
983
984             parametrize_templates(metafunc)
985     except Exception as e:
986         # If an error occurs in the collection phase, then it won't be logged as a
987         # normal test failure.  This means that failures could occur, but not
988         # be seen on the report resulting in a false positive success message.  These
989         # errors will be stored and reported separately on the report
990         COLLECTION_FAILURES.append(
991             {
992                 "module": metafunc.module.__name__,
993                 "test": metafunc.function.__name__,
994                 "fixtures": metafunc.fixturenames,
995                 "error": traceback.format_exc(),
996                 "requirements": getattr(metafunc.function, "requirement_ids", []),
997             }
998         )
999         raise e
1000
1001
1002 def hash_directory(path):
1003     """
1004     Create md5 hash using the contents of all files under ``path``
1005     :param path: string directory containing files
1006     :return: string MD5 hash code (hex)
1007     """
1008     md5 = hashlib.md5()
1009     for dir_path, sub_dirs, filenames in os.walk(path):
1010         for filename in filenames:
1011             file_path = os.path.join(dir_path, filename)
1012             with open(file_path, "rb") as f:
1013                 md5.update(f.read())
1014     return md5.hexdigest()
1015
1016
1017 def load_current_requirements():
1018     """Loads dict of current requirements or empty dict if file doesn't exist"""
1019     with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
1020         data = json.load(f)
1021         version = data["current_version"]
1022         return data["versions"][version]["needs"]
1023
1024
1025 def select_heat_requirements(reqs):
1026     """Filters dict requirements to only those requirements pertaining to Heat"""
1027     return {k: v for k, v in reqs.items() if "Heat" in v["docname"]}
1028
1029
1030 # noinspection PyUnusedLocal
1031 def pytest_report_collectionfinish(config, startdir, items):
1032     """Generates a simple traceability report to output/traceability.csv"""
1033     traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
1034     output_dir = os.path.split(traceability_path)[0]
1035     if not os.path.exists(output_dir):
1036         os.makedirs(output_dir)
1037     reqs = load_current_requirements()
1038     requirements = select_heat_requirements(reqs)
1039     unmapped, mapped = partition(
1040         lambda i: hasattr(i.function, "requirement_ids"), items
1041     )
1042
1043     req_to_test = defaultdict(set)
1044     mapping_errors = set()
1045     for item in mapped:
1046         for req_id in item.function.requirement_ids:
1047             if req_id not in req_to_test:
1048                 req_to_test[req_id].add(item)
1049             if req_id not in requirements:
1050                 mapping_errors.add(
1051                     (req_id, item.function.__module__, item.function.__name__)
1052                 )
1053
1054     mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
1055     with open(mapping_error_path, "w", newline="") as f:
1056         writer = csv.writer(f)
1057         for err in mapping_errors:
1058             writer.writerow(err)
1059
1060     with open(traceability_path, "w", newline="") as f:
1061         out = csv.writer(f)
1062         out.writerow(
1063             ("Requirement ID", "Requirement", "Section",
1064              "Keyword", "Validation Mode", "Is Testable",
1065              "Test Module", "Test Name"),
1066         )
1067         for req_id, metadata in requirements.items():
1068             keyword = metadata["keyword"].upper()
1069             mode = metadata["validation_mode"].lower()
1070             testable = keyword in {"MUST", "MUST NOT"} and mode != "none"
1071             if req_to_test[req_id]:
1072                 for item in req_to_test[req_id]:
1073                     out.writerow(
1074                         (
1075                             req_id,
1076                             metadata["description"],
1077                             metadata["section_name"],
1078                             keyword,
1079                             mode,
1080                             "TRUE" if testable else "FALSE",
1081                             item.function.__module__,
1082                             item.function.__name__,
1083                         ),
1084                     )
1085             else:
1086                 out.writerow(
1087                     (req_id,
1088                      metadata["description"],
1089                      metadata["section_name"],
1090                      keyword,
1091                      mode,
1092                      "TRUE" if testable else "FALSE",
1093                      "",   # test module
1094                      ""),  # test function
1095                 )
1096         # now write out any test methods that weren't mapped to requirements
1097         unmapped_tests = {(item.function.__module__, item.function.__name__)
1098                           for item in unmapped}
1099         for test_module, test_name in unmapped_tests:
1100             out.writerow(
1101                 ("",        # req ID
1102                  "",        # description
1103                  "",        # section name
1104                  "",        # keyword
1105                  "static",  # validation mode
1106                  "TRUE",    # testable
1107                  test_module,
1108                  test_name)
1109             )