[VVP] Allow any_of and all_of in categories decorator
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37
38 import csv
39 import datetime
40 import hashlib
41 import io
42 import json
43 import os
44 import re
45 import time
46
47 from preload.model import create_preloads
48 from config import get_generator_plugin_names
49 from tests.helpers import get_output_dir
50
51 try:
52     from html import escape
53 except ImportError:
54     from cgi import escape
55 from collections import defaultdict
56
57 import traceback
58
59 import docutils.core
60 import jinja2
61 import pytest
62 from more_itertools import partition
63 import xlsxwriter
64 from six import string_types
65
66 # noinspection PyUnresolvedReferences
67 import version
68 import logging
69
70 logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)
71
72 __path__ = [os.path.dirname(os.path.abspath(__file__))]
73
74 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
75
76 HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
77 TEST_SCRIPT_SITE = (
78     "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
79 )
80 VNFRQTS_ID_URL = (
81     "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
82 )
83
84 REPORT_COLUMNS = [
85     ("Error #", "err_num"),
86     ("Input File", "file"),
87     ("Requirements", "req_description"),
88     ("Error Message", "message"),
89     ("Test", "test_file"),
90 ]
91
92 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
93 while preparing to validate the input files. Some validations may not have been
94 executed. Please report these issues to the VNF Validation Tool team.
95 """
96
97 COLLECTION_FAILURES = []
98
99 # Captures the results of every test run
100 ALL_RESULTS = []
101
102
103 def extract_error_msg(rep):
104     """
105     If a custom error message was provided, then extract it; otherwise
106     just show the pytest assert message.
107     """
108     if rep.outcome != "failed":
109         return ""
110     try:
111         full_msg = str(rep.longrepr.reprcrash.message)
112         match = re.match(
113             "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
114         )
115         if match:  # custom message was provided
116             # Extract everything between AssertionError and the start
117             # of the assert statement expansion in the pytest report
118             msg = match.group(1)
119         elif "AssertionError:" in full_msg:
120             msg = full_msg.split("AssertionError:")[1]
121         else:
122             msg = full_msg
123     except AttributeError:
124         msg = str(rep)
125
126     return msg
127
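# Illustrative example (not executed): given a typical pytest crash message, the
# regex above keeps only the custom assertion text and discards the expanded
# "assert ..." detail.  The sample message below is hypothetical.
#
#   full_msg = "AssertionError: nova server is missing a vm-type\nassert 'vm' in name"
#   match = re.match(
#       "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
#   )
#   match.group(1)  # -> " nova server is missing a vm-type\n"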
128
129 class TestResult:
130     """
131     Wraps the test case and result to extract necessary metadata for
132     reporting purposes.
133     """
134
135     RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
136
137     def __init__(self, item, outcome):
138         self.item = item
139         self.result = outcome.get_result()
140         self.files = self._get_files()
141         self.error_message = self._get_error_message()
142
143     @property
144     def requirement_ids(self):
145         """
146         Returns list of requirement IDs mapped to the test case.
147
148         :return: Returns the list of string requirement IDs the test was
149                  annotated with via ``validates``, or an empty list if none
150         """
151         is_mapped = hasattr(self.item.function, "requirement_ids")
152         return self.item.function.requirement_ids if is_mapped else []
153
154     @property
155     def markers(self):
156         """
157         :return: Returns a set of pytest marker names for the test or an empty set
158         """
159         return set(m.name for m in self.item.iter_markers())
160
161     @property
162     def is_base_test(self):
163         """
164         :return: Returns True if the test is annotated with a pytest marker called base
165         """
166         return "base" in self.markers
167
168     @property
169     def is_failed(self):
170         """
171         :return: True if the test failed
172         """
173         return self.outcome == "FAIL"
174
175     @property
176     def outcome(self):
177         """
178         :return: Returns 'PASS', 'FAIL', or 'SKIP'
179         """
180         return self.RESULT_MAPPING[self.result.outcome]
181
182     @property
183     def test_case(self):
184         """
185         :return: Name of the test case method
186         """
187         return self.item.function.__name__
188
189     @property
190     def test_module(self):
191         """
192         :return: Name of the file containing the test case
193         """
194         return self.item.function.__module__.split(".")[-1]
195
196     @property
197     def test_id(self):
198         """
199         :return: ID of the test (test_module + test_case)
200         """
201         return "{}::{}".format(self.test_module, self.test_case)
202
203     @property
204     def raw_output(self):
205         """
206         :return: Full output from pytest for the given test case
207         """
208         return str(self.result.longrepr)
209
210     def requirement_text(self, curr_reqs):
211         """
212         Creates a text summary for the requirement IDs mapped to the test case.
213         If no requirements are mapped, then it returns the empty string.
214
215         :param curr_reqs: mapping of requirement IDs to requirement metadata
216                           loaded from the VNFRQTS projects needs.json output
217         :return: ID and text of the requirements mapped to the test case
218         """
219         text = (
220             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
221             for r_id in self.requirement_ids
222             if r_id in curr_reqs
223         )
224         return "".join(text)
225
226     def requirements_metadata(self, curr_reqs):
227         """
228         Returns a list of dicts containing the following metadata for each
229         requirement mapped:
230
231         - id: Requirement ID
232         - text: Full text of the requirement
233         - keyword: MUST, MUST NOT, MAY, etc.
234
235         :param curr_reqs: mapping of requirement IDs to requirement metadata
236                           loaded from the VNFRQTS projects needs.json output
237         :return: List of requirement metadata
238         """
239         data = []
240         for r_id in self.requirement_ids:
241             if r_id not in curr_reqs:
242                 continue
243             data.append(
244                 {
245                     "id": r_id,
246                     "text": curr_reqs[r_id]["description"],
247                     "keyword": curr_reqs[r_id]["keyword"],
248                 }
249             )
250         return data
251
252     def _get_files(self):
253         """
254         Extracts the list of files passed into the test case.
255         :return: List of absolute paths to files
256         """
257         if "environment_pair" in self.item.fixturenames:
258             return [
259                 "{} environment pair".format(
260                     self.item.funcargs["environment_pair"]["name"]
261                 )
262             ]
263         elif "heat_volume_pair" in self.item.fixturenames:
264             return [
265                 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
266             ]
267         elif "heat_templates" in self.item.fixturenames:
268             return [os.path.basename(f) for f in self.item.funcargs["heat_templates"]]
269         elif "yaml_files" in self.item.fixturenames:
270             return [os.path.basename(f) for f in self.item.funcargs["yaml_files"]]
271         else:
272             parts = self.result.nodeid.split("[")
273             return [""] if len(parts) == 1 else [os.path.basename(parts[1][:-1])]
274
275     def _get_error_message(self):
276         """
277         :return: Error message or empty string if the test did not fail or error
278         """
279         if self.is_failed:
280             return extract_error_msg(self.result)
281         else:
282             return ""
283
284
285 # noinspection PyUnusedLocal
286 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
287 def pytest_runtest_makereport(item, call):
288     """
289     Captures the test results for later reporting.  This will also halt testing
290     if a base failure is encountered (can be overridden with --continue-on-failure).
291     """
292     outcome = yield
293     if outcome.get_result().when != "call":
294         return  # only capture results of test cases themselves
295     result = TestResult(item, outcome)
296     if (
297         not item.config.option.continue_on_failure
298         and result.is_base_test
299         and result.is_failed
300     ):
301         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
302             result.error_message
303         )
304         result.error_message = msg
305         ALL_RESULTS.append(result)
306         pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
307
308     ALL_RESULTS.append(result)
309
310
311 def make_timestamp():
312     """
313     :return: String timestamp in the format:
314              2019-01-19 10:18:49.865000 Central Standard Time
315     """
316     timezone = time.tzname[time.localtime().tm_isdst]
317     return "{} {}".format(str(datetime.datetime.now()), timezone)
318
319
320 # noinspection PyUnusedLocal
321 def pytest_sessionstart(session):
322     ALL_RESULTS.clear()
323     COLLECTION_FAILURES.clear()
324
325
326 # noinspection PyUnusedLocal
327 def pytest_sessionfinish(session, exitstatus):
328     """
329     If not a self-test run, generate the output reports
330     """
331     if not session.config.option.template_dir:
332         return
333
334     if session.config.option.template_source:
335         template_source = session.config.option.template_source[0]
336     else:
337         template_source = os.path.abspath(session.config.option.template_dir[0])
338
339     categories_selected = session.config.option.test_categories or ""
340     generate_report(
341         get_output_dir(session.config),
342         template_source,
343         categories_selected,
344         session.config.option.report_format,
345     )
346
347
348 def pytest_terminal_summary(terminalreporter, exitstatus):
349     # Ensures all preload information and warnings appear after
350     # test results
351     create_preloads(terminalreporter.config, exitstatus)
352
353
354 # noinspection PyUnusedLocal
355 def pytest_collection_modifyitems(session, config, items):
356     """
357     Selects tests based on the categories requested.  Tests without
358     categories will always be executed.
359     """
360     config.traceability_items = list(items)  # save all items for traceability
361     if not config.option.self_test:
362         for item in items:
363             passed_categories = set(config.option.test_categories or [])
364             all_of_categories = getattr(item.function, "all_categories", set())
365             any_of_categories = getattr(item.function, "any_categories", set())
366             if all_of_categories and not all_of_categories.issubset(passed_categories):
367                 item.add_marker(
368                     pytest.mark.skip(
369                         reason=(
370                             "Test categories do not match all of the passed categories"
371                         )
372                     )
373                 )
374             if any_of_categories and not passed_categories.intersection(
375                 any_of_categories
376             ):
377                 item.add_marker(
378                     pytest.mark.skip(
379                         reason=(
380                             "Test categories do not match any of the passed categories"
381                         )
382                     )
383                 )
384
385     items.sort(
386         key=lambda x: (0, x.name)
387         if "base" in set(m.name for m in x.iter_markers())
388         else (1, x.name)
389     )
390
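# Minimal sketch of a decorator that would expose the "all_categories" and
# "any_categories" attributes consumed by pytest_collection_modifyitems above.
# This is illustrative only and is not necessarily the project's actual
# ``categories`` helper; the keyword names all_of/any_of are assumptions.
#
#   def categories(*, all_of=(), any_of=()):
#       def decorator(func):
#           func.all_categories = set(all_of)
#           func.any_categories = set(any_of)
#           return func
#       return decorator
#
#   @categories(any_of=["environment_file", "external_preload"])
#   def test_example(yaml_file):
#       ...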
391
392 def make_href(paths, base_dir=None):
393     """
394     Create an anchor tag to link to the file paths provided.
395     :param paths: string or list of file paths
396     :param base_dir: If specified, this is prepended to each path
397     :return: String of hrefs - one for each path, each separated by a line
398              break (<br/>).
399     """
400     paths = [paths] if isinstance(paths, string_types) else paths
401     if base_dir:
402         paths = [os.path.join(base_dir, p) for p in paths]
403     links = []
404     for p in paths:
405         abs_path = os.path.abspath(p)
406         name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
407         links.append(
408             "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
409                 abs_path=abs_path, name=name
410             )
411         )
412     return "<br/>".join(links)
413
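# Example (illustrative, POSIX paths, assuming base.yaml is not a directory):
#
#   make_href("base.yaml", base_dir="/work/templates")
#   # -> "<a href='file:///work/templates/base.yaml' target='_blank'>base.yaml</a>"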
414
415 def generate_report(outpath, template_path, categories, output_format="html"):
416     """
417     Generates the various output reports.
418
419     :param outpath: destination directory for all reports
420     :param template_path: directory containing the Heat templates validated
421     :param categories: Optional categories selected
422     :param output_format: One of "html", "excel", or "csv". Default is "html"
423     :raises: ValueError if requested output format is unknown
424     """
425     failures = [r for r in ALL_RESULTS if r.is_failed]
426     generate_failure_file(outpath)
427     output_format = output_format.lower().strip() if output_format else "html"
428     generate_json(outpath, template_path, categories)
429     if output_format == "html":
430         generate_html_report(outpath, categories, template_path, failures)
431     elif output_format == "excel":
432         generate_excel_report(outpath, categories, template_path, failures)
433     elif output_format == "json":
434         return
435     elif output_format == "csv":
436         generate_csv_report(outpath, categories, template_path, failures)
437     else:
438         raise ValueError("Unsupported output format: " + output_format)
439
440
441 def write_json(data, path):
442     """
443     Pretty print data as JSON to the output path requested
444
445     :param data: Data structure to be converted to JSON
446     :param path: Where to write output
447     """
448     with open(path, "w") as f:
449         json.dump(data, f, indent=2)
450
451
452 def generate_failure_file(outpath):
453     """
454     Writes a summary of test failures to a file named failures.
455     This is for backwards compatibility only.  The report.json offers a
456     more comprehensive output.
457     """
458     failure_path = os.path.join(outpath, "failures")
459     failures = [r for r in ALL_RESULTS if r.is_failed]
460     data = {}
461     for i, fail in enumerate(failures):
462         data[str(i)] = {
463             "file": fail.files[0] if len(fail.files) == 1 else fail.files,
464             "vnfrqts": fail.requirement_ids,
465             "test": fail.test_case,
466             "test_file": fail.test_module,
467             "raw_output": fail.raw_output,
468             "message": fail.error_message,
469         }
470     write_json(data, failure_path)
471
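# Shape of the legacy "failures" file written above, one entry per failed test
# keyed by its index (values shown are illustrative):
#
#   {
#     "0": {
#       "file": "base.yaml",
#       "vnfrqts": ["R-12345"],
#       "test": "test_example",
#       "test_file": "test_example_module",
#       "raw_output": "...full pytest output...",
#       "message": "...custom assertion message..."
#     }
#   }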
472
473 def generate_csv_report(output_dir, categories, template_path, failures):
474     rows = [["Validation Failures"]]
475     headers = [
476         ("Categories Selected:", categories),
477         ("Tool Version:", version.VERSION),
478         ("Report Generated At:", make_timestamp()),
479         ("Directory Validated:", template_path),
480         ("Checksum:", hash_directory(template_path)),
481         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
482     ]
483     rows.append([])
484     for header in headers:
485         rows.append(header)
486     rows.append([])
487
488     if COLLECTION_FAILURES:
489         rows.append([COLLECTION_FAILURE_WARNING])
490         rows.append(["Validation File", "Test", "Fixtures", "Error"])
491         for failure in COLLECTION_FAILURES:
492             rows.append(
493                 [
494                     failure["module"],
495                     failure["test"],
496                     ";".join(failure["fixtures"]),
497                     failure["error"],
498                 ]
499             )
500         rows.append([])
501
502     # table header
503     rows.append([col for col, _ in REPORT_COLUMNS])
504
505     reqs = load_current_requirements()
506
507     # table content
508     for i, failure in enumerate(failures, start=1):
509         rows.append(
510             [
511                 i,
512                 "\n".join(failure.files),
513                 failure.requirement_text(reqs),
514                 failure.error_message,
515                 failure.test_id,
516             ]
517         )
518
519     output_path = os.path.join(output_dir, "report.csv")
520     with open(output_path, "w", newline="") as f:
521         writer = csv.writer(f)
522         for row in rows:
523             writer.writerow(row)
524
525
526 def generate_excel_report(output_dir, categories, template_path, failures):
527     output_path = os.path.join(output_dir, "report.xlsx")
528     workbook = xlsxwriter.Workbook(output_path)
529     bold = workbook.add_format({"bold": True, "align": "top"})
530     code = workbook.add_format(
531         {"font_name": "Courier", "text_wrap": True, "align": "top"}
532     )
533     normal = workbook.add_format({"text_wrap": True, "align": "top"})
534     heading = workbook.add_format({"bold": True, "font_size": 18})
535     worksheet = workbook.add_worksheet("failures")
536     worksheet.write(0, 0, "Validation Failures", heading)
537
538     headers = [
539         ("Categories Selected:", ",".join(categories)),
540         ("Tool Version:", version.VERSION),
541         ("Report Generated At:", make_timestamp()),
542         ("Directory Validated:", template_path),
543         ("Checksum:", hash_directory(template_path)),
544         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
545     ]
546     for row, (header, value) in enumerate(headers, start=2):
547         worksheet.write(row, 0, header, bold)
548         worksheet.write(row, 1, value)
549
550     worksheet.set_column(0, len(headers) - 1, 40)
551     worksheet.set_column(len(headers), len(headers), 80)
552
553     if COLLECTION_FAILURES:
554         collection_failures_start = 2 + len(headers) + 2
555         worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
556         collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
557         for col_num, col_name in enumerate(collection_failure_headers):
558             worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
559         for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
560             worksheet.write(row, 0, data["module"])
561             worksheet.write(row, 1, data["test"])
562             worksheet.write(row, 2, ",".join(data["fixtures"]))
563             worksheet.write(row, 3, data["error"], code)
564
565     # table header
566     start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
567     worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
568     for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
569         worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
570
571     reqs = load_current_requirements()
572
573     # table content
574     for col, width in enumerate((20, 30, 60, 60, 40)):
575         worksheet.set_column(col, col, width)
576     err_num = 1
577     for row, failure in enumerate(failures, start=start_error_table_row + 2):
578         worksheet.write(row, 0, str(err_num), normal)
579         worksheet.write(row, 1, "\n".join(failure.files), normal)
580         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
581         worksheet.write(row, 3, failure.error_message.replace("\n", "\n\n"), normal)
582         worksheet.write(row, 4, failure.test_id, normal)
583         err_num += 1
584     worksheet.autofilter(
585         start_error_table_row + 1,
586         0,
587         start_error_table_row + 1 + err_num,
588         len(REPORT_COLUMNS) - 1,
589     )
590     workbook.close()
591
592
593 def make_iso_timestamp():
594     """
595     Creates a UTC timestamp in ISO 8601 format.  Used for JSON output.
596     """
597     now = datetime.datetime.utcnow()
598     now = now.replace(tzinfo=datetime.timezone.utc)
599     return now.isoformat()
600
601
602 def aggregate_results(outcomes, r_id=None):
603     """
604     Determines the aggregate result for the conditions provided.  Assumes the
605     results have been filtered and collected for analysis.
606
607     :param outcomes: set of outcomes from the TestResults
608     :param r_id: Optional requirement ID if known
609     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
610              based on the precedence ERROR > FAIL > PASS > SKIP
611     """
612     if not outcomes:
613         return "PASS"
614     elif "ERROR" in outcomes:
615         return "ERROR"
616     elif "FAIL" in outcomes:
617         return "FAIL"
618     elif "PASS" in outcomes:
619         return "PASS"
620     elif {"SKIP"} == outcomes:
621         return "SKIP"
622     else:
623         logging.warning(
624             "Unexpected error aggregating outcomes ({}) for requirement {}".format(
625                 outcomes, r_id
626             )
627         )
628         return "ERROR"
629
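# Worked examples of the aggregation rules above:
#
#   aggregate_results(set())              # -> "PASS"  (nothing collected)
#   aggregate_results({"PASS", "SKIP"})   # -> "PASS"
#   aggregate_results({"PASS", "FAIL"})   # -> "FAIL"
#   aggregate_results({"FAIL", "ERROR"})  # -> "ERROR"
#   aggregate_results({"SKIP"})           # -> "SKIP"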
630
631 def aggregate_run_results(collection_failures, test_results):
632     """
633     Determines overall status of run based on all failures and results.
634
635     * 'ERROR' - At least one collection failure occurred during the run.
636     * 'FAIL' - Template failed at least one test
637     * 'PASS' - All tests executed properly and no failures were detected
638
639     :param collection_failures: failures occurring during test setup
640     :param test_results: list of all test execution results
641     :return: one of 'ERROR', 'FAIL', or 'PASS'
642     """
643     if collection_failures:
644         return "ERROR"
645     elif any(r.is_failed for r in test_results):
646         return "FAIL"
647     else:
648         return "PASS"
649
650
651 def relative_paths(base_dir, paths):
652     return [os.path.relpath(p, base_dir) for p in paths if p != ""]
653
654
655 # noinspection PyTypeChecker
656 def generate_json(outpath, template_path, categories):
657     """
658     Creates a JSON summary of the entire test run.
659     """
660     reqs = load_current_requirements()
661     data = {
662         "version": "dublin",
663         "template_directory": os.path.splitdrive(template_path)[1].replace(
664             os.path.sep, "/"
665         ),
666         "timestamp": make_iso_timestamp(),
667         "checksum": hash_directory(template_path),
668         "categories": categories,
669         "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
670         "tests": [],
671         "requirements": [],
672     }
673
674     results = data["tests"]
675     for result in COLLECTION_FAILURES:
676         results.append(
677             {
678                 "files": [],
679                 "test_module": result["module"],
680                 "test_case": result["test"],
681                 "result": "ERROR",
682                 "error": result["error"],
683                 "requirements": result["requirements"],
684             }
685         )
686     for result in ALL_RESULTS:
687         results.append(
688             {
689                 "files": relative_paths(template_path, result.files),
690                 "test_module": result.test_module,
691                 "test_case": result.test_case,
692                 "result": result.outcome,
693                 "error": result.error_message if result.is_failed else "",
694                 "requirements": result.requirements_metadata(reqs),
695             }
696         )
697
698     # Build a mapping of requirement ID to the results
699     r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
700     for test_result in results:
701         test_reqs = test_result["requirements"]
702         r_ids = (
703             [r["id"] if isinstance(r, dict) else r for r in test_reqs]
704             if test_reqs
705             else ("",)
706         )
707         for r_id in r_ids:
708             item = r_id_results[r_id]
709             item["outcomes"].add(test_result["result"])
710             if test_result["error"]:
711                 item["errors"].add(test_result["error"])
712
713     requirements = data["requirements"]
714     for r_id, r_data in reqs.items():
715         requirements.append(
716             {
717                 "id": r_id,
718                 "text": r_data["description"],
719                 "keyword": r_data["keyword"],
720                 "result": aggregate_results(r_id_results[r_id]["outcomes"]),
721                 "errors": list(r_id_results[r_id]["errors"]),
722             }
723         )
724
725     if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
726         requirements.append(
727             {
728                 "id": "Unmapped",
729                 "text": "Tests not mapped to requirements (see tests)",
730                 "result": aggregate_results(r_id_results[""]["outcomes"]),
731                 "errors": list(r_id_results[""]["errors"]),
732             }
733         )
734
735     report_path = os.path.join(outpath, "report.json")
736     write_json(data, report_path)
737
738
739 def generate_html_report(outpath, categories, template_path, failures):
740     reqs = load_current_requirements()
741     fail_data = []
742     for failure in failures:
743         fail_data.append(
744             {
745                 "file_links": make_href(failure.files, template_path),
746                 "test_id": failure.test_id,
747                 "error_message": escape(failure.error_message).replace(
748                     "\n", "<br/><br/>"
749                 ),
750                 "raw_output": escape(failure.raw_output),
751                 "requirements": docutils.core.publish_parts(
752                     writer_name="html", source=failure.requirement_text(reqs)
753                 )["body"],
754             }
755         )
756     pkg_dir = os.path.split(__file__)[0]
757     j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
758     with open(j2_template_path, "r") as f:
759         report_template = jinja2.Template(f.read())
760         contents = report_template.render(
761             version=version.VERSION,
762             num_failures=len(failures) + len(COLLECTION_FAILURES),
763             categories=categories,
764             template_dir=make_href(template_path),
765             checksum=hash_directory(template_path),
766             timestamp=make_timestamp(),
767             failures=fail_data,
768             collection_failures=COLLECTION_FAILURES,
769         )
770     with open(os.path.join(outpath, "report.html"), "w") as f:
771         f.write(contents)
772
773
774 def pytest_addoption(parser):
775     """
776     Add needed CLI arguments
777     """
778     parser.addoption(
779         "--template-directory",
780         dest="template_dir",
781         action="append",
782         help="Directory which holds the templates for validation",
783     )
784
785     parser.addoption(
786         "--template-source",
787         dest="template_source",
788         action="append",
789         help="Source Directory which holds the templates for validation",
790     )
791
792     parser.addoption(
793         "--self-test",
794         dest="self_test",
795         action="store_true",
796         help="Test the unit tests against their fixtured data",
797     )
798
799     parser.addoption(
800         "--report-format",
801         dest="report_format",
802         action="store",
803         help="Format of output report (html, csv, excel, json)",
804     )
805
806     parser.addoption(
807         "--continue-on-failure",
808         dest="continue_on_failure",
809         action="store_true",
810         help="Continue validation even when structural errors exist in input files",
811     )
812
813     parser.addoption(
814         "--output-directory",
815         dest="output_dir",
816         action="store",
817         default=None,
818         help="Alternate output directory for the generated reports",
819     )
820
821     parser.addoption(
822         "--category",
823         dest="test_categories",
824         action="append",
825         help="optional category of test to execute",
826     )
827
828     parser.addoption(
829         "--env-directory",
830         dest="env_dir",
831         action="store",
832         help="optional directory of .env files for preload generation",
833     )
834
835     parser.addoption(
836         "--preload-format",
837         dest="preload_formats",
838         action="append",
839         help=(
840             "Preload format to create (multiple allowed). If not provided "
841             "then all available formats will be created: {}"
842         ).format(", ".join(get_generator_plugin_names())),
843     )
844
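# Example invocation using the options registered above (illustrative; exact
# working directory and paths will vary):
#
#   pytest tests --template-directory=/path/to/heat \
#       --category=environment_file \
#       --report-format=csv \
#       --output-directory=/tmp/vvp-output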
845
846 def pytest_configure(config):
847     """
848     Ensure that we receive either `--self-test` or
849     `--template-dir=<directory>` as CLI arguments
850     """
851     if config.getoption("template_dir") and config.getoption("self_test"):
852         raise Exception('"--template-dir" and "--self-test" are mutually exclusive')
853     if not (
854         config.getoption("template_dir")
855         or config.getoption("self_test")
856         or config.getoption("help")
857     ):
858         raise Exception('One of "--template-dir" or' ' "--self-test" must be specified')
859
860
861 def pytest_generate_tests(metafunc):
862     """
863     If a unit test requires an argument named 'filename',
864     we generate a test for each of the filenames selected: either
865     the files contained in `template_dir`, or, if `template_dir`
866     is not specified on the CLI, the fixtures associated with this
867     test name.
868     """
869
870     # noinspection PyBroadException
871     try:
872         if "filename" in metafunc.fixturenames:
873             from .parametrizers import parametrize_filename
874
875             parametrize_filename(metafunc)
876
877         if "filenames" in metafunc.fixturenames:
878             from .parametrizers import parametrize_filenames
879
880             parametrize_filenames(metafunc)
881
882         if "template_dir" in metafunc.fixturenames:
883             from .parametrizers import parametrize_template_dir
884
885             parametrize_template_dir(metafunc)
886
887         if "environment_pair" in metafunc.fixturenames:
888             from .parametrizers import parametrize_environment_pair
889
890             parametrize_environment_pair(metafunc)
891
892         if "heat_volume_pair" in metafunc.fixturenames:
893             from .parametrizers import parametrize_heat_volume_pair
894
895             parametrize_heat_volume_pair(metafunc)
896
897         if "yaml_files" in metafunc.fixturenames:
898             from .parametrizers import parametrize_yaml_files
899
900             parametrize_yaml_files(metafunc)
901
902         if "env_files" in metafunc.fixturenames:
903             from .parametrizers import parametrize_environment_files
904
905             parametrize_environment_files(metafunc)
906
907         if "yaml_file" in metafunc.fixturenames:
908             from .parametrizers import parametrize_yaml_file
909
910             parametrize_yaml_file(metafunc)
911
912         if "env_file" in metafunc.fixturenames:
913             from .parametrizers import parametrize_environment_file
914
915             parametrize_environment_file(metafunc)
916
917         if "parsed_yaml_file" in metafunc.fixturenames:
918             from .parametrizers import parametrize_parsed_yaml_file
919
920             parametrize_parsed_yaml_file(metafunc)
921
922         if "parsed_environment_file" in metafunc.fixturenames:
923             from .parametrizers import parametrize_parsed_environment_file
924
925             parametrize_parsed_environment_file(metafunc)
926
927         if "heat_template" in metafunc.fixturenames:
928             from .parametrizers import parametrize_heat_template
929
930             parametrize_heat_template(metafunc)
931
932         if "heat_templates" in metafunc.fixturenames:
933             from .parametrizers import parametrize_heat_templates
934
935             parametrize_heat_templates(metafunc)
936
937         if "volume_template" in metafunc.fixturenames:
938             from .parametrizers import parametrize_volume_template
939
940             parametrize_volume_template(metafunc)
941
942         if "volume_templates" in metafunc.fixturenames:
943             from .parametrizers import parametrize_volume_templates
944
945             parametrize_volume_templates(metafunc)
946
947         if "template" in metafunc.fixturenames:
948             from .parametrizers import parametrize_template
949
950             parametrize_template(metafunc)
951
952         if "templates" in metafunc.fixturenames:
953             from .parametrizers import parametrize_templates
954
955             parametrize_templates(metafunc)
956     except Exception as e:
957         # If an error occurs in the collection phase, then it won't be logged as a
958         # normal test failure.  This means that failures could occur, but not
959         # be seen in the report, resulting in a false positive success message.  These
960         # errors are stored and reported separately in the report.
961         COLLECTION_FAILURES.append(
962             {
963                 "module": metafunc.module.__name__,
964                 "test": metafunc.function.__name__,
965                 "fixtures": metafunc.fixturenames,
966                 "error": traceback.format_exc(),
967                 "requirements": getattr(metafunc.function, "requirement_ids", []),
968             }
969         )
970         raise e
971
972
973 def hash_directory(path):
974     """
975     Create md5 hash using the contents of all files under ``path``
976     :param path: string directory containing files
977     :return: string MD5 hash code (hex)
978     """
979     md5 = hashlib.md5()  # nosec
980     for dir_path, sub_dirs, filenames in os.walk(path):
981         for filename in filenames:
982             file_path = os.path.join(dir_path, filename)
983             with open(file_path, "rb") as f:
984                 md5.update(f.read())
985     return md5.hexdigest()
986
987
988 def load_current_requirements():
989     """Loads dict of current requirements or empty dict if file doesn't exist"""
990     with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
991         data = json.load(f)
992         version = data["current_version"]
993         return data["versions"][version]["needs"]
994
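# Expected shape of heat_requirements.json, inferred from how this module reads
# it (the real file generated by the VNFRQTS project may contain more fields;
# the requirement ID and values below are illustrative):
#
#   {
#     "current_version": "dublin",
#     "versions": {
#       "dublin": {
#         "needs": {
#           "R-12345": {
#             "id": "R-12345",
#             "description": "The VNF Heat Orchestration Template MUST ...",
#             "keyword": "MUST",
#             "docname": "Heat/...",
#             "section_name": "...",
#             "validation_mode": "static"
#           }
#         }
#       }
#     }
#   }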
995
996 def select_heat_requirements(reqs):
997     """Filters dict requirements to only those requirements pertaining to Heat"""
998     return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}
999
1000
1001 def is_testable(reqs):
1002     """Flags each requirement as testable or not, based on its keyword and validation mode"""
1003     for key, values in reqs.items():
1004         if ("MUST" in values.get("keyword", "").upper()) and (
1005             "none" not in values.get("validation_mode", "").lower()
1006         ):
1007             reqs[key]["testable"] = True
1008         else:
1009             reqs[key]["testable"] = False
1010     return reqs
1011
1012
1013 def build_rst_json(reqs):
1014     """Builds RST links for each testable requirement and drops non-testable entries"""
1015     for key, values in list(reqs.items()):
1016         if values["testable"]:
1017             # Creates links in RST format to requirements and test cases
1018             if values["test_case"]:
1019                 mod = values["test_case"].split(".")[-1]
1020                 val = TEST_SCRIPT_SITE + mod + ".py"
1021                 rst_value = "`" + mod + " <" + val + ">`_"
1022                 title = (
1023                     "`"
1024                     + values["id"]
1025                     + " <"
1026                     + VNFRQTS_ID_URL
1027                     + values["docname"].replace(" ", "%20")
1028                     + ".html#"
1029                     + values["id"]
1030                     + ">`_"
1031                 )
1032                 reqs[key].update({"full_title": title, "test_case": rst_value})
1033             else:
1034                 title = (
1035                     "`"
1036                     + values["id"]
1037                     + " <"
1038                     + VNFRQTS_ID_URL
1039                     + values["docname"].replace(" ", "%20")
1040                     + ".html#"
1041                     + values["id"]
1042                     + ">`_"
1043                 )
1044                 reqs[key].update(
1045                     {
1046                         "full_title": title,
1047                         "test_case": "No test for requirement",
1048                         "validated_by": "static",
1049                     }
1050                 )
1051         else:
1052             del reqs[key]
1053     return reqs
1054
1055
1056 def generate_rst_table(output_dir, data):
1057     """Generate a formatted csv to be used in RST"""
1058     rst_path = os.path.join(output_dir, "rst.csv")
1059     with open(rst_path, "w", newline="") as f:
1060         out = csv.writer(f)
1061         out.writerow(("Requirement ID", "Test Module", "Test Name"))
1062         for req_id, metadata in data.items():
1063             out.writerow(
1064                 (
1065                     metadata["full_title"],
1066                     metadata["test_case"],
1067                     metadata["validated_by"],
1068                 )
1069             )
1070
1071
1072 # noinspection PyUnusedLocal
1073 def pytest_report_collectionfinish(config, startdir, items):
1074     """Generates a simple traceability report to output/traceability.csv"""
1075     traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
1076     output_dir = os.path.split(traceability_path)[0]
1077     if not os.path.exists(output_dir):
1078         os.makedirs(output_dir)
1079     reqs = load_current_requirements()
1080     requirements = select_heat_requirements(reqs)
1081     testable_requirements = is_testable(requirements)
1082     unmapped, mapped = partition(
1083         lambda i: hasattr(i.function, "requirement_ids"), items
1084     )
1085
1086     req_to_test = defaultdict(set)
1087     mapping_errors = set()
1088     for item in mapped:
1089         for req_id in item.function.requirement_ids:
1090             if req_id not in req_to_test:
1091                 req_to_test[req_id].add(item)
1092                 if req_id in requirements:
1093                     reqs[req_id].update(
1094                         {
1095                             "test_case": item.function.__module__,
1096                             "validated_by": item.function.__name__,
1097                         }
1098                     )
1099             if req_id not in requirements:
1100                 mapping_errors.add(
1101                     (req_id, item.function.__module__, item.function.__name__)
1102                 )
1103
1104     mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
1105     with open(mapping_error_path, "w", newline="") as f:
1106         writer = csv.writer(f)
1107         for err in mapping_errors:
1108             writer.writerow(err)
1109
1110     with open(traceability_path, "w", newline="") as f:
1111         out = csv.writer(f)
1112         out.writerow(
1113             (
1114                 "Requirement ID",
1115                 "Requirement",
1116                 "Section",
1117                 "Keyword",
1118                 "Validation Mode",
1119                 "Is Testable",
1120                 "Test Module",
1121                 "Test Name",
1122             )
1123         )
1124         for req_id, metadata in testable_requirements.items():
1125             if req_to_test[req_id]:
1126                 for item in req_to_test[req_id]:
1127                     out.writerow(
1128                         (
1129                             req_id,
1130                             metadata["description"],
1131                             metadata["section_name"],
1132                             metadata["keyword"],
1133                             metadata["validation_mode"],
1134                             metadata["testable"],
1135                             item.function.__module__,
1136                             item.function.__name__,
1137                         )
1138                     )
1139             else:
1140                 out.writerow(
1141                     (
1142                         req_id,
1143                         metadata["description"],
1144                         metadata["section_name"],
1145                         metadata["keyword"],
1146                         metadata["validation_mode"],
1147                         metadata["testable"],
1148                         "",  # test module
1149                         "",
1150                     )  # test function
1151                 )
1152         # now write out any test methods that weren't mapped to requirements
1153         unmapped_tests = {
1154             (item.function.__module__, item.function.__name__) for item in unmapped
1155         }
1156         for test_module, test_name in unmapped_tests:
1157             out.writerow(
1158                 (
1159                     "",  # req ID
1160                     "",  # description
1161                     "",  # section name
1162                     "",  # keyword
1163                     "static",  # validation mode
1164                     "TRUE",  # testable
1165                     test_module,
1166                     test_name,
1167                 )
1168             )
1169
1170     generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))