move plugins from from ccsdk to dcaegen2
[dcaegen2/platform/plugins.git] / dmaap / tests / test_dmaapcontrollerif.py
1 # ============LICENSE_START=======================================================
2 # org.onap.dcae
3 # ================================================================================
4 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
19 #
20
21 import pytest
22 import requests
23 from cloudify.mocks import MockCloudifyContext
24 from cloudify.state import current_ctx
25 from cloudify import ctx
26 from cloudify.decorators import operation
27 from cloudify.exceptions import NonRecoverableError
28
29
30 import test_consulif
31 from dmaapcontrollerif.dmaap_requests import DMaaPControllerHandle
32
33 import logging
34 logger = logging.getLogger("test_mr_lifecycle")
35
36 _goodosv2 = {
37   'auth_url': 'https://example.com/identity/v2.0',
38   'password': 'pw',
39   'region': 'r',
40   'tenant_name': 'tn',
41   'username': 'un'
42 }
43
44
45 def test_dmaapc (monkeypatch, mockconsul, mockdmaapbc):
46     from dmaapplugin.dmaaputils import random_string
47
48     config = mockconsul().get_config('mockkey')['dmaap']
49     DMAAP_API_URL = config['url']
50     DMAAP_USER = config['username']
51     DMAAP_PASS = config['password']
52     DMAAP_OWNER = config['owner']
53
54     properties = {'fqdn': 'a.x.example.com', 'openstack': _goodosv2 }
55     mock_ctx = MockCloudifyContext(
56         node_id='test_node_id',
57         node_name='test_node_name',
58         properties=properties,
59         runtime_properties = {
60             "admin": { "user": "admin_user" },
61             "user": { "user": "user_user" },
62             "viewer": { "user": "viewer_user" }
63         })
64
65     current_ctx.set(mock_ctx)
66
67     kwargs = { "topic_name": "ONAP_test",
68             "topic_description": "onap dmaap plugin unit test topic"}
69
70     # Make sure there's a topic_name
71     if "topic_name" in ctx.node.properties:
72         topic_name = ctx.node.properties["topic_name"]
73         if topic_name == '' or topic_name.isspace():
74             topic_name = random_string(12)
75     else:
76         topic_name = random_string(12)
77
78     # Make sure there's a topic description
79     if "topic_description" in ctx.node.properties:
80         topic_description = ctx.node.properties["topic_description"]
81     else:
82         topic_description = "No description provided"
83
84     # ..and the truly optional setting
85     if "txenable" in ctx.node.properties:
86         txenable = ctx.node.properties["txenable"]
87     else:
88         txenable= False
89
90     if "replication_case" in ctx.node.properties:
91         replication_case = ctx.node.properties["replication_case"]
92     else:
93         replication_case = None
94
95     if "global_mr_url" in ctx.node.properties:
96         global_mr_url = ctx.node.properties["global_mr_url"]
97     else:
98         global_mr_url = None
99
100     dmc = DMaaPControllerHandle(DMAAP_API_URL, DMAAP_USER, DMAAP_PASS, ctx.logger)
101     ctx.logger.info("Attempting to create topic name {0}".format(topic_name))
102     t = dmc.create_topic(topic_name, topic_description, txenable, DMAAP_OWNER, replication_case, global_mr_url)
103
104     # Capture important properties from the result
105     topic = t.json()
106     ctx.instance.runtime_properties["fqtn"] = topic["fqtn"]
107
108     # test DMaaPControllerHandle functions
109     path = "myPath"
110     url = dmc._make_url(path)
111     rc = dmc._get_resource(path)
112     rc = dmc._create_resource(path, None)
113     rc = dmc._delete_resource(path)