move plugins from from ccsdk to dcaegen2
[dcaegen2/platform/plugins.git] / dmaap / tests / test_mr_lifecycle.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
19
20 import pytest
21 import requests
22 from cloudify.mocks import MockCloudifyContext
23 from cloudify.state import current_ctx
24 from cloudify import ctx
25 from cloudify.decorators import operation
26 from cloudify.exceptions import NonRecoverableError
27 from cloudify.exceptions import RecoverableError
28
29 _goodosv2 = {
30   'auth_url': 'https://example.com/identity/v2.0',
31   'password': 'pw',
32   'region': 'r',
33   'tenant_name': 'tn',
34   'username': 'un'
35 }
36
37
38 def test_create_topic(monkeypatch, mockconsul, mockdmaapbc):
39     import dmaapplugin
40     from dmaapplugin import mr_lifecycle
41     properties = {'fqdn': 'a.x.example.com', 'openstack': _goodosv2, 'fqtn': 'test_fqtn' }
42     mock_ctx = MockCloudifyContext(
43         node_id='test_node_id',
44         node_name='test_node_name',
45         properties=properties,
46         runtime_properties = {
47            "admin": { "user": "admin_user" },
48            "user": { "user": "user_user" },
49            "viewer": { "user": "viewer_user" }
50         })
51
52     current_ctx.set(mock_ctx)
53
54     kwargs = { "topic_name": "ONAP_test",
55             "topic_description": "onap dmaap plugin unit test topic"}
56
57     mr_lifecycle.create_topic(**kwargs)
58     mr_lifecycle.get_existing_topic(**kwargs)
59     mr_lifecycle.delete_topic(**kwargs)