87516cb60e14665aada0ab5a4ab3b23fb4541d64
[integration.git] / test / security / check_versions / versions / k8s_bin_versions_inspector_test_case.py
1 #!/usr/bin/env python3
2
3 #   COPYRIGHT NOTICE STARTS HERE
4 #
5 #   Copyright 2020 Samsung Electronics Co., Ltd.
6 #
7 #   Licensed under the Apache License, Version 2.0 (the "License");
8 #   you may not use this file except in compliance with the License.
9 #   You may obtain a copy of the License at
10 #
11 #       http://www.apache.org/licenses/LICENSE-2.0
12 #
13 #   Unless required by applicable law or agreed to in writing, software
14 #   distributed under the License is distributed on an "AS IS" BASIS,
15 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 #   See the License for the specific language governing permissions and
17 #   limitations under the License.
18 #
19 #   COPYRIGHT NOTICE ENDS HERE
20
21 import logging
22 import pathlib
23 import time
24 import os
25 import wget
26 from kubernetes import client, config
27 from xtesting.core import testcase  # pylint: disable=import-error
28
29 import versions.reporting as Reporting
30 from versions.k8s_bin_versions_inspector import (
31     gather_containers_informations,
32     generate_and_handle_output,
33     verify_versions_acceptability,
34 )
35
36 RECOMMENDED_VERSIONS_FILE = "/tmp/recommended_versions.yaml"
37 WAIVER_LIST_FILE = "/tmp/versions_xfail.txt"
38
39 # Logger
40 logging.basicConfig()
41 LOGGER = logging.getLogger("onap-versions-status-inspector")
42 LOGGER.setLevel("INFO")
43
44
45 class Inspector(testcase.TestCase):
46     """Inspector CLass."""
47
48     def __init__(self, **kwargs):
49         """Init the testcase."""
50         if "case_name" not in kwargs:
51             kwargs["case_name"] = "check_versions"
52         super().__init__(**kwargs)
53
54         version = os.getenv("ONAP_VERSION", "master")
55         base_url = "https://git.onap.org/integration/seccom/plain"
56
57         self.namespace = "onap"
58         # if no Recommended file found, download it
59         if pathlib.Path(RECOMMENDED_VERSIONS_FILE).is_file():
60             self.acceptable = pathlib.Path(RECOMMENDED_VERSIONS_FILE)
61         else:
62             self.acceptable = wget.download(
63                 base_url + "/recommended_versions.yaml?h=" + version,
64                 out=RECOMMENDED_VERSIONS_FILE,
65             )
66         self.output_file = "/tmp/versions.json"
67         # if no waiver file found, download it
68         if pathlib.Path(WAIVER_LIST_FILE).is_file():
69             self.waiver = pathlib.Path(WAIVER_LIST_FILE)
70         else:
71             self.waiver = wget.download(
72                 base_url + "/waivers/versions/versions_xfail.txt?h=" + version,
73                 out=WAIVER_LIST_FILE,
74             )
75         self.result = 0
76         self.start_time = None
77         self.stop_time = None
78
79     def run(self):
80         """Execute the version Inspector."""
81         self.start_time = time.time()
82         config.load_kube_config()
83         api = client.CoreV1Api()
84
85         field_selector = "metadata.namespace==onap"
86
87         containers = gather_containers_informations(api, field_selector, True)
88         LOGGER.info("gather_containers_informations")
89         LOGGER.info(containers)
90         LOGGER.info("---------------------------------")
91
92         generate_and_handle_output(
93             containers, "json", pathlib.Path(self.output_file), True
94         )
95         LOGGER.info("generate_and_handle_output in %s", self.output_file)
96         LOGGER.info("---------------------------------")
97
98         code = verify_versions_acceptability(containers, self.acceptable, True)
99         LOGGER.info("verify_versions_acceptability")
100         LOGGER.info(code)
101         LOGGER.info("---------------------------------")
102
103         # Generate reporting
104         test = Reporting.OnapVersionsReporting(result_file=self.output_file)
105         LOGGER.info("Prepare reporting")
106         self.result = test.generate_reporting(self.output_file)
107         LOGGER.info("Reporting generated")
108
109         self.stop_time = time.time()
110         if self.result >= 90:
111             return testcase.TestCase.EX_OK
112         return testcase.TestCase.EX_TESTCASE_FAILED
113
114     def set_namespace(self, namespace):
115         """Set namespace."""
116         self.namespace = namespace