1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
23 from cloudify.mocks import MockCloudifyContext
24 from cloudify.state import current_ctx
25 from cloudify import ctx
26 from cloudify.decorators import operation
27 from cloudify.exceptions import NonRecoverableError
28 from cloudify.exceptions import RecoverableError
31 'auth_url': 'https://example.com/identity/v2.0',
def test_create_feed(monkeypatch, mockconsul, mockdmaapbc):
    """Run the DR feed lifecycle operations against a mocked Cloudify context.

    Calls ``dr_lifecycle.create_feed``, ``dr_lifecycle.get_existing_feed`` and
    ``dr_lifecycle.delete_feed`` in that order with the same keyword arguments,
    while ``requests.Response.json`` is patched to return a canned feed
    description. The test passes when none of the three operations raises.

    Args:
        monkeypatch: pytest's built-in fixture, used to patch
            ``requests.Response.json`` for the duration of the test.
        mockconsul: project fixture, requested only for its side effects
            (presumably stubs Consul lookups; defined outside this file --
            TODO confirm in conftest).
        mockdmaapbc: project fixture, requested only for its side effects
            (presumably stubs the DMaaP bus controller -- TODO confirm).
    """
    # `requests` is imported locally so this test does not depend on a
    # module-level import that is not guaranteed to be present.
    import requests

    # NOTE(review): the plugin is imported inside the test, presumably so the
    # fixtures above are active before the module loads -- confirm.
    from dmaapplugin import dr_lifecycle

    properties = {'fqdn': 'a.x.example.com', 'openstack': _goodosv2, 'feed_id': 'test_feed_id'}
    mock_ctx = MockCloudifyContext(
        node_id='test_node_id',
        node_name='test_node_name',
        properties=properties,
        runtime_properties={
            "admin": {"user": "admin_user"},
            "user": {"user": "user_user"},
            "viewer": {"user": "viewer_user"},
        },
    )

    feed_kwargs = {
        "feed_name": "ONAP_test",
        "feed_description": "onap dmaap plugin unit test feed",
    }

    def fake_feed(self, **_json_kwargs):
        """Stand-in for ``requests.Response.json`` returning a fixed feed."""
        return {"feedId": "test_feedId", "publishURL": "test_publishURL", "logURL": "test_logURL"}

    monkeypatch.setattr(requests.Response, "json", fake_feed)

    current_ctx.set(mock_ctx)
    try:
        dr_lifecycle.create_feed(**feed_kwargs)
        dr_lifecycle.get_existing_feed(**feed_kwargs)
        dr_lifecycle.delete_feed(**feed_kwargs)
    finally:
        # Always drop the mock context so it cannot leak into other tests,
        # even when one of the operations above raises.
        current_ctx.clear()