[DCAEGEN2] PMSH AAI changes with new subscription format
[dcaegen2/services.git] / components / pm-subscription-handler / pmsh_service / mod / subscription_handler.py
1 # ============LICENSE_START===================================================
2 #  Copyright (C) 2020-2021 Nordix Foundation.
3 # ============================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # SPDX-License-Identifier: Apache-2.0
17 # ============LICENSE_END=====================================================
18 from jsonschema import ValidationError
19
20 from mod import logger, aai_client
21 from mod.network_function import NetworkFunctionFilter
22 from mod.subscription import AdministrativeState
23
24
25 class SubscriptionHandler:
26     def __init__(self, mr_pub, aai_sub, app, app_conf):
27         self.mr_pub = mr_pub
28         self.aai_sub = aai_sub
29         self.app = app
30         self.app_conf = app_conf
31         self.aai_event_thread = None
32
33     def execute(self):
34         """
35         Checks for changes of administrative state in config and proceeds to process
36         the Subscription if a change has occurred
37         """
38         self.app.app_context().push()
39         try:
40             local_admin_state = self.app_conf.subscription.get_local_sub_admin_state()
41             if local_admin_state == AdministrativeState.LOCKING.value:
42                 self._check_for_failed_nfs()
43             else:
44                 self.app_conf.refresh_config()
45                 self.app_conf.validate_sub_schema()
46                 new_administrative_state = self.app_conf.subscription.administrativeState
47                 if local_admin_state == new_administrative_state:
48                     logger.info(f'Administrative State did not change in the app config: '
49                                 f'{new_administrative_state}')
50                 else:
51                     self._check_state_change(local_admin_state, new_administrative_state)
52         except (ValidationError, TypeError) as err:
53             logger.error(f'Error occurred during validation of subscription schema {err}',
54                          exc_info=True)
55         except Exception as err:
56             logger.error(f'Error occurred during the activation/deactivation process {err}',
57                          exc_info=True)
58
59     def _check_state_change(self, local_admin_state, new_administrative_state):
60         if new_administrative_state == AdministrativeState.UNLOCKED.value:
61             logger.info(f'Administrative State has changed from {local_admin_state} '
62                         f'to {new_administrative_state}.')
63             self._activate(new_administrative_state)
64         elif new_administrative_state == AdministrativeState.LOCKED.value:
65             logger.info(f'Administrative State has changed from {local_admin_state} '
66                         f'to {new_administrative_state}.')
67             self._deactivate()
68         else:
69             raise Exception(f'Invalid AdministrativeState: {new_administrative_state}')
70
71     def _activate(self, new_administrative_state):
72         if not self.app_conf.nf_filter:
73             self.app_conf.nf_filter = NetworkFunctionFilter(**self.app_conf.subscription.nfFilter)
74         self.app_conf.subscription.update_sub_params(new_administrative_state,
75                                                      self.app_conf.subscription.fileBasedGP,
76                                                      self.app_conf.subscription.fileLocation,
77                                                      self.app_conf.subscription.measurementGroups)
78         nfs_in_aai = aai_client.get_pmsh_nfs_from_aai(self.app_conf, self.app_conf.nf_filter)
79         self.app_conf.subscription.create_subscription_on_nfs(nfs_in_aai, self.mr_pub)
80         self.app_conf.subscription.update_subscription_status()
81
82     def _deactivate(self):
83         nfs = self.app_conf.subscription.get_network_functions()
84         if nfs:
85             self.stop_aai_event_thread()
86             self.app_conf.subscription.administrativeState = AdministrativeState.LOCKING.value
87             logger.info('Subscription is now LOCKING/DEACTIVATING.')
88             self.app_conf.subscription.delete_subscription_from_nfs(nfs, self.mr_pub)
89             self.app_conf.subscription.update_subscription_status()
90
91     def stop_aai_event_thread(self):
92         if self.aai_event_thread is not None:
93             self.aai_event_thread.cancel()
94             self.aai_event_thread = None
95             logger.info('Stopping polling for NFs events on AAI-EVENT topic in MR.')
96
97     def _check_for_failed_nfs(self):
98         logger.info('Checking for DELETE_FAILED NFs before LOCKING Subscription.')
99         del_failed_nfs = self.app_conf.subscription.get_delete_failed_nfs()
100         if del_failed_nfs or self.app_conf.subscription.get_delete_pending_nfs():
101             for nf in del_failed_nfs:
102                 nf_model = nf.get(nf.nf_name)
103                 if nf_model.retry_count < 3:
104                     logger.info(f'Retry deletion of subscription '
105                                 f'{self.app_conf.subscription.subscriptionName} '
106                                 f'from NF: {nf.nf_name}')
107                     self.app_conf.subscription.delete_subscription_from_nfs([nf], self.mr_pub)
108                     nf.increment_retry_count()
109                 else:
110                     logger.error(f'Failed to delete the subscription '
111                                  f'{self.app_conf.subscription.subscriptionName} '
112                                  f'from NF: {nf.nf_name} after {nf_model.retry_count} '
113                                  f'attempts. Removing NF from DB')
114                     nf.delete(nf_name=nf.nf_name)
115         else:
116             logger.info('Proceeding to LOCKED adminState.')
117             self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value
118             self.app_conf.subscription.update_subscription_status()