[VVP] updating scripts for Casablanca
[vvp/validation-scripts.git] ice_validator/tests/conftest.py
# -*- coding: utf8 -*-
# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
# Copyright © 2018 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
#             http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
#             https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================

import collections
import csv
import datetime
import hashlib
import io
import json
import os
import sys
import time
import traceback

import docutils.core
import jinja2
import pytest
import requests
from more_itertools import partition
from six import string_types
import xlsxwriter

__path__ = [os.path.dirname(os.path.abspath(__file__))]

resolution_steps_file = "resolution_steps.json"
heat_requirements_file = "heat_requirements.json"

report_columns = [
    ("Input File", "file"),
    ("Test", "test_file"),
    ("Requirements", "req_description"),
    ("Resolution Steps", "resolution_steps"),
    ("Error Message", "message"),
    ("Raw Test Output", "raw_output"),
]
report = collections.OrderedDict(report_columns)

COLLECTION_FAILURES = []
COLLECTION_FAILURE_WARNING = """WARNING: The following unexpected errors occurred
while preparing to validate the input files. Some validations may not have been
executed. Please refer these issues to the VNF Validation Tool team.
"""


def extract_error_msg(rep):
    try:
        msg = str(rep.longrepr.reprcrash)
    except AttributeError:
        msg = str(rep)

    if "AssertionError:" in msg:
        return msg.split("AssertionError:")[1]
    else:
        return msg


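# A minimal sketch of how extract_error_msg above behaves (values illustrative,
# not part of the suite): when the failing test raised an AssertionError whose
# crash line is "AssertionError: image parameter missing", only the text after
# "AssertionError:" is kept so the report shows the assertion message alone;
# any other report falls back to its full string representation, e.g.
#
#     class FakeRep(object):
#         longrepr = None            # no reprcrash attribute available
#
#     extract_error_msg(FakeRep())   # -> str(rep), the whole report text

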
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):

    outcome = yield
    rep = outcome.get_result()

    output_dir = "{}/../output".format(__path__[0])
    if rep.outcome == "failed":
        if not os.path.exists(output_dir):
            os.mkdir(output_dir)
        if os.path.exists("{}/failures".format(output_dir)):
            with open("{}/failures".format(output_dir), "r") as o:
                jdata = json.loads(o.read())
        else:
            jdata = {}

        if hasattr(item.function, "requirement_ids"):
            requirement_ids = item.function.requirement_ids
        else:
            requirement_ids = ""

        if "environment_pair" in item.fixturenames:
            resolved_pair = "{} environment pair".format(
                item.funcargs["environment_pair"]["name"]
            )
        elif "heat_volume_pair" in item.fixturenames:
            resolved_pair = "{} volume pair".format(
                item.funcargs["heat_volume_pair"]["name"]
            )
        elif "heat_templates" in item.fixturenames:
            resolved_pair = item.funcargs["heat_templates"]
        elif "yaml_files" in item.fixturenames:
            resolved_pair = item.funcargs["yaml_files"]
        else:
            resolved_pair = rep.nodeid.split("[")[1][:-1]

        markers = set(m.name for m in item.iter_markers())
        base_test = "base" in markers

        msg = extract_error_msg(rep)
        if base_test:
            msg = "!!Base Test Failure!! Halting test suite execution...\n{}".format(
                msg
            )

        jdata[len(jdata)] = {
            "file": resolved_pair,
            "vnfrqts": requirement_ids,
            "test": item.function.__name__,
            "test_file": item.function.__module__.split(".")[-1],
            "raw_output": str(rep.longrepr),
            "message": msg,
        }

        with open("{}/failures".format(output_dir), "w") as f:
            json.dump(jdata, f, indent=4)

        if not item.config.option.continue_on_failure and base_test:
            pytest.exit(
                "{}\n{}\n{}".format(msg, resolved_pair, item.function.__name__)
            )


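# The hook above accumulates failures in <output>/failures as a JSON object keyed
# by insertion order. A sketch of one record (the field values are illustrative):
#
#     {
#         "0": {
#             "file": "base_vfw.yaml",
#             "vnfrqts": ["R-123456"],
#             "test": "test_something",
#             "test_file": "test_module",
#             "raw_output": "<full pytest longrepr>",
#             "message": "image parameter missing"
#         }
#     }
#
# generate_report() later enriches each record with "req_description" and
# "resolution_steps" before rendering the HTML/Excel/CSV reports.

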
def make_timestamp():
    timezone = time.tzname[time.localtime().tm_isdst]
    return "{} {}".format(str(datetime.datetime.now()), timezone)


def pytest_sessionfinish(session, exitstatus):
    if not session.config.option.template_dir:
        return
    template_path = os.path.abspath(session.config.option.template_dir[0])
    profile_name = session.config.option.validation_profile_name
    generate_report(
        "{}/../output".format(__path__[0]),
        template_path,
        profile_name,
        session.config.option.report_format,
    )


def pytest_collection_modifyitems(session, config, items):
    allowed_marks = ["xfail", "base"]
    profile = config.option.validation_profile

    for item in items:
        markers = set(m.name for m in item.iter_markers())
        if not profile and markers and markers.isdisjoint(allowed_marks):
            item.add_marker(
                pytest.mark.skip(
                    reason="No validation profile selected. Skipping tests with marks."
                )
            )
        if (
            profile
            and markers
            and profile not in markers
            and markers.isdisjoint(allowed_marks)
        ):
            item.add_marker(
                pytest.mark.skip(reason="Doesn't match selected validation profile")
            )

    items.sort(
        key=lambda item: 0 if "base" in set(m.name for m in item.iter_markers()) else 1
    )


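# How the profile logic above plays out in a test module (a hedged sketch; the
# marker name "openstack" is illustrative and not defined by this file):
#
#     @pytest.mark.base
#     def test_always_runs(yaml_file):        # "base" tests run under any profile
#         ...
#
#     @pytest.mark.openstack
#     def test_profile_specific(yaml_file):   # runs only when
#         ...                                 # --validation-profile=openstack is given
#
# Unmarked tests always run; "base" tests are sorted first so that a base failure
# can halt the session early (see pytest_runtest_makereport).

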
def make_href(path):
    paths = [path] if isinstance(path, string_types) else path
    links = []
    for p in paths:
        abs_path = os.path.abspath(p)
        name = abs_path if os.path.isdir(abs_path) else os.path.split(abs_path)[1]
        links.append(
            "<a href='file://{abs_path}' target='_blank'>{name}</a>".format(
                abs_path=abs_path, name=name
            )
        )
    return "<br/>".join(links)


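# A quick sketch of make_href's output (the absolute path is machine-specific):
#
#     make_href("base_vfw.yaml")
#     # -> "<a href='file:///.../base_vfw.yaml' target='_blank'>base_vfw.yaml</a>"
#
# A list of paths produces one anchor per entry joined with "<br/>"; directories
# are labelled with their full absolute path rather than the basename.

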
def generate_report(outpath, template_path, profile_name, output_format):
    failures = "{}/failures".format(outpath)
    faildata = {}
    rdata = []
    hdata = {}

    if os.path.exists(failures):
        with open(failures, "r") as f:
            faildata = json.loads(f.read())

    resolution_steps = "{}/../{}".format(__path__[0], resolution_steps_file)
    if os.path.exists(resolution_steps):
        with open(resolution_steps, "r") as f:
            rdata = json.loads(f.read())

    heat_requirements = "{}/../{}".format(__path__[0], heat_requirements_file)
    if os.path.exists(heat_requirements):
        with open(heat_requirements, "r") as f:
            data = json.loads(f.read())
            # point requirements at the most recent version
            current_version = data["current_version"]
            hdata = data["versions"][current_version]["needs"]

    # map requirement IDs from failures to requirement descriptions
    for k, v in faildata.items():
        req_text = ""
        if v["vnfrqts"] != "":
            for req in v["vnfrqts"]:
                if req in hdata:
                    req_text += "\n\n{}: \n{}".format(req, hdata[req]["description"])
        faildata[k]["req_description"] = req_text

    # map resolution steps to module and test name
    for k, v in faildata.items():
        faildata[k]["resolution_steps"] = ""
        for rs in rdata:
            if v["test_file"] == rs["module"] and v["test"] == rs["function"]:
                faildata[k]["resolution_steps"] = "\n{}: \n{}".format(
                    rs["header"], rs["resolution_steps"]
                )

    output_format = output_format.lower().strip() if output_format else "html"
    if output_format == "html":
        generate_html_report(outpath, profile_name, template_path, faildata)
    elif output_format == "excel":
        generate_excel_report(outpath, profile_name, template_path, faildata)
    elif output_format == "csv":
        generate_csv_report(outpath, profile_name, template_path, faildata)
    else:
        raise ValueError("Unsupported output format: " + output_format)


def generate_csv_report(output_dir, profile_name, template_path, faildata):
    rows = [["Validation Failures"]]
    headers = [
        ("Profile Selected:", profile_name),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(faildata) + len(COLLECTION_FAILURES)),
    ]
    rows.append([])
    for header in headers:
        rows.append(header)
    rows.append([])

    if COLLECTION_FAILURES:
        rows.append([COLLECTION_FAILURE_WARNING])
        rows.append(["Validation File", "Test", "Fixtures", "Error"])
        for failure in COLLECTION_FAILURES:
            rows.append(
                [
                    failure["module"],
                    failure["test"],
                    ";".join(failure["fixtures"]),
                    failure["error"],
                ]
            )
        rows.append([])

    # table header
    rows.append([col for col, _ in report_columns])

    # table content
    for data in faildata.values():
        rows.append(
            [
                data.get("file", ""),
                data.get("test_file", ""),
                data.get("req_description", ""),
                data.get("resolution_steps", ""),
                data.get("message", ""),
                data.get("raw_output", ""),
            ]
        )

    output_path = os.path.join(output_dir, "report.csv")
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        for row in rows:
            writer.writerow(row)


def generate_excel_report(output_dir, profile_name, template_path, faildata):
    output_path = os.path.join(output_dir, "report.xlsx")
    workbook = xlsxwriter.Workbook(output_path)
    bold = workbook.add_format({"bold": True})
    code = workbook.add_format({"font_name": "Courier", "text_wrap": True})
    normal = workbook.add_format({"text_wrap": True})
    heading = workbook.add_format({"bold": True, "font_size": 18})
    worksheet = workbook.add_worksheet("failures")
    worksheet.write(0, 0, "Validation Failures", heading)

    headers = [
        ("Profile Selected:", profile_name),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(faildata) + len(COLLECTION_FAILURES)),
    ]
    for row, (header, value) in enumerate(headers, start=2):
        worksheet.write(row, 0, header, bold)
        worksheet.write(row, 1, value)

    worksheet.set_column(0, len(headers) - 1, 40)
    worksheet.set_column(len(headers), len(headers), 80)

    if COLLECTION_FAILURES:
        collection_failures_start = 2 + len(headers) + 2
        worksheet.write(collection_failures_start, 0, COLLECTION_FAILURE_WARNING, bold)
        collection_failure_headers = ["Validation File", "Test", "Fixtures", "Error"]
        for col_num, col_name in enumerate(collection_failure_headers):
            worksheet.write(collection_failures_start + 1, col_num, col_name, bold)
        for row, data in enumerate(COLLECTION_FAILURES, collection_failures_start + 2):
            worksheet.write(row, 0, data["module"])
            worksheet.write(row, 1, data["test"])
            worksheet.write(row, 2, ",".join(data["fixtures"]))
            worksheet.write(row, 3, data["error"], code)

    # table header
    start_error_table_row = 2 + len(headers) + len(COLLECTION_FAILURES) + 4
    worksheet.write(start_error_table_row, 0, "Validation Failures", bold)
    for col_num, (col_name, _) in enumerate(report_columns):
        worksheet.write(start_error_table_row + 1, col_num, col_name, bold)

    # table content
    for row, data in enumerate(faildata.values(), start=start_error_table_row + 2):
        for col, key in enumerate(report.values()):
            if key == "file":
                paths = (
                    [data[key]] if isinstance(data[key], string_types) else data[key]
                )
                contents = "\n".join(paths)
                worksheet.write(row, col, contents, normal)
            elif key == "raw_output":
                worksheet.write_string(row, col, data[key], code)
            else:
                worksheet.write(row, col, data[key], normal)

    workbook.close()


def generate_html_report(outpath, profile_name, template_path, faildata):
    failures = []
    for data in faildata.values():
        failures.append(
            {
                "file_links": make_href(data["file"]),
                "test_id": data["test_file"],
                "error_message": data["message"],
                "raw_output": data["raw_output"],
                "requirements": docutils.core.publish_parts(
                    writer_name="html", source=data["req_description"]
                )["body"],
                "resolution_steps": data["resolution_steps"],
            }
        )
    pkg_dir = os.path.split(__file__)[0]
    j2_template_path = os.path.join(pkg_dir, "report.html.jinja2")
    with open(j2_template_path, "r") as f:
        report_template = jinja2.Template(f.read())
        contents = report_template.render(
            num_failures=len(failures) + len(COLLECTION_FAILURES),
            profile_name=profile_name,
            template_dir=make_href(template_path),
            checksum=hash_directory(template_path),
            timestamp=make_timestamp(),
            failures=failures,
            collection_failures=COLLECTION_FAILURES,
        )
    with open(os.path.join(outpath, "report.html"), "w") as f:
        f.write(contents)


def pytest_addoption(parser):
    """
    Add needed CLI arguments
    """
    parser.addoption(
        "--template-directory",
        dest="template_dir",
        action="append",
        help="Directory which holds the templates for validation",
    )

    parser.addoption(
        "--self-test",
        dest="self_test",
        action="store_true",
        help="Test the unit tests against their fixtured data",
    )

    parser.addoption(
        "--validation-profile",
        dest="validation_profile",
        action="store",
        help="Runs all unmarked tests plus tests with a matching marker",
    )

    parser.addoption(
        "--validation-profile-name",
        dest="validation_profile_name",
        action="store",
        help="Friendly name of the validation profile used in reports",
    )

    parser.addoption(
        "--report-format",
        dest="report_format",
        action="store",
        help="Format of the output report (html, csv, excel)",
    )

    parser.addoption(
        "--continue-on-failure",
        dest="continue_on_failure",
        action="store_true",
        help="Continue validation even when structural errors exist in input files",
    )


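# Typical invocations using the options registered above (shell examples; the
# paths and profile name are illustrative):
#
#     pytest ice_validator/tests --template-directory=/path/to/heat/templates
#     pytest ice_validator/tests --template-directory=/path/to/heat/templates \
#         --validation-profile=<profile-marker> --report-format=excel
#     pytest ice_validator/tests --self-test
#
# --template-directory and --self-test are mutually exclusive (enforced by
# pytest_configure below); reports land in ice_validator/output/.

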
def pytest_configure(config):
    """
    Ensure that we receive either `--self-test` or
    `--template-directory=<directory>` as CLI arguments
    """
    if config.getoption("template_dir") and config.getoption("self_test"):
        raise Exception(
            '"--template-directory" and "--self-test" are mutually exclusive'
        )
    if not (
        config.getoption("template_dir")
        or config.getoption("self_test")
        or config.getoption("help")
    ):
        raise Exception(
            'One of "--template-directory" or "--self-test" must be specified'
        )


def pytest_generate_tests(metafunc):
    """
    If a unit test requires an argument named 'filename' (or one of the other
    fixture names handled below), we generate a parametrized test for each of
    the selected files: either the files contained in `template_dir`, or, if
    `template_dir` is not specified on the CLI, the fixture files associated
    with this test name.
    """

    # noinspection PyBroadException
    try:
        if "filename" in metafunc.fixturenames:
            from .parametrizers import parametrize_filename

            parametrize_filename(metafunc)

        if "filenames" in metafunc.fixturenames:
            from .parametrizers import parametrize_filenames

            parametrize_filenames(metafunc)

        if "template_dir" in metafunc.fixturenames:
            from .parametrizers import parametrize_template_dir

            parametrize_template_dir(metafunc)

        if "environment_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_pair

            parametrize_environment_pair(metafunc)

        if "heat_volume_pair" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_volume_pair

            parametrize_heat_volume_pair(metafunc)

        if "yaml_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_files

            parametrize_yaml_files(metafunc)

        if "env_files" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_files

            parametrize_environment_files(metafunc)

        if "yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_yaml_file

            parametrize_yaml_file(metafunc)

        if "env_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_environment_file

            parametrize_environment_file(metafunc)

        if "parsed_yaml_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_yaml_file

            parametrize_parsed_yaml_file(metafunc)

        if "parsed_environment_file" in metafunc.fixturenames:
            from .parametrizers import parametrize_parsed_environment_file

            parametrize_parsed_environment_file(metafunc)

        if "heat_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_template

            parametrize_heat_template(metafunc)

        if "heat_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_heat_templates

            parametrize_heat_templates(metafunc)

        if "volume_template" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_template

            parametrize_volume_template(metafunc)

        if "volume_templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_volume_templates

            parametrize_volume_templates(metafunc)

        if "template" in metafunc.fixturenames:
            from .parametrizers import parametrize_template

            parametrize_template(metafunc)

        if "templates" in metafunc.fixturenames:
            from .parametrizers import parametrize_templates

            parametrize_templates(metafunc)
    except Exception as e:
        # If an error occurs in the collection phase it won't be logged as a
        # normal test failure. This means failures could occur but not be seen
        # in the report, resulting in a false positive success message. Such
        # errors are stored here and reported separately in the report.
        COLLECTION_FAILURES.append(
            {
                "module": metafunc.module.__name__,
                "test": metafunc.function.__name__,
                "fixtures": metafunc.fixturenames,
                "error": traceback.format_exc(),
            }
        )
        raise e


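# A sketch of how a validation test consumes one of the parametrized fixtures
# above (the test name and body are illustrative, not defined in this repo):
#
#     def test_every_yaml_file_exists(yaml_file):
#         # yaml_file is parametrized once per YAML file discovered under
#         # --template-directory (or under the self-test fixture data)
#         assert os.path.exists(yaml_file)

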
def hash_directory(path):
    md5 = hashlib.md5()
    for dir_path, sub_dirs, filenames in os.walk(path):
        # sort so the resulting checksum does not depend on filesystem order
        sub_dirs.sort()
        for filename in sorted(filenames):
            file_path = os.path.join(dir_path, filename)
            with open(file_path, "rb") as f:
                md5.update(f.read())
    return md5.hexdigest()


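# The "Checksum:" value in the report headers is this digest: an MD5 over the raw
# bytes of every file under the validated directory, so any change to any template
# changes the reported value. Illustrative use (path is hypothetical):
#
#     checksum = hash_directory("/path/to/heat/templates")  # 32-char hex digest

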
def load_current_requirements():
    """Loads dict of current requirements or empty dict if file doesn't exist"""

    url = "https://onap.readthedocs.io/en/latest/_downloads/needs.json"
    r = requests.get(url)
    with open("requirements.json", "wb") as needs:
        needs.write(r.content)
    path = "heat_requirements.json"
    if not os.path.exists(path):
        return {}
    with io.open(path, encoding="utf8", mode="r") as f:
        data = json.load(f)
        version = data["current_version"]
        return data["versions"][version]["needs"]


def compat_open(path):
    """Invokes open correctly depending on the Python version"""
    if sys.version_info.major < 3:
        return open(path, "wb")
    else:
        return open(path, "w", newline="")


def unicode_writerow(writer, row):
    if sys.version_info.major < 3:
        row = [s.encode("utf8") for s in row]
    writer.writerow(row)


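# compat_open and unicode_writerow exist because the csv module differs between
# interpreters: Python 2 expects a binary file and byte strings (hence "wb" and
# the utf-8 encode above), while Python 3 expects a text file opened with
# newline="" and handles unicode natively. A sketch of the intended pairing
# (the file name is illustrative):
#
#     with compat_open("example.csv") as f:
#         writer = csv.writer(f)
#         unicode_writerow(writer, ("Requirement ID", "Test Name"))

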
def pytest_report_collectionfinish(config, startdir, items):
    """Generates a simple traceability report to output/traceability.csv"""
    traceability_path = os.path.join(__path__[0], "../output/traceability.csv")
    output_dir = os.path.split(traceability_path)[0]
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    requirements = load_current_requirements()
    unmapped, mapped = partition(
        lambda item: hasattr(item.function, "requirement_ids"), items
    )

    req_to_test = collections.defaultdict(set)
    mapping_errors = set()
    for item in mapped:
        for req_id in item.function.requirement_ids:
            req_to_test[req_id].add(item)
            if req_id not in requirements:
                mapping_errors.add(
                    (req_id, item.function.__module__, item.function.__name__)
                )

    mapping_error_path = os.path.join(__path__[0], "../output/mapping_errors.csv")
    with compat_open(mapping_error_path) as f:
        writer = csv.writer(f)
        for error in mapping_errors:
            unicode_writerow(writer, error)

    with compat_open(traceability_path) as f:
        out = csv.writer(f)
        unicode_writerow(
            out,
            ("Requirement ID", "Requirement", "Section", "Test Module", "Test Name"),
        )
        for req_id, metadata in requirements.items():
            if req_to_test[req_id]:
                for item in req_to_test[req_id]:
                    unicode_writerow(
                        out,
                        (
                            req_id,
                            metadata["description"],
                            metadata["section_name"],
                            item.function.__module__,
                            item.function.__name__,
                        ),
                    )
            else:
                unicode_writerow(
                    out,
                    (req_id, metadata["description"], metadata["section_name"], "", ""),
                )
        # now write out any test methods that weren't mapped to requirements
        for item in unmapped:
            unicode_writerow(
                out, ("", "", "", item.function.__module__, item.function.__name__)
            )