Merge "[VVP] Adding bandit security scans and fixes"
[vvp/validation-scripts.git] / checks.py
1 # -*- coding: utf8 -*-
2 # ============LICENSE_START====================================================
3 # org.onap.vvp/validation-scripts
4 # ===================================================================
5 # Copyright © 2019 AT&T Intellectual Property. All rights reserved.
6 # ===================================================================
7 #
8 # Unless otherwise specified, all software contained herein is licensed
9 # under the Apache License, Version 2.0 (the "License");
10 # you may not use this software except in compliance with the License.
11 # You may obtain a copy of the License at
12 #
13 #             http://www.apache.org/licenses/LICENSE-2.0
14 #
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 #
21 #
22 #
23 # Unless otherwise specified, all documentation contained herein is licensed
24 # under the Creative Commons License, Attribution 4.0 Intl. (the "License");
25 # you may not use this documentation except in compliance with the License.
26 # You may obtain a copy of the License at
27 #
28 #             https://creativecommons.org/licenses/by/4.0/
29 #
30 # Unless required by applicable law or agreed to in writing, documentation
31 # distributed under the License is distributed on an "AS IS" BASIS,
32 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
33 # See the License for the specific language governing permissions and
34 # limitations under the License.
35 #
36 # ============LICENSE_END============================================
37 #
38 import contextlib
39 import csv
40 import io
41 import json
42 import os
43 import subprocess  #nosec
44 import sys
45
46 import pytest
47 from flake8.main.application import Application
48
49 from update_reqs import get_requirements
50
51 THIS_DIR = os.path.dirname(os.path.abspath(__file__))
52 CURRENT_NEEDS_PATH = os.path.join(THIS_DIR, "ice_validator/heat_requirements.json")
53
54
55 class Traceability:
56
57     PATH = os.path.join(THIS_DIR, "ice_validator/output/traceability.csv")
58     TEST_FILE = 6
59     TEST_NAME = 7
60     IS_TESTABLE = 5
61     REQ_ID = 0
62
63     def __init__(self):
64         with open(self.PATH, "r") as f:
65             rows = csv.reader(f)
66             next(rows)  # skip header
67             self.mappings = list(rows)
68
69     def unmapped_requirement_errors(self):
70         """
71         Returns list of errors where a requirement is testable, but no test was found.
72         """
73         testable_mappings = [m for m in self.mappings if m[self.IS_TESTABLE] == "True"]
74         return [
75             f"Missing test for {m[self.REQ_ID]}"
76             for m in testable_mappings
77             if not m[self.TEST_NAME]
78         ]
79
80     def mapped_non_testable_requirement_errors(self):
81         """
82         Returns list of errors where the requirement isn't testable, but a test was
83         found.
84         """
85         non_testables = [m for m in self.mappings if m[self.IS_TESTABLE] == "False"]
86         return [
87             (
88                 f"No test for {m[0]} is needed, but found: "
89                 f"{m[self.TEST_FILE]}::{m[self.TEST_NAME]} "
90             )
91             for m in non_testables
92             if m[self.TEST_NAME]
93         ]
94
95
96 def current_version(needs):
97     """Extracts and returns the needs under the current version"""
98     return needs["versions"][needs["current_version"]]["needs"]
99
100
101 def in_scope(_, req_metadata):
102     """
103     Checks if requirement is relevant to VVP.
104
105     :param: _: not used
106     :param req_metadata: needs metadata about the requirement
107     :return: True if the requirement is a testable, Heat requirement
108     """
109     return (
110         "Heat" in req_metadata.get("docname", "")
111         and "MUST" in req_metadata.get("keyword", "").upper()
112         and req_metadata.get("validation_mode", "").lower() != "none"
113     )
114
115
116 def select_items(predicate, source_dict):
117     """
118     Creates a new dict from the source dict where the items match the given predicate
119     :param predicate: predicate function that must accept a two arguments (key & value)
120     :param source_dict: input dictionary to select from
121     :return: filtered dict
122     """
123     return {k: v for k, v in source_dict.items() if predicate(k, v)}
124
125
126 def check_requirements_up_to_date():
127     """
128     Checks if the requirements file packaged with VVP has meaningful differences
129     to the requirements file published from VNFRQTS.
130     :return: list of errors found
131     """
132     msg = ["heat_requirements.json is out-of-date. Run update_reqs.py to update."]
133     latest_needs = json.load(get_requirements())
134     with open(CURRENT_NEEDS_PATH, "r") as f:
135         current_needs = json.load(f)
136     latest_reqs = select_items(in_scope, current_version(latest_needs))
137     current_reqs = select_items(in_scope, current_version(current_needs))
138     if set(latest_reqs.keys()) != set(current_reqs.keys()):
139         return msg
140     if not all(
141         latest["description"] == current_reqs[r_id]["description"]
142         for r_id, latest in latest_reqs.items()
143     ):
144         return msg
145     return None
146
147
148 def check_self_test_pass():
149     """
150     Run pytest self-test and ensure it passes
151     :return:
152     """
153     original_dir = os.getcwd()
154     try:
155         os.chdir(os.path.join(THIS_DIR, "ice_validator"))
156         if pytest.main(["tests", "--self-test"]) != 0:
157             return ["VVP self-test failed. Run pytest --self-test and fix errors."]
158     finally:
159         os.chdir(original_dir)
160
161
162 def check_testable_requirements_are_mapped():
163     tracing = Traceability()
164     return tracing.unmapped_requirement_errors()
165
166
167 def check_non_testable_requirements_are_not_mapped():
168     tracing = Traceability()
169     return tracing.mapped_non_testable_requirement_errors()
170
171
172 def check_flake8_passes():
173     output = io.StringIO()
174     with contextlib.redirect_stdout(output), contextlib.redirect_stderr(output):
175         app = Application()
176         app.run(["ice_validator"])
177     output.seek(0)
178     lines = [f"   {l}" for l in output.readlines()]
179     return ["flake8 errors detected:"] + lines if lines else []
180
181
182 def check_bandit_passes():
183     result = subprocess.run(                                            #nosec
184         ["bandit", "-c", "bandit.yaml", "-r", ".", "-x", "./.tox/**"],  #nosec
185         encoding="utf-8",                                               #nosec
186         stdout=subprocess.PIPE,                                         #nosec
187         stderr=subprocess.PIPE,                                         #nosec
188     )                                                                   #nosec
189     msgs = result.stdout.split("\n") if result.returncode != 0 else []
190     return ["bandit errors detected:"] + [f"  {e}" for e in msgs] if msgs else []
191
192
193 if __name__ == "__main__":
194     checks = [
195         check_self_test_pass,
196         check_requirements_up_to_date,
197         check_testable_requirements_are_mapped,
198         check_non_testable_requirements_are_not_mapped,
199         check_flake8_passes,
200         check_bandit_passes,
201     ]
202     results = [check() for check in checks]
203     errors = "\n".join("\n".join(msg) for msg in results if msg)
204     print(errors or "Everything looks good!")
205     sys.exit(1 if errors else 0)