[VVP-132] Add new JSON output report
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37
38 import csv
39 import datetime
40 import hashlib
41 import io
42 import json
43 import os
44 import re
45 import sys
46 import time
47 from collections import defaultdict
48 from itertools import chain
49
50 import requests
51 import traceback
52 import warnings
53
54 import docutils.core
55 import jinja2
56 import pytest
57 from more_itertools import partition
58 import xlsxwriter
59 from six import string_types
60
61 __path__ = [os.path.dirname(os.path.abspath(__file__))]
62
63 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
64
65 RESOLUTION_STEPS_FILE = "resolution_steps.json"
66 HEAT_REQUIREMENTS_FILE = "heat_requirements.json"
67
68 # noinspection PyPep8
69 NEEDS_JSON_URL = "https://onap.readthedocs.io/en/latest/_downloads/789ac64d223325488fb3f120f959d985/needs.json"
70
71 REPORT_COLUMNS = [
72     ("Input File", "file"),
73     ("Test", "test_file"),
74     ("Requirements", "req_description"),
75     ("Resolution Steps", "resolution_steps"),
76     ("Error Message", "message"),
77     ("Raw Test Output", "raw_output"),
78 ]
79
80 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
81 while preparing to validate the the input files. Some validations may not have been
82 executed. Please refer these issue to the VNF Validation Tool team.
83 """
84
85 COLLECTION_FAILURES = []
86
87 # Captures the results of every test run
88 ALL_RESULTS = []
89
90
91 def get_output_dir(config):
92     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
93     if not os.path.exists(output_dir):
94         os.makedirs(output_dir, exist_ok=True)
95     return output_dir
96
97
98 def extract_error_msg(rep):
99     """
100     If a custom error message was provided, then extract it otherwise
101     just show the pytest assert message
102     """
103     if rep.outcome != "failed":
104         return ""
105     try:
106         full_msg = str(rep.longrepr.reprcrash.message)
107         match = re.match(
108             "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
109         )
110         if match:  # custom message was provided
111             # Extract everything between AssertionError and the start
112             # of the assert statement expansion in the pytest report
113             msg = match.group(1)
114         else:
115             msg = str(rep.longrepr.reprcrash)
116             if "AssertionError:" in msg:
117                 msg = msg.split("AssertionError:")[1]
118     except AttributeError:
119         msg = str(rep)
120
121     return msg
122
123
124 class TestResult:
125     """
126     Wraps the test case and result to extract necessary metadata for
127     reporting purposes.
128     """
129
130     RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
131
132     def __init__(self, item, outcome):
133         self.item = item
134         self.result = outcome.get_result()
135         self.files = [os.path.normpath(p) for p in self._get_files()]
136         self.error_message = self._get_error_message()
137
138     @property
139     def requirement_ids(self):
140         """
141         Returns list of requirement IDs mapped to the test case.
142
143         :return: Returns a list of string requirement IDs the test was
144                  annotated with ``validates`` otherwise returns and empty list
145         """
146         is_mapped = hasattr(self.item.function, "requirement_ids")
147         return self.item.function.requirement_ids if is_mapped else []
148
149     @property
150     def markers(self):
151         """
152         :return: Returns a set of pytest marker names for the test or an empty set
153         """
154         return set(m.name for m in self.item.iter_markers())
155
156     @property
157     def is_base_test(self):
158         """
159         :return: Returns True if the test is annotated with a pytest marker called base
160         """
161         return "base" in self.markers
162
163     @property
164     def is_failed(self):
165         """
166         :return: True if the test failed
167         """
168         return self.outcome == "FAIL"
169
170     @property
171     def outcome(self):
172         """
173         :return: Returns 'PASS', 'FAIL', or 'SKIP'
174         """
175         return self.RESULT_MAPPING[self.result.outcome]
176
177     @property
178     def test_case(self):
179         """
180         :return: Name of the test case method
181         """
182         return self.item.function.__name__
183
184     @property
185     def test_module(self):
186         """
187         :return: Name of the file containing the test case
188         """
189         return self.item.function.__module__.split(".")[-1]
190
191     @property
192     def raw_output(self):
193         """
194         :return: Full output from pytest for the given test case
195         """
196         return str(self.result.longrepr)
197
198     def requirement_text(self, curr_reqs):
199         """
200         Creates a text summary for the requirement IDs mapped to the test case.
201         If no requirements are mapped, then it returns the empty string.
202
203         :param curr_reqs: mapping of requirement IDs to requirement metadata
204                           loaded from the VNFRQTS projects needs.json output
205         :return: ID and text of the requirements mapped to the test case
206         """
207         text = (
208             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
209             for r_id in self.requirement_ids
210         )
211         return "".join(text)
212
213     def requirements_metadata(self, curr_reqs):
214         """
215         Returns a list of dicts containing the following metadata for each
216         requirement mapped:
217
218         - id: Requirement ID
219         - text: Full text of the requirement
220         - keyword: MUST, MUST NOT, MAY, etc.
221
222         :param curr_reqs: mapping of requirement IDs to requirement metadata
223                           loaded from the VNFRQTS projects needs.json output
224         :return: List of requirement metadata
225         """
226         data = []
227         for r_id in self.requirement_ids:
228             if r_id not in curr_reqs:
229                 continue
230             data.append(
231                 {
232                     "id": r_id,
233                     "text": curr_reqs[r_id]["description"],
234                     "keyword": curr_reqs[r_id]["keyword"],
235                 }
236             )
237         return data
238
239     def resolution_steps(self, resolutions):
240         """
241         :param resolutions: Loaded from contents for resolution_steps.json
242         :return: Header and text for the resolution step associated with this
243                  test case.  Returns empty string if no resolutions are
244                  provided.
245         """
246         text = (
247             "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
248             for entry in resolutions
249             if self._match(entry)
250         )
251         return "".join(text)
252
253     def _match(self, resolution_entry):
254         """
255         Returns True if the test result maps to the given entry in
256         the resolutions file
257         """
258         return (
259             self.test_case == resolution_entry["function"]
260             and self.test_module == resolution_entry["module"]
261         )
262
263     def _get_files(self):
264         """
265         Extracts the list of files passed into the test case.
266         :return: List of absolute paths to files
267         """
268         if "environment_pair" in self.item.fixturenames:
269             return [
270                 "{} environment pair".format(
271                     self.item.funcargs["environment_pair"]["name"]
272                 )
273             ]
274         elif "heat_volume_pair" in self.item.fixturenames:
275             return [
276                 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
277             ]
278         elif "heat_templates" in self.item.fixturenames:
279             return self.item.funcargs["heat_templates"]
280         elif "yaml_files" in self.item.fixturenames:
281             return self.item.funcargs["yaml_files"]
282         else:
283             return [self.result.nodeid.split("[")[1][:-1]]
284
285     def _get_error_message(self):
286         """
287         :return: Error message or empty string if the test did not fail or error
288         """
289         if self.is_failed:
290             return extract_error_msg(self.result)
291         else:
292             return ""
293
294
295 # noinspection PyUnusedLocal
296 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
297 def pytest_runtest_makereport(item, call):
298     """
299     Captures the test results for later reporting.  This will also halt testing
300     if a base failure is encountered (can be overridden with continue-on-failure)
301     """
302     outcome = yield
303     if outcome.get_result().when != "call":
304         return  # only capture results of test cases themselves
305     result = TestResult(item, outcome)
306     ALL_RESULTS.append(result)
307     if (
308         not item.config.option.continue_on_failure
309         and result.is_base_test
310         and result.is_failed
311     ):
312         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
313             result.error_message
314         )
315         pytest.exit(
316             "{}\n{}\n{}".format(msg, result.files, result.test_case)
317         )
318
319
320 def make_timestamp():
321     """
322     :return: String make_iso_timestamp in format:
323              2019-01-19 10:18:49.865000 Central Standard Time
324     """
325     timezone = time.tzname[time.localtime().tm_isdst]
326     return "{} {}".format(str(datetime.datetime.now()), timezone)
327
328
329 # noinspection PyUnusedLocal
330 def pytest_sessionstart(session):
331     ALL_RESULTS.clear()
332     COLLECTION_FAILURES.clear()
333
334
335 # noinspection PyUnusedLocal
336 def pytest_sessionfinish(session, exitstatus):
337     """
338     If not a self-test run, generate the output reports
339     """
340     if not session.config.option.template_dir:
341         return
342     template_path = os.path.abspath(session.config.option.template_dir[0])
343     profile_name = session.config.option.validation_profile_name or ""
344     generate_report(
345         get_output_dir(session.config),
346         template_path,
347         profile_name,
348         session.config.option.report_format,
349     )
350
351
352 # noinspection PyUnusedLocal
353 def pytest_collection_modifyitems(session, config, items):
354     """
355     Selects tests based on the validation profile requested.  Tests without
356     pytest markers will always be executed.
357     """
358     allowed_marks = ["xfail", "base"]
359     profile = config.option.validation_profile
360
361     for item in items:
362         markers = set(m.name for m in item.iter_markers())
363         if not profile and markers and set(markers).isdisjoint(allowed_marks):
364             item.add_marker(
365                 pytest.mark.skip(
366                     reason="No validation profile selected. "
367                     "Skipping tests with marks."
368                 )
369             )
370         if (
371             profile
372             and markers
373             and profile not in markers
374             and set(markers).isdisjoint(allowed_marks)
375         ):
376             item.add_marker(
377                 pytest.mark.skip(reason="Doesn't match selection " "validation profile")
378             )
379
380     items.sort(
381         key=lambda i: 0 if "base" in set(m.name for m in i.iter_markers()) else 1
382     )
383
384
385 def make_href(paths):
386     """
387     Create an anchor tag to link to the file paths provided.
388     :param paths: string or list of file paths
389     :return: String of hrefs - one for each path, each seperated by a line
390              break (<br/).
391     """
392     paths = [paths] if isinstance(paths, string_types) else paths
393     links = []
394     for p in paths:
395         abs_path = os.path.abspath(p)
396         name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
397         links.append(
398             "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
399                 abs_path=abs_path, name=name
400             )
401         )
402     return "<br/>".join(links)
403
404
405 def load_resolutions_file():
406     """
407     :return: dict of data loaded from resolutions_steps.json
408     """
409     resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
410     if os.path.exists(resolution_steps):
411         with open(resolution_steps, "r") as f:
412             return json.loads(f.read())
413
414
415 def generate_report(outpath, template_path, profile_name, output_format="html"):
416     """
417     Generates the various output reports.
418
419     :param outpath: destination directory for all reports
420     :param template_path: directory containing the Heat templates validated
421     :param profile_name: Optional validation profile selected
422     :param output_format: One of "html", "excel", or "csv". Default is "html"
423     :raises: ValueError if requested output format is unknown
424     """
425     failures = [r for r in ALL_RESULTS if r.is_failed]
426     generate_failure_file(outpath)
427     output_format = output_format.lower().strip() if output_format else "html"
428     if output_format == "html":
429         generate_html_report(outpath, profile_name, template_path, failures)
430     elif output_format == "excel":
431         generate_excel_report(outpath, profile_name, template_path, failures)
432     elif output_format == "json":
433         generate_json(outpath, template_path, profile_name)
434     elif output_format == "csv":
435         generate_csv_report(outpath, profile_name, template_path, failures)
436     else:
437         raise ValueError("Unsupported output format: " + output_format)
438
439
440 def write_json(data, path):
441     """
442     Pretty print data as JSON to the output path requested
443
444     :param data: Data structure to be converted to JSON
445     :param path: Where to write output
446     """
447     with open(path, "w") as f:
448         json.dump(data, f, indent=2)
449
450
451 def generate_failure_file(outpath):
452     """
453     Writes a summary of test failures to a file named failures.
454     This is for backwards compatibility only.  The report.json offers a
455     more comprehensive output.
456     """
457     failure_path = os.path.join(outpath, "failures")
458     failures = [r for r in ALL_RESULTS if r.is_failed]
459     data = {}
460     for i, fail in enumerate(failures):
461         data[str(i)] = {
462             "file": fail.files[0] if len(fail.files) == 1 else fail.files,
463             "vnfrqts": fail.requirement_ids,
464             "test": fail.test_case,
465             "test_file": fail.test_module,
466             "raw_output": fail.raw_output,
467             "message": fail.error_message,
468         }
469     write_json(data, failure_path)
470
471
472 def generate_csv_report(output_dir, profile_name, template_path, failures):
473     rows = [["Validation Failures"]]
474     headers = [
475         ("Profile Selected:", profile_name),
476         ("Report Generated At:", make_timestamp()),
477         ("Directory Validated:", template_path),
478         ("Checksum:", hash_directory(template_path)),
479         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
480     ]
481     rows.append([])
482     for header in headers:
483         rows.append(header)
484     rows.append([])
485
486     if COLLECTION_FAILURES:
487         rows.append([COLLECTION_FAILURE_WARNING])
488         rows.append(["Validation File", "Test", "Fixtures", "Error"])
489         for failure in COLLECTION_FAILURES:
490             rows.append(
491                 [
492                     failure["module"],
493                     failure["test"],
494                     ";".join(failure["fixtures"]),
495                     failure["error"],
496                 ]
497             )
498         rows.append([])
499
500     # table header
501     rows.append([col for col, _ in REPORT_COLUMNS])
502
503     reqs = load_current_requirements()
504     resolutions = load_resolutions_file()
505
506     # table content
507     for failure in failures:
508         rows.append(
509             [
510                 "\n".join(failure.files),
511                 failure.test_module,
512                 failure.requirement_text(reqs),
513                 failure.resolution_steps(resolutions),
514                 failure.error_message,
515                 failure.raw_output,
516             ]
517         )
518
519     output_path = os.path.join(output_dir, "report.csv")
520     with open(output_path, "w", newline="") as f:
521         writer = csv.writer(f)
522         for row in rows:
523             writer.writerow(row)
524
525
526 def generate_excel_report(output_dir, profile_name, template_path, failures):
527     output_path = os.path.join(output_dir, "report.xlsx")
528     workbook = xlsxwriter.Workbook(output_path)
529     bold = workbook.add_format({"bold": True})
530     code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
531     normal = workbook.add_format({"text_wrap": True})
532     heading = workbook.add_format({"bold": True, "font_size": 18})
533     worksheet = workbook.add_worksheet("failures")
534     worksheet.write(0, 0, "Validation Failures", heading)
535
536     headers = [
537         ("Profile Selected:", profile_name),
538         ("Report Generated At:", make_timestamp()),
539         ("Directory Validated:", template_path),
540         ("Checksum:", hash_directory(template_path)),
541         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
542     ]
543     for row, (header, value) in enumerate(headers, start=2):
544         worksheet.write(row, 0, header, bold)
545         worksheet.write(row, 1, value)
546
547     worksheet.set_column(0, len(headers) - 1, 40)
548     worksheet.set_column(len(headers), len(headers), 80)
549
550     if COLLECTION_FAILURES:
551         collection_failures_start = 2 + len(headers) + 2
552         worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
553         collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
554         for col_num, col_name in enumerate(collection_failure_headers):
555             worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
556         for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
557             worksheet.write(row, 0, data["module"])
558             worksheet.write(row, 1, data["test"])
559             worksheet.write(row, 2, ",".join(data["fixtures"]))
560             worksheet.write(row, 3, data["error"], code)
561
562     # table header
563     start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
564     worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
565     for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
566         worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
567
568     reqs = load_current_requirements()
569     resolutions = load_resolutions_file()
570
571     # table content
572     for row, failure in enumerate(failures, start=start_error_table_row + 2):
573         worksheet.write(row, 0, "\n".join(failure.files), normal)
574         worksheet.write(row, 1, failure.test_module, normal)
575         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
576         worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
577         worksheet.write(row, 4, failure.error_message, normal)
578         worksheet.write(row, 5, failure.raw_output, code)
579
580     workbook.close()
581
582
583 def make_iso_timestamp():
584     """
585     Creates a timestamp in ISO 8601 format in UTC format.  Used for JSON output.
586     """
587     now = datetime.datetime.utcnow()
588     now.replace(tzinfo=datetime.timezone.utc)
589     return now.isoformat()
590
591
592 def aggregate_requirement_adherence(r_id, collection_failures, test_results):
593     """
594     Examines all tests associated with a given requirement and determines
595     the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
596
597     * ERROR - At least one ERROR occurred
598     * PASS -  At least one PASS and no FAIL or ERRORs.
599     * FAIL -  At least one FAIL occurred (no ERRORs)
600     * SKIP - All tests were SKIP
601
602
603     :param r_id: Requirement ID to examing
604     :param collection_failures: Errors that occurred during test setup.
605     :param test_results: List of TestResult
606     :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
607     """
608     errors = any(r_id in f["requirements"] for f in collection_failures)
609     outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
610     return aggregate_results(errors, outcomes, r_id)
611
612
613 def aggregate_results(has_errors, outcomes, r_id=None):
614     """
615     Determines the aggregate result for the conditions provided.  Assumes the
616     results have been filtered and collected for analysis.
617
618     :param has_errors: True if collection failures occurred for the tests being
619                        analyzed.
620     :param outcomes: set of outcomes from the TestResults
621     :param r_id: Optional requirement ID if known
622     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
623              (see aggregate_requirement_adherence for more detail)
624     """
625     if has_errors:
626         return "ERROR"
627
628     if not outcomes:
629         return "PASS"
630     elif "FAIL" in outcomes:
631         return "FAIL"
632     elif "PASS" in outcomes:
633         return "PASS"
634     elif {"SKIP"} == outcomes:
635         return "SKIP"
636     else:
637         pytest.warns(
638             "Unexpected error aggregating outcomes ({}) for requirement {}".format(
639                 outcomes, r_id)
640         )
641         return "ERROR"
642
643
644 def aggregate_run_results(collection_failures, test_results):
645     """
646     Determines overall status of run based on all failures and results.
647
648     * 'ERROR' - At least one collection failure occurred during the run.
649     * 'FAIL' - Template failed at least one test
650     * 'PASS' - All tests executed properly and no failures were detected
651
652     :param collection_failures: failures occuring during test setup
653     :param test_results: list of all test executuion results
654     :return: one of 'ERROR', 'FAIL', or 'PASS'
655     """
656     if collection_failures:
657         return "ERROR"
658     elif any(r.is_failed for r in test_results):
659         return "FAIL"
660     else:
661         return "PASS"
662
663
664 def error(failure_or_result):
665     """
666     Extracts the error message from a collection failure or test result
667     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
668     :return: Error message as string
669     """
670     if isinstance(failure_or_result, TestResult):
671         return failure_or_result.error_message
672     else:
673         return failure_or_result["error"]
674
675
676 def req_ids(failure_or_result):
677     """
678     Extracts the requirement IDs from a collection failure or test result
679     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
680     :return: set of Requirement IDs.  If no requirements mapped, then an empty set
681     """
682     if isinstance(failure_or_result, TestResult):
683         return set(failure_or_result.requirement_ids)
684     else:
685         return set(failure_or_result["requirements"])
686
687
688 def collect_errors(r_id, collection_failures, test_result):
689     """
690     Creates a list of error messages from the collection failures and
691     test results.  If r_id is provided, then it collects the error messages
692     where the failure or test is associated with that requirement ID.  If
693     r_id is None, then it collects all errors that occur on failures and
694     results that are not mapped to requirements
695     """
696     def selector(item):
697         if r_id:
698             return r_id in req_ids(item)
699         else:
700             return not req_ids(item)
701
702     errors = (error(x) for x in chain(collection_failures, test_result)
703               if selector(x))
704     return [e for e in errors if e]
705
706
707 def generate_json(outpath, template_path, profile_name):
708     """
709     Creates a JSON summary of the entire test run.
710     """
711     reqs = load_current_requirements()
712     data = {
713         "version": "dublin",
714         "template_directory": template_path,
715         "timestamp": make_iso_timestamp(),
716         "checksum": hash_directory(template_path),
717         "profile": profile_name,
718         "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
719         "tests": [],
720         "requirements": [],
721     }
722
723     results = data["tests"]
724     for result in COLLECTION_FAILURES:
725         results.append(
726             {
727                 "files": [],
728                 "test_module": result["module"],
729                 "test_case": result["test"],
730                 "result": "ERROR",
731                 "error": result["error"],
732                 "requirements": result["requirements"],
733             }
734         )
735     for result in ALL_RESULTS:
736         results.append(
737             {
738                 "files": result.files,
739                 "test_module": result.test_module,
740                 "test_case": result.test_case,
741                 "result": result.outcome,
742                 "error": result.error_message if result.is_failed else "",
743                 "requirements": result.requirements_metadata(reqs),
744             }
745         )
746
747     requirements = data["requirements"]
748     for r_id, r_data in reqs.items():
749         result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
750         if result:
751             requirements.append(
752                 {
753                     "id": r_id,
754                     "text": r_data["description"],
755                     "keyword": r_data["keyword"],
756                     "result": result,
757                     "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS)
758                 }
759             )
760     # If there are tests that aren't mapped to a requirement, then we'll
761     # map them to a special entry so the results are coherent.
762     unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
763     has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
764     if unmapped_outcomes or has_errors:
765         requirements.append(
766             {
767                 "id": "Unmapped",
768                 "text": "Tests not mapped to requirements (see tests)",
769                 "result": aggregate_results(has_errors, unmapped_outcomes),
770                 "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS)
771             }
772         )
773
774     report_path = os.path.join(outpath, "report.json")
775     write_json(data, report_path)
776
777
778 def generate_html_report(outpath, profile_name, template_path, failures):
779     reqs = load_current_requirements()
780     resolutions = load_resolutions_file()
781     fail_data = []
782     for failure in failures:
783         fail_data.append(
784             {
785                 "file_links": make_href(failure.files),
786                 "test_id": failure.test_module,
787                 "error_message": failure.error_message,
788                 "raw_output": failure.raw_output,
789                 "requirements": docutils.core.publish_parts(
790                     writer_name="html", source=failure.requirement_text(reqs)
791                 )["body"],
792                 "resolution_steps": failure.resolution_steps(resolutions),
793             }
794         )
795     pkg_dir = os.path.split(__file__)[0]
796     j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
797     with open(j2_template_path, "r") as f:
798         report_template = jinja2.Template(f.read())
799         contents = report_template.render(
800             num_failures=len(failures) + len(COLLECTION_FAILURES),
801             profile_name=profile_name,
802             template_dir=make_href(template_path),
803             checksum=hash_directory(template_path),
804             timestamp=make_timestamp(),
805             failures=fail_data,
806             collection_failures=COLLECTION_FAILURES,
807         )
808     with open(os.path.join(outpath, "report.html"), "w") as f:
809         f.write(contents)
810
811
812 def pytest_addoption(parser):
813     """
814     Add needed CLI arguments
815     """
816     parser.addoption(
817         "--template-directory",
818         dest="template_dir",
819         action="append",
820         help="Directory which holds the templates for validation",
821     )
822
823     parser.addoption(
824         "--self-test",
825         dest="self_test",
826         action="store_true",
827         help="Test the unit tests against their fixtured data",
828     )
829
830     parser.addoption(
831         "--validation-profile",
832         dest="validation_profile",
833         action="store",
834         help="Runs all unmarked tests plus test with a matching marker",
835     )
836
837     parser.addoption(
838         "--validation-profile-name",
839         dest="validation_profile_name",
840         action="store",
841         help="Friendly name of the validation profile used in reports",
842     )
843
844     parser.addoption(
845         "--report-format",
846         dest="report_format",
847         action="store",
848         help="Format of output report (html, csv, excel, json)",
849     )
850
851     parser.addoption(
852         "--continue-on-failure",
853         dest="continue_on_failure",
854         action="store_true",
855         help="Continue validation even when structural errors exist in input files",
856     )
857
858     parser.addoption(
859         "--output-directory",
860         dest="output_dir",
861         action="store",
862         default=None,
863         help="Alternate "
864     )
865
866
867 def pytest_configure(config):
868     """
869     Ensure that we are receive either `--self-test` or
870     `--template-dir=<directory` as CLI arguments
871     """
872     if config.getoption("template_dir") and config.getoption("self_test"):
873         raise Exception('"--template-dir", and "--self-test"' " are mutually exclusive")
874     if not (
875         config.getoption("template_dir")
876         or config.getoption("self_test")
877         or config.getoption("help")
878     ):
879         raise Exception('One of "--template-dir" or' ' "--self-test" must be specified')
880
881
882 def pytest_generate_tests(metafunc):
883     """
884     If a unit test requires an argument named 'filename'
885     we generate a test for the filenames selected. Either
886     the files contained in `template_dir` or if `template_dir`
887     is not specified on the CLI, the fixtures associated with this
888     test name.
889     """
890
891     # noinspection PyBroadException
892     try:
893         if "filename" in metafunc.fixturenames:
894             from .parametrizers import parametrize_filename
895
896             parametrize_filename(metafunc)
897
898         if "filenames" in metafunc.fixturenames:
899             from .parametrizers import parametrize_filenames
900
901             parametrize_filenames(metafunc)
902
903         if "template_dir" in metafunc.fixturenames:
904             from .parametrizers import parametrize_template_dir
905
906             parametrize_template_dir(metafunc)
907
908         if "environment_pair" in metafunc.fixturenames:
909             from .parametrizers import parametrize_environment_pair
910
911             parametrize_environment_pair(metafunc)
912
913         if "heat_volume_pair" in metafunc.fixturenames:
914             from .parametrizers import parametrize_heat_volume_pair
915
916             parametrize_heat_volume_pair(metafunc)
917
918         if "yaml_files" in metafunc.fixturenames:
919             from .parametrizers import parametrize_yaml_files
920
921             parametrize_yaml_files(metafunc)
922
923         if "env_files" in metafunc.fixturenames:
924             from .parametrizers import parametrize_environment_files
925
926             parametrize_environment_files(metafunc)
927
928         if "yaml_file" in metafunc.fixturenames:
929             from .parametrizers import parametrize_yaml_file
930
931             parametrize_yaml_file(metafunc)
932
933         if "env_file" in metafunc.fixturenames:
934             from .parametrizers import parametrize_environment_file
935
936             parametrize_environment_file(metafunc)
937
938         if "parsed_yaml_file" in metafunc.fixturenames:
939             from .parametrizers import parametrize_parsed_yaml_file
940
941             parametrize_parsed_yaml_file(metafunc)
942
943         if "parsed_environment_file" in metafunc.fixturenames:
944             from .parametrizers import parametrize_parsed_environment_file
945
946             parametrize_parsed_environment_file(metafunc)
947
948         if "heat_template" in metafunc.fixturenames:
949             from .parametrizers import parametrize_heat_template
950
951             parametrize_heat_template(metafunc)
952
953         if "heat_templates" in metafunc.fixturenames:
954             from .parametrizers import parametrize_heat_templates
955
956             parametrize_heat_templates(metafunc)
957
958         if "volume_template" in metafunc.fixturenames:
959             from .parametrizers import parametrize_volume_template
960
961             parametrize_volume_template(metafunc)
962
963         if "volume_templates" in metafunc.fixturenames:
964             from .parametrizers import parametrize_volume_templates
965
966             parametrize_volume_templates(metafunc)
967
968         if "template" in metafunc.fixturenames:
969             from .parametrizers import parametrize_template
970
971             parametrize_template(metafunc)
972
973         if "templates" in metafunc.fixturenames:
974             from .parametrizers import parametrize_templates
975
976             parametrize_templates(metafunc)
977     except Exception as e:
978         # If an error occurs in the collection phase, then it won't be logged as a
979         # normal test failure.  This means that failures could occur, but not
980         # be seen on the report resulting in a false positive success message.  These
981         # errors will be stored and reported separately on the report
982         COLLECTION_FAILURES.append(
983             {
984                 "module": metafunc.module.__name__,
985                 "test": metafunc.function.__name__,
986                 "fixtures": metafunc.fixturenames,
987                 "error": traceback.format_exc(),
988                 "requirements": getattr(metafunc.function, "requirement_ids", []),
989             }
990         )
991         raise e
992
993
994 def hash_directory(path):
995     md5 = hashlib.md5()
996     for dir_path, sub_dirs, filenames in os.walk(path):
997         for filename in filenames:
998             file_path = os.path.join(dir_path, filename)
999             with open(file_path, "rb") as f:
1000                 md5.update(f.read())
1001     return md5.hexdigest()
1002
1003
1004 def load_current_requirements():
1005     """Loads dict of current requirements or empty dict if file doesn't exist"""
1006     try:
1007         r = requests.get(NEEDS_JSON_URL)
1008         if r.headers.get("content-type") == "application/json":
1009             with open(HEAT_REQUIREMENTS_FILE, "wb") as needs:
1010                 needs.write(r.content)
1011         else:
1012             warnings.warn(
1013                 (
1014                     "Unexpected content-type ({}) encountered downloading "
1015                     + "requirements.json, using last saved copy"
1016                 ).format(r.headers.get("content-type"))
1017             )
1018     except requests.exceptions.RequestException as e:
1019         warnings.warn("Error downloading latest JSON, using last saved copy.")
1020         warnings.warn(UserWarning(e))
1021     if not os.path.exists(HEAT_REQUIREMENTS_FILE):
1022         return {}
1023     with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
1024         data = json.load(f)
1025         version = data["current_version"]
1026         return data["versions"][version]["needs"]
1027
1028
1029 def compat_open(path):
1030     """Invokes open correctly depending on the Python version"""
1031     if sys.version_info.major < 3:
1032         return open(path, "wb")
1033     else:
1034         return open(path, "w", newline="")
1035
1036
1037 def unicode_writerow(writer, row):
1038     if sys.version_info.major < 3:
1039         row = [s.encode("utf8") for s in row]
1040     writer.writerow(row)
1041
1042
1043 # noinspection PyUnusedLocal
1044 def pytest_report_collectionfinish(config, startdir, items):
1045     """Generates a simple traceability report to output/traceability.csv"""
1046     traceability_path = os.path.join(__path__[0], "../output/traceability.csv")
1047     output_dir = os.path.split(traceability_path)[0]
1048     if not os.path.exists(output_dir):
1049         os.makedirs(output_dir)
1050     requirements = load_current_requirements()
1051     unmapped, mapped = partition(
1052         lambda i: hasattr(i.function, "requirement_ids"), items
1053     )
1054
1055     req_to_test = defaultdict(set)
1056     mapping_errors = set()
1057     for item in mapped:
1058         for req_id in item.function.requirement_ids:
1059             if req_id not in req_to_test:
1060                 req_to_test[req_id].add(item)
1061             if req_id not in requirements:
1062                 mapping_errors.add(
1063                     (req_id, item.function.__module__, item.function.__name__)
1064                 )
1065
1066     mapping_error_path = os.path.join(__path__[0], "../output/mapping_errors.csv")
1067     with compat_open(mapping_error_path) as f:
1068         writer = csv.writer(f)
1069         for err in mapping_errors:
1070             unicode_writerow(writer, err)
1071
1072     with compat_open(traceability_path) as f:
1073         out = csv.writer(f)
1074         unicode_writerow(
1075             out,
1076             ("Requirement ID", "Requirement", "Section", "Test Module", "Test Name"),
1077         )
1078         for req_id, metadata in requirements.items():
1079             if req_to_test[req_id]:
1080                 for item in req_to_test[req_id]:
1081                     unicode_writerow(
1082                         out,
1083                         (
1084                             req_id,
1085                             metadata["description"],
1086                             metadata["section_name"],
1087                             item.function.__module__,
1088                             item.function.__name__,
1089                         ),
1090                     )
1091             else:
1092                 unicode_writerow(
1093                     out,
1094                     (req_id, metadata["description"], metadata["section_name"], "", ""),
1095                 )
1096         # now write out any test methods that weren't mapped to requirements
1097         for item in unmapped:
1098             unicode_writerow(
1099                 out, ("", "", "", item.function.__module__, item.function.__name__)
1100             )