Merge "Refactor and bug fixes"
[dcaegen2/services.git] / components / pm-subscription-handler / tests / test_subscription.py
1 # ============LICENSE_START===================================================
2 #  Copyright (C) 2019-2020 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
18 import json
19 import os
20 from test.support import EnvironmentVarGuard
21 from unittest import TestCase
22 from unittest.mock import patch
23
24 from requests import Session
25
26 import mod.aai_client as aai_client
27 from mod import db, create_app
28 from mod.api.db_models import NetworkFunctionModel
29 from mod.network_function import NetworkFunction
30 from mod.pmsh_utils import AppConfig
31 from mod.subscription import Subscription
32
33
34 class SubscriptionTest(TestCase):
35     @patch('mod.update_logging_config')
36     @patch('mod.pmsh_utils._MrPub')
37     @patch('mod.pmsh_utils._MrSub')
38     @patch('mod.get_db_connection_url')
39     @patch.object(Session, 'put')
40     @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
41     def setUp(self, mock_get_pmsh_config, mock_session, mock_get_db_url,
42               mock_mr_sub, mock_mr_pub, mock_update_config):
43         mock_get_db_url.return_value = 'sqlite://'
44         with open(os.path.join(os.path.dirname(__file__), 'data/aai_xnfs.json'), 'r') as data:
45             self.aai_response_data = data.read()
46         mock_session.return_value.status_code = 200
47         mock_session.return_value.text = self.aai_response_data
48         self.env = EnvironmentVarGuard()
49         self.env.set('AAI_SERVICE_PORT', '8443')
50         self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
51         with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
52             self.cbs_data = json.load(data)
53         mock_get_pmsh_config.return_value = self.cbs_data
54         self.mock_mr_sub = mock_mr_sub
55         self.mock_mr_pub = mock_mr_pub
56         self.app = create_app()
57         self.app_context = self.app.app_context()
58         self.app_context.push()
59         db.create_all()
60         self.app_conf = AppConfig()
61         self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
62         self.sub_model = self.app_conf.subscription.get()
63
64     def tearDown(self):
65         db.drop_all()
66         db.session.remove()
67         self.app_context.pop()
68
69     def test_xnf_filter_true(self):
70         self.assertTrue(self.app_conf.nf_filter.is_nf_in_filter('pnf1',
71                                                                 'Active'))
72
73     def test_xnf_filter_false(self):
74         self.assertFalse(self.app_conf.nf_filter.is_nf_in_filter('PNF-33',
75                                                                  'Active'))
76
77     def test_sub_measurement_group(self):
78         self.assertEqual(len(self.app_conf.subscription.measurementGroups), 2)
79
80     def test_sub_file_location(self):
81         self.assertEqual(self.app_conf.subscription.fileLocation, '/pm/pm.xml')
82
83     def test_get_subscription(self):
84         sub_name = 'ExtraPM-All-gNB-R2B'
85         self.app_conf.subscription.create()
86         new_sub = self.app_conf.subscription.get()
87         self.assertEqual(sub_name, new_sub.subscription_name)
88
89     def test_get_nf_names_per_sub(self):
90         self.app_conf.subscription.create()
91         self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0],
92                                                                         self.sub_model)
93         self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[1],
94                                                                         self.sub_model)
95         nfs = Subscription.get_nf_names_per_sub(self.app_conf.subscription.subscriptionName)
96         self.assertEqual(2, len(nfs))
97
98     def test_create_existing_subscription(self):
99         sub1 = self.app_conf.subscription.create()
100         same_sub1 = self.app_conf.subscription.create()
101         self.assertEqual(sub1, same_sub1)
102         self.assertEqual(1, len(self.app_conf.subscription.get_all()))
103
104     def test_add_network_functions_per_subscription(self):
105         for nf in self.xnfs:
106             self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
107         nfs_for_sub_1 = Subscription.get_all_nfs_subscription_relations()
108         self.assertEqual(3, len(nfs_for_sub_1))
109         new_nf = NetworkFunction(nf_name='vnf_3', orchestration_status='Active')
110         self.app_conf.subscription.add_network_function_to_subscription(new_nf, self.sub_model)
111         nf_subs = Subscription.get_all_nfs_subscription_relations()
112         self.assertEqual(4, len(nf_subs))
113
114     def test_add_duplicate_network_functions_per_subscription(self):
115         self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0],
116                                                                         self.sub_model)
117         nf_subs = Subscription.get_all_nfs_subscription_relations()
118         self.assertEqual(1, len(nf_subs))
119         self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0],
120                                                                         self.sub_model)
121         nf_subs = Subscription.get_all_nfs_subscription_relations()
122         self.assertEqual(1, len(nf_subs))
123
124     def test_update_subscription_status(self):
125         self.app_conf.subscription.create()
126         self.app_conf.subscription.administrativeState = 'new_status'
127         self.app_conf.subscription.update_subscription_status()
128         sub = self.app_conf.subscription.get()
129
130         self.assertEqual('new_status', sub.status)
131
132     def test_delete_subscription(self):
133         for nf in self.xnfs:
134             self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
135         self.app_conf.subscription.delete_subscription()
136         self.assertEqual(0, len(Subscription.get_all()))
137         self.assertEqual(None, self.app_conf.subscription.get())
138         self.assertEqual(0, len(Subscription.get_all_nfs_subscription_relations()))
139         self.assertEqual(0, len(NetworkFunction.get_all()))
140         self.assertEqual(None, NetworkFunction.get(nf_name=list(self.xnfs)[0].nf_name))
141
142     def test_update_sub_nf_status(self):
143         sub_name = 'ExtraPM-All-gNB-R2B'
144         for nf in self.xnfs:
145             self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
146         sub_nfs = Subscription.get_all_nfs_subscription_relations()
147         self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)
148
149         Subscription.update_sub_nf_status(sub_name, 'Active', 'pnf_23')
150         sub_nfs = Subscription.get_all_nfs_subscription_relations()
151         self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)
152         self.assertEqual('PENDING_CREATE', sub_nfs[1].nf_sub_status)
153
154     @patch('mod.subscription.Subscription.add_network_function_to_subscription')
155     @patch('mod.subscription.Subscription.update_sub_nf_status')
156     def test_process_activate_subscription(self, mock_update_sub_nf, mock_add_nfs):
157         self.app_conf.subscription.activate_subscription([list(self.xnfs)[0]], self.mock_mr_pub,
158                                                          self.app_conf)
159
160         mock_add_nfs.assert_called()
161         self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
162         mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName,
163                                               'PENDING_CREATE', list(self.xnfs)[0].nf_name)
164
165     @patch('mod.subscription.Subscription.get_network_functions')
166     @patch('mod.subscription.Subscription.update_sub_nf_status')
167     def test_process_deactivate_subscription(self, mock_update_sub_nf, mock_get_nfs):
168         self.app_conf.subscription.administrativeState = 'LOCKED'
169         mock_get_nfs.return_value = [list(self.xnfs)[0]]
170         self.app_conf.subscription.deactivate_subscription(self.mock_mr_pub, self.app_conf)
171         self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
172         mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName,
173                                               'PENDING_DELETE', list(self.xnfs)[0].nf_name)
174
175     def test_activate_subscription_exception(self):
176         self.assertRaises(Exception, self.app_conf.subscription.activate_subscription,
177                           [list(self.xnfs)[0]], 'not_mr_pub', 'app_config')
178
179     def test_prepare_subscription_event(self):
180         with open(os.path.join(os.path.dirname(__file__),
181                                'data/pm_subscription_event.json'), 'r') as data:
182             expected_sub_event = json.load(data)
183         actual_sub_event = self.app_conf.subscription.prepare_subscription_event('pnf_1',
184                                                                                  self.app_conf)
185         self.assertEqual(expected_sub_event, actual_sub_event)
186
187     def test_get_nf_models(self):
188         for nf in self.xnfs:
189             self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
190         nf_models = self.app_conf.subscription._get_nf_models()
191
192         self.assertEqual(3, len(nf_models))
193         self.assertIsInstance(nf_models[0], NetworkFunctionModel)
194
195     def test_get_network_functions(self):
196         for nf in self.xnfs:
197             self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
198         nfs = self.app_conf.subscription.get_network_functions()
199
200         self.assertEqual(3, len(nfs))
201         self.assertIsInstance(nfs[0], NetworkFunction)