[VVP] Ensure report.json always produced
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37
38 import csv
39 import datetime
40 import hashlib
41 import io
42 import json
43 import os
44 import re
45 import time
46 from collections import defaultdict
47 from itertools import chain
48
49 import traceback
50
51 import docutils.core
52 import jinja2
53 import pytest
54 from more_itertools import partition
55 import xlsxwriter
56 from six import string_types
57
58 import version
59
60 __path__ = [os.path.dirname(os.path.abspath(__file__))]
61
62 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
63
64 RESOLUTION_STEPS_FILE = "resolution_steps.json"
65 HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
66 TEST_SCRIPT_SITE = "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
67 VNFRQTS_ID_URL = "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
68
69 REPORT_COLUMNS = [
70     ("Input File", "file"),
71     ("Test", "test_file"),
72     ("Requirements", "req_description"),
73     ("Resolution Steps", "resolution_steps"),
74     ("Error Message", "message"),
75     ("Raw Test Output", "raw_output"),
76 ]
77
78 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
79 while preparing to validate the the input files. Some validations may not have been
80 executed. Please refer these issue to the VNF Validation Tool team.
81 """
82
83 COLLECTION_FAILURES = []
84
85 # Captures the results of every test run
86 ALL_RESULTS = []
87
88
89 def get_output_dir(config):
90     """
91     Retrieve the output directory for the reports and create it if necessary
92     :param config: pytest configuration
93     :return: output directory as string
94     """
95     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
96     if not os.path.exists(output_dir):
97         os.makedirs(output_dir, exist_ok=True)
98     return output_dir
99
100
101 def extract_error_msg(rep):
102     """
103     If a custom error message was provided, then extract it otherwise
104     just show the pytest assert message
105     """
106     if rep.outcome != "failed":
107         return ""
108     try:
109         full_msg = str(rep.longrepr.reprcrash.message)
110         match = re.match(
111             "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
112         )
113         if match:  # custom message was provided
114             # Extract everything between AssertionError and the start
115             # of the assert statement expansion in the pytest report
116             msg = match.group(1)
117         else:
118             msg = str(rep.longrepr.reprcrash)
119             if "AssertionError:" in msg:
120                 msg = msg.split("AssertionError:")[1]
121     except AttributeError:
122         msg = str(rep)
123
124     return msg
125
126
127 class TestResult:
128     """
129     Wraps the test case and result to extract necessary metadata for
130     reporting purposes.
131     """
132
133     RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
134
135     def __init__(self, item, outcome):
136         self.item = item
137         self.result = outcome.get_result()
138         self.files = [os.path.normpath(p) for p in self._get_files()]
139         self.error_message = self._get_error_message()
140
141     @property
142     def requirement_ids(self):
143         """
144         Returns list of requirement IDs mapped to the test case.
145
146         :return: Returns a list of string requirement IDs the test was
147                  annotated with ``validates`` otherwise returns and empty list
148         """
149         is_mapped = hasattr(self.item.function, "requirement_ids")
150         return self.item.function.requirement_ids if is_mapped else []
151
152     @property
153     def markers(self):
154         """
155         :return: Returns a set of pytest marker names for the test or an empty set
156         """
157         return set(m.name for m in self.item.iter_markers())
158
159     @property
160     def is_base_test(self):
161         """
162         :return: Returns True if the test is annotated with a pytest marker called base
163         """
164         return "base" in self.markers
165
166     @property
167     def is_failed(self):
168         """
169         :return: True if the test failed
170         """
171         return self.outcome == "FAIL"
172
173     @property
174     def outcome(self):
175         """
176         :return: Returns 'PASS', 'FAIL', or 'SKIP'
177         """
178         return self.RESULT_MAPPING[self.result.outcome]
179
180     @property
181     def test_case(self):
182         """
183         :return: Name of the test case method
184         """
185         return self.item.function.__name__
186
187     @property
188     def test_module(self):
189         """
190         :return: Name of the file containing the test case
191         """
192         return self.item.function.__module__.split(".")[-1]
193
194     @property
195     def raw_output(self):
196         """
197         :return: Full output from pytest for the given test case
198         """
199         return str(self.result.longrepr)
200
201     def requirement_text(self, curr_reqs):
202         """
203         Creates a text summary for the requirement IDs mapped to the test case.
204         If no requirements are mapped, then it returns the empty string.
205
206         :param curr_reqs: mapping of requirement IDs to requirement metadata
207                           loaded from the VNFRQTS projects needs.json output
208         :return: ID and text of the requirements mapped to the test case
209         """
210         text = (
211             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
212             for r_id in self.requirement_ids if r_id in curr_reqs
213         )
214         return "".join(text)
215
216     def requirements_metadata(self, curr_reqs):
217         """
218         Returns a list of dicts containing the following metadata for each
219         requirement mapped:
220
221         - id: Requirement ID
222         - text: Full text of the requirement
223         - keyword: MUST, MUST NOT, MAY, etc.
224
225         :param curr_reqs: mapping of requirement IDs to requirement metadata
226                           loaded from the VNFRQTS projects needs.json output
227         :return: List of requirement metadata
228         """
229         data = []
230         for r_id in self.requirement_ids:
231             if r_id not in curr_reqs:
232                 continue
233             data.append(
234                 {
235                     "id": r_id,
236                     "text": curr_reqs[r_id]["description"],
237                     "keyword": curr_reqs[r_id]["keyword"],
238                 }
239             )
240         return data
241
242     def resolution_steps(self, resolutions):
243         """
244         :param resolutions: Loaded from contents for resolution_steps.json
245         :return: Header and text for the resolution step associated with this
246                  test case.  Returns empty string if no resolutions are
247                  provided.
248         """
249         text = (
250             "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
251             for entry in resolutions
252             if self._match(entry)
253         )
254         return "".join(text)
255
256     def _match(self, resolution_entry):
257         """
258         Returns True if the test result maps to the given entry in
259         the resolutions file
260         """
261         return (
262             self.test_case == resolution_entry["function"]
263             and self.test_module == resolution_entry["module"]
264         )
265
266     def _get_files(self):
267         """
268         Extracts the list of files passed into the test case.
269         :return: List of absolute paths to files
270         """
271         if "environment_pair" in self.item.fixturenames:
272             return [
273                 "{} environment pair".format(
274                     self.item.funcargs["environment_pair"]["name"]
275                 )
276             ]
277         elif "heat_volume_pair" in self.item.fixturenames:
278             return [
279                 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
280             ]
281         elif "heat_templates" in self.item.fixturenames:
282             return self.item.funcargs["heat_templates"]
283         elif "yaml_files" in self.item.fixturenames:
284             return self.item.funcargs["yaml_files"]
285         else:
286             parts = self.result.nodeid.split("[")
287             return [""] if len(parts) == 1 else [parts[1][:-1]]
288
289     def _get_error_message(self):
290         """
291         :return: Error message or empty string if the test did not fail or error
292         """
293         if self.is_failed:
294             return extract_error_msg(self.result)
295         else:
296             return ""
297
298
299 # noinspection PyUnusedLocal
300 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
301 def pytest_runtest_makereport(item, call):
302     """
303     Captures the test results for later reporting.  This will also halt testing
304     if a base failure is encountered (can be overridden with continue-on-failure)
305     """
306     outcome = yield
307     if outcome.get_result().when != "call":
308         return  # only capture results of test cases themselves
309     result = TestResult(item, outcome)
310     ALL_RESULTS.append(result)
311     if (
312         not item.config.option.continue_on_failure
313         and result.is_base_test
314         and result.is_failed
315     ):
316         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
317             result.error_message
318         )
319         pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
320
321
322 def make_timestamp():
323     """
324     :return: String make_iso_timestamp in format:
325              2019-01-19 10:18:49.865000 Central Standard Time
326     """
327     timezone = time.tzname[time.localtime().tm_isdst]
328     return "{} {}".format(str(datetime.datetime.now()), timezone)
329
330
331 # noinspection PyUnusedLocal
332 def pytest_sessionstart(session):
333     ALL_RESULTS.clear()
334     COLLECTION_FAILURES.clear()
335
336
337 # noinspection PyUnusedLocal
338 def pytest_sessionfinish(session, exitstatus):
339     """
340     If not a self-test run, generate the output reports
341     """
342     if not session.config.option.template_dir:
343         return
344
345     if session.config.option.template_source:
346         template_source = session.config.option.template_source[0]
347     else:
348         template_source = os.path.abspath(session.config.option.template_dir[0])
349
350     categories_selected = session.config.option.test_categories or ""
351     generate_report(
352         get_output_dir(session.config),
353         template_source,
354         categories_selected,
355         session.config.option.report_format,
356     )
357
358
359 # noinspection PyUnusedLocal
360 def pytest_collection_modifyitems(session, config, items):
361     """
362     Selects tests based on the categories requested.  Tests without
363     categories will always be executed.
364     """
365     config.traceability_items = list(items)  # save all items for traceability
366     if not config.option.self_test:
367         for item in items:
368             # checking if test belongs to a category
369             if hasattr(item.function, "categories"):
370                 if config.option.test_categories:
371                     test_categories = getattr(item.function, "categories")
372                     passed_categories = config.option.test_categories
373                     if not all(
374                         category in passed_categories for category in test_categories
375                     ):
376                         item.add_marker(
377                             pytest.mark.skip(
378                                 reason="Test categories do not match all the passed categories"
379                             )
380                         )
381                 else:
382                     item.add_marker(
383                         pytest.mark.skip(
384                             reason="Test belongs to a category but no categories were passed"
385                         )
386                     )
387     items.sort(
388         key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
389     )
390
391
392 def make_href(paths):
393     """
394     Create an anchor tag to link to the file paths provided.
395     :param paths: string or list of file paths
396     :return: String of hrefs - one for each path, each seperated by a line
397              break (<br/).
398     """
399     paths = [paths] if isinstance(paths, string_types) else paths
400     links = []
401     for p in paths:
402         abs_path = os.path.abspath(p)
403         name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
404         links.append(
405             "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
406                 abs_path=abs_path, name=name
407             )
408         )
409     return "<br/>".join(links)
410
411
412 def load_resolutions_file():
413     """
414     :return: dict of data loaded from resolutions_steps.json
415     """
416     resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
417     if os.path.exists(resolution_steps):
418         with open(resolution_steps, "r") as f:
419             return json.loads(f.read())
420
421
422 def generate_report(outpath, template_path, categories, output_format="html"):
423     """
424     Generates the various output reports.
425
426     :param outpath: destination directory for all reports
427     :param template_path: directory containing the Heat templates validated
428     :param categories: Optional categories selected
429     :param output_format: One of "html", "excel", or "csv". Default is "html"
430     :raises: ValueError if requested output format is unknown
431     """
432     failures = [r for r in ALL_RESULTS if r.is_failed]
433     generate_failure_file(outpath)
434     output_format = output_format.lower().strip() if output_format else "html"
435     generate_json(outpath, template_path, categories)
436     if output_format == "html":
437         generate_html_report(outpath, categories, template_path, failures)
438     elif output_format == "excel":
439         generate_excel_report(outpath, categories, template_path, failures)
440     elif output_format == "json":
441         return
442     elif output_format == "csv":
443         generate_csv_report(outpath, categories, template_path, failures)
444     else:
445         raise ValueError("Unsupported output format: " + output_format)
446
447
448 def write_json(data, path):
449     """
450     Pretty print data as JSON to the output path requested
451
452     :param data: Data structure to be converted to JSON
453     :param path: Where to write output
454     """
455     with open(path, "w") as f:
456         json.dump(data, f, indent=2)
457
458
459 def generate_failure_file(outpath):
460     """
461     Writes a summary of test failures to a file named failures.
462     This is for backwards compatibility only.  The report.json offers a
463     more comprehensive output.
464     """
465     failure_path = os.path.join(outpath, "failures")
466     failures = [r for r in ALL_RESULTS if r.is_failed]
467     data = {}
468     for i, fail in enumerate(failures):
469         data[str(i)] = {
470             "file": fail.files[0] if len(fail.files) == 1 else fail.files,
471             "vnfrqts": fail.requirement_ids,
472             "test": fail.test_case,
473             "test_file": fail.test_module,
474             "raw_output": fail.raw_output,
475             "message": fail.error_message,
476         }
477     write_json(data, failure_path)
478
479
480 def generate_csv_report(output_dir, categories, template_path, failures):
481     rows = [["Validation Failures"]]
482     headers = [
483         ("Categories Selected:", categories),
484         ("Tool Version:", version.VERSION),
485         ("Report Generated At:", make_timestamp()),
486         ("Directory Validated:", template_path),
487         ("Checksum:", hash_directory(template_path)),
488         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
489     ]
490     rows.append([])
491     for header in headers:
492         rows.append(header)
493     rows.append([])
494
495     if COLLECTION_FAILURES:
496         rows.append([COLLECTION_FAILURE_WARNING])
497         rows.append(["Validation File", "Test", "Fixtures", "Error"])
498         for failure in COLLECTION_FAILURES:
499             rows.append(
500                 [
501                     failure["module"],
502                     failure["test"],
503                     ";".join(failure["fixtures"]),
504                     failure["error"],
505                 ]
506             )
507         rows.append([])
508
509     # table header
510     rows.append([col for col, _ in REPORT_COLUMNS])
511
512     reqs = load_current_requirements()
513     resolutions = load_resolutions_file()
514
515     # table content
516     for failure in failures:
517         rows.append(
518             [
519                 "\n".join(failure.files),
520                 failure.test_module,
521                 failure.requirement_text(reqs),
522                 failure.resolution_steps(resolutions),
523                 failure.error_message,
524                 failure.raw_output,
525             ]
526         )
527
528     output_path = os.path.join(output_dir, "report.csv")
529     with open(output_path, "w", newline="") as f:
530         writer = csv.writer(f)
531         for row in rows:
532             writer.writerow(row)
533
534
535 def generate_excel_report(output_dir, categories, template_path, failures):
536     output_path = os.path.join(output_dir, "report.xlsx")
537     workbook = xlsxwriter.Workbook(output_path)
538     bold = workbook.add_format({"bold": True})
539     code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
540     normal = workbook.add_format({"text_wrap": True})
541     heading = workbook.add_format({"bold": True, "font_size": 18})
542     worksheet = workbook.add_worksheet("failures")
543     worksheet.write(0, 0, "Validation Failures", heading)
544
545     headers = [
546         ("Categories Selected:", ",".join(categories)),
547         ("Tool Version:", version.VERSION),
548         ("Report Generated At:", make_timestamp()),
549         ("Directory Validated:", template_path),
550         ("Checksum:", hash_directory(template_path)),
551         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
552     ]
553     for row, (header, value) in enumerate(headers, start=2):
554         worksheet.write(row, 0, header, bold)
555         worksheet.write(row, 1, value)
556
557     worksheet.set_column(0, len(headers) - 1, 40)
558     worksheet.set_column(len(headers), len(headers), 80)
559
560     if COLLECTION_FAILURES:
561         collection_failures_start = 2 + len(headers) + 2
562         worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
563         collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
564         for col_num, col_name in enumerate(collection_failure_headers):
565             worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
566         for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
567             worksheet.write(row, 0, data["module"])
568             worksheet.write(row, 1, data["test"])
569             worksheet.write(row, 2, ",".join(data["fixtures"]))
570             worksheet.write(row, 3, data["error"], code)
571
572     # table header
573     start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
574     worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
575     for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
576         worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
577
578     reqs = load_current_requirements()
579     resolutions = load_resolutions_file()
580
581     # table content
582     for row, failure in enumerate(failures, start=start_error_table_row + 2):
583         worksheet.write(row, 0, "\n".join(failure.files), normal)
584         worksheet.write(row, 1, failure.test_module, normal)
585         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
586         worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
587         worksheet.write(row, 4, failure.error_message, normal)
588         worksheet.write(row, 5, failure.raw_output, code)
589
590     workbook.close()
591
592
593 def make_iso_timestamp():
594     """
595     Creates a timestamp in ISO 8601 format in UTC format.  Used for JSON output.
596     """
597     now = datetime.datetime.utcnow()
598     now.replace(tzinfo=datetime.timezone.utc)
599     return now.isoformat()
600
601
602 def aggregate_requirement_adherence(r_id, collection_failures, test_results):
603     """
604     Examines all tests associated with a given requirement and determines
605     the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
606
607     * ERROR - At least one ERROR occurred
608     * PASS -  At least one PASS and no FAIL or ERRORs.
609     * FAIL -  At least one FAIL occurred (no ERRORs)
610     * SKIP - All tests were SKIP
611
612
613     :param r_id: Requirement ID to examing
614     :param collection_failures: Errors that occurred during test setup.
615     :param test_results: List of TestResult
616     :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
617     """
618     errors = any(r_id in f["requirements"] for f in collection_failures)
619     outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
620     return aggregate_results(errors, outcomes, r_id)
621
622
623 def aggregate_results(has_errors, outcomes, r_id=None):
624     """
625     Determines the aggregate result for the conditions provided.  Assumes the
626     results have been filtered and collected for analysis.
627
628     :param has_errors: True if collection failures occurred for the tests being
629                        analyzed.
630     :param outcomes: set of outcomes from the TestResults
631     :param r_id: Optional requirement ID if known
632     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
633              (see aggregate_requirement_adherence for more detail)
634     """
635     if has_errors:
636         return "ERROR"
637
638     if not outcomes:
639         return "PASS"
640     elif "FAIL" in outcomes:
641         return "FAIL"
642     elif "PASS" in outcomes:
643         return "PASS"
644     elif {"SKIP"} == outcomes:
645         return "SKIP"
646     else:
647         pytest.warns(
648             "Unexpected error aggregating outcomes ({}) for requirement {}".format(
649                 outcomes, r_id
650             )
651         )
652         return "ERROR"
653
654
655 def aggregate_run_results(collection_failures, test_results):
656     """
657     Determines overall status of run based on all failures and results.
658
659     * 'ERROR' - At least one collection failure occurred during the run.
660     * 'FAIL' - Template failed at least one test
661     * 'PASS' - All tests executed properly and no failures were detected
662
663     :param collection_failures: failures occuring during test setup
664     :param test_results: list of all test executuion results
665     :return: one of 'ERROR', 'FAIL', or 'PASS'
666     """
667     if collection_failures:
668         return "ERROR"
669     elif any(r.is_failed for r in test_results):
670         return "FAIL"
671     else:
672         return "PASS"
673
674
675 def error(failure_or_result):
676     """
677     Extracts the error message from a collection failure or test result
678     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
679     :return: Error message as string
680     """
681     if isinstance(failure_or_result, TestResult):
682         return failure_or_result.error_message
683     else:
684         return failure_or_result["error"]
685
686
687 def req_ids(failure_or_result):
688     """
689     Extracts the requirement IDs from a collection failure or test result
690     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
691     :return: set of Requirement IDs.  If no requirements mapped, then an empty set
692     """
693     if isinstance(failure_or_result, TestResult):
694         return set(failure_or_result.requirement_ids)
695     else:
696         return set(failure_or_result["requirements"])
697
698
699 def collect_errors(r_id, collection_failures, test_result):
700     """
701     Creates a list of error messages from the collection failures and
702     test results.  If r_id is provided, then it collects the error messages
703     where the failure or test is associated with that requirement ID.  If
704     r_id is None, then it collects all errors that occur on failures and
705     results that are not mapped to requirements
706     """
707
708     def selector(item):
709         if r_id:
710             return r_id in req_ids(item)
711         else:
712             return not req_ids(item)
713
714     errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
715     return [e for e in errors if e]
716
717
718 def relative_paths(base_dir, paths):
719     return [os.path.relpath(p, base_dir) for p in paths]
720
721
722 def generate_json(outpath, template_path, categories):
723     """
724     Creates a JSON summary of the entire test run.
725     """
726     reqs = load_current_requirements()
727     data = {
728         "version": "dublin",
729         "template_directory": os.path.splitdrive(template_path)[1].replace(
730             os.path.sep, "/"
731         ),
732         "timestamp": make_iso_timestamp(),
733         "checksum": hash_directory(template_path),
734         "categories": categories,
735         "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
736         "tests": [],
737         "requirements": [],
738     }
739
740     results = data["tests"]
741     for result in COLLECTION_FAILURES:
742         results.append(
743             {
744                 "files": [],
745                 "test_module": result["module"],
746                 "test_case": result["test"],
747                 "result": "ERROR",
748                 "error": result["error"],
749                 "requirements": result["requirements"],
750             }
751         )
752     for result in ALL_RESULTS:
753         results.append(
754             {
755                 "files": relative_paths(template_path, result.files),
756                 "test_module": result.test_module,
757                 "test_case": result.test_case,
758                 "result": result.outcome,
759                 "error": result.error_message if result.is_failed else "",
760                 "requirements": result.requirements_metadata(reqs),
761             }
762         )
763
764     requirements = data["requirements"]
765     for r_id, r_data in reqs.items():
766         result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
767         if result:
768             requirements.append(
769                 {
770                     "id": r_id,
771                     "text": r_data["description"],
772                     "keyword": r_data["keyword"],
773                     "result": result,
774                     "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
775                 }
776             )
777     # If there are tests that aren't mapped to a requirement, then we'll
778     # map them to a special entry so the results are coherent.
779     unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
780     has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
781     if unmapped_outcomes or has_errors:
782         requirements.append(
783             {
784                 "id": "Unmapped",
785                 "text": "Tests not mapped to requirements (see tests)",
786                 "result": aggregate_results(has_errors, unmapped_outcomes),
787                 "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
788             }
789         )
790
791     report_path = os.path.join(outpath, "report.json")
792     write_json(data, report_path)
793
794
795 def generate_html_report(outpath, categories, template_path, failures):
796     reqs = load_current_requirements()
797     resolutions = load_resolutions_file()
798     fail_data = []
799     for failure in failures:
800         fail_data.append(
801             {
802                 "file_links": make_href(failure.files),
803                 "test_id": failure.test_module,
804                 "error_message": failure.error_message,
805                 "raw_output": failure.raw_output,
806                 "requirements": docutils.core.publish_parts(
807                     writer_name="html", source=failure.requirement_text(reqs)
808                 )["body"],
809                 "resolution_steps": failure.resolution_steps(resolutions),
810             }
811         )
812     pkg_dir = os.path.split(__file__)[0]
813     j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
814     with open(j2_template_path, "r") as f:
815         report_template = jinja2.Template(f.read())
816         contents = report_template.render(
817             version=version.VERSION,
818             num_failures=len(failures) + len(COLLECTION_FAILURES),
819             categories=categories,
820             template_dir=make_href(template_path),
821             checksum=hash_directory(template_path),
822             timestamp=make_timestamp(),
823             failures=fail_data,
824             collection_failures=COLLECTION_FAILURES,
825         )
826     with open(os.path.join(outpath, "report.html"), "w") as f:
827         f.write(contents)
828
829
830 def pytest_addoption(parser):
831     """
832     Add needed CLI arguments
833     """
834     parser.addoption(
835         "--template-directory",
836         dest="template_dir",
837         action="append",
838         help="Directory which holds the templates for validation",
839     )
840
841     parser.addoption(
842         "--template-source",
843         dest="template_source",
844         action="append",
845         help="Source Directory which holds the templates for validation",
846     )
847
848     parser.addoption(
849         "--self-test",
850         dest="self_test",
851         action="store_true",
852         help="Test the unit tests against their fixtured data",
853     )
854
855     parser.addoption(
856         "--report-format",
857         dest="report_format",
858         action="store",
859         help="Format of output report (html, csv, excel, json)",
860     )
861
862     parser.addoption(
863         "--continue-on-failure",
864         dest="continue_on_failure",
865         action="store_true",
866         help="Continue validation even when structural errors exist in input files",
867     )
868
869     parser.addoption(
870         "--output-directory",
871         dest="output_dir",
872         action="store",
873         default=None,
874         help="Alternate ",
875     )
876
877     parser.addoption(
878         "--category",
879         dest="test_categories",
880         action="append",
881         help="optional category of test to execute",
882     )
883
884
885 def pytest_configure(config):
886     """
887     Ensure that we are receive either `--self-test` or
888     `--template-dir=<directory` as CLI arguments
889     """
890     if config.getoption("template_dir") and config.getoption("self_test"):
891         raise Exception('"--template-dir", and "--self-test"' " are mutually exclusive")
892     if not (
893         config.getoption("template_dir")
894         or config.getoption("self_test")
895         or config.getoption("help")
896     ):
897         raise Exception('One of "--template-dir" or' ' "--self-test" must be specified')
898
899
900 def pytest_generate_tests(metafunc):
901     """
902     If a unit test requires an argument named 'filename'
903     we generate a test for the filenames selected. Either
904     the files contained in `template_dir` or if `template_dir`
905     is not specified on the CLI, the fixtures associated with this
906     test name.
907     """
908
909     # noinspection PyBroadException
910     try:
911         if "filename" in metafunc.fixturenames:
912             from .parametrizers import parametrize_filename
913
914             parametrize_filename(metafunc)
915
916         if "filenames" in metafunc.fixturenames:
917             from .parametrizers import parametrize_filenames
918
919             parametrize_filenames(metafunc)
920
921         if "template_dir" in metafunc.fixturenames:
922             from .parametrizers import parametrize_template_dir
923
924             parametrize_template_dir(metafunc)
925
926         if "environment_pair" in metafunc.fixturenames:
927             from .parametrizers import parametrize_environment_pair
928
929             parametrize_environment_pair(metafunc)
930
931         if "heat_volume_pair" in metafunc.fixturenames:
932             from .parametrizers import parametrize_heat_volume_pair
933
934             parametrize_heat_volume_pair(metafunc)
935
936         if "yaml_files" in metafunc.fixturenames:
937             from .parametrizers import parametrize_yaml_files
938
939             parametrize_yaml_files(metafunc)
940
941         if "env_files" in metafunc.fixturenames:
942             from .parametrizers import parametrize_environment_files
943
944             parametrize_environment_files(metafunc)
945
946         if "yaml_file" in metafunc.fixturenames:
947             from .parametrizers import parametrize_yaml_file
948
949             parametrize_yaml_file(metafunc)
950
951         if "env_file" in metafunc.fixturenames:
952             from .parametrizers import parametrize_environment_file
953
954             parametrize_environment_file(metafunc)
955
956         if "parsed_yaml_file" in metafunc.fixturenames:
957             from .parametrizers import parametrize_parsed_yaml_file
958
959             parametrize_parsed_yaml_file(metafunc)
960
961         if "parsed_environment_file" in metafunc.fixturenames:
962             from .parametrizers import parametrize_parsed_environment_file
963
964             parametrize_parsed_environment_file(metafunc)
965
966         if "heat_template" in metafunc.fixturenames:
967             from .parametrizers import parametrize_heat_template
968
969             parametrize_heat_template(metafunc)
970
971         if "heat_templates" in metafunc.fixturenames:
972             from .parametrizers import parametrize_heat_templates
973
974             parametrize_heat_templates(metafunc)
975
976         if "volume_template" in metafunc.fixturenames:
977             from .parametrizers import parametrize_volume_template
978
979             parametrize_volume_template(metafunc)
980
981         if "volume_templates" in metafunc.fixturenames:
982             from .parametrizers import parametrize_volume_templates
983
984             parametrize_volume_templates(metafunc)
985
986         if "template" in metafunc.fixturenames:
987             from .parametrizers import parametrize_template
988
989             parametrize_template(metafunc)
990
991         if "templates" in metafunc.fixturenames:
992             from .parametrizers import parametrize_templates
993
994             parametrize_templates(metafunc)
995     except Exception as e:
996         # If an error occurs in the collection phase, then it won't be logged as a
997         # normal test failure.  This means that failures could occur, but not
998         # be seen on the report resulting in a false positive success message.  These
999         # errors will be stored and reported separately on the report
1000         COLLECTION_FAILURES.append(
1001             {
1002                 "module": metafunc.module.__name__,
1003                 "test": metafunc.function.__name__,
1004                 "fixtures": metafunc.fixturenames,
1005                 "error": traceback.format_exc(),
1006                 "requirements": getattr(metafunc.function, "requirement_ids", []),
1007             }
1008         )
1009         raise e
1010
1011
1012 def hash_directory(path):
1013     """
1014     Create md5 hash using the contents of all files under ``path``
1015     :param path: string directory containing files
1016     :return: string MD5 hash code (hex)
1017     """
1018     md5 = hashlib.md5()
1019     for dir_path, sub_dirs, filenames in os.walk(path):
1020         for filename in filenames:
1021             file_path = os.path.join(dir_path, filename)
1022             with open(file_path, "rb") as f:
1023                 md5.update(f.read())
1024     return md5.hexdigest()
1025
1026
1027 def load_current_requirements():
1028     """Loads dict of current requirements or empty dict if file doesn't exist"""
1029     with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
1030         data = json.load(f)
1031         version = data["current_version"]
1032         return data["versions"][version]["needs"]
1033
1034
1035 def select_heat_requirements(reqs):
1036     """Filters dict requirements to only those requirements pertaining to Heat"""
1037     return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}
1038
1039
1040 def build_rst_json(reqs):
1041     """Takes requirements and returns list of only Heat requirements"""
1042     data = json.loads(reqs)
1043     for key, values in list(data.items()):
1044         if "Heat" in (values["docname"]):
1045             if "MUST" in (values["keyword"]):
1046                 if "none" in (values["validation_mode"]):
1047                     del data[key]
1048                 else:
1049                     # Creates links in RST format to requirements and test cases
1050                     if values["test_case"]:
1051                         mod = values["test_case"].split(".")[-1]
1052                         val = TEST_SCRIPT_SITE + mod + ".py"
1053                         rst_value = ("`" + mod + " <" + val + ">`_")
1054                         title = "`" + values["id"] + " <" + VNFRQTS_ID_URL + values["docname"].replace(" ", "%20") + ".html#" + values["id"] + ">`_"
1055                         data[key].update({'full_title': title, 'test_case': rst_value})
1056                     else:
1057                         del data[key]
1058             else:
1059                 del data[key]
1060         else:
1061             del data[key]
1062     return data
1063
1064
1065 def generate_rst_table(output_dir, data):
1066     """Generate a formatted csv to be used in RST"""
1067     rst_path = os.path.join(output_dir, "rst.csv")
1068     with open(rst_path, "w", newline="") as f:
1069         out = csv.writer(f)
1070         out.writerow(
1071             ("Requirement ID", "Requirement", "Test Module", "Test Name"),
1072         )
1073         for req_id, metadata in data.items():
1074             out.writerow(
1075                 (
1076                     metadata["full_title"],
1077                     metadata["description"],
1078                     metadata["test_case"],
1079                     metadata["validated_by"],
1080                 )
1081             )
1082
1083
1084 # noinspection PyUnusedLocal
1085 def pytest_report_collectionfinish(config, startdir, items):
1086     """Generates a simple traceability report to output/traceability.csv"""
1087     traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
1088     output_dir = os.path.split(traceability_path)[0]
1089     if not os.path.exists(output_dir):
1090         os.makedirs(output_dir)
1091     reqs = load_current_requirements()
1092     requirements = select_heat_requirements(reqs)
1093     unmapped, mapped = partition(
1094         lambda i: hasattr(i.function, "requirement_ids"), items
1095     )
1096
1097     req_to_test = defaultdict(set)
1098     mapping_errors = set()
1099     for item in mapped:
1100         for req_id in item.function.requirement_ids:
1101             if req_id not in req_to_test:
1102                 req_to_test[req_id].add(item)
1103                 if req_id in requirements:
1104                     reqs[req_id].update({'test_case': item.function.__module__,
1105                                          'validated_by': item.function.__name__})
1106             if req_id not in requirements:
1107                 mapping_errors.add(
1108                     (req_id, item.function.__module__, item.function.__name__)
1109                 )
1110
1111     mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
1112     with open(mapping_error_path, "w", newline="") as f:
1113         writer = csv.writer(f)
1114         for err in mapping_errors:
1115             writer.writerow(err)
1116
1117     with open(traceability_path, "w", newline="") as f:
1118         out = csv.writer(f)
1119         out.writerow(
1120             ("Requirement ID", "Requirement", "Section",
1121              "Keyword", "Validation Mode", "Is Testable",
1122              "Test Module", "Test Name"),
1123         )
1124         for req_id, metadata in requirements.items():
1125             keyword = metadata["keyword"].upper()
1126             mode = metadata["validation_mode"].lower()
1127             testable = keyword in {"MUST", "MUST NOT"} and mode != "none"
1128             if req_to_test[req_id]:
1129                 for item in req_to_test[req_id]:
1130                     out.writerow(
1131                         (
1132                             req_id,
1133                             metadata["description"],
1134                             metadata["section_name"],
1135                             keyword,
1136                             mode,
1137                             "TRUE" if testable else "FALSE",
1138                             item.function.__module__,
1139                             item.function.__name__,
1140                         ),
1141                     )
1142             else:
1143                 out.writerow(
1144                     (req_id,
1145                      metadata["description"],
1146                      metadata["section_name"],
1147                      keyword,
1148                      mode,
1149                      "TRUE" if testable else "FALSE",
1150                      "",   # test module
1151                      ""),  # test function
1152                 )
1153         # now write out any test methods that weren't mapped to requirements
1154         unmapped_tests = {(item.function.__module__, item.function.__name__) for item in
1155                           unmapped}
1156         for test_module, test_name in unmapped_tests:
1157             out.writerow(
1158                 ("",        # req ID
1159                  "",        # description
1160                  "",        # section name
1161                  "",        # keyword
1162                  "static",  # validation mode
1163                  "TRUE",    # testable
1164                  test_module,
1165                  test_name)
1166             )
1167
1168     generate_rst_table(get_output_dir(config), build_rst_json(json.dumps(reqs)))