vvp/validation-scripts.git: ice_validator/tests/conftest.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START=======================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37
38 import csv
39 import datetime
40 import hashlib
41 import io
42 import json
43 import os
44 import re
45 import time
46 from collections import defaultdict
47
48 import traceback
49
50 import docutils.core
51 import jinja2
52 import pytest
53 from more_itertools import partition
54 import xlsxwriter
55 from six import string_types
56
57 # noinspection PyUnresolvedReferences
58 import version
59 import logging
60
61 logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.ERROR)
62
63 __path__ = [os.path.dirname(os.path.abspath(__file__))]
64
65 DEFAULT_OUTPUT_DIR = "{}/../output".format(__path__[0])
66
67 RESOLUTION_STEPS_FILE = "resolution_steps.json"
68 HEAT_REQUIREMENTS_FILE = os.path.join(__path__[0], "..", "heat_requirements.json")
69 TEST_SCRIPT_SITE = (
70     "https://github.com/onap/vvp-validation-scripts/blob/master/ice_validator/tests/"
71 )
72 VNFRQTS_ID_URL = (
73     "https://docs.onap.org/en/latest/submodules/vnfrqts/requirements.git/docs/"
74 )
75
76 REPORT_COLUMNS = [
77     ("Input File", "file"),
78     ("Test", "test_file"),
79     ("Requirements", "req_description"),
80     ("Resolution Steps", "resolution_steps"),
81     ("Error Message", "message"),
82     ("Raw Test Output", "raw_output"),
83 ]
84
85 COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
86 while preparing to validate the input files. Some validations may not have been
87 executed. Please report these issues to the VNF Validation Tool team.
88 """
89
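# Captures unexpected errors raised while collecting/parametrizing tests.  Each
# entry is a dict with the keys "module", "test", "fixtures", "error", and
# "requirements" (see pytest_generate_tests below).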
90 COLLECTION_FAILURES = []
91
92 # Captures the results of every test run
93 ALL_RESULTS = []
94
95
96 def get_output_dir(config):
97     """
98     Retrieve the output directory for the reports and create it if necessary
99     :param config: pytest configuration
100     :return: output directory as string
101     """
102     output_dir = config.option.output_dir or DEFAULT_OUTPUT_DIR
103     if not os.path.exists(output_dir):
104         os.makedirs(output_dir, exist_ok=True)
105     return output_dir
106
107
108 def extract_error_msg(rep):
109     """
110     If a custom error message was provided, then extract it otherwise
111     just show the pytest assert message
112     """
113     if rep.outcome != "failed":
114         return ""
115     try:
116         full_msg = str(rep.longrepr.reprcrash.message)
117         match = re.match(
118             "AssertionError:(.*)^assert.*", full_msg, re.MULTILINE | re.DOTALL
119         )
120         if match:  # custom message was provided
121             # Extract everything between AssertionError and the start
122             # of the assert statement expansion in the pytest report
123             msg = match.group(1)
124         else:
125             msg = str(rep.longrepr.reprcrash)
126             if "AssertionError:" in msg:
127                 msg = msg.split("AssertionError:")[1]
128     except AttributeError:
129         msg = str(rep)
130
131     return msg
132
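# Illustrative example of what the regex in extract_error_msg() captures.  The
# failure message below is hypothetical; the captured group holds everything
# between "AssertionError:" and the expanded assert statement:
#
#   >>> msg = "AssertionError: image parameter missing\nassert 0 == 1"
#   >>> re.match("AssertionError:(.*)^assert.*", msg, re.MULTILINE | re.DOTALL).group(1)
#   ' image parameter missing\n'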
133
134 class TestResult:
135     """
136     Wraps the test case and result to extract necessary metadata for
137     reporting purposes.
138     """
139
140     RESULT_MAPPING = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}
141
142     def __init__(self, item, outcome):
143         self.item = item
144         self.result = outcome.get_result()
145         self.files = [os.path.normpath(p) for p in self._get_files()]
146         self.error_message = self._get_error_message()
147
148     @property
149     def requirement_ids(self):
150         """
151         Returns list of requirement IDs mapped to the test case.
152
153         :return: Returns a list of string requirement IDs the test was
154                  annotated with ``validates``; otherwise returns an empty list
155         """
156         is_mapped = hasattr(self.item.function, "requirement_ids")
157         return self.item.function.requirement_ids if is_mapped else []
158
159     @property
160     def markers(self):
161         """
162         :return: Returns a set of pytest marker names for the test or an empty set
163         """
164         return set(m.name for m in self.item.iter_markers())
165
166     @property
167     def is_base_test(self):
168         """
169         :return: Returns True if the test is annotated with a pytest marker called base
170         """
171         return "base" in self.markers
172
173     @property
174     def is_failed(self):
175         """
176         :return: True if the test failed
177         """
178         return self.outcome == "FAIL"
179
180     @property
181     def outcome(self):
182         """
183         :return: Returns 'PASS', 'FAIL', or 'SKIP'
184         """
185         return self.RESULT_MAPPING[self.result.outcome]
186
187     @property
188     def test_case(self):
189         """
190         :return: Name of the test case method
191         """
192         return self.item.function.__name__
193
194     @property
195     def test_module(self):
196         """
197         :return: Name of the file containing the test case
198         """
199         return self.item.function.__module__.split(".")[-1]
200
201     @property
202     def test_id(self):
203         """
204         :return: ID of the test (test_module + test_case)
205         """
206         return "{}::{}".format(self.test_module, self.test_case)
207
208     @property
209     def raw_output(self):
210         """
211         :return: Full output from pytest for the given test case
212         """
213         return str(self.result.longrepr)
214
215     def requirement_text(self, curr_reqs):
216         """
217         Creates a text summary for the requirement IDs mapped to the test case.
218         If no requirements are mapped, then it returns the empty string.
219
220         :param curr_reqs: mapping of requirement IDs to requirement metadata
221                           loaded from the VNFRQTS project's needs.json output
222         :return: ID and text of the requirements mapped to the test case
223         """
224         text = (
225             "\n\n{}: \n{}".format(r_id, curr_reqs[r_id]["description"])
226             for r_id in self.requirement_ids
227             if r_id in curr_reqs
228         )
229         return "".join(text)
230
231     def requirements_metadata(self, curr_reqs):
232         """
233         Returns a list of dicts containing the following metadata for each
234         requirement mapped:
235
236         - id: Requirement ID
237         - text: Full text of the requirement
238         - keyword: MUST, MUST NOT, MAY, etc.
239
240         :param curr_reqs: mapping of requirement IDs to requirement metadata
241                           loaded from the VNFRQTS project's needs.json output
242         :return: List of requirement metadata
243         """
244         data = []
245         for r_id in self.requirement_ids:
246             if r_id not in curr_reqs:
247                 continue
248             data.append(
249                 {
250                     "id": r_id,
251                     "text": curr_reqs[r_id]["description"],
252                     "keyword": curr_reqs[r_id]["keyword"],
253                 }
254             )
255         return data
256
257     def resolution_steps(self, resolutions):
258         """
259         :param resolutions: Contents loaded from resolution_steps.json
260         :return: Header and text for the resolution step associated with this
261                  test case.  Returns empty string if no resolutions are
262                  provided.
263         """
264         text = (
265             "\n{}: \n{}".format(entry["header"], entry["resolution_steps"])
266             for entry in resolutions
267             if self._match(entry)
268         )
269         return "".join(text)
270
271     def _match(self, resolution_entry):
272         """
273         Returns True if the test result maps to the given entry in
274         the resolutions file
275         """
276         return (
277             self.test_case == resolution_entry["function"]
278             and self.test_module == resolution_entry["module"]
279         )
280
281     def _get_files(self):
282         """
283         Extracts the list of files passed into the test case.
284         :return: List of absolute paths to files
285         """
286         if "environment_pair" in self.item.fixturenames:
287             return [
288                 "{} environment pair".format(
289                     self.item.funcargs["environment_pair"]["name"]
290                 )
291             ]
292         elif "heat_volume_pair" in self.item.fixturenames:
293             return [
294                 "{} volume pair".format(self.item.funcargs["heat_volume_pair"]["name"])
295             ]
296         elif "heat_templates" in self.item.fixturenames:
297             return self.item.funcargs["heat_templates"]
298         elif "yaml_files" in self.item.fixturenames:
299             return self.item.funcargs["yaml_files"]
300         else:
301             parts = self.result.nodeid.split("[")
302             return [""] if len(parts) == 1 else [parts[1][:-1]]
303
304     def _get_error_message(self):
305         """
306         :return: Error message or empty string if the test did not fail or error
307         """
308         if self.is_failed:
309             return extract_error_msg(self.result)
310         else:
311             return ""
312
313
314 # noinspection PyUnusedLocal
315 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
316 def pytest_runtest_makereport(item, call):
317     """
318     Captures the test results for later reporting.  This will also halt testing
319     if a base failure is encountered (can be overridden with continue-on-failure)
320     """
321     outcome = yield
322     if outcome.get_result().when != "call":
323         return  # only capture results of test cases themselves
324     result = TestResult(item, outcome)
325     if (
326         not item.config.option.continue_on_failure
327         and result.is_base_test
328         and result.is_failed
329     ):
330         msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
331             result.error_message
332         )
333         result.error_message = msg
334         ALL_RESULTS.append(result)
335         pytest.exit("{}\n{}\n{}".format(msg, result.files, result.test_case))
336
337     ALL_RESULTS.append(result)
338
339
340 def make_timestamp():
341     """
342     :return: Timestamp string in the format:
343              2019-01-19 10:18:49.865000 Central Standard Time
344     """
345     timezone = time.tzname[time.localtime().tm_isdst]
346     return "{} {}".format(str(datetime.datetime.now()), timezone)
347
348
349 # noinspection PyUnusedLocal
350 def pytest_sessionstart(session):
351     ALL_RESULTS.clear()
352     COLLECTION_FAILURES.clear()
353
354
355 # noinspection PyUnusedLocal
356 def pytest_sessionfinish(session, exitstatus):
357     """
358     If not a self-test run, generate the output reports
359     """
360     if not session.config.option.template_dir:
361         return
362
363     if session.config.option.template_source:
364         template_source = session.config.option.template_source[0]
365     else:
366         template_source = os.path.abspath(session.config.option.template_dir[0])
367
368     categories_selected = session.config.option.test_categories or ""
369     generate_report(
370         get_output_dir(session.config),
371         template_source,
372         categories_selected,
373         session.config.option.report_format,
374     )
375
376
377 # noinspection PyUnusedLocal
378 def pytest_collection_modifyitems(session, config, items):
379     """
380     Selects tests based on the categories requested.  Tests without
381     categories will always be executed.
382     """
383     config.traceability_items = list(items)  # save all items for traceability
384     if not config.option.self_test:
385         for item in items:
386             # checking if test belongs to a category
387             if hasattr(item.function, "categories"):
388                 if config.option.test_categories:
389                     test_categories = getattr(item.function, "categories")
390                     passed_categories = config.option.test_categories
391                     if not all(
392                         category in passed_categories for category in test_categories
393                     ):
394                         item.add_marker(
395                             pytest.mark.skip(
396                                 reason=(
397                                     "Test categories do not match "
398                                     "all the passed categories"
399                                 )
400                             )
401                         )
402                 else:
403                     item.add_marker(
404                         pytest.mark.skip(
405                             reason=(
406                                 "Test belongs to a category but "
407                                 "no categories were passed"
408                             )
409                         )
410                     )
411
412     items.sort(
413         key=lambda x: (0, x.name)
414         if "base" in set(m.name for m in x.iter_markers())
415         else (1, x.name)
416     )
417
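# Illustrative ordering produced by the sort key above (test names are
# hypothetical): tests carrying the "base" marker run first, alphabetically,
# followed by all remaining tests, also alphabetically, e.g.
#   [test_base_a, test_base_b, test_other_a, test_other_z]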
418
419 def make_href(paths):
420     """
421     Create an anchor tag to link to the file paths provided.
422     :param paths: string or list of file paths
423     :return: String of hrefs - one for each path, each separated by a line
424              break (<br/>).
425     """
426     paths = [paths] if isinstance(paths, string_types) else paths
427     links = []
428     for p in paths:
429         abs_path = os.path.abspath(p)
430         name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
431         links.append(
432             "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
433                 abs_path=abs_path, name=name
434             )
435         )
436     return "<br/>".join(links)
437
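# Illustrative result of make_href() (hypothetical paths, assuming base.yaml is
# an existing file and /work/templates an existing directory):
#
#   make_href(["/work/templates/base.yaml", "/work/templates"]) returns
#   "<a href='file:///work/templates/base.yaml' target='_blank'>base.yaml</a><br/>"
#   "<a href='file:///work/templates' target='_blank'>/work/templates</a>"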
438
439 def load_resolutions_file():
440     """
441     :return: dict of data loaded from resolution_steps.json
442     """
443     resolution_steps = "{}/../{}".format(__path__[0], RESOLUTION_STEPS_FILE)
444     if os.path.exists(resolution_steps):
445         with open(resolution_steps, "r") as f:
446             return json.loads(f.read())
447
448
449 def generate_report(outpath, template_path, categories, output_format="html"):
450     """
451     Generates the various output reports.
452
453     :param outpath: destination directory for all reports
454     :param template_path: directory containing the Heat templates validated
455     :param categories: Optional categories selected
456     :param output_format: One of "html", "excel", or "csv". Default is "html"
457     :raises: ValueError if requested output format is unknown
458     """
459     failures = [r for r in ALL_RESULTS if r.is_failed]
460     generate_failure_file(outpath)
461     output_format = output_format.lower().strip() if output_format else "html"
462     generate_json(outpath, template_path, categories)
463     if output_format == "html":
464         generate_html_report(outpath, categories, template_path, failures)
465     elif output_format == "excel":
466         generate_excel_report(outpath, categories, template_path, failures)
467     elif output_format == "json":
468         return
469     elif output_format == "csv":
470         generate_csv_report(outpath, categories, template_path, failures)
471     else:
472         raise ValueError("Unsupported output format: " + output_format)
473
474
475 def write_json(data, path):
476     """
477     Pretty print data as JSON to the output path requested
478
479     :param data: Data structure to be converted to JSON
480     :param path: Where to write output
481     """
482     with open(path, "w") as f:
483         json.dump(data, f, indent=2)
484
485
486 def generate_failure_file(outpath):
487     """
488     Writes a summary of test failures to a file named failures.
489     This is for backwards compatibility only.  The report.json offers a
490     more comprehensive output.
491     """
492     failure_path = os.path.join(outpath, "failures")
493     failures = [r for r in ALL_RESULTS if r.is_failed]
494     data = {}
495     for i, fail in enumerate(failures):
496         data[str(i)] = {
497             "file": fail.files[0] if len(fail.files) == 1 else fail.files,
498             "vnfrqts": fail.requirement_ids,
499             "test": fail.test_case,
500             "test_file": fail.test_module,
501             "raw_output": fail.raw_output,
502             "message": fail.error_message,
503         }
504     write_json(data, failure_path)
505
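# Illustrative shape of the "failures" file written above (all values are
# hypothetical):
#
#   {
#     "0": {
#       "file": "base_vnf.yaml",
#       "vnfrqts": ["R-xxxxx"],
#       "test": "test_some_validation",
#       "test_file": "test_some_module",
#       "raw_output": "...",
#       "message": "..."
#     }
#   }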
506
507 def generate_csv_report(output_dir, categories, template_path, failures):
508     rows = [["Validation Failures"]]
509     headers = [
510         ("Categories Selected:", categories),
511         ("Tool Version:", version.VERSION),
512         ("Report Generated At:", make_timestamp()),
513         ("Directory Validated:", template_path),
514         ("Checksum:", hash_directory(template_path)),
515         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
516     ]
517     rows.append([])
518     for header in headers:
519         rows.append(header)
520     rows.append([])
521
522     if COLLECTION_FAILURES:
523         rows.append([COLLECTION_FAILURE_WARNING])
524         rows.append(["Validation File", "Test", "Fixtures", "Error"])
525         for failure in COLLECTION_FAILURES:
526             rows.append(
527                 [
528                     failure["module"],
529                     failure["test"],
530                     ";".join(failure["fixtures"]),
531                     failure["error"],
532                 ]
533             )
534         rows.append([])
535
536     # table header
537     rows.append([col for col, _ in REPORT_COLUMNS])
538
539     reqs = load_current_requirements()
540     resolutions = load_resolutions_file()
541
542     # table content
543     for failure in failures:
544         rows.append(
545             [
546                 "\n".join(failure.files),
547                 failure.test_id,
548                 failure.requirement_text(reqs),
549                 failure.resolution_steps(resolutions),
550                 failure.error_message,
551                 failure.raw_output,
552             ]
553         )
554
555     output_path = os.path.join(output_dir, "report.csv")
556     with open(output_path, "w", newline="") as f:
557         writer = csv.writer(f)
558         for row in rows:
559             writer.writerow(row)
560
561
562 def generate_excel_report(output_dir, categories, template_path, failures):
563     output_path = os.path.join(output_dir, "report.xlsx")
564     workbook = xlsxwriter.Workbook(output_path)
565     bold = workbook.add_format({"bold": True})
566     code = workbook.add_format({"font_name": "Courier", "text_wrap": True})
567     normal = workbook.add_format({"text_wrap": True})
568     heading = workbook.add_format({"bold": True, "font_size": 18})
569     worksheet = workbook.add_worksheet("failures")
570     worksheet.write(0, 0, "Validation Failures", heading)
571
572     headers = [
573         ("Categories Selected:", ",".join(categories)),
574         ("Tool Version:", version.VERSION),
575         ("Report Generated At:", make_timestamp()),
576         ("Directory Validated:", template_path),
577         ("Checksum:", hash_directory(template_path)),
578         ("Total Errors:", len(failures) + len(COLLECTION_FAILURES)),
579     ]
580     for row, (header, value) in enumerate(headers, start=2):
581         worksheet.write(row, 0, header, bold)
582         worksheet.write(row, 1, value)
583
584     worksheet.set_column(0, len(headers) - 1, 40)
585     worksheet.set_column(len(headers), len(headers), 80)
586
587     if COLLECTION_FAILURES:
588         collection_failures_start = 2 + len(headers) + 2
589         worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
590         collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
591         for col_num, col_name in enumerate(collection_failure_headers):
592             worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
593         for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
594             worksheet.write(row, 0, data["module"])
595             worksheet.write(row, 1, data["test"])
596             worksheet.write(row, 2, ",".join(data["fixtures"]))
597             worksheet.write(row, 3, data["error"], code)
598
599     # table header
600     start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
601     worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
602     for col_num, (col_name, _) in enumerate(REPORT_COLUMNS):
603         worksheet.write(start_error_table_row + 1, col_num, col_name, bold)
604
605     reqs = load_current_requirements()
606     resolutions = load_resolutions_file()
607
608     # table content
609     for row, failure in enumerate(failures, start=start_error_table_row + 2):
610         worksheet.write(row, 0, "\n".join(failure.files), normal)
611         worksheet.write(row, 1, failure.test_id, normal)
612         worksheet.write(row, 2, failure.requirement_text(reqs), normal)
613         worksheet.write(row, 3, failure.resolution_steps(resolutions), normal)
614         worksheet.write(row, 4, failure.error_message, normal)
615         worksheet.write(row, 5, failure.raw_output, code)
616
617     workbook.close()
618
619
620 def make_iso_timestamp():
621     """
622     Creates a timestamp in ISO 8601 format, in UTC.  Used for JSON output.
623     """
624     now = datetime.datetime.utcnow()
625     now = now.replace(tzinfo=datetime.timezone.utc)
626     return now.isoformat()
627
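# Illustrative output (hypothetical instant): "2019-01-19T16:18:49.865000+00:00"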
628
629 def aggregate_results(outcomes, r_id=None):
630     """
631     Determines the aggregate result for the conditions provided.  Assumes the
632     results have been filtered and collected for analysis.
633
634     :param outcomes: set of outcomes from the TestResults
635     :param r_id: Optional requirement ID if known
636     :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
637              (ERROR takes precedence over FAIL, which takes precedence over PASS)
638     """
639     if not outcomes:
640         return "PASS"
641     elif "ERROR" in outcomes:
642         return "ERROR"
643     elif "FAIL" in outcomes:
644         return "FAIL"
645     elif "PASS" in outcomes:
646         return "PASS"
647     elif {"SKIP"} == outcomes:
648         return "SKIP"
649     else:
650         logging.warning(
651             "Unexpected error aggregating outcomes ({}) for requirement {}".format(
652                 outcomes, r_id
653             )
654         )
655         return "ERROR"
656
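# Worked examples for aggregate_results() (illustrative):
#   aggregate_results(set())                     -> "PASS"   (nothing applied)
#   aggregate_results({"PASS", "SKIP"})          -> "PASS"
#   aggregate_results({"PASS", "FAIL", "SKIP"})  -> "FAIL"
#   aggregate_results({"ERROR", "FAIL"})         -> "ERROR"
#   aggregate_results({"SKIP"})                  -> "SKIP"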
657
658 def aggregate_run_results(collection_failures, test_results):
659     """
660     Determines overall status of run based on all failures and results.
661
662     * 'ERROR' - At least one collection failure occurred during the run.
663     * 'FAIL' - Template failed at least one test
664     * 'PASS' - All tests executed properly and no failures were detected
665
666     :param collection_failures: failures occurring during test setup
667     :param test_results: list of all test execution results
668     :return: one of 'ERROR', 'FAIL', or 'PASS'
669     """
670     if collection_failures:
671         return "ERROR"
672     elif any(r.is_failed for r in test_results):
673         return "FAIL"
674     else:
675         return "PASS"
676
677
678 def relative_paths(base_dir, paths):
679     return [os.path.relpath(p, base_dir) for p in paths]
680
681
682 # noinspection PyTypeChecker
683 def generate_json(outpath, template_path, categories):
684     """
685     Creates a JSON summary of the entire test run.
686     """
687     reqs = load_current_requirements()
688     data = {
689         "version": "dublin",
690         "template_directory": os.path.splitdrive(template_path)[1].replace(
691             os.path.sep, "/"
692         ),
693         "timestamp": make_iso_timestamp(),
694         "checksum": hash_directory(template_path),
695         "categories": categories,
696         "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
697         "tests": [],
698         "requirements": [],
699     }
700
701     results = data["tests"]
702     for result in COLLECTION_FAILURES:
703         results.append(
704             {
705                 "files": [],
706                 "test_module": result["module"],
707                 "test_case": result["test"],
708                 "result": "ERROR",
709                 "error": result["error"],
710                 "requirements": result["requirements"],
711             }
712         )
713     for result in ALL_RESULTS:
714         results.append(
715             {
716                 "files": relative_paths(template_path, result.files),
717                 "test_module": result.test_module,
718                 "test_case": result.test_case,
719                 "result": result.outcome,
720                 "error": result.error_message if result.is_failed else "",
721                 "requirements": result.requirements_metadata(reqs),
722             }
723         )
724
725     # Build a mapping of requirement ID to the results
726     r_id_results = defaultdict(lambda: {"errors": set(), "outcomes": set()})
727     for test_result in results:
728         test_reqs = test_result["requirements"]
729         r_ids = (
730             [r["id"] if isinstance(r, dict) else r for r in test_reqs]
731             if test_reqs
732             else ("",)
733         )
734         for r_id in r_ids:
735             item = r_id_results[r_id]
736             item["outcomes"].add(test_result["result"])
737             if test_result["error"]:
738                 item["errors"].add(test_result["error"])
739
740     requirements = data["requirements"]
741     for r_id, r_data in reqs.items():
742         requirements.append(
743             {
744                 "id": r_id,
745                 "text": r_data["description"],
746                 "keyword": r_data["keyword"],
747                 "result": aggregate_results(r_id_results[r_id]["outcomes"]),
748                 "errors": list(r_id_results[r_id]["errors"]),
749             }
750         )
751
752     if r_id_results[""]["errors"] or r_id_results[""]["outcomes"]:
753         requirements.append(
754             {
755                 "id": "Unmapped",
756                 "text": "Tests not mapped to requirements (see tests)",
757                 "result": aggregate_results(r_id_results[""]["outcomes"]),
758                 "errors": list(r_id_results[""]["errors"]),
759             }
760         )
761
762     report_path = os.path.join(outpath, "report.json")
763     write_json(data, report_path)
764
765
766 def generate_html_report(outpath, categories, template_path, failures):
767     reqs = load_current_requirements()
768     resolutions = load_resolutions_file()
769     fail_data = []
770     for failure in failures:
771         fail_data.append(
772             {
773                 "file_links": make_href(failure.files),
774                 "test_id": failure.test_id,
775                 "error_message": failure.error_message,
776                 "raw_output": failure.raw_output,
777                 "requirements": docutils.core.publish_parts(
778                     writer_name="html", source=failure.requirement_text(reqs)
779                 )["body"],
780                 "resolution_steps": failure.resolution_steps(resolutions),
781             }
782         )
783     pkg_dir = os.path.split(__file__)[0]
784     j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
785     with open(j2_template_path, "r") as f:
786         report_template = jinja2.Template(f.read())
787         contents = report_template.render(
788             version=version.VERSION,
789             num_failures=len(failures) + len(COLLECTION_FAILURES),
790             categories=categories,
791             template_dir=make_href(template_path),
792             checksum=hash_directory(template_path),
793             timestamp=make_timestamp(),
794             failures=fail_data,
795             collection_failures=COLLECTION_FAILURES,
796         )
797     with open(os.path.join(outpath, "report.html"), "w") as f:
798         f.write(contents)
799
800
801 def pytest_addoption(parser):
802     """
803     Add needed CLI arguments
804     """
805     parser.addoption(
806         "--template-directory",
807         dest="template_dir",
808         action="append",
809         help="Directory which holds the templates for validation",
810     )
811
812     parser.addoption(
813         "--template-source",
814         dest="template_source",
815         action="append",
816         help="Source Directory which holds the templates for validation",
817     )
818
819     parser.addoption(
820         "--self-test",
821         dest="self_test",
822         action="store_true",
823         help="Test the unit tests against their fixtured data",
824     )
825
826     parser.addoption(
827         "--report-format",
828         dest="report_format",
829         action="store",
830         help="Format of output report (html, csv, excel, json)",
831     )
832
833     parser.addoption(
834         "--continue-on-failure",
835         dest="continue_on_failure",
836         action="store_true",
837         help="Continue validation even when structural errors exist in input files",
838     )
839
840     parser.addoption(
841         "--output-directory",
842         dest="output_dir",
843         action="store",
844         default=None,
845         help="Alternate output directory for the generated reports",
846     )
847
848     parser.addoption(
849         "--category",
850         dest="test_categories",
851         action="append",
852         help="optional category of test to execute",
853     )
854
855
856 def pytest_configure(config):
857     """
858     Ensure that we receive either `--self-test` or
859     `--template-dir=<directory>` as a CLI argument
860     """
861     if config.getoption("template_dir") and config.getoption("self_test"):
862         raise Exception('"--template-dir" and "--self-test" are mutually exclusive')
863     if not (
864         config.getoption("template_dir")
865         or config.getoption("self_test")
866         or config.getoption("help")
867     ):
868         raise Exception('One of "--template-dir" or "--self-test" must be specified')
869
870
871 def pytest_generate_tests(metafunc):
872     """
873     If a test requires one of the file-based fixtures below (e.g. 'filename',
874     'yaml_files', 'heat_template'), generate a test for each of the selected
875     files: either the files contained in `template_dir`, or, if `template_dir`
876     is not specified on the CLI, the fixture data associated with this
877     test name.
878     """
879
880     # noinspection PyBroadException
881     try:
882         if "filename" in metafunc.fixturenames:
883             from .parametrizers import parametrize_filename
884
885             parametrize_filename(metafunc)
886
887         if "filenames" in metafunc.fixturenames:
888             from .parametrizers import parametrize_filenames
889
890             parametrize_filenames(metafunc)
891
892         if "template_dir" in metafunc.fixturenames:
893             from .parametrizers import parametrize_template_dir
894
895             parametrize_template_dir(metafunc)
896
897         if "environment_pair" in metafunc.fixturenames:
898             from .parametrizers import parametrize_environment_pair
899
900             parametrize_environment_pair(metafunc)
901
902         if "heat_volume_pair" in metafunc.fixturenames:
903             from .parametrizers import parametrize_heat_volume_pair
904
905             parametrize_heat_volume_pair(metafunc)
906
907         if "yaml_files" in metafunc.fixturenames:
908             from .parametrizers import parametrize_yaml_files
909
910             parametrize_yaml_files(metafunc)
911
912         if "env_files" in metafunc.fixturenames:
913             from .parametrizers import parametrize_environment_files
914
915             parametrize_environment_files(metafunc)
916
917         if "yaml_file" in metafunc.fixturenames:
918             from .parametrizers import parametrize_yaml_file
919
920             parametrize_yaml_file(metafunc)
921
922         if "env_file" in metafunc.fixturenames:
923             from .parametrizers import parametrize_environment_file
924
925             parametrize_environment_file(metafunc)
926
927         if "parsed_yaml_file" in metafunc.fixturenames:
928             from .parametrizers import parametrize_parsed_yaml_file
929
930             parametrize_parsed_yaml_file(metafunc)
931
932         if "parsed_environment_file" in metafunc.fixturenames:
933             from .parametrizers import parametrize_parsed_environment_file
934
935             parametrize_parsed_environment_file(metafunc)
936
937         if "heat_template" in metafunc.fixturenames:
938             from .parametrizers import parametrize_heat_template
939
940             parametrize_heat_template(metafunc)
941
942         if "heat_templates" in metafunc.fixturenames:
943             from .parametrizers import parametrize_heat_templates
944
945             parametrize_heat_templates(metafunc)
946
947         if "volume_template" in metafunc.fixturenames:
948             from .parametrizers import parametrize_volume_template
949
950             parametrize_volume_template(metafunc)
951
952         if "volume_templates" in metafunc.fixturenames:
953             from .parametrizers import parametrize_volume_templates
954
955             parametrize_volume_templates(metafunc)
956
957         if "template" in metafunc.fixturenames:
958             from .parametrizers import parametrize_template
959
960             parametrize_template(metafunc)
961
962         if "templates" in metafunc.fixturenames:
963             from .parametrizers import parametrize_templates
964
965             parametrize_templates(metafunc)
966     except Exception as e:
967         # If an error occurs in the collection phase, then it won't be logged as a
968         # normal test failure.  This means that failures could occur, but not
969         # be seen on the report resulting in a false positive success message.  These
970         # errors will be stored and reported separately on the report
971         COLLECTION_FAILURES.append(
972             {
973                 "module": metafunc.module.__name__,
974                 "test": metafunc.function.__name__,
975                 "fixtures": metafunc.fixturenames,
976                 "error": traceback.format_exc(),
977                 "requirements": getattr(metafunc.function, "requirement_ids", []),
978             }
979         )
980         raise e
981
982
983 def hash_directory(path):
984     """
985     Create md5 hash using the contents of all files under ``path``
986     :param path: string directory containing files
987     :return: string MD5 hash code (hex)
988     """
989     md5 = hashlib.md5()
990     for dir_path, sub_dirs, filenames in os.walk(path):
991         for filename in filenames:
992             file_path = os.path.join(dir_path, filename)
993             with open(file_path, "rb") as f:
994                 md5.update(f.read())
995     return md5.hexdigest()
996
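# Illustrative usage (hypothetical path): hash_directory("/work/templates")
# walks every file under the directory and returns a 32-character hex MD5
# digest of their combined contents; the reports use it to identify exactly
# which set of templates was validated.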
997
998 def load_current_requirements():
999     """Loads the dict of current requirements from heat_requirements.json"""
1000     with io.open(HEAT_REQUIREMENTS_FILE, encoding="utf8", mode="r") as f:
1001         data = json.load(f)
1002         version = data["current_version"]
1003         return data["versions"][version]["needs"]
1004
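# Illustrative shape of heat_requirements.json, inferred from how it is read
# here and by the reporting functions (all field values are hypothetical):
#
#   {
#     "current_version": "dublin",
#     "versions": {
#       "dublin": {
#         "needs": {
#           "R-xxxxx": {
#             "id": "R-xxxxx",
#             "description": "...",
#             "keyword": "MUST",
#             "docname": "Heat/...",
#             "section_name": "...",
#             "validation_mode": "static"
#           }
#         }
#       }
#     }
#   }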
1005
1006 def select_heat_requirements(reqs):
1007     """Filters dict requirements to only those requirements pertaining to Heat"""
1008     return {k: v for k, v in reqs.items() if "heat" in v["docname"].lower()}
1009
1010
1011 def is_testable(reqs):
1012     """Annotates each requirement with a boolean "testable" flag"""
1013     for key, values in reqs.items():
1014         if ("MUST" in values.get("keyword", "").upper()) and (
1015             "none" not in values.get("validation_mode", "").lower()
1016         ):
1017             reqs[key]["testable"] = True
1018         else:
1019             reqs[key]["testable"] = False
1020     return reqs
1021
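# Example (illustrative): a requirement with keyword "MUST" and
# validation_mode "static" is flagged testable; one with keyword "MAY", or
# with validation_mode "none", is flagged not testable.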
1022
1023 def build_rst_json(reqs):
1024     """Adds RST links for requirements/tests and drops non-testable requirements"""
1025     for key, values in list(reqs.items()):
1026         if values["testable"]:
1027             # Creates links in RST format to requirements and test cases
1028             if values["test_case"]:
1029                 mod = values["test_case"].split(".")[-1]
1030                 val = TEST_SCRIPT_SITE + mod + ".py"
1031                 rst_value = "`" + mod + " <" + val + ">`_"
1032                 title = (
1033                     "`"
1034                     + values["id"]
1035                     + " <"
1036                     + VNFRQTS_ID_URL
1037                     + values["docname"].replace(" ", "%20")
1038                     + ".html#"
1039                     + values["id"]
1040                     + ">`_"
1041                 )
1042                 reqs[key].update({"full_title": title, "test_case": rst_value})
1043             else:
1044                 title = (
1045                     "`"
1046                     + values["id"]
1047                     + " <"
1048                     + VNFRQTS_ID_URL
1049                     + values["docname"].replace(" ", "%20")
1050                     + ".html#"
1051                     + values["id"]
1052                     + ">`_"
1053                 )
1054                 reqs[key].update(
1055                     {
1056                         "full_title": title,
1057                         "test_case": "No test for requirement",
1058                         "validated_by": "static",
1059                     }
1060                 )
1061         else:
1062             del reqs[key]
1063     return reqs
1064
1065
1066 def generate_rst_table(output_dir, data):
1067     """Generate a formatted csv to be used in RST"""
1068     rst_path = os.path.join(output_dir, "rst.csv")
1069     with open(rst_path, "w", newline="") as f:
1070         out = csv.writer(f)
1071         out.writerow(("Requirement ID", "Requirement", "Test Module", "Test Name"))
1072         for req_id, metadata in data.items():
1073             out.writerow(
1074                 (
1075                     metadata["full_title"],
1076                     metadata["description"],
1077                     metadata["test_case"],
1078                     metadata["validated_by"],
1079                 )
1080             )
1081
1082
1083 # noinspection PyUnusedLocal
1084 def pytest_report_collectionfinish(config, startdir, items):
1085     """Generates a simple traceability report to output/traceability.csv"""
1086     traceability_path = os.path.join(get_output_dir(config), "traceability.csv")
1087     output_dir = os.path.split(traceability_path)[0]
1088     if not os.path.exists(output_dir):
1089         os.makedirs(output_dir)
1090     reqs = load_current_requirements()
1091     requirements = select_heat_requirements(reqs)
1092     testable_requirements = is_testable(requirements)
1093     unmapped, mapped = partition(
1094         lambda i: hasattr(i.function, "requirement_ids"), items
1095     )
1096
1097     req_to_test = defaultdict(set)
1098     mapping_errors = set()
1099     for item in mapped:
1100         for req_id in item.function.requirement_ids:
1101             if req_id not in req_to_test:
1102                 req_to_test[req_id].add(item)
1103                 if req_id in requirements:
1104                     reqs[req_id].update(
1105                         {
1106                             "test_case": item.function.__module__,
1107                             "validated_by": item.function.__name__,
1108                         }
1109                     )
1110             if req_id not in requirements:
1111                 mapping_errors.add(
1112                     (req_id, item.function.__module__, item.function.__name__)
1113                 )
1114
1115     mapping_error_path = os.path.join(get_output_dir(config), "mapping_errors.csv")
1116     with open(mapping_error_path, "w", newline="") as f:
1117         writer = csv.writer(f)
1118         for err in mapping_errors:
1119             writer.writerow(err)
1120
1121     with open(traceability_path, "w", newline="") as f:
1122         out = csv.writer(f)
1123         out.writerow(
1124             (
1125                 "Requirement ID",
1126                 "Requirement",
1127                 "Section",
1128                 "Keyword",
1129                 "Validation Mode",
1130                 "Is Testable",
1131                 "Test Module",
1132                 "Test Name",
1133             )
1134         )
1135         for req_id, metadata in testable_requirements.items():
1136             if req_to_test[req_id]:
1137                 for item in req_to_test[req_id]:
1138                     out.writerow(
1139                         (
1140                             req_id,
1141                             metadata["description"],
1142                             metadata["section_name"],
1143                             metadata["keyword"],
1144                             metadata["validation_mode"],
1145                             metadata["testable"],
1146                             item.function.__module__,
1147                             item.function.__name__,
1148                         )
1149                     )
1150             else:
1151                 out.writerow(
1152                     (
1153                         req_id,
1154                         metadata["description"],
1155                         metadata["section_name"],
1156                         metadata["keyword"],
1157                         metadata["validation_mode"],
1158                         metadata["testable"],
1159                         "",  # test module
1160                         "",
1161                     )  # test function
1162                 )
1163         # now write out any test methods that weren't mapped to requirements
1164         unmapped_tests = {
1165             (item.function.__module__, item.function.__name__) for item in unmapped
1166         }
1167         for test_module, test_name in unmapped_tests:
1168             out.writerow(
1169                 (
1170                     "",  # req ID
1171                     "",  # description
1172                     "",  # section name
1173                     "",  # keyword
1174                     "static",  # validation mode
1175                     "TRUE",  # testable
1176                     test_module,
1177                     test_name,
1178                 )
1179             )
1180
1181     generate_rst_table(get_output_dir(config), build_rst_json(testable_requirements))