1 # ============LICENSE_START===================================================
2 # Copyright (C) 2019-2020 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
import json
import os
from test.support import EnvironmentVarGuard
from unittest import TestCase
from unittest.mock import patch

from requests import Session
from tenacity import stop_after_attempt

import mod.aai_client as aai_client
from mod import db, create_app
from mod.api.db_models import NetworkFunctionModel
from mod.network_function import NetworkFunction, NetworkFunctionFilter, OrchestrationStatus
from mod.pmsh_utils import AppConfig
from mod.subscription import Subscription


class SubscriptionTest(TestCase):
    """Tests for ``Subscription`` and its relation to network functions.

    Every test runs inside an application context created by ``create_app()``
    against the ``sqlite://`` URL returned by the patched
    ``mod.get_db_connection_url``.
    """

    @staticmethod
    def _test_file(relative_path):
        """Return the path of a file located relative to this test module."""
        return os.path.join(os.path.dirname(__file__), relative_path)

    @patch('mod.update_config')
    @patch('mod.pmsh_utils._MrPub')
    @patch('mod.pmsh_utils._MrSub')
    @patch('mod.get_db_connection_url')
    @patch.object(Session, 'put')
    @patch('pmsh_service_main.AppConfig')
    def setUp(self, mock_app_config, mock_session, mock_get_db_url,
              mock_mr_sub, mock_mr_pub, mock_update_config):
        """Build the subscription fixtures and push an application context.

        The patches are only active while ``setUp`` runs; the mocks that the
        tests need afterwards are stored on ``self``.
        """
        mock_get_db_url.return_value = 'sqlite://'

        # Canned response for the patched ``Session.put``.
        with open(self._test_file('data/aai_xnfs.json'), 'r', encoding='utf-8') as data:
            self.aai_response_data = data.read()
        mock_session.return_value.status_code = 200
        mock_session.return_value.text = self.aai_response_data

        self.env = EnvironmentVarGuard()
        self.env.set('AAI_SERVICE_HOST', '1.2.3.4')
        self.env.set('AAI_SERVICE_PORT', '8443')
        self.env.set('LOGGER_CONFIG', self._test_file('log_config.yaml'))

        with open(self._test_file('data/cbs_data_1.json'), 'r', encoding='utf-8') as data:
            self.cbs_data_1 = json.load(data)
        with open(self._test_file('data/cbs_data_2.json'), 'r', encoding='utf-8') as data:
            self.cbs_data_2 = json.load(data)

        # ``self.xnfs`` keeps the list returned by the second call only.
        self.sub_1, self.xnfs = aai_client.get_pmsh_subscription_data(self.cbs_data_1)
        self.sub_2, self.xnfs = aai_client.get_pmsh_subscription_data(self.cbs_data_2)
        self.nf_1 = NetworkFunction(nf_name='pnf_1', orchestration_status='Inventoried')
        self.nf_2 = NetworkFunction(nf_name='pnf_2', orchestration_status='Active')
        self.xnf_filter = NetworkFunctionFilter(**self.sub_1.nfFilter)
        self.mock_mr_sub = mock_mr_sub
        self.mock_mr_pub = mock_mr_pub
        self.mock_app_config = mock_app_config

        self.app = create_app()
        self.app_context = self.app.app_context()
        self.app_context.push()
        # NOTE(review): assumes ``db`` is the Flask-SQLAlchemy handle exported
        # by ``mod``; creating the schema again is harmless if ``create_app``
        # already does it -- confirm.
        db.create_all()

    def tearDown(self):
        """Discard the database state and pop the context pushed in setUp."""
        db.session.remove()
        db.drop_all()
        # The context must stay active for the whole test, so it is released
        # here rather than at the end of setUp.
        self.app_context.pop()

    def test_xnf_filter_true(self):
        """An NF name accepted by the sub_1 filter is reported as in-filter."""
        self.assertTrue(self.xnf_filter.is_nf_in_filter('pnf1', OrchestrationStatus.ACTIVE.value))

    def test_xnf_filter_false(self):
        """An NF name rejected by the sub_1 filter is reported as not in-filter."""
        self.assertFalse(self.xnf_filter.is_nf_in_filter('PNF-33',
                                                         OrchestrationStatus.ACTIVE.value))

    def test_sub_measurement_group(self):
        """sub_1 carries the two measurement groups from the fixture."""
        self.assertEqual(len(self.sub_1.measurementGroups), 2)

    def test_sub_file_location(self):
        """sub_1 exposes the file location from the fixture."""
        self.assertEqual(self.sub_1.fileLocation, '/pm/pm.xml')

    def test_get_subscription(self):
        """A persisted subscription can be fetched by name."""
        sub_name = 'ExtraPM-All-gNB-R2B'
        # setUp persists nothing, so the subscription has to be stored first.
        self.sub_1.create()
        new_sub = Subscription.get(sub_name)
        self.assertEqual(sub_name, new_sub.subscription_name)

    def test_get_subscription_no_match(self):
        """Fetching an unknown subscription name yields ``None``."""
        sub_name = 'sub2_does_not_exist'
        sub = Subscription.get(sub_name)
        self.assertIsNone(sub)

    def test_get_subscriptions(self):
        """``get_all`` returns every persisted subscription."""
        self.sub_1.create()
        self.sub_2.create()
        subs = self.sub_1.get_all()

        self.assertEqual(2, len(subs))

    def test_get_nf_names_per_sub(self):
        """Both NFs attached to sub_1 are listed for that subscription."""
        self.sub_1.add_network_function_to_subscription(self.nf_1)
        self.sub_1.add_network_function_to_subscription(self.nf_2)
        nfs = Subscription.get_nf_names_per_sub(self.sub_1.subscriptionName)
        self.assertEqual(2, len(nfs))

    def test_create_existing_subscription(self):
        """Creating the same subscription twice does not add a second row."""
        sub1 = self.sub_1.create()
        same_sub1 = self.sub_1.create()
        self.assertEqual(sub1, same_sub1)
        self.assertEqual(1, len(self.sub_1.get_all()))

    def test_add_network_functions_per_subscription(self):
        """Each distinct NF added to sub_1 yields one NF-subscription relation."""
        for nf in [self.nf_1, self.nf_2]:
            self.sub_1.add_network_function_to_subscription(nf)
        nfs_for_sub_1 = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual(2, len(nfs_for_sub_1))

        new_nf = NetworkFunction(nf_name='vnf_3', orchestration_status='Inventoried')
        self.sub_1.add_network_function_to_subscription(new_nf)
        nf_subs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual(3, len(nf_subs))

    def test_add_duplicate_network_functions_per_subscription(self):
        """Adding the same NF twice keeps a single relation."""
        self.sub_1.add_network_function_to_subscription(self.nf_1)
        nf_subs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual(1, len(nf_subs))

        self.sub_1.add_network_function_to_subscription(self.nf_1)
        nf_subs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual(1, len(nf_subs))

    def test_update_subscription_status(self):
        """The stored status follows ``administrativeState`` after an update."""
        sub_name = 'ExtraPM-All-gNB-R2B'
        # The row must exist before its status can be updated and read back.
        self.sub_1.create()
        self.sub_1.administrativeState = 'new_status'
        self.sub_1.update_subscription_status()
        sub = Subscription.get(sub_name)

        self.assertEqual('new_status', sub.status)

    def test_delete_subscription(self):
        """Deleting sub_1 leaves only sub_2 together with its NF and relation."""
        for nf in [self.nf_1, self.nf_2]:
            self.sub_1.add_network_function_to_subscription(nf)
        self.sub_2.add_network_function_to_subscription(self.nf_2)

        self.sub_1.delete_subscription()

        self.assertEqual(1, len(Subscription.get_all()))
        self.assertIsNone(Subscription.get(self.sub_1.subscriptionName))
        self.assertEqual(1, len(Subscription.get_all_nfs_subscription_relations()))
        self.assertEqual(1, len(NetworkFunction.get_all()))
        self.assertIsNone(NetworkFunction.get(nf_name=self.nf_1.nf_name))

    def test_update_sub_nf_status(self):
        """Updating one NF's subscription status leaves the other untouched."""
        sub_name = 'ExtraPM-All-gNB-R2B'
        for nf in [self.nf_1, self.nf_2]:
            self.sub_1.add_network_function_to_subscription(nf)
        sub_nfs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)

        Subscription.update_sub_nf_status(sub_name, 'Active', 'pnf_1')
        sub_nfs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual('Active', sub_nfs[0].nf_sub_status)
        self.assertEqual('PENDING_CREATE', sub_nfs[1].nf_sub_status)

    @patch('mod.subscription.Subscription.add_network_function_to_subscription')
    @patch('mod.subscription.Subscription.update_sub_nf_status')
    @patch('mod.subscription.Subscription.update_subscription_status')
    def test_process_activate_subscription(self, mock_update_sub_status,
                                           mock_update_sub_nf, mock_add_nfs):
        """Processing sub_1 as loaded publishes an event and marks the NF PENDING_CREATE."""
        # Limit the retry policy to one attempt so a failure surfaces at once.
        self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
        self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub, self.mock_app_config)

        mock_update_sub_status.assert_called()
        mock_add_nfs.assert_called()
        self.mock_mr_pub.publish_subscription_event_data.assert_called()
        mock_update_sub_nf.assert_called_with(self.sub_1.subscriptionName,
                                              'PENDING_CREATE', self.nf_1.nf_name)

    @patch('mod.subscription.Subscription.update_sub_nf_status')
    @patch('mod.subscription.Subscription.update_subscription_status')
    def test_process_deactivate_subscription(self, mock_update_sub_status,
                                             mock_update_sub_nf):
        """Processing a LOCKED subscription publishes an event and marks the NF PENDING_DELETE."""
        self.sub_1.administrativeState = 'LOCKED'
        # Limit the retry policy to one attempt so a failure surfaces at once.
        self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
        self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub, self.mock_app_config)

        self.mock_mr_pub.publish_subscription_event_data.assert_called()
        mock_update_sub_nf.assert_called_with(self.sub_1.subscriptionName,
                                              'PENDING_DELETE', self.nf_1.nf_name)
        mock_update_sub_status.assert_called()

    def test_process_subscription_exception(self):
        """Passing plain strings as publisher and app config makes processing raise."""
        self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
        with self.assertRaises(Exception):
            self.sub_1.process_subscription([self.nf_1], 'not_mr_pub', 'app_config')

    def test_prepare_subscription_event(self):
        """The prepared event for nf_1 equals the stored expected event."""
        with open(self._test_file('data/pm_subscription_event.json'), 'r',
                  encoding='utf-8') as data:
            expected_sub_event = json.load(data)
        app_conf = AppConfig(**self.cbs_data_1['config'])
        actual_sub_event = self.sub_1.prepare_subscription_event(self.nf_1.nf_name, app_conf)
        self.assertEqual(expected_sub_event, actual_sub_event)

    def test_get_nf_models(self):
        """``_get_nf_models`` returns one ``NetworkFunctionModel`` per attached NF."""
        for nf in [self.nf_1, self.nf_2]:
            self.sub_1.add_network_function_to_subscription(nf)
        nf_models = self.sub_1._get_nf_models()

        self.assertEqual(2, len(nf_models))
        self.assertIsInstance(nf_models[0], NetworkFunctionModel)

    def test_get_network_functions(self):
        """``get_network_functions`` returns one ``NetworkFunction`` per attached NF."""
        for nf in [self.nf_1, self.nf_2]:
            self.sub_1.add_network_function_to_subscription(nf)
        nfs = self.sub_1.get_network_functions()

        self.assertEqual(2, len(nfs))
        self.assertIsInstance(nfs[0], NetworkFunction)