[PMSH] Update Filter API
[dcaegen2/services.git] / components / pm-subscription-handler / pmsh_service / mod / subscription_handler.py
index e74a173..5fbb9a6 100644 (file)
@@ -1,5 +1,5 @@
 # ============LICENSE_START===================================================
-#  Copyright (C) 2020 Nordix Foundation.
+#  Copyright (C) 2020-2021 Nordix Foundation.
 # ============================================================================
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 #
 # SPDX-License-Identifier: Apache-2.0
 # ============LICENSE_END=====================================================
+from jsonschema import ValidationError
 
-import mod.aai_client as aai
-from mod import logger
+from mod import logger, aai_client
+from mod.network_function import NetworkFunctionFilter
 from mod.subscription import AdministrativeState
 
 
 class SubscriptionHandler:
-    def __init__(self, mr_pub, app, app_conf, aai_event_thread, policy_event_thread):
+    def __init__(self, mr_pub, aai_sub, app, app_conf):
         self.mr_pub = mr_pub
+        self.aai_sub = aai_sub
         self.app = app
         self.app_conf = app_conf
-        self.aai_event_thread = aai_event_thread
-        self.policy_event_thread = policy_event_thread
+        self.aai_event_thread = None
 
     def execute(self):
         """
@@ -35,37 +36,83 @@ class SubscriptionHandler:
         the Subscription if a change has occurred
         """
         self.app.app_context().push()
-        local_admin_state = self.app_conf.subscription.get_local_sub_admin_state()
-        new_administrative_state = self.app_conf.subscription.administrativeState
         try:
-            if local_admin_state == new_administrative_state:
-                logger.info('Administrative State did not change in the Config')
+            local_admin_state = self.app_conf.subscription.get_local_sub_admin_state()
+            if local_admin_state == AdministrativeState.LOCKING.value:
+                self._check_for_failed_nfs()
             else:
-                if new_administrative_state == AdministrativeState.UNLOCKED.value:
-                    self._activate(local_admin_state, new_administrative_state)
-                elif local_admin_state == AdministrativeState.PENDING.value:
-                    logger.info('Administrative State is PENDING')
+                self.app_conf.refresh_config()
+                self.app_conf.validate_sub_schema()
+                new_administrative_state = self.app_conf.subscription.administrativeState
+                if local_admin_state == new_administrative_state:
+                    logger.info(f'Administrative State did not change in the app config: '
+                                f'{new_administrative_state}')
                 else:
-                    self._deactivate(local_admin_state, new_administrative_state)
+                    self._check_state_change(local_admin_state, new_administrative_state)
+        except (ValidationError, TypeError) as err:
+            logger.error(f'Error occurred during validation of subscription schema {err}',
+                         exc_info=True)
         except Exception as err:
             logger.error(f'Error occurred during the activation/deactivation process {err}',
                          exc_info=True)
 
-    def _activate(self, local_admin_state, new_administrative_state):
-        logger.info(f'Administrative State has changed from {local_admin_state} '
-                    f'to {new_administrative_state}.')
-        existing_nfs_in_aai = aai.get_pmsh_nfs_from_aai(self.app_conf)
-        self.app_conf.subscription.activate_subscription(existing_nfs_in_aai, self.mr_pub,
-                                                         self.app_conf)
-        self.app_conf.subscription.update_subscription_status()
-        logger.info('Start listening for new NFs on AAI-EVENT topic in MR.')
-        self.aai_event_thread.start()
-        self.policy_event_thread.start()
+    def _check_state_change(self, local_admin_state, new_administrative_state):
+        if new_administrative_state == AdministrativeState.UNLOCKED.value:
+            logger.info(f'Administrative State has changed from {local_admin_state} '
+                        f'to {new_administrative_state}.')
+            self._activate(new_administrative_state)
+        elif new_administrative_state == AdministrativeState.LOCKED.value:
+            logger.info(f'Administrative State has changed from {local_admin_state} '
+                        f'to {new_administrative_state}.')
+            self._deactivate()
+        else:
+            raise Exception(f'Invalid AdministrativeState: {new_administrative_state}')
 
-    def _deactivate(self, local_admin_state, new_administrative_state):
-        logger.info(f'Administrative State has changed from {local_admin_state} '
-                    f'to {new_administrative_state}.')
-        self.aai_event_thread.cancel()
-        logger.info('Stop listening for NFs on AAI-EVENT topic in MR.')
-        self.app_conf.subscription.deactivate_subscription(self.mr_pub, self.app_conf)
+    def _activate(self, new_administrative_state):
+        if not self.app_conf.nf_filter:
+            self.app_conf.nf_filter = NetworkFunctionFilter(**self.app_conf.subscription.nfFilter)
+        self.app_conf.subscription.update_sub_params(new_administrative_state,
+                                                     self.app_conf.subscription.fileBasedGP,
+                                                     self.app_conf.subscription.fileLocation,
+                                                     self.app_conf.subscription.measurementGroups)
+        nfs_in_aai = aai_client.get_pmsh_nfs_from_aai(self.app_conf, self.app_conf.nf_filter)
+        self.app_conf.subscription.create_subscription_on_nfs(nfs_in_aai, self.mr_pub)
         self.app_conf.subscription.update_subscription_status()
+
+    def _deactivate(self):
+        nfs = self.app_conf.subscription.get_network_functions()
+        if nfs:
+            self.stop_aai_event_thread()
+            self.app_conf.subscription.administrativeState = AdministrativeState.LOCKING.value
+            logger.info('Subscription is now LOCKING/DEACTIVATING.')
+            self.app_conf.subscription.delete_subscription_from_nfs(nfs, self.mr_pub)
+            self.app_conf.subscription.update_subscription_status()
+
+    def stop_aai_event_thread(self):
+        if self.aai_event_thread is not None:
+            self.aai_event_thread.cancel()
+            self.aai_event_thread = None
+            logger.info('Stopping polling for NFs events on AAI-EVENT topic in MR.')
+
+    def _check_for_failed_nfs(self):
+        logger.info('Checking for DELETE_FAILED NFs before LOCKING Subscription.')
+        del_failed_nfs = self.app_conf.subscription.get_delete_failed_nfs()
+        if del_failed_nfs or self.app_conf.subscription.get_delete_pending_nfs():
+            for nf in del_failed_nfs:
+                nf_model = nf.get(nf.nf_name)
+                if nf_model.retry_count < 3:
+                    logger.info(f'Retry deletion of subscription '
+                                f'{self.app_conf.subscription.subscriptionName} '
+                                f'from NF: {nf.nf_name}')
+                    self.app_conf.subscription.delete_subscription_from_nfs([nf], self.mr_pub)
+                    nf.increment_retry_count()
+                else:
+                    logger.error(f'Failed to delete the subscription '
+                                 f'{self.app_conf.subscription.subscriptionName} '
+                                 f'from NF: {nf.nf_name} after {nf_model.retry_count} '
+                                 f'attempts. Removing NF from DB')
+                    nf.delete(nf_name=nf.nf_name)
+        else:
+            logger.info('Proceeding to LOCKED adminState.')
+            self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value
+            self.app_conf.subscription.update_subscription_status()