[VVP] Reports show test file and case now
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37
38 import csv
39 import datetime
40 import hashlib
41 import io
42 import json
43 import os
44 import re
45 import time
46 from collections import defaultdict
47 from itertools import chain
48
49 import traceback
50
51 import docutils.core
52 import jinja2
53 import pytest
54 from more_itertools import partition
55 import xlsxwriter
56 from six import string_types
57
58 import version
59
60 __path__ = [os.path.dirname(os.path.abspath(__file__))]
61
62 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
63
64 RESOLUTION_STEPS_FILE = "resolution_steps.json"
65 HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
66 TEST_SCRIPT_SITE = (
67     "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
68 )
69 VNFRQTS_ID_URL = (
70     "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
71 )
72
73 REPORT_COLUMNS = [
74     ("Input File", "file"),
75     ("Test", "test_file"),
76     ("Requirements", "req_description"),
77     ("Resolution Steps", "resolution_steps"),
78     ("Error Message", "message"),
79     ("Raw Test Output", "raw_output"),
80 ]
81
82 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
83 while preparing to validate the the input files. Some validations may not have been
84 executed. Please refer these issue to the VNF Validation Tool team.
85 """
86
87 COLLECTION_FAILURES = []
88
89 # Captures the results of every test run
90 ALL_RESULTS = []
91
92
93 def get_output_dir(config):
94     """
95     Retrieve the output directory for the reports and create it if necessary
96     :param config: pytest configuration
97     :return: output directory as string
98     """
99     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
100     if not os.path.exists(output_dir):
101         os.makedirs(output_dir, exist_ok=True)
102     return output_dir
103
104
105 def extract_error_msg(rep):
106     """
107     If a custom error message was provided, then extract it otherwise
108     just show the pytest assert message
109     """
110     if rep.outcome != "failed":
111         return ""
112     try:
113         full_msg = str(rep.longrepr.reprcrash.message)
114         match = re.match(
115             "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
116         )
117         if match:  # custom message was provided
118             # Extract everything between AssertionError and the start
119             # of the assert statement expansion in the pytest report
120             msg = match.group(1)
121         else:
122             msg = str(rep.longrepr.reprcrash)
123             if "AssertionError:" in msg:
124                 msg = msg.split("AssertionError:")[1]
125     except AttributeError:
126         msg = str(rep)
127
128     return msg
129
130
131 class TestResult:
132     """
133     Wraps the test case and result to extract necessary metadata for
134     reporting purposes.
135     """
136
137     RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
138
139     def __init__(self, item, outcome):
140         self.item = item
141         self.result = outcome.get_result()
142         self.files = [os.path.normpath(p) for p in self._get_files()]
143         self.error_message = self._get_error_message()
144
145     @property
146     def requirement_ids(self):
147         """
148         Returns list of requirement IDs mapped to the test case.
149
150         :return: Returns a list of string requirement IDs the test was
151                  annotated with ``validates`` otherwise returns and empty list
152         """
153         is_mapped = hasattr(self.item.function, "requirement_ids")
154         return self.item.function.requirement_ids if is_mapped else []
155
156     @property
157     def markers(self):
158         """
159         :return: Returns a set of pytest marker names for the test or an empty set
160         """
161         return set(m.name for m in self.item.iter_markers())
162
163     @property
164     def is_base_test(self):
165         """
166         :return: Returns True if the test is annotated with a pytest marker called base
167         """
168         return "base" in self.markers
169
170     @property
171     def is_failed(self):
172         """
173         :return: True if the test failed
174         """
175         return self.outcome == "FAIL"
176
177     @property
178     def outcome(self):
179         """
180         :return: Returns 'PASS', 'FAIL', or 'SKIP'
181         """
182         return self.RESULT_MAPPING[self.result.outcome]
183
184     @property
185     def test_case(self):
186         """
187         :return: Name of the test case method
188         """
189         return self.item.function.__name__
190
191     @property
192     def test_module(self):
193         """
194         :return: Name of the file containing the test case
195         """
196         return self.item.function.__module__.split(".")[-1]
197
198     @property
199     def test_id(self):
200         """
201         :return: ID of the test (test_module + test_case)
202         """
203         return "{}::{}".format(self.test_module, self.test_case)
204
205     @property
206     def raw_output(self):
207         """
208         :return: Full output from pytest for the given test case
209         """
210         return str(self.result.longrepr)
211
212     def requirement_text(self, curr_reqs):
213         """
214         Creates a text summary for the requirement IDs mapped to the test case.
215         If no requirements are mapped, then it returns the empty string.
216
217         :param curr_reqs: mapping of requirement IDs to requirement metadata
218                           loaded from the VNFRQTS projects needs.json output
219         :return: ID and text of the requirements mapped to the test case
220         """
221         text = (
222             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
223             for r_id in self.requirement_ids
224             if r_id in curr_reqs
225         )
226         return "".join(text)
227
228     def requirements_metadata(self, curr_reqs):
229         """
230         Returns a list of dicts containing the following metadata for each
231         requirement mapped:
232
233         - id: Requirement ID
234         - text: Full text of the requirement
235         - keyword: MUST, MUST NOT, MAY, etc.
236
237         :param curr_reqs: mapping of requirement IDs to requirement metadata
238                           loaded from the VNFRQTS projects needs.json output
239         :return: List of requirement metadata
240         """
241         data = []
242         for r_id in self.requirement_ids:
243             if r_id not in curr_reqs:
244                 continue
245             data.append(
246                 {
247                     "id": r_id,
248                     "text": curr_reqs[r_id]["description"],
249                     "keyword": curr_reqs[r_id]["keyword"],
250                 }
251             )
252         return data
253
254     def resolution_steps(self, resolutions):
255         """
256         :param resolutions: Loaded from contents for resolution_steps.json
257         :return: Header and text for the resolution step associated with this
258                  test case.  Returns empty string if no resolutions are
259                  provided.
260         """
261         text = (
262             "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
263             for entry in resolutions
264             if self._match(entry)
265         )
266         return "".join(text)
267
268     def _match(self, resolution_entry):
269         """
270         Returns True if the test result maps to the given entry in
271         the resolutions file
272         """
273         return (
274             self.test_case == resolution_entry["function"]
275             and self.test_module == resolution_entry["module"]
276         )
277
278     def _get_files(self):
279         """
280         Extracts the list of files passed into the test case.
281         :return: List of absolute paths to files
282         """
283         if "environment_pair" in self.item.fixturenames:
284             return [
285                 "{} environment pair".format(
286                     self.item.funcargs["environment_pair"]["name"]
287                 )
288             ]
289         elif "heat_volume_pair" in self.item.fixturenames:
290             return [
291                 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
292             ]
293         elif "heat_templates" in self.item.fixturenames:
294             return self.item.funcargs["heat_templates"]
295         elif "yaml_files" in self.item.fixturenames:
296             return self.item.funcargs["yaml_files"]
297         else:
298             parts = self.result.nodeid.split("[")
299             return [""] if len(parts) == 1 else [parts[1][:-1]]
300
301     def _get_error_message(self):
302         """
303         :return: Error message or empty string if the test did not fail or error
304         """
305         if self.is_failed:
306             return extract_error_msg(self.result)
307         else:
308             return ""
309
310
311 # noinspection PyUnusedLocal
312 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
313 def pytest_runtest_makereport(item, call):
314     """
315     Captures the test results for later reporting.  This will also halt testing
316     if a base failure is encountered (can be overridden with continue-on-failure)
317     """
318     outcome = yield
319     if outcome.get_result().when != "call":
320         return  # only capture results of test cases themselves
321     result = TestResult(item, outcome)
322     if (
323         not item.config.option.continue_on_failure
324         and result.is_base_test
325         and result.is_failed
326     ):
327         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
328             result.error_message
329         )
330         result.error_message = msg
331         ALL_RESULTS.append(result)
332         pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
333
334     ALL_RESULTS.append(result)
335
336
337 def make_timestamp():
338     """
339     :return: String make_iso_timestamp in format:
340              2019-01-19 10:18:49.865000 Central Standard Time
341     """
342     timezone = time.tzname[time.localtime().tm_isdst]
343     return "{} {}".format(str(datetime.datetime.now()), timezone)
344
345
346 # noinspection PyUnusedLocal
347 def pytest_sessionstart(session):
348     ALL_RESULTS.clear()
349     COLLECTION_FAILURES.clear()
350
351
352 # noinspection PyUnusedLocal
353 def pytest_sessionfinish(session, exitstatus):
354     """
355     If not a self-test run, generate the output reports
356     """
357     if not session.config.option.template_dir:
358         return
359
360     if session.config.option.template_source:
361         template_source = session.config.option.template_source[0]
362     else:
363         template_source = os.path.abspath(session.config.option.template_dir[0])
364
365     categories_selected = session.config.option.test_categories or ""
366     generate_report(
367         get_output_dir(session.config),
368         template_source,
369         categories_selected,
370         session.config.option.report_format,
371     )
372
373
374 # noinspection PyUnusedLocal
375 def pytest_collection_modifyitems(session, config, items):
376     """
377     Selects tests based on the categories requested.  Tests without
378     categories will always be executed.
379     """
380     config.traceability_items = list(items)  # save all items for traceability
381     if not config.option.self_test:
382         for item in items:
383             # checking if test belongs to a category
384             if hasattr(item.function, "categories"):
385                 if config.option.test_categories:
386                     test_categories = getattr(item.function, "categories")
387                     passed_categories = config.option.test_categories
388                     if not all(
389                         category in passed_categories for category in test_categories
390                     ):
391                         item.add_marker(
392                             pytest.mark.skip(
393                                 reason="Test categories do not match all the passed categories"
394                             )
395                         )
396                 else:
397                     item.add_marker(
398                         pytest.mark.skip(
399                             reason="Test belongs to a category but no categories were passed"
400                         )
401                     )
402     items.sort(
403         key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
404     )
405
406
407 def make_href(paths):
408     """
409     Create an anchor tag to link to the file paths provided.
410     :param paths: string or list of file paths
411     :return: String of hrefs - one for each path, each seperated by a line
412              break (<br/).
413     """
414     paths = [paths] if isinstance(paths, string_types) else paths
415     links = []
416     for p in paths:
417         abs_path = os.path.abspath(p)
418         name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
419         links.append(
420             "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
421                 abs_path=abs_path, name=name
422             )
423         )
424     return "<br/>".join(links)
425
426
427 def load_resolutions_file():
428     """
429     :return: dict of data loaded from resolutions_steps.json
430     """
431     resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
432     if os.path.exists(resolution_steps):
433         with open(resolution_steps, "r") as f:
434             return json.loads(f.read())
435
436
437 def generate_report(outpath, template_path, categories, output_format="html"):
438     """
439     Generates the various output reports.
440
441     :param outpath: destination directory for all reports
442     :param template_path: directory containing the Heat templates validated
443     :param categories: Optional categories selected
444     :param output_format: One of "html", "excel", or "csv". Default is "html"
445     :raises: ValueError if requested output format is unknown
446     """
447     failures = [r for r in ALL_RESULTS if r.is_failed]
448     generate_failure_file(outpath)
449     output_format = output_format.lower().strip() if output_format else "html"
450     generate_json(outpath, template_path, categories)
451     if output_format == "html":
452         generate_html_report(outpath, categories, template_path, failures)
453     elif output_format == "excel":
454         generate_excel_report(outpath, categories, template_path, failures)
455     elif output_format == "json":
456         return
457     elif output_format == "csv":
458         generate_csv_report(outpath, categories, template_path, failures)
459     else:
460         raise ValueError("Unsupported output format: " + output_format)
461
462
463 def write_json(data, path):
464     """
465     Pretty print data as JSON to the output path requested
466
467     :param data: Data structure to be converted to JSON
468     :param path: Where to write output
469     """
470     with open(path, "w") as f:
471         json.dump(data, f, indent=2)
472
473
474 def generate_failure_file(outpath):
475     """
476     Writes a summary of test failures to a file named failures.
477     This is for backwards compatibility only.  The report.json offers a
478     more comprehensive output.
479     """
480     failure_path = os.path.join(outpath, "failures")
481     failures = [r for r in ALL_RESULTS if r.is_failed]
482     data = {}
483     for i, fail in enumerate(failures):
484         data[str(i)] = {
485             "file": fail.files[0] if len(fail.files) == 1 else fail.files,
486             "vnfrqts": fail.requirement_ids,
487             "test": fail.test_case,
488             "test_file": fail.test_module,
489             "raw_output": fail.raw_output,
490             "message": fail.error_message,
491         }
492     write_json(data, failure_path)
493
494
495 def generate_csv_report(output_dir, categories, template_path, failures):
496     rows = [["Validation Failures"]]
497     headers = [
498         ("Categories Selected:", categories),
499         ("Tool Version:", version.VERSION),
500         ("Report Generated At:", make_timestamp()),
501         ("Directory Validated:", template_path),
502         ("Checksum:", hash_directory(template_path)),
503         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
504     ]
505     rows.append([])
506     for header in headers:
507         rows.append(header)
508     rows.append([])
509
510     if COLLECTION_FAILURES:
511         rows.append([COLLECTION_FAILURE_WARNING])
512         rows.append(["Validation File", "Test", "Fixtures", "Error"])
513         for failure in COLLECTION_FAILURES:
514             rows.append(
515                 [
516                     failure["module"],
517                     failure["test"],
518                     ";".join(failure["fixtures"]),
519                     failure["error"],
520                 ]
521             )
522         rows.append([])
523
524     # table header
525     rows.append([col for col, _ in REPORT_COLUMNS])
526
527     reqs = load_current_requirements()
528     resolutions = load_resolutions_file()
529
530     # table content
531     for failure in failures:
532         rows.append(
533             [
534                 "\n".join(failure.files),
535                 failure.test_id,
536                 failure.requirement_text(reqs),
537                 failure.resolution_steps(resolutions),
538                 failure.error_message,
539                 failure.raw_output,
540             ]
541         )
542
543     output_path = os.path.join(output_dir, "report.csv")
544     with open(output_path, "w", newline="") as f:
545         writer = csv.writer(f)
546         for row in rows:
547             writer.writerow(row)
548
549
550 def generate_excel_report(output_dir, categories, template_path, failures):
551     output_path = os.path.join(output_dir, "report.xlsx")
552     workbook = xlsxwriter.Workbook(output_path)
553     bold = workbook.add_format({"bold": True})
554     code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
555     normal = workbook.add_format({"text_wrap": True})
556     heading = workbook.add_format({"bold": True, "font_size": 18})
557     worksheet = workbook.add_worksheet("failures")
558     worksheet.write(0, 0, "Validation Failures", heading)
559
560     headers = [
561         ("Categories Selected:", ",".join(categories)),
562         ("Tool Version:", version.VERSION),
563         ("Report Generated At:", make_timestamp()),
564         ("Directory Validated:", template_path),
565         ("Checksum:", hash_directory(template_path)),
566         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
567     ]
568     for row, (header, value) in enumerate(headers, start=2):
569         worksheet.write(row, 0, header, bold)
570         worksheet.write(row, 1, value)
571
572     worksheet.set_column(0, len(headers) - 1, 40)
573     worksheet.set_column(len(headers), len(headers), 80)
574
575     if COLLECTION_FAILURES:
576         collection_failures_start = 2 + len(headers) + 2
577         worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
578         collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
579         for col_num, col_name in enumerate(collection_failure_headers):
580             worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
581         for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
582             worksheet.write(row, 0, data["module"])
583             worksheet.write(row, 1, data["test"])
584             worksheet.write(row, 2, ",".join(data["fixtures"]))
585             worksheet.write(row, 3, data["error"], code)
586
587     # table header
588     start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
589     worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
590     for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
591         worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
592
593     reqs = load_current_requirements()
594     resolutions = load_resolutions_file()
595
596     # table content
597     for row, failure in enumerate(failures, start=start_error_table_row + 2):
598         worksheet.write(row, 0, "\n".join(failure.files), normal)
599         worksheet.write(row, 1, failure.test_id, normal)
600         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
601         worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
602         worksheet.write(row, 4, failure.error_message, normal)
603         worksheet.write(row, 5, failure.raw_output, code)
604
605     workbook.close()
606
607
608 def make_iso_timestamp():
609     """
610     Creates a timestamp in ISO 8601 format in UTC format.  Used for JSON output.
611     """
612     now = datetime.datetime.utcnow()
613     now.replace(tzinfo=datetime.timezone.utc)
614     return now.isoformat()
615
616
617 def aggregate_requirement_adherence(r_id, collection_failures, test_results):
618     """
619     Examines all tests associated with a given requirement and determines
620     the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
621
622     * ERROR - At least one ERROR occurred
623     * PASS -  At least one PASS and no FAIL or ERRORs.
624     * FAIL -  At least one FAIL occurred (no ERRORs)
625     * SKIP - All tests were SKIP
626
627
628     :param r_id: Requirement ID to examing
629     :param collection_failures: Errors that occurred during test setup.
630     :param test_results: List of TestResult
631     :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
632     """
633     errors = any(r_id in f["requirements"] for f in collection_failures)
634     outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
635     return aggregate_results(errors, outcomes, r_id)
636
637
638 def aggregate_results(has_errors, outcomes, r_id=None):
639     """
640     Determines the aggregate result for the conditions provided.  Assumes the
641     results have been filtered and collected for analysis.
642
643     :param has_errors: True if collection failures occurred for the tests being
644                        analyzed.
645     :param outcomes: set of outcomes from the TestResults
646     :param r_id: Optional requirement ID if known
647     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
648              (see aggregate_requirement_adherence for more detail)
649     """
650     if has_errors:
651         return "ERROR"
652
653     if not outcomes:
654         return "PASS"
655     elif "FAIL" in outcomes:
656         return "FAIL"
657     elif "PASS" in outcomes:
658         return "PASS"
659     elif {"SKIP"} == outcomes:
660         return "SKIP"
661     else:
662         pytest.warns(
663             "Unexpected error aggregating outcomes ({}) for requirement {}".format(
664                 outcomes, r_id
665             )
666         )
667         return "ERROR"
668
669
670 def aggregate_run_results(collection_failures, test_results):
671     """
672     Determines overall status of run based on all failures and results.
673
674     * 'ERROR' - At least one collection failure occurred during the run.
675     * 'FAIL' - Template failed at least one test
676     * 'PASS' - All tests executed properly and no failures were detected
677
678     :param collection_failures: failures occuring during test setup
679     :param test_results: list of all test executuion results
680     :return: one of 'ERROR', 'FAIL', or 'PASS'
681     """
682     if collection_failures:
683         return "ERROR"
684     elif any(r.is_failed for r in test_results):
685         return "FAIL"
686     else:
687         return "PASS"
688
689
690 def error(failure_or_result):
691     """
692     Extracts the error message from a collection failure or test result
693     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
694     :return: Error message as string
695     """
696     if isinstance(failure_or_result, TestResult):
697         return failure_or_result.error_message
698     else:
699         return failure_or_result["error"]
700
701
702 def req_ids(failure_or_result):
703     """
704     Extracts the requirement IDs from a collection failure or test result
705     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
706     :return: set of Requirement IDs.  If no requirements mapped, then an empty set
707     """
708     if isinstance(failure_or_result, TestResult):
709         return set(failure_or_result.requirement_ids)
710     else:
711         return set(failure_or_result["requirements"])
712
713
714 def collect_errors(r_id, collection_failures, test_result):
715     """
716     Creates a list of error messages from the collection failures and
717     test results.  If r_id is provided, then it collects the error messages
718     where the failure or test is associated with that requirement ID.  If
719     r_id is None, then it collects all errors that occur on failures and
720     results that are not mapped to requirements
721     """
722
723     def selector(item):
724         if r_id:
725             return r_id in req_ids(item)
726         else:
727             return not req_ids(item)
728
729     errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
730     return [e for e in errors if e]
731
732
733 def relative_paths(base_dir, paths):
734     return [os.path.relpath(p, base_dir) for p in paths]
735
736
737 def generate_json(outpath, template_path, categories):
738     """
739     Creates a JSON summary of the entire test run.
740     """
741     reqs = load_current_requirements()
742     data = {
743         "version": "dublin",
744         "template_directory": os.path.splitdrive(template_path)[1].replace(
745             os.path.sep, "/"
746         ),
747         "timestamp": make_iso_timestamp(),
748         "checksum": hash_directory(template_path),
749         "categories": categories,
750         "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
751         "tests": [],
752         "requirements": [],
753     }
754
755     results = data["tests"]
756     for result in COLLECTION_FAILURES:
757         results.append(
758             {
759                 "files": [],
760                 "test_module": result["module"],
761                 "test_case": result["test"],
762                 "result": "ERROR",
763                 "error": result["error"],
764                 "requirements": result["requirements"],
765             }
766         )
767     for result in ALL_RESULTS:
768         results.append(
769             {
770                 "files": relative_paths(template_path, result.files),
771                 "test_module": result.test_module,
772                 "test_case": result.test_case,
773                 "result": result.outcome,
774                 "error": result.error_message if result.is_failed else "",
775                 "requirements": result.requirements_metadata(reqs),
776             }
777         )
778
779     requirements = data["requirements"]
780     for r_id, r_data in reqs.items():
781         result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
782         if result:
783             requirements.append(
784                 {
785                     "id": r_id,
786                     "text": r_data["description"],
787                     "keyword": r_data["keyword"],
788                     "result": result,
789                     "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
790                 }
791             )
792     # If there are tests that aren't mapped to a requirement, then we'll
793     # map them to a special entry so the results are coherent.
794     unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
795     has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
796     if unmapped_outcomes or has_errors:
797         requirements.append(
798             {
799                 "id": "Unmapped",
800                 "text": "Tests not mapped to requirements (see tests)",
801                 "result": aggregate_results(has_errors, unmapped_outcomes),
802                 "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
803             }
804         )
805
806     report_path = os.path.join(outpath, "report.json")
807     write_json(data, report_path)
808
809
810 def generate_html_report(outpath, categories, template_path, failures):
811     reqs = load_current_requirements()
812     resolutions = load_resolutions_file()
813     fail_data = []
814     for failure in failures:
815         fail_data.append(
816             {
817                 "file_links": make_href(failure.files),
818                 "test_id": failure.test_id,
819                 "error_message": failure.error_message,
820                 "raw_output": failure.raw_output,
821                 "requirements": docutils.core.publish_parts(
822                     writer_name="html", source=failure.requirement_text(reqs)
823                 )["body"],
824                 "resolution_steps": failure.resolution_steps(resolutions),
825             }
826         )
827     pkg_dir = os.path.split(__file__)[0]
828     j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
829     with open(j2_template_path, "r") as f:
830         report_template = jinja2.Template(f.read())
831         contents = report_template.render(
832             version=version.VERSION,
833             num_failures=len(failures) + len(COLLECTION_FAILURES),
834             categories=categories,
835             template_dir=make_href(template_path),
836             checksum=hash_directory(template_path),
837             timestamp=make_timestamp(),
838             failures=fail_data,
839             collection_failures=COLLECTION_FAILURES,
840         )
841     with open(os.path.join(outpath, "report.html"), "w") as f:
842         f.write(contents)
843
844
845 def pytest_addoption(parser):
846     """
847     Add needed CLI arguments
848     """
849     parser.addoption(
850         "--template-directory",
851         dest="template_dir",
852         action="append",
853         help="Directory which holds the templates for validation",
854     )
855
856     parser.addoption(
857         "--template-source",
858         dest="template_source",
859         action="append",
860         help="Source Directory which holds the templates for validation",
861     )
862
863     parser.addoption(
864         "--self-test",
865         dest="self_test",
866         action="store_true",
867         help="Test the unit tests against their fixtured data",
868     )
869
870     parser.addoption(
871         "--report-format",
872         dest="report_format",
873         action="store",
874         help="Format of output report (html, csv, excel, json)",
875     )
876
877     parser.addoption(
878         "--continue-on-failure",
879         dest="continue_on_failure",
880         action="store_true",
881         help="Continue validation even when structural errors exist in input files",
882     )
883
884     parser.addoption(
885         "--output-directory",
886         dest="output_dir",
887         action="store",
888         default=None,
889         help="Alternate ",
890     )
891
892     parser.addoption(
893         "--category",
894         dest="test_categories",
895         action="append",
896         help="optional category of test to execute",
897     )
898
899
900 def pytest_configure(config):
901     """
902     Ensure that we are receive either `--self-test` or
903     `--template-dir=<directory` as CLI arguments
904     """
905     if config.getoption("template_dir") and config.getoption("self_test"):
906         raise Exception('"--template-dir", and "--self-test"' " are mutually exclusive")
907     if not (
908         config.getoption("template_dir")
909         or config.getoption("self_test")
910         or config.getoption("help")
911     ):
912         raise Exception('One of "--template-dir" or' ' "--self-test" must be specified')
913
914
915 def pytest_generate_tests(metafunc):
916     """
917     If a unit test requires an argument named 'filename'
918     we generate a test for the filenames selected. Either
919     the files contained in `template_dir` or if `template_dir`
920     is not specified on the CLI, the fixtures associated with this
921     test name.
922     """
923
924     # noinspection PyBroadException
925     try:
926         if "filename" in metafunc.fixturenames:
927             from .parametrizers import parametrize_filename
928
929             parametrize_filename(metafunc)
930
931         if "filenames" in metafunc.fixturenames:
932             from .parametrizers import parametrize_filenames
933
934             parametrize_filenames(metafunc)
935
936         if "template_dir" in metafunc.fixturenames:
937             from .parametrizers import parametrize_template_dir
938
939             parametrize_template_dir(metafunc)
940
941         if "environment_pair" in metafunc.fixturenames:
942             from .parametrizers import parametrize_environment_pair
943
944             parametrize_environment_pair(metafunc)
945
946         if "heat_volume_pair" in metafunc.fixturenames:
947             from .parametrizers import parametrize_heat_volume_pair
948
949             parametrize_heat_volume_pair(metafunc)
950
951         if "yaml_files" in metafunc.fixturenames:
952             from .parametrizers import parametrize_yaml_files
953
954             parametrize_yaml_files(metafunc)
955
956         if "env_files" in metafunc.fixturenames:
957             from .parametrizers import parametrize_environment_files
958
959             parametrize_environment_files(metafunc)
960
961         if "yaml_file" in metafunc.fixturenames:
962             from .parametrizers import parametrize_yaml_file
963
964             parametrize_yaml_file(metafunc)
965
966         if "env_file" in metafunc.fixturenames:
967             from .parametrizers import parametrize_environment_file
968
969             parametrize_environment_file(metafunc)
970
971         if "parsed_yaml_file" in metafunc.fixturenames:
972             from .parametrizers import parametrize_parsed_yaml_file
973
974             parametrize_parsed_yaml_file(metafunc)
975
976         if "parsed_environment_file" in metafunc.fixturenames:
977             from .parametrizers import parametrize_parsed_environment_file
978
979             parametrize_parsed_environment_file(metafunc)
980
981         if "heat_template" in metafunc.fixturenames:
982             from .parametrizers import parametrize_heat_template
983
984             parametrize_heat_template(metafunc)
985
986         if "heat_templates" in metafunc.fixturenames:
987             from .parametrizers import parametrize_heat_templates
988
989             parametrize_heat_templates(metafunc)
990
991         if "volume_template" in metafunc.fixturenames:
992             from .parametrizers import parametrize_volume_template
993
994             parametrize_volume_template(metafunc)
995
996         if "volume_templates" in metafunc.fixturenames:
997             from .parametrizers import parametrize_volume_templates
998
999             parametrize_volume_templates(metafunc)
1000
1001         if "template" in metafunc.fixturenames:
1002             from .parametrizers import parametrize_template
1003
1004             parametrize_template(metafunc)
1005
1006         if "templates" in metafunc.fixturenames:
1007             from .parametrizers import parametrize_templates
1008
1009             parametrize_templates(metafunc)
1010     except Exception as e:
1011         # If an error occurs in the collection phase, then it won't be logged as a
1012         # normal test failure.  This means that failures could occur, but not
1013         # be seen on the report resulting in a false positive success message.  These
1014         # errors will be stored and reported separately on the report
1015         COLLECTION_FAILURES.append(
1016             {
1017                 "module": metafunc.module.__name__,
1018                 "test": metafunc.function.__name__,
1019                 "fixtures": metafunc.fixturenames,
1020                 "error": traceback.format_exc(),
1021                 "requirements": getattr(metafunc.function, "requirement_ids", []),
1022             }
1023         )
1024         raise e
1025
1026
1027 def hash_directory(path):
1028     """
1029     Create md5 hash using the contents of all files under ``path``
1030     :param path: string directory containing files
1031     :return: string MD5 hash code (hex)
1032     """
1033     md5 = hashlib.md5()
1034     for dir_path, sub_dirs, filenames in os.walk(path):
1035         for filename in filenames:
1036             file_path = os.path.join(dir_path, filename)
1037             with open(file_path, "rb") as f:
1038                 md5.update(f.read())
1039     return md5.hexdigest()
1040
1041
1042 def load_current_requirements():
1043     """Loads dict of current requirements or empty dict if file doesn't exist"""
1044     with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
1045         data = json.load(f)
1046         version = data["current_version"]
1047         return data["versions"][version]["needs"]
1048
1049
1050 def select_heat_requirements(reqs):
1051     """Filters dict requirements to only those requirements pertaining to Heat"""
1052     return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}
1053
1054
1055 def is_testable(reqs):
1056     """Filters dict requirements to only those which are testable"""
1057     for key, values in reqs.items():
1058         if (("MUST" in values.get("keyword", "").upper()) and (
1059             "none" not in values.get("validation_mode", "").lower()
1060         )):
1061             reqs[key]["testable"] = True
1062         else:
1063             reqs[key]["testable"] = False
1064     return reqs
1065
1066
1067 def build_rst_json(reqs):
1068     """Takes requirements and returns list of only Heat requirements"""
1069     for key, values in list(reqs.items()):
1070         if values["testable"]:
1071             # Creates links in RST format to requirements and test cases
1072             if values["test_case"]:
1073                 mod = values["test_case"].split(".")[-1]
1074                 val = TEST_SCRIPT_SITE + mod + ".py"
1075                 rst_value = "`" + mod + " <" + val + ">`_"
1076                 title = (
1077                     "`"
1078                     + values["id"]
1079                     + " <"
1080                     + VNFRQTS_ID_URL
1081                     + values["docname"].replace(" ", "%20")
1082                     + ".html#"
1083                     + values["id"]
1084                     + ">`_"
1085                 )
1086                 reqs[key].update({"full_title": title, "test_case": rst_value})
1087             else:
1088                 title = (
1089                     "`"
1090                     + values["id"]
1091                     + " <"
1092                     + VNFRQTS_ID_URL
1093                     + values["docname"].replace(" ", "%20")
1094                     + ".html#"
1095                     + values["id"]
1096                     + ">`_"
1097                 )
1098                 reqs[key].update(
1099                     {
1100                         "full_title": title,
1101                         "test_case": "No test for requirement",
1102                         "validated_by": "static",
1103                     }
1104                 )
1105         else:
1106             del reqs[key]
1107     return reqs
1108
1109
1110 def generate_rst_table(output_dir, data):
1111     """Generate a formatted csv to be used in RST"""
1112     rst_path = os.path.join(output_dir, "rst.csv")
1113     with open(rst_path, "w", newline="") as f:
1114         out = csv.writer(f)
1115         out.writerow(("Requirement ID", "Requirement", "Test Module", "Test Name"))
1116         for req_id, metadata in data.items():
1117             out.writerow(
1118                 (
1119                     metadata["full_title"],
1120                     metadata["description"],
1121                     metadata["test_case"],
1122                     metadata["validated_by"],
1123                 )
1124             )
1125
1126
1127 # noinspection PyUnusedLocal
1128 def pytest_report_collectionfinish(config, startdir, items):
1129     """Generates a simple traceability report to output/traceability.csv"""
1130     traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
1131     output_dir = os.path.split(traceability_path)[0]
1132     if not os.path.exists(output_dir):
1133         os.makedirs(output_dir)
1134     reqs = load_current_requirements()
1135     requirements = select_heat_requirements(reqs)
1136     testable_requirements = is_testable(requirements)
1137     unmapped, mapped = partition(
1138         lambda i: hasattr(i.function, "requirement_ids"), items
1139     )
1140
1141     req_to_test = defaultdict(set)
1142     mapping_errors = set()
1143     for item in mapped:
1144         for req_id in item.function.requirement_ids:
1145             if req_id not in req_to_test:
1146                 req_to_test[req_id].add(item)
1147                 if req_id in requirements:
1148                     reqs[req_id].update(
1149                         {
1150                             "test_case": item.function.__module__,
1151                             "validated_by": item.function.__name__,
1152                         }
1153                     )
1154             if req_id not in requirements:
1155                 mapping_errors.add(
1156                     (req_id, item.function.__module__, item.function.__name__)
1157                 )
1158
1159     mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
1160     with open(mapping_error_path, "w", newline="") as f:
1161         writer = csv.writer(f)
1162         for err in mapping_errors:
1163             writer.writerow(err)
1164
1165     with open(traceability_path, "w", newline="") as f:
1166         out = csv.writer(f)
1167         out.writerow(
1168             (
1169                 "Requirement ID",
1170                 "Requirement",
1171                 "Section",
1172                 "Keyword",
1173                 "Validation Mode",
1174                 "Is Testable",
1175                 "Test Module",
1176                 "Test Name",
1177             )
1178         )
1179         for req_id, metadata in testable_requirements.items():
1180             if req_to_test[req_id]:
1181                 for item in req_to_test[req_id]:
1182                     out.writerow(
1183                         (
1184                             req_id,
1185                             metadata["description"],
1186                             metadata["section_name"],
1187                             metadata["keyword"],
1188                             metadata["validation_mode"],
1189                             metadata["testable"],
1190                             item.function.__module__,
1191                             item.function.__name__,
1192                         )
1193                     )
1194             else:
1195                 out.writerow(
1196                     (
1197                         req_id,
1198                         metadata["description"],
1199                         metadata["section_name"],
1200                         metadata["keyword"],
1201                         metadata["validation_mode"],
1202                         metadata["testable"],
1203                         "",  # test module
1204                         "",
1205                     )  # test function
1206                 )
1207         # now write out any test methods that weren't mapped to requirements
1208         unmapped_tests = {
1209             (item.function.__module__, item.function.__name__) for item in unmapped
1210         }
1211         for test_module, test_name in unmapped_tests:
1212             out.writerow(
1213                 (
1214                     "",  # req ID
1215                     "",  # description
1216                     "",  # section name
1217                     "",  # keyword
1218                     "static",  # validation mode
1219                     "TRUE",  # testable
1220                     test_module,
1221                     test_name,
1222                 )
1223             )
1224
1225     generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))