538baf39f28a4c545fcca9d8b22bc9adb6b41298
[dcaegen2/services.git] / components / pm-subscription-handler / tests / test_subscription.py
1 # ============LICENSE_START===================================================
2 #  Copyright (C) 2019-2021 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
18 import json
19 import os
20 from unittest.mock import patch, Mock
21
22 from requests import Session
23
24 import mod.aai_client as aai_client
25 from mod.network_function import NetworkFunction
26 from mod.subscription import Subscription
27 from tests.base_setup import BaseClassSetup
28
29
class SubscriptionTest(BaseClassSetup):
    """Unit tests for the ``Subscription`` object exposed as ``self.app_conf.subscription``.

    ``self.app_conf`` is not created in this class; it is presumably provided by
    ``BaseClassSetup`` (confirm there). Class-level fixtures and per-test teardown are
    inherited unchanged from ``BaseClassSetup``.
    """

    @staticmethod
    def _read_test_data(file_name):
        """Return the text content of ``data/<file_name>`` located next to this module."""
        data_path = os.path.join(os.path.dirname(__file__), 'data', file_name)
        with open(data_path, 'r', encoding='utf-8') as data:
            return data.read()

    @staticmethod
    def _build_nf(ipv6_address):
        """Return the ``pnf_1`` NetworkFunction used by the subscription-event tests.

        Args:
            ipv6_address: value passed through as the NetworkFunction's ``ipv6_address``.
        """
        nf = NetworkFunction(nf_name='pnf_1',
                             ipv4_address='204.120.0.15',
                             ipv6_address=ipv6_address,
                             model_invariant_id='some-id',
                             model_version_id='some-id')
        nf.sdnc_model_name = 'some-name'
        nf.sdnc_model_version = 'some-version'
        return nf

    @patch.object(Session, 'get')
    @patch.object(Session, 'put')
    def setUp(self, mock_session_put, mock_session_get):
        """Create the subscription and load the network functions used by the tests.

        ``Session.put`` and ``Session.get`` are patched only while this method runs; their
        responses are status 200 with the content of ``aai_xnfs.json`` and
        ``aai_model_info.json`` respectively.
        """
        super().setUp()
        self.aai_response_data = self._read_test_data('aai_xnfs.json')
        mock_session_put.return_value.status_code = 200
        mock_session_put.return_value.text = self.aai_response_data
        self.aai_model_data = self._read_test_data('aai_model_info.json')
        mock_session_get.return_value.status_code = 200
        mock_session_get.return_value.text = self.aai_model_data
        self.mock_mr_sub = Mock()
        self.mock_mr_pub = Mock()
        self.app_conf.subscription.create()
        # Must run while the Session patches above are active.
        self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf, self.app_conf.nf_filter)
        self.sub_model = self.app_conf.subscription.get()

    def test_sub_measurement_group(self):
        """The configured subscription exposes two measurement groups."""
        self.assertEqual(len(self.app_conf.subscription.measurementGroups), 2)

    def test_sub_file_location(self):
        """The configured subscription exposes the expected file location."""
        self.assertEqual(self.app_conf.subscription.fileLocation, '/pm/pm.xml')

    def test_get_subscription(self):
        """``get()`` returns a model carrying the expected subscription name."""
        sub_name = 'ExtraPM-All-gNB-R2B'
        new_sub = self.app_conf.subscription.get()
        self.assertEqual(sub_name, new_sub.subscription_name)

    def test_get_nf_names_per_sub(self):
        """Adding two network functions records two nf-to-subscription relations."""
        nfs = list(self.xnfs)
        self.app_conf.subscription.add_network_function_to_subscription(nfs[0], self.sub_model)
        self.app_conf.subscription.add_network_function_to_subscription(nfs[1], self.sub_model)
        # Without an assertion this test could never fail on wrong behaviour.
        nf_subs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual(2, len(nf_subs))

    def test_create_existing_subscription(self):
        """Calling ``create()`` repeatedly returns an equal object and stores one record."""
        sub1 = self.app_conf.subscription.create()
        same_sub1 = self.app_conf.subscription.create()
        self.assertEqual(sub1, same_sub1)
        self.assertEqual(1, len(self.app_conf.subscription.get_all()))

    def test_add_duplicate_network_functions_per_subscription(self):
        """Adding the same network function twice leaves a single relation."""
        first_nf = list(self.xnfs)[0]
        self.app_conf.subscription.add_network_function_to_subscription(first_nf,
                                                                        self.sub_model)
        nf_subs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual(1, len(nf_subs))
        self.app_conf.subscription.add_network_function_to_subscription(first_nf,
                                                                        self.sub_model)
        nf_subs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual(1, len(nf_subs))

    def test_update_subscription_status(self):
        """``update_subscription_status()`` stores the current ``administrativeState``."""
        self.app_conf.subscription.administrativeState = 'new_status'
        self.app_conf.subscription.update_subscription_status()
        sub = self.app_conf.subscription.get()

        self.assertEqual('new_status', sub.status)

    def test_update_sub_nf_status(self):
        """Updating the status for ``pnf_23`` leaves the first two relations pending."""
        sub_name = 'ExtraPM-All-gNB-R2B'
        for nf in self.xnfs:
            self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
        sub_nfs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)

        # NOTE(review): presumably 'pnf_23' is not one of the first two relations, which
        # is why they are expected to stay PENDING_CREATE - confirm against aai_xnfs.json.
        Subscription.update_sub_nf_status(sub_name, 'Active', 'pnf_23')
        sub_nfs = Subscription.get_all_nfs_subscription_relations()
        self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)
        self.assertEqual('PENDING_CREATE', sub_nfs[1].nf_sub_status)

    @patch('mod.subscription.Subscription.add_network_function_to_subscription')
    @patch('mod.subscription.Subscription.update_sub_nf_status')
    def test_process_activate_subscription(self, mock_update_sub_nf, mock_add_nfs):
        """Creating the subscription on one nf adds it, publishes and marks it pending."""
        first_nf = list(self.xnfs)[0]
        self.app_conf.subscription.create_subscription_on_nfs([first_nf], self.mock_mr_pub)

        mock_add_nfs.assert_called()
        self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
        mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName,
                                              'PENDING_CREATE', first_nf.nf_name)

    @patch('mod.subscription.Subscription.get_network_functions')
    @patch('mod.subscription.Subscription.update_sub_nf_status')
    def test_process_deactivate_subscription(self, mock_update_sub_nf, mock_get_nfs):
        """Deleting a LOCKED subscription publishes and updates the nf status three times."""
        self.app_conf.subscription.administrativeState = 'LOCKED'
        mock_get_nfs.return_value = [list(self.xnfs)[0]]
        self.app_conf.subscription.delete_subscription_from_nfs(self.xnfs, self.mock_mr_pub)
        self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
        self.assertEqual(mock_update_sub_nf.call_count, 3)

    def test_activate_subscription_exception(self):
        """Creating the subscription with an invalid publisher raises an exception."""
        # NOTE(review): three positional arguments are passed here while
        # test_process_activate_subscription passes two; confirm the exception comes from
        # the invalid publisher and not merely from the number of arguments.
        self.assertRaises(Exception, self.app_conf.subscription.create_subscription_on_nfs,
                          [list(self.xnfs)[0]], 'not_mr_pub', 'app_config')

    def test_prepare_subscription_event(self):
        """The prepared event for an nf without IPv6 matches ``pm_subscription_event.json``."""
        expected_sub_event = json.loads(self._read_test_data('pm_subscription_event.json'))
        nf = self._build_nf(ipv6_address='')
        actual_sub_event = self.app_conf.subscription.prepare_subscription_event(nf)
        self.assertEqual(expected_sub_event, actual_sub_event)

    def test_prepare_subscription_event_with_ipv6(self):
        """When the nf has an IPv6 address, the event's ``ipAddress`` is that address."""
        ipv6_address = '2001:db8:3333:4444:5555:6666:7777:8888'
        expected_sub_event = json.loads(self._read_test_data('pm_subscription_event.json'))
        expected_sub_event['ipAddress'] = ipv6_address
        nf = self._build_nf(ipv6_address=ipv6_address)
        actual_sub_event = self.app_conf.subscription.prepare_subscription_event(nf)
        self.assertEqual(expected_sub_event, actual_sub_event)

    def test_get_network_functions(self):
        """All added network functions are returned as ``NetworkFunction`` instances."""
        for nf in self.xnfs:
            self.app_conf.subscription.add_network_function_to_subscription(nf, self.sub_model)
        nfs = self.app_conf.subscription.get_network_functions()

        self.assertEqual(3, len(nfs))
        self.assertIsInstance(nfs[0], NetworkFunction)