[VVP] Removing dynamic download of needs.json
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37
38 import csv
39 import datetime
40 import hashlib
41 import io
42 import json
43 import os
44 import re
45 import sys
46 import time
47 from collections import defaultdict
48 from itertools import chain
49
50 import traceback
51
52 import docutils.core
53 import jinja2
54 import pytest
55 from more_itertools import partition
56 import xlsxwriter
57 from six import string_types
58
59 import version
60
61 __path__ = [os.path.dirname(os.path.abspath(__file__))]
62
63 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
64
65 RESOLUTION_STEPS_FILE = "resolution_steps.json"
66 HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
67
68 REPORT_COLUMNS = [
69     ("Input File", "file"),
70     ("Test", "test_file"),
71     ("Requirements", "req_description"),
72     ("Resolution Steps", "resolution_steps"),
73     ("Error Message", "message"),
74     ("Raw Test Output", "raw_output"),
75 ]
76
77 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
78 while preparing to validate the the input files. Some validations may not have been
79 executed. Please refer these issue to the VNF Validation Tool team.
80 """
81
82 COLLECTION_FAILURES = []
83
84 # Captures the results of every test run
85 ALL_RESULTS = []
86
87
88 def get_output_dir(config):
89     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
90     if not os.path.exists(output_dir):
91         os.makedirs(output_dir, exist_ok=True)
92     return output_dir
93
94
95 def extract_error_msg(rep):
96     """
97     If a custom error message was provided, then extract it otherwise
98     just show the pytest assert message
99     """
100     if rep.outcome != "failed":
101         return ""
102     try:
103         full_msg = str(rep.longrepr.reprcrash.message)
104         match = re.match(
105             "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
106         )
107         if match:  # custom message was provided
108             # Extract everything between AssertionError and the start
109             # of the assert statement expansion in the pytest report
110             msg = match.group(1)
111         else:
112             msg = str(rep.longrepr.reprcrash)
113             if "AssertionError:" in msg:
114                 msg = msg.split("AssertionError:")[1]
115     except AttributeError:
116         msg = str(rep)
117
118     return msg
119
120
121 class TestResult:
122     """
123     Wraps the test case and result to extract necessary metadata for
124     reporting purposes.
125     """
126
127     RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
128
129     def __init__(self, item, outcome):
130         self.item = item
131         self.result = outcome.get_result()
132         self.files = [os.path.normpath(p) for p in self._get_files()]
133         self.error_message = self._get_error_message()
134
135     @property
136     def requirement_ids(self):
137         """
138         Returns list of requirement IDs mapped to the test case.
139
140         :return: Returns a list of string requirement IDs the test was
141                  annotated with ``validates`` otherwise returns and empty list
142         """
143         is_mapped = hasattr(self.item.function, "requirement_ids")
144         return self.item.function.requirement_ids if is_mapped else []
145
146     @property
147     def markers(self):
148         """
149         :return: Returns a set of pytest marker names for the test or an empty set
150         """
151         return set(m.name for m in self.item.iter_markers())
152
153     @property
154     def is_base_test(self):
155         """
156         :return: Returns True if the test is annotated with a pytest marker called base
157         """
158         return "base" in self.markers
159
160     @property
161     def is_failed(self):
162         """
163         :return: True if the test failed
164         """
165         return self.outcome == "FAIL"
166
167     @property
168     def outcome(self):
169         """
170         :return: Returns 'PASS', 'FAIL', or 'SKIP'
171         """
172         return self.RESULT_MAPPING[self.result.outcome]
173
174     @property
175     def test_case(self):
176         """
177         :return: Name of the test case method
178         """
179         return self.item.function.__name__
180
181     @property
182     def test_module(self):
183         """
184         :return: Name of the file containing the test case
185         """
186         return self.item.function.__module__.split(".")[-1]
187
188     @property
189     def raw_output(self):
190         """
191         :return: Full output from pytest for the given test case
192         """
193         return str(self.result.longrepr)
194
195     def requirement_text(self, curr_reqs):
196         """
197         Creates a text summary for the requirement IDs mapped to the test case.
198         If no requirements are mapped, then it returns the empty string.
199
200         :param curr_reqs: mapping of requirement IDs to requirement metadata
201                           loaded from the VNFRQTS projects needs.json output
202         :return: ID and text of the requirements mapped to the test case
203         """
204         text = (
205             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
206             for r_id in self.requirement_ids
207         )
208         return "".join(text)
209
210     def requirements_metadata(self, curr_reqs):
211         """
212         Returns a list of dicts containing the following metadata for each
213         requirement mapped:
214
215         - id: Requirement ID
216         - text: Full text of the requirement
217         - keyword: MUST, MUST NOT, MAY, etc.
218
219         :param curr_reqs: mapping of requirement IDs to requirement metadata
220                           loaded from the VNFRQTS projects needs.json output
221         :return: List of requirement metadata
222         """
223         data = []
224         for r_id in self.requirement_ids:
225             if r_id not in curr_reqs:
226                 continue
227             data.append(
228                 {
229                     "id": r_id,
230                     "text": curr_reqs[r_id]["description"],
231                     "keyword": curr_reqs[r_id]["keyword"],
232                 }
233             )
234         return data
235
236     def resolution_steps(self, resolutions):
237         """
238         :param resolutions: Loaded from contents for resolution_steps.json
239         :return: Header and text for the resolution step associated with this
240                  test case.  Returns empty string if no resolutions are
241                  provided.
242         """
243         text = (
244             "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
245             for entry in resolutions
246             if self._match(entry)
247         )
248         return "".join(text)
249
250     def _match(self, resolution_entry):
251         """
252         Returns True if the test result maps to the given entry in
253         the resolutions file
254         """
255         return (
256             self.test_case == resolution_entry["function"]
257             and self.test_module == resolution_entry["module"]
258         )
259
260     def _get_files(self):
261         """
262         Extracts the list of files passed into the test case.
263         :return: List of absolute paths to files
264         """
265         if "environment_pair" in self.item.fixturenames:
266             return [
267                 "{} environment pair".format(
268                     self.item.funcargs["environment_pair"]["name"]
269                 )
270             ]
271         elif "heat_volume_pair" in self.item.fixturenames:
272             return [
273                 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
274             ]
275         elif "heat_templates" in self.item.fixturenames:
276             return self.item.funcargs["heat_templates"]
277         elif "yaml_files" in self.item.fixturenames:
278             return self.item.funcargs["yaml_files"]
279         else:
280             return [self.result.nodeid.split("[")[1][:-1]]
281
282     def _get_error_message(self):
283         """
284         :return: Error message or empty string if the test did not fail or error
285         """
286         if self.is_failed:
287             return extract_error_msg(self.result)
288         else:
289             return ""
290
291
292 # noinspection PyUnusedLocal
293 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
294 def pytest_runtest_makereport(item, call):
295     """
296     Captures the test results for later reporting.  This will also halt testing
297     if a base failure is encountered (can be overridden with continue-on-failure)
298     """
299     outcome = yield
300     if outcome.get_result().when != "call":
301         return  # only capture results of test cases themselves
302     result = TestResult(item, outcome)
303     ALL_RESULTS.append(result)
304     if (
305         not item.config.option.continue_on_failure
306         and result.is_base_test
307         and result.is_failed
308     ):
309         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
310             result.error_message
311         )
312         pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
313
314
315 def make_timestamp():
316     """
317     :return: String make_iso_timestamp in format:
318              2019-01-19 10:18:49.865000 Central Standard Time
319     """
320     timezone = time.tzname[time.localtime().tm_isdst]
321     return "{} {}".format(str(datetime.datetime.now()), timezone)
322
323
324 # noinspection PyUnusedLocal
325 def pytest_sessionstart(session):
326     ALL_RESULTS.clear()
327     COLLECTION_FAILURES.clear()
328
329
330 # noinspection PyUnusedLocal
331 def pytest_sessionfinish(session, exitstatus):
332     """
333     If not a self-test run, generate the output reports
334     """
335     if not session.config.option.template_dir:
336         return
337
338     if session.config.option.template_source:
339         template_source = session.config.option.template_source[0]
340     else:
341         template_source = os.path.abspath(session.config.option.template_dir[0])
342
343     categories_selected = session.config.option.test_categories or ""
344     generate_report(
345         get_output_dir(session.config),
346         template_source,
347         categories_selected,
348         session.config.option.report_format,
349     )
350
351
352 # noinspection PyUnusedLocal
353 def pytest_collection_modifyitems(session, config, items):
354     """
355     Selects tests based on the categories requested.  Tests without
356     categories will always be executed.
357     """
358     config.traceability_items = list(items)  # save all items for traceability
359     if not config.option.self_test:
360         for item in items:
361             # checking if test belongs to a category
362             if hasattr(item.function, "categories"):
363                 if config.option.test_categories:
364                     test_categories = getattr(item.function, "categories")
365                     passed_categories = config.option.test_categories
366                     if not all(
367                         category in passed_categories for category in test_categories
368                     ):
369                         item.add_marker(
370                             pytest.mark.skip(
371                                 reason="Test categories do not match all the passed categories"
372                             )
373                         )
374                 else:
375                     item.add_marker(
376                         pytest.mark.skip(
377                             reason="Test belongs to a category but no categories were passed"
378                         )
379                     )
380     items.sort(
381         key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
382     )
383
384
385 def make_href(paths):
386     """
387     Create an anchor tag to link to the file paths provided.
388     :param paths: string or list of file paths
389     :return: String of hrefs - one for each path, each seperated by a line
390              break (<br/).
391     """
392     paths = [paths] if isinstance(paths, string_types) else paths
393     links = []
394     for p in paths:
395         abs_path = os.path.abspath(p)
396         name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
397         links.append(
398             "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
399                 abs_path=abs_path, name=name
400             )
401         )
402     return "<br/>".join(links)
403
404
405 def load_resolutions_file():
406     """
407     :return: dict of data loaded from resolutions_steps.json
408     """
409     resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
410     if os.path.exists(resolution_steps):
411         with open(resolution_steps, "r") as f:
412             return json.loads(f.read())
413
414
415 def generate_report(outpath, template_path, categories, output_format="html"):
416     """
417     Generates the various output reports.
418
419     :param outpath: destination directory for all reports
420     :param template_path: directory containing the Heat templates validated
421     :param categories: Optional categories selected
422     :param output_format: One of "html", "excel", or "csv". Default is "html"
423     :raises: ValueError if requested output format is unknown
424     """
425     failures = [r for r in ALL_RESULTS if r.is_failed]
426     generate_failure_file(outpath)
427     output_format = output_format.lower().strip() if output_format else "html"
428     if output_format == "html":
429         generate_html_report(outpath, categories, template_path, failures)
430     elif output_format == "excel":
431         generate_excel_report(outpath, categories, template_path, failures)
432     elif output_format == "json":
433         generate_json(outpath, template_path, categories)
434     elif output_format == "csv":
435         generate_csv_report(outpath, categories, template_path, failures)
436     else:
437         raise ValueError("Unsupported output format: " + output_format)
438
439
440 def write_json(data, path):
441     """
442     Pretty print data as JSON to the output path requested
443
444     :param data: Data structure to be converted to JSON
445     :param path: Where to write output
446     """
447     with open(path, "w") as f:
448         json.dump(data, f, indent=2)
449
450
451 def generate_failure_file(outpath):
452     """
453     Writes a summary of test failures to a file named failures.
454     This is for backwards compatibility only.  The report.json offers a
455     more comprehensive output.
456     """
457     failure_path = os.path.join(outpath, "failures")
458     failures = [r for r in ALL_RESULTS if r.is_failed]
459     data = {}
460     for i, fail in enumerate(failures):
461         data[str(i)] = {
462             "file": fail.files[0] if len(fail.files) == 1 else fail.files,
463             "vnfrqts": fail.requirement_ids,
464             "test": fail.test_case,
465             "test_file": fail.test_module,
466             "raw_output": fail.raw_output,
467             "message": fail.error_message,
468         }
469     write_json(data, failure_path)
470
471
472 def generate_csv_report(output_dir, categories, template_path, failures):
473     rows = [["Validation Failures"]]
474     headers = [
475         ("Categories Selected:", categories),
476         ("Tool Version:", version.VERSION),
477         ("Report Generated At:", make_timestamp()),
478         ("Directory Validated:", template_path),
479         ("Checksum:", hash_directory(template_path)),
480         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
481     ]
482     rows.append([])
483     for header in headers:
484         rows.append(header)
485     rows.append([])
486
487     if COLLECTION_FAILURES:
488         rows.append([COLLECTION_FAILURE_WARNING])
489         rows.append(["Validation File", "Test", "Fixtures", "Error"])
490         for failure in COLLECTION_FAILURES:
491             rows.append(
492                 [
493                     failure["module"],
494                     failure["test"],
495                     ";".join(failure["fixtures"]),
496                     failure["error"],
497                 ]
498             )
499         rows.append([])
500
501     # table header
502     rows.append([col for col, _ in REPORT_COLUMNS])
503
504     reqs = load_current_requirements()
505     resolutions = load_resolutions_file()
506
507     # table content
508     for failure in failures:
509         rows.append(
510             [
511                 "\n".join(failure.files),
512                 failure.test_module,
513                 failure.requirement_text(reqs),
514                 failure.resolution_steps(resolutions),
515                 failure.error_message,
516                 failure.raw_output,
517             ]
518         )
519
520     output_path = os.path.join(output_dir, "report.csv")
521     with open(output_path, "w", newline="") as f:
522         writer = csv.writer(f)
523         for row in rows:
524             writer.writerow(row)
525
526
527 def generate_excel_report(output_dir, categories, template_path, failures):
528     output_path = os.path.join(output_dir, "report.xlsx")
529     workbook = xlsxwriter.Workbook(output_path)
530     bold = workbook.add_format({"bold": True})
531     code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
532     normal = workbook.add_format({"text_wrap": True})
533     heading = workbook.add_format({"bold": True, "font_size": 18})
534     worksheet = workbook.add_worksheet("failures")
535     worksheet.write(0, 0, "Validation Failures", heading)
536
537     headers = [
538         ("Categories Selected:", ",".join(categories)),
539         ("Tool Version:", version.VERSION),
540         ("Report Generated At:", make_timestamp()),
541         ("Directory Validated:", template_path),
542         ("Checksum:", hash_directory(template_path)),
543         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
544     ]
545     for row, (header, value) in enumerate(headers, start=2):
546         worksheet.write(row, 0, header, bold)
547         worksheet.write(row, 1, value)
548
549     worksheet.set_column(0, len(headers) - 1, 40)
550     worksheet.set_column(len(headers), len(headers), 80)
551
552     if COLLECTION_FAILURES:
553         collection_failures_start = 2 + len(headers) + 2
554         worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
555         collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
556         for col_num, col_name in enumerate(collection_failure_headers):
557             worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
558         for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
559             worksheet.write(row, 0, data["module"])
560             worksheet.write(row, 1, data["test"])
561             worksheet.write(row, 2, ",".join(data["fixtures"]))
562             worksheet.write(row, 3, data["error"], code)
563
564     # table header
565     start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
566     worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
567     for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
568         worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
569
570     reqs = load_current_requirements()
571     resolutions = load_resolutions_file()
572
573     # table content
574     for row, failure in enumerate(failures, start=start_error_table_row + 2):
575         worksheet.write(row, 0, "\n".join(failure.files), normal)
576         worksheet.write(row, 1, failure.test_module, normal)
577         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
578         worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
579         worksheet.write(row, 4, failure.error_message, normal)
580         worksheet.write(row, 5, failure.raw_output, code)
581
582     workbook.close()
583
584
585 def make_iso_timestamp():
586     """
587     Creates a timestamp in ISO 8601 format in UTC format.  Used for JSON output.
588     """
589     now = datetime.datetime.utcnow()
590     now.replace(tzinfo=datetime.timezone.utc)
591     return now.isoformat()
592
593
594 def aggregate_requirement_adherence(r_id, collection_failures, test_results):
595     """
596     Examines all tests associated with a given requirement and determines
597     the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
598
599     * ERROR - At least one ERROR occurred
600     * PASS -  At least one PASS and no FAIL or ERRORs.
601     * FAIL -  At least one FAIL occurred (no ERRORs)
602     * SKIP - All tests were SKIP
603
604
605     :param r_id: Requirement ID to examing
606     :param collection_failures: Errors that occurred during test setup.
607     :param test_results: List of TestResult
608     :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
609     """
610     errors = any(r_id in f["requirements"] for f in collection_failures)
611     outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
612     return aggregate_results(errors, outcomes, r_id)
613
614
615 def aggregate_results(has_errors, outcomes, r_id=None):
616     """
617     Determines the aggregate result for the conditions provided.  Assumes the
618     results have been filtered and collected for analysis.
619
620     :param has_errors: True if collection failures occurred for the tests being
621                        analyzed.
622     :param outcomes: set of outcomes from the TestResults
623     :param r_id: Optional requirement ID if known
624     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
625              (see aggregate_requirement_adherence for more detail)
626     """
627     if has_errors:
628         return "ERROR"
629
630     if not outcomes:
631         return "PASS"
632     elif "FAIL" in outcomes:
633         return "FAIL"
634     elif "PASS" in outcomes:
635         return "PASS"
636     elif {"SKIP"} == outcomes:
637         return "SKIP"
638     else:
639         pytest.warns(
640             "Unexpected error aggregating outcomes ({}) for requirement {}".format(
641                 outcomes, r_id
642             )
643         )
644         return "ERROR"
645
646
647 def aggregate_run_results(collection_failures, test_results):
648     """
649     Determines overall status of run based on all failures and results.
650
651     * 'ERROR' - At least one collection failure occurred during the run.
652     * 'FAIL' - Template failed at least one test
653     * 'PASS' - All tests executed properly and no failures were detected
654
655     :param collection_failures: failures occuring during test setup
656     :param test_results: list of all test executuion results
657     :return: one of 'ERROR', 'FAIL', or 'PASS'
658     """
659     if collection_failures:
660         return "ERROR"
661     elif any(r.is_failed for r in test_results):
662         return "FAIL"
663     else:
664         return "PASS"
665
666
667 def error(failure_or_result):
668     """
669     Extracts the error message from a collection failure or test result
670     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
671     :return: Error message as string
672     """
673     if isinstance(failure_or_result, TestResult):
674         return failure_or_result.error_message
675     else:
676         return failure_or_result["error"]
677
678
679 def req_ids(failure_or_result):
680     """
681     Extracts the requirement IDs from a collection failure or test result
682     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
683     :return: set of Requirement IDs.  If no requirements mapped, then an empty set
684     """
685     if isinstance(failure_or_result, TestResult):
686         return set(failure_or_result.requirement_ids)
687     else:
688         return set(failure_or_result["requirements"])
689
690
691 def collect_errors(r_id, collection_failures, test_result):
692     """
693     Creates a list of error messages from the collection failures and
694     test results.  If r_id is provided, then it collects the error messages
695     where the failure or test is associated with that requirement ID.  If
696     r_id is None, then it collects all errors that occur on failures and
697     results that are not mapped to requirements
698     """
699
700     def selector(item):
701         if r_id:
702             return r_id in req_ids(item)
703         else:
704             return not req_ids(item)
705
706     errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
707     return [e for e in errors if e]
708
709
710 def generate_json(outpath, template_path, categories):
711     """
712     Creates a JSON summary of the entire test run.
713     """
714     reqs = load_current_requirements()
715     data = {
716         "version": "dublin",
717         "template_directory": template_path,
718         "timestamp": make_iso_timestamp(),
719         "checksum": hash_directory(template_path),
720         "categories": categories,
721         "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
722         "tests": [],
723         "requirements": [],
724     }
725
726     results = data["tests"]
727     for result in COLLECTION_FAILURES:
728         results.append(
729             {
730                 "files": [],
731                 "test_module": result["module"],
732                 "test_case": result["test"],
733                 "result": "ERROR",
734                 "error": result["error"],
735                 "requirements": result["requirements"],
736             }
737         )
738     for result in ALL_RESULTS:
739         results.append(
740             {
741                 "files": result.files,
742                 "test_module": result.test_module,
743                 "test_case": result.test_case,
744                 "result": result.outcome,
745                 "error": result.error_message if result.is_failed else "",
746                 "requirements": result.requirements_metadata(reqs),
747             }
748         )
749
750     requirements = data["requirements"]
751     for r_id, r_data in reqs.items():
752         result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
753         if result:
754             requirements.append(
755                 {
756                     "id": r_id,
757                     "text": r_data["description"],
758                     "keyword": r_data["keyword"],
759                     "result": result,
760                     "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
761                 }
762             )
763     # If there are tests that aren't mapped to a requirement, then we'll
764     # map them to a special entry so the results are coherent.
765     unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
766     has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
767     if unmapped_outcomes or has_errors:
768         requirements.append(
769             {
770                 "id": "Unmapped",
771                 "text": "Tests not mapped to requirements (see tests)",
772                 "result": aggregate_results(has_errors, unmapped_outcomes),
773                 "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
774             }
775         )
776
777     report_path = os.path.join(outpath, "report.json")
778     write_json(data, report_path)
779
780
781 def generate_html_report(outpath, categories, template_path, failures):
782     reqs = load_current_requirements()
783     resolutions = load_resolutions_file()
784     fail_data = []
785     for failure in failures:
786         fail_data.append(
787             {
788                 "file_links": make_href(failure.files),
789                 "test_id": failure.test_module,
790                 "error_message": failure.error_message,
791                 "raw_output": failure.raw_output,
792                 "requirements": docutils.core.publish_parts(
793                     writer_name="html", source=failure.requirement_text(reqs)
794                 )["body"],
795                 "resolution_steps": failure.resolution_steps(resolutions),
796             }
797         )
798     pkg_dir = os.path.split(__file__)[0]
799     j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
800     with open(j2_template_path, "r") as f:
801         report_template = jinja2.Template(f.read())
802         contents = report_template.render(
803             version=version.VERSION,
804             num_failures=len(failures) + len(COLLECTION_FAILURES),
805             categories=categories,
806             template_dir=make_href(template_path),
807             checksum=hash_directory(template_path),
808             timestamp=make_timestamp(),
809             failures=fail_data,
810             collection_failures=COLLECTION_FAILURES,
811         )
812     with open(os.path.join(outpath, "report.html"), "w") as f:
813         f.write(contents)
814
815
816 def pytest_addoption(parser):
817     """
818     Add needed CLI arguments
819     """
820     parser.addoption(
821         "--template-directory",
822         dest="template_dir",
823         action="append",
824         help="Directory which holds the templates for validation",
825     )
826
827     parser.addoption(
828         "--template-source",
829         dest="template_source",
830         action="append",
831         help="Source Directory which holds the templates for validation",
832     )
833
834     parser.addoption(
835         "--self-test",
836         dest="self_test",
837         action="store_true",
838         help="Test the unit tests against their fixtured data",
839     )
840
841     parser.addoption(
842         "--report-format",
843         dest="report_format",
844         action="store",
845         help="Format of output report (html, csv, excel, json)",
846     )
847
848     parser.addoption(
849         "--continue-on-failure",
850         dest="continue_on_failure",
851         action="store_true",
852         help="Continue validation even when structural errors exist in input files",
853     )
854
855     parser.addoption(
856         "--output-directory",
857         dest="output_dir",
858         action="store",
859         default=None,
860         help="Alternate ",
861     )
862
863     parser.addoption(
864         "--category",
865         dest="test_categories",
866         action="append",
867         help="optional category of test to execute",
868     )
869
870
871 def pytest_configure(config):
872     """
873     Ensure that we are receive either `--self-test` or
874     `--template-dir=<directory` as CLI arguments
875     """
876     if config.getoption("template_dir") and config.getoption("self_test"):
877         raise Exception('"--template-dir", and "--self-test"' " are mutually exclusive")
878     if not (
879         config.getoption("template_dir")
880         or config.getoption("self_test")
881         or config.getoption("help")
882     ):
883         raise Exception('One of "--template-dir" or' ' "--self-test" must be specified')
884
885
886 def pytest_generate_tests(metafunc):
887     """
888     If a unit test requires an argument named 'filename'
889     we generate a test for the filenames selected. Either
890     the files contained in `template_dir` or if `template_dir`
891     is not specified on the CLI, the fixtures associated with this
892     test name.
893     """
894
895     # noinspection PyBroadException
896     try:
897         if "filename" in metafunc.fixturenames:
898             from .parametrizers import parametrize_filename
899
900             parametrize_filename(metafunc)
901
902         if "filenames" in metafunc.fixturenames:
903             from .parametrizers import parametrize_filenames
904
905             parametrize_filenames(metafunc)
906
907         if "template_dir" in metafunc.fixturenames:
908             from .parametrizers import parametrize_template_dir
909
910             parametrize_template_dir(metafunc)
911
912         if "environment_pair" in metafunc.fixturenames:
913             from .parametrizers import parametrize_environment_pair
914
915             parametrize_environment_pair(metafunc)
916
917         if "heat_volume_pair" in metafunc.fixturenames:
918             from .parametrizers import parametrize_heat_volume_pair
919
920             parametrize_heat_volume_pair(metafunc)
921
922         if "yaml_files" in metafunc.fixturenames:
923             from .parametrizers import parametrize_yaml_files
924
925             parametrize_yaml_files(metafunc)
926
927         if "env_files" in metafunc.fixturenames:
928             from .parametrizers import parametrize_environment_files
929
930             parametrize_environment_files(metafunc)
931
932         if "yaml_file" in metafunc.fixturenames:
933             from .parametrizers import parametrize_yaml_file
934
935             parametrize_yaml_file(metafunc)
936
937         if "env_file" in metafunc.fixturenames:
938             from .parametrizers import parametrize_environment_file
939
940             parametrize_environment_file(metafunc)
941
942         if "parsed_yaml_file" in metafunc.fixturenames:
943             from .parametrizers import parametrize_parsed_yaml_file
944
945             parametrize_parsed_yaml_file(metafunc)
946
947         if "parsed_environment_file" in metafunc.fixturenames:
948             from .parametrizers import parametrize_parsed_environment_file
949
950             parametrize_parsed_environment_file(metafunc)
951
952         if "heat_template" in metafunc.fixturenames:
953             from .parametrizers import parametrize_heat_template
954
955             parametrize_heat_template(metafunc)
956
957         if "heat_templates" in metafunc.fixturenames:
958             from .parametrizers import parametrize_heat_templates
959
960             parametrize_heat_templates(metafunc)
961
962         if "volume_template" in metafunc.fixturenames:
963             from .parametrizers import parametrize_volume_template
964
965             parametrize_volume_template(metafunc)
966
967         if "volume_templates" in metafunc.fixturenames:
968             from .parametrizers import parametrize_volume_templates
969
970             parametrize_volume_templates(metafunc)
971
972         if "template" in metafunc.fixturenames:
973             from .parametrizers import parametrize_template
974
975             parametrize_template(metafunc)
976
977         if "templates" in metafunc.fixturenames:
978             from .parametrizers import parametrize_templates
979
980             parametrize_templates(metafunc)
981     except Exception as e:
982         # If an error occurs in the collection phase, then it won't be logged as a
983         # normal test failure.  This means that failures could occur, but not
984         # be seen on the report resulting in a false positive success message.  These
985         # errors will be stored and reported separately on the report
986         COLLECTION_FAILURES.append(
987             {
988                 "module": metafunc.module.__name__,
989                 "test": metafunc.function.__name__,
990                 "fixtures": metafunc.fixturenames,
991                 "error": traceback.format_exc(),
992                 "requirements": getattr(metafunc.function, "requirement_ids", []),
993             }
994         )
995         raise e
996
997
998 def hash_directory(path):
999     md5 = hashlib.md5()
1000     for dir_path, sub_dirs, filenames in os.walk(path):
1001         for filename in filenames:
1002             file_path = os.path.join(dir_path, filename)
1003             with open(file_path, "rb") as f:
1004                 md5.update(f.read())
1005     return md5.hexdigest()
1006
1007
1008 def load_current_requirements():
1009     """Loads dict of current requirements or empty dict if file doesn't exist"""
1010     with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
1011         data = json.load(f)
1012         version = data["current_version"]
1013         return data["versions"][version]["needs"]
1014
1015
1016 def compat_open(path):
1017     """Invokes open correctly depending on the Python version"""
1018     if sys.version_info.major < 3:
1019         return open(path, "wb")
1020     else:
1021         return open(path, "w", newline="")
1022
1023
1024 def unicode_writerow(writer, row):
1025     if sys.version_info.major < 3:
1026         row = [s.encode("utf8") for s in row]
1027     writer.writerow(row)
1028
1029
1030 def parse_heat_requirements(reqs):
1031     """Takes requirements and returns list of only Heat requirements"""
1032     data = json.loads(reqs)
1033     for key, values in list(data.items()):
1034         if "Heat" in (values["docname"]):
1035             if "MUST" not in (values["keyword"]):
1036                 del data[key]
1037             else:
1038                 if "none" in (values["validation_mode"]):
1039                     del data[key]
1040         else:
1041             del data[key]
1042     return data
1043
1044
1045 # noinspection PyUnusedLocal
1046 def pytest_report_collectionfinish(config, startdir, items):
1047     """Generates a simple traceability report to output/traceability.csv"""
1048     traceability_path = os.path.join(__path__[0], "../output/traceability.csv")
1049     output_dir = os.path.split(traceability_path)[0]
1050     if not os.path.exists(output_dir):
1051         os.makedirs(output_dir)
1052     reqs = load_current_requirements()
1053     reqs = json.dumps(reqs)
1054     requirements = parse_heat_requirements(reqs)
1055     unmapped, mapped = partition(
1056         lambda i: hasattr(i.function, "requirement_ids"), items
1057     )
1058
1059     req_to_test = defaultdict(set)
1060     mapping_errors = set()
1061     for item in mapped:
1062         for req_id in item.function.requirement_ids:
1063             if req_id not in req_to_test:
1064                 req_to_test[req_id].add(item)
1065             if req_id not in requirements:
1066                 mapping_errors.add(
1067                     (req_id, item.function.__module__, item.function.__name__)
1068                 )
1069
1070     mapping_error_path = os.path.join(__path__[0], "../output/mapping_errors.csv")
1071     with compat_open(mapping_error_path) as f:
1072         writer = csv.writer(f)
1073         for err in mapping_errors:
1074             unicode_writerow(writer, err)
1075
1076     with compat_open(traceability_path) as f:
1077         out = csv.writer(f)
1078         unicode_writerow(
1079             out,
1080             ("Requirement ID", "Requirement", "Section", "Test Module", "Test Name"),
1081         )
1082         for req_id, metadata in requirements.items():
1083             if req_to_test[req_id]:
1084                 for item in req_to_test[req_id]:
1085                     unicode_writerow(
1086                         out,
1087                         (
1088                             req_id,
1089                             metadata["description"],
1090                             metadata["section_name"],
1091                             item.function.__module__,
1092                             item.function.__name__,
1093                         ),
1094                     )
1095             else:
1096                 unicode_writerow(
1097                     out,
1098                     (req_id, metadata["description"], metadata["section_name"], "", ""),
1099                 )
1100         # now write out any test methods that weren't mapped to requirements
1101         for item in unmapped:
1102             unicode_writerow(
1103                 out, ("", "", "", item.function.__module__, item.function.__name__)
1104             )