1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
23 from cloudify.mocks import MockCloudifyContext
24 from cloudify.state import current_ctx
25 from cloudify import ctx
26 from cloudify.decorators import operation
27 from cloudify.exceptions import NonRecoverableError
31 from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
34 logger = logging.getLogger("test_mr_lifecycle")
37 'auth_url': 'https://example.com/identity/v2.0',
45 def test_dmaapc (monkeypatch, mockconsul, mockdmaapbc):
46 from dmaapplugin.dmaaputils import random_string
48 config = mockconsul().get_config('mockkey')['dmaap']
49 DMAAP_API_URL = config['url']
50 DMAAP_USER = config['username']
51 DMAAP_PASS = config['password']
52 DMAAP_OWNER = config['owner']
54 properties = {'fqdn': 'a.x.example.com', 'openstack': _goodosv2 }
55 mock_ctx = MockCloudifyContext(
56 node_id='test_node_id',
57 node_name='test_node_name',
58 properties=properties,
59 runtime_properties = {
60 "admin": { "user": "admin_user" },
61 "user": { "user": "user_user" },
62 "viewer": { "user": "viewer_user" }
65 current_ctx.set(mock_ctx)
67 kwargs = { "topic_name": "ONAP_test",
68 "topic_description": "onap dmaap plugin unit test topic"}
70 # Make sure there's a topic_name
71 if "topic_name" in ctx.node.properties:
72 topic_name = ctx.node.properties["topic_name"]
73 if topic_name == '' or topic_name.isspace():
74 topic_name = random_string(12)
76 topic_name = random_string(12)
78 # Make sure there's a topic description
79 if "topic_description" in ctx.node.properties:
80 topic_description = ctx.node.properties["topic_description"]
82 topic_description = "No description provided"
84 # ..and the truly optional setting
85 if "txenable" in ctx.node.properties:
86 txenable = ctx.node.properties["txenable"]
90 if "replication_case" in ctx.node.properties:
91 replication_case = ctx.node.properties["replication_case"]
93 replication_case = None
95 if "global_mr_url" in ctx.node.properties:
96 global_mr_url = ctx.node.properties["global_mr_url"]
100 dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
101 ctx.logger.info("Attempting to create topic name {0}".format(topic_name))
102 t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url)
104 # Capture important properties from the result
106 ctx.instance.runtime_properties["fqtn"] = topic["fqtn"]
108 # test DMaaPControllerHandle functions
110 url = dmc._make_url(path)
111 rc = dmc._get_resource(path)
112 rc = dmc._create_resource(path, None)
113 rc = dmc._delete_resource(path)