[PMSH] Replace own logging implementation with pylog
[dcaegen2/services.git] / components / pm-subscription-handler / tests / test_subscription.py
1 # ============LICENSE_START===================================================
2 #  Copyright (C) 2019-2020 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
18 import json
19 import os
20 from test.support import EnvironmentVarGuard
21 from unittest import TestCase
22 from unittest.mock import patch
23
24 from requests import Session
25 from tenacity import stop_after_attempt
26
27 import mod.aai_client as aai_client
28 from mod import db, create_app
29 from mod.api.db_models import NetworkFunctionModel
30 from mod.network_function import NetworkFunction, NetworkFunctionFilter, OrchestrationStatus
31 from mod.pmsh_utils import AppConfig
32 from mod.subscription import Subscription
33
34
35 class SubscriptionTest(TestCase):
36     @patch('mod.update_config')
37     @patch('mod.pmsh_utils._MrPub')
38     @patch('mod.pmsh_utils._MrSub')
39     @patch('mod.get_db_connection_url')
40     @patch.object(Session, 'put')
41     @patch('pmsh_service_main.AppConfig')
42     def setUp(self, mock_app_config, mock_session, mock_get_db_url,
43               mock_mr_sub, mock_mr_pub, mock_update_config):
44         mock_get_db_url.return_value = 'sqlite://'
45         with open(os.path.join(os.path.dirname(__file__), 'data/aai_xnfs.json'), 'r') as data:
46             self.aai_response_data = data.read()
47         mock_session.return_value.status_code = 200
48         mock_session.return_value.text = self.aai_response_data
49         self.env = EnvironmentVarGuard()
50         self.env.set('AAI_SERVICE_HOST', '1.2.3.4')
51         self.env.set('AAI_SERVICE_PORT', '8443')
52         self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
53         with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
54             self.cbs_data_1 = json.load(data)
55         with open(os.path.join(os.path.dirname(__file__),
56                                'data/cbs_data_2.json'), 'r') as data:
57             self.cbs_data_2 = json.load(data)
58         self.sub_1, self.xnfs = aai_client.get_pmsh_subscription_data(self.cbs_data_1)
59         self.sub_2, self.xnfs = aai_client.get_pmsh_subscription_data(self.cbs_data_2)
60         self.nf_1 = NetworkFunction(nf_name='pnf_1', orchestration_status='Inventoried')
61         self.nf_2 = NetworkFunction(nf_name='pnf_2', orchestration_status='Active')
62         self.xnf_filter = NetworkFunctionFilter(**self.sub_1.nfFilter)
63         self.mock_mr_sub = mock_mr_sub
64         self.mock_mr_pub = mock_mr_pub
65         self.app = create_app()
66         self.app_context = self.app.app_context()
67         self.app_context.push()
68         self.mock_app_config = mock_app_config
69         db.create_all()
70
71     def tearDown(self):
72         db.session.remove()
73         db.drop_all()
74         self.app_context.pop()
75
76     def test_xnf_filter_true(self):
77         self.assertTrue(self.xnf_filter.is_nf_in_filter('pnf1', OrchestrationStatus.ACTIVE.value))
78
79     def test_xnf_filter_false(self):
80         self.assertFalse(self.xnf_filter.is_nf_in_filter('PNF-33',
81                                                          OrchestrationStatus.ACTIVE.value))
82
83     def test_sub_measurement_group(self):
84         self.assertEqual(len(self.sub_1.measurementGroups), 2)
85
86     def test_sub_file_location(self):
87         self.assertEqual(self.sub_1.fileLocation, '/pm/pm.xml')
88
89     def test_get_subscription(self):
90         sub_name = 'ExtraPM-All-gNB-R2B'
91         self.sub_1.create()
92         new_sub = Subscription.get(sub_name)
93         self.assertEqual(sub_name, new_sub.subscription_name)
94
95     def test_get_subscription_no_match(self):
96         sub_name = 'sub2_does_not_exist'
97         sub = Subscription.get(sub_name)
98         self.assertEqual(sub, None)
99
100     def test_get_subscriptions(self):
101         self.sub_1.create()
102         self.sub_2.create()
103         subs = self.sub_1.get_all()
104
105         self.assertEqual(2, len(subs))
106
107     def test_get_nf_names_per_sub(self):
108         self.sub_1.create()
109         self.sub_1.add_network_function_to_subscription(self.nf_1)
110         self.sub_1.add_network_function_to_subscription(self.nf_2)
111         nfs = Subscription.get_nf_names_per_sub(self.sub_1.subscriptionName)
112         self.assertEqual(2, len(nfs))
113
114     def test_create_existing_subscription(self):
115         sub1 = self.sub_1.create()
116         same_sub1 = self.sub_1.create()
117         self.assertEqual(sub1, same_sub1)
118         self.assertEqual(1, len(self.sub_1.get_all()))
119
120     def test_add_network_functions_per_subscription(self):
121         for nf in [self.nf_1, self.nf_2]:
122             self.sub_1.add_network_function_to_subscription(nf)
123         nfs_for_sub_1 = Subscription.get_all_nfs_subscription_relations()
124         self.assertEqual(2, len(nfs_for_sub_1))
125         new_nf = NetworkFunction(nf_name='vnf_3', orchestration_status='Inventoried')
126         self.sub_1.add_network_function_to_subscription(new_nf)
127         nf_subs = Subscription.get_all_nfs_subscription_relations()
128         self.assertEqual(3, len(nf_subs))
129
130     def test_add_duplicate_network_functions_per_subscription(self):
131         self.sub_1.add_network_function_to_subscription(self.nf_1)
132         nf_subs = Subscription.get_all_nfs_subscription_relations()
133         self.assertEqual(1, len(nf_subs))
134         self.sub_1.add_network_function_to_subscription(self.nf_1)
135         nf_subs = Subscription.get_all_nfs_subscription_relations()
136         self.assertEqual(1, len(nf_subs))
137
138     def test_update_subscription_status(self):
139         sub_name = 'ExtraPM-All-gNB-R2B'
140         self.sub_1.create()
141         self.sub_1.administrativeState = 'new_status'
142         self.sub_1.update_subscription_status()
143         sub = Subscription.get(sub_name)
144
145         self.assertEqual('new_status', sub.status)
146
147     def test_delete_subscription(self):
148         for nf in [self.nf_1, self.nf_2]:
149             self.sub_1.add_network_function_to_subscription(nf)
150         for nf in [self.nf_2]:
151             self.sub_2.add_network_function_to_subscription(nf)
152
153         self.sub_1.delete_subscription()
154
155         self.assertEqual(1, len(Subscription.get_all()))
156         self.assertEqual(None, Subscription.get(self.sub_1.subscriptionName))
157         self.assertEqual(1, len(Subscription.get_all_nfs_subscription_relations()))
158         self.assertEqual(1, len(NetworkFunction.get_all()))
159         self.assertEqual(None, NetworkFunction.get(nf_name=self.nf_1.nf_name))
160
161     def test_update_sub_nf_status(self):
162         sub_name = 'ExtraPM-All-gNB-R2B'
163         for nf in [self.nf_1, self.nf_2]:
164             self.sub_1.add_network_function_to_subscription(nf)
165         sub_nfs = Subscription.get_all_nfs_subscription_relations()
166         self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)
167
168         Subscription.update_sub_nf_status(sub_name, 'Active', 'pnf_1')
169         sub_nfs = Subscription.get_all_nfs_subscription_relations()
170         self.assertEqual('Active', sub_nfs[0].nf_sub_status)
171         self.assertEqual('PENDING_CREATE', sub_nfs[1].nf_sub_status)
172
173     @patch('mod.subscription.Subscription.add_network_function_to_subscription')
174     @patch('mod.subscription.Subscription.update_sub_nf_status')
175     @patch('mod.subscription.Subscription.update_subscription_status')
176     def test_process_activate_subscription(self, mock_update_sub_status,
177                                            mock_update_sub_nf, mock_add_nfs):
178         self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
179         self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub, self.mock_app_config)
180
181         mock_update_sub_status.assert_called()
182         mock_add_nfs.assert_called()
183         self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
184         mock_update_sub_nf.assert_called_with(self.sub_1.subscriptionName,
185                                               'PENDING_CREATE', self.nf_1.nf_name)
186
187     @patch('mod.subscription.Subscription.update_sub_nf_status')
188     @patch('mod.subscription.Subscription.update_subscription_status')
189     def test_process_deactivate_subscription(self, mock_update_sub_status,
190                                              mock_update_sub_nf):
191         self.sub_1.administrativeState = 'LOCKED'
192         self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
193         self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub, self.mock_app_config)
194
195         self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
196         mock_update_sub_nf.assert_called_with(self.sub_1.subscriptionName,
197                                               'PENDING_DELETE', self.nf_1.nf_name)
198         mock_update_sub_status.assert_called()
199
200     def test_process_subscription_exception(self):
201         self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
202         self.assertRaises(Exception, self.sub_1.process_subscription,
203                           [self.nf_1], 'not_mr_pub', 'app_config')
204
205     def test_prepare_subscription_event(self):
206         with open(os.path.join(os.path.dirname(__file__),
207                                'data/pm_subscription_event.json'), 'r') as data:
208             expected_sub_event = json.load(data)
209         app_conf = AppConfig(**self.cbs_data_1['config'])
210         actual_sub_event = self.sub_1.prepare_subscription_event(self.nf_1.nf_name, app_conf)
211         self.assertEqual(expected_sub_event, actual_sub_event)
212
213     def test_get_nf_models(self):
214         for nf in [self.nf_1, self.nf_2]:
215             self.sub_1.add_network_function_to_subscription(nf)
216         nf_models = self.sub_1._get_nf_models()
217
218         self.assertEqual(2, len(nf_models))
219         self.assertIsInstance(nf_models[0], NetworkFunctionModel)
220
221     def test_get_network_functions(self):
222         for nf in [self.nf_1, self.nf_2]:
223             self.sub_1.add_network_function_to_subscription(nf)
224         nfs = self.sub_1.get_network_functions()
225
226         self.assertEqual(2, len(nfs))
227         self.assertIsInstance(nfs[0], NetworkFunction)