95db6b24b481154ea45a0581c7bf9fcfcfe41ff1
[dcaegen2/services.git] / components / pm-subscription-handler / tests / test_subscription.py
1 # ============LICENSE_START===================================================
2 #  Copyright (C) 2019-2020 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
18 import json
19 import os
20 from test.support import EnvironmentVarGuard
21 from unittest import TestCase
22 from unittest.mock import patch
23
24 from requests import Session
25 from tenacity import stop_after_attempt
26
27 import mod.aai_client as aai_client
28 from mod import db, create_app
29 from mod.api.db_models import NetworkFunctionModel
30 from mod.network_function import NetworkFunction, OrchestrationStatus
31 from mod.pmsh_utils import AppConfig
32 from mod.subscription import Subscription
33
34
class SubscriptionTest(TestCase):
    """Tests for ``Subscription`` driven through ``AppConfig().subscription``.

    Every test runs inside a pushed application context with freshly created
    tables; AAI and CBS responses are read from JSON files stored next to
    this module.
    """

    @staticmethod
    def _test_file(relative_path):
        """Return the path of a file located relative to this test module."""
        return os.path.join(os.path.dirname(__file__), relative_path)

    @patch('mod.update_logging_config')
    @patch('mod.pmsh_utils._MrPub')
    @patch('mod.pmsh_utils._MrSub')
    @patch('mod.get_db_connection_url')
    @patch.object(Session, 'put')
    @patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
    def setUp(self, mock_get_pmsh_config, mock_session, mock_get_db_url,
              mock_mr_sub, mock_mr_pub, mock_update_config):
        """Build the app config, load the NFs from the mocked AAI and create the tables.

        The patches above are only active while ``setUp`` runs; the MR mocks
        are kept on ``self`` so the tests can pass them on and inspect them.
        """
        mock_get_db_url.return_value = 'sqlite://'
        with open(self._test_file('data/aai_xnfs.json'), 'r') as data:
            self.aai_response_data = data.read()
        # ``mock_session`` replaces ``Session.put``: its return value plays the response.
        mock_session.return_value.status_code = 200
        mock_session.return_value.text = self.aai_response_data
        self.env = EnvironmentVarGuard()
        # Restore the environment once the test is over (also when a later
        # setUp step fails) so the variables set here do not leak into other tests.
        self.addCleanup(self.env.__exit__)
        self.env.set('AAI_SERVICE_HOST', '1.2.3.4')
        self.env.set('AAI_SERVICE_PORT', '8443')
        self.env.set('LOGGER_CONFIG', self._test_file('log_config.yaml'))
        with open(self._test_file('data/cbs_data_1.json'), 'r') as data:
            self.cbs_data = json.load(data)
        mock_get_pmsh_config.return_value = self.cbs_data
        self.app_conf = AppConfig()
        self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
        self.mock_mr_sub = mock_mr_sub
        self.mock_mr_pub = mock_mr_pub
        self.app = create_app()
        self.app_context = self.app.app_context()
        self.app_context.push()
        db.create_all()

    def tearDown(self):
        """Drop all tables and pop the application context pushed in ``setUp``."""
        db.session.remove()
        db.drop_all()
        self.app_context.pop()

    def test_xnf_filter_true(self):
        """'pnf1' with an ACTIVE orchestration status is accepted by the NF filter."""
        self.assertTrue(self.app_conf.nf_filter.is_nf_in_filter('pnf1',
                                                                OrchestrationStatus.ACTIVE.value))

    def test_xnf_filter_false(self):
        """'PNF-33' with an ACTIVE orchestration status is rejected by the NF filter."""
        self.assertFalse(self.app_conf.nf_filter.is_nf_in_filter('PNF-33',
                                                                 OrchestrationStatus.ACTIVE.value))

    def test_sub_measurement_group(self):
        """The configured subscription exposes two measurement groups."""
        self.assertEqual(len(self.app_conf.subscription.measurementGroups), 2)

    def test_sub_file_location(self):
        """The configured subscription exposes the expected file location."""
        self.assertEqual(self.app_conf.subscription.fileLocation, '/pm/pm.xml')

    def test_get_subscription(self):
        """A created subscription can be fetched by its name."""
        sub_name = 'ExtraPM-All-gNB-R2B'
        self.app_conf.subscription.create()
        new_sub = Subscription.get(sub_name)
        self.assertEqual(sub_name, new_sub.subscription_name)

    def test_get_subscription_no_match(self):
        """Fetching an unknown subscription name returns ``None``."""
        sub_name = 'sub2_does_not_exist'
        sub = Subscription.get(sub_name)
        self.assertIsNone(sub)

    def test_get_nf_names_per_sub(self):
        """Two NFs added to the subscription are both reported for it."""
        xnfs = list(self.xnfs)
        self.app_conf.subscription.create()
        self.app_conf.subscription.add_network_function_to_subscription(xnfs[0])
        self.app_conf.subscription.add_network_function_to_subscription(xnfs[1])
        nfs = Subscription.get_nf_names_per_sub(self.app_conf.subscription.subscriptionName)
        self.assertEqual(2, len(nfs))

    def test_create_existing_subscription(self):
        """Creating the same subscription twice yields one stored subscription."""
        sub1 = self.app_conf.subscription.create()
        same_sub1 = self.app_conf.subscription.create()
        self.assertEqual(sub1, same_sub1)
        self.assertEqual(1, len(self.app_conf.subscription.get_all()))

    def test_add_network_functions_per_subscription(self):
        """Each added NF produces one NF/subscription relation."""
        for nf in self.xnfs:
            self.app_conf.subscription.add_network_function_to_subscription(nf)
        nfs_for_sub_1 = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual(3, len(nfs_for_sub_1))
        new_nf = NetworkFunction(nf_name='vnf_3', orchestration_status='Active')
        self.app_conf.subscription.add_network_function_to_subscription(new_nf)
        nf_subs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual(4, len(nf_subs))

    def test_add_duplicate_network_functions_per_subscription(self):
        """Adding the same NF twice keeps a single NF/subscription relation."""
        first_nf = list(self.xnfs)[0]
        self.app_conf.subscription.add_network_function_to_subscription(first_nf)
        nf_subs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual(1, len(nf_subs))
        self.app_conf.subscription.add_network_function_to_subscription(first_nf)
        nf_subs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual(1, len(nf_subs))

    def test_update_subscription_status(self):
        """The stored status follows the subscription's ``administrativeState``."""
        sub_name = 'ExtraPM-All-gNB-R2B'
        self.app_conf.subscription.create()
        self.app_conf.subscription.administrativeState = 'new_status'
        self.app_conf.subscription.update_subscription_status()
        sub = Subscription.get(sub_name)

        self.assertEqual('new_status', sub.status)

    def test_delete_subscription(self):
        """Deleting the subscription leaves no subscription, relation or NF behind."""
        for nf in self.xnfs:
            self.app_conf.subscription.add_network_function_to_subscription(nf)
        self.app_conf.subscription.delete_subscription()
        self.assertEqual(0, len(Subscription.get_all()))
        self.assertIsNone(Subscription.get(self.app_conf.subscription.subscriptionName))
        self.assertEqual(0, len(Subscription.get_all_nfs_subscription_relations()))
        self.assertEqual(0, len(NetworkFunction.get_all()))
        self.assertIsNone(NetworkFunction.get(nf_name=list(self.xnfs)[0].nf_name))

    def test_update_sub_nf_status(self):
        """The first two relations stay PENDING_CREATE after updating 'pnf_23'.

        NOTE(review): presumably 'pnf_23' is not one of the NFs loaded from the
        AAI fixture, so no relation is expected to change - confirm against
        data/aai_xnfs.json.
        """
        sub_name = 'ExtraPM-All-gNB-R2B'
        for nf in self.xnfs:
            self.app_conf.subscription.add_network_function_to_subscription(nf)
        sub_nfs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)

        Subscription.update_sub_nf_status(sub_name, 'Active', 'pnf_23')
        sub_nfs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)
        self.assertEqual('PENDING_CREATE', sub_nfs[1].nf_sub_status)

    @patch('mod.subscription.Subscription.add_network_function_to_subscription')
    @patch('mod.subscription.Subscription.update_sub_nf_status')
    @patch('mod.subscription.Subscription.update_subscription_status')
    def test_process_activate_subscription(self, mock_update_sub_status,
                                           mock_update_sub_nf, mock_add_nfs):
        """Processing with the configured state publishes an event and marks the NF
        PENDING_CREATE."""
        first_nf = list(self.xnfs)[0]
        # Stop after a single attempt so a failure surfaces at once instead of retrying.
        self.app_conf.subscription.process_subscription.retry.stop = stop_after_attempt(1)
        self.app_conf.subscription.process_subscription([first_nf], self.mock_mr_pub,
                                                        self.app_conf)

        mock_update_sub_status.assert_called()
        mock_add_nfs.assert_called()
        self.mock_mr_pub.publish_subscription_event_data.assert_called()
        mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName,
                                              'PENDING_CREATE', first_nf.nf_name)

    @patch('mod.subscription.Subscription.update_sub_nf_status')
    @patch('mod.subscription.Subscription.update_subscription_status')
    def test_process_deactivate_subscription(self, mock_update_sub_status,
                                             mock_update_sub_nf):
        """Processing a LOCKED subscription publishes an event and marks the NF
        PENDING_DELETE."""
        first_nf = list(self.xnfs)[0]
        self.app_conf.subscription.administrativeState = 'LOCKED'
        # Stop after a single attempt so a failure surfaces at once instead of retrying.
        self.app_conf.subscription.process_subscription.retry.stop = stop_after_attempt(1)
        self.app_conf.subscription.process_subscription([first_nf], self.mock_mr_pub,
                                                        self.app_conf)

        self.mock_mr_pub.publish_subscription_event_data.assert_called()
        mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName,
                                              'PENDING_DELETE', first_nf.nf_name)
        mock_update_sub_status.assert_called()

    def test_process_subscription_exception(self):
        """Processing with plain strings in place of the publisher and config raises."""
        self.app_conf.subscription.process_subscription.retry.stop = stop_after_attempt(1)
        self.assertRaises(Exception, self.app_conf.subscription.process_subscription,
                          [list(self.xnfs)[0]], 'not_mr_pub', 'app_config')

    def test_prepare_subscription_event(self):
        """The event prepared for 'pnf_1' equals the stored expected event."""
        with open(self._test_file('data/pm_subscription_event.json'), 'r') as data:
            expected_sub_event = json.load(data)
        actual_sub_event = self.app_conf.subscription.prepare_subscription_event('pnf_1',
                                                                                 self.app_conf)
        self.assertEqual(expected_sub_event, actual_sub_event)

    def test_get_nf_models(self):
        """``_get_nf_models`` returns one ``NetworkFunctionModel`` per added NF."""
        for nf in self.xnfs:
            self.app_conf.subscription.add_network_function_to_subscription(nf)
        nf_models = self.app_conf.subscription._get_nf_models()

        self.assertEqual(3, len(nf_models))
        self.assertIsInstance(nf_models[0], NetworkFunctionModel)

    def test_get_network_functions(self):
        """``get_network_functions`` returns one ``NetworkFunction`` per added NF."""
        for nf in self.xnfs:
            self.app_conf.subscription.add_network_function_to_subscription(nf)
        nfs = self.app_conf.subscription.get_network_functions()

        self.assertEqual(3, len(nfs))
        self.assertIsInstance(nfs[0], NetworkFunction)