Add Support for Activation and Deactivation
[dcaegen2/services.git] / components / pm-subscription-handler / tests / test_pmsh_utils.py
1 # ============LICENSE_START===================================================
2 #  Copyright (C) 2019-2020 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
18 import json
19 import os
20 from test.support import EnvironmentVarGuard
21 from unittest import TestCase
22 from unittest.mock import patch
23
24 import responses
25 from requests import Session
26 from tenacity import stop_after_attempt
27
28 from mod import db, get_db_connection_url, create_app
29 from mod.db_models import SubscriptionModel
30 from mod.pmsh_utils import AppConfig, policy_response_handle_functions
31 from mod.subscription import Subscription
32 from mod.network_function import NetworkFunction
33
34
35 class PmshUtilsTestCase(TestCase):
36
37     @patch('mod.create_app')
38     @patch('mod.get_db_connection_url')
39     def setUp(self, mock_get_db_url, mock_app):
40         mock_get_db_url.return_value = 'sqlite://'
41         with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
42             self.cbs_data = json.load(data)
43         self.app_conf = AppConfig(**self.cbs_data['config'])
44         self.sub = Subscription(**self.cbs_data['policy']['subscription'])
45         self.env = EnvironmentVarGuard()
46         self.env.set('LOGS_PATH', './unit_test_logs')
47         self.policy_mr_sub = self.app_conf.get_mr_sub('policy_pm_subscriber')
48         self.mock_app = mock_app
49         self.app = create_app()
50         self.app_context = self.app.app_context()
51         self.app_context.push()
52         db.create_all()
53
54     def test_utils_get_mr_sub(self):
55         mr_policy_sub = self.app_conf.get_mr_sub('policy_pm_subscriber')
56         self.assertTrue(mr_policy_sub.aaf_id, 'dcae@dcae.onap.org')
57
58     def test_utils_get_mr_sub_fails_with_invalid_name(self):
59         with self.assertRaises(KeyError):
60             self.app_conf.get_mr_sub('invalid_sub')
61
62     def test_utils_get_mr_pub(self):
63         mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
64         self.assertTrue(mr_policy_pub.aaf_pass, 'demo123456!')
65
66     def test_utils_get_mr_pub_fails_with_invalid_name(self):
67         with self.assertRaises(KeyError):
68             self.app_conf.get_mr_pub('invalid_pub')
69
70     def test_utils_get_cert_data(self):
71         self.assertTrue(self.app_conf.cert_params, ('/opt/app/pm-mapper/etc/certs/cert.pem',
72                                                     '/opt/app/pm-mapper/etc/certs/key.pem'))
73
74     @patch.object(Session, 'post')
75     def test_mr_pub_publish_to_topic_success(self, mock_session):
76         mock_session.return_value.status_code = 200
77         mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
78         with patch('requests.Session.post') as session_post_call:
79             mr_policy_pub.publish_to_topic({"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"})
80             session_post_call.assert_called_once()
81
82     @responses.activate
83     def test_mr_pub_publish_to_topic_fail(self):
84         responses.add(responses.POST,
85                       'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS',
86                       json={'error': 'Client Error'}, status=400)
87         mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
88         with self.assertRaises(Exception):
89             mr_policy_pub.publish_to_topic({"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"})
90
91     def test_mr_pub_publish_sub_event_data_success(self):
92         mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
93         with patch('mod.pmsh_utils._MrPub.publish_to_topic') as pub_to_topic_call:
94             mr_policy_pub.publish_subscription_event_data(self.sub, 'pnf201')
95             pub_to_topic_call.assert_called_once()
96
97     @responses.activate
98     def test_mr_sub_get_from_topic_success(self):
99         responses.add(responses.GET,
100                       'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/'
101                       'dcae_pmsh_cg/1?timeout=1000',
102                       json={"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=200)
103         mr_topic_data = self.policy_mr_sub.get_from_topic(1)
104         self.assertIsNotNone(mr_topic_data)
105
106     @responses.activate
107     def test_mr_sub_get_from_topic_fail(self):
108         responses.add(responses.GET,
109                       'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/'
110                       'dcae_pmsh_cg/1?timeout=1000',
111                       json={"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=400)
112         mr_topic_data = self.policy_mr_sub.get_from_topic(1)
113         self.assertIsNone(mr_topic_data)
114
115     def test_get_db_connection_url_success(self):
116         self.env = EnvironmentVarGuard()
117         self.env.set('PMSH_PG_URL', '1.2.3.4')
118         self.env.set('PMSH_PG_USERNAME', 'pmsh')
119         self.env.set('PMSH_PG_PASSWORD', 'pass')
120         db_url = get_db_connection_url()
121         self.assertEqual(db_url, 'postgres+psycopg2://pmsh:pass@1.2.3.4:5432/pmsh')
122
123     def test_get_db_connection_url_fail(self):
124         self.env = EnvironmentVarGuard()
125         self.env.set('PMSH_PG_USERNAME', 'pmsh')
126         self.env.set('PMSH_PG_PASSWORD', 'pass')
127         with self.assertRaises(Exception):
128             get_db_connection_url()
129
130     @patch('mod.pmsh_utils.NetworkFunction.delete')
131     def test_handle_response_locked_success(self, mock_delete):
132         with patch.dict(policy_response_handle_functions, {'LOCKED': {'success': mock_delete}}):
133             administrative_state = 'LOCKED'
134             nf = NetworkFunction(nf_name='nf1')
135             self.policy_mr_sub._handle_response(self.sub.subscriptionName, administrative_state,
136                                                 nf.nf_name, 'success')
137
138             mock_delete.assert_called()
139
140     @patch('mod.subscription.Subscription.update_sub_nf_status')
141     def test_handle_response_locked_failed(self, mock_update_sub_nf):
142         with patch.dict(policy_response_handle_functions,
143                         {'LOCKED': {'failed': mock_update_sub_nf}}):
144             administrative_state = 'LOCKED'
145             nf = NetworkFunction(nf_name='nf1')
146             self.policy_mr_sub._handle_response(self.sub.subscriptionName, administrative_state,
147                                                 nf.nf_name, 'failed')
148             mock_update_sub_nf.assert_called()
149
150     @patch('mod.subscription.Subscription.update_sub_nf_status')
151     def test_handle_response_unlocked_success(self, mock_update_sub_nf):
152         with patch.dict(policy_response_handle_functions,
153                         {'UNLOCKED': {'success': mock_update_sub_nf}}):
154             nf = NetworkFunction(nf_name='nf1')
155             self.policy_mr_sub._handle_response(self.sub.subscriptionName,
156                                                 self.sub.administrativeState,
157                                                 nf.nf_name, 'success')
158             mock_update_sub_nf.assert_called()
159
160     @patch('mod.subscription.Subscription.update_sub_nf_status')
161     def test_handle_response_unlocked_failed(self, mock_update_sub_nf):
162         with patch.dict(policy_response_handle_functions,
163                         {'UNLOCKED': {'failed': mock_update_sub_nf}}):
164             nf = NetworkFunction(nf_name='nf1')
165             self.policy_mr_sub._handle_response(self.sub.subscriptionName,
166                                                 self.sub.administrativeState,
167                                                 nf.nf_name, 'failed')
168             mock_update_sub_nf.assert_called()
169
170     def test_handle_response_exception(self):
171         self.assertRaises(Exception, self.policy_mr_sub._handle_response, 'sub1', 'wrong_state',
172                           'nf1', 'wrong_message')
173
174     @patch('mod.pmsh_utils._MrSub.get_from_topic')
175     @patch('mod.pmsh_utils._MrSub._handle_response')
176     @patch('mod.subscription.Subscription.get')
177     @patch('threading.Timer')
178     def test_poll_policy_topic_calls_methods_correct_sub(self, mock_thread, mock_get_sub,
179                                                          mock_handle_response, mock_get_from_topic):
180         result_data = ['{"name": "ResponseEvent","status": { "subscriptionName": '
181                        '"ExtraPM-All-gNB-R2B", "nfName": "pnf300", "message": "success" } }']
182         mock_get_from_topic.return_value = result_data
183         mock_thread.start.return_value = 1
184         mock_get_sub.return_value = SubscriptionModel(subscription_name='ExtraPM-All-gNB-R2B',
185                                                       status='UNLOCKED')
186         self.policy_mr_sub.poll_policy_topic(self.sub.subscriptionName, self.mock_app)
187
188         mock_get_from_topic.assert_called()
189         mock_handle_response.assert_called_with(self.sub.subscriptionName,
190                                                 'UNLOCKED', 'pnf300', 'success')
191
192     @patch('mod.pmsh_utils._MrSub.get_from_topic')
193     @patch('mod.pmsh_utils._MrSub._handle_response')
194     @patch('mod.subscription.Subscription.get')
195     @patch('threading.Timer')
196     def test_poll_policy_topic_no_method_calls_incorrect_sub(self, mock_thread, mock_get_sub,
197                                                              mock_handle_response,
198                                                              mock_get_from_topic):
199         result_data = ['{"name": "ResponseEvent","status": { "subscriptionName": '
200                        '"demo-subscription", "nfName": "pnf300", "message": "success" } }']
201         mock_get_from_topic.return_value = result_data
202         mock_thread.start.return_value = 1
203         mock_get_sub.return_value = SubscriptionModel(subscription_name='ExtraPM-All-gNB-R2B',
204                                                       status='UNLOCKED')
205         self.policy_mr_sub.poll_policy_topic(self.sub, self.mock_app)
206
207         mock_get_from_topic.assert_called()
208         mock_handle_response.assert_not_called()
209
210     @patch('mod.subscription.Subscription.get')
211     @patch('mod.pmsh_utils._MrSub.get_from_topic')
212     def test_poll_policy_topic_exception(self, mock_get_from_topic, mock_get_sub):
213         mock_get_from_topic.return_value = 'wrong_return'
214         mock_get_sub.return_value = SubscriptionModel(subscription_name='ExtraPM-All-gNB-R2B',
215                                                       status='UNLOCKED')
216         self.policy_mr_sub.poll_policy_topic.retry.stop = stop_after_attempt(1)
217
218         self.assertRaises(Exception, self.policy_mr_sub.poll_policy_topic, 'sub1', self.mock_app)