Merge "[VVP] Adding preload generation functionality"
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37
38 import csv
39 import datetime
40 import hashlib
41 import io
42 import json
43 import os
44 import re
45 import time
46
47 from preload import create_preloads
48 from tests.helpers import get_output_dir
49
50 try:
51     from html import escape
52 except ImportError:
53     from cgi import escape
54 from collections import defaultdict
55
56 import traceback
57
58 import docutils.core
59 import jinja2
60 import pytest
61 from more_itertools import partition
62 import xlsxwriter
63 from six import string_types
64
65 # noinspection PyUnresolvedReferences
66 import version
67 import logging
68
69 logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)
70
71 __path__ = [os.path.dirname(os.path.abspath(__file__))]
72
73 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
74
75 HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
76 TEST_SCRIPT_SITE = (
77     "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
78 )
79 VNFRQTS_ID_URL = (
80     "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
81 )
82
83 REPORT_COLUMNS = [
84     ("Error #", "err_num"),
85     ("Input File", "file"),
86     ("Requirements", "req_description"),
87     ("Error Message", "message"),
88     ("Test", "test_file"),
89 ]
90
91 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
92 while preparing to validate the input files. Some validations may not have been
93 executed. Please report these issues to the VNF Validation Tool team.
94 """
95
96 COLLECTION_FAILURES = []
97
98 # Captures the results of every test run
99 ALL_RESULTS = []
100
101
102 def extract_error_msg(rep):
103     """
104     If a custom error message was provided, then extract it; otherwise
105     just return the pytest assert message.
106     """
107     if rep.outcome != "failed":
108         return ""
109     try:
110         full_msg = str(rep.longrepr.reprcrash.message)
111         match = re.match(
112             "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
113         )
114         if match:  # custom message was provided
115             # Extract everything between AssertionError and the start
116             # of the assert statement expansion in the pytest report
117             msg = match.group(1)
118         elif "AssertionError:" in full_msg:
119             msg = full_msg.split("AssertionError:")[1]
120         else:
121             msg = full_msg
122     except AttributeError:
123         msg = str(rep)
124
125     return msg
126
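# Illustrative sketch of what extract_error_msg() pulls out of a failure report
# (the assertion text below is hypothetical):
#
#   AssertionError: Required parameter vm_role is missing
#   assert 'vm_role' in {...}
#
# yields only the custom portion ("Required parameter vm_role is missing"),
# dropping the assert expansion that pytest appends.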
127
128 class TestResult:
129     """
130     Wraps the test case and result to extract necessary metadata for
131     reporting purposes.
132     """
133
134     RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
135
136     def __init__(self, item, outcome):
137         self.item = item
138         self.result = outcome.get_result()
139         self.files = self._get_files()
140         self.error_message = self._get_error_message()
141
142     @property
143     def requirement_ids(self):
144         """
145         Returns list of requirement IDs mapped to the test case.
146
147         :return: Returns a list of string requirement IDs the test was
148                  annotated with via ``validates``; otherwise returns an empty list
149         """
150         is_mapped = hasattr(self.item.function, "requirement_ids")
151         return self.item.function.requirement_ids if is_mapped else []
152
153     @property
154     def markers(self):
155         """
156         :return: Returns a set of pytest marker names for the test or an empty set
157         """
158         return set(m.name for m in self.item.iter_markers())
159
160     @property
161     def is_base_test(self):
162         """
163         :return: Returns True if the test is annotated with a pytest marker called base
164         """
165         return "base" in self.markers
166
167     @property
168     def is_failed(self):
169         """
170         :return: True if the test failed
171         """
172         return self.outcome == "FAIL"
173
174     @property
175     def outcome(self):
176         """
177         :return: Returns 'PASS', 'FAIL', or 'SKIP'
178         """
179         return self.RESULT_MAPPING[self.result.outcome]
180
181     @property
182     def test_case(self):
183         """
184         :return: Name of the test case method
185         """
186         return self.item.function.__name__
187
188     @property
189     def test_module(self):
190         """
191         :return: Name of the file containing the test case
192         """
193         return self.item.function.__module__.split(".")[-1]
194
195     @property
196     def test_id(self):
197         """
198         :return: ID of the test (test_module + test_case)
199         """
200         return "{}::{}".format(self.test_module, self.test_case)
201
202     @property
203     def raw_output(self):
204         """
205         :return: Full output from pytest for the given test case
206         """
207         return str(self.result.longrepr)
208
209     def requirement_text(self, curr_reqs):
210         """
211         Creates a text summary for the requirement IDs mapped to the test case.
212         If no requirements are mapped, then it returns the empty string.
213
214         :param curr_reqs: mapping of requirement IDs to requirement metadata
215                           loaded from the VNFRQTS project's needs.json output
216         :return: ID and text of the requirements mapped to the test case
217         """
218         text = (
219             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
220             for r_id in self.requirement_ids
221             if r_id in curr_reqs
222         )
223         return "".join(text)
224
225     def requirements_metadata(self, curr_reqs):
226         """
227         Returns a list of dicts containing the following metadata for each
228         requirement mapped:
229
230         - id: Requirement ID
231         - text: Full text of the requirement
232         - keyword: MUST, MUST NOT, MAY, etc.
233
234         :param curr_reqs: mapping of requirement IDs to requirement metadata
235                           loaded from the VNFRQTS project's needs.json output
236         :return: List of requirement metadata
237         """
238         data = []
239         for r_id in self.requirement_ids:
240             if r_id not in curr_reqs:
241                 continue
242             data.append(
243                 {
244                     "id": r_id,
245                     "text": curr_reqs[r_id]["description"],
246                     "keyword": curr_reqs[r_id]["keyword"],
247                 }
248             )
249         return data
250
251     def _get_files(self):
252         """
253         Extracts the list of files passed into the test case.
254         :return: List of absolute paths to files
255         """
256         if "environment_pair" in self.item.fixturenames:
257             return [
258                 "{} environment pair".format(
259                     self.item.funcargs["environment_pair"]["name"]
260                 )
261             ]
262         elif "heat_volume_pair" in self.item.fixturenames:
263             return [
264                 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
265             ]
266         elif "heat_templates" in self.item.fixturenames:
267             return [os.path.basename(f) for f in self.item.funcargs["heat_templates"]]
268         elif "yaml_files" in self.item.fixturenames:
269             return [os.path.basename(f) for f in self.item.funcargs["yaml_files"]]
270         else:
271             parts = self.result.nodeid.split("[")
272             return [""] if len(parts) == 1 else [os.path.basename(parts[1][:-1])]
273
274     def _get_error_message(self):
275         """
276         :return: Error message or empty string if the test did not fail or error
277         """
278         if self.is_failed:
279             return extract_error_msg(self.result)
280         else:
281             return ""
282
283
284 # noinspection PyUnusedLocal
285 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
286 def pytest_runtest_makereport(item, call):
287     """
288     Captures the test results for later reporting.  This will also halt testing
289     if a base failure is encountered (can be overridden with --continue-on-failure).
290     """
291     outcome = yield
292     if outcome.get_result().when != "call":
293         return  # only capture results of test cases themselves
294     result = TestResult(item, outcome)
295     if (
296         not item.config.option.continue_on_failure
297         and result.is_base_test
298         and result.is_failed
299     ):
300         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
301             result.error_message
302         )
303         result.error_message = msg
304         ALL_RESULTS.append(result)
305         pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
306
307     ALL_RESULTS.append(result)
308
309
310 def make_timestamp():
311     """
312     :return: Timestamp string in the format:
313              2019-01-19 10:18:49.865000 Central Standard Time
314     """
315     timezone = time.tzname[time.localtime().tm_isdst]
316     return "{} {}".format(str(datetime.datetime.now()), timezone)
317
318
319 # noinspection PyUnusedLocal
320 def pytest_sessionstart(session):
321     ALL_RESULTS.clear()
322     COLLECTION_FAILURES.clear()
323
324
325 # noinspection PyUnusedLocal
326 def pytest_sessionfinish(session, exitstatus):
327     """
328     If not a self-test run, generate the output reports
329     """
330     if not session.config.option.template_dir:
331         return
332
333     if session.config.option.template_source:
334         template_source = session.config.option.template_source[0]
335     else:
336         template_source = os.path.abspath(session.config.option.template_dir[0])
337
338     categories_selected = session.config.option.test_categories or ""
339     generate_report(
340         get_output_dir(session.config),
341         template_source,
342         categories_selected,
343         session.config.option.report_format,
344     )
345
346
347 def pytest_terminal_summary(terminalreporter, exitstatus):
348     # Ensures all preload information and warnings appear after
349     # test results
350     create_preloads(terminalreporter.config, exitstatus)
351
352
353 # noinspection PyUnusedLocal
354 def pytest_collection_modifyitems(session, config, items):
355     """
356     Selects tests based on the categories requested.  Tests without
357     categories will always be executed.
358     """
359     config.traceability_items = list(items)  # save all items for traceability
360     if not config.option.self_test:
361         for item in items:
362             # checking if test belongs to a category
363             if hasattr(item.function, "categories"):
364                 if config.option.test_categories:
365                     test_categories = getattr(item.function, "categories")
366                     passed_categories = config.option.test_categories
367                     if not all(
368                         category in passed_categories for category in test_categories
369                     ):
370                         item.add_marker(
371                             pytest.mark.skip(
372                                 reason=(
373                                     "Test categories do not match "
374                                     "all the passed categories"
375                                 )
376                             )
377                         )
378                 else:
379                     item.add_marker(
380                         pytest.mark.skip(
381                             reason=(
382                                 "Test belongs to a category but "
383                                 "no categories were passed"
384                             )
385                         )
386                     )
387
388     items.sort(
389         key=lambda x: (0, x.name)
390         if "base" in set(m.name for m in x.iter_markers())
391         else (1, x.name)
392     )
393
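# Category selection sketch (category names here are placeholders): a test whose
# function carries categories {"a", "b"} runs only when both --category=a and
# --category=b were passed; tests with categories are skipped when no --category
# was given at all; tests without categories always run, and "base" tests sort first.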
394
395 def make_href(paths, base_dir=None):
396     """
397     Create an anchor tag to link to the file paths provided.
398     :param paths: string or list of file paths
399     :param base_dir: If specified, this is prepended to each path
400     :return: String of hrefs - one for each path, each separated by a line
401              break (<br/>).
402     """
403     paths = [paths] if isinstance(paths, string_types) else paths
404     if base_dir:
405         paths = [os.path.join(base_dir, p) for p in paths]
406     links = []
407     for p in paths:
408         abs_path = os.path.abspath(p)
409         name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
410         links.append(
411             "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
412                 abs_path=abs_path, name=name
413             )
414         )
415     return "<br/>".join(links)
416
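# Hedged usage sketch for make_href() (paths are hypothetical):
#
#   >>> make_href(["base.yaml", "base.env"], "/work/templates")
#   "<a href='file:///work/templates/base.yaml' target='_blank'>base.yaml</a><br/>..."
#
# Directories are linked with their absolute path as the visible name; plain files
# show only their basename.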
417
418 def generate_report(outpath, template_path, categories, output_format="html"):
419     """
420     Generates the various output reports.
421
422     :param outpath: destination directory for all reports
423     :param template_path: directory containing the Heat templates validated
424     :param categories: Optional categories selected
425     :param output_format: One of "html", "excel", or "csv". Default is "html"
426     :raises: ValueError if requested output format is unknown
427     """
428     failures = [r for r in ALL_RESULTS if r.is_failed]
429     generate_failure_file(outpath)
430     output_format = output_format.lower().strip() if output_format else "html"
431     generate_json(outpath, template_path, categories)
432     if output_format == "html":
433         generate_html_report(outpath, categories, template_path, failures)
434     elif output_format == "excel":
435         generate_excel_report(outpath, categories, template_path, failures)
436     elif output_format == "json":
437         return
438     elif output_format == "csv":
439         generate_csv_report(outpath, categories, template_path, failures)
440     else:
441         raise ValueError("Unsupported output format: " + output_format)
442
443
444 def write_json(data, path):
445     """
446     Pretty print data as JSON to the output path requested
447
448     :param data: Data structure to be converted to JSON
449     :param path: Where to write output
450     """
451     with open(path, "w") as f:
452         json.dump(data, f, indent=2)
453
454
455 def generate_failure_file(outpath):
456     """
457     Writes a summary of test failures to a file named failures.
458     This is for backwards compatibility only.  The report.json offers a
459     more comprehensive output.
460     """
461     failure_path = os.path.join(outpath, "failures")
462     failures = [r for r in ALL_RESULTS if r.is_failed]
463     data = {}
464     for i, fail in enumerate(failures):
465         data[str(i)] = {
466             "file": fail.files[0] if len(fail.files) == 1 else fail.files,
467             "vnfrqts": fail.requirement_ids,
468             "test": fail.test_case,
469             "test_file": fail.test_module,
470             "raw_output": fail.raw_output,
471             "message": fail.error_message,
472         }
473     write_json(data, failure_path)
474
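# Shape of the legacy "failures" file written above (one entry per failing test;
# the values shown are placeholders):
#
#   {
#     "0": {
#       "file": "base.yaml",
#       "vnfrqts": ["R-123456"],
#       "test": "test_example",
#       "test_file": "test_module",
#       "raw_output": "...",
#       "message": "..."
#     }
#   }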
475
476 def generate_csv_report(output_dir, categories, template_path, failures):
477     rows = [["Validation Failures"]]
478     headers = [
479         ("Categories Selected:", categories),
480         ("Tool Version:", version.VERSION),
481         ("Report Generated At:", make_timestamp()),
482         ("Directory Validated:", template_path),
483         ("Checksum:", hash_directory(template_path)),
484         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
485     ]
486     rows.append([])
487     for header in headers:
488         rows.append(header)
489     rows.append([])
490
491     if COLLECTION_FAILURES:
492         rows.append([COLLECTION_FAILURE_WARNING])
493         rows.append(["Validation File", "Test", "Fixtures", "Error"])
494         for failure in COLLECTION_FAILURES:
495             rows.append(
496                 [
497                     failure["module"],
498                     failure["test"],
499                     ";".join(failure["fixtures"]),
500                     failure["error"],
501                 ]
502             )
503         rows.append([])
504
505     # table header
506     rows.append([col for col, _ in REPORT_COLUMNS])
507
508     reqs = load_current_requirements()
509
510     # table content
511     for i, failure in enumerate(failures, start=1):
512         rows.append(
513             [
514                 i,
515                 "\n".join(failure.files),
516                 failure.requirement_text(reqs),
517                 failure.error_message,
518                 failure.test_id,
519             ]
520         )
521
522     output_path = os.path.join(output_dir, "report.csv")
523     with open(output_path, "w", newline="") as f:
524         writer = csv.writer(f)
525         for row in rows:
526             writer.writerow(row)
527
528
529 def generate_excel_report(output_dir, categories, template_path, failures):
530     output_path = os.path.join(output_dir, "report.xlsx")
531     workbook = xlsxwriter.Workbook(output_path)
532     bold = workbook.add_format({"bold": True, "align": "top"})
533     code = workbook.add_format(
534         {"font_name": "Courier", "text_wrap": True, "align": "top"}
535     )
536     normal = workbook.add_format({"text_wrap": True, "align": "top"})
537     heading = workbook.add_format({"bold": True, "font_size": 18})
538     worksheet = workbook.add_worksheet("failures")
539     worksheet.write(0, 0, "Validation Failures", heading)
540
541     headers = [
542         ("Categories Selected:", ",".join(categories)),
543         ("Tool Version:", version.VERSION),
544         ("Report Generated At:", make_timestamp()),
545         ("Directory Validated:", template_path),
546         ("Checksum:", hash_directory(template_path)),
547         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
548     ]
549     for row, (header, value) in enumerate(headers, start=2):
550         worksheet.write(row, 0, header, bold)
551         worksheet.write(row, 1, value)
552
553     worksheet.set_column(0, len(headers) - 1, 40)
554     worksheet.set_column(len(headers), len(headers), 80)
555
556     if COLLECTION_FAILURES:
557         collection_failures_start = 2 + len(headers) + 2
558         worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
559         collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
560         for col_num, col_name in enumerate(collection_failure_headers):
561             worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
562         for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
563             worksheet.write(row, 0, data["module"])
564             worksheet.write(row, 1, data["test"])
565             worksheet.write(row, 2, ",".join(data["fixtures"]))
566             worksheet.write(row, 3, data["error"], code)
567
568     # table header
569     start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
570     worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
571     for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
572         worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
573
574     reqs = load_current_requirements()
575
576     # table content
577     for col, width in enumerate((20, 30, 60, 60, 40)):
578         worksheet.set_column(col, col, width)
579     err_num = 1
580     for row, failure in enumerate(failures, start=start_error_table_row + 2):
581         worksheet.write(row, 0, str(err_num), normal)
582         worksheet.write(row, 1, "\n".join(failure.files), normal)
583         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
584         worksheet.write(row, 3, failure.error_message.replace("\n", "\n\n"), normal)
585         worksheet.write(row, 4, failure.test_id, normal)
586         err_num += 1
587     worksheet.autofilter(
588         start_error_table_row + 1,
589         0,
590         start_error_table_row + 1 + err_num,
591         len(REPORT_COLUMNS) - 1,
592     )
593     workbook.close()
594
595
596 def make_iso_timestamp():
597     """
598     Creates a timestamp in ISO 8601 format in UTC.  Used for JSON output.
599     """
600     now = datetime.datetime.utcnow()
601     now = now.replace(tzinfo=datetime.timezone.utc)
602     return now.isoformat()
603
604
605 def aggregate_results(outcomes, r_id=None):
606     """
607     Determines the aggregate result for the conditions provided.  Assumes the
608     results have been filtered and collected for analysis.
609
610     :param outcomes: set of outcomes from the TestResults
611     :param r_id: Optional requirement ID if known
612     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
613              (see aggregate_requirement_adherence for more detail)
614     """
615     if not outcomes:
616         return "PASS"
617     elif "ERROR" in outcomes:
618         return "ERROR"
619     elif "FAIL" in outcomes:
620         return "FAIL"
621     elif "PASS" in outcomes:
622         return "PASS"
623     elif {"SKIP"} == outcomes:
624         return "SKIP"
625     else:
626         logging.warning(
627             "Unexpected error aggregating outcomes ({}) for requirement {}".format(
628                 outcomes, r_id
629             )
630         )
631         return "ERROR"
632
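# Aggregation examples (derived from the precedence above: ERROR > FAIL > PASS,
# with SKIP only when every outcome was a skip):
#
#   aggregate_results({"PASS", "FAIL"})  -> "FAIL"
#   aggregate_results({"SKIP", "PASS"})  -> "PASS"
#   aggregate_results({"SKIP"})          -> "SKIP"
#   aggregate_results(set())             -> "PASS"  (no outcomes collected)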
633
634 def aggregate_run_results(collection_failures, test_results):
635     """
636     Determines overall status of run based on all failures and results.
637
638     * 'ERROR' - At least one collection failure occurred during the run.
639     * 'FAIL' - Template failed at least one test
640     * 'PASS' - All tests executed properly and no failures were detected
641
642     :param collection_failures: failures occurring during test setup
643     :param test_results: list of all test execution results
644     :return: one of 'ERROR', 'FAIL', or 'PASS'
645     """
646     if collection_failures:
647         return "ERROR"
648     elif any(r.is_failed for r in test_results):
649         return "FAIL"
650     else:
651         return "PASS"
652
653
654 def relative_paths(base_dir, paths):
655     return [os.path.relpath(p, base_dir) for p in paths if p != ""]
656
657
658 # noinspection PyTypeChecker
659 def generate_json(outpath, template_path, categories):
660     """
661     Creates a JSON summary of the entire test run.
662     """
663     reqs = load_current_requirements()
664     data = {
665         "version": "dublin",
666         "template_directory": os.path.splitdrive(template_path)[1].replace(
667             os.path.sep, "/"
668         ),
669         "timestamp": make_iso_timestamp(),
670         "checksum": hash_directory(template_path),
671         "categories": categories,
672         "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
673         "tests": [],
674         "requirements": [],
675     }
676
677     results = data["tests"]
678     for result in COLLECTION_FAILURES:
679         results.append(
680             {
681                 "files": [],
682                 "test_module": result["module"],
683                 "test_case": result["test"],
684                 "result": "ERROR",
685                 "error": result["error"],
686                 "requirements": result["requirements"],
687             }
688         )
689     for result in ALL_RESULTS:
690         results.append(
691             {
692                 "files": relative_paths(template_path, result.files),
693                 "test_module": result.test_module,
694                 "test_case": result.test_case,
695                 "result": result.outcome,
696                 "error": result.error_message if result.is_failed else "",
697                 "requirements": result.requirements_metadata(reqs),
698             }
699         )
700
701     # Build a mapping of requirement ID to the results
702     r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
703     for test_result in results:
704         test_reqs = test_result["requirements"]
705         r_ids = (
706             [r["id"] if isinstance(r, dict) else r for r in test_reqs]
707             if test_reqs
708             else ("",)
709         )
710         for r_id in r_ids:
711             item = r_id_results[r_id]
712             item["outcomes"].add(test_result["result"])
713             if test_result["error"]:
714                 item["errors"].add(test_result["error"])
715
716     requirements = data["requirements"]
717     for r_id, r_data in reqs.items():
718         requirements.append(
719             {
720                 "id": r_id,
721                 "text": r_data["description"],
722                 "keyword": r_data["keyword"],
723                 "result": aggregate_results(r_id_results[r_id]["outcomes"]),
724                 "errors": list(r_id_results[r_id]["errors"]),
725             }
726         )
727
728     if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
729         requirements.append(
730             {
731                 "id": "Unmapped",
732                 "text": "Tests not mapped to requirements (see tests)",
733                 "result": aggregate_results(r_id_results[""]["outcomes"]),
734                 "errors": list(r_id_results[""]["errors"]),
735             }
736         )
737
738     report_path = os.path.join(outpath, "report.json")
739     write_json(data, report_path)
740
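# Top-level shape of report.json as assembled above (keys only):
#   version, template_directory, timestamp, checksum, categories, outcome,
#   tests (one entry per collection failure and per test result),
#   requirements (one entry per requirement, plus an "Unmapped" bucket when needed)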
741
742 def generate_html_report(outpath, categories, template_path, failures):
743     reqs = load_current_requirements()
744     fail_data = []
745     for failure in failures:
746         fail_data.append(
747             {
748                 "file_links": make_href(failure.files, template_path),
749                 "test_id": failure.test_id,
750                 "error_message": escape(failure.error_message).replace(
751                     "\n", "<br/><br/>"
752                 ),
753                 "raw_output": escape(failure.raw_output),
754                 "requirements": docutils.core.publish_parts(
755                     writer_name="html", source=failure.requirement_text(reqs)
756                 )["body"],
757             }
758         )
759     pkg_dir = os.path.split(__file__)[0]
760     j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
761     with open(j2_template_path, "r") as f:
762         report_template = jinja2.Template(f.read())
763         contents = report_template.render(
764             version=version.VERSION,
765             num_failures=len(failures) + len(COLLECTION_FAILURES),
766             categories=categories,
767             template_dir=make_href(template_path),
768             checksum=hash_directory(template_path),
769             timestamp=make_timestamp(),
770             failures=fail_data,
771             collection_failures=COLLECTION_FAILURES,
772         )
773     with open(os.path.join(outpath, "report.html"), "w") as f:
774         f.write(contents)
775
776
777 def pytest_addoption(parser):
778     """
779     Add needed CLI arguments
780     """
781     parser.addoption(
782         "--template-directory",
783         dest="template_dir",
784         action="append",
785         help="Directory which holds the templates for validation",
786     )
787
788     parser.addoption(
789         "--template-source",
790         dest="template_source",
791         action="append",
792         help="Source Directory which holds the templates for validation",
793     )
794
795     parser.addoption(
796         "--self-test",
797         dest="self_test",
798         action="store_true",
799         help="Test the unit tests against their fixtured data",
800     )
801
802     parser.addoption(
803         "--report-format",
804         dest="report_format",
805         action="store",
806         help="Format of output report (html, csv, excel, json)",
807     )
808
809     parser.addoption(
810         "--continue-on-failure",
811         dest="continue_on_failure",
812         action="store_true",
813         help="Continue validation even when structural errors exist in input files",
814     )
815
816     parser.addoption(
817         "--output-directory",
818         dest="output_dir",
819         action="store",
820         default=None,
821         help="Alternate ",
822     )
823
824     parser.addoption(
825         "--category",
826         dest="test_categories",
827         action="append",
828         help="optional category of test to execute",
829     )
830
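# Example invocation wiring these options together (paths and category names are
# placeholders):
#
#   pytest ice_validator/tests \
#       --template-directory=/path/to/heat/templates \
#       --report-format=html \
#       --category=<category-name> \
#       --output-directory=/path/to/output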
831
832 def pytest_configure(config):
833     """
834     Ensure that we receive either `--self-test` or
835     `--template-directory=<directory>` as CLI arguments
836     """
837     if config.getoption("template_dir") and config.getoption("self_test"):
838         raise Exception('"--template-directory" and "--self-test" are mutually exclusive')
839     if not (
840         config.getoption("template_dir")
841         or config.getoption("self_test")
842         or config.getoption("help")
843     ):
844         raise Exception('One of "--template-directory" or "--self-test" must be specified')
845
846
847 def pytest_generate_tests(metafunc):
848     """
849     If a unit test requires an argument named 'filename',
850     we generate a test for each of the selected filenames: either
851     the files contained in `template_dir` or, if `template_dir`
852     is not specified on the CLI, the fixtures associated with this
853     test name.
854     """
855
856     # noinspection PyBroadException
857     try:
858         if "filename" in metafunc.fixturenames:
859             from .parametrizers import parametrize_filename
860
861             parametrize_filename(metafunc)
862
863         if "filenames" in metafunc.fixturenames:
864             from .parametrizers import parametrize_filenames
865
866             parametrize_filenames(metafunc)
867
868         if "template_dir" in metafunc.fixturenames:
869             from .parametrizers import parametrize_template_dir
870
871             parametrize_template_dir(metafunc)
872
873         if "environment_pair" in metafunc.fixturenames:
874             from .parametrizers import parametrize_environment_pair
875
876             parametrize_environment_pair(metafunc)
877
878         if "heat_volume_pair" in metafunc.fixturenames:
879             from .parametrizers import parametrize_heat_volume_pair
880
881             parametrize_heat_volume_pair(metafunc)
882
883         if "yaml_files" in metafunc.fixturenames:
884             from .parametrizers import parametrize_yaml_files
885
886             parametrize_yaml_files(metafunc)
887
888         if "env_files" in metafunc.fixturenames:
889             from .parametrizers import parametrize_environment_files
890
891             parametrize_environment_files(metafunc)
892
893         if "yaml_file" in metafunc.fixturenames:
894             from .parametrizers import parametrize_yaml_file
895
896             parametrize_yaml_file(metafunc)
897
898         if "env_file" in metafunc.fixturenames:
899             from .parametrizers import parametrize_environment_file
900
901             parametrize_environment_file(metafunc)
902
903         if "parsed_yaml_file" in metafunc.fixturenames:
904             from .parametrizers import parametrize_parsed_yaml_file
905
906             parametrize_parsed_yaml_file(metafunc)
907
908         if "parsed_environment_file" in metafunc.fixturenames:
909             from .parametrizers import parametrize_parsed_environment_file
910
911             parametrize_parsed_environment_file(metafunc)
912
913         if "heat_template" in metafunc.fixturenames:
914             from .parametrizers import parametrize_heat_template
915
916             parametrize_heat_template(metafunc)
917
918         if "heat_templates" in metafunc.fixturenames:
919             from .parametrizers import parametrize_heat_templates
920
921             parametrize_heat_templates(metafunc)
922
923         if "volume_template" in metafunc.fixturenames:
924             from .parametrizers import parametrize_volume_template
925
926             parametrize_volume_template(metafunc)
927
928         if "volume_templates" in metafunc.fixturenames:
929             from .parametrizers import parametrize_volume_templates
930
931             parametrize_volume_templates(metafunc)
932
933         if "template" in metafunc.fixturenames:
934             from .parametrizers import parametrize_template
935
936             parametrize_template(metafunc)
937
938         if "templates" in metafunc.fixturenames:
939             from .parametrizers import parametrize_templates
940
941             parametrize_templates(metafunc)
942     except Exception as e:
943         # If an error occurs in the collection phase, then it won't be logged as a
944         # normal test failure.  This means that failures could occur, but not
945         # appear in the report, resulting in a false-positive success message.  These
946         # errors are stored and reported separately in the report.
947         COLLECTION_FAILURES.append(
948             {
949                 "module": metafunc.module.__name__,
950                 "test": metafunc.function.__name__,
951                 "fixtures": metafunc.fixturenames,
952                 "error": traceback.format_exc(),
953                 "requirements": getattr(metafunc.function, "requirement_ids", []),
954             }
955         )
956         raise e
957
958
959 def hash_directory(path):
960     """
961     Create md5 hash using the contents of all files under ``path``
962     :param path: string directory containing files
963     :return: string MD5 hash code (hex)
964     """
965     md5 = hashlib.md5()  # nosec
966     for dir_path, sub_dirs, filenames in os.walk(path):
967         for filename in filenames:
968             file_path = os.path.join(dir_path, filename)
969             with open(file_path, "rb") as f:
970                 md5.update(f.read())
971     return md5.hexdigest()
972
973
974 def load_current_requirements():
975     """Loads dict of current requirements or empty dict if file doesn't exist"""
976     with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
977         data = json.load(f)
978         version = data["current_version"]
979         return data["versions"][version]["needs"]
980
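# Minimal sketch of heat_requirements.json as consumed here (inferred from the keys
# this module reads; real entries carry additional fields and the values shown are
# placeholders):
#
#   {
#     "current_version": "dublin",
#     "versions": {
#       "dublin": {
#         "needs": {
#           "R-123456": {
#             "id": "R-123456",
#             "description": "...",
#             "keyword": "MUST",
#             "docname": "Heat/...",
#             "validation_mode": "...",
#             "section_name": "..."
#           }
#         }
#       }
#     }
#   }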
981
982 def select_heat_requirements(reqs):
983     """Filters dict requirements to only those requirements pertaining to Heat"""
984     return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}
985
986
987 def is_testable(reqs):
988     """Filters dict requirements to only those which are testable"""
989     for key, values in reqs.items():
990         if ("MUST" in values.get("keyword", "").upper()) and (
991             "none" not in values.get("validation_mode", "").lower()
992         ):
993             reqs[key]["testable"] = True
994         else:
995             reqs[key]["testable"] = False
996     return reqs
997
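# Annotation examples (field values are hypothetical): a requirement is testable
# only when its keyword contains MUST and its validation_mode does not contain "none".
#
#   {"keyword": "MUST",     "validation_mode": "static"} -> testable = True
#   {"keyword": "MUST NOT", "validation_mode": "none"}   -> testable = False
#   {"keyword": "MAY",      "validation_mode": "static"} -> testable = False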
998
999 def build_rst_json(reqs):
1000     """Takes requirements and returns list of only Heat requirements"""
1001     for key, values in list(reqs.items()):
1002         if values["testable"]:
1003             # Creates links in RST format to requirements and test cases
1004             if values["test_case"]:
1005                 mod = values["test_case"].split(".")[-1]
1006                 val = TEST_SCRIPT_SITE + mod + ".py"
1007                 rst_value = "`" + mod + " <" + val + ">`_"
1008                 title = (
1009                     "`"
1010                     + values["id"]
1011                     + " <"
1012                     + VNFRQTS_ID_URL
1013                     + values["docname"].replace(" ", "%20")
1014                     + ".html#"
1015                     + values["id"]
1016                     + ">`_"
1017                 )
1018                 reqs[key].update({"full_title": title, "test_case": rst_value})
1019             else:
1020                 title = (
1021                     "`"
1022                     + values["id"]
1023                     + " <"
1024                     + VNFRQTS_ID_URL
1025                     + values["docname"].replace(" ", "%20")
1026                     + ".html#"
1027                     + values["id"]
1028                     + ">`_"
1029                 )
1030                 reqs[key].update(
1031                     {
1032                         "full_title": title,
1033                         "test_case": "No test for requirement",
1034                         "validated_by": "static",
1035                     }
1036                 )
1037         else:
1038             del reqs[key]
1039     return reqs
1040
1041
1042 def generate_rst_table(output_dir, data):
1043     """Generate a formatted csv to be used in RST"""
1044     rst_path = os.path.join(output_dir, "rst.csv")
1045     with open(rst_path, "w", newline="") as f:
1046         out = csv.writer(f)
1047         out.writerow(("Requirement ID", "Test Module", "Test Name"))
1048         for req_id, metadata in data.items():
1049             out.writerow(
1050                 (
1051                     metadata["full_title"],
1052                     metadata["test_case"],
1053                     metadata["validated_by"],
1054                 )
1055             )
1056
1057
1058 # noinspection PyUnusedLocal
1059 def pytest_report_collectionfinish(config, startdir, items):
1060     """Generates a simple traceability report to output/traceability.csv"""
1061     traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
1062     output_dir = os.path.split(traceability_path)[0]
1063     if not os.path.exists(output_dir):
1064         os.makedirs(output_dir)
1065     reqs = load_current_requirements()
1066     requirements = select_heat_requirements(reqs)
1067     testable_requirements = is_testable(requirements)
1068     unmapped, mapped = partition(
1069         lambda i: hasattr(i.function, "requirement_ids"), items
1070     )
1071
1072     req_to_test = defaultdict(set)
1073     mapping_errors = set()
1074     for item in mapped:
1075         for req_id in item.function.requirement_ids:
1076             if req_id not in req_to_test:
1077                 req_to_test[req_id].add(item)
1078                 if req_id in requirements:
1079                     reqs[req_id].update(
1080                         {
1081                             "test_case": item.function.__module__,
1082                             "validated_by": item.function.__name__,
1083                         }
1084                     )
1085             if req_id not in requirements:
1086                 mapping_errors.add(
1087                     (req_id, item.function.__module__, item.function.__name__)
1088                 )
1089
1090     mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
1091     with open(mapping_error_path, "w", newline="") as f:
1092         writer = csv.writer(f)
1093         for err in mapping_errors:
1094             writer.writerow(err)
1095
1096     with open(traceability_path, "w", newline="") as f:
1097         out = csv.writer(f)
1098         out.writerow(
1099             (
1100                 "Requirement ID",
1101                 "Requirement",
1102                 "Section",
1103                 "Keyword",
1104                 "Validation Mode",
1105                 "Is Testable",
1106                 "Test Module",
1107                 "Test Name",
1108             )
1109         )
1110         for req_id, metadata in testable_requirements.items():
1111             if req_to_test[req_id]:
1112                 for item in req_to_test[req_id]:
1113                     out.writerow(
1114                         (
1115                             req_id,
1116                             metadata["description"],
1117                             metadata["section_name"],
1118                             metadata["keyword"],
1119                             metadata["validation_mode"],
1120                             metadata["testable"],
1121                             item.function.__module__,
1122                             item.function.__name__,
1123                         )
1124                     )
1125             else:
1126                 out.writerow(
1127                     (
1128                         req_id,
1129                         metadata["description"],
1130                         metadata["section_name"],
1131                         metadata["keyword"],
1132                         metadata["validation_mode"],
1133                         metadata["testable"],
1134                         "",  # test module
1135                         "",
1136                     )  # test function
1137                 )
1138         # now write out any test methods that weren't mapped to requirements
1139         unmapped_tests = {
1140             (item.function.__module__, item.function.__name__) for item in unmapped
1141         }
1142         for test_module, test_name in unmapped_tests:
1143             out.writerow(
1144                 (
1145                     "",  # req ID
1146                     "",  # description
1147                     "",  # section name
1148                     "",  # keyword
1149                     "static",  # validation mode
1150                     "TRUE",  # testable
1151                     test_module,
1152                     test_name,
1153                 )
1154             )
1155
1156     generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))