1 # ============LICENSE_START===================================================
2 # Copyright (C) 2019-2021 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
import json
import os
from unittest.mock import patch, Mock

from requests import Session

import mod.aai_client as aai_client
from mod.network_function import NetworkFunction
from mod.subscription import Subscription
from tests.base_setup import BaseClassSetup
class SubscriptionTest(BaseClassSetup):
    """Tests for the subscription held by ``app_conf`` and for ``Subscription`` helpers."""

    @staticmethod
    def _read_test_data(file_name):
        """Return the text of ``data/<file_name>`` located next to this module."""
        data_path = os.path.join(os.path.dirname(__file__), 'data', file_name)
        with open(data_path, 'r', encoding='utf-8') as data:
            return data.read()

    @patch.object(Session, 'get')
    @patch.object(Session, 'put')
    def setUp(self, mock_session_put, mock_session_get):
        """Stub the HTTP session with canned AAI data and create the subscription.

        ``Session.put`` answers with ``data/aai_xnfs.json`` and ``Session.get``
        with ``data/aai_model_info.json``, both with status code 200, while the
        network functions are fetched through ``aai_client``.
        """
        # Chain to the base class first.
        # NOTE(review): app_conf is never assigned in this class, so it is
        # presumably provided by BaseClassSetup - confirm where it is created.
        super().setUp()
        self.aai_response_data = self._read_test_data('aai_xnfs.json')
        mock_session_put.return_value.status_code = 200
        mock_session_put.return_value.text = self.aai_response_data
        self.aai_model_data = self._read_test_data('aai_model_info.json')
        mock_session_get.return_value.status_code = 200
        mock_session_get.return_value.text = self.aai_model_data
        self.mock_mr_sub = Mock()
        self.mock_mr_pub = Mock()
        self.app_conf.subscription.create()
        self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf, self.app_conf.nf_filter)
        self.sub_model = self.app_conf.subscription.get()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level clean-up to the base class."""
        super().tearDownClass()

    def test_sub_measurement_group(self):
        """The configured subscription exposes two measurement groups."""
        self.assertEqual(len(self.app_conf.subscription.measurementGroups), 2)

    def test_sub_file_location(self):
        """The configured subscription reports the expected file location."""
        self.assertEqual(self.app_conf.subscription.fileLocation, '/pm/pm.xml')

    def test_get_subscription(self):
        """``get()`` returns a model carrying the expected subscription name."""
        sub_name = 'ExtraPM-All-gNB-R2B'
        new_sub = self.app_conf.subscription.get()
        self.assertEqual(sub_name, new_sub.subscription_name)

    def test_get_nf_names_per_sub(self):
        """Attach the first two fetched network functions to the subscription.

        NOTE(review): no assertion follows the two calls, so this only checks
        that attaching them does not raise.
        """
        nfs = list(self.xnfs)
        self.app_conf.subscription.add_network_function_to_subscription(nfs[0],
                                                                         self.sub_model)
        self.app_conf.subscription.add_network_function_to_subscription(nfs[1],
                                                                         self.sub_model)

    def test_create_existing_subscription(self):
        """Creating the subscription twice yields equal results and one stored entry."""
        sub1 = self.app_conf.subscription.create()
        same_sub1 = self.app_conf.subscription.create()
        self.assertEqual(sub1, same_sub1)
        self.assertEqual(1, len(self.app_conf.subscription.get_all()))

    def test_add_duplicate_network_functions_per_subscription(self):
        """Attaching the same network function twice keeps a single relation."""
        first_nf = list(self.xnfs)[0]
        self.app_conf.subscription.add_network_function_to_subscription(first_nf,
                                                                         self.sub_model)
        nf_subs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual(1, len(nf_subs))
        self.app_conf.subscription.add_network_function_to_subscription(first_nf,
                                                                         self.sub_model)
        nf_subs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual(1, len(nf_subs))

    def test_update_subscription_status(self):
        """A changed administrativeState is visible as ``status`` after the update."""
        self.app_conf.subscription.administrativeState = 'new_status'
        self.app_conf.subscription.update_subscription_status()
        sub = self.app_conf.subscription.get()

        self.assertEqual('new_status', sub.status)

    def test_update_sub_nf_status(self):
        """Updating the status for 'pnf_23' leaves the first two relations pending."""
        sub_name = 'ExtraPM-All-gNB-R2B'
        for nf in self.xnfs:
            self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
        sub_nfs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)

        Subscription.update_sub_nf_status(sub_name, 'Active', 'pnf_23')
        sub_nfs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)
        self.assertEqual('PENDING_CREATE', sub_nfs[1].nf_sub_status)

    @patch('mod.subscription.Subscription.add_network_function_to_subscription')
    @patch('mod.subscription.Subscription.update_sub_nf_status')
    def test_process_activate_subscription(self, mock_update_sub_nf, mock_add_nfs):
        """Creating the subscription on an NF adds it, publishes and marks it pending."""
        first_nf = list(self.xnfs)[0]
        self.app_conf.subscription.create_subscription_on_nfs([first_nf],
                                                              self.mock_mr_pub, self.app_conf)

        mock_add_nfs.assert_called()
        self.mock_mr_pub.publish_subscription_event_data.assert_called()
        mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName,
                                              'PENDING_CREATE', first_nf.nf_name)

    @patch('mod.subscription.Subscription.get_network_functions')
    @patch('mod.subscription.Subscription.update_sub_nf_status')
    def test_process_deactivate_subscription(self, mock_update_sub_nf, mock_get_nfs):
        """Deleting a LOCKED subscription publishes and updates the status three times."""
        self.app_conf.subscription.administrativeState = 'LOCKED'
        mock_get_nfs.return_value = [list(self.xnfs)[0]]
        self.app_conf.subscription.delete_subscription_from_nfs(self.xnfs, self.mock_mr_pub,
                                                                self.app_conf)
        self.mock_mr_pub.publish_subscription_event_data.assert_called()
        self.assertEqual(mock_update_sub_nf.call_count, 3)

    def test_activate_subscription_exception(self):
        """Passing plain strings as publisher and config raises an exception."""
        with self.assertRaises(Exception):
            self.app_conf.subscription.create_subscription_on_nfs(
                [list(self.xnfs)[0]], 'not_mr_pub', 'app_config')

    def test_prepare_subscription_event(self):
        """The event prepared for 'pnf_1' equals ``data/pm_subscription_event.json``."""
        expected_sub_event = json.loads(self._read_test_data('pm_subscription_event.json'))
        nf = NetworkFunction(nf_name='pnf_1',
                             ipv4_address='204.120.0.15',
                             model_invariant_id='some-id',
                             model_version_id='some-id')
        nf.sdnc_model_name = 'some-name'
        nf.sdnc_model_version = 'some-version'
        actual_sub_event = self.app_conf.subscription.prepare_subscription_event(nf, self.app_conf)
        self.assertEqual(expected_sub_event, actual_sub_event)

    def test_prepare_subscription_event_with_ipv6(self):
        """With both addresses set, the event's ``ipAddress`` is the IPv6 address."""
        expected_sub_event = json.loads(self._read_test_data('pm_subscription_event.json'))
        expected_sub_event['ipAddress'] = '2001:db8:3333:4444:5555:6666:7777:8888'
        nf = NetworkFunction(nf_name='pnf_1',
                             ipv4_address='204.120.0.15',
                             ipv6_address='2001:db8:3333:4444:5555:6666:7777:8888',
                             model_invariant_id='some-id',
                             model_version_id='some-id')
        nf.sdnc_model_name = 'some-name'
        nf.sdnc_model_version = 'some-version'
        actual_sub_event = self.app_conf.subscription.prepare_subscription_event(nf, self.app_conf)
        self.assertEqual(expected_sub_event, actual_sub_event)

    def test_get_network_functions(self):
        """All attached network functions come back as ``NetworkFunction`` objects."""
        for nf in self.xnfs:
            self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
        nfs = self.app_conf.subscription.get_network_functions()

        self.assertEqual(3, len(nfs))
        self.assertIsInstance(nfs[0], NetworkFunction)