[VVP] pytest --self-test xfail
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
# -*- coding: utf8 -*-
# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
# Copyright © 2018 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
#             http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
#             https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================

import collections
import csv
import datetime
import hashlib
import io
import json
import os
import sys
import time

import docutils.core
import pytest
from more_itertools import partition
from six import string_types
import xlsxwriter

__path__ = [os.path.dirname(os.path.abspath(__file__))]

resolution_steps_file = "resolution_steps.json"
requirements_file = "requirements.json"

FAILURE_DATA = {}

report_columns = [
    ("Input File", "file"),
    ("Test", "test_file"),
    ("Requirements", "req_description"),
    ("Resolution Steps", "resolution_steps"),
    ("Error Message", "message"),
    ("Raw Test Output", "raw_output"),
]
report = collections.OrderedDict(report_columns)


def extract_error_msg(rep):
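    """Extract a concise error message from a test report, preferring the text
    after "AssertionError:" when present."""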
    try:
        msg = str(rep.longrepr.reprcrash)
    except AttributeError:
        msg = str(rep)

    if "AssertionError:" in msg:
        return msg.split("AssertionError:")[1]
    else:
        return msg


@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
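    """Capture details for each failed test and write the accumulated failures
    to output/failures as JSON for the report generators."""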
    outcome = yield
    rep = outcome.get_result()

    output_dir = "{}/../output".format(__path__[0])
    if rep.outcome == "failed":
        if not os.path.exists(output_dir):
            os.mkdir(output_dir)

        if hasattr(item.function, "requirement_ids"):
            requirement_ids = item.function.requirement_ids
        else:
            requirement_ids = ""

        if "environment_pair" in item.fixturenames:
            resolved_pair = "{} environment pair".format(
                item.funcargs["environment_pair"]["name"]
            )
        elif "heat_volume_pair" in item.fixturenames:
            resolved_pair = "{} volume pair".format(
                item.funcargs["heat_volume_pair"]["name"]
            )
        elif "heat_templates" in item.fixturenames:
            resolved_pair = item.funcargs["heat_templates"]
        elif "yaml_files" in item.fixturenames:
            resolved_pair = item.funcargs["yaml_files"]
        elif "[" in rep.nodeid:
            # fall back to the parametrized portion of the node ID when present
            resolved_pair = rep.nodeid.split("[")[1][:-1]
        else:
            resolved_pair = rep.nodeid

        FAILURE_DATA[len(FAILURE_DATA)] = {
            "file": resolved_pair,
            "vnfrqts": requirement_ids,
            "test": item.function.__name__,
            "test_file": item.function.__module__.split(".")[-1],
            "raw_output": str(rep.longrepr),
            "message": extract_error_msg(rep),
        }

        with open("{}/failures".format(output_dir), "w") as f:
            json.dump(FAILURE_DATA, f, indent=4)


def make_timestamp():
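    """Return the current local time and timezone name for report headers."""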
    timezone = time.tzname[time.localtime().tm_isdst]
    return "{} {}".format(str(datetime.datetime.now()), timezone)


def pytest_sessionfinish(session, exitstatus):
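    """Generate the validation report once the test session completes."""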
    if not session.config.option.template_dir:
        return
    template_path = os.path.abspath(session.config.option.template_dir[0])
    profile_name = session.config.option.validation_profile_name
    generate_report(
        "{}/../output".format(__path__[0]),
        template_path,
        profile_name,
        session.config.option.report_format,
    )


def pytest_runtest_setup(item):
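    """Skip marked tests that do not match the selected validation profile;
    tests marked xfail are never skipped here."""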
    profile = item.session.config.option.validation_profile
    markers = set(m.name for m in item.iter_markers())
    if not profile and markers and "xfail" not in markers:
        pytest.skip("No validation profile selected. Skipping tests with marks.")
    if profile and markers and profile not in markers and "xfail" not in markers:
        pytest.skip("Doesn't match the selected validation profile")


def make_href(path):
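    """Render one or more file paths as HTML links for the HTML report."""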
    paths = [path] if isinstance(path, string_types) else path
    links = []
    for p in paths:
        abs_path = os.path.abspath(p)
        filename = os.path.split(abs_path)[1]
        links.append(
            "<a href='file://{abs_path}' target='_blank'>{filename}</a>".format(
                abs_path=abs_path, filename=filename
            )
        )
    return "<br/>".join(links)


def generate_report(outpath, template_path, profile_name, output_format):
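    """Combine collected failures with requirement descriptions and resolution
    steps, then write the report in the requested format (html, excel, or csv)."""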
    failures = "{}/failures".format(outpath)
    faildata = None
    rdata = None
    hdata = None

    if os.path.exists(failures):
        with open(failures, "r") as f:
            faildata = json.loads(f.read())
    else:
        faildata = {}

    resolution_steps = "{}/../{}".format(__path__[0], resolution_steps_file)
    if os.path.exists(resolution_steps):
        with open(resolution_steps, "r") as f:
            rdata = json.loads(f.read())

    heat_requirements = "{}/../{}".format(__path__[0], requirements_file)
    if os.path.exists(heat_requirements):
        with open(heat_requirements, "r") as f:
            hdata = json.loads(f.read())

    # point requirements at the most recent version; fall back to an empty
    # mapping if requirements.json was not found
    if hdata:
        current_version = hdata["current_version"]
        hdata = hdata["versions"][current_version]["needs"]
    else:
        hdata = {}
    # mapping requirement IDs from failures to requirement descriptions
    for k, v in faildata.items():
        req_text = ""
        if v["vnfrqts"] != "":
            for req in v["vnfrqts"]:
                if req in hdata:
                    req_text += "\n\n{}: \n{}".format(req, hdata[req]["description"])
        faildata[k]["req_description"] = req_text

    # mapping resolution steps to module and test name
    for k, v in faildata.items():
        faildata[k]["resolution_steps"] = ""
        for rs in rdata or []:
            if v["test_file"] == rs["module"] and v["test"] == rs["function"]:
                faildata[k]["resolution_steps"] = "\n{}: \n{}".format(
                    rs["header"], rs["resolution_steps"]
                )
    output_format = output_format.lower().strip() if output_format else "html"
    if output_format == "html":
        generate_html_report(outpath, profile_name, template_path, faildata)
    elif output_format == "excel":
        generate_excel_report(outpath, profile_name, template_path, faildata)
    elif output_format == "csv":
        generate_csv_report(outpath, profile_name, template_path, faildata)
    else:
        raise ValueError("Unsupported output format: " + output_format)


def generate_csv_report(output_dir, profile_name, template_path, faildata):
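    """Write the failure report to report.csv in the output directory."""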
    rows = []
    rows.append(["Validation Failures"])
    headers = [
        ("Profile Selected:", profile_name),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(faildata)),
    ]

    rows.append([])
    for header in headers:
        rows.append(header)
    rows.append([])

    # table header
    rows.append([col for col, _ in report_columns])

    # table content
    for data in faildata.values():
        rows.append(
            [
                data.get("file", ""),
                data.get("test_file", ""),
                data.get("req_description", ""),
                data.get("resolution_steps", ""),
                data.get("message", ""),
                data.get("raw_output", ""),
            ]
        )

    output_path = os.path.join(output_dir, "report.csv")
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        for row in rows:
            writer.writerow(row)


def generate_excel_report(output_dir, profile_name, template_path, faildata):
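    """Write the failure report to report.xlsx in the output directory."""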
    output_path = os.path.join(output_dir, "report.xlsx")
    workbook = xlsxwriter.Workbook(output_path)
    bold = workbook.add_format({"bold": True})
    code = workbook.add_format({"font_name": "Courier", "text_wrap": True})
    normal = workbook.add_format({"text_wrap": True})
    heading = workbook.add_format({"bold": True, "font_size": 18})
    worksheet = workbook.add_worksheet("failures")
    worksheet.write(0, 0, "Validation Failures", heading)

    headers = [
        ("Profile Selected:", profile_name),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(faildata)),
    ]
    for row, (header, value) in enumerate(headers, start=2):
        worksheet.write(row, 0, header, bold)
        worksheet.write(row, 1, value)

    worksheet.set_column(0, len(headers) - 1, 40)
    worksheet.set_column(len(headers), len(headers), 80)

    # table header
    start_error_table_row = 2 + len(headers) + 2
    for col_num, (col_name, _) in enumerate(report_columns):
        worksheet.write(start_error_table_row, col_num, col_name, bold)

    # table content
    for row, data in enumerate(faildata.values(), start=start_error_table_row + 1):
        for col, key in enumerate(report.values()):
            if key == "file":
                paths = (
                    [data[key]] if isinstance(data[key], string_types) else data[key]
                )
                contents = "\n".join(paths)
                worksheet.write(row, col, contents, normal)
            elif key == "raw_output":
                worksheet.write_string(row, col, data[key], code)
            else:
                worksheet.write(row, col, data[key], normal)

    workbook.close()


def generate_html_report(outpath, profile_name, template_path, faildata):
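    """Write the failure report to report.html in the output directory."""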
    with open("{}/report.html".format(outpath), "w") as of:
        body_begin = """
        <style type="text/css">
        h1, li {{
            font-family:Arial, sans-serif;
        }}
        .tg  {{border-collapse:collapse;border-spacing:0;}}
        .tg td{{font-family:Arial, sans-serif;font-size:8px;padding:10px 5px;
        border-style:solid;border-width:1px;overflow:hidden;word-break:normal;
        border-color:black;}}
        .tg th{{font-family:Arial, sans-serif;font-size:14px;font-weight:normal;
        padding:10px 5px;border-style:solid;border-width:1px;overflow:hidden;
        word-break:normal;border-color:black;}}
        .tg .tg-rwj1{{font-size:10px;font-family:Arial, Helvetica,
        sans-serif !important;border-color:inherit;vertical-align:top}}</style>
        <h1>Validation Failures</h1>
        <ul>
            <li><b>Profile Selected: </b> <tt>{profile}</tt></li>
            <li><b>Report Generated At:</b> <tt>{timestamp}</tt></li>
            <li><b>Directory Validated:</b> <tt>{template_dir}</tt></li>
            <li><b>Checksum:</b> <tt>{checksum}</tt></li>
            <li><b>Total Errors:</b> {num_failures}</li>
        </ul>
        """.format(
            profile=profile_name,
            timestamp=make_timestamp(),
            checksum=hash_directory(template_path),
            template_dir=template_path,
            num_failures=len(faildata),
        )
        of.write(body_begin)

        if len(faildata) == 0:
            of.write("<p>Success! No validation failures detected.</p>")
            return

        table_begin = '<table class="tg">'
        of.write(table_begin)

        # table headers
        of.write("<tr>")
        for k, v in report.items():
            of.write('<th class="tg-rwj1">{}</th>'.format(k))
        of.write("</tr>")

        # table content
        for k, v in faildata.items():
            of.write("<tr>")
            for rk, rv in report.items():
                if rv == "file":
                    value = make_href(v[rv])
                elif rv == "raw_output":
                    value = "<pre>{}</pre>".format(v[rv])
                elif rv == "req_description":
                    parts = docutils.core.publish_parts(
                        writer_name="html", source=v[rv]
                    )
                    value = parts["body"]
                else:
                    value = v[rv].replace("\n", "<br />")
                of.write("  <td>{}</td>".format(value))
            of.write("</tr>")

        of.write("</table>")


def pytest_addoption(parser):
    """
    Add needed CLI arguments
    """
    parser.addoption(
        "--template-directory",
        dest="template_dir",
        action="append",
        help="Directory which holds the templates for validation",
    )

    parser.addoption(
        "--self-test",
        dest="self_test",
        action="store_true",
        help="Test the unit tests against their fixtured data",
    )

    parser.addoption(
        "--validation-profile",
        dest="validation_profile",
        action="store",
        help="Runs all unmarked tests plus tests with a matching marker",
    )

    parser.addoption(
        "--validation-profile-name",
        dest="validation_profile_name",
        action="store",
        help="Friendly name of the validation profile used in reports",
    )

    parser.addoption(
        "--report-format",
        dest="report_format",
        action="store",
        help="Format of output report (html, csv, excel)",
    )


def pytest_configure(config):
    """
    Ensure that we receive either `--self-test` or
    `--template-directory=<directory>` as CLI arguments
    """
    if config.getoption("template_dir") and config.getoption("self_test"):
        raise Exception(
            '"--template-directory" and "--self-test" are mutually exclusive'
        )
    if not (
        config.getoption("template_dir") or
        config.getoption("self_test") or
        config.getoption("help")
    ):
        raise Exception(
            'One of "--template-directory" or "--self-test" must be specified'
        )


def pytest_generate_tests(metafunc):
    """
    If a unit test requires an argument named 'filename', we generate tests
    for the selected filenames: either the files contained in `template_dir`
    or, if `template_dir` is not specified on the CLI, the fixture files
    associated with this test name.
    """
    if "filename" in metafunc.fixturenames:
        from .parametrizers import parametrize_filename

        parametrize_filename(metafunc)

    if "filenames" in metafunc.fixturenames:
        from .parametrizers import parametrize_filenames

        parametrize_filenames(metafunc)

    if "template_dir" in metafunc.fixturenames:
        from .parametrizers import parametrize_template_dir

        parametrize_template_dir(metafunc)

    if "environment_pair" in metafunc.fixturenames:
        from .parametrizers import parametrize_environment_pair

        parametrize_environment_pair(metafunc)

    if "heat_volume_pair" in metafunc.fixturenames:
        from .parametrizers import parametrize_heat_volume_pair

        parametrize_heat_volume_pair(metafunc)

    if "yaml_files" in metafunc.fixturenames:
        from .parametrizers import parametrize_yaml_files

        parametrize_yaml_files(metafunc)

    if "env_files" in metafunc.fixturenames:
        from .parametrizers import parametrize_environment_files

        parametrize_environment_files(metafunc)

    if "yaml_file" in metafunc.fixturenames:
        from .parametrizers import parametrize_yaml_file

        parametrize_yaml_file(metafunc)

    if "env_file" in metafunc.fixturenames:
        from .parametrizers import parametrize_environment_file

        parametrize_environment_file(metafunc)

    if "parsed_yaml_file" in metafunc.fixturenames:
        from .parametrizers import parametrize_parsed_yaml_file

        parametrize_parsed_yaml_file(metafunc)

    if "parsed_environment_file" in metafunc.fixturenames:
        from .parametrizers import parametrize_parsed_environment_file

        parametrize_parsed_environment_file(metafunc)

    if "heat_template" in metafunc.fixturenames:
        from .parametrizers import parametrize_heat_template

        parametrize_heat_template(metafunc)

    if "heat_templates" in metafunc.fixturenames:
        from .parametrizers import parametrize_heat_templates

        parametrize_heat_templates(metafunc)

    if "volume_template" in metafunc.fixturenames:
        from .parametrizers import parametrize_volume_template

        parametrize_volume_template(metafunc)

    if "volume_templates" in metafunc.fixturenames:
        from .parametrizers import parametrize_volume_templates

        parametrize_volume_templates(metafunc)

    if "template" in metafunc.fixturenames:
        from .parametrizers import parametrize_template

        parametrize_template(metafunc)

    if "templates" in metafunc.fixturenames:
        from .parametrizers import parametrize_templates

        parametrize_templates(metafunc)


def hash_directory(path):
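    """Return an MD5 hex digest computed over the contents of all files under path."""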
    md5 = hashlib.md5()
    for dir_path, sub_dirs, filenames in os.walk(path):
        for filename in filenames:
            file_path = os.path.join(dir_path, filename)
            with open(file_path, "rb") as f:
                md5.update(f.read())
    return md5.hexdigest()


def load_current_requirements():
    """Loads dict of current requirements or empty dict if file doesn't exist"""
    path = "requirements.json"
    if not os.path.exists(path):
        return {}
    with io.open(path, encoding="utf8", mode="r") as f:
        data = json.load(f)
        version = data["current_version"]
        return data["versions"][version]["needs"]


def compat_open(path):
    """Invokes open correctly depending on the Python version"""
    if sys.version_info.major < 3:
        return open(path, "wb")
    else:
        return open(path, "w", newline="")


def unicode_writerow(writer, row):
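    """Write a CSV row, encoding values as UTF-8 bytes when running on Python 2."""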
    if sys.version_info.major < 3:
        row = [s.encode("utf8") for s in row]
    writer.writerow(row)


def pytest_report_collectionfinish(config, startdir, items):
    """Generates a simple traceability report to output/traceability.csv"""
    traceability_path = os.path.join(__path__[0], "../output/traceability.csv")
    output_dir = os.path.split(traceability_path)[0]
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    requirements = load_current_requirements()
    unmapped, mapped = partition(
        lambda item: hasattr(item.function, "requirement_ids"), items
    )

    req_to_test = collections.defaultdict(set)
    mapping_errors = set()
    for item in mapped:
        for req_id in item.function.requirement_ids:
            req_to_test[req_id].add(item)
            if req_id not in requirements:
                mapping_errors.add(
                    (req_id, item.function.__module__, item.function.__name__)
                )

    mapping_error_path = os.path.join(__path__[0], "../output/mapping_errors.csv")
    with compat_open(mapping_error_path) as f:
        writer = csv.writer(f)
        for error in mapping_errors:
            unicode_writerow(writer, error)

    with compat_open(traceability_path) as f:
        out = csv.writer(f)
        unicode_writerow(
            out,
            ("Requirement ID", "Requirement", "Section", "Test Module", "Test Name"),
        )
        for req_id, metadata in requirements.items():
            if req_to_test[req_id]:
                for item in req_to_test[req_id]:
                    unicode_writerow(
                        out,
                        (
                            req_id,
                            metadata["description"],
                            metadata["section_name"],
                            item.function.__module__,
                            item.function.__name__,
                        ),
                    )
            else:
                unicode_writerow(
                    out,
                    (req_id, metadata["description"], metadata["section_name"], "", ""),
                )
        # now write out any test methods that weren't mapped to requirements
        for item in unmapped:
            unicode_writerow(
                out, ("", "", "", item.function.__module__, item.function.__name__)
            )