# -*- coding: utf8 -*-
# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
#             http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
#             https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================

import csv
import datetime
import hashlib
import io
import json
import logging
import os
import re
import time
import traceback

try:
    from html import escape
except ImportError:
    from cgi import escape
from collections import defaultdict

import docutils.core
import jinja2
import pytest
from more_itertools import partition
import xlsxwriter
from six import string_types

# noinspection PyUnresolvedReferences
import version

logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)

__path__ = [os.path.dirname(os.path.abspath(__file__))]

DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])

HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
TEST_SCRIPT_SITE = (
    "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
)
VNFRQTS_ID_URL = (
    "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
)

REPORT_COLUMNS = [
    ("Error #", "err_num"),
    ("Input File", "file"),
    ("Requirements", "req_description"),
    ("Error Message", "message"),
    ("Test", "test_file"),
]

COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
while preparing to validate the input files. Some validations may not have been
executed. Please refer these issues to the VNF Validation Tool team.
"""

COLLECTION_FAILURES = []
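# Each entry appended to COLLECTION_FAILURES by pytest_generate_tests below is a
# dict of the form (values illustrative only):
#   {"module": "test_example", "test": "test_case_name", "fixtures": ["yaml_files"],
#    "error": "<formatted traceback>", "requirements": ["R-123456"]}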

# Captures the results of every test run
ALL_RESULTS = []


def get_output_dir(config):
    """
    Retrieve the output directory for the reports and create it if necessary
    :param config: pytest configuration
    :return: output directory as string
    """
    output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
    if not os.path.exists(output_dir):
        os.makedirs(output_dir, exist_ok=True)
    return output_dir


def extract_error_msg(rep):
    """
    If a custom error message was provided, then extract it; otherwise
    just show the pytest assert message.
    """
    if rep.outcome != "failed":
        return ""
    try:
        full_msg = str(rep.longrepr.reprcrash.message)
        match = re.match(
            "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
        )
        if match:  # custom message was provided
            # Extract everything between AssertionError and the start
            # of the assert statement expansion in the pytest report
            msg = match.group(1)
        elif "AssertionError:" in full_msg:
            msg = full_msg.split("AssertionError:")[1]
        else:
            msg = full_msg
    except AttributeError:
        msg = str(rep)

    return msg

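# Illustrative example (hypothetical pytest crash message, not executed here):
#   given "AssertionError: vm_role parameter is missing\nassert 'vm_role' in {...}",
#   extract_error_msg() returns the custom text between "AssertionError:" and the
#   expanded assert statement, i.e. " vm_role parameter is missing\n".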

class TestResult:
    """
    Wraps the test case and result to extract necessary metadata for
    reporting purposes.
    """

    RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}

    def __init__(self, item, outcome):
        self.item = item
        self.result = outcome.get_result()
        self.files = self._get_files()
        self.error_message = self._get_error_message()

    @property
    def requirement_ids(self):
        """
        Returns list of requirement IDs mapped to the test case.

        :return: Returns a list of string requirement IDs if the test was
                 annotated with ``validates``; otherwise returns an empty list
        """
        is_mapped = hasattr(self.item.function, "requirement_ids")
        return self.item.function.requirement_ids if is_mapped else []

    @property
    def markers(self):
        """
        :return: Returns a set of pytest marker names for the test or an empty set
        """
        return set(m.name for m in self.item.iter_markers())

    @property
    def is_base_test(self):
        """
        :return: Returns True if the test is annotated with a pytest marker called base
        """
        return "base" in self.markers

    @property
    def is_failed(self):
        """
        :return: True if the test failed
        """
        return self.outcome == "FAIL"

    @property
    def outcome(self):
        """
        :return: Returns 'PASS', 'FAIL', or 'SKIP'
        """
        return self.RESULT_MAPPING[self.result.outcome]

    @property
    def test_case(self):
        """
        :return: Name of the test case method
        """
        return self.item.function.__name__

    @property
    def test_module(self):
        """
        :return: Name of the file containing the test case
        """
        return self.item.function.__module__.split(".")[-1]

    @property
    def test_id(self):
        """
        :return: ID of the test (test_module + test_case)
        """
        return "{}::{}".format(self.test_module, self.test_case)

    @property
    def raw_output(self):
        """
        :return: Full output from pytest for the given test case
        """
        return str(self.result.longrepr)

    def requirement_text(self, curr_reqs):
        """
        Creates a text summary for the requirement IDs mapped to the test case.
        If no requirements are mapped, then it returns the empty string.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS project's needs.json output
        :return: ID and text of the requirements mapped to the test case
        """
        text = (
            "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
            for r_id in self.requirement_ids
            if r_id in curr_reqs
        )
        return "".join(text)
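    # Illustrative example (requirement ID and text are hypothetical):
    #   curr_reqs = {"R-123456": {"description": "The VNF MUST ...", "keyword": "MUST"}}
    #   a TestResult annotated with @validates("R-123456") then yields
    #   "\n\nR-123456: \nThe VNF MUST ..."; IDs missing from curr_reqs are skipped.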

    def requirements_metadata(self, curr_reqs):
        """
        Returns a list of dicts containing the following metadata for each
        requirement mapped:

        - id: Requirement ID
        - text: Full text of the requirement
        - keyword: MUST, MUST NOT, MAY, etc.

        :param curr_reqs: mapping of requirement IDs to requirement metadata
                          loaded from the VNFRQTS project's needs.json output
        :return: List of requirement metadata
        """
        data = []
        for r_id in self.requirement_ids:
            if r_id not in curr_reqs:
                continue
            data.append(
                {
                    "id": r_id,
                    "text": curr_reqs[r_id]["description"],
                    "keyword": curr_reqs[r_id]["keyword"],
                }
            )
        return data

    def _get_files(self):
        """
        Extracts the list of files passed into the test case.
        :return: List of file names (or descriptive names) passed into the test case
        """
        if "environment_pair" in self.item.fixturenames:
            return [
                "{} environment pair".format(
                    self.item.funcargs["environment_pair"]["name"]
                )
            ]
        elif "heat_volume_pair" in self.item.fixturenames:
            return [
                "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
            ]
        elif "heat_templates" in self.item.fixturenames:
            return [os.path.basename(f) for f in self.item.funcargs["heat_templates"]]
        elif "yaml_files" in self.item.fixturenames:
            return [os.path.basename(f) for f in self.item.funcargs["yaml_files"]]
        else:
            parts = self.result.nodeid.split("[")
            return [""] if len(parts) == 1 else [os.path.basename(parts[1][:-1])]

    def _get_error_message(self):
        """
        :return: Error message or empty string if the test did not fail or error
        """
        if self.is_failed:
            return extract_error_msg(self.result)
        else:
            return ""


# noinspection PyUnusedLocal
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """
    Captures the test results for later reporting.  This will also halt testing
    if a base failure is encountered (can be overridden with --continue-on-failure)
    """
    outcome = yield
    if outcome.get_result().when != "call":
        return  # only capture results of test cases themselves
    result = TestResult(item, outcome)
    if (
        not item.config.option.continue_on_failure
        and result.is_base_test
        and result.is_failed
    ):
        msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
            result.error_message
        )
        result.error_message = msg
        ALL_RESULTS.append(result)
        pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))

    ALL_RESULTS.append(result)


def make_timestamp():
    """
    :return: Timestamp string in the format:
             2019-01-19 10:18:49.865000 Central Standard Time
    """
    timezone = time.tzname[time.localtime().tm_isdst]
    return "{} {}".format(str(datetime.datetime.now()), timezone)


# noinspection PyUnusedLocal
def pytest_sessionstart(session):
    ALL_RESULTS.clear()
    COLLECTION_FAILURES.clear()


# noinspection PyUnusedLocal
def pytest_sessionfinish(session, exitstatus):
    """
    If not a self-test run, generate the output reports
    """
    if not session.config.option.template_dir:
        return

    if session.config.option.template_source:
        template_source = session.config.option.template_source[0]
    else:
        template_source = os.path.abspath(session.config.option.template_dir[0])

    categories_selected = session.config.option.test_categories or ""
    generate_report(
        get_output_dir(session.config),
        template_source,
        categories_selected,
        session.config.option.report_format,
    )


# noinspection PyUnusedLocal
def pytest_collection_modifyitems(session, config, items):
    """
    Selects tests based on the categories requested.  Tests without
    categories will always be executed.
    """
    config.traceability_items = list(items)  # save all items for traceability
    if not config.option.self_test:
        for item in items:
            # checking if test belongs to a category
            if hasattr(item.function, "categories"):
                if config.option.test_categories:
                    test_categories = getattr(item.function, "categories")
                    passed_categories = config.option.test_categories
                    if not all(
                        category in passed_categories for category in test_categories
                    ):
                        item.add_marker(
                            pytest.mark.skip(
                                reason=(
                                    "Test categories do not match "
                                    "all the passed categories"
                                )
                            )
                        )
                else:
                    item.add_marker(
                        pytest.mark.skip(
                            reason=(
                                "Test belongs to a category but "
                                "no categories were passed"
                            )
                        )
                    )

    items.sort(
        key=lambda x: (0, x.name)
        if "base" in set(m.name for m in x.iter_markers())
        else (1, x.name)
    )

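# The sort key above produces tuples such as (0, "test_base_template_names") for
# tests marked "base" and (1, "test_normal_case") otherwise (names illustrative),
# so base tests always run first and each group stays alphabetical by test name.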

def make_href(paths, base_dir=None):
    """
    Create an anchor tag to link to the file paths provided.
    :param paths: string or list of file paths
    :param base_dir: If specified this is pre-pended to each path
    :return: String of hrefs - one for each path, each separated by a line
             break (<br/>).
    """
    paths = [paths] if isinstance(paths, string_types) else paths
    if base_dir:
        paths = [os.path.join(base_dir, p) for p in paths]
    links = []
    for p in paths:
        abs_path = os.path.abspath(p)
        name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
        links.append(
            "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
                abs_path=abs_path, name=name
            )
        )
    return "<br/>".join(links)

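# Illustrative example (paths hypothetical):
#   make_href("base.yaml", "/work/templates") returns
#   "<a href='file:///work/templates/base.yaml' target='_blank'>base.yaml</a>";
#   a list of paths yields one anchor per path joined with "<br/>".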

def generate_report(outpath, template_path, categories, output_format="html"):
    """
    Generates the various output reports.

    :param outpath: destination directory for all reports
    :param template_path: directory containing the Heat templates validated
    :param categories: Optional categories selected
    :param output_format: One of "html", "excel", "csv", or "json". Default is "html"
    :raises: ValueError if requested output format is unknown
    """
    failures = [r for r in ALL_RESULTS if r.is_failed]
    generate_failure_file(outpath)
    output_format = output_format.lower().strip() if output_format else "html"
    generate_json(outpath, template_path, categories)
    if output_format == "html":
        generate_html_report(outpath, categories, template_path, failures)
    elif output_format == "excel":
        generate_excel_report(outpath, categories, template_path, failures)
    elif output_format == "json":
        return
    elif output_format == "csv":
        generate_csv_report(outpath, categories, template_path, failures)
    else:
        raise ValueError("Unsupported output format: " + output_format)


def write_json(data, path):
    """
    Pretty print data as JSON to the output path requested

    :param data: Data structure to be converted to JSON
    :param path: Where to write output
    """
    with open(path, "w") as f:
        json.dump(data, f, indent=2)


def generate_failure_file(outpath):
    """
    Writes a summary of test failures to a file named failures.
    This is for backwards compatibility only.  The report.json offers a
    more comprehensive output.
    """
    failure_path = os.path.join(outpath, "failures")
    failures = [r for r in ALL_RESULTS if r.is_failed]
    data = {}
    for i, fail in enumerate(failures):
        data[str(i)] = {
            "file": fail.files[0] if len(fail.files) == 1 else fail.files,
            "vnfrqts": fail.requirement_ids,
            "test": fail.test_case,
            "test_file": fail.test_module,
            "raw_output": fail.raw_output,
            "message": fail.error_message,
        }
    write_json(data, failure_path)


def generate_csv_report(output_dir, categories, template_path, failures):
    rows = [["Validation Failures"]]
    headers = [
        ("Categories Selected:", categories),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    rows.append([])
    for header in headers:
        rows.append(header)
    rows.append([])

    if COLLECTION_FAILURES:
        rows.append([COLLECTION_FAILURE_WARNING])
        rows.append(["Validation File", "Test", "Fixtures", "Error"])
        for failure in COLLECTION_FAILURES:
            rows.append(
                [
                    failure["module"],
                    failure["test"],
                    ";".join(failure["fixtures"]),
                    failure["error"],
                ]
            )
        rows.append([])

    # table header
    rows.append([col for col, _ in REPORT_COLUMNS])

    reqs = load_current_requirements()

    # table content
    for i, failure in enumerate(failures, start=1):
        rows.append(
            [
                i,
                "\n".join(failure.files),
                failure.requirement_text(reqs),
                failure.error_message,
                failure.test_id,
            ]
        )

    output_path = os.path.join(output_dir, "report.csv")
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        for row in rows:
            writer.writerow(row)


def generate_excel_report(output_dir, categories, template_path, failures):
    output_path = os.path.join(output_dir, "report.xlsx")
    workbook = xlsxwriter.Workbook(output_path)
    bold = workbook.add_format({"bold": True, "align": "top"})
    code = workbook.add_format(
        {"font_name": "Courier", "text_wrap": True, "align": "top"}
    )
    normal = workbook.add_format({"text_wrap": True, "align": "top"})
    heading = workbook.add_format({"bold": True, "font_size": 18})
    worksheet = workbook.add_worksheet("failures")
    worksheet.write(0, 0, "Validation Failures", heading)

    headers = [
        ("Categories Selected:", ",".join(categories)),
        ("Tool Version:", version.VERSION),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
    ]
    for row, (header, value) in enumerate(headers, start=2):
        worksheet.write(row, 0, header, bold)
        worksheet.write(row, 1, value)

    worksheet.set_column(0, len(headers) - 1, 40)
    worksheet.set_column(len(headers), len(headers), 80)

    if COLLECTION_FAILURES:
        collection_failures_start = 2 + len(headers) + 2
        worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
        collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
        for col_num, col_name in enumerate(collection_failure_headers):
            worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
        for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
            worksheet.write(row, 0, data["module"])
            worksheet.write(row, 1, data["test"])
            worksheet.write(row, 2, ",".join(data["fixtures"]))
            worksheet.write(row, 3, data["error"], code)

    # table header
    start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
    worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
    for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
        worksheet.write(start_error_table_row + 1, col_num, col_name, bold)

    reqs = load_current_requirements()

    # table content
    for col, width in enumerate((20, 30, 60, 60, 40)):
        worksheet.set_column(col, col, width)
    err_num = 1
    for row, failure in enumerate(failures, start=start_error_table_row + 2):
        worksheet.write(row, 0, str(err_num), normal)
        worksheet.write(row, 1, "\n".join(failure.files), normal)
        worksheet.write(row, 2, failure.requirement_text(reqs), normal)
        worksheet.write(row, 3, failure.error_message.replace("\n", "\n\n"), normal)
        worksheet.write(row, 4, failure.test_id, normal)
        err_num += 1
    worksheet.autofilter(
        start_error_table_row + 1,
        0,
        start_error_table_row + 1 + err_num,
        len(REPORT_COLUMNS) - 1,
    )
    workbook.close()


def make_iso_timestamp():
    """
    Creates a timestamp in ISO 8601 format (UTC). Used for JSON output.
    """
    now = datetime.datetime.utcnow()
    now = now.replace(tzinfo=datetime.timezone.utc)
    return now.isoformat()

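# Illustrative output: "2019-01-19T16:18:49.865000+00:00"; assigning tzinfo above
# is what adds the "+00:00" UTC offset to the ISO 8601 string.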

def aggregate_results(outcomes, r_id=None):
    """
    Determines the aggregate result for the conditions provided.  Assumes the
    results have been filtered and collected for analysis.

    :param outcomes: set of outcomes from the TestResults
    :param r_id: Optional requirement ID if known
    :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
             (see aggregate_requirement_adherence for more detail)
    """
    if not outcomes:
        return "PASS"
    elif "ERROR" in outcomes:
        return "ERROR"
    elif "FAIL" in outcomes:
        return "FAIL"
    elif "PASS" in outcomes:
        return "PASS"
    elif {"SKIP"} == outcomes:
        return "SKIP"
    else:
        logging.warning(
            "Unexpected error aggregating outcomes ({}) for requirement {}".format(
                outcomes, r_id
            )
        )
        return "ERROR"

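# Illustrative aggregation examples:
#   aggregate_results(set())             -> "PASS"   (no outcomes recorded)
#   aggregate_results({"PASS", "SKIP"})  -> "PASS"
#   aggregate_results({"FAIL", "PASS"})  -> "FAIL"
#   aggregate_results({"ERROR", "FAIL"}) -> "ERROR"
#   aggregate_results({"SKIP"})          -> "SKIP"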

def aggregate_run_results(collection_failures, test_results):
    """
    Determines overall status of run based on all failures and results.

    * 'ERROR' - At least one collection failure occurred during the run.
    * 'FAIL' - Template failed at least one test
    * 'PASS' - All tests executed properly and no failures were detected

    :param collection_failures: failures occurring during test setup
    :param test_results: list of all test execution results
    :return: one of 'ERROR', 'FAIL', or 'PASS'
    """
    if collection_failures:
        return "ERROR"
    elif any(r.is_failed for r in test_results):
        return "FAIL"
    else:
        return "PASS"


def relative_paths(base_dir, paths):
    return [os.path.relpath(p, base_dir) for p in paths if p != ""]


# noinspection PyTypeChecker
def generate_json(outpath, template_path, categories):
    """
    Creates a JSON summary of the entire test run.
    """
    reqs = load_current_requirements()
    data = {
        "version": "dublin",
        "template_directory": os.path.splitdrive(template_path)[1].replace(
            os.path.sep, "/"
        ),
        "timestamp": make_iso_timestamp(),
        "checksum": hash_directory(template_path),
        "categories": categories,
        "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
        "tests": [],
        "requirements": [],
    }

    results = data["tests"]
    for result in COLLECTION_FAILURES:
        results.append(
            {
                "files": [],
                "test_module": result["module"],
                "test_case": result["test"],
                "result": "ERROR",
                "error": result["error"],
                "requirements": result["requirements"],
            }
        )
    for result in ALL_RESULTS:
        results.append(
            {
                "files": relative_paths(template_path, result.files),
                "test_module": result.test_module,
                "test_case": result.test_case,
                "result": result.outcome,
                "error": result.error_message if result.is_failed else "",
                "requirements": result.requirements_metadata(reqs),
            }
        )

    # Build a mapping of requirement ID to the results
    r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
    for test_result in results:
        test_reqs = test_result["requirements"]
        r_ids = (
            [r["id"] if isinstance(r, dict) else r for r in test_reqs]
            if test_reqs
            else ("",)
        )
        for r_id in r_ids:
            item = r_id_results[r_id]
            item["outcomes"].add(test_result["result"])
            if test_result["error"]:
                item["errors"].add(test_result["error"])

    requirements = data["requirements"]
    for r_id, r_data in reqs.items():
        requirements.append(
            {
                "id": r_id,
                "text": r_data["description"],
                "keyword": r_data["keyword"],
                "result": aggregate_results(r_id_results[r_id]["outcomes"]),
                "errors": list(r_id_results[r_id]["errors"]),
            }
        )

    if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
        requirements.append(
            {
                "id": "Unmapped",
                "text": "Tests not mapped to requirements (see tests)",
                "result": aggregate_results(r_id_results[""]["outcomes"]),
                "errors": list(r_id_results[""]["errors"]),
            }
        )

    report_path = os.path.join(outpath, "report.json")
    write_json(data, report_path)

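# Sketch of the resulting report.json structure (field values are illustrative):
#   {
#     "version": "dublin",
#     "template_directory": "/path/to/templates",
#     "timestamp": "...", "checksum": "...", "categories": "...",
#     "outcome": "PASS" | "FAIL" | "ERROR",
#     "tests": [{"files": [...], "test_module": "...", "test_case": "...",
#                "result": "...", "error": "...", "requirements": [...]}],
#     "requirements": [{"id": "R-...", "text": "...", "keyword": "...",
#                       "result": "...", "errors": [...]}]
#   }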

def generate_html_report(outpath, categories, template_path, failures):
    reqs = load_current_requirements()
    fail_data = []
    for failure in failures:
        fail_data.append(
            {
                "file_links": make_href(failure.files, template_path),
                "test_id": failure.test_id,
                "error_message": escape(failure.error_message).replace(
                    "\n", "<br/><br/>"
                ),
                "raw_output": escape(failure.raw_output),
                "requirements": docutils.core.publish_parts(
                    writer_name="html", source=failure.requirement_text(reqs)
                )["body"],
            }
        )
    pkg_dir = os.path.split(__file__)[0]
    j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
    with open(j2_template_path, "r") as f:
        report_template = jinja2.Template(f.read())
        contents = report_template.render(
            version=version.VERSION,
            num_failures=len(failures) + len(COLLECTION_FAILURES),
            categories=categories,
            template_dir=make_href(template_path),
            checksum=hash_directory(template_path),
            timestamp=make_timestamp(),
            failures=fail_data,
            collection_failures=COLLECTION_FAILURES,
        )
    with open(os.path.join(outpath, "report.html"), "w") as f:
        f.write(contents)


def pytest_addoption(parser):
    """
    Add needed CLI arguments
    """
    parser.addoption(
        "--template-directory",
        dest="template_dir",
        action="append",
        help="Directory which holds the templates for validation",
    )

    parser.addoption(
        "--template-source",
        dest="template_source",
        action="append",
        help="Source Directory which holds the templates for validation",
    )

    parser.addoption(
        "--self-test",
        dest="self_test",
        action="store_true",
        help="Test the unit tests against their fixtured data",
    )

    parser.addoption(
        "--report-format",
        dest="report_format",
        action="store",
        help="Format of output report (html, csv, excel, json)",
    )

    parser.addoption(
        "--continue-on-failure",
        dest="continue_on_failure",
        action="store_true",
        help="Continue validation even when structural errors exist in input files",
    )

    parser.addoption(
        "--output-directory",
        dest="output_dir",
        action="store",
        default=None,
        help="Alternate directory for report output",
    )

    parser.addoption(
        "--category",
        dest="test_categories",
        action="append",
        help="optional category of test to execute",
    )

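# Illustrative invocations (paths and category names are hypothetical):
#   pytest ice_validator/tests --template-directory=/work/heat --report-format=html
#   pytest ice_validator/tests --template-directory=/work/heat --category=<category>
#   pytest ice_validator/tests --self-test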

def pytest_configure(config):
    """
    Ensure that we receive either `--self-test` or
    `--template-directory=<directory>` as CLI arguments
    """
    if config.getoption("template_dir") and config.getoption("self_test"):
        raise Exception(
            '"--template-directory" and "--self-test" are mutually exclusive'
        )
    if not (
        config.getoption("template_dir")
        or config.getoption("self_test")
        or config.getoption("help")
    ):
        raise Exception(
            'One of "--template-directory" or "--self-test" must be specified'
        )


def pytest_generate_tests(metafunc):
    """
    If a unit test requires an argument named 'filename' we generate a test for
    each of the filenames selected: either the files contained in `template_dir`
    or, if `template_dir` is not specified on the CLI, the fixtures associated
    with this test name.
    """

    # noinspection PyBroadException
    try:
        if "filename" in metafunc.fixturenames:
            from .parametrizers import parametrize_filename

            parametrize_filename(metafunc)

        if "filenames" in metafunc.fixturenames:
            from .parametrizers import parametrize_filenames

            parametrize_filenames(metafunc)

        if "template_dir" in metafunc.fixturenames:
            from .parametrizers import parametrize_template_dir

            parametrize_template_dir(metafunc)

        if "environment_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_pair

            parametrize_environment_pair(metafunc)

        if "heat_volume_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_volume_pair

            parametrize_heat_volume_pair(metafunc)

        if "yaml_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_files

            parametrize_yaml_files(metafunc)

        if "env_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_files

            parametrize_environment_files(metafunc)

        if "yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_file

            parametrize_yaml_file(metafunc)

        if "env_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_file

            parametrize_environment_file(metafunc)

        if "parsed_yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_yaml_file

            parametrize_parsed_yaml_file(metafunc)

        if "parsed_environment_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_environment_file

            parametrize_parsed_environment_file(metafunc)

        if "heat_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_template

            parametrize_heat_template(metafunc)

        if "heat_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_templates

            parametrize_heat_templates(metafunc)

        if "volume_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_template

            parametrize_volume_template(metafunc)

        if "volume_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_templates

            parametrize_volume_templates(metafunc)

        if "template" in metafunc.fixturenames:
            from .parametrizers import parametrize_template

            parametrize_template(metafunc)

        if "templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_templates

            parametrize_templates(metafunc)
    except Exception as e:
        # If an error occurs in the collection phase, then it won't be logged as a
        # normal test failure.  This means that failures could occur, but not
        # be seen on the report resulting in a false positive success message.  These
        # errors will be stored and reported separately on the report
        COLLECTION_FAILURES.append(
            {
                "module": metafunc.module.__name__,
                "test": metafunc.function.__name__,
                "fixtures": metafunc.fixturenames,
                "error": traceback.format_exc(),
                "requirements": getattr(metafunc.function, "requirement_ids", []),
            }
        )
        raise e


def hash_directory(path):
    """
    Create md5 hash using the contents of all files under ``path``
    :param path: string directory containing files
    :return: string MD5 hash code (hex)
    """
    md5 = hashlib.md5()  # nosec
    for dir_path, sub_dirs, filenames in os.walk(path):
        for filename in filenames:
            file_path = os.path.join(dir_path, filename)
            with open(file_path, "rb") as f:
                md5.update(f.read())
    return md5.hexdigest()

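# Illustrative usage (hypothetical directory): hash_directory("/work/heat") returns
# a 32-character hex digest such as "d41d8cd98f00b204e9800998ecf8427e"; the digest
# covers file contents only, in os.walk order.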

def load_current_requirements():
    """Loads dict of current requirements or empty dict if file doesn't exist"""
    if not os.path.exists(HEAT_REQUIREMENTS_FILE):
        return {}
    with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
        data = json.load(f)
        version = data["current_version"]
        return data["versions"][version]["needs"]

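# Assumed shape of heat_requirements.json, inferred from the lookups above
# (keys and values are illustrative):
#   {"current_version": "dublin",
#    "versions": {"dublin": {"needs": {
#        "R-123456": {"description": "...", "keyword": "MUST", "docname": "Heat/...",
#                     "validation_mode": "...", "id": "R-123456"}}}}}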

def select_heat_requirements(reqs):
    """Filters dict requirements to only those requirements pertaining to Heat"""
    return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}


def is_testable(reqs):
    """Annotates each requirement with a boolean "testable" flag"""
    for key, values in reqs.items():
        if ("MUST" in values.get("keyword", "").upper()) and (
            "none" not in values.get("validation_mode", "").lower()
        ):
            reqs[key]["testable"] = True
        else:
            reqs[key]["testable"] = False
    return reqs


def build_rst_json(reqs):
    """
    Returns the testable requirements annotated with RST-formatted links to
    the requirement ID and, when available, its test case.
    """
    for key, values in list(reqs.items()):
        if not values["testable"]:
            del reqs[key]
            continue
        # Creates links in RST format to requirements and test cases
        title = (
            "`"
            + values["id"]
            + " <"
            + VNFRQTS_ID_URL
            + values["docname"].replace(" ", "%20")
            + ".html#"
            + values["id"]
            + ">`_"
        )
        if values.get("test_case"):
            mod = values["test_case"].split(".")[-1]
            val = TEST_SCRIPT_SITE + mod + ".py"
            rst_value = "`" + mod + " <" + val + ">`_"
            reqs[key].update({"full_title": title, "test_case": rst_value})
        else:
            reqs[key].update(
                {
                    "full_title": title,
                    "test_case": "No test for requirement",
                    "validated_by": "static",
                }
            )
    return reqs

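# Illustrative link strings produced above (requirement ID, docname, and module
# name are hypothetical):
#   full_title: "`R-123456 <" + VNFRQTS_ID_URL + "Chapter5/Heat.html#R-123456>`_"
#   test_case:  "`test_example <" + TEST_SCRIPT_SITE + "test_example.py>`_"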

def generate_rst_table(output_dir, data):
    """Generate a formatted csv to be used in RST"""
    rst_path = os.path.join(output_dir, "rst.csv")
    with open(rst_path, "w", newline="") as f:
        out = csv.writer(f)
        out.writerow(("Requirement ID", "Requirement", "Test Module", "Test Name"))
        for req_id, metadata in data.items():
            out.writerow(
                (
                    metadata["full_title"],
                    metadata["description"],
                    metadata["test_case"],
                    metadata["validated_by"],
                )
            )


# noinspection PyUnusedLocal
def pytest_report_collectionfinish(config, startdir, items):
    """Generates a simple traceability report to output/traceability.csv"""
    traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
    output_dir = os.path.split(traceability_path)[0]
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    reqs = load_current_requirements()
    requirements = select_heat_requirements(reqs)
    testable_requirements = is_testable(requirements)
    unmapped, mapped = partition(
        lambda i: hasattr(i.function, "requirement_ids"), items
    )

    req_to_test = defaultdict(set)
    mapping_errors = set()
    for item in mapped:
        for req_id in item.function.requirement_ids:
            if req_id not in req_to_test:
                req_to_test[req_id].add(item)
                if req_id in requirements:
                    reqs[req_id].update(
                        {
                            "test_case": item.function.__module__,
                            "validated_by": item.function.__name__,
                        }
                    )
            if req_id not in requirements:
                mapping_errors.add(
                    (req_id, item.function.__module__, item.function.__name__)
                )

    mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
    with open(mapping_error_path, "w", newline="") as f:
        writer = csv.writer(f)
        for err in mapping_errors:
            writer.writerow(err)

    with open(traceability_path, "w", newline="") as f:
        out = csv.writer(f)
        out.writerow(
            (
                "Requirement ID",
                "Requirement",
                "Section",
                "Keyword",
                "Validation Mode",
                "Is Testable",
                "Test Module",
                "Test Name",
            )
        )
        for req_id, metadata in testable_requirements.items():
            if req_to_test[req_id]:
                for item in req_to_test[req_id]:
                    out.writerow(
                        (
                            req_id,
                            metadata["description"],
                            metadata["section_name"],
                            metadata["keyword"],
                            metadata["validation_mode"],
                            metadata["testable"],
                            item.function.__module__,
                            item.function.__name__,
                        )
                    )
            else:
                out.writerow(
                    (
                        req_id,
                        metadata["description"],
                        metadata["section_name"],
                        metadata["keyword"],
                        metadata["validation_mode"],
                        metadata["testable"],
                        "",  # test module
                        "",  # test function
                    )
                )
        # now write out any test methods that weren't mapped to requirements
        unmapped_tests = {
            (item.function.__module__, item.function.__name__) for item in unmapped
        }
        for test_module, test_name in unmapped_tests:
            out.writerow(
                (
                    "",  # req ID
                    "",  # description
                    "",  # section name
                    "",  # keyword
                    "static",  # validation mode
                    "TRUE",  # testable
                    test_module,
                    test_name,
                )
            )

    generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))