Provide cacert in JKS format for clients
[dcaegen2/platform/plugins.git] / k8s / tests / common.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2019 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
19
20 # Common functions for unit testing
21 def _set_k8s_configuration():
22     ''' Set up the basic k8s configuration '''
23     return  {
24         "image_pull_secrets" : ["secret0", "secret1"],
25         "filebeat" : {
26             "log_path": "/var/log/onap",
27             "data_path": "/usr/share/filebeat/data",
28             "config_path": "/usr/share/filebeat/filebeat.yml",
29             "config_subpath": "filebeat.yml",
30             "image" : "filebeat-repo/filebeat:latest",
31             "config_map" : "dcae-filebeat-configmap"
32         },
33         "tls" : {
34             "cert_path": "/opt/certs",
35             "image": "tlsrepo/tls-init-container:1.2.3",
36             "component_cert_dir": "/opt/dcae/cacert"
37         },
38         "cbs": {
39             "base_url": "https://config-binding-service:10443/service_component_all/test-component"
40         }
41     }
42
43 def _set_resources():
44     ''' Set resources '''
45     return {
46         "limits": {
47             "cpu" : 0.5,
48             "memory" : "2Gi"
49         },
50         "requests": {
51             "cpu" : 0.5,
52             "memory" : "2Gi"
53         }
54     }
55
56 def _set_common_kwargs():
57     ''' Set kwargs common to all test cases '''
58     return {
59         "volumes": [
60             {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw"}}
61         ],
62         "ports": ["80:0", "443:0"],
63         "env": {"NAME0": "value0", "NAME1": "value1"},
64         "log_info": {"log_directory": "/path/to/container/log/directory"},
65         "readiness": {"type": "http", "endpoint" : "/ready"}
66     }
67
68 def _get_item_by_name(list, name):
69     ''' Search a list of k8s API objects with the specified name '''
70     for item in list:
71         if item.name == name:
72             return item
73     return None
74
75 def check_env_var(env_list, name, value):
76     e = _get_item_by_name(env_list, name)
77     assert e and e.value == value
78
79 def verify_common(dep, deployment_description):
80     ''' Check results common to all test cases '''
81     assert deployment_description["deployment"] == "dep-testcomponent"
82     assert deployment_description["namespace"] == "k8stest"
83     assert deployment_description["services"][0] == "testcomponent"
84
85     # For unit test purposes, we want to make sure that the deployment object
86     # we're passing to the k8s API is correct
87     app_container = dep.spec.template.spec.containers[0]
88     assert app_container.image == "example.com/testcomponent:1.4.3"
89     assert app_container.image_pull_policy == "IfNotPresent"
90     assert len(app_container.ports) == 2
91     assert app_container.ports[0].container_port == 80
92     assert app_container.ports[1].container_port == 443
93     assert app_container.readiness_probe.http_get.path == "/ready"
94     assert app_container.readiness_probe.http_get.scheme == "HTTP"
95     assert len(app_container.volume_mounts) == 3
96     assert app_container.volume_mounts[0].mount_path == "/path/on/container"
97     assert app_container.volume_mounts[1].mount_path == "/path/to/container/log/directory"
98
99     # Check environment variables
100     env = app_container.env
101     check_env_var(env, "NAME0", "value0")
102     check_env_var(env, "NAME1", "value1")
103
104     # Should have a log container with volume mounts
105     log_container = dep.spec.template.spec.containers[1]
106     assert log_container.image == "filebeat-repo/filebeat:latest"
107     assert log_container.volume_mounts[0].mount_path == "/var/log/onap/testcomponent"
108     assert log_container.volume_mounts[0].name == "component-log"
109     assert log_container.volume_mounts[1].mount_path == "/usr/share/filebeat/data"
110     assert log_container.volume_mounts[1].name == "filebeat-data"
111     assert log_container.volume_mounts[2].mount_path == "/usr/share/filebeat/filebeat.yml"
112     assert log_container.volume_mounts[2].name == "filebeat-conf"
113
114     # Needs to be correctly labeled so that the Service can find it
115     assert dep.spec.template.metadata.labels["app"] == "testcomponent"
116
117
118 def do_deploy(tls_info=None):
119     ''' Common deployment operations '''
120     import k8sclient.k8sclient
121
122     k8s_test_config = _set_k8s_configuration()
123
124     kwargs = _set_common_kwargs()
125     kwargs['resources'] = _set_resources()
126
127     if tls_info:
128         kwargs["tls_info"] = tls_info
129
130     dep, deployment_description = k8sclient.k8sclient.deploy("k8stest", "testcomponent", "example.com/testcomponent:1.4.3", 1, False, k8s_test_config, **kwargs)
131
132     # Make sure all of the basic k8s parameters are correct
133     verify_common(dep, deployment_description)
134
135     return dep, deployment_description