# -*- coding: utf8 -*-
# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
#             http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
#             https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================

import csv
import datetime
import hashlib
import io
import json
import os
import re
import time
import warnings

from preload.model import create_preloads
from config import get_generator_plugin_names
from tests.helpers import get_output_dir

try:
    from html import escape
except ImportError:
    from cgi import escape
from collections import defaultdict

import traceback

import docutils.core
import jinja2
import pytest
from more_itertools import partition
import xlsxwriter
from six import string_types

# noinspection PyUnresolvedReferences
import version
import logging

logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)

__path__ = [os.path.dirname(os.path.abspath(__file__))]

DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])

HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
TEST_SCRIPT_SITE = (
    "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
)
VNFRQTS_ID_URL = (
    "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
)

REPORT_COLUMNS = [
    ("Error #", "err_num"),
    ("Input File", "file"),
    ("Requirements", "req_description"),
    ("Error Message", "message"),
    ("Test", "test_file"),
]

COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
while preparing to validate the input files. Some validations may not have been
executed. Please refer these issues to the VNF Validation Tool team.
"""

COLLECTION_FAILURES = []
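# Each entry appended to COLLECTION_FAILURES (see pytest_generate_tests below) is
# a dict with the keys "module", "test", "fixtures", "error", and "requirements".
# Illustrative (hypothetical) entry:
#   {"module": "test_foo", "test": "test_bar", "fixtures": ["yaml_file"],
#    "error": "<traceback text>", "requirements": ["R-12345"]}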

# Captures the results of every test run
ALL_RESULTS = []


def extract_error_msg(rep):
    """
    If a custom error message was provided, then extract it; otherwise
    just show the pytest assert message.
    """
    if rep.outcome != "failed":
        return ""
    try:
        full_msg = str(rep.longrepr.reprcrash.message)
        match = re.match(
            "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
        )
        if match:  # custom message was provided
            # Extract everything between AssertionError and the start
            # of the assert statement expansion in the pytest report
            msg = match.group(1)
        elif "AssertionError:" in full_msg:
            msg = full_msg.split("AssertionError:")[1]
        else:
            msg = full_msg
    except AttributeError:
        msg = str(rep)

    return msg

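# Illustrative example (assumed shape of a pytest crash message) of what
# extract_error_msg handles: for a message such as
#   "AssertionError: Custom failure explanation\nassert False"
# the custom text between "AssertionError:" and the expanded assert is returned.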

class TestResult:
    """
    Wraps the test case and result to extract necessary metadata for
    reporting purposes.
    """

    RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}

    def __init__(self, item, outcome):
        self.item = item
        self.result = outcome.get_result()
        self.files = self._get_files()
        self.error_message = self._get_error_message()

    @property
    def requirement_ids(self):
        """
        Returns list of requirement IDs mapped to the test case.

        :return: Returns a list of string requirement IDs the test was
                 annotated with ``validates``, otherwise returns an empty list
        """
        is_mapped = hasattr(self.item.function, "requirement_ids")
        return self.item.function.requirement_ids if is_mapped else []

    @property
    def markers(self):
        """
        :return: Returns a set of pytest marker names for the test or an empty set
        """
        return set(m.name for m in self.item.iter_markers())

    @property
    def is_base_test(self):
        """
        :return: Returns True if the test is annotated with a pytest marker called base
        """
        return "base" in self.markers

    @property
    def is_failed(self):
        """
        :return: True if the test failed
        """
        return self.outcome == "FAIL"

    @property
    def outcome(self):
        """
        :return: Returns 'PASS', 'FAIL', or 'SKIP'
        """
        return self.RESULT_MAPPING[self.result.outcome]

    @property
    def test_case(self):
        """
        :return: Name of the test case method
        """
        return self.item.function.__name__

    @property
    def test_module(self):
        """
        :return: Name of the file containing the test case
        """
        return self.item.function.__module__.split(".")[-1]

    @property
    def test_id(self):
        """
        :return: ID of the test (test_module + test_case)
        """
        return "{}::{}".format(self.test_module, self.test_case)

    @property
    def raw_output(self):
        """
        :return: Full output from pytest for the given test case
        """
        return str(self.result.longrepr)

    def requirement_text(self, curr_reqs):
        """
        Creates a text summary for the requirement IDs mapped to the test case.
        If no requirements are mapped, then it returns the empty string.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS project's needs.json output
        :return: ID and text of the requirements mapped to the test case
        """
        text = (
            "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
            for r_id in self.requirement_ids
            if r_id in curr_reqs
        )
        return "".join(text)

    def requirements_metadata(self, curr_reqs):
        """
        Returns a list of dicts containing the following metadata for each
        requirement mapped:

        - id: Requirement ID
        - text: Full text of the requirement
        - keyword: MUST, MUST NOT, MAY, etc.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS project's needs.json output
        :return: List of requirement metadata
        """
        data = []
        for r_id in self.requirement_ids:
            if r_id not in curr_reqs:
                continue
            data.append(
                {
                    "id": r_id,
                    "text": curr_reqs[r_id]["description"],
                    "keyword": curr_reqs[r_id]["keyword"],
                }
            )
        return data

    def _get_files(self):
        """
        Extracts the list of files passed into the test case.
        :return: List of absolute paths to files
        """
        if "environment_pair" in self.item.fixturenames:
            return [
                "{} environment pair".format(
                    self.item.funcargs["environment_pair"]["name"]
                )
            ]
        elif "heat_volume_pair" in self.item.fixturenames:
            return [
                "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
            ]
        elif "heat_templates" in self.item.fixturenames:
            return [os.path.basename(f) for f in self.item.funcargs["heat_templates"]]
        elif "yaml_files" in self.item.fixturenames:
            return [os.path.basename(f) for f in self.item.funcargs["yaml_files"]]
        else:
            parts = self.result.nodeid.split("[")
            return [""] if len(parts) == 1 else [os.path.basename(parts[1][:-1])]

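    # For a parametrized test, the pytest nodeid typically looks like
    # "test_module.py::test_case[base.yaml]" (illustrative), in which case the
    # fallback branch above returns ["base.yaml"].
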
    def _get_error_message(self):
        """
        :return: Error message or empty string if the test did not fail or error
        """
        if self.is_failed:
            return extract_error_msg(self.result)
        else:
            return ""


# noinspection PyUnusedLocal
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """
    Captures the test results for later reporting.  This will also halt testing
    if a base failure is encountered (can be overridden with --continue-on-failure)
    """
    outcome = yield
    if outcome.get_result().when != "call":
        return  # only capture results of test cases themselves
    result = TestResult(item, outcome)
    if (
        not item.config.option.continue_on_failure
        and result.is_base_test
        and result.is_failed
    ):
        msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
            result.error_message
        )
        result.error_message = msg
        ALL_RESULTS.append(result)
        pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))

    ALL_RESULTS.append(result)


def make_timestamp():
    """
    :return: String timestamp in the format:
             2019-01-19 10:18:49.865000 Central Standard Time
    """
    timezone = time.tzname[time.localtime().tm_isdst]
    return "{} {}".format(str(datetime.datetime.now()), timezone)


# noinspection PyUnusedLocal
def pytest_sessionstart(session):
    ALL_RESULTS.clear()
    COLLECTION_FAILURES.clear()


# noinspection PyUnusedLocal
def pytest_sessionfinish(session, exitstatus):
    """
    If not a self-test run, generate the output reports
    """
    if not session.config.option.template_dir:
        return

    if session.config.option.template_source:
        template_source = session.config.option.template_source[0]
    else:
        template_source = os.path.abspath(session.config.option.template_dir[0])

    categories_selected = session.config.option.test_categories or ""
    generate_report(
        get_output_dir(session.config),
        template_source,
        categories_selected,
        session.config.option.report_format,
    )


def pytest_terminal_summary(terminalreporter, exitstatus):
    # Ensures all preload information and warnings appear after
    # test results
    try:
        create_preloads(terminalreporter.config, exitstatus)
    except Exception:
        print("Error creating preloads, skipping preload generation")
        traceback.print_exc()


# noinspection PyUnusedLocal
def pytest_collection_modifyitems(session, config, items):
    """
    Selects tests based on the categories requested.  Tests without
    categories will always be executed.
    """
    config.traceability_items = list(items)  # save all items for traceability
    if not config.option.self_test:
        for item in items:
            passed_categories = set(config.option.test_categories or [])
            all_of_categories = getattr(item.function, "all_categories", set())
            any_of_categories = getattr(item.function, "any_categories", set())
            if all_of_categories and not all_of_categories.issubset(passed_categories):
                item.add_marker(
                    pytest.mark.skip(
                        reason=(
                            "Test categories do not match all the passed categories"
                        )
                    )
                )
            if any_of_categories and not passed_categories.intersection(
                any_of_categories
            ):
                item.add_marker(
                    pytest.mark.skip(
                        reason=(
                            "Test categories do not match any of the passed categories"
                        )
                    )
                )

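    # Run tests marked "base" first; all other tests follow in name order.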
    items.sort(
        key=lambda x: (0, x.name)
        if "base" in set(m.name for m in x.iter_markers())
        else (1, x.name)
    )


def make_href(paths, base_dir=None):
    """
    Create an anchor tag to link to the file paths provided.
    :param paths: string or list of file paths
    :param base_dir: If specified, this is prepended to each path
    :return: String of hrefs - one for each path, each separated by a line
             break (<br/>).
    """
    paths = [paths] if isinstance(paths, string_types) else paths
    if base_dir:
        paths = [os.path.join(base_dir, p) for p in paths]
    links = []
    for p in paths:
        abs_path = os.path.abspath(p)
        name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
        links.append(
            "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
                abs_path=abs_path, name=name
            )
        )
    return "<br/>".join(links)

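# Illustrative example of make_href output (hypothetical path):
#   make_href(["base.yaml"], "/work") ->
#   "<a href='file:///work/base.yaml' target='_blank'>base.yaml</a>"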

def generate_report(outpath, template_path, categories, output_format="html"):
    """
    Generates the various output reports.

    :param outpath: destination directory for all reports
    :param template_path: directory containing the Heat templates validated
    :param categories: Optional categories selected
    :param output_format: One of "html", "excel", "csv", or "json". Default is "html"
    :raises: ValueError if requested output format is unknown
    """
    failures = [r for r in ALL_RESULTS if r.is_failed]
    generate_failure_file(outpath)
    output_format = output_format.lower().strip() if output_format else "html"
    generate_json(outpath, template_path, categories)
    if output_format == "html":
        generate_html_report(outpath, categories, template_path, failures)
    elif output_format == "excel":
        generate_excel_report(outpath, categories, template_path, failures)
    elif output_format == "json":
        return
    elif output_format == "csv":
        generate_csv_report(outpath, categories, template_path, failures)
    else:
        raise ValueError("Unsupported output format: " + output_format)


def write_json(data, path):
    """
    Pretty print data as JSON to the output path requested

    :param data: Data structure to be converted to JSON
    :param path: Where to write output
    """
    with open(path, "w") as f:
        json.dump(data, f, indent=2)


def generate_failure_file(outpath):
    """
    Writes a summary of test failures to a file named "failures".
    This is for backwards compatibility only.  The report.json offers a
    more comprehensive output.
    """
    failure_path = os.path.join(outpath, "failures")
    failures = [r for r in ALL_RESULTS if r.is_failed]
    data = {}
    for i, fail in enumerate(failures):
        data[str(i)] = {
            "file": fail.files[0] if len(fail.files) == 1 else fail.files,
            "vnfrqts": fail.requirement_ids,
            "test": fail.test_case,
            "test_file": fail.test_module,
            "raw_output": fail.raw_output,
            "message": fail.error_message,
        }
    write_json(data, failure_path)

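# Illustrative (hypothetical) content of the legacy "failures" file written above:
#   {"0": {"file": "base.yaml", "vnfrqts": ["R-12345"], "test": "test_case_name",
#          "test_file": "test_module_name", "raw_output": "...", "message": "..."}}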

def generate_csv_report(output_dir, categories, template_path, failures):
    rows = [["Validation Failures"]]
    headers = [
        ("Categories Selected:", categories),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    rows.append([])
    for header in headers:
        rows.append(header)
    rows.append([])

    if COLLECTION_FAILURES:
        rows.append([COLLECTION_FAILURE_WARNING])
        rows.append(["Validation File", "Test", "Fixtures", "Error"])
        for failure in COLLECTION_FAILURES:
            rows.append(
                [
                    failure["module"],
                    failure["test"],
                    ";".join(failure["fixtures"]),
                    failure["error"],
                ]
            )
        rows.append([])

    # table header
    rows.append([col for col, _ in REPORT_COLUMNS])

    reqs = load_current_requirements()

    # table content
    for i, failure in enumerate(failures, start=1):
        rows.append(
            [
                i,
                "\n".join(failure.files),
                failure.requirement_text(reqs),
                failure.error_message,
                failure.test_id,
            ]
        )

    output_path = os.path.join(output_dir, "report.csv")
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        for row in rows:
            writer.writerow(row)


def generate_excel_report(output_dir, categories, template_path, failures):
    output_path = os.path.join(output_dir, "report.xlsx")
    workbook = xlsxwriter.Workbook(output_path)
    bold = workbook.add_format({"bold": True, "align": "top"})
    code = workbook.add_format(
        {"font_name": "Courier", "text_wrap": True, "align": "top"}
    )
    normal = workbook.add_format({"text_wrap": True, "align": "top"})
    heading = workbook.add_format({"bold": True, "font_size": 18})
    worksheet = workbook.add_worksheet("failures")
    worksheet.write(0, 0, "Validation Failures", heading)

    headers = [
        ("Categories Selected:", ",".join(categories)),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    for row, (header, value) in enumerate(headers, start=2):
        worksheet.write(row, 0, header, bold)
        worksheet.write(row, 1, value)

    worksheet.set_column(0, len(headers) - 1, 40)
    worksheet.set_column(len(headers), len(headers), 80)

    if COLLECTION_FAILURES:
        collection_failures_start = 2 + len(headers) + 2
        worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
        collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
        for col_num, col_name in enumerate(collection_failure_headers):
            worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
        for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
            worksheet.write(row, 0, data["module"])
            worksheet.write(row, 1, data["test"])
            worksheet.write(row, 2, ",".join(data["fixtures"]))
            worksheet.write(row, 3, data["error"], code)

    # table header
    start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
    worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
    for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
        worksheet.write(start_error_table_row + 1, col_num, col_name, bold)

    reqs = load_current_requirements()

    # table content
    for col, width in enumerate((20, 30, 60, 60, 40)):
        worksheet.set_column(col, col, width)
    err_num = 1
    for row, failure in enumerate(failures, start=start_error_table_row + 2):
        worksheet.write(row, 0, str(err_num), normal)
        worksheet.write(row, 1, "\n".join(failure.files), normal)
        worksheet.write(row, 2, failure.requirement_text(reqs), normal)
        worksheet.write(row, 3, failure.error_message.replace("\n", "\n\n"), normal)
        worksheet.write(row, 4, failure.test_id, normal)
        err_num += 1
    worksheet.autofilter(
        start_error_table_row + 1,
        0,
        start_error_table_row + 1 + err_num,
        len(REPORT_COLUMNS) - 1,
    )
    workbook.close()


def make_iso_timestamp():
    """
    Creates a timestamp in ISO 8601 format in UTC.  Used for JSON output.
    """
    now = datetime.datetime.utcnow()
    now = now.replace(tzinfo=datetime.timezone.utc)
    return now.isoformat()


def aggregate_results(outcomes, r_id=None):
    """
    Determines the aggregate result for the conditions provided.  Assumes the
    results have been filtered and collected for analysis.

    :param outcomes: set of outcomes from the TestResults
    :param r_id: Optional requirement ID if known
    :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
             (see aggregate_requirement_adherence for more detail)
    """
    if not outcomes:
        return "PASS"
    elif "ERROR" in outcomes:
        return "ERROR"
    elif "FAIL" in outcomes:
        return "FAIL"
    elif "PASS" in outcomes:
        return "PASS"
    elif {"SKIP"} == outcomes:
        return "SKIP"
    else:
        warnings.warn(
            "Unexpected error aggregating outcomes ({}) for requirement {}".format(
                outcomes, r_id
            )
        )
        return "ERROR"

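# Examples of the aggregation above: {"PASS", "SKIP"} -> "PASS",
# {"FAIL", "PASS"} -> "FAIL", {"SKIP"} -> "SKIP", and an empty set -> "PASS".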

def aggregate_run_results(collection_failures, test_results):
    """
    Determines overall status of run based on all failures and results.

    * 'ERROR' - At least one collection failure occurred during the run.
    * 'FAIL' - Template failed at least one test
    * 'PASS' - All tests executed properly and no failures were detected

    :param collection_failures: failures occurring during test setup
    :param test_results: list of all test execution results
    :return: one of 'ERROR', 'FAIL', or 'PASS'
    """
    if collection_failures:
        return "ERROR"
    elif any(r.is_failed for r in test_results):
        return "FAIL"
    else:
        return "PASS"


def relative_paths(base_dir, paths):
    return [os.path.relpath(p, base_dir) for p in paths if p != ""]


# noinspection PyTypeChecker
def generate_json(outpath, template_path, categories):
    """
    Creates a JSON summary of the entire test run.
    """
    reqs = load_current_requirements()
    data = {
        "version": "dublin",
        "template_directory": os.path.splitdrive(template_path)[1].replace(
            os.path.sep, "/"
        ),
        "timestamp": make_iso_timestamp(),
        "checksum": hash_directory(template_path),
        "categories": categories,
        "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
        "tests": [],
        "requirements": [],
    }

    results = data["tests"]
    for result in COLLECTION_FAILURES:
        results.append(
            {
                "files": [],
                "test_module": result["module"],
                "test_case": result["test"],
                "result": "ERROR",
                "error": result["error"],
                "requirements": result["requirements"],
            }
        )
    for result in ALL_RESULTS:
        results.append(
            {
                "files": relative_paths(template_path, result.files),
                "test_module": result.test_module,
                "test_case": result.test_case,
                "result": result.outcome,
                "error": result.error_message if result.is_failed else "",
                "requirements": result.requirements_metadata(reqs),
            }
        )

    # Build a mapping of requirement ID to the results
    r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
    for test_result in results:
        test_reqs = test_result["requirements"]
        r_ids = (
            [r["id"] if isinstance(r, dict) else r for r in test_reqs]
            if test_reqs
            else ("",)
        )
        for r_id in r_ids:
            item = r_id_results[r_id]
            item["outcomes"].add(test_result["result"])
            if test_result["error"]:
                item["errors"].add(test_result["error"])

    requirements = data["requirements"]
    for r_id, r_data in reqs.items():
        requirements.append(
            {
                "id": r_id,
                "text": r_data["description"],
                "keyword": r_data["keyword"],
                "result": aggregate_results(r_id_results[r_id]["outcomes"]),
                "errors": list(r_id_results[r_id]["errors"]),
            }
        )

    if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
        requirements.append(
            {
                "id": "Unmapped",
                "text": "Tests not mapped to requirements (see tests)",
                "result": aggregate_results(r_id_results[""]["outcomes"]),
                "errors": list(r_id_results[""]["errors"]),
            }
        )

    report_path = os.path.join(outpath, "report.json")
    write_json(data, report_path)


def generate_html_report(outpath, categories, template_path, failures):
    reqs = load_current_requirements()
    fail_data = []
    for failure in failures:
        fail_data.append(
            {
                "file_links": make_href(failure.files, template_path),
                "test_id": failure.test_id,
                "error_message": escape(failure.error_message).replace(
                    "\n", "<br/><br/>"
                ),
                "raw_output": escape(failure.raw_output),
                "requirements": docutils.core.publish_parts(
                    writer_name="html", source=failure.requirement_text(reqs)
                )["body"],
            }
        )
    pkg_dir = os.path.split(__file__)[0]
    j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
    with open(j2_template_path, "r") as f:
        report_template = jinja2.Template(f.read())
        contents = report_template.render(
            version=version.VERSION,
            num_failures=len(failures) + len(COLLECTION_FAILURES),
            categories=categories,
            template_dir=make_href(template_path),
            checksum=hash_directory(template_path),
            timestamp=make_timestamp(),
            failures=fail_data,
            collection_failures=COLLECTION_FAILURES,
        )
    with open(os.path.join(outpath, "report.html"), "w") as f:
        f.write(contents)


def pytest_addoption(parser):
    """
    Add needed CLI arguments
    """
    parser.addoption(
        "--template-directory",
        dest="template_dir",
        action="append",
        help="Directory which holds the templates for validation",
    )

    parser.addoption(
        "--template-source",
        dest="template_source",
        action="append",
        help="Source Directory which holds the templates for validation",
    )

    parser.addoption(
        "--self-test",
        dest="self_test",
        action="store_true",
        help="Test the unit tests against their fixtured data",
    )

    parser.addoption(
        "--report-format",
        dest="report_format",
        action="store",
        help="Format of output report (html, csv, excel, json)",
    )

    parser.addoption(
        "--continue-on-failure",
        dest="continue_on_failure",
        action="store_true",
        help="Continue validation even when structural errors exist in input files",
    )

    parser.addoption(
        "--output-directory",
        dest="output_dir",
        action="store",
        default=None,
        help="Alternate output directory for the generated reports",
    )

    parser.addoption(
        "--category",
        dest="test_categories",
        action="append",
        help="optional category of test to execute",
    )

    parser.addoption(
        "--env-directory",
        dest="env_dir",
        action="store",
        help="optional directory of .env files for preload generation",
    )

    parser.addoption(
        "--preload-format",
        dest="preload_formats",
        action="append",
        help=(
            "Preload format to create (multiple allowed). If not provided "
            "then all available formats will be created: {}"
        ).format(", ".join(get_generator_plugin_names())),
    )

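# A typical invocation using the options above might look like (hypothetical paths):
#   pytest --template-directory=/path/to/heat --report-format=html \
#       --output-directory=/path/to/output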

def pytest_configure(config):
    """
    Ensure that we receive either `--self-test` or
    `--template-directory=<directory>` as a CLI argument
    """
    if config.getoption("template_dir") and config.getoption("self_test"):
        raise Exception(
            '"--template-directory" and "--self-test" are mutually exclusive'
        )
    if not (
        config.getoption("template_dir")
        or config.getoption("self_test")
        or config.getoption("help")
    ):
        raise Exception(
            'One of "--template-directory" or "--self-test" must be specified'
        )


def pytest_generate_tests(metafunc):
    """
    If a unit test requires an argument named 'filename',
    we generate a test for each of the filenames selected: either
    the files contained in `template_dir` or, if `template_dir`
    is not specified on the CLI, the fixtures associated with this
    test name.
    """

    # noinspection PyBroadException
    try:
        if "filename" in metafunc.fixturenames:
            from .parametrizers import parametrize_filename

            parametrize_filename(metafunc)

        if "filenames" in metafunc.fixturenames:
            from .parametrizers import parametrize_filenames

            parametrize_filenames(metafunc)

        if "template_dir" in metafunc.fixturenames:
            from .parametrizers import parametrize_template_dir

            parametrize_template_dir(metafunc)

        if "environment_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_pair

            parametrize_environment_pair(metafunc)

        if "heat_volume_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_volume_pair

            parametrize_heat_volume_pair(metafunc)

        if "yaml_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_files

            parametrize_yaml_files(metafunc)

        if "env_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_files

            parametrize_environment_files(metafunc)

        if "yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_file

            parametrize_yaml_file(metafunc)

        if "env_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_file

            parametrize_environment_file(metafunc)

        if "parsed_yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_yaml_file

            parametrize_parsed_yaml_file(metafunc)

        if "parsed_environment_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_environment_file

            parametrize_parsed_environment_file(metafunc)

        if "heat_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_template

            parametrize_heat_template(metafunc)

        if "heat_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_templates

            parametrize_heat_templates(metafunc)

        if "volume_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_template

            parametrize_volume_template(metafunc)

        if "volume_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_templates

            parametrize_volume_templates(metafunc)

        if "template" in metafunc.fixturenames:
            from .parametrizers import parametrize_template

            parametrize_template(metafunc)

        if "templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_templates

            parametrize_templates(metafunc)
    except Exception as e:
        # If an error occurs in the collection phase, then it won't be logged as a
        # normal test failure.  This means that failures could occur but not be
        # seen in the report, resulting in a false positive success message.  These
        # errors are stored and reported separately in the report.
        COLLECTION_FAILURES.append(
            {
                "module": metafunc.module.__name__,
                "test": metafunc.function.__name__,
                "fixtures": metafunc.fixturenames,
                "error": traceback.format_exc(),
                "requirements": getattr(metafunc.function, "requirement_ids", []),
            }
        )
        raise e


def hash_directory(path):
    """
    Create md5 hash using the contents of all files under ``path``
    :param path: string directory containing files
    :return: string MD5 hash code (hex)
    """
    md5 = hashlib.md5()  # nosec
    for dir_path, sub_dirs, filenames in os.walk(path):
        for filename in filenames:
            file_path = os.path.join(dir_path, filename)
            with open(file_path, "rb") as f:
                md5.update(f.read())
    return md5.hexdigest()


def load_current_requirements():
    """Loads the dict of current requirements from heat_requirements.json"""
    with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
        data = json.load(f)
        version = data["current_version"]
        return data["versions"][version]["needs"]

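# Assumed (illustrative) shape of heat_requirements.json as consumed above:
#   {"current_version": "<version>",
#    "versions": {"<version>": {"needs": {"R-12345": {"description": "...", ...}}}}}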

def select_heat_requirements(reqs):
    """Filters dict requirements to only those requirements pertaining to Heat"""
    return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}


def is_testable(reqs):
    """Flags each requirement with a boolean "testable" key based on its keyword
    and validation mode"""
    for key, values in reqs.items():
        if ("MUST" in values.get("keyword", "").upper()) and (
            "none" not in values.get("validation_mode", "").lower()
        ):
            reqs[key]["testable"] = True
        else:
            reqs[key]["testable"] = False
    return reqs

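# For example, a requirement with keyword "MUST" and validation_mode "static" is
# flagged testable above, while one with keyword "MAY" or validation_mode "none"
# is not.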

def build_rst_json(reqs):
    """Adds RST-formatted links for each testable requirement and its test case,
    dropping requirements that are not testable"""
    for key, values in list(reqs.items()):
        if values["testable"]:
            # Creates links in RST format to requirements and test cases
            if values["test_case"]:
                mod = values["test_case"].split(".")[-1]
                val = TEST_SCRIPT_SITE + mod + ".py"
                rst_value = "`" + mod + " <" + val + ">`_"
                title = (
                    "`"
                    + values["id"]
                    + " <"
                    + VNFRQTS_ID_URL
                    + values["docname"].replace(" ", "%20")
                    + ".html#"
                    + values["id"]
                    + ">`_"
                )
                reqs[key].update({"full_title": title, "test_case": rst_value})
            else:
                title = (
                    "`"
                    + values["id"]
                    + " <"
                    + VNFRQTS_ID_URL
                    + values["docname"].replace(" ", "%20")
                    + ".html#"
                    + values["id"]
                    + ">`_"
                )
                reqs[key].update(
                    {
                        "full_title": title,
                        "test_case": "No test for requirement",
                        "validated_by": "static",
                    }
                )
        else:
            del reqs[key]
    return reqs

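# Illustrative (hypothetical) links produced above, in RST syntax:
#   full_title: "`R-12345 <{VNFRQTS_ID_URL}{docname}.html#R-12345>`_"
#   test_case:  "`test_foo <{TEST_SCRIPT_SITE}test_foo.py>`_"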

def generate_rst_table(output_dir, data):
    """Generate a formatted csv to be used in RST"""
    rst_path = os.path.join(output_dir, "rst.csv")
    with open(rst_path, "w", newline="") as f:
        out = csv.writer(f)
        out.writerow(("Requirement ID", "Test Module", "Test Name"))
        for req_id, metadata in data.items():
            out.writerow(
                (
                    metadata["full_title"],
                    metadata["test_case"],
                    metadata["validated_by"],
                )
            )


# noinspection PyUnusedLocal
def pytest_report_collectionfinish(config, startdir, items):
    """Generates a simple traceability report to output/traceability.csv"""
    traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
    output_dir = os.path.split(traceability_path)[0]
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    reqs = load_current_requirements()
    requirements = select_heat_requirements(reqs)
    testable_requirements = is_testable(requirements)
    unmapped, mapped = partition(
        lambda i: hasattr(i.function, "requirement_ids"), items
    )

    req_to_test = defaultdict(set)
    mapping_errors = set()
    for item in mapped:
        for req_id in item.function.requirement_ids:
            if req_id not in req_to_test:
                req_to_test[req_id].add(item)
                if req_id in requirements:
                    reqs[req_id].update(
                        {
                            "test_case": item.function.__module__,
                            "validated_by": item.function.__name__,
                        }
                    )
            if req_id not in requirements:
                mapping_errors.add(
                    (req_id, item.function.__module__, item.function.__name__)
                )

    mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
    with open(mapping_error_path, "w", newline="") as f:
        writer = csv.writer(f)
        for err in mapping_errors:
            writer.writerow(err)

    with open(traceability_path, "w", newline="") as f:
        out = csv.writer(f)
        out.writerow(
            (
                "Requirement ID",
                "Requirement",
                "Section",
                "Keyword",
                "Validation Mode",
                "Is Testable",
                "Test Module",
                "Test Name",
            )
        )
        for req_id, metadata in testable_requirements.items():
            if req_to_test[req_id]:
                for item in req_to_test[req_id]:
                    out.writerow(
                        (
                            req_id,
                            metadata["description"],
                            metadata["section_name"],
                            metadata["keyword"],
                            metadata["validation_mode"],
                            metadata["testable"],
                            item.function.__module__,
                            item.function.__name__,
                        )
                    )
            else:
                out.writerow(
                    (
                        req_id,
                        metadata["description"],
                        metadata["section_name"],
                        metadata["keyword"],
                        metadata["validation_mode"],
                        metadata["testable"],
                        "",  # test module
                        "",
                    )  # test function
                )
        # now write out any test methods that weren't mapped to requirements
        unmapped_tests = {
            (item.function.__module__, item.function.__name__) for item in unmapped
        }
        for test_module, test_name in unmapped_tests:
            out.writerow(
                (
                    "",  # req ID
                    "",  # description
                    "",  # section name
                    "",  # keyword
                    "static",  # validation mode
                    "TRUE",  # testable
                    test_module,
                    test_name,
                )
            )

    generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))