2c88ece93b0ad386fbfabe051a0859871c3bdeaf
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37
38 import csv
39 import datetime
40 import hashlib
41 import io
42 import json
43 import os
44 import re
45 import time
46 from collections import defaultdict
47 from itertools import chain
48
49 import traceback
50
51 import docutils.core
52 import jinja2
53 import pytest
54 from more_itertools import partition
55 import xlsxwriter
56 from six import string_types
57
58 import version
59
60 __path__ = [os.path.dirname(os.path.abspath(__file__))]
61
62 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
63
64 RESOLUTION_STEPS_FILE = "resolution_steps.json"
65 HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
66 TEST_SCRIPT_SITE = "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
67 VNFRQTS_ID_URL = "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
68
69 REPORT_COLUMNS = [
70     ("Input File", "file"),
71     ("Test", "test_file"),
72     ("Requirements", "req_description"),
73     ("Resolution Steps", "resolution_steps"),
74     ("Error Message", "message"),
75     ("Raw Test Output", "raw_output"),
76 ]
77
78 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
79 while preparing to validate the the input files. Some validations may not have been
80 executed. Please refer these issue to the VNF Validation Tool team.
81 """
82
83 COLLECTION_FAILURES = []
84
85 # Captures the results of every test run
86 ALL_RESULTS = []
87
88
89 def get_output_dir(config):
90     """
91     Retrieve the output directory for the reports and create it if necessary
92     :param config: pytest configuration
93     :return: output directory as string
94     """
95     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
96     if not os.path.exists(output_dir):
97         os.makedirs(output_dir, exist_ok=True)
98     return output_dir
99
100
101 def extract_error_msg(rep):
102     """
103     If a custom error message was provided, then extract it otherwise
104     just show the pytest assert message
105     """
106     if rep.outcome != "failed":
107         return ""
108     try:
109         full_msg = str(rep.longrepr.reprcrash.message)
110         match = re.match(
111             "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
112         )
113         if match:  # custom message was provided
114             # Extract everything between AssertionError and the start
115             # of the assert statement expansion in the pytest report
116             msg = match.group(1)
117         else:
118             msg = str(rep.longrepr.reprcrash)
119             if "AssertionError:" in msg:
120                 msg = msg.split("AssertionError:")[1]
121     except AttributeError:
122         msg = str(rep)
123
124     return msg
125
126
127 class TestResult:
128     """
129     Wraps the test case and result to extract necessary metadata for
130     reporting purposes.
131     """
132
133     RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
134
135     def __init__(self, item, outcome):
136         self.item = item
137         self.result = outcome.get_result()
138         self.files = [os.path.normpath(p) for p in self._get_files()]
139         self.error_message = self._get_error_message()
140
141     @property
142     def requirement_ids(self):
143         """
144         Returns list of requirement IDs mapped to the test case.
145
146         :return: Returns a list of string requirement IDs the test was
147                  annotated with ``validates`` otherwise returns and empty list
148         """
149         is_mapped = hasattr(self.item.function, "requirement_ids")
150         return self.item.function.requirement_ids if is_mapped else []
151
152     @property
153     def markers(self):
154         """
155         :return: Returns a set of pytest marker names for the test or an empty set
156         """
157         return set(m.name for m in self.item.iter_markers())
158
159     @property
160     def is_base_test(self):
161         """
162         :return: Returns True if the test is annotated with a pytest marker called base
163         """
164         return "base" in self.markers
165
166     @property
167     def is_failed(self):
168         """
169         :return: True if the test failed
170         """
171         return self.outcome == "FAIL"
172
173     @property
174     def outcome(self):
175         """
176         :return: Returns 'PASS', 'FAIL', or 'SKIP'
177         """
178         return self.RESULT_MAPPING[self.result.outcome]
179
180     @property
181     def test_case(self):
182         """
183         :return: Name of the test case method
184         """
185         return self.item.function.__name__
186
187     @property
188     def test_module(self):
189         """
190         :return: Name of the file containing the test case
191         """
192         return self.item.function.__module__.split(".")[-1]
193
194     @property
195     def raw_output(self):
196         """
197         :return: Full output from pytest for the given test case
198         """
199         return str(self.result.longrepr)
200
201     def requirement_text(self, curr_reqs):
202         """
203         Creates a text summary for the requirement IDs mapped to the test case.
204         If no requirements are mapped, then it returns the empty string.
205
206         :param curr_reqs: mapping of requirement IDs to requirement metadata
207                           loaded from the VNFRQTS projects needs.json output
208         :return: ID and text of the requirements mapped to the test case
209         """
210         text = (
211             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
212             for r_id in self.requirement_ids
213         )
214         return "".join(text)
215
216     def requirements_metadata(self, curr_reqs):
217         """
218         Returns a list of dicts containing the following metadata for each
219         requirement mapped:
220
221         - id: Requirement ID
222         - text: Full text of the requirement
223         - keyword: MUST, MUST NOT, MAY, etc.
224
225         :param curr_reqs: mapping of requirement IDs to requirement metadata
226                           loaded from the VNFRQTS projects needs.json output
227         :return: List of requirement metadata
228         """
229         data = []
230         for r_id in self.requirement_ids:
231             if r_id not in curr_reqs:
232                 continue
233             data.append(
234                 {
235                     "id": r_id,
236                     "text": curr_reqs[r_id]["description"],
237                     "keyword": curr_reqs[r_id]["keyword"],
238                 }
239             )
240         return data
241
242     def resolution_steps(self, resolutions):
243         """
244         :param resolutions: Loaded from contents for resolution_steps.json
245         :return: Header and text for the resolution step associated with this
246                  test case.  Returns empty string if no resolutions are
247                  provided.
248         """
249         text = (
250             "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
251             for entry in resolutions
252             if self._match(entry)
253         )
254         return "".join(text)
255
256     def _match(self, resolution_entry):
257         """
258         Returns True if the test result maps to the given entry in
259         the resolutions file
260         """
261         return (
262             self.test_case == resolution_entry["function"]
263             and self.test_module == resolution_entry["module"]
264         )
265
266     def _get_files(self):
267         """
268         Extracts the list of files passed into the test case.
269         :return: List of absolute paths to files
270         """
271         if "environment_pair" in self.item.fixturenames:
272             return [
273                 "{} environment pair".format(
274                     self.item.funcargs["environment_pair"]["name"]
275                 )
276             ]
277         elif "heat_volume_pair" in self.item.fixturenames:
278             return [
279                 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
280             ]
281         elif "heat_templates" in self.item.fixturenames:
282             return self.item.funcargs["heat_templates"]
283         elif "yaml_files" in self.item.fixturenames:
284             return self.item.funcargs["yaml_files"]
285         else:
286             parts = self.result.nodeid.split("[")
287             return [""] if len(parts) == 1 else [parts[1][:-1]]
288
289     def _get_error_message(self):
290         """
291         :return: Error message or empty string if the test did not fail or error
292         """
293         if self.is_failed:
294             return extract_error_msg(self.result)
295         else:
296             return ""
297
298
299 # noinspection PyUnusedLocal
300 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
301 def pytest_runtest_makereport(item, call):
302     """
303     Captures the test results for later reporting.  This will also halt testing
304     if a base failure is encountered (can be overridden with continue-on-failure)
305     """
306     outcome = yield
307     if outcome.get_result().when != "call":
308         return  # only capture results of test cases themselves
309     result = TestResult(item, outcome)
310     ALL_RESULTS.append(result)
311     if (
312         not item.config.option.continue_on_failure
313         and result.is_base_test
314         and result.is_failed
315     ):
316         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
317             result.error_message
318         )
319         pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
320
321
322 def make_timestamp():
323     """
324     :return: String make_iso_timestamp in format:
325              2019-01-19 10:18:49.865000 Central Standard Time
326     """
327     timezone = time.tzname[time.localtime().tm_isdst]
328     return "{} {}".format(str(datetime.datetime.now()), timezone)
329
330
331 # noinspection PyUnusedLocal
332 def pytest_sessionstart(session):
333     ALL_RESULTS.clear()
334     COLLECTION_FAILURES.clear()
335
336
337 # noinspection PyUnusedLocal
338 def pytest_sessionfinish(session, exitstatus):
339     """
340     If not a self-test run, generate the output reports
341     """
342     if not session.config.option.template_dir:
343         return
344
345     if session.config.option.template_source:
346         template_source = session.config.option.template_source[0]
347     else:
348         template_source = os.path.abspath(session.config.option.template_dir[0])
349
350     categories_selected = session.config.option.test_categories or ""
351     generate_report(
352         get_output_dir(session.config),
353         template_source,
354         categories_selected,
355         session.config.option.report_format,
356     )
357
358
359 # noinspection PyUnusedLocal
360 def pytest_collection_modifyitems(session, config, items):
361     """
362     Selects tests based on the categories requested.  Tests without
363     categories will always be executed.
364     """
365     config.traceability_items = list(items)  # save all items for traceability
366     if not config.option.self_test:
367         for item in items:
368             # checking if test belongs to a category
369             if hasattr(item.function, "categories"):
370                 if config.option.test_categories:
371                     test_categories = getattr(item.function, "categories")
372                     passed_categories = config.option.test_categories
373                     if not all(
374                         category in passed_categories for category in test_categories
375                     ):
376                         item.add_marker(
377                             pytest.mark.skip(
378                                 reason="Test categories do not match all the passed categories"
379                             )
380                         )
381                 else:
382                     item.add_marker(
383                         pytest.mark.skip(
384                             reason="Test belongs to a category but no categories were passed"
385                         )
386                     )
387     items.sort(
388         key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
389     )
390
391
392 def make_href(paths):
393     """
394     Create an anchor tag to link to the file paths provided.
395     :param paths: string or list of file paths
396     :return: String of hrefs - one for each path, each seperated by a line
397              break (<br/).
398     """
399     paths = [paths] if isinstance(paths, string_types) else paths
400     links = []
401     for p in paths:
402         abs_path = os.path.abspath(p)
403         name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
404         links.append(
405             "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
406                 abs_path=abs_path, name=name
407             )
408         )
409     return "<br/>".join(links)
410
411
412 def load_resolutions_file():
413     """
414     :return: dict of data loaded from resolutions_steps.json
415     """
416     resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
417     if os.path.exists(resolution_steps):
418         with open(resolution_steps, "r") as f:
419             return json.loads(f.read())
420
421
422 def generate_report(outpath, template_path, categories, output_format="html"):
423     """
424     Generates the various output reports.
425
426     :param outpath: destination directory for all reports
427     :param template_path: directory containing the Heat templates validated
428     :param categories: Optional categories selected
429     :param output_format: One of "html", "excel", or "csv". Default is "html"
430     :raises: ValueError if requested output format is unknown
431     """
432     failures = [r for r in ALL_RESULTS if r.is_failed]
433     generate_failure_file(outpath)
434     output_format = output_format.lower().strip() if output_format else "html"
435     if output_format == "html":
436         generate_html_report(outpath, categories, template_path, failures)
437     elif output_format == "excel":
438         generate_excel_report(outpath, categories, template_path, failures)
439     elif output_format == "json":
440         generate_json(outpath, template_path, categories)
441     elif output_format == "csv":
442         generate_csv_report(outpath, categories, template_path, failures)
443     else:
444         raise ValueError("Unsupported output format: " + output_format)
445
446
447 def write_json(data, path):
448     """
449     Pretty print data as JSON to the output path requested
450
451     :param data: Data structure to be converted to JSON
452     :param path: Where to write output
453     """
454     with open(path, "w") as f:
455         json.dump(data, f, indent=2)
456
457
458 def generate_failure_file(outpath):
459     """
460     Writes a summary of test failures to a file named failures.
461     This is for backwards compatibility only.  The report.json offers a
462     more comprehensive output.
463     """
464     failure_path = os.path.join(outpath, "failures")
465     failures = [r for r in ALL_RESULTS if r.is_failed]
466     data = {}
467     for i, fail in enumerate(failures):
468         data[str(i)] = {
469             "file": fail.files[0] if len(fail.files) == 1 else fail.files,
470             "vnfrqts": fail.requirement_ids,
471             "test": fail.test_case,
472             "test_file": fail.test_module,
473             "raw_output": fail.raw_output,
474             "message": fail.error_message,
475         }
476     write_json(data, failure_path)
477
478
479 def generate_csv_report(output_dir, categories, template_path, failures):
480     rows = [["Validation Failures"]]
481     headers = [
482         ("Categories Selected:", categories),
483         ("Tool Version:", version.VERSION),
484         ("Report Generated At:", make_timestamp()),
485         ("Directory Validated:", template_path),
486         ("Checksum:", hash_directory(template_path)),
487         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
488     ]
489     rows.append([])
490     for header in headers:
491         rows.append(header)
492     rows.append([])
493
494     if COLLECTION_FAILURES:
495         rows.append([COLLECTION_FAILURE_WARNING])
496         rows.append(["Validation File", "Test", "Fixtures", "Error"])
497         for failure in COLLECTION_FAILURES:
498             rows.append(
499                 [
500                     failure["module"],
501                     failure["test"],
502                     ";".join(failure["fixtures"]),
503                     failure["error"],
504                 ]
505             )
506         rows.append([])
507
508     # table header
509     rows.append([col for col, _ in REPORT_COLUMNS])
510
511     reqs = load_current_requirements()
512     resolutions = load_resolutions_file()
513
514     # table content
515     for failure in failures:
516         rows.append(
517             [
518                 "\n".join(failure.files),
519                 failure.test_module,
520                 failure.requirement_text(reqs),
521                 failure.resolution_steps(resolutions),
522                 failure.error_message,
523                 failure.raw_output,
524             ]
525         )
526
527     output_path = os.path.join(output_dir, "report.csv")
528     with open(output_path, "w", newline="") as f:
529         writer = csv.writer(f)
530         for row in rows:
531             writer.writerow(row)
532
533
534 def generate_excel_report(output_dir, categories, template_path, failures):
535     output_path = os.path.join(output_dir, "report.xlsx")
536     workbook = xlsxwriter.Workbook(output_path)
537     bold = workbook.add_format({"bold": True})
538     code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
539     normal = workbook.add_format({"text_wrap": True})
540     heading = workbook.add_format({"bold": True, "font_size": 18})
541     worksheet = workbook.add_worksheet("failures")
542     worksheet.write(0, 0, "Validation Failures", heading)
543
544     headers = [
545         ("Categories Selected:", ",".join(categories)),
546         ("Tool Version:", version.VERSION),
547         ("Report Generated At:", make_timestamp()),
548         ("Directory Validated:", template_path),
549         ("Checksum:", hash_directory(template_path)),
550         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
551     ]
552     for row, (header, value) in enumerate(headers, start=2):
553         worksheet.write(row, 0, header, bold)
554         worksheet.write(row, 1, value)
555
556     worksheet.set_column(0, len(headers) - 1, 40)
557     worksheet.set_column(len(headers), len(headers), 80)
558
559     if COLLECTION_FAILURES:
560         collection_failures_start = 2 + len(headers) + 2
561         worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
562         collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
563         for col_num, col_name in enumerate(collection_failure_headers):
564             worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
565         for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
566             worksheet.write(row, 0, data["module"])
567             worksheet.write(row, 1, data["test"])
568             worksheet.write(row, 2, ",".join(data["fixtures"]))
569             worksheet.write(row, 3, data["error"], code)
570
571     # table header
572     start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
573     worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
574     for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
575         worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
576
577     reqs = load_current_requirements()
578     resolutions = load_resolutions_file()
579
580     # table content
581     for row, failure in enumerate(failures, start=start_error_table_row + 2):
582         worksheet.write(row, 0, "\n".join(failure.files), normal)
583         worksheet.write(row, 1, failure.test_module, normal)
584         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
585         worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
586         worksheet.write(row, 4, failure.error_message, normal)
587         worksheet.write(row, 5, failure.raw_output, code)
588
589     workbook.close()
590
591
592 def make_iso_timestamp():
593     """
594     Creates a timestamp in ISO 8601 format in UTC format.  Used for JSON output.
595     """
596     now = datetime.datetime.utcnow()
597     now.replace(tzinfo=datetime.timezone.utc)
598     return now.isoformat()
599
600
601 def aggregate_requirement_adherence(r_id, collection_failures, test_results):
602     """
603     Examines all tests associated with a given requirement and determines
604     the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
605
606     * ERROR - At least one ERROR occurred
607     * PASS -  At least one PASS and no FAIL or ERRORs.
608     * FAIL -  At least one FAIL occurred (no ERRORs)
609     * SKIP - All tests were SKIP
610
611
612     :param r_id: Requirement ID to examing
613     :param collection_failures: Errors that occurred during test setup.
614     :param test_results: List of TestResult
615     :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
616     """
617     errors = any(r_id in f["requirements"] for f in collection_failures)
618     outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
619     return aggregate_results(errors, outcomes, r_id)
620
621
622 def aggregate_results(has_errors, outcomes, r_id=None):
623     """
624     Determines the aggregate result for the conditions provided.  Assumes the
625     results have been filtered and collected for analysis.
626
627     :param has_errors: True if collection failures occurred for the tests being
628                        analyzed.
629     :param outcomes: set of outcomes from the TestResults
630     :param r_id: Optional requirement ID if known
631     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
632              (see aggregate_requirement_adherence for more detail)
633     """
634     if has_errors:
635         return "ERROR"
636
637     if not outcomes:
638         return "PASS"
639     elif "FAIL" in outcomes:
640         return "FAIL"
641     elif "PASS" in outcomes:
642         return "PASS"
643     elif {"SKIP"} == outcomes:
644         return "SKIP"
645     else:
646         pytest.warns(
647             "Unexpected error aggregating outcomes ({}) for requirement {}".format(
648                 outcomes, r_id
649             )
650         )
651         return "ERROR"
652
653
654 def aggregate_run_results(collection_failures, test_results):
655     """
656     Determines overall status of run based on all failures and results.
657
658     * 'ERROR' - At least one collection failure occurred during the run.
659     * 'FAIL' - Template failed at least one test
660     * 'PASS' - All tests executed properly and no failures were detected
661
662     :param collection_failures: failures occuring during test setup
663     :param test_results: list of all test executuion results
664     :return: one of 'ERROR', 'FAIL', or 'PASS'
665     """
666     if collection_failures:
667         return "ERROR"
668     elif any(r.is_failed for r in test_results):
669         return "FAIL"
670     else:
671         return "PASS"
672
673
674 def error(failure_or_result):
675     """
676     Extracts the error message from a collection failure or test result
677     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
678     :return: Error message as string
679     """
680     if isinstance(failure_or_result, TestResult):
681         return failure_or_result.error_message
682     else:
683         return failure_or_result["error"]
684
685
686 def req_ids(failure_or_result):
687     """
688     Extracts the requirement IDs from a collection failure or test result
689     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
690     :return: set of Requirement IDs.  If no requirements mapped, then an empty set
691     """
692     if isinstance(failure_or_result, TestResult):
693         return set(failure_or_result.requirement_ids)
694     else:
695         return set(failure_or_result["requirements"])
696
697
698 def collect_errors(r_id, collection_failures, test_result):
699     """
700     Creates a list of error messages from the collection failures and
701     test results.  If r_id is provided, then it collects the error messages
702     where the failure or test is associated with that requirement ID.  If
703     r_id is None, then it collects all errors that occur on failures and
704     results that are not mapped to requirements
705     """
706
707     def selector(item):
708         if r_id:
709             return r_id in req_ids(item)
710         else:
711             return not req_ids(item)
712
713     errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
714     return [e for e in errors if e]
715
716
717 def relative_paths(base_dir, paths):
718     return [os.path.relpath(p, base_dir) for p in paths]
719
720
721 def generate_json(outpath, template_path, categories):
722     """
723     Creates a JSON summary of the entire test run.
724     """
725     reqs = load_current_requirements()
726     data = {
727         "version": "dublin",
728         "template_directory": os.path.splitdrive(template_path)[1].replace(
729             os.path.sep, "/"
730         ),
731         "timestamp": make_iso_timestamp(),
732         "checksum": hash_directory(template_path),
733         "categories": categories,
734         "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
735         "tests": [],
736         "requirements": [],
737     }
738
739     results = data["tests"]
740     for result in COLLECTION_FAILURES:
741         results.append(
742             {
743                 "files": [],
744                 "test_module": result["module"],
745                 "test_case": result["test"],
746                 "result": "ERROR",
747                 "error": result["error"],
748                 "requirements": result["requirements"],
749             }
750         )
751     for result in ALL_RESULTS:
752         results.append(
753             {
754                 "files": relative_paths(template_path, result.files),
755                 "test_module": result.test_module,
756                 "test_case": result.test_case,
757                 "result": result.outcome,
758                 "error": result.error_message if result.is_failed else "",
759                 "requirements": result.requirements_metadata(reqs),
760             }
761         )
762
763     requirements = data["requirements"]
764     for r_id, r_data in reqs.items():
765         result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
766         if result:
767             requirements.append(
768                 {
769                     "id": r_id,
770                     "text": r_data["description"],
771                     "keyword": r_data["keyword"],
772                     "result": result,
773                     "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
774                 }
775             )
776     # If there are tests that aren't mapped to a requirement, then we'll
777     # map them to a special entry so the results are coherent.
778     unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
779     has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
780     if unmapped_outcomes or has_errors:
781         requirements.append(
782             {
783                 "id": "Unmapped",
784                 "text": "Tests not mapped to requirements (see tests)",
785                 "result": aggregate_results(has_errors, unmapped_outcomes),
786                 "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
787             }
788         )
789
790     report_path = os.path.join(outpath, "report.json")
791     write_json(data, report_path)
792
793
794 def generate_html_report(outpath, categories, template_path, failures):
795     reqs = load_current_requirements()
796     resolutions = load_resolutions_file()
797     fail_data = []
798     for failure in failures:
799         fail_data.append(
800             {
801                 "file_links": make_href(failure.files),
802                 "test_id": failure.test_module,
803                 "error_message": failure.error_message,
804                 "raw_output": failure.raw_output,
805                 "requirements": docutils.core.publish_parts(
806                     writer_name="html", source=failure.requirement_text(reqs)
807                 )["body"],
808                 "resolution_steps": failure.resolution_steps(resolutions),
809             }
810         )
811     pkg_dir = os.path.split(__file__)[0]
812     j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
813     with open(j2_template_path, "r") as f:
814         report_template = jinja2.Template(f.read())
815         contents = report_template.render(
816             version=version.VERSION,
817             num_failures=len(failures) + len(COLLECTION_FAILURES),
818             categories=categories,
819             template_dir=make_href(template_path),
820             checksum=hash_directory(template_path),
821             timestamp=make_timestamp(),
822             failures=fail_data,
823             collection_failures=COLLECTION_FAILURES,
824         )
825     with open(os.path.join(outpath, "report.html"), "w") as f:
826         f.write(contents)
827
828
829 def pytest_addoption(parser):
830     """
831     Add needed CLI arguments
832     """
833     parser.addoption(
834         "--template-directory",
835         dest="template_dir",
836         action="append",
837         help="Directory which holds the templates for validation",
838     )
839
840     parser.addoption(
841         "--template-source",
842         dest="template_source",
843         action="append",
844         help="Source Directory which holds the templates for validation",
845     )
846
847     parser.addoption(
848         "--self-test",
849         dest="self_test",
850         action="store_true",
851         help="Test the unit tests against their fixtured data",
852     )
853
854     parser.addoption(
855         "--report-format",
856         dest="report_format",
857         action="store",
858         help="Format of output report (html, csv, excel, json)",
859     )
860
861     parser.addoption(
862         "--continue-on-failure",
863         dest="continue_on_failure",
864         action="store_true",
865         help="Continue validation even when structural errors exist in input files",
866     )
867
868     parser.addoption(
869         "--output-directory",
870         dest="output_dir",
871         action="store",
872         default=None,
873         help="Alternate ",
874     )
875
876     parser.addoption(
877         "--category",
878         dest="test_categories",
879         action="append",
880         help="optional category of test to execute",
881     )
882
883
884 def pytest_configure(config):
885     """
886     Ensure that we are receive either `--self-test` or
887     `--template-dir=<directory` as CLI arguments
888     """
889     if config.getoption("template_dir") and config.getoption("self_test"):
890         raise Exception('"--template-dir", and "--self-test"' " are mutually exclusive")
891     if not (
892         config.getoption("template_dir")
893         or config.getoption("self_test")
894         or config.getoption("help")
895     ):
896         raise Exception('One of "--template-dir" or' ' "--self-test" must be specified')
897
898
899 def pytest_generate_tests(metafunc):
900     """
901     If a unit test requires an argument named 'filename'
902     we generate a test for the filenames selected. Either
903     the files contained in `template_dir` or if `template_dir`
904     is not specified on the CLI, the fixtures associated with this
905     test name.
906     """
907
908     # noinspection PyBroadException
909     try:
910         if "filename" in metafunc.fixturenames:
911             from .parametrizers import parametrize_filename
912
913             parametrize_filename(metafunc)
914
915         if "filenames" in metafunc.fixturenames:
916             from .parametrizers import parametrize_filenames
917
918             parametrize_filenames(metafunc)
919
920         if "template_dir" in metafunc.fixturenames:
921             from .parametrizers import parametrize_template_dir
922
923             parametrize_template_dir(metafunc)
924
925         if "environment_pair" in metafunc.fixturenames:
926             from .parametrizers import parametrize_environment_pair
927
928             parametrize_environment_pair(metafunc)
929
930         if "heat_volume_pair" in metafunc.fixturenames:
931             from .parametrizers import parametrize_heat_volume_pair
932
933             parametrize_heat_volume_pair(metafunc)
934
935         if "yaml_files" in metafunc.fixturenames:
936             from .parametrizers import parametrize_yaml_files
937
938             parametrize_yaml_files(metafunc)
939
940         if "env_files" in metafunc.fixturenames:
941             from .parametrizers import parametrize_environment_files
942
943             parametrize_environment_files(metafunc)
944
945         if "yaml_file" in metafunc.fixturenames:
946             from .parametrizers import parametrize_yaml_file
947
948             parametrize_yaml_file(metafunc)
949
950         if "env_file" in metafunc.fixturenames:
951             from .parametrizers import parametrize_environment_file
952
953             parametrize_environment_file(metafunc)
954
955         if "parsed_yaml_file" in metafunc.fixturenames:
956             from .parametrizers import parametrize_parsed_yaml_file
957
958             parametrize_parsed_yaml_file(metafunc)
959
960         if "parsed_environment_file" in metafunc.fixturenames:
961             from .parametrizers import parametrize_parsed_environment_file
962
963             parametrize_parsed_environment_file(metafunc)
964
965         if "heat_template" in metafunc.fixturenames:
966             from .parametrizers import parametrize_heat_template
967
968             parametrize_heat_template(metafunc)
969
970         if "heat_templates" in metafunc.fixturenames:
971             from .parametrizers import parametrize_heat_templates
972
973             parametrize_heat_templates(metafunc)
974
975         if "volume_template" in metafunc.fixturenames:
976             from .parametrizers import parametrize_volume_template
977
978             parametrize_volume_template(metafunc)
979
980         if "volume_templates" in metafunc.fixturenames:
981             from .parametrizers import parametrize_volume_templates
982
983             parametrize_volume_templates(metafunc)
984
985         if "template" in metafunc.fixturenames:
986             from .parametrizers import parametrize_template
987
988             parametrize_template(metafunc)
989
990         if "templates" in metafunc.fixturenames:
991             from .parametrizers import parametrize_templates
992
993             parametrize_templates(metafunc)
994     except Exception as e:
995         # If an error occurs in the collection phase, then it won't be logged as a
996         # normal test failure.  This means that failures could occur, but not
997         # be seen on the report resulting in a false positive success message.  These
998         # errors will be stored and reported separately on the report
999         COLLECTION_FAILURES.append(
1000             {
1001                 "module": metafunc.module.__name__,
1002                 "test": metafunc.function.__name__,
1003                 "fixtures": metafunc.fixturenames,
1004                 "error": traceback.format_exc(),
1005                 "requirements": getattr(metafunc.function, "requirement_ids", []),
1006             }
1007         )
1008         raise e
1009
1010
1011 def hash_directory(path):
1012     """
1013     Create md5 hash using the contents of all files under ``path``
1014     :param path: string directory containing files
1015     :return: string MD5 hash code (hex)
1016     """
1017     md5 = hashlib.md5()
1018     for dir_path, sub_dirs, filenames in os.walk(path):
1019         for filename in filenames:
1020             file_path = os.path.join(dir_path, filename)
1021             with open(file_path, "rb") as f:
1022                 md5.update(f.read())
1023     return md5.hexdigest()
1024
1025
1026 def load_current_requirements():
1027     """Loads dict of current requirements or empty dict if file doesn't exist"""
1028     with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
1029         data = json.load(f)
1030         version = data["current_version"]
1031         return data["versions"][version]["needs"]
1032
1033
1034 def select_heat_requirements(reqs):
1035     """Filters dict requirements to only those requirements pertaining to Heat"""
1036     return {k: v for k, v in reqs.items() if "Heat" in v["docname"]}
1037
1038
1039 def build_rst_json(reqs):
1040     """Takes requirements and returns list of only Heat requirements"""
1041     data = json.loads(reqs)
1042     for key, values in list(data.items()):
1043         if "Heat" in (values["docname"]):
1044             if "MUST" in (values["keyword"]):
1045                 if "none" in (values["validation_mode"]):
1046                     del data[key]
1047                 else:
1048                     # Creates links in RST format to requirements and test cases
1049                     if values["test_case"]:
1050                         mod = values["test_case"].split(".")[-1]
1051                         val = TEST_SCRIPT_SITE + mod + ".py"
1052                         rst_value = ("`" + mod + " <" + val + ">`_")
1053                         title = "`" + values["id"] + " <" + VNFRQTS_ID_URL + values["docname"].replace(" ", "%20") + ".html#" + values["id"] + ">`_"
1054                         data[key].update({'full_title': title, 'test_case': rst_value})
1055                     else:
1056                         del data[key]
1057             else:
1058                 del data[key]
1059         else:
1060             del data[key]
1061     return data
1062
1063
1064 def generate_rst_table(data):
1065     """Generate a formatted csv to be used in RST"""
1066     rst_path = os.path.join(__path__[0], "../output/rst.csv")
1067     with open(rst_path, "w", newline="") as f:
1068         out = csv.writer(f)
1069         out.writerow(
1070             ("Requirement ID", "Requirement", "Test Module", "Test Name"),
1071         )
1072         for req_id, metadata in data.items():
1073             out.writerow(
1074                 (
1075                     metadata["full_title"],
1076                     metadata["description"],
1077                     metadata["test_case"],
1078                     metadata["validated_by"],
1079                 )
1080             )
1081
1082
1083 # noinspection PyUnusedLocal
1084 def pytest_report_collectionfinish(config, startdir, items):
1085     """Generates a simple traceability report to output/traceability.csv"""
1086     traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
1087     output_dir = os.path.split(traceability_path)[0]
1088     if not os.path.exists(output_dir):
1089         os.makedirs(output_dir)
1090     reqs = load_current_requirements()
1091     requirements = select_heat_requirements(reqs)
1092     unmapped, mapped = partition(
1093         lambda i: hasattr(i.function, "requirement_ids"), items
1094     )
1095
1096     req_to_test = defaultdict(set)
1097     mapping_errors = set()
1098     for item in mapped:
1099         for req_id in item.function.requirement_ids:
1100             if req_id not in req_to_test:
1101                 req_to_test[req_id].add(item)
1102                 if req_id in requirements:
1103                     reqs[req_id].update({'test_case': item.function.__module__,
1104                                          'validated_by': item.function.__name__})
1105             if req_id not in requirements:
1106                 mapping_errors.add(
1107                     (req_id, item.function.__module__, item.function.__name__)
1108                 )
1109
1110     mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
1111     with open(mapping_error_path, "w", newline="") as f:
1112         writer = csv.writer(f)
1113         for err in mapping_errors:
1114             writer.writerow(err)
1115
1116     with open(traceability_path, "w", newline="") as f:
1117         out = csv.writer(f)
1118         out.writerow(
1119             ("Requirement ID", "Requirement", "Section",
1120              "Keyword", "Validation Mode", "Is Testable",
1121              "Test Module", "Test Name"),
1122         )
1123         for req_id, metadata in requirements.items():
1124             keyword = metadata["keyword"].upper()
1125             mode = metadata["validation_mode"].lower()
1126             testable = keyword in {"MUST", "MUST NOT"} and mode != "none"
1127             if req_to_test[req_id]:
1128                 for item in req_to_test[req_id]:
1129                     out.writerow(
1130                         (
1131                             req_id,
1132                             metadata["description"],
1133                             metadata["section_name"],
1134                             keyword,
1135                             mode,
1136                             "TRUE" if testable else "FALSE",
1137                             item.function.__module__,
1138                             item.function.__name__,
1139                         ),
1140                     )
1141             else:
1142                 out.writerow(
1143                     (req_id,
1144                      metadata["description"],
1145                      metadata["section_name"],
1146                      keyword,
1147                      mode,
1148                      "TRUE" if testable else "FALSE",
1149                      "",   # test module
1150                      ""),  # test function
1151                 )
1152         # now write out any test methods that weren't mapped to requirements
1153         unmapped_tests = {(item.function.__module__, item.function.__name__) for item in
1154                           unmapped}
1155         for test_module, test_name in unmapped_tests:
1156             out.writerow(
1157                 ("",        # req ID
1158                  "",        # description
1159                  "",        # section name
1160                  "",        # keyword
1161                  "static",  # validation mode
1162                  "TRUE",    # testable
1163                  test_module,
1164                  test_name)
1165             )
1166
1167     generate_rst_table(build_rst_json(json.dumps(reqs)))