[VVP] Performance Enhancements (report generation and test collection)
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37
38 import csv
39 import datetime
40 import hashlib
41 import io
42 import json
43 import os
44 import re
45 import time
46 from collections import defaultdict
47
48 import traceback
49
50 import docutils.core
51 import jinja2
52 import pytest
53 from more_itertools import partition
54 import xlsxwriter
55 from six import string_types
56
57 # noinspection PyUnresolvedReferences
58 import version
59 import logging
60
61 logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)
62
63 __path__ = [os.path.dirname(os.path.abspath(__file__))]
64
65 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
66
67 RESOLUTION_STEPS_FILE = "resolution_steps.json"
68 HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
69 TEST_SCRIPT_SITE = (
70     "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
71 )
72 VNFRQTS_ID_URL = (
73     "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
74 )
75
76 REPORT_COLUMNS = [
77     ("Input File", "file"),
78     ("Test", "test_file"),
79     ("Requirements", "req_description"),
80     ("Resolution Steps", "resolution_steps"),
81     ("Error Message", "message"),
82     ("Raw Test Output", "raw_output"),
83 ]
84
85 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
86 while preparing to validate the input files. Some validations may not have been
87 executed. Please refer these issues to the VNF Validation Tool team.
88 """
89
90 COLLECTION_FAILURES = []
91
92 # Captures the results of every test run
93 ALL_RESULTS = []
94
95
96 def get_output_dir(config):
97     """
98     Retrieve the output directory for the reports and create it if necessary
99     :param config: pytest configuration
100     :return: output directory as string
101     """
102     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
103     if not os.path.exists(output_dir):
104         os.makedirs(output_dir, exist_ok=True)
105     return output_dir
106
107
108 def extract_error_msg(rep):
109     """
110     If a custom error message was provided, then extract it; otherwise
111     just show the pytest assert message.
112     """
113     if rep.outcome != "failed":
114         return ""
115     try:
116         full_msg = str(rep.longrepr.reprcrash.message)
117         match = re.match(
118             "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
119         )
120         if match:  # custom message was provided
121             # Extract everything between AssertionError and the start
122             # of the assert statement expansion in the pytest report
123             msg = match.group(1)
124         else:
125             msg = str(rep.longrepr.reprcrash)
126             if "AssertionError:" in msg:
127                 msg = msg.split("AssertionError:")[1]
128     except AttributeError:
129         msg = str(rep)
130
131     return msg
132
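# Illustrative only (not from the repository): given a pytest crash message such as
#   "AssertionError: Missing required property\nassert 'vm_type' in {...}"
# extract_error_msg() returns the text captured between "AssertionError:" and the
# expanded assert expression (" Missing required property" in this sketch).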
133
134 class TestResult:
135     """
136     Wraps the test case and result to extract necessary metadata for
137     reporting purposes.
138     """
139
140     RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
141
142     def __init__(self, item, outcome):
143         self.item = item
144         self.result = outcome.get_result()
145         self.files = [os.path.normpath(p) for p in self._get_files()]
146         self.error_message = self._get_error_message()
147
148     @property
149     def requirement_ids(self):
150         """
151         Returns list of requirement IDs mapped to the test case.
152
153         :return: Returns a list of string requirement IDs the test was
154                  annotated with ``validates``; otherwise returns an empty list
155         """
156         is_mapped = hasattr(self.item.function, "requirement_ids")
157         return self.item.function.requirement_ids if is_mapped else []
158
159     @property
160     def markers(self):
161         """
162         :return: Returns a set of pytest marker names for the test or an empty set
163         """
164         return set(m.name for m in self.item.iter_markers())
165
166     @property
167     def is_base_test(self):
168         """
169         :return: Returns True if the test is annotated with a pytest marker called base
170         """
171         return "base" in self.markers
172
173     @property
174     def is_failed(self):
175         """
176         :return: True if the test failed
177         """
178         return self.outcome == "FAIL"
179
180     @property
181     def outcome(self):
182         """
183         :return: Returns 'PASS', 'FAIL', or 'SKIP'
184         """
185         return self.RESULT_MAPPING[self.result.outcome]
186
187     @property
188     def test_case(self):
189         """
190         :return: Name of the test case method
191         """
192         return self.item.function.__name__
193
194     @property
195     def test_module(self):
196         """
197         :return: Name of the file containing the test case
198         """
199         return self.item.function.__module__.split(".")[-1]
200
201     @property
202     def test_id(self):
203         """
204         :return: ID of the test (test_module + test_case)
205         """
206         return "{}::{}".format(self.test_module, self.test_case)
207
208     @property
209     def raw_output(self):
210         """
211         :return: Full output from pytest for the given test case
212         """
213         return str(self.result.longrepr)
214
215     def requirement_text(self, curr_reqs):
216         """
217         Creates a text summary for the requirement IDs mapped to the test case.
218         If no requirements are mapped, then it returns the empty string.
219
220         :param curr_reqs: mapping of requirement IDs to requirement metadata
221                           loaded from the VNFRQTS project's needs.json output
222         :return: ID and text of the requirements mapped to the test case
223         """
224         text = (
225             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
226             for r_id in self.requirement_ids
227             if r_id in curr_reqs
228         )
229         return "".join(text)
230
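    # Illustrative shape of ``curr_reqs`` (keys inferred from how it is used in this
    # class; the requirement ID and text are made up):
    #   {"R-12345": {"description": "The VNF MUST ...", "keyword": "MUST"}}
    # requirement_text() would then yield "\n\nR-12345: \nThe VNF MUST ..." for a
    # test annotated with that ID.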
231     def requirements_metadata(self, curr_reqs):
232         """
233         Returns a list of dicts containing the following metadata for each
234         requirement mapped:
235
236         - id: Requirement ID
237         - text: Full text of the requirement
238         - keyword: MUST, MUST NOT, MAY, etc.
239
240         :param curr_reqs: mapping of requirement IDs to requirement metadata
241                           loaded from the VNFRQTS project's needs.json output
242         :return: List of requirement metadata
243         """
244         data = []
245         for r_id in self.requirement_ids:
246             if r_id not in curr_reqs:
247                 continue
248             data.append(
249                 {
250                     "id": r_id,
251                     "text": curr_reqs[r_id]["description"],
252                     "keyword": curr_reqs[r_id]["keyword"],
253                 }
254             )
255         return data
256
257     def resolution_steps(self, resolutions):
258         """
259         :param resolutions: entries loaded from resolution_steps.json
260         :return: Header and text for the resolution step associated with this
261                  test case.  Returns empty string if no resolutions are
262                  provided.
263         """
264         text = (
265             "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
266             for entry in resolutions
267             if self._match(entry)
268         )
269         return "".join(text)
270
271     def _match(self, resolution_entry):
272         """
273         Returns True if the test result maps to the given entry in
274         the resolutions file
275         """
276         return (
277             self.test_case == resolution_entry["function"]
278             and self.test_module == resolution_entry["module"]
279         )
280
281     def _get_files(self):
282         """
283         Extracts the list of files passed into the test case.
284         :return: List of absolute paths to files
285         """
286         if "environment_pair" in self.item.fixturenames:
287             return [
288                 "{} environment pair".format(
289                     self.item.funcargs["environment_pair"]["name"]
290                 )
291             ]
292         elif "heat_volume_pair" in self.item.fixturenames:
293             return [
294                 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
295             ]
296         elif "heat_templates" in self.item.fixturenames:
297             return self.item.funcargs["heat_templates"]
298         elif "yaml_files" in self.item.fixturenames:
299             return self.item.funcargs["yaml_files"]
300         else:
301             parts = self.result.nodeid.split("[")
302             return [""] if len(parts) == 1 else [parts[1][:-1]]
303
304     def _get_error_message(self):
305         """
306         :return: Error message or empty string if the test did not fail or error
307         """
308         if self.is_failed:
309             return extract_error_msg(self.result)
310         else:
311             return ""
312
313
314 # noinspection PyUnusedLocal
315 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
316 def pytest_runtest_makereport(item, call):
317     """
318     Captures the test results for later reporting.  This will also halt testing
319     if a base failure is encountered (can be overridden with --continue-on-failure)
320     """
321     outcome = yield
322     if outcome.get_result().when != "call":
323         return  # only capture results of test cases themselves
324     result = TestResult(item, outcome)
325     if (
326         not item.config.option.continue_on_failure
327         and result.is_base_test
328         and result.is_failed
329     ):
330         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
331             result.error_message
332         )
333         result.error_message = msg
334         ALL_RESULTS.append(result)
335         pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
336
337     ALL_RESULTS.append(result)
338
339
340 def make_timestamp():
341     """
342     :return: String timestamp in the format:
343              2019-01-19 10:18:49.865000 Central Standard Time
344     """
345     timezone = time.tzname[time.localtime().tm_isdst]
346     return "{} {}".format(str(datetime.datetime.now()), timezone)
347
348
349 # noinspection PyUnusedLocal
350 def pytest_sessionstart(session):
351     ALL_RESULTS.clear()
352     COLLECTION_FAILURES.clear()
353
354
355 # noinspection PyUnusedLocal
356 def pytest_sessionfinish(session, exitstatus):
357     """
358     If not a self-test run, generate the output reports
359     """
360     if not session.config.option.template_dir:
361         return
362
363     if session.config.option.template_source:
364         template_source = session.config.option.template_source[0]
365     else:
366         template_source = os.path.abspath(session.config.option.template_dir[0])
367
368     categories_selected = session.config.option.test_categories or ""
369     generate_report(
370         get_output_dir(session.config),
371         template_source,
372         categories_selected,
373         session.config.option.report_format,
374     )
375
376
377 # noinspection PyUnusedLocal
378 def pytest_collection_modifyitems(session, config, items):
379     """
380     Selects tests based on the categories requested.  Tests without
381     categories will always be executed.
382     """
383     config.traceability_items = list(items)  # save all items for traceability
384     if not config.option.self_test:
385         for item in items:
386             # checking if test belongs to a category
387             if hasattr(item.function, "categories"):
388                 if config.option.test_categories:
389                     test_categories = getattr(item.function, "categories")
390                     passed_categories = config.option.test_categories
391                     if not all(
392                         category in passed_categories for category in test_categories
393                     ):
394                         item.add_marker(
395                             pytest.mark.skip(
396                                 reason=("Test categories do not match "
397                                         "all the passed categories")
398                             )
399                         )
400                 else:
401                     item.add_marker(
402                         pytest.mark.skip(
403                             reason=("Test belongs to a category but "
404                                     "no categories were passed")
405                         )
406                     )
407     items.sort(
408         key=lambda x: 0 if "base" in set(m.name for m in x.iter_markers()) else 1
409     )
410
411
412 def make_href(paths):
413     """
414     Create an anchor tag to link to the file paths provided.
415     :param paths: string or list of file paths
416     :return: String of hrefs - one for each path, each separated by a line
417              break (<br/>).
418     """
419     paths = [paths] if isinstance(paths, string_types) else paths
420     links = []
421     for p in paths:
422         abs_path = os.path.abspath(p)
423         name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
424         links.append(
425             "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
426                 abs_path=abs_path, name=name
427             )
428         )
429     return "<br/>".join(links)
430
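# Illustrative output for a hypothetical path: make_href("/work/heat/base.yaml")
# returns "<a href='file:///work/heat/base.yaml' target='_blank'>base.yaml</a>"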
431
432 def load_resolutions_file():
433     """
434     :return: data loaded from resolution_steps.json, or None if the file is absent
435     """
436     resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
437     if os.path.exists(resolution_steps):
438         with open(resolution_steps, "r") as f:
439             return json.loads(f.read())
440
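# Illustrative resolution_steps.json entry (field names taken from how the data is
# consumed in TestResult; the values are made up):
#   [{"module": "test_example_module", "function": "test_example_case",
#     "header": "Fix the template", "resolution_steps": "Add the missing ..."}]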
441
442 def generate_report(outpath, template_path, categories, output_format="html"):
443     """
444     Generates the various output reports.
445
446     :param outpath: destination directory for all reports
447     :param template_path: directory containing the Heat templates validated
448     :param categories: Optional categories selected
449     :param output_format: One of "html", "excel", or "csv". Default is "html"
450     :raises: ValueError if requested output format is unknown
451     """
452     failures = [r for r in ALL_RESULTS if r.is_failed]
453     generate_failure_file(outpath)
454     output_format = output_format.lower().strip() if output_format else "html"
455     generate_json(outpath, template_path, categories)
456     if output_format == "html":
457         generate_html_report(outpath, categories, template_path, failures)
458     elif output_format == "excel":
459         generate_excel_report(outpath, categories, template_path, failures)
460     elif output_format == "json":
461         return
462     elif output_format == "csv":
463         generate_csv_report(outpath, categories, template_path, failures)
464     else:
465         raise ValueError("Unsupported output format: " + output_format)
466
467
468 def write_json(data, path):
469     """
470     Pretty print data as JSON to the output path requested
471
472     :param data: Data structure to be converted to JSON
473     :param path: Where to write output
474     """
475     with open(path, "w") as f:
476         json.dump(data, f, indent=2)
477
478
479 def generate_failure_file(outpath):
480     """
481     Writes a summary of test failures to a file named failures.
482     This is for backwards compatibility only.  The report.json offers a
483     more comprehensive output.
484     """
485     failure_path = os.path.join(outpath, "failures")
486     failures = [r for r in ALL_RESULTS if r.is_failed]
487     data = {}
488     for i, fail in enumerate(failures):
489         data[str(i)] = {
490             "file": fail.files[0] if len(fail.files) == 1 else fail.files,
491             "vnfrqts": fail.requirement_ids,
492             "test": fail.test_case,
493             "test_file": fail.test_module,
494             "raw_output": fail.raw_output,
495             "message": fail.error_message,
496         }
497     write_json(data, failure_path)
498
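# Illustrative entry in the legacy "failures" file (values are placeholders):
#   {"0": {"file": "base.yaml", "vnfrqts": ["R-12345"], "test": "test_example_case",
#          "test_file": "test_example_module", "raw_output": "...", "message": "..."}}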
499
500 def generate_csv_report(output_dir, categories, template_path, failures):
501     rows = [["Validation Failures"]]
502     headers = [
503         ("Categories Selected:", categories),
504         ("Tool Version:", version.VERSION),
505         ("Report Generated At:", make_timestamp()),
506         ("Directory Validated:", template_path),
507         ("Checksum:", hash_directory(template_path)),
508         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
509     ]
510     rows.append([])
511     for header in headers:
512         rows.append(header)
513     rows.append([])
514
515     if COLLECTION_FAILURES:
516         rows.append([COLLECTION_FAILURE_WARNING])
517         rows.append(["Validation File", "Test", "Fixtures", "Error"])
518         for failure in COLLECTION_FAILURES:
519             rows.append(
520                 [
521                     failure["module"],
522                     failure["test"],
523                     ";".join(failure["fixtures"]),
524                     failure["error"],
525                 ]
526             )
527         rows.append([])
528
529     # table header
530     rows.append([col for col, _ in REPORT_COLUMNS])
531
532     reqs = load_current_requirements()
533     resolutions = load_resolutions_file()
534
535     # table content
536     for failure in failures:
537         rows.append(
538             [
539                 "\n".join(failure.files),
540                 failure.test_id,
541                 failure.requirement_text(reqs),
542                 failure.resolution_steps(resolutions),
543                 failure.error_message,
544                 failure.raw_output,
545             ]
546         )
547
548     output_path = os.path.join(output_dir, "report.csv")
549     with open(output_path, "w", newline="") as f:
550         writer = csv.writer(f)
551         for row in rows:
552             writer.writerow(row)
553
554
555 def generate_excel_report(output_dir, categories, template_path, failures):
556     output_path = os.path.join(output_dir, "report.xlsx")
557     workbook = xlsxwriter.Workbook(output_path)
558     bold = workbook.add_format({"bold": True})
559     code = workbook.add_format({"font_name": "Courier", "text_wrap": True})
560     normal = workbook.add_format({"text_wrap": True})
561     heading = workbook.add_format({"bold": True, "font_size": 18})
562     worksheet = workbook.add_worksheet("failures")
563     worksheet.write(0, 0, "Validation Failures", heading)
564
565     headers = [
566         ("Categories Selected:", ",".join(categories)),
567         ("Tool Version:", version.VERSION),
568         ("Report Generated At:", make_timestamp()),
569         ("Directory Validated:", template_path),
570         ("Checksum:", hash_directory(template_path)),
571         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
572     ]
573     for row, (header, value) in enumerate(headers, start=2):
574         worksheet.write(row, 0, header, bold)
575         worksheet.write(row, 1, value)
576
577     worksheet.set_column(0, len(headers) - 1, 40)
578     worksheet.set_column(len(headers), len(headers), 80)
579
580     if COLLECTION_FAILURES:
581         collection_failures_start = 2 + len(headers) + 2
582         worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
583         collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
584         for col_num, col_name in enumerate(collection_failure_headers):
585             worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
586         for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
587             worksheet.write(row, 0, data["module"])
588             worksheet.write(row, 1, data["test"])
589             worksheet.write(row, 2, ",".join(data["fixtures"]))
590             worksheet.write(row, 3, data["error"], code)
591
592     # table header
593     start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
594     worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
595     for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
596         worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
597
598     reqs = load_current_requirements()
599     resolutions = load_resolutions_file()
600
601     # table content
602     for row, failure in enumerate(failures, start=start_error_table_row + 2):
603         worksheet.write(row, 0, "\n".join(failure.files), normal)
604         worksheet.write(row, 1, failure.test_id, normal)
605         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
606         worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
607         worksheet.write(row, 4, failure.error_message, normal)
608         worksheet.write(row, 5, failure.raw_output, code)
609
610     workbook.close()
611
612
613 def make_iso_timestamp():
614     """
615     Creates a timestamp in ISO 8601 format in UTC.  Used for JSON output.
616     """
617     now = datetime.datetime.utcnow()
618     now = now.replace(tzinfo=datetime.timezone.utc)
619     return now.isoformat()
620
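# Illustrative output with the tzinfo assignment above (the value is made up):
#   "2019-01-19T16:18:49.865000+00:00"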
621
622 def aggregate_results(outcomes, r_id=None):
623     """
624     Determines the aggregate result for the conditions provided.  Assumes the
625     results have been filtered and collected for analysis.
626
627     :param outcomes: set of outcomes from the TestResults
628     :param r_id: Optional requirement ID if known
629     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP' for the aggregated
630              requirement adherence
631     """
632     if not outcomes:
633         return "PASS"
634     elif "ERROR" in outcomes:
635         return "ERROR"
636     elif "FAIL" in outcomes:
637         return "FAIL"
638     elif "PASS" in outcomes:
639         return "PASS"
640     elif {"SKIP"} == outcomes:
641         return "SKIP"
642     else:
643         logging.error(
644             "Unexpected error aggregating outcomes ({}) for requirement {}".format(
645                 outcomes, r_id
646             )
647         )
648         return "ERROR"
649
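# Illustrative aggregation, derived from the branches above:
#   aggregate_results(set())            -> "PASS"  (no outcomes collected)
#   aggregate_results({"PASS", "SKIP"}) -> "PASS"
#   aggregate_results({"PASS", "FAIL"}) -> "FAIL"
#   aggregate_results({"SKIP"})         -> "SKIP"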
650
651 def aggregate_run_results(collection_failures, test_results):
652     """
653     Determines overall status of run based on all failures and results.
654
655     * 'ERROR' - At least one collection failure occurred during the run.
656     * 'FAIL' - Template failed at least one test
657     * 'PASS' - All tests executed properly and no failures were detected
658
659     :param collection_failures: failures occurring during test setup
660     :param test_results: list of all test execution results
661     :return: one of 'ERROR', 'FAIL', or 'PASS'
662     """
663     if collection_failures:
664         return "ERROR"
665     elif any(r.is_failed for r in test_results):
666         return "FAIL"
667     else:
668         return "PASS"
669
670
671 def relative_paths(base_dir, paths):
672     return [os.path.relpath(p, base_dir) for p in paths]
673
674
675 # noinspection PyTypeChecker
676 def generate_json(outpath, template_path, categories):
677     """
678     Creates a JSON summary of the entire test run.
679     """
680     reqs = load_current_requirements()
681     data = {
682         "version": "dublin",
683         "template_directory": os.path.splitdrive(template_path)[1].replace(
684             os.path.sep, "/"
685         ),
686         "timestamp": make_iso_timestamp(),
687         "checksum": hash_directory(template_path),
688         "categories": categories,
689         "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
690         "tests": [],
691         "requirements": [],
692     }
693
694     results = data["tests"]
695     for result in COLLECTION_FAILURES:
696         results.append(
697             {
698                 "files": [],
699                 "test_module": result["module"],
700                 "test_case": result["test"],
701                 "result": "ERROR",
702                 "error": result["error"],
703                 "requirements": result["requirements"],
704             }
705         )
706     for result in ALL_RESULTS:
707         results.append(
708             {
709                 "files": relative_paths(template_path, result.files),
710                 "test_module": result.test_module,
711                 "test_case": result.test_case,
712                 "result": result.outcome,
713                 "error": result.error_message if result.is_failed else "",
714                 "requirements": result.requirements_metadata(reqs),
715             }
716         )
717
718     # Build a mapping of requirement ID to the results
719     r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
720     for test_result in results:
721         test_reqs = test_result["requirements"]
722         r_ids = (
723             [r["id"] if isinstance(r, dict) else r for r in test_reqs]
724             if test_reqs
725             else ("",)
726         )
727         for r_id in r_ids:
728             item = r_id_results[r_id]
729             item["outcomes"].add(test_result["result"])
730             if test_result["error"]:
731                 item["errors"].add(test_result["error"])
732
733     requirements = data["requirements"]
734     for r_id, r_data in reqs.items():
735         requirements.append(
736             {
737                 "id": r_id,
738                 "text": r_data["description"],
739                 "keyword": r_data["keyword"],
740                 "result": aggregate_results(r_id_results[r_id]["outcomes"]),
741                 "errors": list(r_id_results[r_id]["errors"]),
742             }
743         )
744
745     if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
746         requirements.append(
747             {
748                 "id": "Unmapped",
749                 "text": "Tests not mapped to requirements (see tests)",
750                 "result": aggregate_results(r_id_results[""]["outcomes"]),
751                 "errors": list(r_id_results[""]["errors"]),
752             }
753         )
754
755     report_path = os.path.join(outpath, "report.json")
756     write_json(data, report_path)
757
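# Illustrative top-level shape of report.json (keys mirror the dict built above;
# the values shown are placeholders):
#   {"version": "dublin", "template_directory": "/work/heat", "timestamp": "...",
#    "checksum": "...", "categories": [], "outcome": "PASS",
#    "tests": [...], "requirements": [...]}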
758
759 def generate_html_report(outpath, categories, template_path, failures):
760     reqs = load_current_requirements()
761     resolutions = load_resolutions_file()
762     fail_data = []
763     for failure in failures:
764         fail_data.append(
765             {
766                 "file_links": make_href(failure.files),
767                 "test_id": failure.test_id,
768                 "error_message": failure.error_message,
769                 "raw_output": failure.raw_output,
770                 "requirements": docutils.core.publish_parts(
771                     writer_name="html", source=failure.requirement_text(reqs)
772                 )["body"],
773                 "resolution_steps": failure.resolution_steps(resolutions),
774             }
775         )
776     pkg_dir = os.path.split(__file__)[0]
777     j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
778     with open(j2_template_path, "r") as f:
779         report_template = jinja2.Template(f.read())
780         contents = report_template.render(
781             version=version.VERSION,
782             num_failures=len(failures) + len(COLLECTION_FAILURES),
783             categories=categories,
784             template_dir=make_href(template_path),
785             checksum=hash_directory(template_path),
786             timestamp=make_timestamp(),
787             failures=fail_data,
788             collection_failures=COLLECTION_FAILURES,
789         )
790     with open(os.path.join(outpath, "report.html"), "w") as f:
791         f.write(contents)
792
793
794 def pytest_addoption(parser):
795     """
796     Add needed CLI arguments
797     """
798     parser.addoption(
799         "--template-directory",
800         dest="template_dir",
801         action="append",
802         help="Directory which holds the templates for validation",
803     )
804
805     parser.addoption(
806         "--template-source",
807         dest="template_source",
808         action="append",
809         help="Source Directory which holds the templates for validation",
810     )
811
812     parser.addoption(
813         "--self-test",
814         dest="self_test",
815         action="store_true",
816         help="Test the unit tests against their fixtured data",
817     )
818
819     parser.addoption(
820         "--report-format",
821         dest="report_format",
822         action="store",
823         help="Format of output report (html, csv, excel, json)",
824     )
825
826     parser.addoption(
827         "--continue-on-failure",
828         dest="continue_on_failure",
829         action="store_true",
830         help="Continue validation even when structural errors exist in input files",
831     )
832
833     parser.addoption(
834         "--output-directory",
835         dest="output_dir",
836         action="store",
837         default=None,
838         help="Alternate output directory for the generated reports",
839     )
840
841     parser.addoption(
842         "--category",
843         dest="test_categories",
844         action="append",
845         help="optional category of test to execute",
846     )
847
848
849 def pytest_configure(config):
850     """
851     Ensure that we receive either `--self-test` or
852     `--template-directory=<directory>` as CLI arguments
853     """
854     if config.getoption("template_dir") and config.getoption("self_test"):
855         raise Exception('"--template-directory" and "--self-test" are mutually exclusive')
856     if not (
857         config.getoption("template_dir")
858         or config.getoption("self_test")
859         or config.getoption("help")
860     ):
861         raise Exception('One of "--template-directory" or "--self-test" must be specified')
862
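# Illustrative invocations from the repository root (the Heat directory is hypothetical):
#   pytest ice_validator/tests --template-directory=/path/to/heat --report-format=html
#   pytest ice_validator/tests --self-test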
863
864 def pytest_generate_tests(metafunc):
865     """
866     If a unit test requires an argument named 'filename',
867     we generate a test for each of the filenames selected: either
868     the files contained in `template_dir`, or, if `template_dir`
869     is not specified on the CLI, the fixtures associated with this
870     test name.
871     """
872
873     # noinspection PyBroadException
874     try:
875         if "filename" in metafunc.fixturenames:
876             from .parametrizers import parametrize_filename
877
878             parametrize_filename(metafunc)
879
880         if "filenames" in metafunc.fixturenames:
881             from .parametrizers import parametrize_filenames
882
883             parametrize_filenames(metafunc)
884
885         if "template_dir" in metafunc.fixturenames:
886             from .parametrizers import parametrize_template_dir
887
888             parametrize_template_dir(metafunc)
889
890         if "environment_pair" in metafunc.fixturenames:
891             from .parametrizers import parametrize_environment_pair
892
893             parametrize_environment_pair(metafunc)
894
895         if "heat_volume_pair" in metafunc.fixturenames:
896             from .parametrizers import parametrize_heat_volume_pair
897
898             parametrize_heat_volume_pair(metafunc)
899
900         if "yaml_files" in metafunc.fixturenames:
901             from .parametrizers import parametrize_yaml_files
902
903             parametrize_yaml_files(metafunc)
904
905         if "env_files" in metafunc.fixturenames:
906             from .parametrizers import parametrize_environment_files
907
908             parametrize_environment_files(metafunc)
909
910         if "yaml_file" in metafunc.fixturenames:
911             from .parametrizers import parametrize_yaml_file
912
913             parametrize_yaml_file(metafunc)
914
915         if "env_file" in metafunc.fixturenames:
916             from .parametrizers import parametrize_environment_file
917
918             parametrize_environment_file(metafunc)
919
920         if "parsed_yaml_file" in metafunc.fixturenames:
921             from .parametrizers import parametrize_parsed_yaml_file
922
923             parametrize_parsed_yaml_file(metafunc)
924
925         if "parsed_environment_file" in metafunc.fixturenames:
926             from .parametrizers import parametrize_parsed_environment_file
927
928             parametrize_parsed_environment_file(metafunc)
929
930         if "heat_template" in metafunc.fixturenames:
931             from .parametrizers import parametrize_heat_template
932
933             parametrize_heat_template(metafunc)
934
935         if "heat_templates" in metafunc.fixturenames:
936             from .parametrizers import parametrize_heat_templates
937
938             parametrize_heat_templates(metafunc)
939
940         if "volume_template" in metafunc.fixturenames:
941             from .parametrizers import parametrize_volume_template
942
943             parametrize_volume_template(metafunc)
944
945         if "volume_templates" in metafunc.fixturenames:
946             from .parametrizers import parametrize_volume_templates
947
948             parametrize_volume_templates(metafunc)
949
950         if "template" in metafunc.fixturenames:
951             from .parametrizers import parametrize_template
952
953             parametrize_template(metafunc)
954
955         if "templates" in metafunc.fixturenames:
956             from .parametrizers import parametrize_templates
957
958             parametrize_templates(metafunc)
959     except Exception as e:
960         # If an error occurs in the collection phase, then it won't be logged as a
961         # normal test failure.  This means that failures could occur, but not
962         # be seen in the report, resulting in a false positive success message.  These
963         # errors will be stored and reported separately in the report.
964         COLLECTION_FAILURES.append(
965             {
966                 "module": metafunc.module.__name__,
967                 "test": metafunc.function.__name__,
968                 "fixtures": metafunc.fixturenames,
969                 "error": traceback.format_exc(),
970                 "requirements": getattr(metafunc.function, "requirement_ids", []),
971             }
972         )
973         raise e
974
975
976 def hash_directory(path):
977     """
978     Create md5 hash using the contents of all files under ``path``
979     :param path: string directory containing files
980     :return: string MD5 hash code (hex)
981     """
982     md5 = hashlib.md5()
983     for dir_path, sub_dirs, filenames in os.walk(path):
984         for filename in filenames:
985             file_path = os.path.join(dir_path, filename)
986             with open(file_path, "rb") as f:
987                 md5.update(f.read())
988     return md5.hexdigest()
989
990
991 def load_current_requirements():
992     """Loads the dict of current requirements from heat_requirements.json"""
993     with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
994         data = json.load(f)
995         version = data["current_version"]
996         return data["versions"][version]["needs"]
997
998
999 def select_heat_requirements(reqs):
1000     """Filters dict requirements to only those requirements pertaining to Heat"""
1001     return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}
1002
1003
1004 def is_testable(reqs):
1005     """Annotates each requirement with a boolean "testable" flag and returns the dict"""
1006     for key, values in reqs.items():
1007         if ("MUST" in values.get("keyword", "").upper()) and (
1008             "none" not in values.get("validation_mode", "").lower()
1009         ):
1010             reqs[key]["testable"] = True
1011         else:
1012             reqs[key]["testable"] = False
1013     return reqs
1014
1015
1016 def build_rst_json(reqs):
1017     """Adds RST-formatted links for each testable requirement and drops the rest"""
1018     for key, values in list(reqs.items()):
1019         if values["testable"]:
1020             # Creates links in RST format to requirements and test cases
1021             if values["test_case"]:
1022                 mod = values["test_case"].split(".")[-1]
1023                 val = TEST_SCRIPT_SITE + mod + ".py"
1024                 rst_value = "`" + mod + " <" + val + ">`_"
1025                 title = (
1026                     "`"
1027                     + values["id"]
1028                     + " <"
1029                     + VNFRQTS_ID_URL
1030                     + values["docname"].replace(" ", "%20")
1031                     + ".html#"
1032                     + values["id"]
1033                     + ">`_"
1034                 )
1035                 reqs[key].update({"full_title": title, "test_case": rst_value})
1036             else:
1037                 title = (
1038                     "`"
1039                     + values["id"]
1040                     + " <"
1041                     + VNFRQTS_ID_URL
1042                     + values["docname"].replace(" ", "%20")
1043                     + ".html#"
1044                     + values["id"]
1045                     + ">`_"
1046                 )
1047                 reqs[key].update(
1048                     {
1049                         "full_title": title,
1050                         "test_case": "No test for requirement",
1051                         "validated_by": "static",
1052                     }
1053                 )
1054         else:
1055             del reqs[key]
1056     return reqs
1057
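# Illustrative "full_title" built above (the requirement ID and docname are made up):
#   `R-12345 <{VNFRQTS_ID_URL}Chapter4%20Heat.html#R-12345>`_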
1058
1059 def generate_rst_table(output_dir, data):
1060     """Generate a formatted csv to be used in RST"""
1061     rst_path = os.path.join(output_dir, "rst.csv")
1062     with open(rst_path, "w", newline="") as f:
1063         out = csv.writer(f)
1064         out.writerow(("Requirement ID", "Requirement", "Test Module", "Test Name"))
1065         for req_id, metadata in data.items():
1066             out.writerow(
1067                 (
1068                     metadata["full_title"],
1069                     metadata["description"],
1070                     metadata["test_case"],
1071                     metadata["validated_by"],
1072                 )
1073             )
1074
1075
1076 # noinspection PyUnusedLocal
1077 def pytest_report_collectionfinish(config, startdir, items):
1078     """Generates a simple traceability report to output/traceability.csv"""
1079     traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
1080     output_dir = os.path.split(traceability_path)[0]
1081     if not os.path.exists(output_dir):
1082         os.makedirs(output_dir)
1083     reqs = load_current_requirements()
1084     requirements = select_heat_requirements(reqs)
1085     testable_requirements = is_testable(requirements)
1086     unmapped, mapped = partition(
1087         lambda i: hasattr(i.function, "requirement_ids"), items
1088     )
1089
1090     req_to_test = defaultdict(set)
1091     mapping_errors = set()
1092     for item in mapped:
1093         for req_id in item.function.requirement_ids:
1094             if req_id not in req_to_test:
1095                 req_to_test[req_id].add(item)
1096                 if req_id in requirements:
1097                     reqs[req_id].update(
1098                         {
1099                             "test_case": item.function.__module__,
1100                             "validated_by": item.function.__name__,
1101                         }
1102                     )
1103             if req_id not in requirements:
1104                 mapping_errors.add(
1105                     (req_id, item.function.__module__, item.function.__name__)
1106                 )
1107
1108     mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
1109     with open(mapping_error_path, "w", newline="") as f:
1110         writer = csv.writer(f)
1111         for err in mapping_errors:
1112             writer.writerow(err)
1113
1114     with open(traceability_path, "w", newline="") as f:
1115         out = csv.writer(f)
1116         out.writerow(
1117             (
1118                 "Requirement ID",
1119                 "Requirement",
1120                 "Section",
1121                 "Keyword",
1122                 "Validation Mode",
1123                 "Is Testable",
1124                 "Test Module",
1125                 "Test Name",
1126             )
1127         )
1128         for req_id, metadata in testable_requirements.items():
1129             if req_to_test[req_id]:
1130                 for item in req_to_test[req_id]:
1131                     out.writerow(
1132                         (
1133                             req_id,
1134                             metadata["description"],
1135                             metadata["section_name"],
1136                             metadata["keyword"],
1137                             metadata["validation_mode"],
1138                             metadata["testable"],
1139                             item.function.__module__,
1140                             item.function.__name__,
1141                         )
1142                     )
1143             else:
1144                 out.writerow(
1145                     (
1146                         req_id,
1147                         metadata["description"],
1148                         metadata["section_name"],
1149                         metadata["keyword"],
1150                         metadata["validation_mode"],
1151                         metadata["testable"],
1152                         "",  # test module
1153                         "",
1154                     )  # test function
1155                 )
1156         # now write out any test methods that weren't mapped to requirements
1157         unmapped_tests = {
1158             (item.function.__module__, item.function.__name__) for item in unmapped
1159         }
1160         for test_module, test_name in unmapped_tests:
1161             out.writerow(
1162                 (
1163                     "",  # req ID
1164                     "",  # description
1165                     "",  # section name
1166                     "",  # keyword
1167                     "static",  # validation mode
1168                     "TRUE",  # testable
1169                     test_module,
1170                     test_name,
1171                 )
1172             )
1173
1174     generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))