1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
22 from cloudify.mocks import MockCloudifyContext
23 from cloudify.state import current_ctx
24 from cloudify import ctx
25 from cloudify.decorators import operation
26 from cloudify.exceptions import NonRecoverableError
27 from cloudify.exceptions import RecoverableError
30 'auth_url': 'https://example.com/identity/v2.0',
def test_create_topic(monkeypatch, mockconsul, mockdmaapbc):
    """Run the message-router topic operations against a mocked Cloudify context.

    A ``MockCloudifyContext`` is installed as the current context, then
    ``create_topic``, ``get_existing_topic`` and ``delete_topic`` from
    ``dmaapplugin.mr_lifecycle`` are called in that order with the same
    keyword arguments.  The test makes no explicit assertions; it passes
    when none of the three operations raises.

    The parameters are pytest fixtures that are not referenced in the body;
    presumably they stub out Consul and the DMaaP bus controller -- confirm
    against the project's conftest.
    """
    # Imported inside the test, as in the original, so the import happens
    # only after the fixtures above have been set up.
    from dmaapplugin import mr_lifecycle

    properties = {
        'fqdn': 'a.x.example.com',
        'openstack': _goodosv2,
        'fqtn': 'test_fqtn',
    }
    mock_ctx = MockCloudifyContext(
        node_id='test_node_id',
        node_name='test_node_name',
        properties=properties,
        runtime_properties={
            "admin": {"user": "admin_user"},
            "user": {"user": "user_user"},
            "viewer": {"user": "viewer_user"},
        },
    )

    kwargs = {
        "topic_name": "ONAP_test",
        "topic_description": "onap dmaap plugin unit test topic",
    }

    current_ctx.set(mock_ctx)
    try:
        mr_lifecycle.create_topic(**kwargs)
        mr_lifecycle.get_existing_topic(**kwargs)
        mr_lifecycle.delete_topic(**kwargs)
    finally:
        # Always drop the mock context so it cannot leak into other tests,
        # even when one of the operations above raises.
        current_ctx.clear()