VVP - Fixing script for traceability csv
[vvp/validation-scripts.git] / ice_validator / tests / conftest.py
# -*- coding: utf8 -*-
# ============LICENSE_START=======================================================
# org.onap.vvp/validation-scripts
# ===================================================================
# Copyright © 2018 AT&T Intellectual Property. All rights reserved.
# ===================================================================
#
# Unless otherwise specified, all software contained herein is licensed
# under the Apache License, Version 2.0 (the "License");
# you may not use this software except in compliance with the License.
# You may obtain a copy of the License at
#
#             http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
# Unless otherwise specified, all documentation contained herein is licensed
# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
# you may not use this documentation except in compliance with the License.
# You may obtain a copy of the License at
#
#             https://creativecommons.org/licenses/by/4.0/
#
# Unless required by applicable law or agreed to in writing, documentation
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ============LICENSE_END============================================

import collections
import csv
import datetime
import hashlib
import io
import json
import os
import sys
import time
import requests

import docutils.core
import pytest
from more_itertools import partition
from six import string_types
import xlsxwriter

__path__ = [os.path.dirname(os.path.abspath(__file__))]

resolution_steps_file = "resolution_steps.json"
requirements_file = "requirements.json"

FAILURE_DATA = {}

report_columns = [
    ("Input File", "file"),
    ("Test", "test_file"),
    ("Requirements", "req_description"),
    ("Resolution Steps", "resolution_steps"),
    ("Error Message", "message"),
    ("Raw Test Output", "raw_output"),
]
report = collections.OrderedDict(report_columns)
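# `report` maps each report column header to the key that holds the matching
# value in a FAILURE_DATA record; the HTML, Excel, and CSV writers below all
# derive their column layout from these pairs.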


def extract_error_msg(rep):
    try:
        msg = str(rep.longrepr.reprcrash)
    except AttributeError:
        msg = str(rep)

    if "AssertionError:" in msg:
        return msg.split("AssertionError:")[1]
    else:
        return msg


@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):

    outcome = yield
    rep = outcome.get_result()

    output_dir = "{}/../output".format(__path__[0])
    if rep.outcome == "failed":
        if not os.path.exists(output_dir):
            os.mkdir(output_dir)

        if hasattr(item.function, "requirement_ids"):
            requirement_ids = item.function.requirement_ids
        else:
            requirement_ids = ""

        if "environment_pair" in item.fixturenames:
            resolved_pair = "{} environment pair".format(
                item.funcargs["environment_pair"]["name"]
            )
        elif "heat_volume_pair" in item.fixturenames:
            resolved_pair = "{} volume pair".format(
                item.funcargs["heat_volume_pair"]["name"]
            )
        elif "heat_templates" in item.fixturenames:
            resolved_pair = item.funcargs["heat_templates"]
        elif "yaml_files" in item.fixturenames:
            resolved_pair = item.funcargs["yaml_files"]
        else:
            resolved_pair = rep.nodeid.split("[")[1][:-1]

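        # One record per failure, keyed by an incrementing integer. A record
        # looks roughly like this (illustrative values):
        #   {"file": "base_vnf.yaml", "vnfrqts": ["R-12345"], "test": "...",
        #    "test_file": "...", "raw_output": "...", "message": "..."}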
        FAILURE_DATA[len(FAILURE_DATA)] = {
            "file": resolved_pair,
            "vnfrqts": requirement_ids,
            "test": item.function.__name__,
            "test_file": item.function.__module__.split(".")[-1],
            "raw_output": str(rep.longrepr),
            "message": extract_error_msg(rep),
        }

        with open("{}/failures".format(output_dir), "w") as f:
            json.dump(FAILURE_DATA, f, indent=4)


def make_timestamp():
    timezone = time.tzname[time.localtime().tm_isdst]
    return "{} {}".format(str(datetime.datetime.now()), timezone)


def pytest_sessionfinish(session, exitstatus):
    if not session.config.option.template_dir:
        return
    template_path = os.path.abspath(session.config.option.template_dir[0])
    profile_name = session.config.option.validation_profile_name
    generate_report(
        "{}/../output".format(__path__[0]),
        template_path,
        profile_name,
        session.config.option.report_format,
    )

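# Validation-profile gating: unmarked tests always run; tests that carry
# profile markers only run when the selected profile matches, and tests
# marked xfail are never skipped by this hook.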
def pytest_runtest_setup(item):
    profile = item.session.config.option.validation_profile
    markers = set(m.name for m in item.iter_markers())
    if not profile and markers and "xfail" not in markers:
        pytest.skip("No validation profile selected. Skipping marked tests.")
    if profile and markers and profile not in markers and "xfail" not in markers:
        pytest.skip("Doesn't match the selected validation profile.")


def make_href(path):
    paths = [path] if isinstance(path, string_types) else path
    links = []
    for p in paths:
        abs_path = os.path.abspath(p)
        filename = os.path.split(abs_path)[1]
        links.append(
            "<a href='file://{abs_path}' target='_blank'>{filename}</a>".format(
                abs_path=abs_path, filename=filename
            )
        )
    return "<br/>".join(links)


def generate_report(outpath, template_path, profile_name, output_format):
    failures = "{}/failures".format(outpath)
    faildata = None
    rdata = None
    hdata = None

    if os.path.exists(failures):
        with open(failures, "r") as f:
            faildata = json.loads(f.read())
    else:
        faildata = {}

    resolution_steps = "{}/../{}".format(__path__[0], resolution_steps_file)
    if os.path.exists(resolution_steps):
        with open(resolution_steps, "r") as f:
            rdata = json.loads(f.read())

    heat_requirements = "{}/../{}".format(__path__[0], requirements_file)
    if os.path.exists(heat_requirements):
        with open(heat_requirements, "r") as f:
            hdata = json.loads(f.read())

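    # requirements.json is expected to follow the ONAP needs.json layout,
    # abridged here (illustrative version name):
    #   {"current_version": "casablanca",
    #    "versions": {"casablanca": {"needs": {"R-12345": {"description": "..."}}}}}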
    # point requirements at the most recent version
    current_version = hdata["current_version"]
    hdata = hdata["versions"][current_version]["needs"]
    # mapping requirement IDs from failures to requirement descriptions
    for k, v in faildata.items():
        req_text = ""
        if v["vnfrqts"] != "":
            for req in v["vnfrqts"]:
                if req in hdata:
                    req_text += "\n\n{}: \n{}".format(req, hdata[req]["description"])
        faildata[k]["req_description"] = req_text

    # mapping resolution steps to module and test name
    for k, v in faildata.items():
        faildata[k]["resolution_steps"] = ""
        for rs in rdata:
            if v["test_file"] == rs["module"] and v["test"] == rs["function"]:
                faildata[k]["resolution_steps"] = "\n{}: \n{}".format(
                    rs["header"], rs["resolution_steps"]
                )
    output_format = output_format.lower().strip() if output_format else "html"
    if output_format == "html":
        generate_html_report(outpath, profile_name, template_path, faildata)
    elif output_format == "excel":
        generate_excel_report(outpath, profile_name, template_path, faildata)
    elif output_format == "csv":
        generate_csv_report(outpath, profile_name, template_path, faildata)
    else:
        raise ValueError("Unsupported output format: " + output_format)

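# The CSV report mirrors the HTML and Excel layouts: a title row, a summary
# block (profile, timestamp, directory, checksum, error count), a spacer row,
# then one row per failure using the report_columns headers.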
def generate_csv_report(output_dir, profile_name, template_path, faildata):
    rows = []
    rows.append(["Validation Failures"])
    headers = [
        ("Profile Selected:", profile_name),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(faildata)),
    ]

    rows.append([])
    for header in headers:
        rows.append(header)
    rows.append([])

    # table header
    rows.append([col for col, _ in report_columns])

    # table content
    for data in faildata.values():
        rows.append(
            [
                data.get("file", ""),
                data.get("test_file", ""),
                data.get("req_description", ""),
                data.get("resolution_steps", ""),
                data.get("message", ""),
                data.get("raw_output", ""),
            ]
        )

    output_path = os.path.join(output_dir, "report.csv")
    # compat_open handles the Python 2/3 difference in csv file modes
    with compat_open(output_path) as f:
        writer = csv.writer(f)
        for row in rows:
            writer.writerow(row)


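# The Excel report writes a single "failures" worksheet: the heading in row 0,
# the summary block starting at row 2, and the failure table a few rows below
# that, one worksheet row per failure.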
def generate_excel_report(output_dir, profile_name, template_path, faildata):
    output_path = os.path.join(output_dir, "report.xlsx")
    workbook = xlsxwriter.Workbook(output_path)
    bold = workbook.add_format({"bold": True})
    code = workbook.add_format({"font_name": "Courier", "text_wrap": True})
    normal = workbook.add_format({"text_wrap": True})
    heading = workbook.add_format({"bold": True, "font_size": 18})
    worksheet = workbook.add_worksheet("failures")
    worksheet.write(0, 0, "Validation Failures", heading)

    headers = [
        ("Profile Selected:", profile_name),
        ("Report Generated At:", make_timestamp()),
        ("Directory Validated:", template_path),
        ("Checksum:", hash_directory(template_path)),
        ("Total Errors:", len(faildata)),
    ]
    for row, (header, value) in enumerate(headers, start=2):
        worksheet.write(row, 0, header, bold)
        worksheet.write(row, 1, value)

    worksheet.set_column(0, len(headers) - 1, 40)
    worksheet.set_column(len(headers), len(headers), 80)

    # table header
    start_error_table_row = 2 + len(headers) + 2
    for col_num, (col_name, _) in enumerate(report_columns):
        worksheet.write(start_error_table_row, col_num, col_name, bold)

    # table content
    for row, data in enumerate(faildata.values(), start=start_error_table_row + 1):
        for col, key in enumerate(report.values()):
            if key == "file":
                paths = (
                    [data[key]] if isinstance(data[key], string_types) else data[key]
                )
                contents = "\n".join(paths)
                worksheet.write(row, col, contents, normal)
            elif key == "raw_output":
                worksheet.write_string(row, col, data[key], code)
            else:
                worksheet.write(row, col, data[key], normal)

    workbook.close()


def generate_html_report(outpath, profile_name, template_path, faildata):
    with open("{}/report.html".format(outpath), "w") as of:
        body_begin = """
        <style type="text/css">
        h1, li {{
            font-family:Arial, sans-serif;
        }}
        .tg  {{border-collapse:collapse;border-spacing:0;}}
        .tg td{{font-family:Arial, sans-serif;font-size:8px;padding:10px 5px;
        border-style:solid;border-width:1px;overflow:hidden;word-break:normal;
        border-color:black;}}
        .tg th{{font-family:Arial, sans-serif;font-size:14px;font-weight:normal;
        padding:10px 5px;border-style:solid;border-width:1px;overflow:hidden;
        word-break:normal;border-color:black;}}
        .tg .tg-rwj1{{font-size:10px;font-family:Arial, Helvetica,
        sans-serif !important;border-color:inherit;vertical-align:top}}</style>
        <h1>Validation Failures</h1>
        <ul>
            <li><b>Profile Selected:</b> <tt>{profile}</tt></li>
            <li><b>Report Generated At:</b> <tt>{timestamp}</tt></li>
            <li><b>Directory Validated:</b> <tt>{template_dir}</tt></li>
            <li><b>Checksum:</b> <tt>{checksum}</tt></li>
            <li><b>Total Errors:</b> {num_failures}</li>
        </ul>
        """.format(
            profile=profile_name,
            timestamp=make_timestamp(),
            checksum=hash_directory(template_path),
            template_dir=template_path,
            num_failures=len(faildata),
        )
        of.write(body_begin)

        if len(faildata) == 0:
            of.write("<p>Success! No validation failures detected.</p>")
            return

        table_begin = '<table class="tg">'
        of.write(table_begin)

        # table headers
        of.write("<tr>")
        for k, v in report.items():
            of.write('<th class="tg-rwj1">{}</th>'.format(k))
        of.write("</tr>")

        # table content
        for k, v in faildata.items():
            of.write("<tr>")
            for rk, rv in report.items():
                if rv == "file":
                    value = make_href(v[rv])
                elif rv == "raw_output":
                    value = "<pre>{}</pre>".format(v[rv])
                elif rv == "req_description":
                    parts = docutils.core.publish_parts(
                        writer_name="html", source=v[rv]
                    )
                    value = parts["body"]
                else:
                    value = v[rv].replace("\n", "<br />")
                of.write("  <td>{}</td>".format(value))
            of.write("</tr>")

        of.write("</table>")

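# Typical invocation (illustrative paths and profile values):
#   pytest ice_validator/tests --template-directory=/path/to/heat_templates \
#       --validation-profile=<marker> --validation-profile-name="Heat Validation" \
#       --report-format=csv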
def pytest_addoption(parser):
    """
    Add needed CLI arguments
    """
    parser.addoption(
        "--template-directory",
        dest="template_dir",
        action="append",
        help="Directory which holds the templates for validation",
    )

    parser.addoption(
        "--self-test",
        dest="self_test",
        action="store_true",
        help="Test the unit tests against their fixtured data",
    )

    parser.addoption(
        "--validation-profile",
        dest="validation_profile",
        action="store",
        help="Runs all unmarked tests plus tests with a matching marker",
    )

    parser.addoption(
        "--validation-profile-name",
        dest="validation_profile_name",
        action="store",
        help="Friendly name of the validation profile used in reports",
    )

    parser.addoption(
        "--report-format",
        dest="report_format",
        action="store",
        help="Format of output report (html, csv, excel)",
    )


def pytest_configure(config):
    """
    Ensure that we receive either `--self-test` or
    `--template-directory=<directory>` as CLI arguments
    """
    if config.getoption("template_dir") and config.getoption("self_test"):
        raise Exception(
            '"--template-directory" and "--self-test" are mutually exclusive'
        )
    if not (
        config.getoption("template_dir") or
        config.getoption("self_test") or
        config.getoption("help")
    ):
        raise Exception(
            'One of "--template-directory" or "--self-test" must be specified'
        )


def pytest_generate_tests(metafunc):
    """
    If a test requests a fixture named 'filename', generate a test case for
    each selected file: either the files contained in `template_dir`, or, if
    `template_dir` is not specified on the CLI, the fixtures associated with
    this test name.
    """
    if "filename" in metafunc.fixturenames:
        from .parametrizers import parametrize_filename

        parametrize_filename(metafunc)

    if "filenames" in metafunc.fixturenames:
        from .parametrizers import parametrize_filenames

        parametrize_filenames(metafunc)

    if "template_dir" in metafunc.fixturenames:
        from .parametrizers import parametrize_template_dir

        parametrize_template_dir(metafunc)

    if "environment_pair" in metafunc.fixturenames:
        from .parametrizers import parametrize_environment_pair

        parametrize_environment_pair(metafunc)

    if "heat_volume_pair" in metafunc.fixturenames:
        from .parametrizers import parametrize_heat_volume_pair

        parametrize_heat_volume_pair(metafunc)

    if "yaml_files" in metafunc.fixturenames:
        from .parametrizers import parametrize_yaml_files

        parametrize_yaml_files(metafunc)

    if "env_files" in metafunc.fixturenames:
        from .parametrizers import parametrize_environment_files

        parametrize_environment_files(metafunc)

    if "yaml_file" in metafunc.fixturenames:
        from .parametrizers import parametrize_yaml_file

        parametrize_yaml_file(metafunc)

    if "env_file" in metafunc.fixturenames:
        from .parametrizers import parametrize_environment_file

        parametrize_environment_file(metafunc)

    if "parsed_yaml_file" in metafunc.fixturenames:
        from .parametrizers import parametrize_parsed_yaml_file

        parametrize_parsed_yaml_file(metafunc)

    if "parsed_environment_file" in metafunc.fixturenames:
        from .parametrizers import parametrize_parsed_environment_file

        parametrize_parsed_environment_file(metafunc)

    if "heat_template" in metafunc.fixturenames:
        from .parametrizers import parametrize_heat_template

        parametrize_heat_template(metafunc)

    if "heat_templates" in metafunc.fixturenames:
        from .parametrizers import parametrize_heat_templates

        parametrize_heat_templates(metafunc)

    if "volume_template" in metafunc.fixturenames:
        from .parametrizers import parametrize_volume_template

        parametrize_volume_template(metafunc)

    if "volume_templates" in metafunc.fixturenames:
        from .parametrizers import parametrize_volume_templates

        parametrize_volume_templates(metafunc)

    if "template" in metafunc.fixturenames:
        from .parametrizers import parametrize_template

        parametrize_template(metafunc)

    if "templates" in metafunc.fixturenames:
        from .parametrizers import parametrize_templates

        parametrize_templates(metafunc)

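# MD5 digest over the concatenated contents of every file under `path`; note
# that the result depends on the traversal order produced by os.walk.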
def hash_directory(path):
    md5 = hashlib.md5()
    for dir_path, sub_dirs, filenames in os.walk(path):
        for filename in filenames:
            file_path = os.path.join(dir_path, filename)
            with open(file_path, "rb") as f:
                md5.update(f.read())
    return md5.hexdigest()


def load_current_requirements():
    """Loads dict of current requirements or empty dict if file doesn't exist"""

    url = "https://onap.readthedocs.io/en/latest/_downloads/needs.json"
    r = requests.get(url)
    with open("requirements.json", "wb") as needs:
        needs.write(r.content)
    path = "requirements.json"
    if not os.path.exists(path):
        return {}
    with io.open(path, encoding="utf8", mode="r") as f:
        data = json.load(f)
        version = data["current_version"]
        return data["versions"][version]["needs"]

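# Python 2's csv module writes bytes (the file must be opened "wb" and each row
# encoded to UTF-8), while Python 3's expects text opened with newline="".
# These two helpers hide that difference for the CSV writers in this module.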
def compat_open(path):
    """Invokes open correctly depending on the Python version"""
    if sys.version_info.major < 3:
        return open(path, "wb")
    else:
        return open(path, "w", newline="")


def unicode_writerow(writer, row):
    if sys.version_info.major < 3:
        row = [s.encode("utf8") for s in row]
    writer.writerow(row)


def pytest_report_collectionfinish(config, startdir, items):
    """Generates a simple traceability report to output/traceability.csv"""
    traceability_path = os.path.join(__path__[0], "../output/traceability.csv")
    output_dir = os.path.split(traceability_path)[0]
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    requirements = load_current_requirements()
    unmapped, mapped = partition(
        lambda item: hasattr(item.function, "requirement_ids"), items
    )

    req_to_test = collections.defaultdict(set)
    mapping_errors = set()
    for item in mapped:
        for req_id in item.function.requirement_ids:
            # record every test mapped to this requirement, not just the first
            req_to_test[req_id].add(item)
            if req_id not in requirements:
                mapping_errors.add(
                    (req_id, item.function.__module__, item.function.__name__)
                )

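    # mapping_errors.csv lists a (requirement ID, test module, test name) row
    # for each test that references a requirement ID missing from the
    # published needs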
    mapping_error_path = os.path.join(__path__[0], "../output/mapping_errors.csv")
    with compat_open(mapping_error_path) as f:
        writer = csv.writer(f)
        # sort for a stable row order across runs
        for error in sorted(mapping_errors):
            unicode_writerow(writer, error)

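    # traceability.csv: one row per (requirement, test) pair; requirements with
    # no mapped test get blank test columns, and tests with no requirement IDs
    # are appended at the end with blank requirement columns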
    with compat_open(traceability_path) as f:
        out = csv.writer(f)
        unicode_writerow(
            out,
            ("Requirement ID", "Requirement", "Section", "Test Module", "Test Name"),
        )
        for req_id, metadata in requirements.items():
            if req_to_test[req_id]:
                for item in req_to_test[req_id]:
                    unicode_writerow(
                        out,
                        (
                            req_id,
                            metadata["description"],
                            metadata["section_name"],
                            item.function.__module__,
                            item.function.__name__,
                        ),
                    )
            else:
                unicode_writerow(
                    out,
                    (req_id, metadata["description"], metadata["section_name"], "", ""),
                )
        # now write out any test methods that weren't mapped to requirements
        for item in unmapped:
            unicode_writerow(
                out, ("", "", "", item.function.__module__, item.function.__name__)
            )