ice_validator/tests/conftest.py
# -*- coding: utf8 -*-
# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
# Copyright © 2018 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
#             http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
#             https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================

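"""pytest conftest for the VVP Heat template validation scripts.

The hooks below collect the details of every test failure, map failures to
VNF requirements and resolution steps, and write HTML/Excel/CSV failure
reports plus a requirements-traceability CSV to the output directory.
"""
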
import collections
import csv
import datetime
import hashlib
import io
import json
import os
import sys
import time

import docutils.core
import pytest
from more_itertools import partition
from six import string_types
import xlsxwriter

__path__ = [os.path.dirname(os.path.abspath(__file__))]

resolution_steps_file = "resolution_steps.json"
requirements_file = "requirements.json"

FAILURE_DATA = {}

report_columns = [
    ("Input File", "file"),
    ("Test", "test_file"),
    ("Requirements", "req_description"),
    ("Resolution Steps", "resolution_steps"),
    ("Error Message", "message"),
    ("Raw Test Output", "raw_output"),
]
report = collections.OrderedDict(report_columns)


def extract_error_msg(rep):
    """Return only the assertion message from a failed test report, or the
    full crash representation if no AssertionError is present."""
    msg = str(rep.longrepr.reprcrash)
    if "AssertionError:" in msg:
        return msg.split("AssertionError:")[1]
    else:
        return msg


@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """Record the details of every test failure in FAILURE_DATA and dump the
    collected failures to output/failures as JSON."""
    outcome = yield
    rep = outcome.get_result()

    output_dir = "{}/../output".format(__path__[0])
    if rep.outcome == "failed":
        if not os.path.exists(output_dir):
            os.mkdir(output_dir)

        if hasattr(item.function, "requirement_ids"):
            requirement_ids = item.function.requirement_ids
        else:
            requirement_ids = ""

        if "environment_pair" in item.fixturenames:
            resolved_pair = "{} environment pair".format(
                item.funcargs["environment_pair"]["name"]
            )
        elif "heat_volume_pair" in item.fixturenames:
            resolved_pair = "{} volume pair".format(
                item.funcargs["heat_volume_pair"]["name"]
            )
        elif "heat_templates" in item.fixturenames:
            resolved_pair = item.funcargs["heat_templates"]
        elif "yaml_files" in item.fixturenames:
            resolved_pair = item.funcargs["yaml_files"]
        else:
            resolved_pair = rep.nodeid.split("[")[1][:-1]

        FAILURE_DATA[len(FAILURE_DATA)] = {
            "file": resolved_pair,
            "vnfrqts": requirement_ids,
            "test": item.function.__name__,
            "test_file": item.function.__module__.split(".")[-1],
            "raw_output": str(rep.longrepr),
            "message": extract_error_msg(rep),
        }

        with open("{}/failures".format(output_dir), "w") as f:
            json.dump(FAILURE_DATA, f, indent=4)


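# Illustrative shape of the "failures" JSON written above (top-level keys are
# insertion indices; "file" may be a path or a list of paths, and "vnfrqts"
# is a list of requirement IDs or ""):
#   {"0": {"file": "base.yaml", "vnfrqts": ["R-xxxxx"], "test": "test_name",
#          "test_file": "module_name", "raw_output": "...", "message": "..."}}
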
def make_timestamp():
    """Return the current local time and timezone name as a display string."""
    timezone = time.tzname[time.localtime().tm_isdst]
    return "{} {}".format(str(datetime.datetime.now()), timezone)


def pytest_sessionfinish(session, exitstatus):
    """Generate the validation report after the test session completes."""
    if not session.config.option.template_dir:
        return
    template_path = os.path.abspath(session.config.option.template_dir[0])
    profile_name = session.config.option.validation_profile_name
    generate_report(
        "{}/../output".format(__path__[0]),
        template_path,
        profile_name,
        session.config.option.report_format,
    )


def pytest_runtest_setup(item):
    """Skip marked tests that do not apply to the selected validation profile."""
    profile = item.session.config.option.validation_profile
    markers = set(m.name for m in item.iter_markers())
    if not profile and markers:
        pytest.skip("No validation profile selected. Skipping tests with marks.")
    if profile and markers and profile not in markers:
        pytest.skip("Doesn't match the selected validation profile")


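# As implemented above: tests without profile markers always run; marked tests
# run only when --validation-profile names one of their markers.
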
def make_href(path):
    """Render a path, or a list of paths, as HTML links to the local files."""
    paths = [path] if isinstance(path, string_types) else path
    links = []
    for p in paths:
        abs_path = os.path.abspath(p)
        filename = os.path.split(abs_path)[1]
        links.append(
            "<a href='file://{abs_path}' target='_blank'>{filename}</a>".format(
                abs_path=abs_path, filename=filename
            )
        )
    return "<br/>".join(links)


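# For illustration, make_href("base.yaml") yields a single anchor of the form:
#   <a href='file:///abs/path/to/base.yaml' target='_blank'>base.yaml</a>
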
def generate_report(outpath, template_path, profile_name, output_format):
    """Render the collected failures to HTML, Excel, or CSV in outpath."""
    failures = "{}/failures".format(outpath)
    # default to empty data so a missing optional file does not abort the report
    faildata = {}
    rdata = []
    hdata = {}

    if os.path.exists(failures):
        with open(failures, "r") as f:
            faildata = json.loads(f.read())

    resolution_steps = "{}/../{}".format(__path__[0], resolution_steps_file)
    if os.path.exists(resolution_steps):
        with open(resolution_steps, "r") as f:
            rdata = json.loads(f.read())

    heat_requirements = "{}/../{}".format(__path__[0], requirements_file)
    if os.path.exists(heat_requirements):
        with open(heat_requirements, "r") as f:
            hdata = json.loads(f.read())

    # point requirements at the most recent version
    if hdata:
        current_version = hdata["current_version"]
        hdata = hdata["versions"][current_version]["needs"]

    # mapping requirement IDs from failures to requirement descriptions
    for k, v in faildata.items():
        req_text = ""
        if v["vnfrqts"] != "":
            for req in v["vnfrqts"]:
                if req in hdata:
                    req_text += "\n\n{}: \n{}".format(req, hdata[req]["description"])
        faildata[k]["req_description"] = req_text

    # mapping resolution steps to module and test name
    for k, v in faildata.items():
        faildata[k]["resolution_steps"] = ""
        for rs in rdata:
            if v["test_file"] == rs["module"] and v["test"] == rs["function"]:
                faildata[k]["resolution_steps"] = "\n{}: \n{}".format(
                    rs["header"], rs["resolution_steps"]
                )

    output_format = output_format.lower().strip() if output_format else "html"
    if output_format == "html":
        generate_html_report(outpath, profile_name, template_path, faildata)
    elif output_format == "excel":
        generate_excel_report(outpath, profile_name, template_path, faildata)
    elif output_format == "csv":
        generate_csv_report(outpath, profile_name, template_path, faildata)
    else:
        raise ValueError("Unsupported output format: " + output_format)


def generate_csv_report(output_dir, profile_name, template_path, faildata):
    """Write the failure report as report.csv in output_dir."""
    rows = []
    rows.append(["Validation Failures"])
    headers = [
        ("Profile Selected:", profile_name),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(faildata)),
    ]

    rows.append([])
    for header in headers:
        rows.append(header)
    rows.append([])

    # table header
    rows.append([col for col, _ in report_columns])

    # table content
    for data in faildata.values():
        rows.append(
            [
                data.get("file", ""),
                data.get("test_file", ""),
                data.get("req_description", ""),
                data.get("resolution_steps", ""),
                data.get("message", ""),
                data.get("raw_output", ""),
            ]
        )

    output_path = os.path.join(output_dir, "report.csv")
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        for row in rows:
            writer.writerow(row)


def generate_excel_report(output_dir, profile_name, template_path, faildata):
    """Write the failure report as report.xlsx in output_dir."""
    output_path = os.path.join(output_dir, "report.xlsx")
    workbook = xlsxwriter.Workbook(output_path)
    bold = workbook.add_format({"bold": True})
    code = workbook.add_format({"font_name": "Courier", "text_wrap": True})
    normal = workbook.add_format({"text_wrap": True})
    heading = workbook.add_format({"bold": True, "font_size": 18})
    worksheet = workbook.add_worksheet("failures")
    worksheet.write(0, 0, "Validation Failures", heading)

    headers = [
        ("Profile Selected:", profile_name),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(faildata)),
    ]
    for row, (header, value) in enumerate(headers, start=2):
        worksheet.write(row, 0, header, bold)
        worksheet.write(row, 1, value)

    worksheet.set_column(0, len(headers) - 1, 40)
    worksheet.set_column(len(headers), len(headers), 80)

    # table header
    start_error_table_row = 2 + len(headers) + 2
    for col_num, (col_name, _) in enumerate(report_columns):
        worksheet.write(start_error_table_row, col_num, col_name, bold)

    # table content
    for row, data in enumerate(faildata.values(), start=start_error_table_row + 1):
        for col, key in enumerate(report.values()):
            if key == "file":
                paths = (
                    [data[key]] if isinstance(data[key], string_types) else data[key]
                )
                contents = "\n".join(paths)
                worksheet.write(row, col, contents, normal)
            elif key == "raw_output":
                worksheet.write_string(row, col, data[key], code)
            else:
                worksheet.write(row, col, data[key], normal)

    workbook.close()


def generate_html_report(outpath, profile_name, template_path, faildata):
    """Write the failure report as report.html in outpath."""
    with open("{}/report.html".format(outpath), "w") as of:
        body_begin = """
        <style type="text/css">
        h1, li {{
            font-family:Arial, sans-serif;
        }}
        .tg  {{border-collapse:collapse;border-spacing:0;}}
        .tg td{{font-family:Arial, sans-serif;font-size:8px;padding:10px 5px;
        border-style:solid;border-width:1px;overflow:hidden;word-break:normal;
        border-color:black;}}
        .tg th{{font-family:Arial, sans-serif;font-size:14px;font-weight:normal;
        padding:10px 5px;border-style:solid;border-width:1px;overflow:hidden;
        word-break:normal;border-color:black;}}
        .tg .tg-rwj1{{font-size:10px;font-family:Arial, Helvetica,
        sans-serif !important;border-color:inherit;vertical-align:top}}</style>
        <h1>Validation Failures</h1>
        <ul>
            <li><b>Profile Selected: </b> <tt>{profile}</tt></li>
            <li><b>Report Generated At:</b> <tt>{timestamp}</tt></li>
            <li><b>Directory Validated:</b> <tt>{template_dir}</tt></li>
            <li><b>Checksum:</b> <tt>{checksum}</tt></li>
            <li><b>Total Errors:</b> {num_failures}</li>
        </ul>
        """.format(
            profile=profile_name,
            timestamp=make_timestamp(),
            checksum=hash_directory(template_path),
            template_dir=template_path,
            num_failures=len(faildata),
        )
        of.write(body_begin)

        if len(faildata) == 0:
            of.write("<p>Success! No validation failures detected.</p>")
            return

        table_begin = '<table class="tg">'
        of.write(table_begin)

        # table headers
        of.write("<tr>")
        for k, v in report.items():
            of.write('<th class="tg-rwj1">{}</th>'.format(k))
        of.write("</tr>")

        # table content
        for k, v in faildata.items():
            of.write("<tr>")
            for rk, rv in report.items():
                if rv == "file":
                    value = make_href(v[rv])
                elif rv == "raw_output":
                    value = "<pre>{}</pre>".format(v[rv])
                elif rv == "req_description":
                    parts = docutils.core.publish_parts(
                        writer_name="html", source=v[rv]
                    )
                    value = parts["body"]
                else:
                    value = v[rv].replace("\n", "<br />")
                of.write("  <td>{}</td>".format(value))
            of.write("</tr>")

        of.write("</table>")


def pytest_addoption(parser):
    """
    Add needed CLI arguments
    """
    parser.addoption(
        "--template-directory",
        dest="template_dir",
        action="append",
        help="Directory which holds the templates for validation",
    )

    parser.addoption(
        "--self-test",
        dest="self_test",
        action="store_true",
        help="Test the unit tests against their fixtured data",
    )

    parser.addoption(
        "--validation-profile",
        dest="validation_profile",
        action="store",
        help="Runs all unmarked tests plus tests with a matching marker",
    )

    parser.addoption(
        "--validation-profile-name",
        dest="validation_profile_name",
        action="store",
        help="Friendly name of the validation profile used in reports",
    )

    parser.addoption(
        "--report-format",
        dest="report_format",
        action="store",
        help="Format of output report (html, csv, excel)",
    )


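# Example invocation (illustrative paths and values):
#   pytest --template-directory=/path/to/heat_templates \
#          --validation-profile=<marker-name> --report-format=html
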
def pytest_configure(config):
    """
    Ensure that we receive either `--self-test` or
    `--template-dir=<directory>` as CLI arguments
    """
    if config.getoption("template_dir") and config.getoption("self_test"):
        raise Exception('"--template-dir" and "--self-test" are mutually exclusive')
    if not (
        config.getoption("template_dir") or
        config.getoption("self_test") or
        config.getoption("help")
    ):
        raise Exception('One of "--template-dir" or "--self-test" must be specified')


def pytest_generate_tests(metafunc):
    """
    If a unit test requires an argument named 'filename', generate a test
    for each of the selected files: either the files contained in
    `template_dir` or, if `template_dir` is not specified on the CLI, the
    fixtures associated with this test name.
    """
    if "filename" in metafunc.fixturenames:
        from .parametrizers import parametrize_filename

        parametrize_filename(metafunc)

    if "filenames" in metafunc.fixturenames:
        from .parametrizers import parametrize_filenames

        parametrize_filenames(metafunc)

    if "template_dir" in metafunc.fixturenames:
        from .parametrizers import parametrize_template_dir

        parametrize_template_dir(metafunc)

    if "environment_pair" in metafunc.fixturenames:
        from .parametrizers import parametrize_environment_pair

        parametrize_environment_pair(metafunc)

    if "heat_volume_pair" in metafunc.fixturenames:
        from .parametrizers import parametrize_heat_volume_pair

        parametrize_heat_volume_pair(metafunc)

    if "yaml_files" in metafunc.fixturenames:
        from .parametrizers import parametrize_yaml_files

        parametrize_yaml_files(metafunc)

    if "env_files" in metafunc.fixturenames:
        from .parametrizers import parametrize_environment_files

        parametrize_environment_files(metafunc)

    if "yaml_file" in metafunc.fixturenames:
        from .parametrizers import parametrize_yaml_file

        parametrize_yaml_file(metafunc)

    if "env_file" in metafunc.fixturenames:
        from .parametrizers import parametrize_environment_file

        parametrize_environment_file(metafunc)

    if "parsed_yaml_file" in metafunc.fixturenames:
        from .parametrizers import parametrize_parsed_yaml_file

        parametrize_parsed_yaml_file(metafunc)

    if "parsed_environment_file" in metafunc.fixturenames:
        from .parametrizers import parametrize_parsed_environment_file

        parametrize_parsed_environment_file(metafunc)

    if "heat_template" in metafunc.fixturenames:
        from .parametrizers import parametrize_heat_template

        parametrize_heat_template(metafunc)

    if "heat_templates" in metafunc.fixturenames:
        from .parametrizers import parametrize_heat_templates

        parametrize_heat_templates(metafunc)

    if "volume_template" in metafunc.fixturenames:
        from .parametrizers import parametrize_volume_template

        parametrize_volume_template(metafunc)

    if "volume_templates" in metafunc.fixturenames:
        from .parametrizers import parametrize_volume_templates

        parametrize_volume_templates(metafunc)

    if "template" in metafunc.fixturenames:
        from .parametrizers import parametrize_template

        parametrize_template(metafunc)

    if "templates" in metafunc.fixturenames:
        from .parametrizers import parametrize_templates

        parametrize_templates(metafunc)


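# Each branch above defers to a dedicated parametrize_* helper in the sibling
# parametrizers module; the helpers are imported lazily inside the branch that
# needs them.
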
def hash_directory(path):
    """Return an MD5 hex digest computed over the contents of every file
    under the given path."""
    md5 = hashlib.md5()
    for dir_path, sub_dirs, filenames in os.walk(path):
        for filename in filenames:
            file_path = os.path.join(dir_path, filename)
            with open(file_path, "rb") as f:
                md5.update(f.read())
    return md5.hexdigest()


def load_current_requirements():
    """Loads dict of current requirements or empty dict if file doesn't exist"""
    path = "requirements.json"
    if not os.path.exists(path):
        return {}
    with io.open(path, encoding="utf8", mode="r") as f:
        data = json.load(f)
        version = data["current_version"]
        return data["versions"][version]["needs"]

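# Illustrative layout of requirements.json (only the fields read in this file):
#   {"current_version": "<version>",
#    "versions": {"<version>": {"needs": {
#        "R-xxxxx": {"description": "...", "section_name": "..."}}}}}
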
def compat_open(path):
    """Invokes open correctly depending on the Python version"""
    if sys.version_info.major < 3:
        return open(path, "wb")
    else:
        return open(path, "w", newline="")


def unicode_writerow(writer, row):
    """Write a CSV row, encoding each cell as UTF-8 when running on Python 2."""
    if sys.version_info.major < 3:
        row = [s.encode("utf8") for s in row]
    writer.writerow(row)


def pytest_report_collectionfinish(config, startdir, items):
    """Generates a simple traceability report to output/traceability.csv"""
    traceability_path = os.path.join(__path__[0], "../output/traceability.csv")
    output_dir = os.path.split(traceability_path)[0]
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    requirements = load_current_requirements()
    unmapped, mapped = partition(
        lambda item: hasattr(item.function, "requirement_ids"), items
    )

    req_to_test = collections.defaultdict(set)
    mapping_errors = set()
    for item in mapped:
        for req_id in item.function.requirement_ids:
            req_to_test[req_id].add(item)
            if req_id not in requirements:
                mapping_errors.add(
                    (req_id, item.function.__module__, item.function.__name__)
                )

    mapping_error_path = os.path.join(__path__[0], "../output/mapping_errors.csv")
    with compat_open(mapping_error_path) as f:
        writer = csv.writer(f)
        for error in mapping_errors:
            unicode_writerow(writer, error)

    with compat_open(traceability_path) as f:
        out = csv.writer(f)
        unicode_writerow(
            out,
            ("Requirement ID", "Requirement", "Section", "Test Module", "Test Name"),
        )
        for req_id, metadata in requirements.items():
            if req_to_test[req_id]:
                for item in req_to_test[req_id]:
                    unicode_writerow(
                        out,
                        (
                            req_id,
                            metadata["description"],
                            metadata["section_name"],
                            item.function.__module__,
                            item.function.__name__,
                        ),
                    )
            else:
                unicode_writerow(
                    out,
                    (req_id, metadata["description"], metadata["section_name"], "", ""),
                )
        # now write out any test methods that weren't mapped to requirements
        for item in unmapped:
            unicode_writerow(
                out, ("", "", "", item.function.__module__, item.function.__name__)
            )
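
# Resulting traceability.csv (per the loops above): one row per requirement/test
# pairing; requirements with no mapped tests leave the last two columns blank,
# and tests with no requirement IDs leave the first three columns blank.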