56213547a0d9e552b2da0adde2cf2bc2691403ef
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37
38 import csv
39 import datetime
40 import hashlib
41 import io
42 import json
43 import os
44 import re
45 import sys
46 import time
47 from collections import defaultdict
48 from itertools import chain
49
50 import requests
51 import traceback
52 import warnings
53
54 import docutils.core
55 import jinja2
56 import pytest
57 from more_itertools import partition
58 import xlsxwriter
59 from six import string_types
60
61 import version
62
63 __path__ = [os.path.dirname(os.path.abspath(__file__))]
64
65 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
66
67 RESOLUTION_STEPS_FILE = "resolution_steps.json"
68 HEAT_REQUIREMENTS_FILE = "heat_requirements.json"
69
70 # noinspection PyPep8
71 NEEDS_JSON_URL = "https://onap.readthedocs.io/en/latest/_downloads/789ac64d223325488fb3f120f959d985/needs.json"
72
73 REPORT_COLUMNS = [
74     ("Input File", "file"),
75     ("Test", "test_file"),
76     ("Requirements", "req_description"),
77     ("Resolution Steps", "resolution_steps"),
78     ("Error Message", "message"),
79     ("Raw Test Output", "raw_output"),
80 ]
81
82 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
83 while preparing to validate the the input files. Some validations may not have been
84 executed. Please refer these issue to the VNF Validation Tool team.
85 """
86
87 COLLECTION_FAILURES = []
88
89 # Captures the results of every test run
90 ALL_RESULTS = []
91
92
93 def get_output_dir(config):
94     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
95     if not os.path.exists(output_dir):
96         os.makedirs(output_dir, exist_ok=True)
97     return output_dir
98
99
100 def extract_error_msg(rep):
101     """
102     If a custom error message was provided, then extract it otherwise
103     just show the pytest assert message
104     """
105     if rep.outcome != "failed":
106         return ""
107     try:
108         full_msg = str(rep.longrepr.reprcrash.message)
109         match = re.match(
110             "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
111         )
112         if match:  # custom message was provided
113             # Extract everything between AssertionError and the start
114             # of the assert statement expansion in the pytest report
115             msg = match.group(1)
116         else:
117             msg = str(rep.longrepr.reprcrash)
118             if "AssertionError:" in msg:
119                 msg = msg.split("AssertionError:")[1]
120     except AttributeError:
121         msg = str(rep)
122
123     return msg
124
125
126 class TestResult:
127     """
128     Wraps the test case and result to extract necessary metadata for
129     reporting purposes.
130     """
131
132     RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
133
134     def __init__(self, item, outcome):
135         self.item = item
136         self.result = outcome.get_result()
137         self.files = [os.path.normpath(p) for p in self._get_files()]
138         self.error_message = self._get_error_message()
139
140     @property
141     def requirement_ids(self):
142         """
143         Returns list of requirement IDs mapped to the test case.
144
145         :return: Returns a list of string requirement IDs the test was
146                  annotated with ``validates`` otherwise returns and empty list
147         """
148         is_mapped = hasattr(self.item.function, "requirement_ids")
149         return self.item.function.requirement_ids if is_mapped else []
150
151     @property
152     def markers(self):
153         """
154         :return: Returns a set of pytest marker names for the test or an empty set
155         """
156         return set(m.name for m in self.item.iter_markers())
157
158     @property
159     def is_base_test(self):
160         """
161         :return: Returns True if the test is annotated with a pytest marker called base
162         """
163         return "base" in self.markers
164
165     @property
166     def is_failed(self):
167         """
168         :return: True if the test failed
169         """
170         return self.outcome == "FAIL"
171
172     @property
173     def outcome(self):
174         """
175         :return: Returns 'PASS', 'FAIL', or 'SKIP'
176         """
177         return self.RESULT_MAPPING[self.result.outcome]
178
179     @property
180     def test_case(self):
181         """
182         :return: Name of the test case method
183         """
184         return self.item.function.__name__
185
186     @property
187     def test_module(self):
188         """
189         :return: Name of the file containing the test case
190         """
191         return self.item.function.__module__.split(".")[-1]
192
193     @property
194     def raw_output(self):
195         """
196         :return: Full output from pytest for the given test case
197         """
198         return str(self.result.longrepr)
199
200     def requirement_text(self, curr_reqs):
201         """
202         Creates a text summary for the requirement IDs mapped to the test case.
203         If no requirements are mapped, then it returns the empty string.
204
205         :param curr_reqs: mapping of requirement IDs to requirement metadata
206                           loaded from the VNFRQTS projects needs.json output
207         :return: ID and text of the requirements mapped to the test case
208         """
209         text = (
210             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
211             for r_id in self.requirement_ids
212         )
213         return "".join(text)
214
215     def requirements_metadata(self, curr_reqs):
216         """
217         Returns a list of dicts containing the following metadata for each
218         requirement mapped:
219
220         - id: Requirement ID
221         - text: Full text of the requirement
222         - keyword: MUST, MUST NOT, MAY, etc.
223
224         :param curr_reqs: mapping of requirement IDs to requirement metadata
225                           loaded from the VNFRQTS projects needs.json output
226         :return: List of requirement metadata
227         """
228         data = []
229         for r_id in self.requirement_ids:
230             if r_id not in curr_reqs:
231                 continue
232             data.append(
233                 {
234                     "id": r_id,
235                     "text": curr_reqs[r_id]["description"],
236                     "keyword": curr_reqs[r_id]["keyword"],
237                 }
238             )
239         return data
240
241     def resolution_steps(self, resolutions):
242         """
243         :param resolutions: Loaded from contents for resolution_steps.json
244         :return: Header and text for the resolution step associated with this
245                  test case.  Returns empty string if no resolutions are
246                  provided.
247         """
248         text = (
249             "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
250             for entry in resolutions
251             if self._match(entry)
252         )
253         return "".join(text)
254
255     def _match(self, resolution_entry):
256         """
257         Returns True if the test result maps to the given entry in
258         the resolutions file
259         """
260         return (
261             self.test_case == resolution_entry["function"]
262             and self.test_module == resolution_entry["module"]
263         )
264
265     def _get_files(self):
266         """
267         Extracts the list of files passed into the test case.
268         :return: List of absolute paths to files
269         """
270         if "environment_pair" in self.item.fixturenames:
271             return [
272                 "{} environment pair".format(
273                     self.item.funcargs["environment_pair"]["name"]
274                 )
275             ]
276         elif "heat_volume_pair" in self.item.fixturenames:
277             return [
278                 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
279             ]
280         elif "heat_templates" in self.item.fixturenames:
281             return self.item.funcargs["heat_templates"]
282         elif "yaml_files" in self.item.fixturenames:
283             return self.item.funcargs["yaml_files"]
284         else:
285             return [self.result.nodeid.split("[")[1][:-1]]
286
287     def _get_error_message(self):
288         """
289         :return: Error message or empty string if the test did not fail or error
290         """
291         if self.is_failed:
292             return extract_error_msg(self.result)
293         else:
294             return ""
295
296
297 # noinspection PyUnusedLocal
298 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
299 def pytest_runtest_makereport(item, call):
300     """
301     Captures the test results for later reporting.  This will also halt testing
302     if a base failure is encountered (can be overridden with continue-on-failure)
303     """
304     outcome = yield
305     if outcome.get_result().when != "call":
306         return  # only capture results of test cases themselves
307     result = TestResult(item, outcome)
308     ALL_RESULTS.append(result)
309     if (
310         not item.config.option.continue_on_failure
311         and result.is_base_test
312         and result.is_failed
313     ):
314         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
315             result.error_message
316         )
317         pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
318
319
320 def make_timestamp():
321     """
322     :return: String make_iso_timestamp in format:
323              2019-01-19 10:18:49.865000 Central Standard Time
324     """
325     timezone = time.tzname[time.localtime().tm_isdst]
326     return "{} {}".format(str(datetime.datetime.now()), timezone)
327
328
329 # noinspection PyUnusedLocal
330 def pytest_sessionstart(session):
331     ALL_RESULTS.clear()
332     COLLECTION_FAILURES.clear()
333
334
335 # noinspection PyUnusedLocal
336 def pytest_sessionfinish(session, exitstatus):
337     """
338     If not a self-test run, generate the output reports
339     """
340     if not session.config.option.template_dir:
341         return
342
343     if session.config.option.template_source:
344         template_source = session.config.option.template_source[0]
345     else:
346         template_source = os.path.abspath(session.config.option.template_dir[0])
347
348     categories_selected = session.config.option.test_categories or ""
349     generate_report(
350         get_output_dir(session.config),
351         template_source,
352         categories_selected,
353         session.config.option.report_format,
354     )
355
356
357 # noinspection PyUnusedLocal
358 def pytest_collection_modifyitems(session, config, items):
359     """
360     Selects tests based on the categories requested.  Tests without
361     categories will always be executed.
362     """
363     config.traceability_items = list(items)  # save all items for traceability
364     if not config.option.self_test:
365         for item in items:
366             # checking if test belongs to a category
367             if hasattr(item.function, "categories"):
368                 if config.option.test_categories:
369                     test_categories = getattr(item.function, "categories")
370                     passed_categories = config.option.test_categories
371                     if not all(
372                         category in passed_categories for category in test_categories
373                     ):
374                         item.add_marker(
375                             pytest.mark.skip(
376                                 reason="Test categories do not match all the passed categories"
377                             )
378                         )
379                 else:
380                     item.add_marker(
381                         pytest.mark.skip(
382                             reason="Test belongs to a category but no categories were passed"
383                         )
384                     )
385     items.sort(
386         key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
387     )
388
389
390 def make_href(paths):
391     """
392     Create an anchor tag to link to the file paths provided.
393     :param paths: string or list of file paths
394     :return: String of hrefs - one for each path, each seperated by a line
395              break (<br/).
396     """
397     paths = [paths] if isinstance(paths, string_types) else paths
398     links = []
399     for p in paths:
400         abs_path = os.path.abspath(p)
401         name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
402         links.append(
403             "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
404                 abs_path=abs_path, name=name
405             )
406         )
407     return "<br/>".join(links)
408
409
410 def load_resolutions_file():
411     """
412     :return: dict of data loaded from resolutions_steps.json
413     """
414     resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
415     if os.path.exists(resolution_steps):
416         with open(resolution_steps, "r") as f:
417             return json.loads(f.read())
418
419
420 def generate_report(outpath, template_path, categories, output_format="html"):
421     """
422     Generates the various output reports.
423
424     :param outpath: destination directory for all reports
425     :param template_path: directory containing the Heat templates validated
426     :param categories: Optional categories selected
427     :param output_format: One of "html", "excel", or "csv". Default is "html"
428     :raises: ValueError if requested output format is unknown
429     """
430     failures = [r for r in ALL_RESULTS if r.is_failed]
431     generate_failure_file(outpath)
432     output_format = output_format.lower().strip() if output_format else "html"
433     if output_format == "html":
434         generate_html_report(outpath, categories, template_path, failures)
435     elif output_format == "excel":
436         generate_excel_report(outpath, categories, template_path, failures)
437     elif output_format == "json":
438         generate_json(outpath, template_path, categories)
439     elif output_format == "csv":
440         generate_csv_report(outpath, categories, template_path, failures)
441     else:
442         raise ValueError("Unsupported output format: " + output_format)
443
444
445 def write_json(data, path):
446     """
447     Pretty print data as JSON to the output path requested
448
449     :param data: Data structure to be converted to JSON
450     :param path: Where to write output
451     """
452     with open(path, "w") as f:
453         json.dump(data, f, indent=2)
454
455
456 def generate_failure_file(outpath):
457     """
458     Writes a summary of test failures to a file named failures.
459     This is for backwards compatibility only.  The report.json offers a
460     more comprehensive output.
461     """
462     failure_path = os.path.join(outpath, "failures")
463     failures = [r for r in ALL_RESULTS if r.is_failed]
464     data = {}
465     for i, fail in enumerate(failures):
466         data[str(i)] = {
467             "file": fail.files[0] if len(fail.files) == 1 else fail.files,
468             "vnfrqts": fail.requirement_ids,
469             "test": fail.test_case,
470             "test_file": fail.test_module,
471             "raw_output": fail.raw_output,
472             "message": fail.error_message,
473         }
474     write_json(data, failure_path)
475
476
477 def generate_csv_report(output_dir, categories, template_path, failures):
478     rows = [["Validation Failures"]]
479     headers = [
480         ("Categories Selected:", categories),
481         ("Tool Version:", version.VERSION),
482         ("Report Generated At:", make_timestamp()),
483         ("Directory Validated:", template_path),
484         ("Checksum:", hash_directory(template_path)),
485         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
486     ]
487     rows.append([])
488     for header in headers:
489         rows.append(header)
490     rows.append([])
491
492     if COLLECTION_FAILURES:
493         rows.append([COLLECTION_FAILURE_WARNING])
494         rows.append(["Validation File", "Test", "Fixtures", "Error"])
495         for failure in COLLECTION_FAILURES:
496             rows.append(
497                 [
498                     failure["module"],
499                     failure["test"],
500                     ";".join(failure["fixtures"]),
501                     failure["error"],
502                 ]
503             )
504         rows.append([])
505
506     # table header
507     rows.append([col for col, _ in REPORT_COLUMNS])
508
509     reqs = load_current_requirements()
510     resolutions = load_resolutions_file()
511
512     # table content
513     for failure in failures:
514         rows.append(
515             [
516                 "\n".join(failure.files),
517                 failure.test_module,
518                 failure.requirement_text(reqs),
519                 failure.resolution_steps(resolutions),
520                 failure.error_message,
521                 failure.raw_output,
522             ]
523         )
524
525     output_path = os.path.join(output_dir, "report.csv")
526     with open(output_path, "w", newline="") as f:
527         writer = csv.writer(f)
528         for row in rows:
529             writer.writerow(row)
530
531
532 def generate_excel_report(output_dir, categories, template_path, failures):
533     output_path = os.path.join(output_dir, "report.xlsx")
534     workbook = xlsxwriter.Workbook(output_path)
535     bold = workbook.add_format({"bold": True})
536     code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
537     normal = workbook.add_format({"text_wrap": True})
538     heading = workbook.add_format({"bold": True, "font_size": 18})
539     worksheet = workbook.add_worksheet("failures")
540     worksheet.write(0, 0, "Validation Failures", heading)
541
542     headers = [
543         ("Categories Selected:", ",".join(categories)),
544         ("Tool Version:", version.VERSION),
545         ("Report Generated At:", make_timestamp()),
546         ("Directory Validated:", template_path),
547         ("Checksum:", hash_directory(template_path)),
548         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
549     ]
550     for row, (header, value) in enumerate(headers, start=2):
551         worksheet.write(row, 0, header, bold)
552         worksheet.write(row, 1, value)
553
554     worksheet.set_column(0, len(headers) - 1, 40)
555     worksheet.set_column(len(headers), len(headers), 80)
556
557     if COLLECTION_FAILURES:
558         collection_failures_start = 2 + len(headers) + 2
559         worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
560         collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
561         for col_num, col_name in enumerate(collection_failure_headers):
562             worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
563         for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
564             worksheet.write(row, 0, data["module"])
565             worksheet.write(row, 1, data["test"])
566             worksheet.write(row, 2, ",".join(data["fixtures"]))
567             worksheet.write(row, 3, data["error"], code)
568
569     # table header
570     start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
571     worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
572     for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
573         worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
574
575     reqs = load_current_requirements()
576     resolutions = load_resolutions_file()
577
578     # table content
579     for row, failure in enumerate(failures, start=start_error_table_row + 2):
580         worksheet.write(row, 0, "\n".join(failure.files), normal)
581         worksheet.write(row, 1, failure.test_module, normal)
582         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
583         worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
584         worksheet.write(row, 4, failure.error_message, normal)
585         worksheet.write(row, 5, failure.raw_output, code)
586
587     workbook.close()
588
589
590 def make_iso_timestamp():
591     """
592     Creates a timestamp in ISO 8601 format in UTC format.  Used for JSON output.
593     """
594     now = datetime.datetime.utcnow()
595     now.replace(tzinfo=datetime.timezone.utc)
596     return now.isoformat()
597
598
599 def aggregate_requirement_adherence(r_id, collection_failures, test_results):
600     """
601     Examines all tests associated with a given requirement and determines
602     the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
603
604     * ERROR - At least one ERROR occurred
605     * PASS -  At least one PASS and no FAIL or ERRORs.
606     * FAIL -  At least one FAIL occurred (no ERRORs)
607     * SKIP - All tests were SKIP
608
609
610     :param r_id: Requirement ID to examing
611     :param collection_failures: Errors that occurred during test setup.
612     :param test_results: List of TestResult
613     :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
614     """
615     errors = any(r_id in f["requirements"] for f in collection_failures)
616     outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
617     return aggregate_results(errors, outcomes, r_id)
618
619
620 def aggregate_results(has_errors, outcomes, r_id=None):
621     """
622     Determines the aggregate result for the conditions provided.  Assumes the
623     results have been filtered and collected for analysis.
624
625     :param has_errors: True if collection failures occurred for the tests being
626                        analyzed.
627     :param outcomes: set of outcomes from the TestResults
628     :param r_id: Optional requirement ID if known
629     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
630              (see aggregate_requirement_adherence for more detail)
631     """
632     if has_errors:
633         return "ERROR"
634
635     if not outcomes:
636         return "PASS"
637     elif "FAIL" in outcomes:
638         return "FAIL"
639     elif "PASS" in outcomes:
640         return "PASS"
641     elif {"SKIP"} == outcomes:
642         return "SKIP"
643     else:
644         pytest.warns(
645             "Unexpected error aggregating outcomes ({}) for requirement {}".format(
646                 outcomes, r_id
647             )
648         )
649         return "ERROR"
650
651
652 def aggregate_run_results(collection_failures, test_results):
653     """
654     Determines overall status of run based on all failures and results.
655
656     * 'ERROR' - At least one collection failure occurred during the run.
657     * 'FAIL' - Template failed at least one test
658     * 'PASS' - All tests executed properly and no failures were detected
659
660     :param collection_failures: failures occuring during test setup
661     :param test_results: list of all test executuion results
662     :return: one of 'ERROR', 'FAIL', or 'PASS'
663     """
664     if collection_failures:
665         return "ERROR"
666     elif any(r.is_failed for r in test_results):
667         return "FAIL"
668     else:
669         return "PASS"
670
671
672 def error(failure_or_result):
673     """
674     Extracts the error message from a collection failure or test result
675     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
676     :return: Error message as string
677     """
678     if isinstance(failure_or_result, TestResult):
679         return failure_or_result.error_message
680     else:
681         return failure_or_result["error"]
682
683
684 def req_ids(failure_or_result):
685     """
686     Extracts the requirement IDs from a collection failure or test result
687     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
688     :return: set of Requirement IDs.  If no requirements mapped, then an empty set
689     """
690     if isinstance(failure_or_result, TestResult):
691         return set(failure_or_result.requirement_ids)
692     else:
693         return set(failure_or_result["requirements"])
694
695
696 def collect_errors(r_id, collection_failures, test_result):
697     """
698     Creates a list of error messages from the collection failures and
699     test results.  If r_id is provided, then it collects the error messages
700     where the failure or test is associated with that requirement ID.  If
701     r_id is None, then it collects all errors that occur on failures and
702     results that are not mapped to requirements
703     """
704
705     def selector(item):
706         if r_id:
707             return r_id in req_ids(item)
708         else:
709             return not req_ids(item)
710
711     errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
712     return [e for e in errors if e]
713
714
715 def generate_json(outpath, template_path, categories):
716     """
717     Creates a JSON summary of the entire test run.
718     """
719     reqs = load_current_requirements()
720     data = {
721         "version": "dublin",
722         "template_directory": template_path,
723         "timestamp": make_iso_timestamp(),
724         "checksum": hash_directory(template_path),
725         "categories": categories,
726         "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
727         "tests": [],
728         "requirements": [],
729     }
730
731     results = data["tests"]
732     for result in COLLECTION_FAILURES:
733         results.append(
734             {
735                 "files": [],
736                 "test_module": result["module"],
737                 "test_case": result["test"],
738                 "result": "ERROR",
739                 "error": result["error"],
740                 "requirements": result["requirements"],
741             }
742         )
743     for result in ALL_RESULTS:
744         results.append(
745             {
746                 "files": result.files,
747                 "test_module": result.test_module,
748                 "test_case": result.test_case,
749                 "result": result.outcome,
750                 "error": result.error_message if result.is_failed else "",
751                 "requirements": result.requirements_metadata(reqs),
752             }
753         )
754
755     requirements = data["requirements"]
756     for r_id, r_data in reqs.items():
757         result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
758         if result:
759             requirements.append(
760                 {
761                     "id": r_id,
762                     "text": r_data["description"],
763                     "keyword": r_data["keyword"],
764                     "result": result,
765                     "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
766                 }
767             )
768     # If there are tests that aren't mapped to a requirement, then we'll
769     # map them to a special entry so the results are coherent.
770     unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
771     has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
772     if unmapped_outcomes or has_errors:
773         requirements.append(
774             {
775                 "id": "Unmapped",
776                 "text": "Tests not mapped to requirements (see tests)",
777                 "result": aggregate_results(has_errors, unmapped_outcomes),
778                 "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
779             }
780         )
781
782     report_path = os.path.join(outpath, "report.json")
783     write_json(data, report_path)
784
785
786 def generate_html_report(outpath, categories, template_path, failures):
787     reqs = load_current_requirements()
788     resolutions = load_resolutions_file()
789     fail_data = []
790     for failure in failures:
791         fail_data.append(
792             {
793                 "file_links": make_href(failure.files),
794                 "test_id": failure.test_module,
795                 "error_message": failure.error_message,
796                 "raw_output": failure.raw_output,
797                 "requirements": docutils.core.publish_parts(
798                     writer_name="html", source=failure.requirement_text(reqs)
799                 )["body"],
800                 "resolution_steps": failure.resolution_steps(resolutions),
801             }
802         )
803     pkg_dir = os.path.split(__file__)[0]
804     j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
805     with open(j2_template_path, "r") as f:
806         report_template = jinja2.Template(f.read())
807         contents = report_template.render(
808             version=version.VERSION,
809             num_failures=len(failures) + len(COLLECTION_FAILURES),
810             categories=categories,
811             template_dir=make_href(template_path),
812             checksum=hash_directory(template_path),
813             timestamp=make_timestamp(),
814             failures=fail_data,
815             collection_failures=COLLECTION_FAILURES,
816         )
817     with open(os.path.join(outpath, "report.html"), "w") as f:
818         f.write(contents)
819
820
821 def pytest_addoption(parser):
822     """
823     Add needed CLI arguments
824     """
825     parser.addoption(
826         "--template-directory",
827         dest="template_dir",
828         action="append",
829         help="Directory which holds the templates for validation",
830     )
831
832     parser.addoption(
833         "--template-source",
834         dest="template_source",
835         action="append",
836         help="Source Directory which holds the templates for validation",
837     )
838
839     parser.addoption(
840         "--self-test",
841         dest="self_test",
842         action="store_true",
843         help="Test the unit tests against their fixtured data",
844     )
845
846     parser.addoption(
847         "--report-format",
848         dest="report_format",
849         action="store",
850         help="Format of output report (html, csv, excel, json)",
851     )
852
853     parser.addoption(
854         "--continue-on-failure",
855         dest="continue_on_failure",
856         action="store_true",
857         help="Continue validation even when structural errors exist in input files",
858     )
859
860     parser.addoption(
861         "--output-directory",
862         dest="output_dir",
863         action="store",
864         default=None,
865         help="Alternate ",
866     )
867
868     parser.addoption(
869         "--category",
870         dest="test_categories",
871         action="append",
872         help="optional category of test to execute",
873     )
874
875
876 def pytest_configure(config):
877     """
878     Ensure that we are receive either `--self-test` or
879     `--template-dir=<directory` as CLI arguments
880     """
881     if config.getoption("template_dir") and config.getoption("self_test"):
882         raise Exception('"--template-dir", and "--self-test"' " are mutually exclusive")
883     if not (
884         config.getoption("template_dir")
885         or config.getoption("self_test")
886         or config.getoption("help")
887     ):
888         raise Exception('One of "--template-dir" or' ' "--self-test" must be specified')
889
890
891 def pytest_generate_tests(metafunc):
892     """
893     If a unit test requires an argument named 'filename'
894     we generate a test for the filenames selected. Either
895     the files contained in `template_dir` or if `template_dir`
896     is not specified on the CLI, the fixtures associated with this
897     test name.
898     """
899
900     # noinspection PyBroadException
901     try:
902         if "filename" in metafunc.fixturenames:
903             from .parametrizers import parametrize_filename
904
905             parametrize_filename(metafunc)
906
907         if "filenames" in metafunc.fixturenames:
908             from .parametrizers import parametrize_filenames
909
910             parametrize_filenames(metafunc)
911
912         if "template_dir" in metafunc.fixturenames:
913             from .parametrizers import parametrize_template_dir
914
915             parametrize_template_dir(metafunc)
916
917         if "environment_pair" in metafunc.fixturenames:
918             from .parametrizers import parametrize_environment_pair
919
920             parametrize_environment_pair(metafunc)
921
922         if "heat_volume_pair" in metafunc.fixturenames:
923             from .parametrizers import parametrize_heat_volume_pair
924
925             parametrize_heat_volume_pair(metafunc)
926
927         if "yaml_files" in metafunc.fixturenames:
928             from .parametrizers import parametrize_yaml_files
929
930             parametrize_yaml_files(metafunc)
931
932         if "env_files" in metafunc.fixturenames:
933             from .parametrizers import parametrize_environment_files
934
935             parametrize_environment_files(metafunc)
936
937         if "yaml_file" in metafunc.fixturenames:
938             from .parametrizers import parametrize_yaml_file
939
940             parametrize_yaml_file(metafunc)
941
942         if "env_file" in metafunc.fixturenames:
943             from .parametrizers import parametrize_environment_file
944
945             parametrize_environment_file(metafunc)
946
947         if "parsed_yaml_file" in metafunc.fixturenames:
948             from .parametrizers import parametrize_parsed_yaml_file
949
950             parametrize_parsed_yaml_file(metafunc)
951
952         if "parsed_environment_file" in metafunc.fixturenames:
953             from .parametrizers import parametrize_parsed_environment_file
954
955             parametrize_parsed_environment_file(metafunc)
956
957         if "heat_template" in metafunc.fixturenames:
958             from .parametrizers import parametrize_heat_template
959
960             parametrize_heat_template(metafunc)
961
962         if "heat_templates" in metafunc.fixturenames:
963             from .parametrizers import parametrize_heat_templates
964
965             parametrize_heat_templates(metafunc)
966
967         if "volume_template" in metafunc.fixturenames:
968             from .parametrizers import parametrize_volume_template
969
970             parametrize_volume_template(metafunc)
971
972         if "volume_templates" in metafunc.fixturenames:
973             from .parametrizers import parametrize_volume_templates
974
975             parametrize_volume_templates(metafunc)
976
977         if "template" in metafunc.fixturenames:
978             from .parametrizers import parametrize_template
979
980             parametrize_template(metafunc)
981
982         if "templates" in metafunc.fixturenames:
983             from .parametrizers import parametrize_templates
984
985             parametrize_templates(metafunc)
986     except Exception as e:
987         # If an error occurs in the collection phase, then it won't be logged as a
988         # normal test failure.  This means that failures could occur, but not
989         # be seen on the report resulting in a false positive success message.  These
990         # errors will be stored and reported separately on the report
991         COLLECTION_FAILURES.append(
992             {
993                 "module": metafunc.module.__name__,
994                 "test": metafunc.function.__name__,
995                 "fixtures": metafunc.fixturenames,
996                 "error": traceback.format_exc(),
997                 "requirements": getattr(metafunc.function, "requirement_ids", []),
998             }
999         )
1000         raise e
1001
1002
1003 def hash_directory(path):
1004     md5 = hashlib.md5()
1005     for dir_path, sub_dirs, filenames in os.walk(path):
1006         for filename in filenames:
1007             file_path = os.path.join(dir_path, filename)
1008             with open(file_path, "rb") as f:
1009                 md5.update(f.read())
1010     return md5.hexdigest()
1011
1012
1013 def load_current_requirements():
1014     """Loads dict of current requirements or empty dict if file doesn't exist"""
1015     try:
1016         r = requests.get(NEEDS_JSON_URL)
1017         if r.headers.get("content-type") == "application/json":
1018             with open(HEAT_REQUIREMENTS_FILE, "wb") as needs:
1019                 needs.write(r.content)
1020         else:
1021             warnings.warn(
1022                 (
1023                     "Unexpected content-type ({}) encountered downloading "
1024                     + "requirements.json, using last saved copy"
1025                 ).format(r.headers.get("content-type"))
1026             )
1027     except requests.exceptions.RequestException as e:
1028         warnings.warn("Error downloading latest JSON, using last saved copy.")
1029         warnings.warn(UserWarning(e))
1030     if not os.path.exists(HEAT_REQUIREMENTS_FILE):
1031         return {}
1032     with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
1033         data = json.load(f)
1034         version = data["current_version"]
1035         return data["versions"][version]["needs"]
1036
1037
1038 def compat_open(path):
1039     """Invokes open correctly depending on the Python version"""
1040     if sys.version_info.major < 3:
1041         return open(path, "wb")
1042     else:
1043         return open(path, "w", newline="")
1044
1045
1046 def unicode_writerow(writer, row):
1047     if sys.version_info.major < 3:
1048         row = [s.encode("utf8") for s in row]
1049     writer.writerow(row)
1050
1051
1052 def parse_heat_requirements(reqs):
1053     """Takes requirements and returns list of only Heat requirements"""
1054     data = json.loads(reqs)
1055     for key, values in list(data.items()):
1056         if "Heat" in (values["docname"]):
1057             if "MUST" not in (values["keyword"]):
1058                 del data[key]
1059             else:
1060                 if "none" in (values["validation_mode"]):
1061                     del data[key]
1062         else:
1063             del data[key]
1064     return data
1065
1066
1067 # noinspection PyUnusedLocal
1068 def pytest_report_collectionfinish(config, startdir, items):
1069     """Generates a simple traceability report to output/traceability.csv"""
1070     traceability_path = os.path.join(__path__[0], "../output/traceability.csv")
1071     output_dir = os.path.split(traceability_path)[0]
1072     if not os.path.exists(output_dir):
1073         os.makedirs(output_dir)
1074     reqs = load_current_requirements()
1075     reqs = json.dumps(reqs)
1076     requirements = parse_heat_requirements(reqs)
1077     unmapped, mapped = partition(
1078         lambda i: hasattr(i.function, "requirement_ids"), items
1079     )
1080
1081     req_to_test = defaultdict(set)
1082     mapping_errors = set()
1083     for item in mapped:
1084         for req_id in item.function.requirement_ids:
1085             if req_id not in req_to_test:
1086                 req_to_test[req_id].add(item)
1087             if req_id not in requirements:
1088                 mapping_errors.add(
1089                     (req_id, item.function.__module__, item.function.__name__)
1090                 )
1091
1092     mapping_error_path = os.path.join(__path__[0], "../output/mapping_errors.csv")
1093     with compat_open(mapping_error_path) as f:
1094         writer = csv.writer(f)
1095         for err in mapping_errors:
1096             unicode_writerow(writer, err)
1097
1098     with compat_open(traceability_path) as f:
1099         out = csv.writer(f)
1100         unicode_writerow(
1101             out,
1102             ("Requirement ID", "Requirement", "Section", "Test Module", "Test Name"),
1103         )
1104         for req_id, metadata in requirements.items():
1105             if req_to_test[req_id]:
1106                 for item in req_to_test[req_id]:
1107                     unicode_writerow(
1108                         out,
1109                         (
1110                             req_id,
1111                             metadata["description"],
1112                             metadata["section_name"],
1113                             item.function.__module__,
1114                             item.function.__name__,
1115                         ),
1116                     )
1117             else:
1118                 unicode_writerow(
1119                     out,
1120                     (req_id, metadata["description"], metadata["section_name"], "", ""),
1121                 )
1122         # now write out any test methods that weren't mapped to requirements
1123         for item in unmapped:
1124             unicode_writerow(
1125                 out, ("", "", "", item.function.__module__, item.function.__name__)
1126             )