Add option to use services with ipv6
[dcaegen2/platform/plugins.git] / k8s / tests / conftest.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Nokia. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
19 #
20
21 import pytest
22
23 @pytest.fixture()
24 def mockconfig(monkeypatch):
25     from configure import configure
26     """ Override the regular configure() routine that reads a file and calls Consul"""
27     def altconfig():
28       config = configure._set_defaults()
29       config["consul_host"] = config["consul_dns_name"]
30       return config
31     monkeypatch.setattr(configure, 'configure', altconfig)
32
33 @pytest.fixture()
34 def mockk8sapi(monkeypatch):
35     import k8sclient.k8sclient
36     from kubernetes import client
37
38     # We need to patch the kubernetes 'client' module
39     # Awkward because of the way it requires a function call
40     # to get an API object
41     core = client.CoreV1Api()
42     ext = client.ExtensionsV1beta1Api()
43     appsv1 = client.AppsV1Api()
44
45     def pseudo_deploy(namespace, dep):
46         return dep
47
48     def pseudo_service(namespace, svc):
49         return svc
50
51     # patched_core returns a CoreV1Api object with the
52     # create_namespaced_service method stubbed out so that there
53     # is no attempt to call the k8s API server
54     def patched_core():
55         monkeypatch.setattr(core, "create_namespaced_service", pseudo_service)
56         return core
57
58     # patched_ext returns an ExtensionsV1beta1Api object with the
59     # create_namespaced_deployment method stubbed out so that there
60     # is no attempt to call the k8s API server
61     def patched_ext():
62         monkeypatch.setattr(ext,"create_namespaced_deployment", pseudo_deploy)
63         return ext
64
65     # patched_appsv1 returns an AppsV1Api object with the
66     # create_namespaced_deployment method stubbed out so that there
67     # is no attempt to call the k8s API server
68     def patched_appsv1():
69         monkeypatch.setattr(ext,"create_namespaced_deployment", pseudo_deploy)
70         return ext
71
72     def pseudo_configure(loc):
73         pass
74
75     monkeypatch.setattr(k8sclient.k8sclient,"_configure_api", pseudo_configure)
76     monkeypatch.setattr(client, "CoreV1Api", patched_core)
77     monkeypatch.setattr(client,"ExtensionsV1beta1Api", patched_ext)
78     monkeypatch.setattr(client,"AppsV1Api", patched_appsv1)