46680459f1667c7f69c786cc82151be20a07cb05
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37
38 import csv
39 import datetime
40 import hashlib
41 import io
42 import json
43 import os
44 import re
45 import time
46 from collections import defaultdict
47 from itertools import chain
48
49 import traceback
50
51 import docutils.core
52 import jinja2
53 import pytest
54 from more_itertools import partition
55 import xlsxwriter
56 from six import string_types
57
58 import version
59
60 __path__ = [os.path.dirname(os.path.abspath(__file__))]
61
62 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
63
64 RESOLUTION_STEPS_FILE = "resolution_steps.json"
65 HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
66 TEST_SCRIPT_SITE = "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
67 VNFRQTS_ID_URL = "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
68
69 REPORT_COLUMNS = [
70     ("Input File", "file"),
71     ("Test", "test_file"),
72     ("Requirements", "req_description"),
73     ("Resolution Steps", "resolution_steps"),
74     ("Error Message", "message"),
75     ("Raw Test Output", "raw_output"),
76 ]
77
78 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
79 while preparing to validate the the input files. Some validations may not have been
80 executed. Please refer these issue to the VNF Validation Tool team.
81 """
82
83 COLLECTION_FAILURES = []
84
85 # Captures the results of every test run
86 ALL_RESULTS = []
87
88
89 def get_output_dir(config):
90     """
91     Retrieve the output directory for the reports and create it if necessary
92     :param config: pytest configuration
93     :return: output directory as string
94     """
95     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
96     if not os.path.exists(output_dir):
97         os.makedirs(output_dir, exist_ok=True)
98     return output_dir
99
100
101 def extract_error_msg(rep):
102     """
103     If a custom error message was provided, then extract it otherwise
104     just show the pytest assert message
105     """
106     if rep.outcome != "failed":
107         return ""
108     try:
109         full_msg = str(rep.longrepr.reprcrash.message)
110         match = re.match(
111             "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
112         )
113         if match:  # custom message was provided
114             # Extract everything between AssertionError and the start
115             # of the assert statement expansion in the pytest report
116             msg = match.group(1)
117         else:
118             msg = str(rep.longrepr.reprcrash)
119             if "AssertionError:" in msg:
120                 msg = msg.split("AssertionError:")[1]
121     except AttributeError:
122         msg = str(rep)
123
124     return msg
125
126
127 class TestResult:
128     """
129     Wraps the test case and result to extract necessary metadata for
130     reporting purposes.
131     """
132
133     RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
134
135     def __init__(self, item, outcome):
136         self.item = item
137         self.result = outcome.get_result()
138         self.files = [os.path.normpath(p) for p in self._get_files()]
139         self.error_message = self._get_error_message()
140
141     @property
142     def requirement_ids(self):
143         """
144         Returns list of requirement IDs mapped to the test case.
145
146         :return: Returns a list of string requirement IDs the test was
147                  annotated with ``validates`` otherwise returns and empty list
148         """
149         is_mapped = hasattr(self.item.function, "requirement_ids")
150         return self.item.function.requirement_ids if is_mapped else []
151
152     @property
153     def markers(self):
154         """
155         :return: Returns a set of pytest marker names for the test or an empty set
156         """
157         return set(m.name for m in self.item.iter_markers())
158
159     @property
160     def is_base_test(self):
161         """
162         :return: Returns True if the test is annotated with a pytest marker called base
163         """
164         return "base" in self.markers
165
166     @property
167     def is_failed(self):
168         """
169         :return: True if the test failed
170         """
171         return self.outcome == "FAIL"
172
173     @property
174     def outcome(self):
175         """
176         :return: Returns 'PASS', 'FAIL', or 'SKIP'
177         """
178         return self.RESULT_MAPPING[self.result.outcome]
179
180     @property
181     def test_case(self):
182         """
183         :return: Name of the test case method
184         """
185         return self.item.function.__name__
186
187     @property
188     def test_module(self):
189         """
190         :return: Name of the file containing the test case
191         """
192         return self.item.function.__module__.split(".")[-1]
193
194     @property
195     def raw_output(self):
196         """
197         :return: Full output from pytest for the given test case
198         """
199         return str(self.result.longrepr)
200
201     def requirement_text(self, curr_reqs):
202         """
203         Creates a text summary for the requirement IDs mapped to the test case.
204         If no requirements are mapped, then it returns the empty string.
205
206         :param curr_reqs: mapping of requirement IDs to requirement metadata
207                           loaded from the VNFRQTS projects needs.json output
208         :return: ID and text of the requirements mapped to the test case
209         """
210         text = (
211             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
212             for r_id in self.requirement_ids
213         )
214         return "".join(text)
215
216     def requirements_metadata(self, curr_reqs):
217         """
218         Returns a list of dicts containing the following metadata for each
219         requirement mapped:
220
221         - id: Requirement ID
222         - text: Full text of the requirement
223         - keyword: MUST, MUST NOT, MAY, etc.
224
225         :param curr_reqs: mapping of requirement IDs to requirement metadata
226                           loaded from the VNFRQTS projects needs.json output
227         :return: List of requirement metadata
228         """
229         data = []
230         for r_id in self.requirement_ids:
231             if r_id not in curr_reqs:
232                 continue
233             data.append(
234                 {
235                     "id": r_id,
236                     "text": curr_reqs[r_id]["description"],
237                     "keyword": curr_reqs[r_id]["keyword"],
238                 }
239             )
240         return data
241
242     def resolution_steps(self, resolutions):
243         """
244         :param resolutions: Loaded from contents for resolution_steps.json
245         :return: Header and text for the resolution step associated with this
246                  test case.  Returns empty string if no resolutions are
247                  provided.
248         """
249         text = (
250             "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
251             for entry in resolutions
252             if self._match(entry)
253         )
254         return "".join(text)
255
256     def _match(self, resolution_entry):
257         """
258         Returns True if the test result maps to the given entry in
259         the resolutions file
260         """
261         return (
262             self.test_case == resolution_entry["function"]
263             and self.test_module == resolution_entry["module"]
264         )
265
266     def _get_files(self):
267         """
268         Extracts the list of files passed into the test case.
269         :return: List of absolute paths to files
270         """
271         if "environment_pair" in self.item.fixturenames:
272             return [
273                 "{} environment pair".format(
274                     self.item.funcargs["environment_pair"]["name"]
275                 )
276             ]
277         elif "heat_volume_pair" in self.item.fixturenames:
278             return [
279                 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
280             ]
281         elif "heat_templates" in self.item.fixturenames:
282             return self.item.funcargs["heat_templates"]
283         elif "yaml_files" in self.item.fixturenames:
284             return self.item.funcargs["yaml_files"]
285         else:
286             parts = self.result.nodeid.split("[")
287             return "" if len(parts) == 1 else parts[1][:-1]
288
289     def _get_error_message(self):
290         """
291         :return: Error message or empty string if the test did not fail or error
292         """
293         if self.is_failed:
294             return extract_error_msg(self.result)
295         else:
296             return ""
297
298
299 # noinspection PyUnusedLocal
300 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
301 def pytest_runtest_makereport(item, call):
302     """
303     Captures the test results for later reporting.  This will also halt testing
304     if a base failure is encountered (can be overridden with continue-on-failure)
305     """
306     outcome = yield
307     if outcome.get_result().when != "call":
308         return  # only capture results of test cases themselves
309     result = TestResult(item, outcome)
310     ALL_RESULTS.append(result)
311     if (
312         not item.config.option.continue_on_failure
313         and result.is_base_test
314         and result.is_failed
315     ):
316         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
317             result.error_message
318         )
319         pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
320
321
322 def make_timestamp():
323     """
324     :return: String make_iso_timestamp in format:
325              2019-01-19 10:18:49.865000 Central Standard Time
326     """
327     timezone = time.tzname[time.localtime().tm_isdst]
328     return "{} {}".format(str(datetime.datetime.now()), timezone)
329
330
331 # noinspection PyUnusedLocal
332 def pytest_sessionstart(session):
333     ALL_RESULTS.clear()
334     COLLECTION_FAILURES.clear()
335
336
337 # noinspection PyUnusedLocal
338 def pytest_sessionfinish(session, exitstatus):
339     """
340     If not a self-test run, generate the output reports
341     """
342     if not session.config.option.template_dir:
343         return
344
345     if session.config.option.template_source:
346         template_source = session.config.option.template_source[0]
347     else:
348         template_source = os.path.abspath(session.config.option.template_dir[0])
349
350     categories_selected = session.config.option.test_categories or ""
351     generate_report(
352         get_output_dir(session.config),
353         template_source,
354         categories_selected,
355         session.config.option.report_format,
356     )
357
358
359 # noinspection PyUnusedLocal
360 def pytest_collection_modifyitems(session, config, items):
361     """
362     Selects tests based on the categories requested.  Tests without
363     categories will always be executed.
364     """
365     config.traceability_items = list(items)  # save all items for traceability
366     if not config.option.self_test:
367         for item in items:
368             # checking if test belongs to a category
369             if hasattr(item.function, "categories"):
370                 if config.option.test_categories:
371                     test_categories = getattr(item.function, "categories")
372                     passed_categories = config.option.test_categories
373                     if not all(
374                         category in passed_categories for category in test_categories
375                     ):
376                         item.add_marker(
377                             pytest.mark.skip(
378                                 reason="Test categories do not match all the passed categories"
379                             )
380                         )
381                 else:
382                     item.add_marker(
383                         pytest.mark.skip(
384                             reason="Test belongs to a category but no categories were passed"
385                         )
386                     )
387     items.sort(
388         key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
389     )
390
391
392 def make_href(paths):
393     """
394     Create an anchor tag to link to the file paths provided.
395     :param paths: string or list of file paths
396     :return: String of hrefs - one for each path, each seperated by a line
397              break (<br/).
398     """
399     paths = [paths] if isinstance(paths, string_types) else paths
400     links = []
401     for p in paths:
402         abs_path = os.path.abspath(p)
403         name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
404         links.append(
405             "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
406                 abs_path=abs_path, name=name
407             )
408         )
409     return "<br/>".join(links)
410
411
412 def load_resolutions_file():
413     """
414     :return: dict of data loaded from resolutions_steps.json
415     """
416     resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
417     if os.path.exists(resolution_steps):
418         with open(resolution_steps, "r") as f:
419             return json.loads(f.read())
420
421
422 def generate_report(outpath, template_path, categories, output_format="html"):
423     """
424     Generates the various output reports.
425
426     :param outpath: destination directory for all reports
427     :param template_path: directory containing the Heat templates validated
428     :param categories: Optional categories selected
429     :param output_format: One of "html", "excel", or "csv". Default is "html"
430     :raises: ValueError if requested output format is unknown
431     """
432     failures = [r for r in ALL_RESULTS if r.is_failed]
433     generate_failure_file(outpath)
434     output_format = output_format.lower().strip() if output_format else "html"
435     if output_format == "html":
436         generate_html_report(outpath, categories, template_path, failures)
437     elif output_format == "excel":
438         generate_excel_report(outpath, categories, template_path, failures)
439     elif output_format == "json":
440         generate_json(outpath, template_path, categories)
441     elif output_format == "csv":
442         generate_csv_report(outpath, categories, template_path, failures)
443     else:
444         raise ValueError("Unsupported output format: " + output_format)
445
446
447 def write_json(data, path):
448     """
449     Pretty print data as JSON to the output path requested
450
451     :param data: Data structure to be converted to JSON
452     :param path: Where to write output
453     """
454     with open(path, "w") as f:
455         json.dump(data, f, indent=2)
456
457
458 def generate_failure_file(outpath):
459     """
460     Writes a summary of test failures to a file named failures.
461     This is for backwards compatibility only.  The report.json offers a
462     more comprehensive output.
463     """
464     failure_path = os.path.join(outpath, "failures")
465     failures = [r for r in ALL_RESULTS if r.is_failed]
466     data = {}
467     for i, fail in enumerate(failures):
468         data[str(i)] = {
469             "file": fail.files[0] if len(fail.files) == 1 else fail.files,
470             "vnfrqts": fail.requirement_ids,
471             "test": fail.test_case,
472             "test_file": fail.test_module,
473             "raw_output": fail.raw_output,
474             "message": fail.error_message,
475         }
476     write_json(data, failure_path)
477
478
479 def generate_csv_report(output_dir, categories, template_path, failures):
480     rows = [["Validation Failures"]]
481     headers = [
482         ("Categories Selected:", categories),
483         ("Tool Version:", version.VERSION),
484         ("Report Generated At:", make_timestamp()),
485         ("Directory Validated:", template_path),
486         ("Checksum:", hash_directory(template_path)),
487         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
488     ]
489     rows.append([])
490     for header in headers:
491         rows.append(header)
492     rows.append([])
493
494     if COLLECTION_FAILURES:
495         rows.append([COLLECTION_FAILURE_WARNING])
496         rows.append(["Validation File", "Test", "Fixtures", "Error"])
497         for failure in COLLECTION_FAILURES:
498             rows.append(
499                 [
500                     failure["module"],
501                     failure["test"],
502                     ";".join(failure["fixtures"]),
503                     failure["error"],
504                 ]
505             )
506         rows.append([])
507
508     # table header
509     rows.append([col for col, _ in REPORT_COLUMNS])
510
511     reqs = load_current_requirements()
512     resolutions = load_resolutions_file()
513
514     # table content
515     for failure in failures:
516         rows.append(
517             [
518                 "\n".join(failure.files),
519                 failure.test_module,
520                 failure.requirement_text(reqs),
521                 failure.resolution_steps(resolutions),
522                 failure.error_message,
523                 failure.raw_output,
524             ]
525         )
526
527     output_path = os.path.join(output_dir, "report.csv")
528     with open(output_path, "w", newline="") as f:
529         writer = csv.writer(f)
530         for row in rows:
531             writer.writerow(row)
532
533
534 def generate_excel_report(output_dir, categories, template_path, failures):
535     output_path = os.path.join(output_dir, "report.xlsx")
536     workbook = xlsxwriter.Workbook(output_path)
537     bold = workbook.add_format({"bold": True})
538     code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
539     normal = workbook.add_format({"text_wrap": True})
540     heading = workbook.add_format({"bold": True, "font_size": 18})
541     worksheet = workbook.add_worksheet("failures")
542     worksheet.write(0, 0, "Validation Failures", heading)
543
544     headers = [
545         ("Categories Selected:", ",".join(categories)),
546         ("Tool Version:", version.VERSION),
547         ("Report Generated At:", make_timestamp()),
548         ("Directory Validated:", template_path),
549         ("Checksum:", hash_directory(template_path)),
550         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
551     ]
552     for row, (header, value) in enumerate(headers, start=2):
553         worksheet.write(row, 0, header, bold)
554         worksheet.write(row, 1, value)
555
556     worksheet.set_column(0, len(headers) - 1, 40)
557     worksheet.set_column(len(headers), len(headers), 80)
558
559     if COLLECTION_FAILURES:
560         collection_failures_start = 2 + len(headers) + 2
561         worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
562         collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
563         for col_num, col_name in enumerate(collection_failure_headers):
564             worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
565         for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
566             worksheet.write(row, 0, data["module"])
567             worksheet.write(row, 1, data["test"])
568             worksheet.write(row, 2, ",".join(data["fixtures"]))
569             worksheet.write(row, 3, data["error"], code)
570
571     # table header
572     start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
573     worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
574     for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
575         worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
576
577     reqs = load_current_requirements()
578     resolutions = load_resolutions_file()
579
580     # table content
581     for row, failure in enumerate(failures, start=start_error_table_row + 2):
582         worksheet.write(row, 0, "\n".join(failure.files), normal)
583         worksheet.write(row, 1, failure.test_module, normal)
584         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
585         worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
586         worksheet.write(row, 4, failure.error_message, normal)
587         worksheet.write(row, 5, failure.raw_output, code)
588
589     workbook.close()
590
591
592 def make_iso_timestamp():
593     """
594     Creates a timestamp in ISO 8601 format in UTC format.  Used for JSON output.
595     """
596     now = datetime.datetime.utcnow()
597     now.replace(tzinfo=datetime.timezone.utc)
598     return now.isoformat()
599
600
601 def aggregate_requirement_adherence(r_id, collection_failures, test_results):
602     """
603     Examines all tests associated with a given requirement and determines
604     the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
605
606     * ERROR - At least one ERROR occurred
607     * PASS -  At least one PASS and no FAIL or ERRORs.
608     * FAIL -  At least one FAIL occurred (no ERRORs)
609     * SKIP - All tests were SKIP
610
611
612     :param r_id: Requirement ID to examing
613     :param collection_failures: Errors that occurred during test setup.
614     :param test_results: List of TestResult
615     :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
616     """
617     errors = any(r_id in f["requirements"] for f in collection_failures)
618     outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
619     return aggregate_results(errors, outcomes, r_id)
620
621
622 def aggregate_results(has_errors, outcomes, r_id=None):
623     """
624     Determines the aggregate result for the conditions provided.  Assumes the
625     results have been filtered and collected for analysis.
626
627     :param has_errors: True if collection failures occurred for the tests being
628                        analyzed.
629     :param outcomes: set of outcomes from the TestResults
630     :param r_id: Optional requirement ID if known
631     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
632              (see aggregate_requirement_adherence for more detail)
633     """
634     if has_errors:
635         return "ERROR"
636
637     if not outcomes:
638         return "PASS"
639     elif "FAIL" in outcomes:
640         return "FAIL"
641     elif "PASS" in outcomes:
642         return "PASS"
643     elif {"SKIP"} == outcomes:
644         return "SKIP"
645     else:
646         pytest.warns(
647             "Unexpected error aggregating outcomes ({}) for requirement {}".format(
648                 outcomes, r_id
649             )
650         )
651         return "ERROR"
652
653
654 def aggregate_run_results(collection_failures, test_results):
655     """
656     Determines overall status of run based on all failures and results.
657
658     * 'ERROR' - At least one collection failure occurred during the run.
659     * 'FAIL' - Template failed at least one test
660     * 'PASS' - All tests executed properly and no failures were detected
661
662     :param collection_failures: failures occuring during test setup
663     :param test_results: list of all test executuion results
664     :return: one of 'ERROR', 'FAIL', or 'PASS'
665     """
666     if collection_failures:
667         return "ERROR"
668     elif any(r.is_failed for r in test_results):
669         return "FAIL"
670     else:
671         return "PASS"
672
673
674 def error(failure_or_result):
675     """
676     Extracts the error message from a collection failure or test result
677     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
678     :return: Error message as string
679     """
680     if isinstance(failure_or_result, TestResult):
681         return failure_or_result.error_message
682     else:
683         return failure_or_result["error"]
684
685
686 def req_ids(failure_or_result):
687     """
688     Extracts the requirement IDs from a collection failure or test result
689     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
690     :return: set of Requirement IDs.  If no requirements mapped, then an empty set
691     """
692     if isinstance(failure_or_result, TestResult):
693         return set(failure_or_result.requirement_ids)
694     else:
695         return set(failure_or_result["requirements"])
696
697
698 def collect_errors(r_id, collection_failures, test_result):
699     """
700     Creates a list of error messages from the collection failures and
701     test results.  If r_id is provided, then it collects the error messages
702     where the failure or test is associated with that requirement ID.  If
703     r_id is None, then it collects all errors that occur on failures and
704     results that are not mapped to requirements
705     """
706
707     def selector(item):
708         if r_id:
709             return r_id in req_ids(item)
710         else:
711             return not req_ids(item)
712
713     errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
714     return [e for e in errors if e]
715
716
717 def generate_json(outpath, template_path, categories):
718     """
719     Creates a JSON summary of the entire test run.
720     """
721     reqs = load_current_requirements()
722     data = {
723         "version": "dublin",
724         "template_directory": template_path,
725         "timestamp": make_iso_timestamp(),
726         "checksum": hash_directory(template_path),
727         "categories": categories,
728         "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
729         "tests": [],
730         "requirements": [],
731     }
732
733     results = data["tests"]
734     for result in COLLECTION_FAILURES:
735         results.append(
736             {
737                 "files": [],
738                 "test_module": result["module"],
739                 "test_case": result["test"],
740                 "result": "ERROR",
741                 "error": result["error"],
742                 "requirements": result["requirements"],
743             }
744         )
745     for result in ALL_RESULTS:
746         results.append(
747             {
748                 "files": result.files,
749                 "test_module": result.test_module,
750                 "test_case": result.test_case,
751                 "result": result.outcome,
752                 "error": result.error_message if result.is_failed else "",
753                 "requirements": result.requirements_metadata(reqs),
754             }
755         )
756
757     requirements = data["requirements"]
758     for r_id, r_data in reqs.items():
759         result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
760         if result:
761             requirements.append(
762                 {
763                     "id": r_id,
764                     "text": r_data["description"],
765                     "keyword": r_data["keyword"],
766                     "result": result,
767                     "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
768                 }
769             )
770     # If there are tests that aren't mapped to a requirement, then we'll
771     # map them to a special entry so the results are coherent.
772     unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
773     has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
774     if unmapped_outcomes or has_errors:
775         requirements.append(
776             {
777                 "id": "Unmapped",
778                 "text": "Tests not mapped to requirements (see tests)",
779                 "result": aggregate_results(has_errors, unmapped_outcomes),
780                 "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
781             }
782         )
783
784     report_path = os.path.join(outpath, "report.json")
785     write_json(data, report_path)
786
787
788 def generate_html_report(outpath, categories, template_path, failures):
789     reqs = load_current_requirements()
790     resolutions = load_resolutions_file()
791     fail_data = []
792     for failure in failures:
793         fail_data.append(
794             {
795                 "file_links": make_href(failure.files),
796                 "test_id": failure.test_module,
797                 "error_message": failure.error_message,
798                 "raw_output": failure.raw_output,
799                 "requirements": docutils.core.publish_parts(
800                     writer_name="html", source=failure.requirement_text(reqs)
801                 )["body"],
802                 "resolution_steps": failure.resolution_steps(resolutions),
803             }
804         )
805     pkg_dir = os.path.split(__file__)[0]
806     j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
807     with open(j2_template_path, "r") as f:
808         report_template = jinja2.Template(f.read())
809         contents = report_template.render(
810             version=version.VERSION,
811             num_failures=len(failures) + len(COLLECTION_FAILURES),
812             categories=categories,
813             template_dir=make_href(template_path),
814             checksum=hash_directory(template_path),
815             timestamp=make_timestamp(),
816             failures=fail_data,
817             collection_failures=COLLECTION_FAILURES,
818         )
819     with open(os.path.join(outpath, "report.html"), "w") as f:
820         f.write(contents)
821
822
823 def pytest_addoption(parser):
824     """
825     Add needed CLI arguments
826     """
827     parser.addoption(
828         "--template-directory",
829         dest="template_dir",
830         action="append",
831         help="Directory which holds the templates for validation",
832     )
833
834     parser.addoption(
835         "--template-source",
836         dest="template_source",
837         action="append",
838         help="Source Directory which holds the templates for validation",
839     )
840
841     parser.addoption(
842         "--self-test",
843         dest="self_test",
844         action="store_true",
845         help="Test the unit tests against their fixtured data",
846     )
847
848     parser.addoption(
849         "--report-format",
850         dest="report_format",
851         action="store",
852         help="Format of output report (html, csv, excel, json)",
853     )
854
855     parser.addoption(
856         "--continue-on-failure",
857         dest="continue_on_failure",
858         action="store_true",
859         help="Continue validation even when structural errors exist in input files",
860     )
861
862     parser.addoption(
863         "--output-directory",
864         dest="output_dir",
865         action="store",
866         default=None,
867         help="Alternate ",
868     )
869
870     parser.addoption(
871         "--category",
872         dest="test_categories",
873         action="append",
874         help="optional category of test to execute",
875     )
876
877
878 def pytest_configure(config):
879     """
880     Ensure that we are receive either `--self-test` or
881     `--template-dir=<directory` as CLI arguments
882     """
883     if config.getoption("template_dir") and config.getoption("self_test"):
884         raise Exception('"--template-dir", and "--self-test"' " are mutually exclusive")
885     if not (
886         config.getoption("template_dir")
887         or config.getoption("self_test")
888         or config.getoption("help")
889     ):
890         raise Exception('One of "--template-dir" or' ' "--self-test" must be specified')
891
892
893 def pytest_generate_tests(metafunc):
894     """
895     If a unit test requires an argument named 'filename'
896     we generate a test for the filenames selected. Either
897     the files contained in `template_dir` or if `template_dir`
898     is not specified on the CLI, the fixtures associated with this
899     test name.
900     """
901
902     # noinspection PyBroadException
903     try:
904         if "filename" in metafunc.fixturenames:
905             from .parametrizers import parametrize_filename
906
907             parametrize_filename(metafunc)
908
909         if "filenames" in metafunc.fixturenames:
910             from .parametrizers import parametrize_filenames
911
912             parametrize_filenames(metafunc)
913
914         if "template_dir" in metafunc.fixturenames:
915             from .parametrizers import parametrize_template_dir
916
917             parametrize_template_dir(metafunc)
918
919         if "environment_pair" in metafunc.fixturenames:
920             from .parametrizers import parametrize_environment_pair
921
922             parametrize_environment_pair(metafunc)
923
924         if "heat_volume_pair" in metafunc.fixturenames:
925             from .parametrizers import parametrize_heat_volume_pair
926
927             parametrize_heat_volume_pair(metafunc)
928
929         if "yaml_files" in metafunc.fixturenames:
930             from .parametrizers import parametrize_yaml_files
931
932             parametrize_yaml_files(metafunc)
933
934         if "env_files" in metafunc.fixturenames:
935             from .parametrizers import parametrize_environment_files
936
937             parametrize_environment_files(metafunc)
938
939         if "yaml_file" in metafunc.fixturenames:
940             from .parametrizers import parametrize_yaml_file
941
942             parametrize_yaml_file(metafunc)
943
944         if "env_file" in metafunc.fixturenames:
945             from .parametrizers import parametrize_environment_file
946
947             parametrize_environment_file(metafunc)
948
949         if "parsed_yaml_file" in metafunc.fixturenames:
950             from .parametrizers import parametrize_parsed_yaml_file
951
952             parametrize_parsed_yaml_file(metafunc)
953
954         if "parsed_environment_file" in metafunc.fixturenames:
955             from .parametrizers import parametrize_parsed_environment_file
956
957             parametrize_parsed_environment_file(metafunc)
958
959         if "heat_template" in metafunc.fixturenames:
960             from .parametrizers import parametrize_heat_template
961
962             parametrize_heat_template(metafunc)
963
964         if "heat_templates" in metafunc.fixturenames:
965             from .parametrizers import parametrize_heat_templates
966
967             parametrize_heat_templates(metafunc)
968
969         if "volume_template" in metafunc.fixturenames:
970             from .parametrizers import parametrize_volume_template
971
972             parametrize_volume_template(metafunc)
973
974         if "volume_templates" in metafunc.fixturenames:
975             from .parametrizers import parametrize_volume_templates
976
977             parametrize_volume_templates(metafunc)
978
979         if "template" in metafunc.fixturenames:
980             from .parametrizers import parametrize_template
981
982             parametrize_template(metafunc)
983
984         if "templates" in metafunc.fixturenames:
985             from .parametrizers import parametrize_templates
986
987             parametrize_templates(metafunc)
988     except Exception as e:
989         # If an error occurs in the collection phase, then it won't be logged as a
990         # normal test failure.  This means that failures could occur, but not
991         # be seen on the report resulting in a false positive success message.  These
992         # errors will be stored and reported separately on the report
993         COLLECTION_FAILURES.append(
994             {
995                 "module": metafunc.module.__name__,
996                 "test": metafunc.function.__name__,
997                 "fixtures": metafunc.fixturenames,
998                 "error": traceback.format_exc(),
999                 "requirements": getattr(metafunc.function, "requirement_ids", []),
1000             }
1001         )
1002         raise e
1003
1004
1005 def hash_directory(path):
1006     """
1007     Create md5 hash using the contents of all files under ``path``
1008     :param path: string directory containing files
1009     :return: string MD5 hash code (hex)
1010     """
1011     md5 = hashlib.md5()
1012     for dir_path, sub_dirs, filenames in os.walk(path):
1013         for filename in filenames:
1014             file_path = os.path.join(dir_path, filename)
1015             with open(file_path, "rb") as f:
1016                 md5.update(f.read())
1017     return md5.hexdigest()
1018
1019
1020 def load_current_requirements():
1021     """Loads dict of current requirements or empty dict if file doesn't exist"""
1022     with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
1023         data = json.load(f)
1024         version = data["current_version"]
1025         return data["versions"][version]["needs"]
1026
1027
1028 def select_heat_requirements(reqs):
1029     """Filters dict requirements to only those requirements pertaining to Heat"""
1030     return {k: v for k, v in reqs.items() if "Heat" in v["docname"]}
1031
1032
1033 def build_rst_json(reqs):
1034     """Takes requirements and returns list of only Heat requirements"""
1035     data = json.loads(reqs)
1036     for key, values in list(data.items()):
1037         if "Heat" in (values["docname"]):
1038             if "MUST" in (values["keyword"]):
1039                 if "none" in (values["validation_mode"]):
1040                     del data[key]
1041                 else:
1042                     # Creates links in RST format to requirements and test cases
1043                     if values["test_case"]:
1044                         val_list = re.findall(r'(?<=\.).*', values["test_case"])
1045                         val = TEST_SCRIPT_SITE + val_list[0] + ".py"
1046                         rst_value = ("`" + val_list[0] + " <" + val + ">`_")
1047                         title = "`" + values["id"] + " <" + VNFRQTS_ID_URL + values["docname"].replace(" ", "%20") + ".html#" + values["id"] + ">`_"
1048                         data[key].update({'full_title': title, 'test_case': rst_value})
1049                     else:
1050                         del data[key]
1051             else:
1052                 del data[key]
1053         else:
1054             del data[key]
1055     return data
1056
1057
1058 def generate_rst_table(data):
1059     """Generate a formatted csv to be used in RST"""
1060     rst_path = os.path.join(__path__[0], "../output/rst.csv")
1061     with open(rst_path, "w", newline="") as f:
1062         out = csv.writer(f)
1063         out.writerow(
1064             ("Requirement ID", "Requirement", "Test Module", "Test Name"),
1065         )
1066         for req_id, metadata in data.items():
1067             out.writerow(
1068                 (
1069                     metadata["full_title"],
1070                     metadata["description"],
1071                     metadata["test_case"],
1072                     metadata["validated_by"],
1073                 )
1074             )
1075
1076
1077 # noinspection PyUnusedLocal
1078 def pytest_report_collectionfinish(config, startdir, items):
1079     """Generates a simple traceability report to output/traceability.csv"""
1080     traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
1081     output_dir = os.path.split(traceability_path)[0]
1082     if not os.path.exists(output_dir):
1083         os.makedirs(output_dir)
1084     reqs = load_current_requirements()
1085     requirements = select_heat_requirements(reqs)
1086     unmapped, mapped = partition(
1087         lambda i: hasattr(i.function, "requirement_ids"), items
1088     )
1089
1090     req_to_test = defaultdict(set)
1091     mapping_errors = set()
1092     for item in mapped:
1093         for req_id in item.function.requirement_ids:
1094             if req_id not in req_to_test:
1095                 req_to_test[req_id].add(item)
1096                 if req_id in requirements:
1097                     reqs[req_id].update({'test_case': item.function.__module__,
1098                                          'validated_by': item.function.__name__})
1099             if req_id not in requirements:
1100                 mapping_errors.add(
1101                     (req_id, item.function.__module__, item.function.__name__)
1102                 )
1103
1104     mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
1105     with open(mapping_error_path, "w", newline="") as f:
1106         writer = csv.writer(f)
1107         for err in mapping_errors:
1108             writer.writerow(err)
1109
1110     with open(traceability_path, "w", newline="") as f:
1111         out = csv.writer(f)
1112         out.writerow(
1113             ("Requirement ID", "Requirement", "Section",
1114              "Keyword", "Validation Mode", "Is Testable",
1115              "Test Module", "Test Name"),
1116         )
1117         for req_id, metadata in requirements.items():
1118             keyword = metadata["keyword"].upper()
1119             mode = metadata["validation_mode"].lower()
1120             testable = keyword in {"MUST", "MUST NOT"} and mode != "none"
1121             if req_to_test[req_id]:
1122                 for item in req_to_test[req_id]:
1123                     out.writerow(
1124                         (
1125                             req_id,
1126                             metadata["description"],
1127                             metadata["section_name"],
1128                             keyword,
1129                             mode,
1130                             "TRUE" if testable else "FALSE",
1131                             item.function.__module__,
1132                             item.function.__name__,
1133                         ),
1134                     )
1135             else:
1136                 out.writerow(
1137                     (req_id,
1138                      metadata["description"],
1139                      metadata["section_name"],
1140                      keyword,
1141                      mode,
1142                      "TRUE" if testable else "FALSE",
1143                      "",   # test module
1144                      ""),  # test function
1145                 )
1146         # now write out any test methods that weren't mapped to requirements
1147         unmapped_tests = {(item.function.__module__, item.function.__name__) for item in
1148                           unmapped}
1149         for test_module, test_name in unmapped_tests:
1150             out.writerow(
1151                 ("",        # req ID
1152                  "",        # description
1153                  "",        # section name
1154                  "",        # keyword
1155                  "static",  # validation mode
1156                  "TRUE",    # testable
1157                  test_module,
1158                  test_name)
1159             )
1160
1161     generate_rst_table(build_rst_json(json.dumps(reqs)))