1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
20 # Common functions for unit testing
21 def _set_k8s_configuration():
22 ''' Set up the basic k8s configuration '''
24 "image_pull_secrets" : ["secret0", "secret1"],
26 "log_path": "/var/log/onap",
27 "data_path": "/usr/share/filebeat/data",
28 "config_path": "/usr/share/filebeat/filebeat.yml",
29 "config_subpath": "filebeat.yml",
30 "image" : "filebeat-repo/filebeat:latest",
31 "config_map" : "dcae-filebeat-configmap"
34 "cert_path": "/opt/certs",
35 "image": "tlsrepo/tls-init-container:1.2.3",
36 "component_cert_dir": "/opt/dcae/cacert"
39 "base_url": "https://config-binding-service:10443/service_component_all/test-component"
56 def _set_common_kwargs():
57 ''' Set kwargs common to all test cases '''
60 {"host":{"path": "/path/on/host"}, "container":{"bind":"/path/on/container","mode":"rw"}}
62 "ports": ["80:0", "443:0"],
63 "env": {"NAME0": "value0", "NAME1": "value1"},
64 "log_info": {"log_directory": "/path/to/container/log/directory"},
65 "readiness": {"type": "http", "endpoint" : "/ready"}
def _get_item_by_name(list, name):
    ''' Search a list of k8s API objects with the specified name

    Returns the first object in ``list`` whose ``name`` attribute equals
    ``name``. Returns None when nothing matches, when the list is empty,
    or when ``list`` itself is None.
    '''
    # The parameter shadows the builtin ``list``; the name is kept so that
    # existing callers (positional or keyword) keep working.
    # ``or ()`` lets a missing (None) collection behave like an empty one.
    return next((item for item in (list or ()) if item.name == name), None)
def check_env_var(env_list, name, value):
    ''' Assert that env_list has an entry called name whose value is value '''
    entry = _get_item_by_name(env_list, name)
    # The entry has to exist (be truthy) before its value can be compared
    assert entry
    assert entry.value == value
def verify_common(dep, deployment_description):
    ''' Assert the properties that every test deployment is expected to share

    dep is the deployment object that is passed to the k8s API and
    deployment_description is the description returned with it.
    Any mismatch surfaces as an AssertionError.
    '''
    # The description must name the expected deployment, namespace and service
    for key, expected in (("deployment", "dep-testcomponent"), ("namespace", "k8stest")):
        assert deployment_description[key] == expected
    assert deployment_description["services"][0] == "testcomponent"

    # For unit test purposes, the deployment object itself is inspected:
    # the first container is the application container
    pod_spec = dep.spec.template.spec
    app = pod_spec.containers[0]
    assert app.image == "example.com/testcomponent:1.4.3"
    assert app.image_pull_policy == "IfNotPresent"

    ports = app.ports
    assert len(ports) == 2
    for idx, port_number in enumerate((80, 443)):
        assert ports[idx].container_port == port_number

    http_probe = app.readiness_probe.http_get
    assert http_probe.path == "/ready"
    assert http_probe.scheme == "HTTP"

    # Three mounts are expected; only the first two paths are checked here
    app_mounts = app.volume_mounts
    assert len(app_mounts) == 3
    assert app_mounts[0].mount_path == "/path/on/container"
    assert app_mounts[1].mount_path == "/path/to/container/log/directory"

    # Check environment variables
    env = app.env
    for var_name, var_value in (("NAME0", "value0"), ("NAME1", "value1")):
        check_env_var(env, var_name, var_value)

    # The second container is the log container, with its own volume mounts
    log_container = pod_spec.containers[1]
    assert log_container.image == "filebeat-repo/filebeat:latest"
    expected_log_mounts = (
        ("/var/log/onap/testcomponent", "component-log"),
        ("/usr/share/filebeat/data", "filebeat-data"),
        ("/usr/share/filebeat/filebeat.yml", "filebeat-conf"),
    )
    for idx, (path, mount_name) in enumerate(expected_log_mounts):
        # Index explicitly so that a missing mount still raises IndexError
        mount = log_container.volume_mounts[idx]
        assert mount.mount_path == path
        assert mount.name == mount_name

    # Needs to be correctly labeled so that the Service can find it
    assert dep.spec.template.metadata.labels["app"] == "testcomponent"
def do_deploy(tls_info=None):
    ''' Run the deployment shared by the test cases and return its results

    tls_info is placed in the deploy keyword arguments under "tls_info"
    exactly as given. Returns the (dep, deployment_description) pair
    produced by k8sclient.k8sclient.deploy, after verify_common has
    checked it.
    '''
    # Imported at call time rather than at module load
    import k8sclient.k8sclient

    k8s_config = _set_k8s_configuration()

    # Start from the common kwargs, then add the per-deployment entries
    deploy_kwargs = _set_common_kwargs()
    deploy_kwargs['resources'] = _set_resources()
    deploy_kwargs["tls_info"] = tls_info

    dep, deployment_description = k8sclient.k8sclient.deploy(
        "k8stest",
        "testcomponent",
        "example.com/testcomponent:1.4.3",
        1,
        False,
        k8s_config,
        **deploy_kwargs
    )

    # Make sure all of the basic k8s parameters are correct
    verify_common(dep, deployment_description)

    return dep, deployment_description