88dc6b3379540537215dc129583352d6b33f1155
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37
38 import csv
39 import datetime
40 import hashlib
41 import io
42 import json
43 import os
44 import re
45 import time
46 from collections import defaultdict
47 from itertools import chain
48
49 import traceback
50
51 import docutils.core
52 import jinja2
53 import pytest
54 from more_itertools import partition
55 import xlsxwriter
56 from six import string_types
57
58 import version
59
60 __path__ = [os.path.dirname(os.path.abspath(__file__))]
61
62 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
63
64 RESOLUTION_STEPS_FILE = "resolution_steps.json"
65 HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
66 TEST_SCRIPT_SITE = (
67     "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
68 )
69 VNFRQTS_ID_URL = (
70     "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
71 )
72
73 REPORT_COLUMNS = [
74     ("Input File", "file"),
75     ("Test", "test_file"),
76     ("Requirements", "req_description"),
77     ("Resolution Steps", "resolution_steps"),
78     ("Error Message", "message"),
79     ("Raw Test Output", "raw_output"),
80 ]
81
82 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
83 while preparing to validate the the input files. Some validations may not have been
84 executed. Please refer these issue to the VNF Validation Tool team.
85 """
86
87 COLLECTION_FAILURES = []
88
89 # Captures the results of every test run
90 ALL_RESULTS = []
91
92
93 def get_output_dir(config):
94     """
95     Retrieve the output directory for the reports and create it if necessary
96     :param config: pytest configuration
97     :return: output directory as string
98     """
99     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
100     if not os.path.exists(output_dir):
101         os.makedirs(output_dir, exist_ok=True)
102     return output_dir
103
104
105 def extract_error_msg(rep):
106     """
107     If a custom error message was provided, then extract it otherwise
108     just show the pytest assert message
109     """
110     if rep.outcome != "failed":
111         return ""
112     try:
113         full_msg = str(rep.longrepr.reprcrash.message)
114         match = re.match(
115             "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
116         )
117         if match:  # custom message was provided
118             # Extract everything between AssertionError and the start
119             # of the assert statement expansion in the pytest report
120             msg = match.group(1)
121         else:
122             msg = str(rep.longrepr.reprcrash)
123             if "AssertionError:" in msg:
124                 msg = msg.split("AssertionError:")[1]
125     except AttributeError:
126         msg = str(rep)
127
128     return msg
129
130
131 class TestResult:
132     """
133     Wraps the test case and result to extract necessary metadata for
134     reporting purposes.
135     """
136
137     RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
138
139     def __init__(self, item, outcome):
140         self.item = item
141         self.result = outcome.get_result()
142         self.files = [os.path.normpath(p) for p in self._get_files()]
143         self.error_message = self._get_error_message()
144
145     @property
146     def requirement_ids(self):
147         """
148         Returns list of requirement IDs mapped to the test case.
149
150         :return: Returns a list of string requirement IDs the test was
151                  annotated with ``validates`` otherwise returns and empty list
152         """
153         is_mapped = hasattr(self.item.function, "requirement_ids")
154         return self.item.function.requirement_ids if is_mapped else []
155
156     @property
157     def markers(self):
158         """
159         :return: Returns a set of pytest marker names for the test or an empty set
160         """
161         return set(m.name for m in self.item.iter_markers())
162
163     @property
164     def is_base_test(self):
165         """
166         :return: Returns True if the test is annotated with a pytest marker called base
167         """
168         return "base" in self.markers
169
170     @property
171     def is_failed(self):
172         """
173         :return: True if the test failed
174         """
175         return self.outcome == "FAIL"
176
177     @property
178     def outcome(self):
179         """
180         :return: Returns 'PASS', 'FAIL', or 'SKIP'
181         """
182         return self.RESULT_MAPPING[self.result.outcome]
183
184     @property
185     def test_case(self):
186         """
187         :return: Name of the test case method
188         """
189         return self.item.function.__name__
190
191     @property
192     def test_module(self):
193         """
194         :return: Name of the file containing the test case
195         """
196         return self.item.function.__module__.split(".")[-1]
197
198     @property
199     def raw_output(self):
200         """
201         :return: Full output from pytest for the given test case
202         """
203         return str(self.result.longrepr)
204
205     def requirement_text(self, curr_reqs):
206         """
207         Creates a text summary for the requirement IDs mapped to the test case.
208         If no requirements are mapped, then it returns the empty string.
209
210         :param curr_reqs: mapping of requirement IDs to requirement metadata
211                           loaded from the VNFRQTS projects needs.json output
212         :return: ID and text of the requirements mapped to the test case
213         """
214         text = (
215             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
216             for r_id in self.requirement_ids
217             if r_id in curr_reqs
218         )
219         return "".join(text)
220
221     def requirements_metadata(self, curr_reqs):
222         """
223         Returns a list of dicts containing the following metadata for each
224         requirement mapped:
225
226         - id: Requirement ID
227         - text: Full text of the requirement
228         - keyword: MUST, MUST NOT, MAY, etc.
229
230         :param curr_reqs: mapping of requirement IDs to requirement metadata
231                           loaded from the VNFRQTS projects needs.json output
232         :return: List of requirement metadata
233         """
234         data = []
235         for r_id in self.requirement_ids:
236             if r_id not in curr_reqs:
237                 continue
238             data.append(
239                 {
240                     "id": r_id,
241                     "text": curr_reqs[r_id]["description"],
242                     "keyword": curr_reqs[r_id]["keyword"],
243                 }
244             )
245         return data
246
247     def resolution_steps(self, resolutions):
248         """
249         :param resolutions: Loaded from contents for resolution_steps.json
250         :return: Header and text for the resolution step associated with this
251                  test case.  Returns empty string if no resolutions are
252                  provided.
253         """
254         text = (
255             "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
256             for entry in resolutions
257             if self._match(entry)
258         )
259         return "".join(text)
260
261     def _match(self, resolution_entry):
262         """
263         Returns True if the test result maps to the given entry in
264         the resolutions file
265         """
266         return (
267             self.test_case == resolution_entry["function"]
268             and self.test_module == resolution_entry["module"]
269         )
270
271     def _get_files(self):
272         """
273         Extracts the list of files passed into the test case.
274         :return: List of absolute paths to files
275         """
276         if "environment_pair" in self.item.fixturenames:
277             return [
278                 "{} environment pair".format(
279                     self.item.funcargs["environment_pair"]["name"]
280                 )
281             ]
282         elif "heat_volume_pair" in self.item.fixturenames:
283             return [
284                 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
285             ]
286         elif "heat_templates" in self.item.fixturenames:
287             return self.item.funcargs["heat_templates"]
288         elif "yaml_files" in self.item.fixturenames:
289             return self.item.funcargs["yaml_files"]
290         else:
291             parts = self.result.nodeid.split("[")
292             return [""] if len(parts) == 1 else [parts[1][:-1]]
293
294     def _get_error_message(self):
295         """
296         :return: Error message or empty string if the test did not fail or error
297         """
298         if self.is_failed:
299             return extract_error_msg(self.result)
300         else:
301             return ""
302
303
304 # noinspection PyUnusedLocal
305 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
306 def pytest_runtest_makereport(item, call):
307     """
308     Captures the test results for later reporting.  This will also halt testing
309     if a base failure is encountered (can be overridden with continue-on-failure)
310     """
311     outcome = yield
312     if outcome.get_result().when != "call":
313         return  # only capture results of test cases themselves
314     result = TestResult(item, outcome)
315     if (
316         not item.config.option.continue_on_failure
317         and result.is_base_test
318         and result.is_failed
319     ):
320         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
321             result.error_message
322         )
323         result.error_message = msg
324         ALL_RESULTS.append(result)
325         pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
326
327     ALL_RESULTS.append(result)
328
329
330 def make_timestamp():
331     """
332     :return: String make_iso_timestamp in format:
333              2019-01-19 10:18:49.865000 Central Standard Time
334     """
335     timezone = time.tzname[time.localtime().tm_isdst]
336     return "{} {}".format(str(datetime.datetime.now()), timezone)
337
338
339 # noinspection PyUnusedLocal
340 def pytest_sessionstart(session):
341     ALL_RESULTS.clear()
342     COLLECTION_FAILURES.clear()
343
344
345 # noinspection PyUnusedLocal
346 def pytest_sessionfinish(session, exitstatus):
347     """
348     If not a self-test run, generate the output reports
349     """
350     if not session.config.option.template_dir:
351         return
352
353     if session.config.option.template_source:
354         template_source = session.config.option.template_source[0]
355     else:
356         template_source = os.path.abspath(session.config.option.template_dir[0])
357
358     categories_selected = session.config.option.test_categories or ""
359     generate_report(
360         get_output_dir(session.config),
361         template_source,
362         categories_selected,
363         session.config.option.report_format,
364     )
365
366
367 # noinspection PyUnusedLocal
368 def pytest_collection_modifyitems(session, config, items):
369     """
370     Selects tests based on the categories requested.  Tests without
371     categories will always be executed.
372     """
373     config.traceability_items = list(items)  # save all items for traceability
374     if not config.option.self_test:
375         for item in items:
376             # checking if test belongs to a category
377             if hasattr(item.function, "categories"):
378                 if config.option.test_categories:
379                     test_categories = getattr(item.function, "categories")
380                     passed_categories = config.option.test_categories
381                     if not all(
382                         category in passed_categories for category in test_categories
383                     ):
384                         item.add_marker(
385                             pytest.mark.skip(
386                                 reason="Test categories do not match all the passed categories"
387                             )
388                         )
389                 else:
390                     item.add_marker(
391                         pytest.mark.skip(
392                             reason="Test belongs to a category but no categories were passed"
393                         )
394                     )
395     items.sort(
396         key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
397     )
398
399
400 def make_href(paths):
401     """
402     Create an anchor tag to link to the file paths provided.
403     :param paths: string or list of file paths
404     :return: String of hrefs - one for each path, each seperated by a line
405              break (<br/).
406     """
407     paths = [paths] if isinstance(paths, string_types) else paths
408     links = []
409     for p in paths:
410         abs_path = os.path.abspath(p)
411         name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
412         links.append(
413             "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
414                 abs_path=abs_path, name=name
415             )
416         )
417     return "<br/>".join(links)
418
419
420 def load_resolutions_file():
421     """
422     :return: dict of data loaded from resolutions_steps.json
423     """
424     resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
425     if os.path.exists(resolution_steps):
426         with open(resolution_steps, "r") as f:
427             return json.loads(f.read())
428
429
430 def generate_report(outpath, template_path, categories, output_format="html"):
431     """
432     Generates the various output reports.
433
434     :param outpath: destination directory for all reports
435     :param template_path: directory containing the Heat templates validated
436     :param categories: Optional categories selected
437     :param output_format: One of "html", "excel", or "csv". Default is "html"
438     :raises: ValueError if requested output format is unknown
439     """
440     failures = [r for r in ALL_RESULTS if r.is_failed]
441     generate_failure_file(outpath)
442     output_format = output_format.lower().strip() if output_format else "html"
443     generate_json(outpath, template_path, categories)
444     if output_format == "html":
445         generate_html_report(outpath, categories, template_path, failures)
446     elif output_format == "excel":
447         generate_excel_report(outpath, categories, template_path, failures)
448     elif output_format == "json":
449         return
450     elif output_format == "csv":
451         generate_csv_report(outpath, categories, template_path, failures)
452     else:
453         raise ValueError("Unsupported output format: " + output_format)
454
455
456 def write_json(data, path):
457     """
458     Pretty print data as JSON to the output path requested
459
460     :param data: Data structure to be converted to JSON
461     :param path: Where to write output
462     """
463     with open(path, "w") as f:
464         json.dump(data, f, indent=2)
465
466
467 def generate_failure_file(outpath):
468     """
469     Writes a summary of test failures to a file named failures.
470     This is for backwards compatibility only.  The report.json offers a
471     more comprehensive output.
472     """
473     failure_path = os.path.join(outpath, "failures")
474     failures = [r for r in ALL_RESULTS if r.is_failed]
475     data = {}
476     for i, fail in enumerate(failures):
477         data[str(i)] = {
478             "file": fail.files[0] if len(fail.files) == 1 else fail.files,
479             "vnfrqts": fail.requirement_ids,
480             "test": fail.test_case,
481             "test_file": fail.test_module,
482             "raw_output": fail.raw_output,
483             "message": fail.error_message,
484         }
485     write_json(data, failure_path)
486
487
488 def generate_csv_report(output_dir, categories, template_path, failures):
489     rows = [["Validation Failures"]]
490     headers = [
491         ("Categories Selected:", categories),
492         ("Tool Version:", version.VERSION),
493         ("Report Generated At:", make_timestamp()),
494         ("Directory Validated:", template_path),
495         ("Checksum:", hash_directory(template_path)),
496         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
497     ]
498     rows.append([])
499     for header in headers:
500         rows.append(header)
501     rows.append([])
502
503     if COLLECTION_FAILURES:
504         rows.append([COLLECTION_FAILURE_WARNING])
505         rows.append(["Validation File", "Test", "Fixtures", "Error"])
506         for failure in COLLECTION_FAILURES:
507             rows.append(
508                 [
509                     failure["module"],
510                     failure["test"],
511                     ";".join(failure["fixtures"]),
512                     failure["error"],
513                 ]
514             )
515         rows.append([])
516
517     # table header
518     rows.append([col for col, _ in REPORT_COLUMNS])
519
520     reqs = load_current_requirements()
521     resolutions = load_resolutions_file()
522
523     # table content
524     for failure in failures:
525         rows.append(
526             [
527                 "\n".join(failure.files),
528                 failure.test_module,
529                 failure.requirement_text(reqs),
530                 failure.resolution_steps(resolutions),
531                 failure.error_message,
532                 failure.raw_output,
533             ]
534         )
535
536     output_path = os.path.join(output_dir, "report.csv")
537     with open(output_path, "w", newline="") as f:
538         writer = csv.writer(f)
539         for row in rows:
540             writer.writerow(row)
541
542
543 def generate_excel_report(output_dir, categories, template_path, failures):
544     output_path = os.path.join(output_dir, "report.xlsx")
545     workbook = xlsxwriter.Workbook(output_path)
546     bold = workbook.add_format({"bold": True})
547     code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
548     normal = workbook.add_format({"text_wrap": True})
549     heading = workbook.add_format({"bold": True, "font_size": 18})
550     worksheet = workbook.add_worksheet("failures")
551     worksheet.write(0, 0, "Validation Failures", heading)
552
553     headers = [
554         ("Categories Selected:", ",".join(categories)),
555         ("Tool Version:", version.VERSION),
556         ("Report Generated At:", make_timestamp()),
557         ("Directory Validated:", template_path),
558         ("Checksum:", hash_directory(template_path)),
559         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
560     ]
561     for row, (header, value) in enumerate(headers, start=2):
562         worksheet.write(row, 0, header, bold)
563         worksheet.write(row, 1, value)
564
565     worksheet.set_column(0, len(headers) - 1, 40)
566     worksheet.set_column(len(headers), len(headers), 80)
567
568     if COLLECTION_FAILURES:
569         collection_failures_start = 2 + len(headers) + 2
570         worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
571         collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
572         for col_num, col_name in enumerate(collection_failure_headers):
573             worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
574         for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
575             worksheet.write(row, 0, data["module"])
576             worksheet.write(row, 1, data["test"])
577             worksheet.write(row, 2, ",".join(data["fixtures"]))
578             worksheet.write(row, 3, data["error"], code)
579
580     # table header
581     start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
582     worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
583     for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
584         worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
585
586     reqs = load_current_requirements()
587     resolutions = load_resolutions_file()
588
589     # table content
590     for row, failure in enumerate(failures, start=start_error_table_row + 2):
591         worksheet.write(row, 0, "\n".join(failure.files), normal)
592         worksheet.write(row, 1, failure.test_module, normal)
593         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
594         worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
595         worksheet.write(row, 4, failure.error_message, normal)
596         worksheet.write(row, 5, failure.raw_output, code)
597
598     workbook.close()
599
600
601 def make_iso_timestamp():
602     """
603     Creates a timestamp in ISO 8601 format in UTC format.  Used for JSON output.
604     """
605     now = datetime.datetime.utcnow()
606     now.replace(tzinfo=datetime.timezone.utc)
607     return now.isoformat()
608
609
610 def aggregate_requirement_adherence(r_id, collection_failures, test_results):
611     """
612     Examines all tests associated with a given requirement and determines
613     the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
614
615     * ERROR - At least one ERROR occurred
616     * PASS -  At least one PASS and no FAIL or ERRORs.
617     * FAIL -  At least one FAIL occurred (no ERRORs)
618     * SKIP - All tests were SKIP
619
620
621     :param r_id: Requirement ID to examing
622     :param collection_failures: Errors that occurred during test setup.
623     :param test_results: List of TestResult
624     :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
625     """
626     errors = any(r_id in f["requirements"] for f in collection_failures)
627     outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
628     return aggregate_results(errors, outcomes, r_id)
629
630
631 def aggregate_results(has_errors, outcomes, r_id=None):
632     """
633     Determines the aggregate result for the conditions provided.  Assumes the
634     results have been filtered and collected for analysis.
635
636     :param has_errors: True if collection failures occurred for the tests being
637                        analyzed.
638     :param outcomes: set of outcomes from the TestResults
639     :param r_id: Optional requirement ID if known
640     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
641              (see aggregate_requirement_adherence for more detail)
642     """
643     if has_errors:
644         return "ERROR"
645
646     if not outcomes:
647         return "PASS"
648     elif "FAIL" in outcomes:
649         return "FAIL"
650     elif "PASS" in outcomes:
651         return "PASS"
652     elif {"SKIP"} == outcomes:
653         return "SKIP"
654     else:
655         pytest.warns(
656             "Unexpected error aggregating outcomes ({}) for requirement {}".format(
657                 outcomes, r_id
658             )
659         )
660         return "ERROR"
661
662
663 def aggregate_run_results(collection_failures, test_results):
664     """
665     Determines overall status of run based on all failures and results.
666
667     * 'ERROR' - At least one collection failure occurred during the run.
668     * 'FAIL' - Template failed at least one test
669     * 'PASS' - All tests executed properly and no failures were detected
670
671     :param collection_failures: failures occuring during test setup
672     :param test_results: list of all test executuion results
673     :return: one of 'ERROR', 'FAIL', or 'PASS'
674     """
675     if collection_failures:
676         return "ERROR"
677     elif any(r.is_failed for r in test_results):
678         return "FAIL"
679     else:
680         return "PASS"
681
682
683 def error(failure_or_result):
684     """
685     Extracts the error message from a collection failure or test result
686     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
687     :return: Error message as string
688     """
689     if isinstance(failure_or_result, TestResult):
690         return failure_or_result.error_message
691     else:
692         return failure_or_result["error"]
693
694
695 def req_ids(failure_or_result):
696     """
697     Extracts the requirement IDs from a collection failure or test result
698     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
699     :return: set of Requirement IDs.  If no requirements mapped, then an empty set
700     """
701     if isinstance(failure_or_result, TestResult):
702         return set(failure_or_result.requirement_ids)
703     else:
704         return set(failure_or_result["requirements"])
705
706
707 def collect_errors(r_id, collection_failures, test_result):
708     """
709     Creates a list of error messages from the collection failures and
710     test results.  If r_id is provided, then it collects the error messages
711     where the failure or test is associated with that requirement ID.  If
712     r_id is None, then it collects all errors that occur on failures and
713     results that are not mapped to requirements
714     """
715
716     def selector(item):
717         if r_id:
718             return r_id in req_ids(item)
719         else:
720             return not req_ids(item)
721
722     errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
723     return [e for e in errors if e]
724
725
726 def relative_paths(base_dir, paths):
727     return [os.path.relpath(p, base_dir) for p in paths]
728
729
730 def generate_json(outpath, template_path, categories):
731     """
732     Creates a JSON summary of the entire test run.
733     """
734     reqs = load_current_requirements()
735     data = {
736         "version": "dublin",
737         "template_directory": os.path.splitdrive(template_path)[1].replace(
738             os.path.sep, "/"
739         ),
740         "timestamp": make_iso_timestamp(),
741         "checksum": hash_directory(template_path),
742         "categories": categories,
743         "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
744         "tests": [],
745         "requirements": [],
746     }
747
748     results = data["tests"]
749     for result in COLLECTION_FAILURES:
750         results.append(
751             {
752                 "files": [],
753                 "test_module": result["module"],
754                 "test_case": result["test"],
755                 "result": "ERROR",
756                 "error": result["error"],
757                 "requirements": result["requirements"],
758             }
759         )
760     for result in ALL_RESULTS:
761         results.append(
762             {
763                 "files": relative_paths(template_path, result.files),
764                 "test_module": result.test_module,
765                 "test_case": result.test_case,
766                 "result": result.outcome,
767                 "error": result.error_message if result.is_failed else "",
768                 "requirements": result.requirements_metadata(reqs),
769             }
770         )
771
772     requirements = data["requirements"]
773     for r_id, r_data in reqs.items():
774         result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
775         if result:
776             requirements.append(
777                 {
778                     "id": r_id,
779                     "text": r_data["description"],
780                     "keyword": r_data["keyword"],
781                     "result": result,
782                     "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
783                 }
784             )
785     # If there are tests that aren't mapped to a requirement, then we'll
786     # map them to a special entry so the results are coherent.
787     unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
788     has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
789     if unmapped_outcomes or has_errors:
790         requirements.append(
791             {
792                 "id": "Unmapped",
793                 "text": "Tests not mapped to requirements (see tests)",
794                 "result": aggregate_results(has_errors, unmapped_outcomes),
795                 "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
796             }
797         )
798
799     report_path = os.path.join(outpath, "report.json")
800     write_json(data, report_path)
801
802
803 def generate_html_report(outpath, categories, template_path, failures):
804     reqs = load_current_requirements()
805     resolutions = load_resolutions_file()
806     fail_data = []
807     for failure in failures:
808         fail_data.append(
809             {
810                 "file_links": make_href(failure.files),
811                 "test_id": failure.test_module,
812                 "error_message": failure.error_message,
813                 "raw_output": failure.raw_output,
814                 "requirements": docutils.core.publish_parts(
815                     writer_name="html", source=failure.requirement_text(reqs)
816                 )["body"],
817                 "resolution_steps": failure.resolution_steps(resolutions),
818             }
819         )
820     pkg_dir = os.path.split(__file__)[0]
821     j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
822     with open(j2_template_path, "r") as f:
823         report_template = jinja2.Template(f.read())
824         contents = report_template.render(
825             version=version.VERSION,
826             num_failures=len(failures) + len(COLLECTION_FAILURES),
827             categories=categories,
828             template_dir=make_href(template_path),
829             checksum=hash_directory(template_path),
830             timestamp=make_timestamp(),
831             failures=fail_data,
832             collection_failures=COLLECTION_FAILURES,
833         )
834     with open(os.path.join(outpath, "report.html"), "w") as f:
835         f.write(contents)
836
837
838 def pytest_addoption(parser):
839     """
840     Add needed CLI arguments
841     """
842     parser.addoption(
843         "--template-directory",
844         dest="template_dir",
845         action="append",
846         help="Directory which holds the templates for validation",
847     )
848
849     parser.addoption(
850         "--template-source",
851         dest="template_source",
852         action="append",
853         help="Source Directory which holds the templates for validation",
854     )
855
856     parser.addoption(
857         "--self-test",
858         dest="self_test",
859         action="store_true",
860         help="Test the unit tests against their fixtured data",
861     )
862
863     parser.addoption(
864         "--report-format",
865         dest="report_format",
866         action="store",
867         help="Format of output report (html, csv, excel, json)",
868     )
869
870     parser.addoption(
871         "--continue-on-failure",
872         dest="continue_on_failure",
873         action="store_true",
874         help="Continue validation even when structural errors exist in input files",
875     )
876
877     parser.addoption(
878         "--output-directory",
879         dest="output_dir",
880         action="store",
881         default=None,
882         help="Alternate ",
883     )
884
885     parser.addoption(
886         "--category",
887         dest="test_categories",
888         action="append",
889         help="optional category of test to execute",
890     )
891
892
893 def pytest_configure(config):
894     """
895     Ensure that we are receive either `--self-test` or
896     `--template-dir=<directory` as CLI arguments
897     """
898     if config.getoption("template_dir") and config.getoption("self_test"):
899         raise Exception('"--template-dir", and "--self-test"' " are mutually exclusive")
900     if not (
901         config.getoption("template_dir")
902         or config.getoption("self_test")
903         or config.getoption("help")
904     ):
905         raise Exception('One of "--template-dir" or' ' "--self-test" must be specified')
906
907
908 def pytest_generate_tests(metafunc):
909     """
910     If a unit test requires an argument named 'filename'
911     we generate a test for the filenames selected. Either
912     the files contained in `template_dir` or if `template_dir`
913     is not specified on the CLI, the fixtures associated with this
914     test name.
915     """
916
917     # noinspection PyBroadException
918     try:
919         if "filename" in metafunc.fixturenames:
920             from .parametrizers import parametrize_filename
921
922             parametrize_filename(metafunc)
923
924         if "filenames" in metafunc.fixturenames:
925             from .parametrizers import parametrize_filenames
926
927             parametrize_filenames(metafunc)
928
929         if "template_dir" in metafunc.fixturenames:
930             from .parametrizers import parametrize_template_dir
931
932             parametrize_template_dir(metafunc)
933
934         if "environment_pair" in metafunc.fixturenames:
935             from .parametrizers import parametrize_environment_pair
936
937             parametrize_environment_pair(metafunc)
938
939         if "heat_volume_pair" in metafunc.fixturenames:
940             from .parametrizers import parametrize_heat_volume_pair
941
942             parametrize_heat_volume_pair(metafunc)
943
944         if "yaml_files" in metafunc.fixturenames:
945             from .parametrizers import parametrize_yaml_files
946
947             parametrize_yaml_files(metafunc)
948
949         if "env_files" in metafunc.fixturenames:
950             from .parametrizers import parametrize_environment_files
951
952             parametrize_environment_files(metafunc)
953
954         if "yaml_file" in metafunc.fixturenames:
955             from .parametrizers import parametrize_yaml_file
956
957             parametrize_yaml_file(metafunc)
958
959         if "env_file" in metafunc.fixturenames:
960             from .parametrizers import parametrize_environment_file
961
962             parametrize_environment_file(metafunc)
963
964         if "parsed_yaml_file" in metafunc.fixturenames:
965             from .parametrizers import parametrize_parsed_yaml_file
966
967             parametrize_parsed_yaml_file(metafunc)
968
969         if "parsed_environment_file" in metafunc.fixturenames:
970             from .parametrizers import parametrize_parsed_environment_file
971
972             parametrize_parsed_environment_file(metafunc)
973
974         if "heat_template" in metafunc.fixturenames:
975             from .parametrizers import parametrize_heat_template
976
977             parametrize_heat_template(metafunc)
978
979         if "heat_templates" in metafunc.fixturenames:
980             from .parametrizers import parametrize_heat_templates
981
982             parametrize_heat_templates(metafunc)
983
984         if "volume_template" in metafunc.fixturenames:
985             from .parametrizers import parametrize_volume_template
986
987             parametrize_volume_template(metafunc)
988
989         if "volume_templates" in metafunc.fixturenames:
990             from .parametrizers import parametrize_volume_templates
991
992             parametrize_volume_templates(metafunc)
993
994         if "template" in metafunc.fixturenames:
995             from .parametrizers import parametrize_template
996
997             parametrize_template(metafunc)
998
999         if "templates" in metafunc.fixturenames:
1000             from .parametrizers import parametrize_templates
1001
1002             parametrize_templates(metafunc)
1003     except Exception as e:
1004         # If an error occurs in the collection phase, then it won't be logged as a
1005         # normal test failure.  This means that failures could occur, but not
1006         # be seen on the report resulting in a false positive success message.  These
1007         # errors will be stored and reported separately on the report
1008         COLLECTION_FAILURES.append(
1009             {
1010                 "module": metafunc.module.__name__,
1011                 "test": metafunc.function.__name__,
1012                 "fixtures": metafunc.fixturenames,
1013                 "error": traceback.format_exc(),
1014                 "requirements": getattr(metafunc.function, "requirement_ids", []),
1015             }
1016         )
1017         raise e
1018
1019
1020 def hash_directory(path):
1021     """
1022     Create md5 hash using the contents of all files under ``path``
1023     :param path: string directory containing files
1024     :return: string MD5 hash code (hex)
1025     """
1026     md5 = hashlib.md5()
1027     for dir_path, sub_dirs, filenames in os.walk(path):
1028         for filename in filenames:
1029             file_path = os.path.join(dir_path, filename)
1030             with open(file_path, "rb") as f:
1031                 md5.update(f.read())
1032     return md5.hexdigest()
1033
1034
1035 def load_current_requirements():
1036     """Loads dict of current requirements or empty dict if file doesn't exist"""
1037     with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
1038         data = json.load(f)
1039         version = data["current_version"]
1040         return data["versions"][version]["needs"]
1041
1042
1043 def select_heat_requirements(reqs):
1044     """Filters dict requirements to only those requirements pertaining to Heat"""
1045     return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}
1046
1047
1048 def is_testable(reqs):
1049     """Filters dict requirements to only those which are testable"""
1050     for key, values in reqs.items():
1051         if (("MUST" in values.get("keyword", "").upper()) and (
1052             "none" not in values.get("validation_mode", "").lower()
1053         )):
1054             reqs[key]["testable"] = True
1055         else:
1056             reqs[key]["testable"] = False
1057     return reqs
1058
1059
1060 def build_rst_json(reqs):
1061     """Takes requirements and returns list of only Heat requirements"""
1062     for key, values in list(reqs.items()):
1063         if values["testable"]:
1064             # Creates links in RST format to requirements and test cases
1065             if values["test_case"]:
1066                 mod = values["test_case"].split(".")[-1]
1067                 val = TEST_SCRIPT_SITE + mod + ".py"
1068                 rst_value = "`" + mod + " <" + val + ">`_"
1069                 title = (
1070                     "`"
1071                     + values["id"]
1072                     + " <"
1073                     + VNFRQTS_ID_URL
1074                     + values["docname"].replace(" ", "%20")
1075                     + ".html#"
1076                     + values["id"]
1077                     + ">`_"
1078                 )
1079                 reqs[key].update({"full_title": title, "test_case": rst_value})
1080             else:
1081                 title = (
1082                     "`"
1083                     + values["id"]
1084                     + " <"
1085                     + VNFRQTS_ID_URL
1086                     + values["docname"].replace(" ", "%20")
1087                     + ".html#"
1088                     + values["id"]
1089                     + ">`_"
1090                 )
1091                 reqs[key].update(
1092                     {
1093                         "full_title": title,
1094                         "test_case": "No test for requirement",
1095                         "validated_by": "static",
1096                     }
1097                 )
1098         else:
1099             del reqs[key]
1100     return reqs
1101
1102
1103 def generate_rst_table(output_dir, data):
1104     """Generate a formatted csv to be used in RST"""
1105     rst_path = os.path.join(output_dir, "rst.csv")
1106     with open(rst_path, "w", newline="") as f:
1107         out = csv.writer(f)
1108         out.writerow(("Requirement ID", "Requirement", "Test Module", "Test Name"))
1109         for req_id, metadata in data.items():
1110             out.writerow(
1111                 (
1112                     metadata["full_title"],
1113                     metadata["description"],
1114                     metadata["test_case"],
1115                     metadata["validated_by"],
1116                 )
1117             )
1118
1119
1120 # noinspection PyUnusedLocal
1121 def pytest_report_collectionfinish(config, startdir, items):
1122     """Generates a simple traceability report to output/traceability.csv"""
1123     traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
1124     output_dir = os.path.split(traceability_path)[0]
1125     if not os.path.exists(output_dir):
1126         os.makedirs(output_dir)
1127     reqs = load_current_requirements()
1128     requirements = select_heat_requirements(reqs)
1129     testable_requirements = is_testable(requirements)
1130     unmapped, mapped = partition(
1131         lambda i: hasattr(i.function, "requirement_ids"), items
1132     )
1133
1134     req_to_test = defaultdict(set)
1135     mapping_errors = set()
1136     for item in mapped:
1137         for req_id in item.function.requirement_ids:
1138             if req_id not in req_to_test:
1139                 req_to_test[req_id].add(item)
1140                 if req_id in requirements:
1141                     reqs[req_id].update(
1142                         {
1143                             "test_case": item.function.__module__,
1144                             "validated_by": item.function.__name__,
1145                         }
1146                     )
1147             if req_id not in requirements:
1148                 mapping_errors.add(
1149                     (req_id, item.function.__module__, item.function.__name__)
1150                 )
1151
1152     mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
1153     with open(mapping_error_path, "w", newline="") as f:
1154         writer = csv.writer(f)
1155         for err in mapping_errors:
1156             writer.writerow(err)
1157
1158     with open(traceability_path, "w", newline="") as f:
1159         out = csv.writer(f)
1160         out.writerow(
1161             (
1162                 "Requirement ID",
1163                 "Requirement",
1164                 "Section",
1165                 "Keyword",
1166                 "Validation Mode",
1167                 "Is Testable",
1168                 "Test Module",
1169                 "Test Name",
1170             )
1171         )
1172         for req_id, metadata in testable_requirements.items():
1173             if req_to_test[req_id]:
1174                 for item in req_to_test[req_id]:
1175                     out.writerow(
1176                         (
1177                             req_id,
1178                             metadata["description"],
1179                             metadata["section_name"],
1180                             metadata["keyword"],
1181                             metadata["validation_mode"],
1182                             metadata["testable"],
1183                             item.function.__module__,
1184                             item.function.__name__,
1185                         )
1186                     )
1187             else:
1188                 out.writerow(
1189                     (
1190                         req_id,
1191                         metadata["description"],
1192                         metadata["section_name"],
1193                         metadata["keyword"],
1194                         metadata["validation_mode"],
1195                         metadata["testable"],
1196                         "",  # test module
1197                         "",
1198                     )  # test function
1199                 )
1200         # now write out any test methods that weren't mapped to requirements
1201         unmapped_tests = {
1202             (item.function.__module__, item.function.__name__) for item in unmapped
1203         }
1204         for test_module, test_name in unmapped_tests:
1205             out.writerow(
1206                 (
1207                     "",  # req ID
1208                     "",  # description
1209                     "",  # section name
1210                     "",  # keyword
1211                     "static",  # validation mode
1212                     "TRUE",  # testable
1213                     test_module,
1214                     test_name,
1215                 )
1216             )
1217
1218     generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))