[VVP] validation scripts warning function
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
# -*- coding: utf8 -*-
# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
# Copyright © 2018 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
#             http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
#             https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================

import collections
import csv
import datetime
import hashlib
import io
import json
import os
import sys
import time
import requests
import traceback
import warnings

import docutils.core
import jinja2
import pytest
from more_itertools import partition
from six import string_types
import xlsxwriter

__path__ = [os.path.dirname(os.path.abspath(__file__))]

resolution_steps_file = "resolution_steps.json"
heat_requirements_file = "heat_requirements.json"

report_columns = [
    ("Input File", "file"),
    ("Test", "test_file"),
    ("Requirements", "req_description"),
    ("Resolution Steps", "resolution_steps"),
    ("Error Message", "message"),
    ("Raw Test Output", "raw_output"),
]
report = collections.OrderedDict(report_columns)

COLLECTION_FAILURES = []
COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
while preparing to validate the input files. Some validations may not have been
executed. Please refer these issues to the VNF Validation Tool team.
"""


def extract_error_msg(rep):
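    """Return a concise error message extracted from a pytest report.

    Prefers the assertion message from the crash summary when one is
    available, otherwise falls back to the string form of the report.
    """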
    try:
        msg = str(rep.longrepr.reprcrash)
    except AttributeError:
        msg = str(rep)

    if "AssertionError:" in msg:
        return msg.split("AssertionError:")[1]
    else:
        return msg


@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
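    """Capture each test failure and record it in <output>/failures.

    Failure details are appended to a JSON file so they can be folded into
    the final report.  If a base test fails and --continue-on-failure was
    not given, the session is stopped immediately.
    """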
    outcome = yield
    rep = outcome.get_result()

    output_dir = "{}/../output".format(__path__[0])
    if rep.outcome == "failed":
        if not os.path.exists(output_dir):
            os.mkdir(output_dir)
        if os.path.exists("{}/failures".format(output_dir)):
            with open("{}/failures".format(output_dir), "r") as o:
                jdata = json.loads(o.read())
        else:
            jdata = {}

        if hasattr(item.function, "requirement_ids"):
            requirement_ids = item.function.requirement_ids
        else:
            requirement_ids = ""

        if "environment_pair" in item.fixturenames:
            resolved_pair = "{} environment pair".format(
                item.funcargs["environment_pair"]["name"]
            )
        elif "heat_volume_pair" in item.fixturenames:
            resolved_pair = "{} volume pair".format(
                item.funcargs["heat_volume_pair"]["name"]
            )
        elif "heat_templates" in item.fixturenames:
            resolved_pair = item.funcargs["heat_templates"]
        elif "yaml_files" in item.fixturenames:
            resolved_pair = item.funcargs["yaml_files"]
        else:
            resolved_pair = rep.nodeid.split("[")[1][:-1]

        markers = set(m.name for m in item.iter_markers())
        base_test = "base" in markers

        msg = extract_error_msg(rep)
        if base_test:
            msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
                msg
            )

        jdata[len(jdata)] = {
            "file": resolved_pair,
            "vnfrqts": requirement_ids,
            "test": item.function.__name__,
            "test_file": item.function.__module__.split(".")[-1],
            "raw_output": str(rep.longrepr),
            "message": msg,
        }

        with open("{}/failures".format(output_dir), "w") as f:
            json.dump(jdata, f, indent=4)

        if not item.config.option.continue_on_failure and base_test:
            pytest.exit(
                "{}\n{}\n{}".format(msg, resolved_pair, item.function.__name__)
            )


def make_timestamp():
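    """Return the current local time and timezone name for report headers."""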
    timezone = time.tzname[time.localtime().tm_isdst]
    return "{} {}".format(str(datetime.datetime.now()), timezone)


def pytest_sessionfinish(session, exitstatus):
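    """Generate the failure report once the test session ends.

    A report is only produced when a template directory was supplied on
    the command line.
    """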
    if not session.config.option.template_dir:
        return
    template_path = os.path.abspath(session.config.option.template_dir[0])
    profile_name = session.config.option.validation_profile_name
    generate_report(
        "{}/../output".format(__path__[0]),
        template_path,
        profile_name,
        session.config.option.report_format,
    )


def pytest_collection_modifyitems(session, config, items):
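    """Skip tests that do not apply to the selected validation profile and
    reorder the collection so base tests run first."""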
    allowed_marks = ["xfail", "base"]
    profile = config.option.validation_profile

    for item in items:
        markers = set(m.name for m in item.iter_markers())
        if not profile and markers and markers.isdisjoint(allowed_marks):
            item.add_marker(
                pytest.mark.skip(
                    reason="No validation profile selected. Skipping tests with marks."
                )
            )
        if (
            profile
            and markers
            and profile not in markers
            and markers.isdisjoint(allowed_marks)
        ):
            item.add_marker(
                pytest.mark.skip(reason="Doesn't match selected validation profile")
            )

    items.sort(
        key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
    )


def make_href(path):
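    """Convert one or more file paths into HTML `file://` links."""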
    paths = [path] if isinstance(path, string_types) else path
    links = []
    for p in paths:
        abs_path = os.path.abspath(p)
        name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
        links.append(
            "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
                abs_path=abs_path, name=name
            )
        )
    return "<br/>".join(links)


def generate_report(outpath, template_path, profile_name, output_format):
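    """Combine the collected failures with requirement descriptions and
    resolution steps, then render the report in the requested format
    (html, excel, or csv)."""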
    failures = "{}/failures".format(outpath)
    faildata = None
    rdata = []
    hdata = None

    if os.path.exists(failures):
        with open(failures, "r") as f:
            faildata = json.loads(f.read())
    else:
        faildata = {}

    resolution_steps = "{}/../{}".format(__path__[0], resolution_steps_file)
    if os.path.exists(resolution_steps):
        with open(resolution_steps, "r") as f:
            rdata = json.loads(f.read())

    heat_requirements = "{}/../{}".format(__path__[0], heat_requirements_file)
    if os.path.exists(heat_requirements):
        with open(heat_requirements, "r") as f:
            hdata = json.loads(f.read())

    # point requirements at the most recent version
    if hdata:
        current_version = hdata["current_version"]
        hdata = hdata["versions"][current_version]["needs"]
    else:
        hdata = {}

    # map requirement IDs from failures to requirement descriptions
    for k, v in faildata.items():
        req_text = ""
        if v["vnfrqts"] != "":
            for req in v["vnfrqts"]:
                if req in hdata:
                    req_text += "\n\n{}: \n{}".format(req, hdata[req]["description"])
        faildata[k]["req_description"] = req_text

    # map resolution steps to module and test name
    for k, v in faildata.items():
        faildata[k]["resolution_steps"] = ""
        for rs in rdata:
            if v["test_file"] == rs["module"] and v["test"] == rs["function"]:
                faildata[k]["resolution_steps"] = "\n{}: \n{}".format(
                    rs["header"], rs["resolution_steps"]
                )

    output_format = output_format.lower().strip() if output_format else "html"
    if output_format == "html":
        generate_html_report(outpath, profile_name, template_path, faildata)
    elif output_format == "excel":
        generate_excel_report(outpath, profile_name, template_path, faildata)
    elif output_format == "csv":
        generate_csv_report(outpath, profile_name, template_path, faildata)
    else:
        raise ValueError("Unsupported output format: " + output_format)


def generate_csv_report(output_dir, profile_name, template_path, faildata):
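    """Write the validation failures (and any collection failures) to
    report.csv in output_dir."""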
    rows = [["Validation Failures"]]
    headers = [
        ("Profile Selected:", profile_name),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(faildata) + len(COLLECTION_FAILURES)),
    ]
    rows.append([])
    for header in headers:
        rows.append(header)
    rows.append([])

    if COLLECTION_FAILURES:
        rows.append([COLLECTION_FAILURE_WARNING])
        rows.append(["Validation File", "Test", "Fixtures", "Error"])
        for failure in COLLECTION_FAILURES:
            rows.append(
                [
                    failure["module"],
                    failure["test"],
                    ";".join(failure["fixtures"]),
                    failure["error"],
                ]
            )
        rows.append([])

    # table header
    rows.append([col for col, _ in report_columns])

    # table content
    for data in faildata.values():
        rows.append(
            [
                data.get("file", ""),
                data.get("test_file", ""),
                data.get("req_description", ""),
                data.get("resolution_steps", ""),
                data.get("message", ""),
                data.get("raw_output", ""),
            ]
        )

    output_path = os.path.join(output_dir, "report.csv")
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        for row in rows:
            writer.writerow(row)


def generate_excel_report(output_dir, profile_name, template_path, faildata):
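    """Write the validation failures (and any collection failures) to
    report.xlsx in output_dir."""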
    output_path = os.path.join(output_dir, "report.xlsx")
    workbook = xlsxwriter.Workbook(output_path)
    bold = workbook.add_format({"bold": True})
    code = workbook.add_format({"font_name": "Courier", "text_wrap": True})
    normal = workbook.add_format({"text_wrap": True})
    heading = workbook.add_format({"bold": True, "font_size": 18})
    worksheet = workbook.add_worksheet("failures")
    worksheet.write(0, 0, "Validation Failures", heading)

    headers = [
        ("Profile Selected:", profile_name),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(faildata) + len(COLLECTION_FAILURES)),
    ]
    for row, (header, value) in enumerate(headers, start=2):
        worksheet.write(row, 0, header, bold)
        worksheet.write(row, 1, value)

    worksheet.set_column(0, len(headers) - 1, 40)
    worksheet.set_column(len(headers), len(headers), 80)

    if COLLECTION_FAILURES:
        collection_failures_start = 2 + len(headers) + 2
        worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
        collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
        for col_num, col_name in enumerate(collection_failure_headers):
            worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
        for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
            worksheet.write(row, 0, data["module"])
            worksheet.write(row, 1, data["test"])
            worksheet.write(row, 2, ",".join(data["fixtures"]))
            worksheet.write(row, 3, data["error"], code)

    # table header
    start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
    worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
    for col_num, (col_name, _) in enumerate(report_columns):
        worksheet.write(start_error_table_row + 1, col_num, col_name, bold)

    # table content
    for row, data in enumerate(faildata.values(), start=start_error_table_row + 2):
        for col, key in enumerate(report.values()):
            if key == "file":
                paths = (
                    [data[key]] if isinstance(data[key], string_types) else data[key]
                )
                contents = "\n".join(paths)
                worksheet.write(row, col, contents, normal)
            elif key == "raw_output":
                worksheet.write_string(row, col, data[key], code)
            else:
                worksheet.write(row, col, data[key], normal)

    workbook.close()


def generate_html_report(outpath, profile_name, template_path, faildata):
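    """Render report.html from the Jinja2 template, converting requirement
    descriptions from reStructuredText to HTML via docutils."""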
    failures = []
    for data in faildata.values():
        failures.append(
            {
                "file_links": make_href(data["file"]),
                "test_id": data["test_file"],
                "error_message": data["message"],
                "raw_output": data["raw_output"],
                "requirements": docutils.core.publish_parts(
                    writer_name="html", source=data["req_description"]
                )["body"],
                "resolution_steps": data["resolution_steps"],
            }
        )
    pkg_dir = os.path.split(__file__)[0]
    j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
    with open(j2_template_path, "r") as f:
        report_template = jinja2.Template(f.read())
        contents = report_template.render(
            num_failures=len(failures) + len(COLLECTION_FAILURES),
            profile_name=profile_name,
            template_dir=make_href(template_path),
            checksum=hash_directory(template_path),
            timestamp=make_timestamp(),
            failures=failures,
            collection_failures=COLLECTION_FAILURES,
        )
    with open(os.path.join(outpath, "report.html"), "w") as f:
        f.write(contents)


def pytest_addoption(parser):
    """
    Add needed CLI arguments
    """
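    # Example invocation using the flags defined below (paths are placeholders):
    #   pytest --template-directory=/path/to/templates --report-format=html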
    parser.addoption(
        "--template-directory",
        dest="template_dir",
        action="append",
        help="Directory which holds the templates for validation",
    )

    parser.addoption(
        "--self-test",
        dest="self_test",
        action="store_true",
        help="Test the unit tests against their fixtured data",
    )

    parser.addoption(
        "--validation-profile",
        dest="validation_profile",
        action="store",
        help="Runs all unmarked tests plus tests with a matching marker",
    )

    parser.addoption(
        "--validation-profile-name",
        dest="validation_profile_name",
        action="store",
        help="Friendly name of the validation profile used in reports",
    )

    parser.addoption(
        "--report-format",
        dest="report_format",
        action="store",
        help="Format of output report (html, csv, excel)",
    )

    parser.addoption(
        "--continue-on-failure",
        dest="continue_on_failure",
        action="store_true",
        help="Continue validation even when structural errors exist in input files",
    )


def pytest_configure(config):
    """
    Ensure that we receive either `--self-test` or
    `--template-directory=<directory>` as CLI arguments
    """
    if config.getoption("template_dir") and config.getoption("self_test"):
        raise Exception(
            '"--template-directory" and "--self-test" are mutually exclusive'
        )
    if not (
        config.getoption("template_dir")
        or config.getoption("self_test")
        or config.getoption("help")
    ):
        raise Exception(
            'One of "--template-directory" or "--self-test" must be specified'
        )


def pytest_generate_tests(metafunc):
    """
    If a test requires an argument named 'filename', generate a test for
    each file selected: either the files contained in `template_dir`, or,
    if `template_dir` is not specified on the CLI, the fixtures associated
    with this test name.
    """
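    # For example, a test declared as `def test_foo(yaml_file): ...` is expected
    # to be parametrized so it runs once per selected YAML file (`test_foo` is a
    # hypothetical name used for illustration only).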
    # noinspection PyBroadException
    try:
        if "filename" in metafunc.fixturenames:
            from .parametrizers import parametrize_filename

            parametrize_filename(metafunc)

        if "filenames" in metafunc.fixturenames:
            from .parametrizers import parametrize_filenames

            parametrize_filenames(metafunc)

        if "template_dir" in metafunc.fixturenames:
            from .parametrizers import parametrize_template_dir

            parametrize_template_dir(metafunc)

        if "environment_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_pair

            parametrize_environment_pair(metafunc)

        if "heat_volume_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_volume_pair

            parametrize_heat_volume_pair(metafunc)

        if "yaml_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_files

            parametrize_yaml_files(metafunc)

        if "env_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_files

            parametrize_environment_files(metafunc)

        if "yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_file

            parametrize_yaml_file(metafunc)

        if "env_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_file

            parametrize_environment_file(metafunc)

        if "parsed_yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_yaml_file

            parametrize_parsed_yaml_file(metafunc)

        if "parsed_environment_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_environment_file

            parametrize_parsed_environment_file(metafunc)

        if "heat_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_template

            parametrize_heat_template(metafunc)

        if "heat_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_templates

            parametrize_heat_templates(metafunc)

        if "volume_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_template

            parametrize_volume_template(metafunc)

        if "volume_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_templates

            parametrize_volume_templates(metafunc)

        if "template" in metafunc.fixturenames:
            from .parametrizers import parametrize_template

            parametrize_template(metafunc)

        if "templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_templates

            parametrize_templates(metafunc)
    except Exception as e:
        # If an error occurs in the collection phase it won't be logged as a
        # normal test failure.  That means failures could occur but not be
        # seen in the report, producing a false sense of success.  These
        # errors are therefore stored and reported separately.
        COLLECTION_FAILURES.append(
            {
                "module": metafunc.module.__name__,
                "test": metafunc.function.__name__,
                "fixtures": metafunc.fixturenames,
                "error": traceback.format_exc(),
            }
        )
        raise e


def hash_directory(path):
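    """Return an MD5 checksum computed over every file under path."""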
    md5 = hashlib.md5()
    for dir_path, sub_dirs, filenames in os.walk(path):
        for filename in filenames:
            file_path = os.path.join(dir_path, filename)
            with open(file_path, "rb") as f:
                md5.update(f.read())
    return md5.hexdigest()


def load_current_requirements():
    """Loads dict of current requirements or empty dict if file doesn't exist"""

    url = "https://onap.readthedocs.io/en/latest/_downloads/needs.json"

    try:
        r = requests.get(url)
        if r.headers.get("content-type") == "application/json":
            with open("requirements.json", "wb") as needs:
                needs.write(r.content)
        else:
            warnings.warn(
                (
                    "Unexpected content-type ({}) encountered downloading "
                    "requirements.json, using last saved copy"
                ).format(r.headers.get("content-type"))
            )
    except requests.exceptions.RequestException as e:
        warnings.warn("Error downloading latest JSON, using last saved copy.")
        warnings.warn(UserWarning(e))
    path = "requirements.json"
    if not os.path.exists(path):
        return {}
    with io.open(path, encoding="utf8", mode="r") as f:
        data = json.load(f)
        version = data["current_version"]
        return data["versions"][version]["needs"]


def compat_open(path):
    """Invokes open correctly depending on the Python version"""
    if sys.version_info.major < 3:
        return open(path, "wb")
    else:
        return open(path, "w", newline="")


def unicode_writerow(writer, row):
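    """Write a CSV row, encoding each value as UTF-8 under Python 2."""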
    if sys.version_info.major < 3:
        row = [s.encode("utf8") for s in row]
    writer.writerow(row)


def pytest_report_collectionfinish(config, startdir, items):
    """Generates a simple traceability report to output/traceability.csv"""
    traceability_path = os.path.join(__path__[0], "../output/traceability.csv")
    output_dir = os.path.split(traceability_path)[0]
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    requirements = load_current_requirements()
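    # more_itertools.partition yields the items for which the predicate is
    # false first, then those for which it is true, so `unmapped` holds tests
    # without requirement_ids and `mapped` holds the tests that declare them.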
    unmapped, mapped = partition(
        lambda item: hasattr(item.function, "requirement_ids"), items
    )

    req_to_test = collections.defaultdict(set)
    mapping_errors = set()
    for item in mapped:
        for req_id in item.function.requirement_ids:
            req_to_test[req_id].add(item)
            if req_id not in requirements:
                mapping_errors.add(
                    (req_id, item.function.__module__, item.function.__name__)
                )

    mapping_error_path = os.path.join(__path__[0], "../output/mapping_errors.csv")
    with compat_open(mapping_error_path) as f:
        writer = csv.writer(f)
        for error in mapping_errors:
            unicode_writerow(writer, error)

    with compat_open(traceability_path) as f:
        out = csv.writer(f)
        unicode_writerow(
            out,
            ("Requirement ID", "Requirement", "Section", "Test Module", "Test Name"),
        )
        for req_id, metadata in requirements.items():
            if req_to_test[req_id]:
                for item in req_to_test[req_id]:
                    unicode_writerow(
                        out,
                        (
                            req_id,
                            metadata["description"],
                            metadata["section_name"],
                            item.function.__module__,
                            item.function.__name__,
                        ),
                    )
            else:
                unicode_writerow(
                    out,
                    (req_id, metadata["description"], metadata["section_name"], "", ""),
                )
        # now write out any test methods that weren't mapped to requirements
        for item in unmapped:
            unicode_writerow(
                out, ("", "", "", item.function.__module__, item.function.__name__)
            )