ice_validator/tests/conftest.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37
38 import csv
39 import datetime
40 import hashlib
41 import io
42 import json
43 import os
44 import re
45 import time
46 from collections import defaultdict
47
48 import traceback
49
50 import docutils.core
51 import jinja2
52 import pytest
53 from more_itertools import partition
54 import xlsxwriter
55 from six import string_types
56
57 # noinspection PyUnresolvedReferences
58 import version
59 import logging
60
61 logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)
62
63 __path__ = [os.path.dirname(os.path.abspath(__file__))]
64
65 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
66
67 HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
68 TEST_SCRIPT_SITE = (
69     "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
70 )
71 VNFRQTS_ID_URL = (
72     "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
73 )
74
75 REPORT_COLUMNS = [
76     ("Error #", "err_num"),
77     ("Input File", "file"),
78     ("Requirements", "req_description"),
79     ("Error Message", "message"),
80     ("Test", "test_file"),
81 ]
82
83 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
84 while preparing to validate the input files. Some validations may not have been
85 executed. Please refer these issues to the VNF Validation Tool team.
86 """
87
88 COLLECTION_FAILURES = []
89
90 # Captures the results of every test run
91 ALL_RESULTS = []
92
93
94 def get_output_dir(config):
95     """
96     Retrieve the output directory for the reports and create it if necessary
97     :param config: pytest configuration
98     :return: output directory as string
99     """
100     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
101     if not os.path.exists(output_dir):
102         os.makedirs(output_dir, exist_ok=True)
103     return output_dir
104
105
106 def extract_error_msg(rep):
107     """
108     If a custom error message was provided, extract it; otherwise
109     just show the pytest assert message.
110     """
111     if rep.outcome != "failed":
112         return ""
113     try:
114         full_msg = str(rep.longrepr.reprcrash.message)
115         match = re.match(
116             "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
117         )
118         if match:  # custom message was provided
119             # Extract everything between AssertionError and the start
120             # of the assert statement expansion in the pytest report
121             msg = match.group(1)
122         else:
123             msg = str(rep.longrepr.reprcrash)
124             if "AssertionError:" in msg:
125                 msg = msg.split("AssertionError:")[1]
126     except AttributeError:
127         msg = str(rep)
128
129     return msg
130
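# Illustrative sketch of the extraction above; the sample failure text is
# hypothetical, not taken from a real pytest run:
#
#   full_msg = "AssertionError: files must be valid YAML\nassert False"
#   match = re.match("AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL)
#   match.group(1)  # -> " files must be valid YAML\n"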
131
132 class TestResult:
133     """
134     Wraps the test case and result to extract necessary metadata for
135     reporting purposes.
136     """
137
138     RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
139
140     def __init__(self, item, outcome):
141         self.item = item
142         self.result = outcome.get_result()
143         self.files = self._get_files()
144         self.error_message = self._get_error_message()
145
146     @property
147     def requirement_ids(self):
148         """
149         Returns list of requirement IDs mapped to the test case.
150
151         :return: Returns a list of string requirement IDs the test was
152                  annotated with ``validates``; otherwise returns an empty list
153         """
154         is_mapped = hasattr(self.item.function, "requirement_ids")
155         return self.item.function.requirement_ids if is_mapped else []
156
157     @property
158     def markers(self):
159         """
160         :return: Returns a set of pytest marker names for the test or an empty set
161         """
162         return set(m.name for m in self.item.iter_markers())
163
164     @property
165     def is_base_test(self):
166         """
167         :return: Returns True if the test is annotated with a pytest marker called base
168         """
169         return "base" in self.markers
170
171     @property
172     def is_failed(self):
173         """
174         :return: True if the test failed
175         """
176         return self.outcome == "FAIL"
177
178     @property
179     def outcome(self):
180         """
181         :return: Returns 'PASS', 'FAIL', or 'SKIP'
182         """
183         return self.RESULT_MAPPING[self.result.outcome]
184
185     @property
186     def test_case(self):
187         """
188         :return: Name of the test case method
189         """
190         return self.item.function.__name__
191
192     @property
193     def test_module(self):
194         """
195         :return: Name of the file containing the test case
196         """
197         return self.item.function.__module__.split(".")[-1]
198
199     @property
200     def test_id(self):
201         """
202         :return: ID of the test (test_module + test_case)
203         """
204         return "{}::{}".format(self.test_module, self.test_case)
205
206     @property
207     def raw_output(self):
208         """
209         :return: Full output from pytest for the given test case
210         """
211         return str(self.result.longrepr)
212
213     def requirement_text(self, curr_reqs):
214         """
215         Creates a text summary for the requirement IDs mapped to the test case.
216         If no requirements are mapped, then it returns the empty string.
217
218         :param curr_reqs: mapping of requirement IDs to requirement metadata
219                           loaded from the VNFRQTS project's needs.json output
220         :return: ID and text of the requirements mapped to the test case
221         """
222         text = (
223             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
224             for r_id in self.requirement_ids
225             if r_id in curr_reqs
226         )
227         return "".join(text)
228
229     def requirements_metadata(self, curr_reqs):
230         """
231         Returns a list of dicts containing the following metadata for each
232         requirement mapped:
233
234         - id: Requirement ID
235         - text: Full text of the requirement
236         - keyword: MUST, MUST NOT, MAY, etc.
237
238         :param curr_reqs: mapping of requirement IDs to requirement metadata
239                           loaded from the VNFRQTS project's needs.json output
240         :return: List of requirement metadata
241         """
242         data = []
243         for r_id in self.requirement_ids:
244             if r_id not in curr_reqs:
245                 continue
246             data.append(
247                 {
248                     "id": r_id,
249                     "text": curr_reqs[r_id]["description"],
250                     "keyword": curr_reqs[r_id]["keyword"],
251                 }
252             )
253         return data
254
255     def _get_files(self):
256         """
257         Extracts the list of files passed into the test case.
258         :return: List of absolute paths to files
259         """
260         if "environment_pair" in self.item.fixturenames:
261             return [
262                 "{} environment pair".format(
263                     self.item.funcargs["environment_pair"]["name"]
264                 )
265             ]
266         elif "heat_volume_pair" in self.item.fixturenames:
267             return [
268                 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
269             ]
270         elif "heat_templates" in self.item.fixturenames:
271             return [os.path.basename(f) for f in self.item.funcargs["heat_templates"]]
272         elif "yaml_files" in self.item.fixturenames:
273             return [os.path.basename(f) for f in self.item.funcargs["yaml_files"]]
274         else:
275             parts = self.result.nodeid.split("[")
276             return [""] if len(parts) == 1 else [os.path.basename(parts[1][:-1])]
277
278     def _get_error_message(self):
279         """
280         :return: Error message or empty string if the test did not fail or error
281         """
282         if self.is_failed:
283             return extract_error_msg(self.result)
284         else:
285             return ""
286
287
288 # noinspection PyUnusedLocal
289 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
290 def pytest_runtest_makereport(item, call):
291     """
292     Captures the test results for later reporting.  This will also halt testing
293     if a base failure is encountered (can be overridden with continue-on-failure)
294     """
295     outcome = yield
296     if outcome.get_result().when != "call":
297         return  # only capture results of test cases themselves
298     result = TestResult(item, outcome)
299     if (
300         not item.config.option.continue_on_failure
301         and result.is_base_test
302         and result.is_failed
303     ):
304         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
305             result.error_message
306         )
307         result.error_message = msg
308         ALL_RESULTS.append(result)
309         pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
310
311     ALL_RESULTS.append(result)
312
313
314 def make_timestamp():
315     """
316     :return: String timestamp in the format:
317              2019-01-19 10:18:49.865000 Central Standard Time
318     """
319     timezone = time.tzname[time.localtime().tm_isdst]
320     return "{} {}".format(str(datetime.datetime.now()), timezone)
321
322
323 # noinspection PyUnusedLocal
324 def pytest_sessionstart(session):
325     ALL_RESULTS.clear()
326     COLLECTION_FAILURES.clear()
327
328
329 # noinspection PyUnusedLocal
330 def pytest_sessionfinish(session, exitstatus):
331     """
332     If not a self-test run, generate the output reports
333     """
334     if not session.config.option.template_dir:
335         return
336
337     if session.config.option.template_source:
338         template_source = session.config.option.template_source[0]
339     else:
340         template_source = os.path.abspath(session.config.option.template_dir[0])
341
342     categories_selected = session.config.option.test_categories or ""
343     generate_report(
344         get_output_dir(session.config),
345         template_source,
346         categories_selected,
347         session.config.option.report_format,
348     )
349
350
351 # noinspection PyUnusedLocal
352 def pytest_collection_modifyitems(session, config, items):
353     """
354     Selects tests based on the categories requested.  Tests without
355     categories will always be executed.
356     """
357     config.traceability_items = list(items)  # save all items for traceability
358     if not config.option.self_test:
359         for item in items:
360             # checking if test belongs to a category
361             if hasattr(item.function, "categories"):
362                 if config.option.test_categories:
363                     test_categories = getattr(item.function, "categories")
364                     passed_categories = config.option.test_categories
365                     if not all(
366                         category in passed_categories for category in test_categories
367                     ):
368                         item.add_marker(
369                             pytest.mark.skip(
370                                 reason=(
371                                     "Test categories do not match "
372                                     "all the passed categories"
373                                 )
374                             )
375                         )
376                 else:
377                     item.add_marker(
378                         pytest.mark.skip(
379                             reason=(
380                                 "Test belongs to a category but "
381                                 "no categories were passed"
382                             )
383                         )
384                     )
385
386     items.sort(
387         key=lambda x: (0, x.name)
388         if "base" in set(m.name for m in x.iter_markers())
389         else (1, x.name)
390     )
391
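# Note: the sort key above runs every test marked "base" (key (0, name)) before
# all other tests (key (1, name)), which is what lets pytest_runtest_makereport
# halt the suite early when a base test fails.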
392
393 def make_href(paths, base_dir=None):
394     """
395     Create an anchor tag to link to the file paths provided.
396     :param paths: string or list of file paths
397     :param base_dir: If specified this is pre-pended to each path
398     :return: String of hrefs - one for each path, each seperated by a line
399              break (<br/).
400     """
401     paths = [paths] if isinstance(paths, string_types) else paths
402     if base_dir:
403         paths = [os.path.join(base_dir, p) for p in paths]
404     links = []
405     for p in paths:
406         abs_path = os.path.abspath(p)
407         name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
408         links.append(
409             "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
410                 abs_path=abs_path, name=name
411             )
412         )
413     return "<br/>".join(links)
414
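# Example of the href format produced above (the path is hypothetical and
# assumes a POSIX-style filesystem):
#
#   make_href(["/tmp/base.yaml"])
#   # -> "<a href='file:///tmp/base.yaml' target='_blank'>base.yaml</a>"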
415
416 def generate_report(outpath, template_path, categories, output_format="html"):
417     """
418     Generates the various output reports.
419
420     :param outpath: destination directory for all reports
421     :param template_path: directory containing the Heat templates validated
422     :param categories: Optional categories selected
423     :param output_format: One of "html", "excel", "json", or "csv". Default is "html"
424     :raises: ValueError if requested output format is unknown
425     """
426     failures = [r for r in ALL_RESULTS if r.is_failed]
427     generate_failure_file(outpath)
428     output_format = output_format.lower().strip() if output_format else "html"
429     generate_json(outpath, template_path, categories)
430     if output_format == "html":
431         generate_html_report(outpath, categories, template_path, failures)
432     elif output_format == "excel":
433         generate_excel_report(outpath, categories, template_path, failures)
434     elif output_format == "json":
435         return
436     elif output_format == "csv":
437         generate_csv_report(outpath, categories, template_path, failures)
438     else:
439         raise ValueError("Unsupported output format: " + output_format)
440
441
442 def write_json(data, path):
443     """
444     Pretty print data as JSON to the output path requested
445
446     :param data: Data structure to be converted to JSON
447     :param path: Where to write output
448     """
449     with open(path, "w") as f:
450         json.dump(data, f, indent=2)
451
452
453 def generate_failure_file(outpath):
454     """
455     Writes a summary of test failures to a file named failures.
456     This is for backwards compatibility only.  The report.json offers a
457     more comprehensive output.
458     """
459     failure_path = os.path.join(outpath, "failures")
460     failures = [r for r in ALL_RESULTS if r.is_failed]
461     data = {}
462     for i, fail in enumerate(failures):
463         data[str(i)] = {
464             "file": fail.files[0] if len(fail.files) == 1 else fail.files,
465             "vnfrqts": fail.requirement_ids,
466             "test": fail.test_case,
467             "test_file": fail.test_module,
468             "raw_output": fail.raw_output,
469             "message": fail.error_message,
470         }
471     write_json(data, failure_path)
472
473
474 def generate_csv_report(output_dir, categories, template_path, failures):
475     rows = [["Validation Failures"]]
476     headers = [
477         ("Categories Selected:", categories),
478         ("Tool Version:", version.VERSION),
479         ("Report Generated At:", make_timestamp()),
480         ("Directory Validated:", template_path),
481         ("Checksum:", hash_directory(template_path)),
482         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
483     ]
484     rows.append([])
485     for header in headers:
486         rows.append(header)
487     rows.append([])
488
489     if COLLECTION_FAILURES:
490         rows.append([COLLECTION_FAILURE_WARNING])
491         rows.append(["Validation File", "Test", "Fixtures", "Error"])
492         for failure in COLLECTION_FAILURES:
493             rows.append(
494                 [
495                     failure["module"],
496                     failure["test"],
497                     ";".join(failure["fixtures"]),
498                     failure["error"],
499                 ]
500             )
501         rows.append([])
502
503     # table header
504     rows.append([col for col, _ in REPORT_COLUMNS])
505
506     reqs = load_current_requirements()
507
508     # table content
509     for i, failure in enumerate(failures, start=1):
510         rows.append(
511             [
512                 i,
513                 "\n".join(failure.files),
514                 failure.requirement_text(reqs),
515                 failure.error_message,
516                 failure.test_id,
517             ]
518         )
519
520     output_path = os.path.join(output_dir, "report.csv")
521     with open(output_path, "w", newline="") as f:
522         writer = csv.writer(f)
523         for row in rows:
524             writer.writerow(row)
525
526
527 def generate_excel_report(output_dir, categories, template_path, failures):
528     output_path = os.path.join(output_dir, "report.xlsx")
529     workbook = xlsxwriter.Workbook(output_path)
530     bold = workbook.add_format({"bold": True, "align": "top"})
531     code = workbook.add_format(
532         {"font_name": "Courier", "text_wrap": True, "align": "top"}
533     )
534     normal = workbook.add_format({"text_wrap": True, "align": "top"})
535     heading = workbook.add_format({"bold": True, "font_size": 18})
536     worksheet = workbook.add_worksheet("failures")
537     worksheet.write(0, 0, "Validation Failures", heading)
538
539     headers = [
540         ("Categories Selected:", ",".join(categories)),
541         ("Tool Version:", version.VERSION),
542         ("Report Generated At:", make_timestamp()),
543         ("Directory Validated:", template_path),
544         ("Checksum:", hash_directory(template_path)),
545         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
546     ]
547     for row, (header, value) in enumerate(headers, start=2):
548         worksheet.write(row, 0, header, bold)
549         worksheet.write(row, 1, value)
550
551     worksheet.set_column(0, len(headers) - 1, 40)
552     worksheet.set_column(len(headers), len(headers), 80)
553
554     if COLLECTION_FAILURES:
555         collection_failures_start = 2 + len(headers) + 2
556         worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
557         collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
558         for col_num, col_name in enumerate(collection_failure_headers):
559             worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
560         for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
561             worksheet.write(row, 0, data["module"])
562             worksheet.write(row, 1, data["test"])
563             worksheet.write(row, 2, ",".join(data["fixtures"]))
564             worksheet.write(row, 3, data["error"], code)
565
566     # table header
567     start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
568     worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
569     for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
570         worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
571
572     reqs = load_current_requirements()
573
574     # table content
575     for col, width in enumerate((20, 30, 60, 60, 40)):
576         worksheet.set_column(col, col, width)
577     err_num = 1
578     for row, failure in enumerate(failures, start=start_error_table_row + 2):
579         worksheet.write(row, 0, str(err_num), normal)
580         worksheet.write(row, 1, "\n".join(failure.files), normal)
581         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
582         worksheet.write(row, 3, failure.error_message, normal)
583         worksheet.write(row, 4, failure.test_id, normal)
584         err_num += 1
585     worksheet.autofilter(
586         start_error_table_row + 1,
587         0,
588         start_error_table_row + 1 + err_num,
589         len(REPORT_COLUMNS) - 1,
590     )
591     workbook.close()
592
593
594 def make_iso_timestamp():
595     """
596     Creates an ISO 8601 timestamp in UTC.  Used for JSON output.
597     """
598     now = datetime.datetime.utcnow()
599     now = now.replace(tzinfo=datetime.timezone.utc)
600     return now.isoformat()
601
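# With tzinfo attached, isoformat() yields an offset-aware string such as
# "2019-01-19T16:18:49.865000+00:00" (value shown is illustrative).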
602
603 def aggregate_results(outcomes, r_id=None):
604     """
605     Determines the aggregate result for the conditions provided.  Assumes the
606     results have been filtered and collected for analysis.
607
608     :param outcomes: set of outcomes from the TestResults
609     :param r_id: Optional requirement ID if known
610     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
611              (see aggregate_requirement_adherence for more detail)
612     """
613     if not outcomes:
614         return "PASS"
615     elif "ERROR" in outcomes:
616         return "ERROR"
617     elif "FAIL" in outcomes:
618         return "FAIL"
619     elif "PASS" in outcomes:
620         return "PASS"
621     elif {"SKIP"} == outcomes:
622         return "SKIP"
623     else:
624         # log here: pytest.warns() is for asserting warnings, not emitting them
625         logging.warning(
626             "Unexpected error aggregating outcomes (%s) for requirement %s",
627             outcomes, r_id,
628         )
629         return "ERROR"
630
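# Examples of the aggregation rules above (outcomes are sets of TestResult
# outcome strings):
#
#   aggregate_results(set())                     # -> "PASS"
#   aggregate_results({"PASS", "SKIP"})          # -> "PASS"
#   aggregate_results({"FAIL", "PASS", "SKIP"})  # -> "FAIL"
#   aggregate_results({"SKIP"})                  # -> "SKIP"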
631
632 def aggregate_run_results(collection_failures, test_results):
633     """
634     Determines overall status of run based on all failures and results.
635
636     * 'ERROR' - At least one collection failure occurred during the run.
637     * 'FAIL' - Template failed at least one test
638     * 'PASS' - All tests executed properly and no failures were detected
639
640     :param collection_failures: failures occurring during test setup
641     :param test_results: list of all test execution results
642     :return: one of 'ERROR', 'FAIL', or 'PASS'
643     """
644     if collection_failures:
645         return "ERROR"
646     elif any(r.is_failed for r in test_results):
647         return "FAIL"
648     else:
649         return "PASS"
650
651
652 def relative_paths(base_dir, paths):
653     return [os.path.relpath(p, base_dir) for p in paths]
654
655
656 # noinspection PyTypeChecker
657 def generate_json(outpath, template_path, categories):
658     """
659     Creates a JSON summary of the entire test run.
660     """
661     reqs = load_current_requirements()
662     data = {
663         "version": "dublin",
664         "template_directory": os.path.splitdrive(template_path)[1].replace(
665             os.path.sep, "/"
666         ),
667         "timestamp": make_iso_timestamp(),
668         "checksum": hash_directory(template_path),
669         "categories": categories,
670         "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
671         "tests": [],
672         "requirements": [],
673     }
674
675     results = data["tests"]
676     for result in COLLECTION_FAILURES:
677         results.append(
678             {
679                 "files": [],
680                 "test_module": result["module"],
681                 "test_case": result["test"],
682                 "result": "ERROR",
683                 "error": result["error"],
684                 "requirements": result["requirements"],
685             }
686         )
687     for result in ALL_RESULTS:
688         results.append(
689             {
690                 "files": relative_paths(template_path, result.files),
691                 "test_module": result.test_module,
692                 "test_case": result.test_case,
693                 "result": result.outcome,
694                 "error": result.error_message if result.is_failed else "",
695                 "requirements": result.requirements_metadata(reqs),
696             }
697         )
698
699     # Build a mapping of requirement ID to the results
700     r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
701     for test_result in results:
702         test_reqs = test_result["requirements"]
703         r_ids = (
704             [r["id"] if isinstance(r, dict) else r for r in test_reqs]
705             if test_reqs
706             else ("",)
707         )
708         for r_id in r_ids:
709             item = r_id_results[r_id]
710             item["outcomes"].add(test_result["result"])
711             if test_result["error"]:
712                 item["errors"].add(test_result["error"])
713
714     requirements = data["requirements"]
715     for r_id, r_data in reqs.items():
716         requirements.append(
717             {
718                 "id": r_id,
719                 "text": r_data["description"],
720                 "keyword": r_data["keyword"],
721                 "result": aggregate_results(r_id_results[r_id]["outcomes"]),
722                 "errors": list(r_id_results[r_id]["errors"]),
723             }
724         )
725
726     if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
727         requirements.append(
728             {
729                 "id": "Unmapped",
730                 "text": "Tests not mapped to requirements (see tests)",
731                 "result": aggregate_results(r_id_results[""]["outcomes"]),
732                 "errors": list(r_id_results[""]["errors"]),
733             }
734         )
735
736     report_path = os.path.join(outpath, "report.json")
737     write_json(data, report_path)
738
739
740 def generate_html_report(outpath, categories, template_path, failures):
741     reqs = load_current_requirements()
742     fail_data = []
743     for failure in failures:
744         fail_data.append(
745             {
746                 "file_links": make_href(failure.files, template_path),
747                 "test_id": failure.test_id,
748                 "error_message": failure.error_message,
749                 "raw_output": failure.raw_output,
750                 "requirements": docutils.core.publish_parts(
751                     writer_name="html", source=failure.requirement_text(reqs)
752                 )["body"],
753             }
754         )
755     pkg_dir = os.path.split(__file__)[0]
756     j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
757     with open(j2_template_path, "r") as f:
758         report_template = jinja2.Template(f.read())
759         contents = report_template.render(
760             version=version.VERSION,
761             num_failures=len(failures) + len(COLLECTION_FAILURES),
762             categories=categories,
763             template_dir=make_href(template_path),
764             checksum=hash_directory(template_path),
765             timestamp=make_timestamp(),
766             failures=fail_data,
767             collection_failures=COLLECTION_FAILURES,
768         )
769     with open(os.path.join(outpath, "report.html"), "w") as f:
770         f.write(contents)
771
772
773 def pytest_addoption(parser):
774     """
775     Add needed CLI arguments
776     """
777     parser.addoption(
778         "--template-directory",
779         dest="template_dir",
780         action="append",
781         help="Directory which holds the templates for validation",
782     )
783
784     parser.addoption(
785         "--template-source",
786         dest="template_source",
787         action="append",
788         help="Source Directory which holds the templates for validation",
789     )
790
791     parser.addoption(
792         "--self-test",
793         dest="self_test",
794         action="store_true",
795         help="Test the unit tests against their fixtured data",
796     )
797
798     parser.addoption(
799         "--report-format",
800         dest="report_format",
801         action="store",
802         help="Format of output report (html, csv, excel, json)",
803     )
804
805     parser.addoption(
806         "--continue-on-failure",
807         dest="continue_on_failure",
808         action="store_true",
809         help="Continue validation even when structural errors exist in input files",
810     )
811
812     parser.addoption(
813         "--output-directory",
814         dest="output_dir",
815         action="store",
816         default=None,
817         help="Alternate ",
818     )
819
820     parser.addoption(
821         "--category",
822         dest="test_categories",
823         action="append",
824         help="optional category of test to execute",
825     )
826
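# One possible invocation of these options, with illustrative paths (this
# sketch assumes the suite is run from the repository root):
#
#   pytest ice_validator/tests \
#       --template-directory=/path/to/heat/templates \
#       --report-format=html \
#       --output-directory=/tmp/vvp-output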
827
828 def pytest_configure(config):
829     """
830     Ensure that we receive either `--self-test` or
831     `--template-directory=<directory>` as a CLI argument
832     """
833     if config.getoption("template_dir") and config.getoption("self_test"):
834         raise Exception('"--template-directory" and "--self-test" are mutually exclusive')
835     if not (
836         config.getoption("template_dir")
837         or config.getoption("self_test")
838         or config.getoption("help")
839     ):
840         raise Exception('One of "--template-directory" or "--self-test" must be specified')
841
842
843 def pytest_generate_tests(metafunc):
844     """
845     If a unit test requires an argument named 'filename',
846     we generate a test for each of the selected files: either
847     the files contained in `template_dir`, or, if `template_dir`
848     is not specified on the CLI, the fixtures associated with the
849     test name.
850     """
851
852     # noinspection PyBroadException
853     try:
854         if "filename" in metafunc.fixturenames:
855             from .parametrizers import parametrize_filename
856
857             parametrize_filename(metafunc)
858
859         if "filenames" in metafunc.fixturenames:
860             from .parametrizers import parametrize_filenames
861
862             parametrize_filenames(metafunc)
863
864         if "template_dir" in metafunc.fixturenames:
865             from .parametrizers import parametrize_template_dir
866
867             parametrize_template_dir(metafunc)
868
869         if "environment_pair" in metafunc.fixturenames:
870             from .parametrizers import parametrize_environment_pair
871
872             parametrize_environment_pair(metafunc)
873
874         if "heat_volume_pair" in metafunc.fixturenames:
875             from .parametrizers import parametrize_heat_volume_pair
876
877             parametrize_heat_volume_pair(metafunc)
878
879         if "yaml_files" in metafunc.fixturenames:
880             from .parametrizers import parametrize_yaml_files
881
882             parametrize_yaml_files(metafunc)
883
884         if "env_files" in metafunc.fixturenames:
885             from .parametrizers import parametrize_environment_files
886
887             parametrize_environment_files(metafunc)
888
889         if "yaml_file" in metafunc.fixturenames:
890             from .parametrizers import parametrize_yaml_file
891
892             parametrize_yaml_file(metafunc)
893
894         if "env_file" in metafunc.fixturenames:
895             from .parametrizers import parametrize_environment_file
896
897             parametrize_environment_file(metafunc)
898
899         if "parsed_yaml_file" in metafunc.fixturenames:
900             from .parametrizers import parametrize_parsed_yaml_file
901
902             parametrize_parsed_yaml_file(metafunc)
903
904         if "parsed_environment_file" in metafunc.fixturenames:
905             from .parametrizers import parametrize_parsed_environment_file
906
907             parametrize_parsed_environment_file(metafunc)
908
909         if "heat_template" in metafunc.fixturenames:
910             from .parametrizers import parametrize_heat_template
911
912             parametrize_heat_template(metafunc)
913
914         if "heat_templates" in metafunc.fixturenames:
915             from .parametrizers import parametrize_heat_templates
916
917             parametrize_heat_templates(metafunc)
918
919         if "volume_template" in metafunc.fixturenames:
920             from .parametrizers import parametrize_volume_template
921
922             parametrize_volume_template(metafunc)
923
924         if "volume_templates" in metafunc.fixturenames:
925             from .parametrizers import parametrize_volume_templates
926
927             parametrize_volume_templates(metafunc)
928
929         if "template" in metafunc.fixturenames:
930             from .parametrizers import parametrize_template
931
932             parametrize_template(metafunc)
933
934         if "templates" in metafunc.fixturenames:
935             from .parametrizers import parametrize_templates
936
937             parametrize_templates(metafunc)
938     except Exception as e:
939         # If an error occurs in the collection phase, then it won't be logged as a
940         # normal test failure.  This means failures could occur but not appear in
941         # the report, resulting in a false-positive success message.  These errors
942         # are stored and reported separately in the report.
943         COLLECTION_FAILURES.append(
944             {
945                 "module": metafunc.module.__name__,
946                 "test": metafunc.function.__name__,
947                 "fixtures": metafunc.fixturenames,
948                 "error": traceback.format_exc(),
949                 "requirements": getattr(metafunc.function, "requirement_ids", []),
950             }
951         )
952         raise e
953
954
955 def hash_directory(path):
956     """
957     Create md5 hash using the contents of all files under ``path``
958     :param path: string directory containing files
959     :return: string MD5 hash code (hex)
960     """
961     md5 = hashlib.md5()
962     for dir_path, sub_dirs, filenames in os.walk(path):
963         for filename in filenames:
964             file_path = os.path.join(dir_path, filename)
965             with open(file_path, "rb") as f:
966                 md5.update(f.read())
967     return md5.hexdigest()
968
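# Usage sketch (the directory path is hypothetical):
#
#   checksum = hash_directory("/path/to/heat/templates")
#   # An empty directory hashes to "d41d8cd98f00b204e9800998ecf8427e" (the MD5
#   # of zero bytes); any change to file contents changes the digest.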
969
970 def load_current_requirements():
971     """Loads dict of current requirements or empty dict if file doesn't exist"""
972     with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
973         data = json.load(f)
974         version = data["current_version"]
975         return data["versions"][version]["needs"]
976
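# Approximate shape of heat_requirements.json, inferred from how this module
# accesses it (keys shown are the ones used here; values are illustrative):
#
#   {"current_version": "dublin",
#    "versions": {"dublin": {"needs": {
#        "R-xxxxx": {"description": "...", "keyword": "MUST",
#                    "docname": "Heat/...", "validation_mode": "...",
#                    "section_name": "..."}}}}}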
977
978 def select_heat_requirements(reqs):
979     """Filters dict requirements to only those requirements pertaining to Heat"""
980     return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}
981
982
983 def is_testable(reqs):
984     """Filters dict requirements to only those which are testable"""
985     for key, values in reqs.items():
986         if ("MUST" in values.get("keyword", "").upper()) and (
987             "none" not in values.get("validation_mode", "").lower()
988         ):
989             reqs[key]["testable"] = True
990         else:
991             reqs[key]["testable"] = False
992     return reqs
993
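# For example (requirement ID and fields are illustrative):
#
#   reqs = {"R-xxxxx": {"keyword": "MUST", "validation_mode": "none"}}
#   is_testable(reqs)["R-xxxxx"]["testable"]  # -> False, because of "none"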
994
995 def build_rst_json(reqs):
996     """Takes requirements and returns list of only Heat requirements"""
997     for key, values in list(reqs.items()):
998         if values["testable"]:
999             # Creates links in RST format to requirements and test cases
1000             if values["test_case"]:
1001                 mod = values["test_case"].split(".")[-1]
1002                 val = TEST_SCRIPT_SITE + mod + ".py"
1003                 rst_value = "`" + mod + " <" + val + ">`_"
1004                 title = (
1005                     "`"
1006                     + values["id"]
1007                     + " <"
1008                     + VNFRQTS_ID_URL
1009                     + values["docname"].replace(" ", "%20")
1010                     + ".html#"
1011                     + values["id"]
1012                     + ">`_"
1013                 )
1014                 reqs[key].update({"full_title": title, "test_case": rst_value})
1015             else:
1016                 title = (
1017                     "`"
1018                     + values["id"]
1019                     + " <"
1020                     + VNFRQTS_ID_URL
1021                     + values["docname"].replace(" ", "%20")
1022                     + ".html#"
1023                     + values["id"]
1024                     + ">`_"
1025                 )
1026                 reqs[key].update(
1027                     {
1028                         "full_title": title,
1029                         "test_case": "No test for requirement",
1030                         "validated_by": "static",
1031                     }
1032                 )
1033         else:
1034             del reqs[key]
1035     return reqs
1036
1037
1038 def generate_rst_table(output_dir, data):
1039     """Generate a formatted csv to be used in RST"""
1040     rst_path = os.path.join(output_dir, "rst.csv")
1041     with open(rst_path, "w", newline="") as f:
1042         out = csv.writer(f)
1043         out.writerow(("Requirement ID", "Requirement", "Test Module", "Test Name"))
1044         for req_id, metadata in data.items():
1045             out.writerow(
1046                 (
1047                     metadata["full_title"],
1048                     metadata["description"],
1049                     metadata["test_case"],
1050                     metadata["validated_by"],
1051                 )
1052             )
1053
1054
1055 # noinspection PyUnusedLocal
1056 def pytest_report_collectionfinish(config, startdir, items):
1057     """Generates a simple traceability report to output/traceability.csv"""
1058     traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
1059     output_dir = os.path.split(traceability_path)[0]
1060     if not os.path.exists(output_dir):
1061         os.makedirs(output_dir)
1062     reqs = load_current_requirements()
1063     requirements = select_heat_requirements(reqs)
1064     testable_requirements = is_testable(requirements)
1065     unmapped, mapped = partition(
1066         lambda i: hasattr(i.function, "requirement_ids"), items
1067     )
1068
1069     req_to_test = defaultdict(set)
1070     mapping_errors = set()
1071     for item in mapped:
1072         for req_id in item.function.requirement_ids:
1073             if req_id not in req_to_test:
1074                 req_to_test[req_id].add(item)
1075                 if req_id in requirements:
1076                     reqs[req_id].update(
1077                         {
1078                             "test_case": item.function.__module__,
1079                             "validated_by": item.function.__name__,
1080                         }
1081                     )
1082             if req_id not in requirements:
1083                 mapping_errors.add(
1084                     (req_id, item.function.__module__, item.function.__name__)
1085                 )
1086
1087     mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
1088     with open(mapping_error_path, "w", newline="") as f:
1089         writer = csv.writer(f)
1090         for err in mapping_errors:
1091             writer.writerow(err)
1092
1093     with open(traceability_path, "w", newline="") as f:
1094         out = csv.writer(f)
1095         out.writerow(
1096             (
1097                 "Requirement ID",
1098                 "Requirement",
1099                 "Section",
1100                 "Keyword",
1101                 "Validation Mode",
1102                 "Is Testable",
1103                 "Test Module",
1104                 "Test Name",
1105             )
1106         )
1107         for req_id, metadata in testable_requirements.items():
1108             if req_to_test[req_id]:
1109                 for item in req_to_test[req_id]:
1110                     out.writerow(
1111                         (
1112                             req_id,
1113                             metadata["description"],
1114                             metadata["section_name"],
1115                             metadata["keyword"],
1116                             metadata["validation_mode"],
1117                             metadata["testable"],
1118                             item.function.__module__,
1119                             item.function.__name__,
1120                         )
1121                     )
1122             else:
1123                 out.writerow(
1124                     (
1125                         req_id,
1126                         metadata["description"],
1127                         metadata["section_name"],
1128                         metadata["keyword"],
1129                         metadata["validation_mode"],
1130                         metadata["testable"],
1131                         "",  # test module
1132                         "",
1133                     )  # test function
1134                 )
1135         # now write out any test methods that weren't mapped to requirements
1136         unmapped_tests = {
1137             (item.function.__module__, item.function.__name__) for item in unmapped
1138         }
1139         for test_module, test_name in unmapped_tests:
1140             out.writerow(
1141                 (
1142                     "",  # req ID
1143                     "",  # description
1144                     "",  # section name
1145                     "",  # keyword
1146                     "static",  # validation mode
1147                     "TRUE",  # testable
1148                     test_module,
1149                     test_name,
1150                 )
1151             )
1152
1153     generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))