a2f432126ff369cdce9bfad5774577ffec62c0db
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37
38 import csv
39 import datetime
40 import hashlib
41 import io
42 import json
43 import os
44 import re
45 import time
46 from collections import defaultdict
47 from itertools import chain
48
49 import traceback
50
51 import docutils.core
52 import jinja2
53 import pytest
54 from more_itertools import partition
55 import xlsxwriter
56 from six import string_types
57
58 import version
59
60 __path__ = [os.path.dirname(os.path.abspath(__file__))]
61
62 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
63
64 RESOLUTION_STEPS_FILE = "resolution_steps.json"
65 HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
66 TEST_SCRIPT_SITE = "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
67 VNFRQTS_ID_URL = "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
68
69 REPORT_COLUMNS = [
70     ("Input File", "file"),
71     ("Test", "test_file"),
72     ("Requirements", "req_description"),
73     ("Resolution Steps", "resolution_steps"),
74     ("Error Message", "message"),
75     ("Raw Test Output", "raw_output"),
76 ]
77
78 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
79 while preparing to validate the the input files. Some validations may not have been
80 executed. Please refer these issue to the VNF Validation Tool team.
81 """
82
83 COLLECTION_FAILURES = []
84
85 # Captures the results of every test run
86 ALL_RESULTS = []
87
88
89 def get_output_dir(config):
90     """
91     Retrieve the output directory for the reports and create it if necessary
92     :param config: pytest configuration
93     :return: output directory as string
94     """
95     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
96     if not os.path.exists(output_dir):
97         os.makedirs(output_dir, exist_ok=True)
98     return output_dir
99
100
101 def extract_error_msg(rep):
102     """
103     If a custom error message was provided, then extract it otherwise
104     just show the pytest assert message
105     """
106     if rep.outcome != "failed":
107         return ""
108     try:
109         full_msg = str(rep.longrepr.reprcrash.message)
110         match = re.match(
111             "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
112         )
113         if match:  # custom message was provided
114             # Extract everything between AssertionError and the start
115             # of the assert statement expansion in the pytest report
116             msg = match.group(1)
117         else:
118             msg = str(rep.longrepr.reprcrash)
119             if "AssertionError:" in msg:
120                 msg = msg.split("AssertionError:")[1]
121     except AttributeError:
122         msg = str(rep)
123
124     return msg
125
126
127 class TestResult:
128     """
129     Wraps the test case and result to extract necessary metadata for
130     reporting purposes.
131     """
132
133     RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
134
135     def __init__(self, item, outcome):
136         self.item = item
137         self.result = outcome.get_result()
138         self.files = [os.path.normpath(p) for p in self._get_files()]
139         self.error_message = self._get_error_message()
140
141     @property
142     def requirement_ids(self):
143         """
144         Returns list of requirement IDs mapped to the test case.
145
146         :return: Returns a list of string requirement IDs the test was
147                  annotated with ``validates`` otherwise returns and empty list
148         """
149         is_mapped = hasattr(self.item.function, "requirement_ids")
150         return self.item.function.requirement_ids if is_mapped else []
151
152     @property
153     def markers(self):
154         """
155         :return: Returns a set of pytest marker names for the test or an empty set
156         """
157         return set(m.name for m in self.item.iter_markers())
158
159     @property
160     def is_base_test(self):
161         """
162         :return: Returns True if the test is annotated with a pytest marker called base
163         """
164         return "base" in self.markers
165
166     @property
167     def is_failed(self):
168         """
169         :return: True if the test failed
170         """
171         return self.outcome == "FAIL"
172
173     @property
174     def outcome(self):
175         """
176         :return: Returns 'PASS', 'FAIL', or 'SKIP'
177         """
178         return self.RESULT_MAPPING[self.result.outcome]
179
180     @property
181     def test_case(self):
182         """
183         :return: Name of the test case method
184         """
185         return self.item.function.__name__
186
187     @property
188     def test_module(self):
189         """
190         :return: Name of the file containing the test case
191         """
192         return self.item.function.__module__.split(".")[-1]
193
194     @property
195     def raw_output(self):
196         """
197         :return: Full output from pytest for the given test case
198         """
199         return str(self.result.longrepr)
200
201     def requirement_text(self, curr_reqs):
202         """
203         Creates a text summary for the requirement IDs mapped to the test case.
204         If no requirements are mapped, then it returns the empty string.
205
206         :param curr_reqs: mapping of requirement IDs to requirement metadata
207                           loaded from the VNFRQTS projects needs.json output
208         :return: ID and text of the requirements mapped to the test case
209         """
210         text = (
211             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
212             for r_id in self.requirement_ids
213         )
214         return "".join(text)
215
216     def requirements_metadata(self, curr_reqs):
217         """
218         Returns a list of dicts containing the following metadata for each
219         requirement mapped:
220
221         - id: Requirement ID
222         - text: Full text of the requirement
223         - keyword: MUST, MUST NOT, MAY, etc.
224
225         :param curr_reqs: mapping of requirement IDs to requirement metadata
226                           loaded from the VNFRQTS projects needs.json output
227         :return: List of requirement metadata
228         """
229         data = []
230         for r_id in self.requirement_ids:
231             if r_id not in curr_reqs:
232                 continue
233             data.append(
234                 {
235                     "id": r_id,
236                     "text": curr_reqs[r_id]["description"],
237                     "keyword": curr_reqs[r_id]["keyword"],
238                 }
239             )
240         return data
241
242     def resolution_steps(self, resolutions):
243         """
244         :param resolutions: Loaded from contents for resolution_steps.json
245         :return: Header and text for the resolution step associated with this
246                  test case.  Returns empty string if no resolutions are
247                  provided.
248         """
249         text = (
250             "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
251             for entry in resolutions
252             if self._match(entry)
253         )
254         return "".join(text)
255
256     def _match(self, resolution_entry):
257         """
258         Returns True if the test result maps to the given entry in
259         the resolutions file
260         """
261         return (
262             self.test_case == resolution_entry["function"]
263             and self.test_module == resolution_entry["module"]
264         )
265
266     def _get_files(self):
267         """
268         Extracts the list of files passed into the test case.
269         :return: List of absolute paths to files
270         """
271         if "environment_pair" in self.item.fixturenames:
272             return [
273                 "{} environment pair".format(
274                     self.item.funcargs["environment_pair"]["name"]
275                 )
276             ]
277         elif "heat_volume_pair" in self.item.fixturenames:
278             return [
279                 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
280             ]
281         elif "heat_templates" in self.item.fixturenames:
282             return self.item.funcargs["heat_templates"]
283         elif "yaml_files" in self.item.fixturenames:
284             return self.item.funcargs["yaml_files"]
285         else:
286             return [self.result.nodeid.split("[")[1][:-1]]
287
288     def _get_error_message(self):
289         """
290         :return: Error message or empty string if the test did not fail or error
291         """
292         if self.is_failed:
293             return extract_error_msg(self.result)
294         else:
295             return ""
296
297
298 # noinspection PyUnusedLocal
299 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
300 def pytest_runtest_makereport(item, call):
301     """
302     Captures the test results for later reporting.  This will also halt testing
303     if a base failure is encountered (can be overridden with continue-on-failure)
304     """
305     outcome = yield
306     if outcome.get_result().when != "call":
307         return  # only capture results of test cases themselves
308     result = TestResult(item, outcome)
309     ALL_RESULTS.append(result)
310     if (
311         not item.config.option.continue_on_failure
312         and result.is_base_test
313         and result.is_failed
314     ):
315         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
316             result.error_message
317         )
318         pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
319
320
321 def make_timestamp():
322     """
323     :return: String make_iso_timestamp in format:
324              2019-01-19 10:18:49.865000 Central Standard Time
325     """
326     timezone = time.tzname[time.localtime().tm_isdst]
327     return "{} {}".format(str(datetime.datetime.now()), timezone)
328
329
330 # noinspection PyUnusedLocal
331 def pytest_sessionstart(session):
332     ALL_RESULTS.clear()
333     COLLECTION_FAILURES.clear()
334
335
336 # noinspection PyUnusedLocal
337 def pytest_sessionfinish(session, exitstatus):
338     """
339     If not a self-test run, generate the output reports
340     """
341     if not session.config.option.template_dir:
342         return
343
344     if session.config.option.template_source:
345         template_source = session.config.option.template_source[0]
346     else:
347         template_source = os.path.abspath(session.config.option.template_dir[0])
348
349     categories_selected = session.config.option.test_categories or ""
350     generate_report(
351         get_output_dir(session.config),
352         template_source,
353         categories_selected,
354         session.config.option.report_format,
355     )
356
357
358 # noinspection PyUnusedLocal
359 def pytest_collection_modifyitems(session, config, items):
360     """
361     Selects tests based on the categories requested.  Tests without
362     categories will always be executed.
363     """
364     config.traceability_items = list(items)  # save all items for traceability
365     if not config.option.self_test:
366         for item in items:
367             # checking if test belongs to a category
368             if hasattr(item.function, "categories"):
369                 if config.option.test_categories:
370                     test_categories = getattr(item.function, "categories")
371                     passed_categories = config.option.test_categories
372                     if not all(
373                         category in passed_categories for category in test_categories
374                     ):
375                         item.add_marker(
376                             pytest.mark.skip(
377                                 reason="Test categories do not match all the passed categories"
378                             )
379                         )
380                 else:
381                     item.add_marker(
382                         pytest.mark.skip(
383                             reason="Test belongs to a category but no categories were passed"
384                         )
385                     )
386     items.sort(
387         key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
388     )
389
390
391 def make_href(paths):
392     """
393     Create an anchor tag to link to the file paths provided.
394     :param paths: string or list of file paths
395     :return: String of hrefs - one for each path, each seperated by a line
396              break (<br/).
397     """
398     paths = [paths] if isinstance(paths, string_types) else paths
399     links = []
400     for p in paths:
401         abs_path = os.path.abspath(p)
402         name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
403         links.append(
404             "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
405                 abs_path=abs_path, name=name
406             )
407         )
408     return "<br/>".join(links)
409
410
411 def load_resolutions_file():
412     """
413     :return: dict of data loaded from resolutions_steps.json
414     """
415     resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
416     if os.path.exists(resolution_steps):
417         with open(resolution_steps, "r") as f:
418             return json.loads(f.read())
419
420
421 def generate_report(outpath, template_path, categories, output_format="html"):
422     """
423     Generates the various output reports.
424
425     :param outpath: destination directory for all reports
426     :param template_path: directory containing the Heat templates validated
427     :param categories: Optional categories selected
428     :param output_format: One of "html", "excel", or "csv". Default is "html"
429     :raises: ValueError if requested output format is unknown
430     """
431     failures = [r for r in ALL_RESULTS if r.is_failed]
432     generate_failure_file(outpath)
433     output_format = output_format.lower().strip() if output_format else "html"
434     if output_format == "html":
435         generate_html_report(outpath, categories, template_path, failures)
436     elif output_format == "excel":
437         generate_excel_report(outpath, categories, template_path, failures)
438     elif output_format == "json":
439         generate_json(outpath, template_path, categories)
440     elif output_format == "csv":
441         generate_csv_report(outpath, categories, template_path, failures)
442     else:
443         raise ValueError("Unsupported output format: " + output_format)
444
445
446 def write_json(data, path):
447     """
448     Pretty print data as JSON to the output path requested
449
450     :param data: Data structure to be converted to JSON
451     :param path: Where to write output
452     """
453     with open(path, "w") as f:
454         json.dump(data, f, indent=2)
455
456
457 def generate_failure_file(outpath):
458     """
459     Writes a summary of test failures to a file named failures.
460     This is for backwards compatibility only.  The report.json offers a
461     more comprehensive output.
462     """
463     failure_path = os.path.join(outpath, "failures")
464     failures = [r for r in ALL_RESULTS if r.is_failed]
465     data = {}
466     for i, fail in enumerate(failures):
467         data[str(i)] = {
468             "file": fail.files[0] if len(fail.files) == 1 else fail.files,
469             "vnfrqts": fail.requirement_ids,
470             "test": fail.test_case,
471             "test_file": fail.test_module,
472             "raw_output": fail.raw_output,
473             "message": fail.error_message,
474         }
475     write_json(data, failure_path)
476
477
478 def generate_csv_report(output_dir, categories, template_path, failures):
479     rows = [["Validation Failures"]]
480     headers = [
481         ("Categories Selected:", categories),
482         ("Tool Version:", version.VERSION),
483         ("Report Generated At:", make_timestamp()),
484         ("Directory Validated:", template_path),
485         ("Checksum:", hash_directory(template_path)),
486         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
487     ]
488     rows.append([])
489     for header in headers:
490         rows.append(header)
491     rows.append([])
492
493     if COLLECTION_FAILURES:
494         rows.append([COLLECTION_FAILURE_WARNING])
495         rows.append(["Validation File", "Test", "Fixtures", "Error"])
496         for failure in COLLECTION_FAILURES:
497             rows.append(
498                 [
499                     failure["module"],
500                     failure["test"],
501                     ";".join(failure["fixtures"]),
502                     failure["error"],
503                 ]
504             )
505         rows.append([])
506
507     # table header
508     rows.append([col for col, _ in REPORT_COLUMNS])
509
510     reqs = load_current_requirements()
511     resolutions = load_resolutions_file()
512
513     # table content
514     for failure in failures:
515         rows.append(
516             [
517                 "\n".join(failure.files),
518                 failure.test_module,
519                 failure.requirement_text(reqs),
520                 failure.resolution_steps(resolutions),
521                 failure.error_message,
522                 failure.raw_output,
523             ]
524         )
525
526     output_path = os.path.join(output_dir, "report.csv")
527     with open(output_path, "w", newline="") as f:
528         writer = csv.writer(f)
529         for row in rows:
530             writer.writerow(row)
531
532
533 def generate_excel_report(output_dir, categories, template_path, failures):
534     output_path = os.path.join(output_dir, "report.xlsx")
535     workbook = xlsxwriter.Workbook(output_path)
536     bold = workbook.add_format({"bold": True})
537     code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
538     normal = workbook.add_format({"text_wrap": True})
539     heading = workbook.add_format({"bold": True, "font_size": 18})
540     worksheet = workbook.add_worksheet("failures")
541     worksheet.write(0, 0, "Validation Failures", heading)
542
543     headers = [
544         ("Categories Selected:", ",".join(categories)),
545         ("Tool Version:", version.VERSION),
546         ("Report Generated At:", make_timestamp()),
547         ("Directory Validated:", template_path),
548         ("Checksum:", hash_directory(template_path)),
549         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
550     ]
551     for row, (header, value) in enumerate(headers, start=2):
552         worksheet.write(row, 0, header, bold)
553         worksheet.write(row, 1, value)
554
555     worksheet.set_column(0, len(headers) - 1, 40)
556     worksheet.set_column(len(headers), len(headers), 80)
557
558     if COLLECTION_FAILURES:
559         collection_failures_start = 2 + len(headers) + 2
560         worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
561         collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
562         for col_num, col_name in enumerate(collection_failure_headers):
563             worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
564         for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
565             worksheet.write(row, 0, data["module"])
566             worksheet.write(row, 1, data["test"])
567             worksheet.write(row, 2, ",".join(data["fixtures"]))
568             worksheet.write(row, 3, data["error"], code)
569
570     # table header
571     start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
572     worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
573     for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
574         worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
575
576     reqs = load_current_requirements()
577     resolutions = load_resolutions_file()
578
579     # table content
580     for row, failure in enumerate(failures, start=start_error_table_row + 2):
581         worksheet.write(row, 0, "\n".join(failure.files), normal)
582         worksheet.write(row, 1, failure.test_module, normal)
583         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
584         worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
585         worksheet.write(row, 4, failure.error_message, normal)
586         worksheet.write(row, 5, failure.raw_output, code)
587
588     workbook.close()
589
590
591 def make_iso_timestamp():
592     """
593     Creates a timestamp in ISO 8601 format in UTC format.  Used for JSON output.
594     """
595     now = datetime.datetime.utcnow()
596     now.replace(tzinfo=datetime.timezone.utc)
597     return now.isoformat()
598
599
600 def aggregate_requirement_adherence(r_id, collection_failures, test_results):
601     """
602     Examines all tests associated with a given requirement and determines
603     the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
604
605     * ERROR - At least one ERROR occurred
606     * PASS -  At least one PASS and no FAIL or ERRORs.
607     * FAIL -  At least one FAIL occurred (no ERRORs)
608     * SKIP - All tests were SKIP
609
610
611     :param r_id: Requirement ID to examing
612     :param collection_failures: Errors that occurred during test setup.
613     :param test_results: List of TestResult
614     :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
615     """
616     errors = any(r_id in f["requirements"] for f in collection_failures)
617     outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
618     return aggregate_results(errors, outcomes, r_id)
619
620
621 def aggregate_results(has_errors, outcomes, r_id=None):
622     """
623     Determines the aggregate result for the conditions provided.  Assumes the
624     results have been filtered and collected for analysis.
625
626     :param has_errors: True if collection failures occurred for the tests being
627                        analyzed.
628     :param outcomes: set of outcomes from the TestResults
629     :param r_id: Optional requirement ID if known
630     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
631              (see aggregate_requirement_adherence for more detail)
632     """
633     if has_errors:
634         return "ERROR"
635
636     if not outcomes:
637         return "PASS"
638     elif "FAIL" in outcomes:
639         return "FAIL"
640     elif "PASS" in outcomes:
641         return "PASS"
642     elif {"SKIP"} == outcomes:
643         return "SKIP"
644     else:
645         pytest.warns(
646             "Unexpected error aggregating outcomes ({}) for requirement {}".format(
647                 outcomes, r_id
648             )
649         )
650         return "ERROR"
651
652
653 def aggregate_run_results(collection_failures, test_results):
654     """
655     Determines overall status of run based on all failures and results.
656
657     * 'ERROR' - At least one collection failure occurred during the run.
658     * 'FAIL' - Template failed at least one test
659     * 'PASS' - All tests executed properly and no failures were detected
660
661     :param collection_failures: failures occuring during test setup
662     :param test_results: list of all test executuion results
663     :return: one of 'ERROR', 'FAIL', or 'PASS'
664     """
665     if collection_failures:
666         return "ERROR"
667     elif any(r.is_failed for r in test_results):
668         return "FAIL"
669     else:
670         return "PASS"
671
672
673 def error(failure_or_result):
674     """
675     Extracts the error message from a collection failure or test result
676     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
677     :return: Error message as string
678     """
679     if isinstance(failure_or_result, TestResult):
680         return failure_or_result.error_message
681     else:
682         return failure_or_result["error"]
683
684
685 def req_ids(failure_or_result):
686     """
687     Extracts the requirement IDs from a collection failure or test result
688     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
689     :return: set of Requirement IDs.  If no requirements mapped, then an empty set
690     """
691     if isinstance(failure_or_result, TestResult):
692         return set(failure_or_result.requirement_ids)
693     else:
694         return set(failure_or_result["requirements"])
695
696
697 def collect_errors(r_id, collection_failures, test_result):
698     """
699     Creates a list of error messages from the collection failures and
700     test results.  If r_id is provided, then it collects the error messages
701     where the failure or test is associated with that requirement ID.  If
702     r_id is None, then it collects all errors that occur on failures and
703     results that are not mapped to requirements
704     """
705
706     def selector(item):
707         if r_id:
708             return r_id in req_ids(item)
709         else:
710             return not req_ids(item)
711
712     errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
713     return [e for e in errors if e]
714
715
716 def generate_json(outpath, template_path, categories):
717     """
718     Creates a JSON summary of the entire test run.
719     """
720     reqs = load_current_requirements()
721     data = {
722         "version": "dublin",
723         "template_directory": template_path,
724         "timestamp": make_iso_timestamp(),
725         "checksum": hash_directory(template_path),
726         "categories": categories,
727         "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
728         "tests": [],
729         "requirements": [],
730     }
731
732     results = data["tests"]
733     for result in COLLECTION_FAILURES:
734         results.append(
735             {
736                 "files": [],
737                 "test_module": result["module"],
738                 "test_case": result["test"],
739                 "result": "ERROR",
740                 "error": result["error"],
741                 "requirements": result["requirements"],
742             }
743         )
744     for result in ALL_RESULTS:
745         results.append(
746             {
747                 "files": result.files,
748                 "test_module": result.test_module,
749                 "test_case": result.test_case,
750                 "result": result.outcome,
751                 "error": result.error_message if result.is_failed else "",
752                 "requirements": result.requirements_metadata(reqs),
753             }
754         )
755
756     requirements = data["requirements"]
757     for r_id, r_data in reqs.items():
758         result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
759         if result:
760             requirements.append(
761                 {
762                     "id": r_id,
763                     "text": r_data["description"],
764                     "keyword": r_data["keyword"],
765                     "result": result,
766                     "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
767                 }
768             )
769     # If there are tests that aren't mapped to a requirement, then we'll
770     # map them to a special entry so the results are coherent.
771     unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
772     has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
773     if unmapped_outcomes or has_errors:
774         requirements.append(
775             {
776                 "id": "Unmapped",
777                 "text": "Tests not mapped to requirements (see tests)",
778                 "result": aggregate_results(has_errors, unmapped_outcomes),
779                 "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
780             }
781         )
782
783     report_path = os.path.join(outpath, "report.json")
784     write_json(data, report_path)
785
786
787 def generate_html_report(outpath, categories, template_path, failures):
788     reqs = load_current_requirements()
789     resolutions = load_resolutions_file()
790     fail_data = []
791     for failure in failures:
792         fail_data.append(
793             {
794                 "file_links": make_href(failure.files),
795                 "test_id": failure.test_module,
796                 "error_message": failure.error_message,
797                 "raw_output": failure.raw_output,
798                 "requirements": docutils.core.publish_parts(
799                     writer_name="html", source=failure.requirement_text(reqs)
800                 )["body"],
801                 "resolution_steps": failure.resolution_steps(resolutions),
802             }
803         )
804     pkg_dir = os.path.split(__file__)[0]
805     j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
806     with open(j2_template_path, "r") as f:
807         report_template = jinja2.Template(f.read())
808         contents = report_template.render(
809             version=version.VERSION,
810             num_failures=len(failures) + len(COLLECTION_FAILURES),
811             categories=categories,
812             template_dir=make_href(template_path),
813             checksum=hash_directory(template_path),
814             timestamp=make_timestamp(),
815             failures=fail_data,
816             collection_failures=COLLECTION_FAILURES,
817         )
818     with open(os.path.join(outpath, "report.html"), "w") as f:
819         f.write(contents)
820
821
822 def pytest_addoption(parser):
823     """
824     Add needed CLI arguments
825     """
826     parser.addoption(
827         "--template-directory",
828         dest="template_dir",
829         action="append",
830         help="Directory which holds the templates for validation",
831     )
832
833     parser.addoption(
834         "--template-source",
835         dest="template_source",
836         action="append",
837         help="Source Directory which holds the templates for validation",
838     )
839
840     parser.addoption(
841         "--self-test",
842         dest="self_test",
843         action="store_true",
844         help="Test the unit tests against their fixtured data",
845     )
846
847     parser.addoption(
848         "--report-format",
849         dest="report_format",
850         action="store",
851         help="Format of output report (html, csv, excel, json)",
852     )
853
854     parser.addoption(
855         "--continue-on-failure",
856         dest="continue_on_failure",
857         action="store_true",
858         help="Continue validation even when structural errors exist in input files",
859     )
860
861     parser.addoption(
862         "--output-directory",
863         dest="output_dir",
864         action="store",
865         default=None,
866         help="Alternate ",
867     )
868
869     parser.addoption(
870         "--category",
871         dest="test_categories",
872         action="append",
873         help="optional category of test to execute",
874     )
875
876
877 def pytest_configure(config):
878     """
879     Ensure that we are receive either `--self-test` or
880     `--template-dir=<directory` as CLI arguments
881     """
882     if config.getoption("template_dir") and config.getoption("self_test"):
883         raise Exception('"--template-dir", and "--self-test"' " are mutually exclusive")
884     if not (
885         config.getoption("template_dir")
886         or config.getoption("self_test")
887         or config.getoption("help")
888     ):
889         raise Exception('One of "--template-dir" or' ' "--self-test" must be specified')
890
891
892 def pytest_generate_tests(metafunc):
893     """
894     If a unit test requires an argument named 'filename'
895     we generate a test for the filenames selected. Either
896     the files contained in `template_dir` or if `template_dir`
897     is not specified on the CLI, the fixtures associated with this
898     test name.
899     """
900
901     # noinspection PyBroadException
902     try:
903         if "filename" in metafunc.fixturenames:
904             from .parametrizers import parametrize_filename
905
906             parametrize_filename(metafunc)
907
908         if "filenames" in metafunc.fixturenames:
909             from .parametrizers import parametrize_filenames
910
911             parametrize_filenames(metafunc)
912
913         if "template_dir" in metafunc.fixturenames:
914             from .parametrizers import parametrize_template_dir
915
916             parametrize_template_dir(metafunc)
917
918         if "environment_pair" in metafunc.fixturenames:
919             from .parametrizers import parametrize_environment_pair
920
921             parametrize_environment_pair(metafunc)
922
923         if "heat_volume_pair" in metafunc.fixturenames:
924             from .parametrizers import parametrize_heat_volume_pair
925
926             parametrize_heat_volume_pair(metafunc)
927
928         if "yaml_files" in metafunc.fixturenames:
929             from .parametrizers import parametrize_yaml_files
930
931             parametrize_yaml_files(metafunc)
932
933         if "env_files" in metafunc.fixturenames:
934             from .parametrizers import parametrize_environment_files
935
936             parametrize_environment_files(metafunc)
937
938         if "yaml_file" in metafunc.fixturenames:
939             from .parametrizers import parametrize_yaml_file
940
941             parametrize_yaml_file(metafunc)
942
943         if "env_file" in metafunc.fixturenames:
944             from .parametrizers import parametrize_environment_file
945
946             parametrize_environment_file(metafunc)
947
948         if "parsed_yaml_file" in metafunc.fixturenames:
949             from .parametrizers import parametrize_parsed_yaml_file
950
951             parametrize_parsed_yaml_file(metafunc)
952
953         if "parsed_environment_file" in metafunc.fixturenames:
954             from .parametrizers import parametrize_parsed_environment_file
955
956             parametrize_parsed_environment_file(metafunc)
957
958         if "heat_template" in metafunc.fixturenames:
959             from .parametrizers import parametrize_heat_template
960
961             parametrize_heat_template(metafunc)
962
963         if "heat_templates" in metafunc.fixturenames:
964             from .parametrizers import parametrize_heat_templates
965
966             parametrize_heat_templates(metafunc)
967
968         if "volume_template" in metafunc.fixturenames:
969             from .parametrizers import parametrize_volume_template
970
971             parametrize_volume_template(metafunc)
972
973         if "volume_templates" in metafunc.fixturenames:
974             from .parametrizers import parametrize_volume_templates
975
976             parametrize_volume_templates(metafunc)
977
978         if "template" in metafunc.fixturenames:
979             from .parametrizers import parametrize_template
980
981             parametrize_template(metafunc)
982
983         if "templates" in metafunc.fixturenames:
984             from .parametrizers import parametrize_templates
985
986             parametrize_templates(metafunc)
987     except Exception as e:
988         # If an error occurs in the collection phase, then it won't be logged as a
989         # normal test failure.  This means that failures could occur, but not
990         # be seen on the report resulting in a false positive success message.  These
991         # errors will be stored and reported separately on the report
992         COLLECTION_FAILURES.append(
993             {
994                 "module": metafunc.module.__name__,
995                 "test": metafunc.function.__name__,
996                 "fixtures": metafunc.fixturenames,
997                 "error": traceback.format_exc(),
998                 "requirements": getattr(metafunc.function, "requirement_ids", []),
999             }
1000         )
1001         raise e
1002
1003
1004 def hash_directory(path):
1005     """
1006     Create md5 hash using the contents of all files under ``path``
1007     :param path: string directory containing files
1008     :return: string MD5 hash code (hex)
1009     """
1010     md5 = hashlib.md5()
1011     for dir_path, sub_dirs, filenames in os.walk(path):
1012         for filename in filenames:
1013             file_path = os.path.join(dir_path, filename)
1014             with open(file_path, "rb") as f:
1015                 md5.update(f.read())
1016     return md5.hexdigest()
1017
1018
1019 def load_current_requirements():
1020     """Loads dict of current requirements or empty dict if file doesn't exist"""
1021     with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
1022         data = json.load(f)
1023         version = data["current_version"]
1024         return data["versions"][version]["needs"]
1025
1026
1027 def select_heat_requirements(reqs):
1028     """Filters dict requirements to only those requirements pertaining to Heat"""
1029     return {k: v for k, v in reqs.items() if "Heat" in v["docname"]}
1030
1031
1032 def build_rst_json(reqs):
1033     """Takes requirements and returns list of only Heat requirements"""
1034     data = json.loads(reqs)
1035     for key, values in list(data.items()):
1036         if "Heat" in (values["docname"]):
1037             if "MUST" in (values["keyword"]):
1038                 if "none" in (values["validation_mode"]):
1039                     del data[key]
1040                 else:
1041                     """Creates links in RST format to requirements and test cases"""
1042                     if values["test_case"]:
1043                         val_list = re.findall(r'(?<=\.).*', values["test_case"])
1044                         val = TEST_SCRIPT_SITE + val_list[0] + ".py"
1045                         rst_value = ("`" + val_list[0] + " <" + val + ">`_")
1046                         title = "`" + values["id"] + " <" + VNFRQTS_ID_URL + values["docname"].replace(" ", "%20") + ".html#" + values["id"] + ">`_"
1047                         data[key].update({'full_title': title, 'test_case': rst_value})
1048                     else:
1049                         del data[key]
1050             else:
1051                 del data[key]
1052         else:
1053             del data[key]
1054     return data
1055
1056
1057 def generate_rst_table(data):
1058     """Generate a formatted csv to be used in RST"""
1059     rst_path = os.path.join(__path__[0], "../output/rst.csv")
1060     with open(rst_path, "w", newline="") as f:
1061         out = csv.writer(f)
1062         out.writerow(
1063             ("Requirement ID", "Requirement", "Test Module", "Test Name"),
1064         )
1065         for req_id, metadata in data.items():
1066             out.writerow(
1067                 (
1068                     metadata["full_title"],
1069                     metadata["description"],
1070                     metadata["test_case"],
1071                     metadata["validated_by"],
1072                 )
1073             )
1074
1075
1076 # noinspection PyUnusedLocal
1077 def pytest_report_collectionfinish(config, startdir, items):
1078     """Generates a simple traceability report to output/traceability.csv"""
1079     traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
1080     output_dir = os.path.split(traceability_path)[0]
1081     if not os.path.exists(output_dir):
1082         os.makedirs(output_dir)
1083     reqs = load_current_requirements()
1084     requirements = select_heat_requirements(reqs)
1085     unmapped, mapped = partition(
1086         lambda i: hasattr(i.function, "requirement_ids"), items
1087     )
1088
1089     req_to_test = defaultdict(set)
1090     mapping_errors = set()
1091     for item in mapped:
1092         for req_id in item.function.requirement_ids:
1093             if req_id not in req_to_test:
1094                 req_to_test[req_id].add(item)
1095                 if req_id in requirements:
1096                     reqs[req_id].update({'test_case': item.function.__module__, 'validated_by': item.function.__name__})
1097             if req_id not in requirements:
1098                 mapping_errors.add(
1099                     (req_id, item.function.__module__, item.function.__name__)
1100                 )
1101
1102     mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
1103     with open(mapping_error_path, "w", newline="") as f:
1104         writer = csv.writer(f)
1105         for err in mapping_errors:
1106             writer.writerow(err)
1107
1108     with open(traceability_path, "w", newline="") as f:
1109         out = csv.writer(f)
1110         out.writerow(
1111             ("Requirement ID", "Requirement", "Section",
1112              "Keyword", "Validation Mode", "Is Testable",
1113              "Test Module", "Test Name"),
1114         )
1115         for req_id, metadata in requirements.items():
1116             keyword = metadata["keyword"].upper()
1117             mode = metadata["validation_mode"].lower()
1118             testable = keyword in {"MUST", "MUST NOT"} and mode != "none"
1119             if req_to_test[req_id]:
1120                 for item in req_to_test[req_id]:
1121                     out.writerow(
1122                         (
1123                             req_id,
1124                             metadata["description"],
1125                             metadata["section_name"],
1126                             keyword,
1127                             mode,
1128                             "TRUE" if testable else "FALSE",
1129                             item.function.__module__,
1130                             item.function.__name__,
1131                         ),
1132                     )
1133             else:
1134                 out.writerow(
1135                     (req_id,
1136                      metadata["description"],
1137                      metadata["section_name"],
1138                      keyword,
1139                      mode,
1140                      "TRUE" if testable else "FALSE",
1141                      "",   # test module
1142                      ""),  # test function
1143                 )
1144         # now write out any test methods that weren't mapped to requirements
1145         unmapped_tests = {(item.function.__module__, item.function.__name__)
1146                           for item in unmapped}
1147         for test_module, test_name in unmapped_tests:
1148             out.writerow(
1149                 ("",        # req ID
1150                  "",        # description
1151                  "",        # section name
1152                  "",        # keyword
1153                  "static",  # validation mode
1154                  "TRUE",    # testable
1155                  test_module,
1156                  test_name)
1157             )
1158
1159     generate_rst_table(build_rst_json(json.dumps(reqs)))