VVP - Fixing script to include static validations
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37
38 import csv
39 import datetime
40 import hashlib
41 import io
42 import json
43 import os
44 import re
45 import time
46 from collections import defaultdict
47 from itertools import chain
48
49 import traceback
50
51 import docutils.core
52 import jinja2
53 import pytest
54 from more_itertools import partition
55 import xlsxwriter
56 from six import string_types
57
58 import version
59
60 __path__ = [os.path.dirname(os.path.abspath(__file__))]
61
62 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
63
64 RESOLUTION_STEPS_FILE = "resolution_steps.json"
65 HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
66 TEST_SCRIPT_SITE = (
67     "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
68 )
69 VNFRQTS_ID_URL = (
70     "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
71 )
72
73 REPORT_COLUMNS = [
74     ("Input File", "file"),
75     ("Test", "test_file"),
76     ("Requirements", "req_description"),
77     ("Resolution Steps", "resolution_steps"),
78     ("Error Message", "message"),
79     ("Raw Test Output", "raw_output"),
80 ]
81
82 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
83 while preparing to validate the the input files. Some validations may not have been
84 executed. Please refer these issue to the VNF Validation Tool team.
85 """
86
87 COLLECTION_FAILURES = []
88
89 # Captures the results of every test run
90 ALL_RESULTS = []
91
92
93 def get_output_dir(config):
94     """
95     Retrieve the output directory for the reports and create it if necessary
96     :param config: pytest configuration
97     :return: output directory as string
98     """
99     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
100     if not os.path.exists(output_dir):
101         os.makedirs(output_dir, exist_ok=True)
102     return output_dir
103
104
105 def extract_error_msg(rep):
106     """
107     If a custom error message was provided, then extract it otherwise
108     just show the pytest assert message
109     """
110     if rep.outcome != "failed":
111         return ""
112     try:
113         full_msg = str(rep.longrepr.reprcrash.message)
114         match = re.match(
115             "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
116         )
117         if match:  # custom message was provided
118             # Extract everything between AssertionError and the start
119             # of the assert statement expansion in the pytest report
120             msg = match.group(1)
121         else:
122             msg = str(rep.longrepr.reprcrash)
123             if "AssertionError:" in msg:
124                 msg = msg.split("AssertionError:")[1]
125     except AttributeError:
126         msg = str(rep)
127
128     return msg
129
130
131 class TestResult:
132     """
133     Wraps the test case and result to extract necessary metadata for
134     reporting purposes.
135     """
136
137     RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
138
139     def __init__(self, item, outcome):
140         self.item = item
141         self.result = outcome.get_result()
142         self.files = [os.path.normpath(p) for p in self._get_files()]
143         self.error_message = self._get_error_message()
144
145     @property
146     def requirement_ids(self):
147         """
148         Returns list of requirement IDs mapped to the test case.
149
150         :return: Returns a list of string requirement IDs the test was
151                  annotated with ``validates`` otherwise returns and empty list
152         """
153         is_mapped = hasattr(self.item.function, "requirement_ids")
154         return self.item.function.requirement_ids if is_mapped else []
155
156     @property
157     def markers(self):
158         """
159         :return: Returns a set of pytest marker names for the test or an empty set
160         """
161         return set(m.name for m in self.item.iter_markers())
162
163     @property
164     def is_base_test(self):
165         """
166         :return: Returns True if the test is annotated with a pytest marker called base
167         """
168         return "base" in self.markers
169
170     @property
171     def is_failed(self):
172         """
173         :return: True if the test failed
174         """
175         return self.outcome == "FAIL"
176
177     @property
178     def outcome(self):
179         """
180         :return: Returns 'PASS', 'FAIL', or 'SKIP'
181         """
182         return self.RESULT_MAPPING[self.result.outcome]
183
184     @property
185     def test_case(self):
186         """
187         :return: Name of the test case method
188         """
189         return self.item.function.__name__
190
191     @property
192     def test_module(self):
193         """
194         :return: Name of the file containing the test case
195         """
196         return self.item.function.__module__.split(".")[-1]
197
198     @property
199     def raw_output(self):
200         """
201         :return: Full output from pytest for the given test case
202         """
203         return str(self.result.longrepr)
204
205     def requirement_text(self, curr_reqs):
206         """
207         Creates a text summary for the requirement IDs mapped to the test case.
208         If no requirements are mapped, then it returns the empty string.
209
210         :param curr_reqs: mapping of requirement IDs to requirement metadata
211                           loaded from the VNFRQTS projects needs.json output
212         :return: ID and text of the requirements mapped to the test case
213         """
214         text = (
215             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
216             for r_id in self.requirement_ids
217             if r_id in curr_reqs
218         )
219         return "".join(text)
220
221     def requirements_metadata(self, curr_reqs):
222         """
223         Returns a list of dicts containing the following metadata for each
224         requirement mapped:
225
226         - id: Requirement ID
227         - text: Full text of the requirement
228         - keyword: MUST, MUST NOT, MAY, etc.
229
230         :param curr_reqs: mapping of requirement IDs to requirement metadata
231                           loaded from the VNFRQTS projects needs.json output
232         :return: List of requirement metadata
233         """
234         data = []
235         for r_id in self.requirement_ids:
236             if r_id not in curr_reqs:
237                 continue
238             data.append(
239                 {
240                     "id": r_id,
241                     "text": curr_reqs[r_id]["description"],
242                     "keyword": curr_reqs[r_id]["keyword"],
243                 }
244             )
245         return data
246
247     def resolution_steps(self, resolutions):
248         """
249         :param resolutions: Loaded from contents for resolution_steps.json
250         :return: Header and text for the resolution step associated with this
251                  test case.  Returns empty string if no resolutions are
252                  provided.
253         """
254         text = (
255             "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
256             for entry in resolutions
257             if self._match(entry)
258         )
259         return "".join(text)
260
261     def _match(self, resolution_entry):
262         """
263         Returns True if the test result maps to the given entry in
264         the resolutions file
265         """
266         return (
267             self.test_case == resolution_entry["function"]
268             and self.test_module == resolution_entry["module"]
269         )
270
271     def _get_files(self):
272         """
273         Extracts the list of files passed into the test case.
274         :return: List of absolute paths to files
275         """
276         if "environment_pair" in self.item.fixturenames:
277             return [
278                 "{} environment pair".format(
279                     self.item.funcargs["environment_pair"]["name"]
280                 )
281             ]
282         elif "heat_volume_pair" in self.item.fixturenames:
283             return [
284                 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
285             ]
286         elif "heat_templates" in self.item.fixturenames:
287             return self.item.funcargs["heat_templates"]
288         elif "yaml_files" in self.item.fixturenames:
289             return self.item.funcargs["yaml_files"]
290         else:
291             parts = self.result.nodeid.split("[")
292             return [""] if len(parts) == 1 else [parts[1][:-1]]
293
294     def _get_error_message(self):
295         """
296         :return: Error message or empty string if the test did not fail or error
297         """
298         if self.is_failed:
299             return extract_error_msg(self.result)
300         else:
301             return ""
302
303
304 # noinspection PyUnusedLocal
305 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
306 def pytest_runtest_makereport(item, call):
307     """
308     Captures the test results for later reporting.  This will also halt testing
309     if a base failure is encountered (can be overridden with continue-on-failure)
310     """
311     outcome = yield
312     if outcome.get_result().when != "call":
313         return  # only capture results of test cases themselves
314     result = TestResult(item, outcome)
315     ALL_RESULTS.append(result)
316     if (
317         not item.config.option.continue_on_failure
318         and result.is_base_test
319         and result.is_failed
320     ):
321         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
322             result.error_message
323         )
324         pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
325
326
327 def make_timestamp():
328     """
329     :return: String make_iso_timestamp in format:
330              2019-01-19 10:18:49.865000 Central Standard Time
331     """
332     timezone = time.tzname[time.localtime().tm_isdst]
333     return "{} {}".format(str(datetime.datetime.now()), timezone)
334
335
336 # noinspection PyUnusedLocal
337 def pytest_sessionstart(session):
338     ALL_RESULTS.clear()
339     COLLECTION_FAILURES.clear()
340
341
342 # noinspection PyUnusedLocal
343 def pytest_sessionfinish(session, exitstatus):
344     """
345     If not a self-test run, generate the output reports
346     """
347     if not session.config.option.template_dir:
348         return
349
350     if session.config.option.template_source:
351         template_source = session.config.option.template_source[0]
352     else:
353         template_source = os.path.abspath(session.config.option.template_dir[0])
354
355     categories_selected = session.config.option.test_categories or ""
356     generate_report(
357         get_output_dir(session.config),
358         template_source,
359         categories_selected,
360         session.config.option.report_format,
361     )
362
363
364 # noinspection PyUnusedLocal
365 def pytest_collection_modifyitems(session, config, items):
366     """
367     Selects tests based on the categories requested.  Tests without
368     categories will always be executed.
369     """
370     config.traceability_items = list(items)  # save all items for traceability
371     if not config.option.self_test:
372         for item in items:
373             # checking if test belongs to a category
374             if hasattr(item.function, "categories"):
375                 if config.option.test_categories:
376                     test_categories = getattr(item.function, "categories")
377                     passed_categories = config.option.test_categories
378                     if not all(
379                         category in passed_categories for category in test_categories
380                     ):
381                         item.add_marker(
382                             pytest.mark.skip(
383                                 reason="Test categories do not match all the passed categories"
384                             )
385                         )
386                 else:
387                     item.add_marker(
388                         pytest.mark.skip(
389                             reason="Test belongs to a category but no categories were passed"
390                         )
391                     )
392     items.sort(
393         key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
394     )
395
396
397 def make_href(paths):
398     """
399     Create an anchor tag to link to the file paths provided.
400     :param paths: string or list of file paths
401     :return: String of hrefs - one for each path, each seperated by a line
402              break (<br/).
403     """
404     paths = [paths] if isinstance(paths, string_types) else paths
405     links = []
406     for p in paths:
407         abs_path = os.path.abspath(p)
408         name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
409         links.append(
410             "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
411                 abs_path=abs_path, name=name
412             )
413         )
414     return "<br/>".join(links)
415
416
417 def load_resolutions_file():
418     """
419     :return: dict of data loaded from resolutions_steps.json
420     """
421     resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
422     if os.path.exists(resolution_steps):
423         with open(resolution_steps, "r") as f:
424             return json.loads(f.read())
425
426
427 def generate_report(outpath, template_path, categories, output_format="html"):
428     """
429     Generates the various output reports.
430
431     :param outpath: destination directory for all reports
432     :param template_path: directory containing the Heat templates validated
433     :param categories: Optional categories selected
434     :param output_format: One of "html", "excel", or "csv". Default is "html"
435     :raises: ValueError if requested output format is unknown
436     """
437     failures = [r for r in ALL_RESULTS if r.is_failed]
438     generate_failure_file(outpath)
439     output_format = output_format.lower().strip() if output_format else "html"
440     generate_json(outpath, template_path, categories)
441     if output_format == "html":
442         generate_html_report(outpath, categories, template_path, failures)
443     elif output_format == "excel":
444         generate_excel_report(outpath, categories, template_path, failures)
445     elif output_format == "json":
446         return
447     elif output_format == "csv":
448         generate_csv_report(outpath, categories, template_path, failures)
449     else:
450         raise ValueError("Unsupported output format: " + output_format)
451
452
453 def write_json(data, path):
454     """
455     Pretty print data as JSON to the output path requested
456
457     :param data: Data structure to be converted to JSON
458     :param path: Where to write output
459     """
460     with open(path, "w") as f:
461         json.dump(data, f, indent=2)
462
463
464 def generate_failure_file(outpath):
465     """
466     Writes a summary of test failures to a file named failures.
467     This is for backwards compatibility only.  The report.json offers a
468     more comprehensive output.
469     """
470     failure_path = os.path.join(outpath, "failures")
471     failures = [r for r in ALL_RESULTS if r.is_failed]
472     data = {}
473     for i, fail in enumerate(failures):
474         data[str(i)] = {
475             "file": fail.files[0] if len(fail.files) == 1 else fail.files,
476             "vnfrqts": fail.requirement_ids,
477             "test": fail.test_case,
478             "test_file": fail.test_module,
479             "raw_output": fail.raw_output,
480             "message": fail.error_message,
481         }
482     write_json(data, failure_path)
483
484
485 def generate_csv_report(output_dir, categories, template_path, failures):
486     rows = [["Validation Failures"]]
487     headers = [
488         ("Categories Selected:", categories),
489         ("Tool Version:", version.VERSION),
490         ("Report Generated At:", make_timestamp()),
491         ("Directory Validated:", template_path),
492         ("Checksum:", hash_directory(template_path)),
493         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
494     ]
495     rows.append([])
496     for header in headers:
497         rows.append(header)
498     rows.append([])
499
500     if COLLECTION_FAILURES:
501         rows.append([COLLECTION_FAILURE_WARNING])
502         rows.append(["Validation File", "Test", "Fixtures", "Error"])
503         for failure in COLLECTION_FAILURES:
504             rows.append(
505                 [
506                     failure["module"],
507                     failure["test"],
508                     ";".join(failure["fixtures"]),
509                     failure["error"],
510                 ]
511             )
512         rows.append([])
513
514     # table header
515     rows.append([col for col, _ in REPORT_COLUMNS])
516
517     reqs = load_current_requirements()
518     resolutions = load_resolutions_file()
519
520     # table content
521     for failure in failures:
522         rows.append(
523             [
524                 "\n".join(failure.files),
525                 failure.test_module,
526                 failure.requirement_text(reqs),
527                 failure.resolution_steps(resolutions),
528                 failure.error_message,
529                 failure.raw_output,
530             ]
531         )
532
533     output_path = os.path.join(output_dir, "report.csv")
534     with open(output_path, "w", newline="") as f:
535         writer = csv.writer(f)
536         for row in rows:
537             writer.writerow(row)
538
539
540 def generate_excel_report(output_dir, categories, template_path, failures):
541     output_path = os.path.join(output_dir, "report.xlsx")
542     workbook = xlsxwriter.Workbook(output_path)
543     bold = workbook.add_format({"bold": True})
544     code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
545     normal = workbook.add_format({"text_wrap": True})
546     heading = workbook.add_format({"bold": True, "font_size": 18})
547     worksheet = workbook.add_worksheet("failures")
548     worksheet.write(0, 0, "Validation Failures", heading)
549
550     headers = [
551         ("Categories Selected:", ",".join(categories)),
552         ("Tool Version:", version.VERSION),
553         ("Report Generated At:", make_timestamp()),
554         ("Directory Validated:", template_path),
555         ("Checksum:", hash_directory(template_path)),
556         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
557     ]
558     for row, (header, value) in enumerate(headers, start=2):
559         worksheet.write(row, 0, header, bold)
560         worksheet.write(row, 1, value)
561
562     worksheet.set_column(0, len(headers) - 1, 40)
563     worksheet.set_column(len(headers), len(headers), 80)
564
565     if COLLECTION_FAILURES:
566         collection_failures_start = 2 + len(headers) + 2
567         worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
568         collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
569         for col_num, col_name in enumerate(collection_failure_headers):
570             worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
571         for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
572             worksheet.write(row, 0, data["module"])
573             worksheet.write(row, 1, data["test"])
574             worksheet.write(row, 2, ",".join(data["fixtures"]))
575             worksheet.write(row, 3, data["error"], code)
576
577     # table header
578     start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
579     worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
580     for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
581         worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
582
583     reqs = load_current_requirements()
584     resolutions = load_resolutions_file()
585
586     # table content
587     for row, failure in enumerate(failures, start=start_error_table_row + 2):
588         worksheet.write(row, 0, "\n".join(failure.files), normal)
589         worksheet.write(row, 1, failure.test_module, normal)
590         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
591         worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
592         worksheet.write(row, 4, failure.error_message, normal)
593         worksheet.write(row, 5, failure.raw_output, code)
594
595     workbook.close()
596
597
598 def make_iso_timestamp():
599     """
600     Creates a timestamp in ISO 8601 format in UTC format.  Used for JSON output.
601     """
602     now = datetime.datetime.utcnow()
603     now.replace(tzinfo=datetime.timezone.utc)
604     return now.isoformat()
605
606
607 def aggregate_requirement_adherence(r_id, collection_failures, test_results):
608     """
609     Examines all tests associated with a given requirement and determines
610     the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
611
612     * ERROR - At least one ERROR occurred
613     * PASS -  At least one PASS and no FAIL or ERRORs.
614     * FAIL -  At least one FAIL occurred (no ERRORs)
615     * SKIP - All tests were SKIP
616
617
618     :param r_id: Requirement ID to examing
619     :param collection_failures: Errors that occurred during test setup.
620     :param test_results: List of TestResult
621     :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
622     """
623     errors = any(r_id in f["requirements"] for f in collection_failures)
624     outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
625     return aggregate_results(errors, outcomes, r_id)
626
627
628 def aggregate_results(has_errors, outcomes, r_id=None):
629     """
630     Determines the aggregate result for the conditions provided.  Assumes the
631     results have been filtered and collected for analysis.
632
633     :param has_errors: True if collection failures occurred for the tests being
634                        analyzed.
635     :param outcomes: set of outcomes from the TestResults
636     :param r_id: Optional requirement ID if known
637     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
638              (see aggregate_requirement_adherence for more detail)
639     """
640     if has_errors:
641         return "ERROR"
642
643     if not outcomes:
644         return "PASS"
645     elif "FAIL" in outcomes:
646         return "FAIL"
647     elif "PASS" in outcomes:
648         return "PASS"
649     elif {"SKIP"} == outcomes:
650         return "SKIP"
651     else:
652         pytest.warns(
653             "Unexpected error aggregating outcomes ({}) for requirement {}".format(
654                 outcomes, r_id
655             )
656         )
657         return "ERROR"
658
659
660 def aggregate_run_results(collection_failures, test_results):
661     """
662     Determines overall status of run based on all failures and results.
663
664     * 'ERROR' - At least one collection failure occurred during the run.
665     * 'FAIL' - Template failed at least one test
666     * 'PASS' - All tests executed properly and no failures were detected
667
668     :param collection_failures: failures occuring during test setup
669     :param test_results: list of all test executuion results
670     :return: one of 'ERROR', 'FAIL', or 'PASS'
671     """
672     if collection_failures:
673         return "ERROR"
674     elif any(r.is_failed for r in test_results):
675         return "FAIL"
676     else:
677         return "PASS"
678
679
680 def error(failure_or_result):
681     """
682     Extracts the error message from a collection failure or test result
683     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
684     :return: Error message as string
685     """
686     if isinstance(failure_or_result, TestResult):
687         return failure_or_result.error_message
688     else:
689         return failure_or_result["error"]
690
691
692 def req_ids(failure_or_result):
693     """
694     Extracts the requirement IDs from a collection failure or test result
695     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
696     :return: set of Requirement IDs.  If no requirements mapped, then an empty set
697     """
698     if isinstance(failure_or_result, TestResult):
699         return set(failure_or_result.requirement_ids)
700     else:
701         return set(failure_or_result["requirements"])
702
703
704 def collect_errors(r_id, collection_failures, test_result):
705     """
706     Creates a list of error messages from the collection failures and
707     test results.  If r_id is provided, then it collects the error messages
708     where the failure or test is associated with that requirement ID.  If
709     r_id is None, then it collects all errors that occur on failures and
710     results that are not mapped to requirements
711     """
712
713     def selector(item):
714         if r_id:
715             return r_id in req_ids(item)
716         else:
717             return not req_ids(item)
718
719     errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
720     return [e for e in errors if e]
721
722
723 def relative_paths(base_dir, paths):
724     return [os.path.relpath(p, base_dir) for p in paths]
725
726
727 def generate_json(outpath, template_path, categories):
728     """
729     Creates a JSON summary of the entire test run.
730     """
731     reqs = load_current_requirements()
732     data = {
733         "version": "dublin",
734         "template_directory": os.path.splitdrive(template_path)[1].replace(
735             os.path.sep, "/"
736         ),
737         "timestamp": make_iso_timestamp(),
738         "checksum": hash_directory(template_path),
739         "categories": categories,
740         "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
741         "tests": [],
742         "requirements": [],
743     }
744
745     results = data["tests"]
746     for result in COLLECTION_FAILURES:
747         results.append(
748             {
749                 "files": [],
750                 "test_module": result["module"],
751                 "test_case": result["test"],
752                 "result": "ERROR",
753                 "error": result["error"],
754                 "requirements": result["requirements"],
755             }
756         )
757     for result in ALL_RESULTS:
758         results.append(
759             {
760                 "files": relative_paths(template_path, result.files),
761                 "test_module": result.test_module,
762                 "test_case": result.test_case,
763                 "result": result.outcome,
764                 "error": result.error_message if result.is_failed else "",
765                 "requirements": result.requirements_metadata(reqs),
766             }
767         )
768
769     requirements = data["requirements"]
770     for r_id, r_data in reqs.items():
771         result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
772         if result:
773             requirements.append(
774                 {
775                     "id": r_id,
776                     "text": r_data["description"],
777                     "keyword": r_data["keyword"],
778                     "result": result,
779                     "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
780                 }
781             )
782     # If there are tests that aren't mapped to a requirement, then we'll
783     # map them to a special entry so the results are coherent.
784     unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
785     has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
786     if unmapped_outcomes or has_errors:
787         requirements.append(
788             {
789                 "id": "Unmapped",
790                 "text": "Tests not mapped to requirements (see tests)",
791                 "result": aggregate_results(has_errors, unmapped_outcomes),
792                 "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
793             }
794         )
795
796     report_path = os.path.join(outpath, "report.json")
797     write_json(data, report_path)
798
799
800 def generate_html_report(outpath, categories, template_path, failures):
801     reqs = load_current_requirements()
802     resolutions = load_resolutions_file()
803     fail_data = []
804     for failure in failures:
805         fail_data.append(
806             {
807                 "file_links": make_href(failure.files),
808                 "test_id": failure.test_module,
809                 "error_message": failure.error_message,
810                 "raw_output": failure.raw_output,
811                 "requirements": docutils.core.publish_parts(
812                     writer_name="html", source=failure.requirement_text(reqs)
813                 )["body"],
814                 "resolution_steps": failure.resolution_steps(resolutions),
815             }
816         )
817     pkg_dir = os.path.split(__file__)[0]
818     j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
819     with open(j2_template_path, "r") as f:
820         report_template = jinja2.Template(f.read())
821         contents = report_template.render(
822             version=version.VERSION,
823             num_failures=len(failures) + len(COLLECTION_FAILURES),
824             categories=categories,
825             template_dir=make_href(template_path),
826             checksum=hash_directory(template_path),
827             timestamp=make_timestamp(),
828             failures=fail_data,
829             collection_failures=COLLECTION_FAILURES,
830         )
831     with open(os.path.join(outpath, "report.html"), "w") as f:
832         f.write(contents)
833
834
835 def pytest_addoption(parser):
836     """
837     Add needed CLI arguments
838     """
839     parser.addoption(
840         "--template-directory",
841         dest="template_dir",
842         action="append",
843         help="Directory which holds the templates for validation",
844     )
845
846     parser.addoption(
847         "--template-source",
848         dest="template_source",
849         action="append",
850         help="Source Directory which holds the templates for validation",
851     )
852
853     parser.addoption(
854         "--self-test",
855         dest="self_test",
856         action="store_true",
857         help="Test the unit tests against their fixtured data",
858     )
859
860     parser.addoption(
861         "--report-format",
862         dest="report_format",
863         action="store",
864         help="Format of output report (html, csv, excel, json)",
865     )
866
867     parser.addoption(
868         "--continue-on-failure",
869         dest="continue_on_failure",
870         action="store_true",
871         help="Continue validation even when structural errors exist in input files",
872     )
873
874     parser.addoption(
875         "--output-directory",
876         dest="output_dir",
877         action="store",
878         default=None,
879         help="Alternate ",
880     )
881
882     parser.addoption(
883         "--category",
884         dest="test_categories",
885         action="append",
886         help="optional category of test to execute",
887     )
888
889
890 def pytest_configure(config):
891     """
892     Ensure that we are receive either `--self-test` or
893     `--template-dir=<directory` as CLI arguments
894     """
895     if config.getoption("template_dir") and config.getoption("self_test"):
896         raise Exception('"--template-dir", and "--self-test"' " are mutually exclusive")
897     if not (
898         config.getoption("template_dir")
899         or config.getoption("self_test")
900         or config.getoption("help")
901     ):
902         raise Exception('One of "--template-dir" or' ' "--self-test" must be specified')
903
904
905 def pytest_generate_tests(metafunc):
906     """
907     If a unit test requires an argument named 'filename'
908     we generate a test for the filenames selected. Either
909     the files contained in `template_dir` or if `template_dir`
910     is not specified on the CLI, the fixtures associated with this
911     test name.
912     """
913
914     # noinspection PyBroadException
915     try:
916         if "filename" in metafunc.fixturenames:
917             from .parametrizers import parametrize_filename
918
919             parametrize_filename(metafunc)
920
921         if "filenames" in metafunc.fixturenames:
922             from .parametrizers import parametrize_filenames
923
924             parametrize_filenames(metafunc)
925
926         if "template_dir" in metafunc.fixturenames:
927             from .parametrizers import parametrize_template_dir
928
929             parametrize_template_dir(metafunc)
930
931         if "environment_pair" in metafunc.fixturenames:
932             from .parametrizers import parametrize_environment_pair
933
934             parametrize_environment_pair(metafunc)
935
936         if "heat_volume_pair" in metafunc.fixturenames:
937             from .parametrizers import parametrize_heat_volume_pair
938
939             parametrize_heat_volume_pair(metafunc)
940
941         if "yaml_files" in metafunc.fixturenames:
942             from .parametrizers import parametrize_yaml_files
943
944             parametrize_yaml_files(metafunc)
945
946         if "env_files" in metafunc.fixturenames:
947             from .parametrizers import parametrize_environment_files
948
949             parametrize_environment_files(metafunc)
950
951         if "yaml_file" in metafunc.fixturenames:
952             from .parametrizers import parametrize_yaml_file
953
954             parametrize_yaml_file(metafunc)
955
956         if "env_file" in metafunc.fixturenames:
957             from .parametrizers import parametrize_environment_file
958
959             parametrize_environment_file(metafunc)
960
961         if "parsed_yaml_file" in metafunc.fixturenames:
962             from .parametrizers import parametrize_parsed_yaml_file
963
964             parametrize_parsed_yaml_file(metafunc)
965
966         if "parsed_environment_file" in metafunc.fixturenames:
967             from .parametrizers import parametrize_parsed_environment_file
968
969             parametrize_parsed_environment_file(metafunc)
970
971         if "heat_template" in metafunc.fixturenames:
972             from .parametrizers import parametrize_heat_template
973
974             parametrize_heat_template(metafunc)
975
976         if "heat_templates" in metafunc.fixturenames:
977             from .parametrizers import parametrize_heat_templates
978
979             parametrize_heat_templates(metafunc)
980
981         if "volume_template" in metafunc.fixturenames:
982             from .parametrizers import parametrize_volume_template
983
984             parametrize_volume_template(metafunc)
985
986         if "volume_templates" in metafunc.fixturenames:
987             from .parametrizers import parametrize_volume_templates
988
989             parametrize_volume_templates(metafunc)
990
991         if "template" in metafunc.fixturenames:
992             from .parametrizers import parametrize_template
993
994             parametrize_template(metafunc)
995
996         if "templates" in metafunc.fixturenames:
997             from .parametrizers import parametrize_templates
998
999             parametrize_templates(metafunc)
1000     except Exception as e:
1001         # If an error occurs in the collection phase, then it won't be logged as a
1002         # normal test failure.  This means that failures could occur, but not
1003         # be seen on the report resulting in a false positive success message.  These
1004         # errors will be stored and reported separately on the report
1005         COLLECTION_FAILURES.append(
1006             {
1007                 "module": metafunc.module.__name__,
1008                 "test": metafunc.function.__name__,
1009                 "fixtures": metafunc.fixturenames,
1010                 "error": traceback.format_exc(),
1011                 "requirements": getattr(metafunc.function, "requirement_ids", []),
1012             }
1013         )
1014         raise e
1015
1016
1017 def hash_directory(path):
1018     """
1019     Create md5 hash using the contents of all files under ``path``
1020     :param path: string directory containing files
1021     :return: string MD5 hash code (hex)
1022     """
1023     md5 = hashlib.md5()
1024     for dir_path, sub_dirs, filenames in os.walk(path):
1025         for filename in filenames:
1026             file_path = os.path.join(dir_path, filename)
1027             with open(file_path, "rb") as f:
1028                 md5.update(f.read())
1029     return md5.hexdigest()
1030
1031
1032 def load_current_requirements():
1033     """Loads dict of current requirements or empty dict if file doesn't exist"""
1034     with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
1035         data = json.load(f)
1036         version = data["current_version"]
1037         return data["versions"][version]["needs"]
1038
1039
1040 def select_heat_requirements(reqs):
1041     """Filters dict requirements to only those requirements pertaining to Heat"""
1042     return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}
1043
1044
1045 def is_testable(reqs):
1046     """Filters dict requirements to only those which are testable"""
1047     for key, values in reqs.items():
1048         if (("MUST" in values.get("keyword", "").upper()) and (
1049             "none" not in values.get("validation_mode", "").lower()
1050         )):
1051             reqs[key]["testable"] = True
1052         else:
1053             reqs[key]["testable"] = False
1054     return reqs
1055
1056
1057 def build_rst_json(reqs):
1058     """Takes requirements and returns list of only Heat requirements"""
1059     for key, values in list(reqs.items()):
1060         if values["testable"]:
1061             # Creates links in RST format to requirements and test cases
1062             if values["test_case"]:
1063                 mod = values["test_case"].split(".")[-1]
1064                 val = TEST_SCRIPT_SITE + mod + ".py"
1065                 rst_value = "`" + mod + " <" + val + ">`_"
1066                 title = (
1067                     "`"
1068                     + values["id"]
1069                     + " <"
1070                     + VNFRQTS_ID_URL
1071                     + values["docname"].replace(" ", "%20")
1072                     + ".html#"
1073                     + values["id"]
1074                     + ">`_"
1075                 )
1076                 reqs[key].update({"full_title": title, "test_case": rst_value})
1077             else:
1078                 title = (
1079                     "`"
1080                     + values["id"]
1081                     + " <"
1082                     + VNFRQTS_ID_URL
1083                     + values["docname"].replace(" ", "%20")
1084                     + ".html#"
1085                     + values["id"]
1086                     + ">`_"
1087                 )
1088                 reqs[key].update(
1089                     {
1090                         "full_title": title,
1091                         "test_case": "No test for requirement",
1092                         "validated_by": "static",
1093                     }
1094                 )
1095         else:
1096             del reqs[key]
1097     return reqs
1098
1099
1100 def generate_rst_table(output_dir, data):
1101     """Generate a formatted csv to be used in RST"""
1102     rst_path = os.path.join(output_dir, "rst.csv")
1103     with open(rst_path, "w", newline="") as f:
1104         out = csv.writer(f)
1105         out.writerow(("Requirement ID", "Requirement", "Test Module", "Test Name"))
1106         for req_id, metadata in data.items():
1107             out.writerow(
1108                 (
1109                     metadata["full_title"],
1110                     metadata["description"],
1111                     metadata["test_case"],
1112                     metadata["validated_by"],
1113                 )
1114             )
1115
1116
1117 # noinspection PyUnusedLocal
1118 def pytest_report_collectionfinish(config, startdir, items):
1119     """Generates a simple traceability report to output/traceability.csv"""
1120     traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
1121     output_dir = os.path.split(traceability_path)[0]
1122     if not os.path.exists(output_dir):
1123         os.makedirs(output_dir)
1124     reqs = load_current_requirements()
1125     requirements = select_heat_requirements(reqs)
1126     testable_requirements = is_testable(requirements)
1127     unmapped, mapped = partition(
1128         lambda i: hasattr(i.function, "requirement_ids"), items
1129     )
1130
1131     req_to_test = defaultdict(set)
1132     mapping_errors = set()
1133     for item in mapped:
1134         for req_id in item.function.requirement_ids:
1135             if req_id not in req_to_test:
1136                 req_to_test[req_id].add(item)
1137                 if req_id in requirements:
1138                     reqs[req_id].update(
1139                         {
1140                             "test_case": item.function.__module__,
1141                             "validated_by": item.function.__name__,
1142                         }
1143                     )
1144             if req_id not in requirements:
1145                 mapping_errors.add(
1146                     (req_id, item.function.__module__, item.function.__name__)
1147                 )
1148
1149     mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
1150     with open(mapping_error_path, "w", newline="") as f:
1151         writer = csv.writer(f)
1152         for err in mapping_errors:
1153             writer.writerow(err)
1154
1155     with open(traceability_path, "w", newline="") as f:
1156         out = csv.writer(f)
1157         out.writerow(
1158             (
1159                 "Requirement ID",
1160                 "Requirement",
1161                 "Section",
1162                 "Keyword",
1163                 "Validation Mode",
1164                 "Is Testable",
1165                 "Test Module",
1166                 "Test Name",
1167             )
1168         )
1169         for req_id, metadata in testable_requirements.items():
1170             if req_to_test[req_id]:
1171                 for item in req_to_test[req_id]:
1172                     out.writerow(
1173                         (
1174                             req_id,
1175                             metadata["description"],
1176                             metadata["section_name"],
1177                             metadata["keyword"],
1178                             metadata["validation_mode"],
1179                             metadata["testable"],
1180                             item.function.__module__,
1181                             item.function.__name__,
1182                         )
1183                     )
1184             else:
1185                 out.writerow(
1186                     (
1187                         req_id,
1188                         metadata["description"],
1189                         metadata["section_name"],
1190                         metadata["keyword"],
1191                         metadata["validation_mode"],
1192                         metadata["testable"],
1193                         "",  # test module
1194                         "",
1195                     )  # test function
1196                 )
1197         # now write out any test methods that weren't mapped to requirements
1198         unmapped_tests = {
1199             (item.function.__module__, item.function.__name__) for item in unmapped
1200         }
1201         for test_module, test_name in unmapped_tests:
1202             out.writerow(
1203                 (
1204                     "",  # req ID
1205                     "",  # description
1206                     "",  # section name
1207                     "",  # keyword
1208                     "static",  # validation mode
1209                     "TRUE",  # testable
1210                     test_module,
1211                     test_name,
1212                 )
1213             )
1214
1215     generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))