[PMSH] Bug fix to include ip in event
[dcaegen2/services.git] / components / pm-subscription-handler / pmsh_service / mod / subscription.py
index 99a787d..34753e8 100755 (executable)
 #
 # SPDX-License-Identifier: Apache-2.0
 # ============LICENSE_END=====================================================
-
 from enum import Enum
 
-from tenacity import retry, retry_if_exception_type, wait_exponential, stop_after_attempt
-
-import mod.pmsh_logging as logger
-from mod import db
-from mod.db_models import SubscriptionModel, NfSubRelationalModel
+from mod import db, logger
+from mod.api.db_models import SubscriptionModel, NfSubRelationalModel, NetworkFunctionModel
 
 
 class SubNfState(Enum):
@@ -36,6 +32,7 @@ class SubNfState(Enum):
 class AdministrativeState(Enum):
     UNLOCKED = 'UNLOCKED'
     LOCKED = 'LOCKED'
+    PENDING = 'PENDING'
 
 
 subscription_nf_states = {
@@ -58,24 +55,7 @@ class Subscription:
         self.fileLocation = kwargs.get('fileLocation')
         self.nfFilter = kwargs.get('nfFilter')
         self.measurementGroups = kwargs.get('measurementGroups')
-
-    def prepare_subscription_event(self, xnf_name, app_conf):
-        """Prepare the sub event for publishing
-
-        Args:
-            xnf_name: the AAI xnf name.
-            app_conf (AppConfig): the application configuration.
-
-        Returns:
-            dict: the Subscription event to be published.
-        """
-        clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'}
-        sub_event = {'nfName': xnf_name, 'policyName': app_conf.operational_policy_name,
-                     'changeType': 'DELETE'
-                     if self.administrativeState == AdministrativeState.LOCKED.value
-                     else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name,
-                     'subscription': clean_sub}
-        return sub_event
+        self.create()
 
     def create(self):
         """ Creates a subscription database entry
@@ -83,124 +63,164 @@ class Subscription:
         Returns:
             Subscription object
         """
-        existing_subscription = (SubscriptionModel.query.filter(
-            SubscriptionModel.subscription_name == self.subscriptionName).one_or_none())
+        try:
+            existing_subscription = (SubscriptionModel.query.filter(
+                SubscriptionModel.subscription_name == self.subscriptionName).one_or_none())
+            if existing_subscription is None:
+                new_subscription = SubscriptionModel(subscription_name=self.subscriptionName,
+                                                     status=AdministrativeState.PENDING.value)
+                db.session.add(new_subscription)
+                db.session.commit()
+                return new_subscription
+            else:
+                logger.debug(f'Subscription {self.subscriptionName} already exists,'
+                             f' returning this subscription..')
+                return existing_subscription
+        except Exception as e:
+            logger.error(f'Failed to create subscription {self.subscriptionName} in the DB: {e}',
+                         exc_info=True)
+        finally:
+            db.session.remove()
 
-        if existing_subscription is None:
-            new_subscription = SubscriptionModel(subscription_name=self.subscriptionName,
-                                                 status=self.administrativeState)
+    def update_subscription_status(self):
+        """ Updates the status of subscription in subscription table """
+        try:
+            SubscriptionModel.query.filter(
+                SubscriptionModel.subscription_name == self.subscriptionName)\
+                .update({SubscriptionModel.status: self.administrativeState},
+                        synchronize_session='evaluate')
 
-            db.session.add(new_subscription)
             db.session.commit()
+        except Exception as e:
+            logger.error(f'Failed to update status of subscription: {self.subscriptionName}: {e}',
+                         exc_info=True)
+        finally:
+            db.session.remove()
 
-            return new_subscription
+    def prepare_subscription_event(self, nf, app_conf):
+        """Prepare the sub event for publishing
 
-        else:
-            logger.debug(f'Subscription {self.subscriptionName} already exists,'
-                         f' returning this subscription..')
-            return existing_subscription
+        Args:
+            nf (NetworkFunction): the AAI nf.
+            app_conf (AppConfig): the application configuration.
 
-    def add_network_functions_to_subscription(self, nf_list):
-        """ Associates network functions to a Subscription
+        Returns:
+            dict: the Subscription event to be published.
+        """
+        try:
+            clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'}
+            sub_event = {'nfName': nf.nf_name,
+                         'ipv4Address': nf.ip_address,
+                         'blueprintName': nf.sdnc_model_name,
+                         'blueprintVersion': nf.sdnc_model_version,
+                         'policyName': app_conf.operational_policy_name,
+                         'changeType': 'DELETE'
+                         if self.administrativeState == AdministrativeState.LOCKED.value
+                         else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name,
+                         'subscription': clean_sub}
+            return sub_event
+        except Exception as e:
+            logger.error(f'Failed to prep Sub event for xNF {nf.nf_name}: {e}', exc_info=True)
+            raise
+
+    def add_network_function_to_subscription(self, nf, sub_model):
+        """ Associates a network function to a Subscription
 
         Args:
-            nf_list : A list of NetworkFunction objects.
+            sub_model(SubscriptionModel): The SubscriptionModel from the DB.
+            nf(NetworkFunction): A NetworkFunction object.
         """
-        current_sub = self.create()
-        logger.debug(f'Adding network functions to subscription {current_sub.subscription_name}')
-
-        for nf in nf_list:
+        try:
             current_nf = nf.create()
-
             existing_entry = NfSubRelationalModel.query.filter(
-                NfSubRelationalModel.subscription_name == current_sub.subscription_name,
+                NfSubRelationalModel.subscription_name == self.subscriptionName,
                 NfSubRelationalModel.nf_name == current_nf.nf_name).one_or_none()
             if existing_entry is None:
-                new_nf_sub = NfSubRelationalModel(current_sub.subscription_name,
+                new_nf_sub = NfSubRelationalModel(self.subscriptionName,
                                                   nf.nf_name, SubNfState.PENDING_CREATE.value)
-                new_nf_sub.nf = current_nf
-                logger.debug(current_nf)
-                current_sub.nfs.append(new_nf_sub)
-
-        db.session.add(current_sub)
-        db.session.commit()
+                sub_model.nfs.append(new_nf_sub)
+                db.session.add(sub_model)
+                db.session.commit()
+                logger.info(f'Network function {current_nf.nf_name} added to Subscription '
+                            f'{self.subscriptionName}')
+        except Exception as e:
+            logger.error(f'Failed to add nf {nf.nf_name} to subscription '
+                         f'{self.subscriptionName}: {e}', exc_info=True)
+            logger.debug(f'Subscription {self.subscriptionName} now contains these XNFs:'
+                         f'{[nf.nf_name for nf.nf_name in self.get_network_functions()]}')
+
+    def get(self):
+        """ Retrieves a SubscriptionModel object
 
-    @staticmethod
-    def get(subscription_name):
-        """ Retrieves a subscription
+        Returns:
+            SubscriptionModel object else None
+        """
+        sub_model = SubscriptionModel.query.filter(
+            SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()
+        return sub_model
 
-        Args:
-            subscription_name (str): The subscription name
+    def get_local_sub_admin_state(self):
+        """ Retrieves the subscription admin state
 
         Returns:
-            Subscription object else None
+            str: The admin state of the SubscriptionModel
         """
-        return SubscriptionModel.query.filter(
-            SubscriptionModel.subscription_name == subscription_name).one_or_none()
+        sub_model = SubscriptionModel.query.filter(
+            SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()
+        db.session.remove()
+        return sub_model.status
 
     @staticmethod
     def get_all():
         """ Retrieves a list of subscriptions
 
         Returns:
-            list: Subscription list else empty
+            list(SubscriptionModel): Subscriptions list else empty
         """
-        return SubscriptionModel.query.all()
-
-    def update_subscription_status(self):
-        """ Updates the status of subscription in subscription table """
-        SubscriptionModel.query.filter(
-            SubscriptionModel.subscription_name == self.subscriptionName). \
-            update({SubscriptionModel.status: self.administrativeState},
-                   synchronize_session='evaluate')
 
-        db.session.commit()
-
-    def delete_subscription(self):
-        """ Deletes a subscription and all its association from the database. A network function
-        that is only associated with the subscription being removed will also be deleted."""
-        subscription = SubscriptionModel.query.filter(
-            SubscriptionModel.subscription_name == self.subscriptionName).one_or_none()
-        if subscription:
-            for nf_relationship in subscription.nfs:
-                other_nf_relationship = NfSubRelationalModel.query.filter(
-                    NfSubRelationalModel.subscription_name != self.subscriptionName,
-                    NfSubRelationalModel.nf_name == nf_relationship.nf_name).one_or_none()
-                if not other_nf_relationship:
-                    db.session.delete(nf_relationship.nf)
-            db.session.delete(subscription)
-            db.session.commit()
-
-    @retry(wait=wait_exponential(multiplier=1, min=30, max=120), stop=stop_after_attempt(3),
-           retry=retry_if_exception_type(Exception))
-    def process_subscription(self, nfs, mr_pub, app_conf):
-        action = 'Deactivate'
-        sub_nf_state = SubNfState.PENDING_DELETE.value
-        self.update_subscription_status()
-
-        if self.administrativeState == AdministrativeState.UNLOCKED.value:
-            action = 'Activate'
-            sub_nf_state = SubNfState.PENDING_CREATE.value
+        sub_models = SubscriptionModel.query.all()
+        db.session.remove()
+        return sub_models
 
+    def activate_subscription(self, nfs, mr_pub, app_conf):
+        logger.info(f'Activate subscription initiated for {self.subscriptionName}.')
         try:
-            for nf in nfs:
-                mr_pub.publish_subscription_event_data(self, nf.nf_name, app_conf)
-                logger.debug(f'Publishing Event to {action} '
-                             f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}')
-                self.add_network_functions_to_subscription(nfs)
-                self.update_sub_nf_status(self.subscriptionName, sub_nf_state, nf.nf_name)
+            existing_nfs = self.get_network_functions()
+            sub_model = self.get()
+            for nf in [new_nf for new_nf in nfs if new_nf not in existing_nfs]:
+                logger.info(f'Publishing event to activate '
+                            f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}')
+                mr_pub.publish_subscription_event_data(self, nf, app_conf)
+                self.add_network_function_to_subscription(nf, sub_model)
+                self.update_sub_nf_status(self.subscriptionName, SubNfState.PENDING_CREATE.value,
+                                          nf.nf_name)
         except Exception as err:
             raise Exception(f'Error publishing activation event to MR: {err}')
 
+    def deactivate_subscription(self, mr_pub, app_conf):
+        try:
+            nfs = self.get_network_functions()
+            if nfs:
+                logger.info(f'Deactivate subscription initiated for {self.subscriptionName}.')
+                for nf in nfs:
+                    mr_pub.publish_subscription_event_data(self, nf, app_conf)
+                    logger.debug(f'Publishing Event to deactivate '
+                                 f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}')
+                    self.update_sub_nf_status(self.subscriptionName,
+                                              SubNfState.PENDING_DELETE.value,
+                                              nf.nf_name)
+        except Exception as err:
+            raise Exception(f'Error publishing deactivation event to MR: {err}')
+
     @staticmethod
     def get_all_nfs_subscription_relations():
         """ Retrieves all network function to subscription relations
 
         Returns:
-            list: NetworkFunctions per Subscription list else empty
+            list(NfSubRelationalModel): NetworkFunctions per Subscription list else empty
         """
         nf_per_subscriptions = NfSubRelationalModel.query.all()
-
+        db.session.remove()
         return nf_per_subscriptions
 
     @staticmethod
@@ -212,9 +232,23 @@ class Subscription:
             nf_name (str): The network function name
             status (str): Status of the subscription
         """
-        NfSubRelationalModel.query.filter(
-            NfSubRelationalModel.subscription_name == subscription_name,
-            NfSubRelationalModel.nf_name == nf_name). \
-            update({NfSubRelationalModel.nf_sub_status: status}, synchronize_session='evaluate')
-
-        db.session.commit()
+        try:
+            NfSubRelationalModel.query.filter(
+                NfSubRelationalModel.subscription_name == subscription_name,
+                NfSubRelationalModel.nf_name == nf_name). \
+                update({NfSubRelationalModel.nf_sub_status: status}, synchronize_session='evaluate')
+            db.session.commit()
+        except Exception as e:
+            logger.error(f'Failed to update status of nf: {nf_name} for subscription: '
+                         f'{subscription_name}: {e}', exc_info=True)
+
+    def get_network_functions(self):
+        nf_sub_relationships = NfSubRelationalModel.query.filter(
+            NfSubRelationalModel.subscription_name == self.subscriptionName)
+        nfs = []
+        for nf_sub_entry in nf_sub_relationships:
+            nf_model_object = NetworkFunctionModel.query.filter(
+                NetworkFunctionModel.nf_name == nf_sub_entry.nf_name).one_or_none()
+            nfs.append(nf_model_object.to_nf())
+        db.session.remove()
+        return nfs