1 # ============LICENSE_START===================================================
2 # Copyright (C) 2019-2020 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
import json
import os
from test.support import EnvironmentVarGuard
from unittest import TestCase
from unittest.mock import patch

import responses
from requests import Session
from tenacity import stop_after_attempt

from mod import db, get_db_connection_url, create_app
from mod.db_models import SubscriptionModel
from mod.pmsh_utils import AppConfig, policy_response_handle_functions
from mod.subscription import Subscription
from mod.network_function import NetworkFunction


class PmshUtilsTestCase(TestCase):
    """Tests for the AppConfig / message-router helpers and the DB connection URL builder."""

    @patch('mod.create_app')
    @patch('mod.get_db_connection_url')
    def setUp(self, mock_get_db_url, mock_app):
        """Load the CBS fixture and build the config, subscription and app context per test."""
        mock_get_db_url.return_value = 'sqlite://'
        fixture_path = os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json')
        with open(fixture_path, 'r') as data:
            self.cbs_data = json.load(data)
        self.app_conf = AppConfig(**self.cbs_data['config'])
        self.sub = Subscription(**self.cbs_data['policy']['subscription'])
        self.env = EnvironmentVarGuard()
        # Undo every variable changed through the guard so tests do not leak
        # environment state into each other.
        self.addCleanup(self.env.__exit__)
        self.env.set('LOGS_PATH', './unit_test_logs')
        self.policy_mr_sub = self.app_conf.get_mr_sub('policy_pm_subscriber')
        self.mock_app = mock_app
        self.app = create_app()
        self.app_context = self.app.app_context()
        self.app_context.push()
        # Pop the context pushed above once the test (or a failing setUp step) is done.
        self.addCleanup(self.app_context.pop)

    def test_utils_get_mr_sub(self):
        """get_mr_sub returns the subscriber carrying the expected AAF identity."""
        mr_policy_sub = self.app_conf.get_mr_sub('policy_pm_subscriber')
        # assertTrue(value, expected) only checked truthiness (the second argument is the
        # failure message), so compare the values explicitly.
        # NOTE(review): this value was never really verified before - confirm it
        # matches data/cbs_data_1.json.
        self.assertEqual(mr_policy_sub.aaf_id, 'dcae@dcae.onap.org')

    def test_utils_get_mr_sub_fails_with_invalid_name(self):
        """An unknown subscriber name raises KeyError."""
        with self.assertRaises(KeyError):
            self.app_conf.get_mr_sub('invalid_sub')

    def test_utils_get_mr_pub(self):
        """get_mr_pub returns the publisher carrying the expected AAF password."""
        mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
        # NOTE(review): previously an always-true assertTrue - confirm the expected
        # value against data/cbs_data_1.json.
        self.assertEqual(mr_policy_pub.aaf_pass, 'demo123456!')

    def test_utils_get_mr_pub_fails_with_invalid_name(self):
        """An unknown publisher name raises KeyError."""
        with self.assertRaises(KeyError):
            self.app_conf.get_mr_pub('invalid_pub')

    def test_utils_get_cert_data(self):
        """cert_params exposes the certificate and key paths."""
        expected_cert_params = ('/opt/app/pm-mapper/etc/certs/cert.pem',
                                '/opt/app/pm-mapper/etc/certs/key.pem')
        # NOTE(review): previously an always-true assertTrue - confirm the expected
        # paths (and that cert_params is a tuple) against the fixture and AppConfig.
        self.assertEqual(self.app_conf.cert_params, expected_cert_params)

    @patch.object(Session, 'post')
    def test_mr_pub_publish_to_topic_success(self, mock_session):
        """publish_to_topic issues exactly one POST through requests.Session."""
        mock_session.return_value.status_code = 200
        mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
        # A single patch is enough: the former nested patch of the same attribute
        # shadowed this mock, so its configured status code was never used.
        mr_policy_pub.publish_to_topic({"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"})
        mock_session.assert_called_once()

    @responses.activate
    def test_mr_pub_publish_to_topic_fail(self):
        """A 400 answer from the message router makes publish_to_topic raise."""
        responses.add(responses.POST,
                      'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS',
                      json={'error': 'Client Error'}, status=400)
        mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
        with self.assertRaises(Exception):
            mr_policy_pub.publish_to_topic({"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"})

    def test_mr_pub_publish_sub_event_data_success(self):
        """publish_subscription_event_data delegates to publish_to_topic once."""
        mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
        with patch('mod.pmsh_utils._MrPub.publish_to_topic') as pub_to_topic_call:
            mr_policy_pub.publish_subscription_event_data(self.sub, 'pnf201')
            pub_to_topic_call.assert_called_once()

    @responses.activate
    def test_mr_sub_get_from_topic_success(self):
        """A 200 answer from the message router yields topic data."""
        responses.add(responses.GET,
                      'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/'
                      'dcae_pmsh_cg/1?timeout=1000',
                      json={"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=200)
        mr_topic_data = self.policy_mr_sub.get_from_topic(1)
        self.assertIsNotNone(mr_topic_data)

    @responses.activate
    def test_mr_sub_get_from_topic_fail(self):
        """A 400 answer from the message router yields None."""
        responses.add(responses.GET,
                      'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/'
                      'dcae_pmsh_cg/1?timeout=1000',
                      json={"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=400)
        mr_topic_data = self.policy_mr_sub.get_from_topic(1)
        self.assertIsNone(mr_topic_data)

    def test_get_db_connection_url_success(self):
        """With URL, username and password set, the full connection URL is built."""
        # Reuse the guard from setUp: its cleanup restores these variables afterwards.
        self.env.set('PMSH_PG_URL', '1.2.3.4')
        self.env.set('PMSH_PG_USERNAME', 'pmsh')
        self.env.set('PMSH_PG_PASSWORD', 'pass')
        db_url = get_db_connection_url()
        self.assertEqual(db_url, 'postgres+psycopg2://pmsh:pass@1.2.3.4:5432/pmsh')

    def test_get_db_connection_url_fail(self):
        """Without PMSH_PG_URL the connection URL cannot be built."""
        # Remove the variable explicitly so the outcome does not depend on the
        # surrounding environment or on test execution order.
        self.env.unset('PMSH_PG_URL')
        self.env.set('PMSH_PG_USERNAME', 'pmsh')
        self.env.set('PMSH_PG_PASSWORD', 'pass')
        with self.assertRaises(Exception):
            get_db_connection_url()

    @patch('mod.pmsh_utils.NetworkFunction.delete')
    def test_handle_response_locked_success(self, mock_delete):
        """LOCKED + success dispatches to the registered 'success' handler."""
        with patch.dict(policy_response_handle_functions, {'LOCKED': {'success': mock_delete}}):
            administrative_state = 'LOCKED'
            nf = NetworkFunction(nf_name='nf1')
            self.policy_mr_sub._handle_response(self.sub.subscriptionName, administrative_state,
                                                nf.nf_name, 'success')

            mock_delete.assert_called()

    @patch('mod.subscription.Subscription.update_sub_nf_status')
    def test_handle_response_locked_failed(self, mock_update_sub_nf):
        """LOCKED + failed dispatches to the registered 'failed' handler."""
        with patch.dict(policy_response_handle_functions,
                        {'LOCKED': {'failed': mock_update_sub_nf}}):
            administrative_state = 'LOCKED'
            nf = NetworkFunction(nf_name='nf1')
            self.policy_mr_sub._handle_response(self.sub.subscriptionName, administrative_state,
                                                nf.nf_name, 'failed')
            mock_update_sub_nf.assert_called()

    @patch('mod.subscription.Subscription.update_sub_nf_status')
    def test_handle_response_unlocked_success(self, mock_update_sub_nf):
        """UNLOCKED + success dispatches to the registered 'success' handler."""
        with patch.dict(policy_response_handle_functions,
                        {'UNLOCKED': {'success': mock_update_sub_nf}}):
            nf = NetworkFunction(nf_name='nf1')
            self.policy_mr_sub._handle_response(self.sub.subscriptionName,
                                                self.sub.administrativeState,
                                                nf.nf_name, 'success')
            mock_update_sub_nf.assert_called()

    @patch('mod.subscription.Subscription.update_sub_nf_status')
    def test_handle_response_unlocked_failed(self, mock_update_sub_nf):
        """UNLOCKED + failed dispatches to the registered 'failed' handler."""
        with patch.dict(policy_response_handle_functions,
                        {'UNLOCKED': {'failed': mock_update_sub_nf}}):
            nf = NetworkFunction(nf_name='nf1')
            self.policy_mr_sub._handle_response(self.sub.subscriptionName,
                                                self.sub.administrativeState,
                                                nf.nf_name, 'failed')
            mock_update_sub_nf.assert_called()

    def test_handle_response_exception(self):
        """An unknown state/message combination makes _handle_response raise."""
        with self.assertRaises(Exception):
            self.policy_mr_sub._handle_response('sub1', 'wrong_state', 'nf1', 'wrong_message')

    @patch('mod.pmsh_utils._MrSub.get_from_topic')
    @patch('mod.pmsh_utils._MrSub._handle_response')
    @patch('mod.subscription.Subscription.get')
    @patch('threading.Timer')
    def test_poll_policy_topic_calls_methods_correct_sub(self, mock_thread, mock_get_sub,
                                                         mock_handle_response, mock_get_from_topic):
        """A response for the polled subscription is forwarded to _handle_response."""
        result_data = ['{"name": "ResponseEvent","status": { "subscriptionName": '
                       '"ExtraPM-All-gNB-R2B", "nfName": "pnf300", "message": "success" } }']
        mock_get_from_topic.return_value = result_data
        mock_thread.start.return_value = 1
        # NOTE(review): the original call was cut off after subscription_name;
        # status='UNLOCKED' is the state asserted below - confirm against
        # SubscriptionModel's constructor.
        mock_get_sub.return_value = SubscriptionModel(subscription_name='ExtraPM-All-gNB-R2B',
                                                      status='UNLOCKED')
        self.policy_mr_sub.poll_policy_topic(self.sub.subscriptionName, self.mock_app)

        mock_get_from_topic.assert_called()
        mock_handle_response.assert_called_with(self.sub.subscriptionName,
                                                'UNLOCKED', 'pnf300', 'success')

    @patch('mod.pmsh_utils._MrSub.get_from_topic')
    @patch('mod.pmsh_utils._MrSub._handle_response')
    @patch('mod.subscription.Subscription.get')
    @patch('threading.Timer')
    def test_poll_policy_topic_no_method_calls_incorrect_sub(self, mock_thread, mock_get_sub,
                                                             mock_handle_response,
                                                             mock_get_from_topic):
        """A response naming a different subscription is not forwarded."""
        result_data = ['{"name": "ResponseEvent","status": { "subscriptionName": '
                       '"demo-subscription", "nfName": "pnf300", "message": "success" } }']
        mock_get_from_topic.return_value = result_data
        mock_thread.start.return_value = 1
        # NOTE(review): the original call was cut off after subscription_name;
        # status='UNLOCKED' mirrors the sibling test - confirm.
        mock_get_sub.return_value = SubscriptionModel(subscription_name='ExtraPM-All-gNB-R2B',
                                                      status='UNLOCKED')
        # Pass the subscription *name*, as the other poll_policy_topic tests do; passing
        # the Subscription object could never match a name and made the check vacuous.
        self.policy_mr_sub.poll_policy_topic(self.sub.subscriptionName, self.mock_app)

        mock_get_from_topic.assert_called()
        mock_handle_response.assert_not_called()

    @patch('mod.subscription.Subscription.get')
    @patch('mod.pmsh_utils._MrSub.get_from_topic')
    def test_poll_policy_topic_exception(self, mock_get_from_topic, mock_get_sub):
        """Malformed topic data makes poll_policy_topic raise once retries are exhausted."""
        mock_get_from_topic.return_value = 'wrong_return'
        # NOTE(review): the original call was cut off after subscription_name;
        # status='UNLOCKED' mirrors the sibling tests - confirm.
        mock_get_sub.return_value = SubscriptionModel(subscription_name='ExtraPM-All-gNB-R2B',
                                                      status='UNLOCKED')
        retry_controller = self.policy_mr_sub.poll_policy_topic.retry
        # Limit the retry policy to one attempt for this call only; patch.object puts
        # the original stop condition back so other tests see the real policy.
        with patch.object(retry_controller, 'stop', stop_after_attempt(1)):
            with self.assertRaises(Exception):
                self.policy_mr_sub.poll_policy_topic('sub1', self.mock_app)