move plugins from from ccsdk to dcaegen2
[dcaegen2/platform/plugins.git] / dmaap / tests / test_dr_lifecycle.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
19 #
20
21 import pytest
22 import requests
23 from cloudify.mocks import MockCloudifyContext
24 from cloudify.state import current_ctx
25 from cloudify import ctx
26 from cloudify.decorators import operation
27 from cloudify.exceptions import NonRecoverableError
28 from cloudify.exceptions import RecoverableError
29
30 _goodosv2 = {
31   'auth_url': 'https://example.com/identity/v2.0',
32   'password': 'pw',
33   'region': 'r',
34   'tenant_name': 'tn',
35   'username': 'un'
36 }
37
38
39 def test_create_feed(monkeypatch, mockconsul, mockdmaapbc):
40     import dmaapplugin
41     from dmaapplugin import dr_lifecycle
42
43     properties = {'fqdn': 'a.x.example.com', 'openstack': _goodosv2, 'feed_id': 'test_feed_id' }
44     mock_ctx = MockCloudifyContext(
45         node_id='test_node_id',
46         node_name='test_node_name',
47         properties=properties,
48         runtime_properties = {
49             "admin": { "user": "admin_user" },
50             "user": { "user": "user_user" },
51             "viewer": { "user": "viewer_user" }
52         })
53
54     current_ctx.set(mock_ctx)
55
56     kwargs = { "feed_name": "ONAP_test",
57             "feed_description": "onap dmaap plugin unit test feed"}
58
59     def fake_feed(self):
60         return {"feedId":"test_feedId", "publishURL":"test_publishURL", "logURL":"test_logURL" }
61     monkeypatch.setattr(requests.Response, "json", fake_feed)
62
63     dr_lifecycle.create_feed(**kwargs)
64     dr_lifecycle.get_existing_feed(**kwargs)
65     dr_lifecycle.delete_feed(**kwargs)