Address k8s plugin code smells reported by sonar
[dcaegen2/platform/plugins.git] / k8s / tests / common.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
19
20 # Common functions for unit testing
21 def _set_k8s_configuration():
22     ''' Set up the basic k8s configuration '''
23     return  {
24         "image_pull_secrets" : ["secret0", "secret1"],
25         "filebeat" : {
26             "log_path": "/var/log/onap",
27             "data_path": "/usr/share/filebeat/data",
28             "config_path": "/usr/share/filebeat/filebeat.yml",
29             "config_subpath": "filebeat.yml",
30             "image" : "filebeat-repo/filebeat:latest",
31             "config_map" : "dcae-filebeat-configmap"
32         },
33         "tls" : {
34             "cert_path": "/opt/certs",
35             "image": "tlsrepo/tls-init-container:1.2.3",
36             "component_ca_cert_path": "/opt/dcae/cacert/cacert.pem",
37             "ca_cert_configmap": "dcae-cacert-configmap"
38         },
39         "cbs": {
40             "base_url": "https://config-binding-service:10443/service_component_all/test-component"
41         }
42     }
43
44 def _set_resources():
45     ''' Set resources '''
46     return {
47         "limits": {
48             "cpu" : 0.5,
49             "memory" : "2Gi"
50         },
51         "requests": {
52             "cpu" : 0.5,
53             "memory" : "2Gi"
54         }
55     }
56
57 def _set_common_kwargs():
58     ''' Set kwargs common to all test cases '''
59     return {
60         "volumes": [
61             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw"}}
62         ],
63         "ports": ["80:0", "443:0"],
64         "env": {"NAME0": "value0", "NAME1": "value1"},
65         "log_info": {"log_directory": "/path/to/container/log/directory"},
66         "readiness": {"type": "http", "endpoint" : "/ready"}
67     }
68
69 def _get_item_by_name(list, name):
70     ''' Search a list of k8s API objects with the specified name '''
71     for item in list:
72         if item.name == name:
73             return item
74     return None
75
76 def check_env_var(env_list, name, value):
77     e = _get_item_by_name(env_list, name)
78     assert e and e.value == value
79
80 def verify_common(dep, deployment_description):
81     ''' Check results common to all test cases '''
82     assert deployment_description["deployment"] == "dep-testcomponent"
83     assert deployment_description["namespace"] == "k8stest"
84     assert deployment_description["services"][0] == "testcomponent"
85
86     # For unit test purposes, we want to make sure that the deployment object
87     # we're passing to the k8s API is correct
88     app_container = dep.spec.template.spec.containers[0]
89     assert app_container.image == "example.com/testcomponent:1.4.3"
90     assert app_container.image_pull_policy == "IfNotPresent"
91     assert len(app_container.ports) == 2
92     assert app_container.ports[0].container_port == 80
93     assert app_container.ports[1].container_port == 443
94     assert app_container.readiness_probe.http_get.path == "/ready"
95     assert app_container.readiness_probe.http_get.scheme == "HTTP"
96     assert len(app_container.volume_mounts) == 3
97     assert app_container.volume_mounts[0].mount_path == "/path/on/container"
98     assert app_container.volume_mounts[1].mount_path == "/path/to/container/log/directory"
99
100     # Check environment variables
101     env = app_container.env
102     check_env_var(env, "NAME0", "value0")
103     check_env_var(env, "NAME1", "value1")
104
105     # Should have a log container with volume mounts
106     log_container = dep.spec.template.spec.containers[1]
107     assert log_container.image == "filebeat-repo/filebeat:latest"
108     assert log_container.volume_mounts[0].mount_path == "/var/log/onap/testcomponent"
109     assert log_container.volume_mounts[0].name == "component-log"
110     assert log_container.volume_mounts[1].mount_path == "/usr/share/filebeat/data"
111     assert log_container.volume_mounts[1].name == "filebeat-data"
112     assert log_container.volume_mounts[2].mount_path == "/usr/share/filebeat/filebeat.yml"
113     assert log_container.volume_mounts[2].name == "filebeat-conf"
114
115     # Needs to be correctly labeled so that the Service can find it
116     assert dep.spec.template.metadata.labels["app"] == "testcomponent"
117
118
119 def do_deploy(tls_info=None):
120     ''' Common deployment operations '''
121     import k8sclient.k8sclient
122
123     k8s_test_config = _set_k8s_configuration()
124
125     kwargs = _set_common_kwargs()
126     kwargs['resources'] = _set_resources()
127
128     if tls_info:
129         kwargs["tls_info"] = tls_info
130
131     dep, deployment_description = k8sclient.k8sclient.deploy("k8stest", "testcomponent", "example.com/testcomponent:1.4.3", 1, False, k8s_test_config, **kwargs)
132
133     # Make sure all of the basic k8s parameters are correct
134     verify_common(dep, deployment_description)
135
136     return dep, deployment_description