[VVP] Preload Generation Enhancements and Fixes
[vvp/validation-scripts.git] / checks.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START====================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37 #
38 import contextlib
39 import csv
40 import io
41 import json
42 import os
43 import subprocess  #nosec
44 import sys
45
46 import pytest
47 from flake8.main.application import Application
48
49 from update_reqs import get_requirements
50
51 THIS_DIR = os.path.dirname(os.path.abspath(__file__))
52 CURRENT_NEEDS_PATH = os.path.join(THIS_DIR, "ice_validator/heat_requirements.json")
53
54
55 def run_pytest(*args, msg="pytest failed"):
56     original_dir = os.getcwd()
57     try:
58         os.chdir(os.path.join(THIS_DIR, "ice_validator"))
59         if pytest.main(list(args)) != 0:
60             return [msg]
61     finally:
62         os.chdir(original_dir)
63
64
65 class Traceability:
66
67     PATH = os.path.join(THIS_DIR, "ice_validator/output/traceability.csv")
68     TEST_FILE = 6
69     TEST_NAME = 7
70     IS_TESTABLE = 5
71     REQ_ID = 0
72
73     def __init__(self):
74         with open(self.PATH, "r") as f:
75             rows = csv.reader(f)
76             next(rows)  # skip header
77             self.mappings = list(rows)
78
79     def unmapped_requirement_errors(self):
80         """
81         Returns list of errors where a requirement is testable, but no test was found.
82         """
83         testable_mappings = [m for m in self.mappings if m[self.IS_TESTABLE] == "True"]
84         return [
85             f"Missing test for {m[self.REQ_ID]}"
86             for m in testable_mappings
87             if not m[self.TEST_NAME]
88         ]
89
90     def mapped_non_testable_requirement_errors(self):
91         """
92         Returns list of errors where the requirement isn't testable, but a test was
93         found.
94         """
95         non_testables = [m for m in self.mappings if m[self.IS_TESTABLE] == "False"]
96         return [
97             (
98                 f"No test for {m[0]} is needed, but found: "
99                 f"{m[self.TEST_FILE]}::{m[self.TEST_NAME]} "
100             )
101             for m in non_testables
102             if m[self.TEST_NAME]
103         ]
104
105
106 def current_version(needs):
107     """Extracts and returns the needs under the current version"""
108     return needs["versions"][needs["current_version"]]["needs"]
109
110
111 def in_scope(_, req_metadata):
112     """
113     Checks if requirement is relevant to VVP.
114
115     :param: _: not used
116     :param req_metadata: needs metadata about the requirement
117     :return: True if the requirement is a testable, Heat requirement
118     """
119     return (
120         "Heat" in req_metadata.get("docname", "")
121         and "MUST" in req_metadata.get("keyword", "").upper()
122         and req_metadata.get("validation_mode", "").lower() != "none"
123     )
124
125
126 def select_items(predicate, source_dict):
127     """
128     Creates a new dict from the source dict where the items match the given predicate
129     :param predicate: predicate function that must accept a two arguments (key & value)
130     :param source_dict: input dictionary to select from
131     :return: filtered dict
132     """
133     return {k: v for k, v in source_dict.items() if predicate(k, v)}
134
135
136 def check_requirements_up_to_date():
137     """
138     Checks if the requirements file packaged with VVP has meaningful differences
139     to the requirements file published from VNFRQTS.
140     :return: list of errors found
141     """
142     msg = ["heat_requirements.json is out-of-date. Run update_reqs.py to update."]
143     latest_needs = json.load(get_requirements())
144     with open(CURRENT_NEEDS_PATH, "r") as f:
145         current_needs = json.load(f)
146     latest_reqs = select_items(in_scope, current_version(latest_needs))
147     current_reqs = select_items(in_scope, current_version(current_needs))
148     if set(latest_reqs.keys()) != set(current_reqs.keys()):
149         return msg
150     if not all(
151         latest["description"] == current_reqs[r_id]["description"]
152         for r_id, latest in latest_reqs.items()
153     ):
154         return msg
155     return None
156
157
158 def check_app_tests_pass():
159     return run_pytest("tests", "--self-test",
160                       msg="app_tests failed. Run pytest app_tests and fix errors.")
161
162
163 def check_self_test_pass():
164     return run_pytest("tests", "--self-test",
165                       msg="self-test failed. Run pytest --self-test and fix errors.")
166
167
168 def check_testable_requirements_are_mapped():
169     tracing = Traceability()
170     return tracing.unmapped_requirement_errors()
171
172
173 def check_non_testable_requirements_are_not_mapped():
174     tracing = Traceability()
175     return tracing.mapped_non_testable_requirement_errors()
176
177
178 def check_flake8_passes():
179     output = io.StringIO()
180     with contextlib.redirect_stdout(output), contextlib.redirect_stderr(output):
181         app = Application()
182         app.run(["ice_validator"])
183     output.seek(0)
184     lines = [f"   {l}" for l in output.readlines()]
185     return ["flake8 errors detected:"] + lines if lines else []
186
187
188 def check_bandit_passes():
189     result = subprocess.run(                                            #nosec
190         ["bandit", "-c", "bandit.yaml", "-r", ".", "-x", "./.tox/**"],  #nosec
191         encoding="utf-8",                                               #nosec
192         stdout=subprocess.PIPE,                                         #nosec
193         stderr=subprocess.PIPE,                                         #nosec
194     )                                                                   #nosec
195     msgs = result.stdout.split("\n") if result.returncode != 0 else []
196     return ["bandit errors detected:"] + [f"  {e}" for e in msgs] if msgs else []
197
198
199 if __name__ == "__main__":
200     checks = [
201         check_self_test_pass,
202         check_requirements_up_to_date,
203         check_testable_requirements_are_mapped,
204         check_non_testable_requirements_are_not_mapped,
205         check_flake8_passes,
206         check_bandit_passes,
207     ]
208     results = [check() for check in checks]
209     errors = "\n".join("\n".join(msg) for msg in results if msg)
210     print(errors or "Everything looks good!")
211     sys.exit(1 if errors else 0)