move plugins from from ccsdk to dcaegen2
[dcaegen2/platform/plugins.git] / dmaap / tests / test_consulif.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
18 #
19
20
21 import pytest
22 from cloudify.exceptions import NonRecoverableError
23 import os
24 from consulif.consulif import ConsulHandle
25
26
27 # No connections are actually made to this host
28 CONSUL_HOST = "consul"                      # Should always be a local consul agent on Cloudify Manager
29 #CONSUL_PORT = '8510'
30 CONSUL_PORT = '8500'
31 DBCL_KEY_NAME = "dmaap_dbcl_info"           # Consul key containing DMaaP data bus credentials
32 DBC_SERVICE_NAME= "dmaap_bus_controller"    # Name under which the DMaaP bus controller is registered
33
34
35 def test_get_config_service(mockconsul):
36     err_msg = "Error getting ConsulHandle when configuring dmaap plugin: {0}"
37     _ch = ConsulHandle("http://{0}:{1}".format(CONSUL_HOST, CONSUL_PORT), None, None, None)
38
39     config = _ch.get_config(DBCL_KEY_NAME)
40
41     DMAAP_USER = config['dmaap']['username']
42     DMAAP_PASS = config['dmaap']['password']
43     DMAAP_OWNER = config['dmaap']['owner']
44
45     if 'protocol' in config['dmaap']:
46         DMAAP_PROTOCOL = config['dmaap']['protocol']
47     else:
48         DMAAP_PROTOCOL = 'https'    # Default to https (service discovery should give us this but doesn't
49
50     if 'path' in config['dmaap']:
51         DMAAP_PATH = config['dmaap']['path']
52     else:
53         DMAAP_PATH = 'webapi'       # Should come from service discovery but Consul doesn't support it
54
55     service_address, service_port = _ch.get_service(DBC_SERVICE_NAME)
56
57     DMAAP_API_URL = '{0}://{1}:{2}/{3}'.format(DMAAP_PROTOCOL, service_address, service_port, DMAAP_PATH)
58
59
60 def test_add_entry(mockconsul):
61     _ch = ConsulHandle("http://{0}:{1}".format(CONSUL_HOST, CONSUL_PORT), None, None, None)
62
63     key = 'DMAAP_TEST'
64     name = 'dmaap_test_name'
65     value = 'dmaap_test_value'
66     _ch.add_to_entry(key, name, value)
67
68     name = "dmaap_test_name_2"
69     value = 'dmaap_test_value_2'
70     _ch.add_to_entry(key, name, value)
71
72     _ch.delete_entry(key)