[VVP] adding heat template-validate test
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37
38 import csv
39 import datetime
40 import hashlib
41 import io
42 import json
43 import os
44 import re
45 import time
46 from collections import defaultdict
47 from itertools import chain
48
49 import traceback
50
51 import docutils.core
52 import jinja2
53 import pytest
54 from more_itertools import partition
55 import xlsxwriter
56 from six import string_types
57
58 import version
59 import logging
60
61 logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.ERROR)
62
63 __path__ = [os.path.dirname(os.path.abspath(__file__))]
64
65 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
66
67 RESOLUTION_STEPS_FILE = "resolution_steps.json"
68 HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
69 TEST_SCRIPT_SITE = (
70     "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
71 )
72 VNFRQTS_ID_URL = (
73     "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
74 )
75
76 REPORT_COLUMNS = [
77     ("Input File", "file"),
78     ("Test", "test_file"),
79     ("Requirements", "req_description"),
80     ("Resolution Steps", "resolution_steps"),
81     ("Error Message", "message"),
82     ("Raw Test Output", "raw_output"),
83 ]
84
85 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
86 while preparing to validate the the input files. Some validations may not have been
87 executed. Please refer these issue to the VNF Validation Tool team.
88 """
89
90 COLLECTION_FAILURES = []
91
92 # Captures the results of every test run
93 ALL_RESULTS = []
94
95
96 def get_output_dir(config):
97     """
98     Retrieve the output directory for the reports and create it if necessary
99     :param config: pytest configuration
100     :return: output directory as string
101     """
102     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
103     if not os.path.exists(output_dir):
104         os.makedirs(output_dir, exist_ok=True)
105     return output_dir
106
107
108 def extract_error_msg(rep):
109     """
110     If a custom error message was provided, then extract it otherwise
111     just show the pytest assert message
112     """
113     if rep.outcome != "failed":
114         return ""
115     try:
116         full_msg = str(rep.longrepr.reprcrash.message)
117         match = re.match(
118             "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
119         )
120         if match:  # custom message was provided
121             # Extract everything between AssertionError and the start
122             # of the assert statement expansion in the pytest report
123             msg = match.group(1)
124         else:
125             msg = str(rep.longrepr.reprcrash)
126             if "AssertionError:" in msg:
127                 msg = msg.split("AssertionError:")[1]
128     except AttributeError:
129         msg = str(rep)
130
131     return msg
132
133
134 class TestResult:
135     """
136     Wraps the test case and result to extract necessary metadata for
137     reporting purposes.
138     """
139
140     RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
141
142     def __init__(self, item, outcome):
143         self.item = item
144         self.result = outcome.get_result()
145         self.files = [os.path.normpath(p) for p in self._get_files()]
146         self.error_message = self._get_error_message()
147
148     @property
149     def requirement_ids(self):
150         """
151         Returns list of requirement IDs mapped to the test case.
152
153         :return: Returns a list of string requirement IDs the test was
154                  annotated with ``validates`` otherwise returns and empty list
155         """
156         is_mapped = hasattr(self.item.function, "requirement_ids")
157         return self.item.function.requirement_ids if is_mapped else []
158
159     @property
160     def markers(self):
161         """
162         :return: Returns a set of pytest marker names for the test or an empty set
163         """
164         return set(m.name for m in self.item.iter_markers())
165
166     @property
167     def is_base_test(self):
168         """
169         :return: Returns True if the test is annotated with a pytest marker called base
170         """
171         return "base" in self.markers
172
173     @property
174     def is_failed(self):
175         """
176         :return: True if the test failed
177         """
178         return self.outcome == "FAIL"
179
180     @property
181     def outcome(self):
182         """
183         :return: Returns 'PASS', 'FAIL', or 'SKIP'
184         """
185         return self.RESULT_MAPPING[self.result.outcome]
186
187     @property
188     def test_case(self):
189         """
190         :return: Name of the test case method
191         """
192         return self.item.function.__name__
193
194     @property
195     def test_module(self):
196         """
197         :return: Name of the file containing the test case
198         """
199         return self.item.function.__module__.split(".")[-1]
200
201     @property
202     def test_id(self):
203         """
204         :return: ID of the test (test_module + test_case)
205         """
206         return "{}::{}".format(self.test_module, self.test_case)
207
208     @property
209     def raw_output(self):
210         """
211         :return: Full output from pytest for the given test case
212         """
213         return str(self.result.longrepr)
214
215     def requirement_text(self, curr_reqs):
216         """
217         Creates a text summary for the requirement IDs mapped to the test case.
218         If no requirements are mapped, then it returns the empty string.
219
220         :param curr_reqs: mapping of requirement IDs to requirement metadata
221                           loaded from the VNFRQTS projects needs.json output
222         :return: ID and text of the requirements mapped to the test case
223         """
224         text = (
225             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
226             for r_id in self.requirement_ids
227             if r_id in curr_reqs
228         )
229         return "".join(text)
230
231     def requirements_metadata(self, curr_reqs):
232         """
233         Returns a list of dicts containing the following metadata for each
234         requirement mapped:
235
236         - id: Requirement ID
237         - text: Full text of the requirement
238         - keyword: MUST, MUST NOT, MAY, etc.
239
240         :param curr_reqs: mapping of requirement IDs to requirement metadata
241                           loaded from the VNFRQTS projects needs.json output
242         :return: List of requirement metadata
243         """
244         data = []
245         for r_id in self.requirement_ids:
246             if r_id not in curr_reqs:
247                 continue
248             data.append(
249                 {
250                     "id": r_id,
251                     "text": curr_reqs[r_id]["description"],
252                     "keyword": curr_reqs[r_id]["keyword"],
253                 }
254             )
255         return data
256
257     def resolution_steps(self, resolutions):
258         """
259         :param resolutions: Loaded from contents for resolution_steps.json
260         :return: Header and text for the resolution step associated with this
261                  test case.  Returns empty string if no resolutions are
262                  provided.
263         """
264         text = (
265             "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
266             for entry in resolutions
267             if self._match(entry)
268         )
269         return "".join(text)
270
271     def _match(self, resolution_entry):
272         """
273         Returns True if the test result maps to the given entry in
274         the resolutions file
275         """
276         return (
277             self.test_case == resolution_entry["function"]
278             and self.test_module == resolution_entry["module"]
279         )
280
281     def _get_files(self):
282         """
283         Extracts the list of files passed into the test case.
284         :return: List of absolute paths to files
285         """
286         if "environment_pair" in self.item.fixturenames:
287             return [
288                 "{} environment pair".format(
289                     self.item.funcargs["environment_pair"]["name"]
290                 )
291             ]
292         elif "heat_volume_pair" in self.item.fixturenames:
293             return [
294                 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
295             ]
296         elif "heat_templates" in self.item.fixturenames:
297             return self.item.funcargs["heat_templates"]
298         elif "yaml_files" in self.item.fixturenames:
299             return self.item.funcargs["yaml_files"]
300         else:
301             parts = self.result.nodeid.split("[")
302             return [""] if len(parts) == 1 else [parts[1][:-1]]
303
304     def _get_error_message(self):
305         """
306         :return: Error message or empty string if the test did not fail or error
307         """
308         if self.is_failed:
309             return extract_error_msg(self.result)
310         else:
311             return ""
312
313
314 # noinspection PyUnusedLocal
315 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
316 def pytest_runtest_makereport(item, call):
317     """
318     Captures the test results for later reporting.  This will also halt testing
319     if a base failure is encountered (can be overridden with continue-on-failure)
320     """
321     outcome = yield
322     if outcome.get_result().when != "call":
323         return  # only capture results of test cases themselves
324     result = TestResult(item, outcome)
325     if (
326         not item.config.option.continue_on_failure
327         and result.is_base_test
328         and result.is_failed
329     ):
330         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
331             result.error_message
332         )
333         result.error_message = msg
334         ALL_RESULTS.append(result)
335         pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
336
337     ALL_RESULTS.append(result)
338
339
340 def make_timestamp():
341     """
342     :return: String make_iso_timestamp in format:
343              2019-01-19 10:18:49.865000 Central Standard Time
344     """
345     timezone = time.tzname[time.localtime().tm_isdst]
346     return "{} {}".format(str(datetime.datetime.now()), timezone)
347
348
349 # noinspection PyUnusedLocal
350 def pytest_sessionstart(session):
351     ALL_RESULTS.clear()
352     COLLECTION_FAILURES.clear()
353
354
355 # noinspection PyUnusedLocal
356 def pytest_sessionfinish(session, exitstatus):
357     """
358     If not a self-test run, generate the output reports
359     """
360     if not session.config.option.template_dir:
361         return
362
363     if session.config.option.template_source:
364         template_source = session.config.option.template_source[0]
365     else:
366         template_source = os.path.abspath(session.config.option.template_dir[0])
367
368     categories_selected = session.config.option.test_categories or ""
369     generate_report(
370         get_output_dir(session.config),
371         template_source,
372         categories_selected,
373         session.config.option.report_format,
374     )
375
376
377 # noinspection PyUnusedLocal
378 def pytest_collection_modifyitems(session, config, items):
379     """
380     Selects tests based on the categories requested.  Tests without
381     categories will always be executed.
382     """
383     config.traceability_items = list(items)  # save all items for traceability
384     if not config.option.self_test:
385         for item in items:
386             # checking if test belongs to a category
387             if hasattr(item.function, "categories"):
388                 if config.option.test_categories:
389                     test_categories = getattr(item.function, "categories")
390                     passed_categories = config.option.test_categories
391                     if not all(
392                         category in passed_categories for category in test_categories
393                     ):
394                         item.add_marker(
395                             pytest.mark.skip(
396                                 reason="Test categories do not match all the passed categories"
397                             )
398                         )
399                 else:
400                     item.add_marker(
401                         pytest.mark.skip(
402                             reason="Test belongs to a category but no categories were passed"
403                         )
404                     )
405     items.sort(
406         key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
407     )
408
409
410 def make_href(paths):
411     """
412     Create an anchor tag to link to the file paths provided.
413     :param paths: string or list of file paths
414     :return: String of hrefs - one for each path, each seperated by a line
415              break (<br/).
416     """
417     paths = [paths] if isinstance(paths, string_types) else paths
418     links = []
419     for p in paths:
420         abs_path = os.path.abspath(p)
421         name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
422         links.append(
423             "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
424                 abs_path=abs_path, name=name
425             )
426         )
427     return "<br/>".join(links)
428
429
430 def load_resolutions_file():
431     """
432     :return: dict of data loaded from resolutions_steps.json
433     """
434     resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
435     if os.path.exists(resolution_steps):
436         with open(resolution_steps, "r") as f:
437             return json.loads(f.read())
438
439
440 def generate_report(outpath, template_path, categories, output_format="html"):
441     """
442     Generates the various output reports.
443
444     :param outpath: destination directory for all reports
445     :param template_path: directory containing the Heat templates validated
446     :param categories: Optional categories selected
447     :param output_format: One of "html", "excel", or "csv". Default is "html"
448     :raises: ValueError if requested output format is unknown
449     """
450     failures = [r for r in ALL_RESULTS if r.is_failed]
451     generate_failure_file(outpath)
452     output_format = output_format.lower().strip() if output_format else "html"
453     generate_json(outpath, template_path, categories)
454     if output_format == "html":
455         generate_html_report(outpath, categories, template_path, failures)
456     elif output_format == "excel":
457         generate_excel_report(outpath, categories, template_path, failures)
458     elif output_format == "json":
459         return
460     elif output_format == "csv":
461         generate_csv_report(outpath, categories, template_path, failures)
462     else:
463         raise ValueError("Unsupported output format: " + output_format)
464
465
466 def write_json(data, path):
467     """
468     Pretty print data as JSON to the output path requested
469
470     :param data: Data structure to be converted to JSON
471     :param path: Where to write output
472     """
473     with open(path, "w") as f:
474         json.dump(data, f, indent=2)
475
476
477 def generate_failure_file(outpath):
478     """
479     Writes a summary of test failures to a file named failures.
480     This is for backwards compatibility only.  The report.json offers a
481     more comprehensive output.
482     """
483     failure_path = os.path.join(outpath, "failures")
484     failures = [r for r in ALL_RESULTS if r.is_failed]
485     data = {}
486     for i, fail in enumerate(failures):
487         data[str(i)] = {
488             "file": fail.files[0] if len(fail.files) == 1 else fail.files,
489             "vnfrqts": fail.requirement_ids,
490             "test": fail.test_case,
491             "test_file": fail.test_module,
492             "raw_output": fail.raw_output,
493             "message": fail.error_message,
494         }
495     write_json(data, failure_path)
496
497
498 def generate_csv_report(output_dir, categories, template_path, failures):
499     rows = [["Validation Failures"]]
500     headers = [
501         ("Categories Selected:", categories),
502         ("Tool Version:", version.VERSION),
503         ("Report Generated At:", make_timestamp()),
504         ("Directory Validated:", template_path),
505         ("Checksum:", hash_directory(template_path)),
506         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
507     ]
508     rows.append([])
509     for header in headers:
510         rows.append(header)
511     rows.append([])
512
513     if COLLECTION_FAILURES:
514         rows.append([COLLECTION_FAILURE_WARNING])
515         rows.append(["Validation File", "Test", "Fixtures", "Error"])
516         for failure in COLLECTION_FAILURES:
517             rows.append(
518                 [
519                     failure["module"],
520                     failure["test"],
521                     ";".join(failure["fixtures"]),
522                     failure["error"],
523                 ]
524             )
525         rows.append([])
526
527     # table header
528     rows.append([col for col, _ in REPORT_COLUMNS])
529
530     reqs = load_current_requirements()
531     resolutions = load_resolutions_file()
532
533     # table content
534     for failure in failures:
535         rows.append(
536             [
537                 "\n".join(failure.files),
538                 failure.test_id,
539                 failure.requirement_text(reqs),
540                 failure.resolution_steps(resolutions),
541                 failure.error_message,
542                 failure.raw_output,
543             ]
544         )
545
546     output_path = os.path.join(output_dir, "report.csv")
547     with open(output_path, "w", newline="") as f:
548         writer = csv.writer(f)
549         for row in rows:
550             writer.writerow(row)
551
552
553 def generate_excel_report(output_dir, categories, template_path, failures):
554     output_path = os.path.join(output_dir, "report.xlsx")
555     workbook = xlsxwriter.Workbook(output_path)
556     bold = workbook.add_format({"bold": True})
557     code = workbook.add_format(({"font_name": "Courier", "text_wrap": True}))
558     normal = workbook.add_format({"text_wrap": True})
559     heading = workbook.add_format({"bold": True, "font_size": 18})
560     worksheet = workbook.add_worksheet("failures")
561     worksheet.write(0, 0, "Validation Failures", heading)
562
563     headers = [
564         ("Categories Selected:", ",".join(categories)),
565         ("Tool Version:", version.VERSION),
566         ("Report Generated At:", make_timestamp()),
567         ("Directory Validated:", template_path),
568         ("Checksum:", hash_directory(template_path)),
569         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
570     ]
571     for row, (header, value) in enumerate(headers, start=2):
572         worksheet.write(row, 0, header, bold)
573         worksheet.write(row, 1, value)
574
575     worksheet.set_column(0, len(headers) - 1, 40)
576     worksheet.set_column(len(headers), len(headers), 80)
577
578     if COLLECTION_FAILURES:
579         collection_failures_start = 2 + len(headers) + 2
580         worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
581         collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
582         for col_num, col_name in enumerate(collection_failure_headers):
583             worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
584         for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
585             worksheet.write(row, 0, data["module"])
586             worksheet.write(row, 1, data["test"])
587             worksheet.write(row, 2, ",".join(data["fixtures"]))
588             worksheet.write(row, 3, data["error"], code)
589
590     # table header
591     start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
592     worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
593     for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
594         worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
595
596     reqs = load_current_requirements()
597     resolutions = load_resolutions_file()
598
599     # table content
600     for row, failure in enumerate(failures, start=start_error_table_row + 2):
601         worksheet.write(row, 0, "\n".join(failure.files), normal)
602         worksheet.write(row, 1, failure.test_id, normal)
603         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
604         worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
605         worksheet.write(row, 4, failure.error_message, normal)
606         worksheet.write(row, 5, failure.raw_output, code)
607
608     workbook.close()
609
610
611 def make_iso_timestamp():
612     """
613     Creates a timestamp in ISO 8601 format in UTC format.  Used for JSON output.
614     """
615     now = datetime.datetime.utcnow()
616     now.replace(tzinfo=datetime.timezone.utc)
617     return now.isoformat()
618
619
620 def aggregate_requirement_adherence(r_id, collection_failures, test_results):
621     """
622     Examines all tests associated with a given requirement and determines
623     the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
624
625     * ERROR - At least one ERROR occurred
626     * PASS -  At least one PASS and no FAIL or ERRORs.
627     * FAIL -  At least one FAIL occurred (no ERRORs)
628     * SKIP - All tests were SKIP
629
630
631     :param r_id: Requirement ID to examing
632     :param collection_failures: Errors that occurred during test setup.
633     :param test_results: List of TestResult
634     :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
635     """
636     errors = any(r_id in f["requirements"] for f in collection_failures)
637     outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
638     return aggregate_results(errors, outcomes, r_id)
639
640
641 def aggregate_results(has_errors, outcomes, r_id=None):
642     """
643     Determines the aggregate result for the conditions provided.  Assumes the
644     results have been filtered and collected for analysis.
645
646     :param has_errors: True if collection failures occurred for the tests being
647                        analyzed.
648     :param outcomes: set of outcomes from the TestResults
649     :param r_id: Optional requirement ID if known
650     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
651              (see aggregate_requirement_adherence for more detail)
652     """
653     if has_errors:
654         return "ERROR"
655
656     if not outcomes:
657         return "PASS"
658     elif "FAIL" in outcomes:
659         return "FAIL"
660     elif "PASS" in outcomes:
661         return "PASS"
662     elif {"SKIP"} == outcomes:
663         return "SKIP"
664     else:
665         pytest.warns(
666             "Unexpected error aggregating outcomes ({}) for requirement {}".format(
667                 outcomes, r_id
668             )
669         )
670         return "ERROR"
671
672
673 def aggregate_run_results(collection_failures, test_results):
674     """
675     Determines overall status of run based on all failures and results.
676
677     * 'ERROR' - At least one collection failure occurred during the run.
678     * 'FAIL' - Template failed at least one test
679     * 'PASS' - All tests executed properly and no failures were detected
680
681     :param collection_failures: failures occuring during test setup
682     :param test_results: list of all test executuion results
683     :return: one of 'ERROR', 'FAIL', or 'PASS'
684     """
685     if collection_failures:
686         return "ERROR"
687     elif any(r.is_failed for r in test_results):
688         return "FAIL"
689     else:
690         return "PASS"
691
692
693 def error(failure_or_result):
694     """
695     Extracts the error message from a collection failure or test result
696     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
697     :return: Error message as string
698     """
699     if isinstance(failure_or_result, TestResult):
700         return failure_or_result.error_message
701     else:
702         return failure_or_result["error"]
703
704
705 def req_ids(failure_or_result):
706     """
707     Extracts the requirement IDs from a collection failure or test result
708     :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
709     :return: set of Requirement IDs.  If no requirements mapped, then an empty set
710     """
711     if isinstance(failure_or_result, TestResult):
712         return set(failure_or_result.requirement_ids)
713     else:
714         return set(failure_or_result["requirements"])
715
716
717 def collect_errors(r_id, collection_failures, test_result):
718     """
719     Creates a list of error messages from the collection failures and
720     test results.  If r_id is provided, then it collects the error messages
721     where the failure or test is associated with that requirement ID.  If
722     r_id is None, then it collects all errors that occur on failures and
723     results that are not mapped to requirements
724     """
725
726     def selector(item):
727         if r_id:
728             return r_id in req_ids(item)
729         else:
730             return not req_ids(item)
731
732     errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
733     return [e for e in errors if e]
734
735
736 def relative_paths(base_dir, paths):
737     return [os.path.relpath(p, base_dir) for p in paths]
738
739
740 def generate_json(outpath, template_path, categories):
741     """
742     Creates a JSON summary of the entire test run.
743     """
744     reqs = load_current_requirements()
745     data = {
746         "version": "dublin",
747         "template_directory": os.path.splitdrive(template_path)[1].replace(
748             os.path.sep, "/"
749         ),
750         "timestamp": make_iso_timestamp(),
751         "checksum": hash_directory(template_path),
752         "categories": categories,
753         "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
754         "tests": [],
755         "requirements": [],
756     }
757
758     results = data["tests"]
759     for result in COLLECTION_FAILURES:
760         results.append(
761             {
762                 "files": [],
763                 "test_module": result["module"],
764                 "test_case": result["test"],
765                 "result": "ERROR",
766                 "error": result["error"],
767                 "requirements": result["requirements"],
768             }
769         )
770     for result in ALL_RESULTS:
771         results.append(
772             {
773                 "files": relative_paths(template_path, result.files),
774                 "test_module": result.test_module,
775                 "test_case": result.test_case,
776                 "result": result.outcome,
777                 "error": result.error_message if result.is_failed else "",
778                 "requirements": result.requirements_metadata(reqs),
779             }
780         )
781
782     requirements = data["requirements"]
783     for r_id, r_data in reqs.items():
784         result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
785         if result:
786             requirements.append(
787                 {
788                     "id": r_id,
789                     "text": r_data["description"],
790                     "keyword": r_data["keyword"],
791                     "result": result,
792                     "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
793                 }
794             )
795     # If there are tests that aren't mapped to a requirement, then we'll
796     # map them to a special entry so the results are coherent.
797     unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
798     has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
799     if unmapped_outcomes or has_errors:
800         requirements.append(
801             {
802                 "id": "Unmapped",
803                 "text": "Tests not mapped to requirements (see tests)",
804                 "result": aggregate_results(has_errors, unmapped_outcomes),
805                 "errors": collect_errors(None, COLLECTION_FAILURES, ALL_RESULTS),
806             }
807         )
808
809     report_path = os.path.join(outpath, "report.json")
810     write_json(data, report_path)
811
812
813 def generate_html_report(outpath, categories, template_path, failures):
814     reqs = load_current_requirements()
815     resolutions = load_resolutions_file()
816     fail_data = []
817     for failure in failures:
818         fail_data.append(
819             {
820                 "file_links": make_href(failure.files),
821                 "test_id": failure.test_id,
822                 "error_message": failure.error_message,
823                 "raw_output": failure.raw_output,
824                 "requirements": docutils.core.publish_parts(
825                     writer_name="html", source=failure.requirement_text(reqs)
826                 )["body"],
827                 "resolution_steps": failure.resolution_steps(resolutions),
828             }
829         )
830     pkg_dir = os.path.split(__file__)[0]
831     j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
832     with open(j2_template_path, "r") as f:
833         report_template = jinja2.Template(f.read())
834         contents = report_template.render(
835             version=version.VERSION,
836             num_failures=len(failures) + len(COLLECTION_FAILURES),
837             categories=categories,
838             template_dir=make_href(template_path),
839             checksum=hash_directory(template_path),
840             timestamp=make_timestamp(),
841             failures=fail_data,
842             collection_failures=COLLECTION_FAILURES,
843         )
844     with open(os.path.join(outpath, "report.html"), "w") as f:
845         f.write(contents)
846
847
848 def pytest_addoption(parser):
849     """
850     Add needed CLI arguments
851     """
852     parser.addoption(
853         "--template-directory",
854         dest="template_dir",
855         action="append",
856         help="Directory which holds the templates for validation",
857     )
858
859     parser.addoption(
860         "--template-source",
861         dest="template_source",
862         action="append",
863         help="Source Directory which holds the templates for validation",
864     )
865
866     parser.addoption(
867         "--self-test",
868         dest="self_test",
869         action="store_true",
870         help="Test the unit tests against their fixtured data",
871     )
872
873     parser.addoption(
874         "--report-format",
875         dest="report_format",
876         action="store",
877         help="Format of output report (html, csv, excel, json)",
878     )
879
880     parser.addoption(
881         "--continue-on-failure",
882         dest="continue_on_failure",
883         action="store_true",
884         help="Continue validation even when structural errors exist in input files",
885     )
886
887     parser.addoption(
888         "--output-directory",
889         dest="output_dir",
890         action="store",
891         default=None,
892         help="Alternate ",
893     )
894
895     parser.addoption(
896         "--category",
897         dest="test_categories",
898         action="append",
899         help="optional category of test to execute",
900     )
901
902
903 def pytest_configure(config):
904     """
905     Ensure that we are receive either `--self-test` or
906     `--template-dir=<directory` as CLI arguments
907     """
908     if config.getoption("template_dir") and config.getoption("self_test"):
909         raise Exception('"--template-dir", and "--self-test"' " are mutually exclusive")
910     if not (
911         config.getoption("template_dir")
912         or config.getoption("self_test")
913         or config.getoption("help")
914     ):
915         raise Exception('One of "--template-dir" or' ' "--self-test" must be specified')
916
917
918 def pytest_generate_tests(metafunc):
919     """
920     If a unit test requires an argument named 'filename'
921     we generate a test for the filenames selected. Either
922     the files contained in `template_dir` or if `template_dir`
923     is not specified on the CLI, the fixtures associated with this
924     test name.
925     """
926
927     # noinspection PyBroadException
928     try:
929         if "filename" in metafunc.fixturenames:
930             from .parametrizers import parametrize_filename
931
932             parametrize_filename(metafunc)
933
934         if "filenames" in metafunc.fixturenames:
935             from .parametrizers import parametrize_filenames
936
937             parametrize_filenames(metafunc)
938
939         if "template_dir" in metafunc.fixturenames:
940             from .parametrizers import parametrize_template_dir
941
942             parametrize_template_dir(metafunc)
943
944         if "environment_pair" in metafunc.fixturenames:
945             from .parametrizers import parametrize_environment_pair
946
947             parametrize_environment_pair(metafunc)
948
949         if "heat_volume_pair" in metafunc.fixturenames:
950             from .parametrizers import parametrize_heat_volume_pair
951
952             parametrize_heat_volume_pair(metafunc)
953
954         if "yaml_files" in metafunc.fixturenames:
955             from .parametrizers import parametrize_yaml_files
956
957             parametrize_yaml_files(metafunc)
958
959         if "env_files" in metafunc.fixturenames:
960             from .parametrizers import parametrize_environment_files
961
962             parametrize_environment_files(metafunc)
963
964         if "yaml_file" in metafunc.fixturenames:
965             from .parametrizers import parametrize_yaml_file
966
967             parametrize_yaml_file(metafunc)
968
969         if "env_file" in metafunc.fixturenames:
970             from .parametrizers import parametrize_environment_file
971
972             parametrize_environment_file(metafunc)
973
974         if "parsed_yaml_file" in metafunc.fixturenames:
975             from .parametrizers import parametrize_parsed_yaml_file
976
977             parametrize_parsed_yaml_file(metafunc)
978
979         if "parsed_environment_file" in metafunc.fixturenames:
980             from .parametrizers import parametrize_parsed_environment_file
981
982             parametrize_parsed_environment_file(metafunc)
983
984         if "heat_template" in metafunc.fixturenames:
985             from .parametrizers import parametrize_heat_template
986
987             parametrize_heat_template(metafunc)
988
989         if "heat_templates" in metafunc.fixturenames:
990             from .parametrizers import parametrize_heat_templates
991
992             parametrize_heat_templates(metafunc)
993
994         if "volume_template" in metafunc.fixturenames:
995             from .parametrizers import parametrize_volume_template
996
997             parametrize_volume_template(metafunc)
998
999         if "volume_templates" in metafunc.fixturenames:
1000             from .parametrizers import parametrize_volume_templates
1001
1002             parametrize_volume_templates(metafunc)
1003
1004         if "template" in metafunc.fixturenames:
1005             from .parametrizers import parametrize_template
1006
1007             parametrize_template(metafunc)
1008
1009         if "templates" in metafunc.fixturenames:
1010             from .parametrizers import parametrize_templates
1011
1012             parametrize_templates(metafunc)
1013     except Exception as e:
1014         # If an error occurs in the collection phase, then it won't be logged as a
1015         # normal test failure.  This means that failures could occur, but not
1016         # be seen on the report resulting in a false positive success message.  These
1017         # errors will be stored and reported separately on the report
1018         COLLECTION_FAILURES.append(
1019             {
1020                 "module": metafunc.module.__name__,
1021                 "test": metafunc.function.__name__,
1022                 "fixtures": metafunc.fixturenames,
1023                 "error": traceback.format_exc(),
1024                 "requirements": getattr(metafunc.function, "requirement_ids", []),
1025             }
1026         )
1027         raise e
1028
1029
1030 def hash_directory(path):
1031     """
1032     Create md5 hash using the contents of all files under ``path``
1033     :param path: string directory containing files
1034     :return: string MD5 hash code (hex)
1035     """
1036     md5 = hashlib.md5()
1037     for dir_path, sub_dirs, filenames in os.walk(path):
1038         for filename in filenames:
1039             file_path = os.path.join(dir_path, filename)
1040             with open(file_path, "rb") as f:
1041                 md5.update(f.read())
1042     return md5.hexdigest()
1043
1044
1045 def load_current_requirements():
1046     """Loads dict of current requirements or empty dict if file doesn't exist"""
1047     with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
1048         data = json.load(f)
1049         version = data["current_version"]
1050         return data["versions"][version]["needs"]
1051
1052
1053 def select_heat_requirements(reqs):
1054     """Filters dict requirements to only those requirements pertaining to Heat"""
1055     return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}
1056
1057
1058 def is_testable(reqs):
1059     """Filters dict requirements to only those which are testable"""
1060     for key, values in reqs.items():
1061         if (("MUST" in values.get("keyword", "").upper()) and (
1062             "none" not in values.get("validation_mode", "").lower()
1063         )):
1064             reqs[key]["testable"] = True
1065         else:
1066             reqs[key]["testable"] = False
1067     return reqs
1068
1069
1070 def build_rst_json(reqs):
1071     """Takes requirements and returns list of only Heat requirements"""
1072     for key, values in list(reqs.items()):
1073         if values["testable"]:
1074             # Creates links in RST format to requirements and test cases
1075             if values["test_case"]:
1076                 mod = values["test_case"].split(".")[-1]
1077                 val = TEST_SCRIPT_SITE + mod + ".py"
1078                 rst_value = "`" + mod + " <" + val + ">`_"
1079                 title = (
1080                     "`"
1081                     + values["id"]
1082                     + " <"
1083                     + VNFRQTS_ID_URL
1084                     + values["docname"].replace(" ", "%20")
1085                     + ".html#"
1086                     + values["id"]
1087                     + ">`_"
1088                 )
1089                 reqs[key].update({"full_title": title, "test_case": rst_value})
1090             else:
1091                 title = (
1092                     "`"
1093                     + values["id"]
1094                     + " <"
1095                     + VNFRQTS_ID_URL
1096                     + values["docname"].replace(" ", "%20")
1097                     + ".html#"
1098                     + values["id"]
1099                     + ">`_"
1100                 )
1101                 reqs[key].update(
1102                     {
1103                         "full_title": title,
1104                         "test_case": "No test for requirement",
1105                         "validated_by": "static",
1106                     }
1107                 )
1108         else:
1109             del reqs[key]
1110     return reqs
1111
1112
1113 def generate_rst_table(output_dir, data):
1114     """Generate a formatted csv to be used in RST"""
1115     rst_path = os.path.join(output_dir, "rst.csv")
1116     with open(rst_path, "w", newline="") as f:
1117         out = csv.writer(f)
1118         out.writerow(("Requirement ID", "Requirement", "Test Module", "Test Name"))
1119         for req_id, metadata in data.items():
1120             out.writerow(
1121                 (
1122                     metadata["full_title"],
1123                     metadata["description"],
1124                     metadata["test_case"],
1125                     metadata["validated_by"],
1126                 )
1127             )
1128
1129
1130 # noinspection PyUnusedLocal
1131 def pytest_report_collectionfinish(config, startdir, items):
1132     """Generates a simple traceability report to output/traceability.csv"""
1133     traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
1134     output_dir = os.path.split(traceability_path)[0]
1135     if not os.path.exists(output_dir):
1136         os.makedirs(output_dir)
1137     reqs = load_current_requirements()
1138     requirements = select_heat_requirements(reqs)
1139     testable_requirements = is_testable(requirements)
1140     unmapped, mapped = partition(
1141         lambda i: hasattr(i.function, "requirement_ids"), items
1142     )
1143
1144     req_to_test = defaultdict(set)
1145     mapping_errors = set()
1146     for item in mapped:
1147         for req_id in item.function.requirement_ids:
1148             if req_id not in req_to_test:
1149                 req_to_test[req_id].add(item)
1150                 if req_id in requirements:
1151                     reqs[req_id].update(
1152                         {
1153                             "test_case": item.function.__module__,
1154                             "validated_by": item.function.__name__,
1155                         }
1156                     )
1157             if req_id not in requirements:
1158                 mapping_errors.add(
1159                     (req_id, item.function.__module__, item.function.__name__)
1160                 )
1161
1162     mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
1163     with open(mapping_error_path, "w", newline="") as f:
1164         writer = csv.writer(f)
1165         for err in mapping_errors:
1166             writer.writerow(err)
1167
1168     with open(traceability_path, "w", newline="") as f:
1169         out = csv.writer(f)
1170         out.writerow(
1171             (
1172                 "Requirement ID",
1173                 "Requirement",
1174                 "Section",
1175                 "Keyword",
1176                 "Validation Mode",
1177                 "Is Testable",
1178                 "Test Module",
1179                 "Test Name",
1180             )
1181         )
1182         for req_id, metadata in testable_requirements.items():
1183             if req_to_test[req_id]:
1184                 for item in req_to_test[req_id]:
1185                     out.writerow(
1186                         (
1187                             req_id,
1188                             metadata["description"],
1189                             metadata["section_name"],
1190                             metadata["keyword"],
1191                             metadata["validation_mode"],
1192                             metadata["testable"],
1193                             item.function.__module__,
1194                             item.function.__name__,
1195                         )
1196                     )
1197             else:
1198                 out.writerow(
1199                     (
1200                         req_id,
1201                         metadata["description"],
1202                         metadata["section_name"],
1203                         metadata["keyword"],
1204                         metadata["validation_mode"],
1205                         metadata["testable"],
1206                         "",  # test module
1207                         "",
1208                     )  # test function
1209                 )
1210         # now write out any test methods that weren't mapped to requirements
1211         unmapped_tests = {
1212             (item.function.__module__, item.function.__name__) for item in unmapped
1213         }
1214         for test_module, test_name in unmapped_tests:
1215             out.writerow(
1216                 (
1217                     "",  # req ID
1218                     "",  # description
1219                     "",  # section name
1220                     "",  # keyword
1221                     "static",  # validation mode
1222                     "TRUE",  # testable
1223                     test_module,
1224                     test_name,
1225                 )
1226             )
1227
1228     generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))