Merge "[PMSH] Read subscription API by Name"
[dcaegen2/services.git] / components / pm-subscription-handler / tests / test_pmsh_config.py
1 # ============LICENSE_START===================================================
2 #  Copyright (C) 2021 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
18 from unittest.mock import Mock, patch
19
20 import responses
21 from requests import Session
22
23 from mod.pmsh_config import MRTopic, AppConfig
24 from tests.base_setup import BaseClassSetup
25
26
class PmshConfigTestCase(BaseClassSetup):
    """Tests for the PMSH application config object and its Message Router helpers.

    Every test works on ``self.pmsh_app_conf``; it is not created in this
    class, so it is presumably provided by ``BaseClassSetup.setUpAppConf()``
    (confirm against ``tests/base_setup.py``).
    """

    @classmethod
    def setUpClass(cls):
        # Class-level fixtures are owned entirely by the shared base class.
        super().setUpClass()

    def setUp(self):
        # Per-test fixture: (re)build the app config via the base-class helper.
        super().setUpAppConf()
        self.mock_app = Mock()

    def tearDown(self):
        super().tearDown()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def test_config_get_aaf_creds(self):
        """The config exposes the expected TLS flag and AAF credentials."""
        self.assertEqual(self.pmsh_app_conf.enable_tls, 'true')
        self.assertEqual(self.pmsh_app_conf.aaf_id, 'dcae@dcae.onap.org')
        self.assertEqual(self.pmsh_app_conf.aaf_pass, 'demo123456!')

    def test_config_get_cert_data(self):
        """The config exposes the expected key, cert and CA cert paths."""
        self.assertEqual(self.pmsh_app_conf.key_path, '/opt/app/pmsh/etc/certs/key.pem')
        self.assertEqual(self.pmsh_app_conf.cert_path, '/opt/app/pmsh/etc/certs/cert.pem')
        self.assertEqual(self.pmsh_app_conf.ca_cert_path, '/opt/app/pmsh/etc/certs/cacert.pem')

    def test_singleton_instance_is_accessible_using_class_method(self):
        """``AppConfig.get_instance()`` returns a non-None ``AppConfig``."""
        my_singleton_instance = AppConfig.get_instance()
        self.assertIsNotNone(my_singleton_instance)
        self.assertIsInstance(my_singleton_instance, AppConfig)

    @patch.object(Session, 'post')
    def test_mr_pub_publish_to_topic_success(self, mock_session):
        """Publishing to the policy publisher topic issues exactly one POST.

        ``Session.post`` is patched once, by the decorator. A second, nested
        patch of the same attribute would shadow this mock and leave the
        ``status_code`` configuration below without any effect.
        """
        mock_session.return_value.status_code = 200
        self.pmsh_app_conf.publish_to_topic(MRTopic.POLICY_PM_PUBLISHER.value,
                                            {"key": "43c4ee19-6b8d-4279-a80f-c507850aae47"})
        mock_session.assert_called_once()

    @responses.activate
    def test_mr_pub_publish_to_topic_fail(self):
        """Publishing raises when the mocked topic endpoint answers with HTTP 400."""
        responses.add(responses.POST,
                      'https://message-router:3905/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS',
                      json={"error": "Client Error"}, status=400)
        with self.assertRaises(Exception):
            self.pmsh_app_conf.publish_to_topic(MRTopic.POLICY_PM_PUBLISHER.value,
                                                {"key": "43c4ee19-6b8d-4279-a80f-c507850aae47"})

    @responses.activate
    def test_mr_sub_get_from_topic_success(self):
        """Fetching from the policy subscriber topic returns data on HTTP 200."""
        responses.add(responses.GET,
                      'https://message-router:3905/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/'
                      'dcae_pmsh_cg/1?timeout=5000',
                      json={"key": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=200)
        mr_topic_data = self.pmsh_app_conf.get_from_topic(MRTopic.POLICY_PM_SUBSCRIBER.value, 1)
        self.assertIsNotNone(mr_topic_data)

    @responses.activate
    def test_mr_sub_get_from_topic_fail(self):
        """Fetching raises when the mocked topic endpoint answers with HTTP 400."""
        responses.add(responses.GET,
                      'https://message-router:3905/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/'
                      'dcae_pmsh_cg/1?timeout=5000',
                      json={"key": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=400)
        with self.assertRaises(Exception):
            self.pmsh_app_conf.get_from_topic(MRTopic.POLICY_PM_SUBSCRIBER.value, 1)