Add Support for Activation and Deactivation
[dcaegen2/services.git] / components / pm-subscription-handler / tests / test_subscription.py
1 # ============LICENSE_START===================================================
2 #  Copyright (C) 2019-2020 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
18 import json
19 import os
20 from test.support import EnvironmentVarGuard
21 from unittest import TestCase
22 from unittest.mock import patch
23
24 from requests import Session
25 from tenacity import stop_after_attempt
26
27 import mod.aai_client as aai_client
28 from mod import db, create_app
29 from mod.network_function import NetworkFunction
30 from mod.subscription import Subscription, NetworkFunctionFilter
31
32
33 class SubscriptionTest(TestCase):
34     @patch('mod.pmsh_utils._MrPub')
35     @patch('mod.pmsh_utils._MrSub')
36     @patch('mod.get_db_connection_url')
37     @patch.object(Session, 'put')
38     def setUp(self, mock_session, mock_get_db_url, mock_mr_sub, mock_mr_pub):
39         mock_get_db_url.return_value = 'sqlite://'
40         with open(os.path.join(os.path.dirname(__file__), 'data/aai_xnfs.json'), 'r') as data:
41             self.aai_response_data = data.read()
42         mock_session.return_value.status_code = 200
43         mock_session.return_value.text = self.aai_response_data
44         self.env = EnvironmentVarGuard()
45         self.env.set('AAI_SERVICE_HOST', '1.2.3.4')
46         self.env.set('AAI_SERVICE_PORT_AAI_SSL', '8443')
47         self.env.set('TESTING', 'True')
48         self.env.set('LOGS_PATH', './unit_test_logs')
49         with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
50             self.cbs_data_1 = json.load(data)
51         with open(os.path.join(os.path.dirname(__file__),
52                                'data/cbs_data_2.json'), 'r') as data:
53             self.cbs_data_2 = json.load(data)
54         self.sub_1, self.xnfs = aai_client.get_pmsh_subscription_data(self.cbs_data_1)
55         self.sub_2, self.xnfs = aai_client.get_pmsh_subscription_data(self.cbs_data_2)
56         self.nf_1 = NetworkFunction(nf_name='pnf_1', orchestration_status='Inventoried')
57         self.nf_2 = NetworkFunction(nf_name='pnf_2', orchestration_status='Active')
58         self.xnf_filter = NetworkFunctionFilter(**self.sub_1.nfFilter)
59         self.mock_mr_sub = mock_mr_sub
60         self.mock_mr_pub = mock_mr_pub
61         self.app = create_app()
62         self.app_context = self.app.app_context()
63         self.app_context.push()
64         db.create_all()
65
66     def tearDown(self):
67         db.session.remove()
68         db.drop_all()
69         self.app_context.pop()
70
71     def test_xnf_filter_true(self):
72         self.assertTrue(self.xnf_filter.is_nf_in_filter('pnf1'))
73
74     def test_xnf_filter_false(self):
75         self.assertFalse(self.xnf_filter.is_nf_in_filter('PNF-33'))
76
77     def test_sub_measurement_group(self):
78         self.assertEqual(len(self.sub_1.measurementGroups), 2)
79
80     def test_sub_file_location(self):
81         self.assertEqual(self.sub_1.fileLocation, '/pm/pm.xml')
82
83     def test_get_subscription(self):
84         sub_name = 'ExtraPM-All-gNB-R2B'
85         self.sub_1.create()
86         new_sub = Subscription.get(sub_name)
87         self.assertEqual(sub_name, new_sub.subscription_name)
88
89     def test_get_subscription_no_match(self):
90         sub_name = 'sub2_does_not_exist'
91         sub = Subscription.get(sub_name)
92         self.assertEqual(sub, None)
93
94     def test_get_subscriptions(self):
95         self.sub_1.create()
96         self.sub_2.create()
97         subs = self.sub_1.get_all()
98
99         self.assertEqual(2, len(subs))
100
101     def test_create_existing_subscription(self):
102         sub1 = self.sub_1.create()
103         same_sub1 = self.sub_1.create()
104         self.assertEqual(sub1, same_sub1)
105         self.assertEqual(1, len(self.sub_1.get_all()))
106
107     def test_add_network_functions_per_subscription(self):
108         nf_array = [self.nf_1, self.nf_2]
109         self.sub_1.add_network_functions_to_subscription(nf_array)
110         nfs_for_sub_1 = Subscription.get_all_nfs_subscription_relations()
111         self.assertEqual(2, len(nfs_for_sub_1))
112         new_nf_array = [NetworkFunction(nf_name='vnf_3', orchestration_status='Inventoried')]
113         self.sub_1.add_network_functions_to_subscription(new_nf_array)
114         nf_subs = Subscription.get_all_nfs_subscription_relations()
115         print(nf_subs)
116         self.assertEqual(3, len(nf_subs))
117
118     def test_add_duplicate_network_functions_per_subscription(self):
119         nf_array = [self.nf_1]
120         self.sub_1.add_network_functions_to_subscription(nf_array)
121         nf_subs = Subscription.get_all_nfs_subscription_relations()
122         self.assertEqual(1, len(nf_subs))
123         self.sub_1.add_network_functions_to_subscription(nf_array)
124         nf_subs = Subscription.get_all_nfs_subscription_relations()
125         self.assertEqual(1, len(nf_subs))
126
127     def test_update_subscription_status(self):
128         sub_name = 'ExtraPM-All-gNB-R2B'
129         self.sub_1.create()
130         self.sub_1.administrativeState = 'new_status'
131         self.sub_1.update_subscription_status()
132         sub = Subscription.get(sub_name)
133
134         self.assertEqual('new_status', sub.status)
135
136     def test_delete_subscription(self):
137         self.sub_1.create()
138         subs = self.sub_1.get_all()
139         self.assertEqual(1, len(subs))
140
141         self.sub_1.delete_subscription()
142         new_subs = self.sub_1.get_all()
143         self.assertEqual(0, len(new_subs))
144
145     def test_update_sub_nf_status(self):
146         sub_name = 'ExtraPM-All-gNB-R2B'
147         nf_array = [self.nf_1, self.nf_2]
148         self.sub_1.add_network_functions_to_subscription(nf_array)
149         sub_nfs = Subscription.get_all_nfs_subscription_relations()
150         self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)
151
152         Subscription.update_sub_nf_status(sub_name, 'Active', 'pnf_1')
153         sub_nfs = Subscription.get_all_nfs_subscription_relations()
154         self.assertEqual('Active', sub_nfs[0].nf_sub_status)
155         self.assertEqual('PENDING_CREATE', sub_nfs[1].nf_sub_status)
156
157     @patch('mod.subscription.Subscription.add_network_functions_to_subscription')
158     @patch('mod.subscription.Subscription.update_sub_nf_status')
159     @patch('mod.subscription.Subscription.update_subscription_status')
160     def test_process_activate_subscription(self, mock_update_sub_status,
161                                            mock_update_sub_nf, mock_add_nfs):
162         self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
163         self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub)
164
165         mock_update_sub_status.assert_called()
166         mock_add_nfs.assert_called()
167         self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
168         mock_update_sub_nf.assert_called_with(self.sub_1.subscriptionName,
169                                               'PENDING_CREATE', self.nf_1.nf_name)
170
171     @patch('mod.subscription.Subscription.update_sub_nf_status')
172     @patch('mod.subscription.Subscription.update_subscription_status')
173     def test_process_deactivate_subscription(self, mock_update_sub_status,
174                                              mock_update_sub_nf):
175         self.sub_1.administrativeState = 'LOCKED'
176         self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
177         self.sub_1.process_subscription([self.nf_1], self.mock_mr_pub)
178
179         self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
180         mock_update_sub_nf.assert_called_with(self.sub_1.subscriptionName,
181                                               'PENDING_DELETE', self.nf_1.nf_name)
182         mock_update_sub_status.assert_called()
183
184     def test_process_subscription_exception(self):
185         self.sub_1.process_subscription.retry.stop = stop_after_attempt(1)
186         self.assertRaises(Exception, self.sub_1.process_subscription,
187                           [self.nf_1], 'not_mr_pub')