[PMSH] Add retry mech for DELETE_FAILED NFs 19/116319/4
authorefiacor <fiachra.corcoran@est.tech>
Fri, 11 Dec 2020 16:19:47 +0000 (16:19 +0000)
committerefiacor <fiachra.corcoran@est.tech>
Tue, 12 Jan 2021 11:35:09 +0000 (11:35 +0000)
Signed-off-by: efiacor <fiachra.corcoran@est.tech>
Change-Id: I3980f0bb43c67e192828172dafe4e7be102dcc98
Issue-ID: DCAEGEN2-2152

12 files changed:
components/pm-subscription-handler/Changelog.md
components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
components/pm-subscription-handler/pmsh_service/mod/api/db_models.py
components/pm-subscription-handler/pmsh_service/mod/exit_handler.py
components/pm-subscription-handler/pmsh_service/mod/network_function.py
components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py
components/pm-subscription-handler/pmsh_service/mod/subscription.py
components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
components/pm-subscription-handler/tests/test_aai_event_handler.py
components/pm-subscription-handler/tests/test_subscription.py
components/pm-subscription-handler/tests/test_subscription_handler.py

index ea375ba..d0fb4c0 100755 (executable)
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 ### Changed
 * Bug fix prevent sub threads from crashing permanently (DCAEGEN2-2501)
 * Added Resource Name (model-name) to filter (DCAEGEN2-2402)
+* Added retry mechanism for DELETE_FAILED subscriptions on given NFs (DCAEGEN2-2152)
 
 ## [1.1.2]
 ### Changed
index fd9f625..46d52f1 100755 (executable)
@@ -77,7 +77,7 @@ def process_aai_events(mr_sub, mr_pub, app, app_conf):
 def _process_event(action, nf, mr_pub, app_conf):
     if action == AAIEvent.UPDATE.value:
         logger.info(f'Update event found for network function {nf.nf_name}')
-        app_conf.subscription.activate_subscription([nf], mr_pub, app_conf)
+        app_conf.subscription.create_subscription_on_nfs([nf], mr_pub, app_conf)
     elif action == AAIEvent.DELETE.value:
         logger.info(f'Delete event found for network function {nf.nf_name}')
         NetworkFunction.delete(nf_name=nf.nf_name)
index ea9603e..a9dd6ef 100755 (executable)
@@ -63,6 +63,7 @@ class NetworkFunctionModel(db.Model):
     model_name = Column(String(100))
     sdnc_model_name = Column(String(100))
     sdnc_model_version = Column(String(100))
+    retry_count = Column(Integer)
 
     subscriptions = relationship(
         'NfSubRelationalModel',
@@ -71,7 +72,7 @@ class NetworkFunctionModel(db.Model):
 
     def __init__(self, nf_name, ip_address, model_invariant_id,
                  model_version_id, model_name, sdnc_model_name,
-                 sdnc_model_version):
+                 sdnc_model_version, retry_count=0):
         self.nf_name = nf_name
         self.ip_address = ip_address
         self.model_invariant_id = model_invariant_id
@@ -79,6 +80,7 @@ class NetworkFunctionModel(db.Model):
         self.model_name = model_name
         self.sdnc_model_name = sdnc_model_name
         self.sdnc_model_version = sdnc_model_version
+        self.retry_count = retry_count
 
     def __repr__(self):
         return str(self.to_nf())
index fbb8b24..aed8630 100755 (executable)
@@ -40,13 +40,15 @@ class ExitHandler:
         logger.info('Graceful shutdown of PMSH initiated.')
         logger.debug(f'ExitHandler was called with signal number: {sig_num}.')
         for thread in self.periodic_tasks:
-            if thread.name == 'app_conf_thread':
+            if thread.name == 'aai_event_thread':
                 logger.info(f'Cancelling thread {thread.name}')
                 thread.cancel()
         current_sub = self.app_conf.subscription
         if current_sub.administrativeState == AdministrativeState.UNLOCKED.value:
             try:
-                current_sub.deactivate_subscription(self.subscription_handler.mr_pub, self.app_conf)
+                nfs = self.app_conf.subscription.get_network_functions()
+                current_sub.delete_subscription_from_nfs(nfs, self.subscription_handler.mr_pub,
+                                                         self.app_conf)
             except Exception as e:
                 logger.error(f'Failed to shut down PMSH application: {e}', exc_info=True)
         for thread in self.periodic_tasks:
index a6d2164..0265635 100755 (executable)
@@ -105,6 +105,19 @@ class NetworkFunction:
                          exc_info=True)
         return not params_set
 
+    def increment_retry_count(self):
+        try:
+            NetworkFunctionModel.query.filter(
+                NetworkFunctionModel.nf_name == self.nf_name)\
+                .update({'retry_count': NetworkFunctionModel.retry_count + 1},
+                        synchronize_session='evaluate')
+            db.session.commit()
+        except Exception as e:
+            logger.error(f'Failed to update retry_count of NetworkFunction: {self.nf_name}: {e}',
+                         exc_info=True)
+        finally:
+            db.session.remove()
+
     @staticmethod
     def get(nf_name):
         """ Retrieves a network function
index 73a5e7e..09c9704 100644 (file)
@@ -30,6 +30,10 @@ policy_response_handle_functions = {
     AdministrativeState.UNLOCKED.value: {
         'success': Subscription.update_sub_nf_status,
         'failed': Subscription.update_sub_nf_status
+    },
+    AdministrativeState.LOCKING.value: {
+        'success': NetworkFunction.delete,
+        'failed': Subscription.update_sub_nf_status
     }
 }
 
index 34753e8..8443c9d 100755 (executable)
@@ -1,5 +1,5 @@
 # ============LICENSE_START===================================================
-#  Copyright (C) 2019-2020 Nordix Foundation.
+#  Copyright (C) 2019-2021 Nordix Foundation.
 # ============================================================================
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -27,26 +27,40 @@ class SubNfState(Enum):
     CREATED = 'CREATED'
     PENDING_DELETE = 'PENDING_DELETE'
     DELETE_FAILED = 'DELETE_FAILED'
+    DELETED = 'DELETED'
 
 
 class AdministrativeState(Enum):
     UNLOCKED = 'UNLOCKED'
+    LOCKING = 'LOCKING'
     LOCKED = 'LOCKED'
-    PENDING = 'PENDING'
 
 
 subscription_nf_states = {
     AdministrativeState.LOCKED.value: {
-        'success': SubNfState.CREATED,
+        'success': SubNfState.DELETED,
         'failed': SubNfState.DELETE_FAILED
     },
     AdministrativeState.UNLOCKED.value: {
         'success': SubNfState.CREATED,
         'failed': SubNfState.CREATE_FAILED
+    },
+    AdministrativeState.LOCKING.value: {
+        'success': SubNfState.DELETED,
+        'failed': SubNfState.DELETE_FAILED
     }
 }
 
 
+def _get_nf_objects(nf_sub_relationships):
+    nfs = []
+    for nf_sub_entry in nf_sub_relationships:
+        nf_model_object = NetworkFunctionModel.query.filter(
+            NetworkFunctionModel.nf_name == nf_sub_entry.nf_name).one_or_none()
+        nfs.append(nf_model_object.to_nf())
+    return nfs
+
+
 class Subscription:
     def __init__(self, **kwargs):
         self.subscriptionName = kwargs.get('subscriptionName')
@@ -68,7 +82,7 @@ class Subscription:
                 SubscriptionModel.subscription_name == self.subscriptionName).one_or_none())
             if existing_subscription is None:
                 new_subscription = SubscriptionModel(subscription_name=self.subscriptionName,
-                                                     status=AdministrativeState.PENDING.value)
+                                                     status=AdministrativeState.LOCKED.value)
                 db.session.add(new_subscription)
                 db.session.commit()
                 return new_subscription
@@ -109,14 +123,17 @@ class Subscription:
         """
         try:
             clean_sub = {k: v for k, v in self.__dict__.items() if k != 'nfFilter'}
+            if self.administrativeState == AdministrativeState.LOCKING.value:
+                change_type = 'DELETE'
+            else:
+                change_type = 'CREATE'
             sub_event = {'nfName': nf.nf_name,
                          'ipv4Address': nf.ip_address,
                          'blueprintName': nf.sdnc_model_name,
                          'blueprintVersion': nf.sdnc_model_version,
                          'policyName': app_conf.operational_policy_name,
-                         'changeType': 'DELETE'
-                         if self.administrativeState == AdministrativeState.LOCKED.value
-                         else 'CREATE', 'closedLoopControlName': app_conf.control_loop_name,
+                         'changeType': change_type,
+                         'closedLoopControlName': app_conf.control_loop_name,
                          'subscription': clean_sub}
             return sub_event
         except Exception as e:
@@ -182,35 +199,45 @@ class Subscription:
         db.session.remove()
         return sub_models
 
-    def activate_subscription(self, nfs, mr_pub, app_conf):
-        logger.info(f'Activate subscription initiated for {self.subscriptionName}.')
+    def create_subscription_on_nfs(self, nfs, mr_pub, app_conf):
+        """ Publishes an event to create a Subscription on an nf
+
+        Args:
+            nfs(list[NetworkFunction]): A list of NetworkFunction Objects.
+            mr_pub (_MrPub): MR publisher
+            app_conf (AppConfig): the application configuration.
+        """
         try:
             existing_nfs = self.get_network_functions()
             sub_model = self.get()
             for nf in [new_nf for new_nf in nfs if new_nf not in existing_nfs]:
-                logger.info(f'Publishing event to activate '
-                            f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}')
+                logger.info(f'Publishing event to create '
+                            f'Sub: {self.subscriptionName} on nf: {nf.nf_name}')
                 mr_pub.publish_subscription_event_data(self, nf, app_conf)
                 self.add_network_function_to_subscription(nf, sub_model)
                 self.update_sub_nf_status(self.subscriptionName, SubNfState.PENDING_CREATE.value,
                                           nf.nf_name)
         except Exception as err:
-            raise Exception(f'Error publishing activation event to MR: {err}')
+            raise Exception(f'Error publishing create event to MR: {err}')
+
+    def delete_subscription_from_nfs(self, nfs, mr_pub, app_conf):
+        """ Publishes an event to delete a Subscription from an nf
 
-    def deactivate_subscription(self, mr_pub, app_conf):
+        Args:
+            nfs(list[NetworkFunction]): A list of NetworkFunction Objects.
+            mr_pub (_MrPub): MR publisher
+            app_conf (AppConfig): the application configuration.
+        """
         try:
-            nfs = self.get_network_functions()
-            if nfs:
-                logger.info(f'Deactivate subscription initiated for {self.subscriptionName}.')
-                for nf in nfs:
-                    mr_pub.publish_subscription_event_data(self, nf, app_conf)
-                    logger.debug(f'Publishing Event to deactivate '
-                                 f'Sub: {self.subscriptionName} for the nf: {nf.nf_name}')
-                    self.update_sub_nf_status(self.subscriptionName,
-                                              SubNfState.PENDING_DELETE.value,
-                                              nf.nf_name)
+            for nf in nfs:
+                logger.debug(f'Publishing Event to delete '
+                             f'Sub: {self.subscriptionName} from the nf: {nf.nf_name}')
+                mr_pub.publish_subscription_event_data(self, nf, app_conf)
+                self.update_sub_nf_status(self.subscriptionName,
+                                          SubNfState.PENDING_DELETE.value,
+                                          nf.nf_name)
         except Exception as err:
-            raise Exception(f'Error publishing deactivation event to MR: {err}')
+            raise Exception(f'Error publishing delete event to MR: {err}')
 
     @staticmethod
     def get_all_nfs_subscription_relations():
@@ -245,10 +272,22 @@ class Subscription:
     def get_network_functions(self):
         nf_sub_relationships = NfSubRelationalModel.query.filter(
             NfSubRelationalModel.subscription_name == self.subscriptionName)
-        nfs = []
-        for nf_sub_entry in nf_sub_relationships:
-            nf_model_object = NetworkFunctionModel.query.filter(
-                NetworkFunctionModel.nf_name == nf_sub_entry.nf_name).one_or_none()
-            nfs.append(nf_model_object.to_nf())
+        nfs = _get_nf_objects(nf_sub_relationships)
+        db.session.remove()
+        return nfs
+
+    def get_delete_failed_nfs(self):
+        nf_sub_relationships = NfSubRelationalModel.query.filter(
+            NfSubRelationalModel.subscription_name == self.subscriptionName,
+            NfSubRelationalModel.nf_sub_status == SubNfState.DELETE_FAILED.value)
+        nfs = _get_nf_objects(nf_sub_relationships)
+        db.session.remove()
+        return nfs
+
+    def get_delete_pending_nfs(self):
+        nf_sub_relationships = NfSubRelationalModel.query.filter(
+            NfSubRelationalModel.subscription_name == self.subscriptionName,
+            NfSubRelationalModel.nf_sub_status == SubNfState.PENDING_DELETE.value)
+        nfs = _get_nf_objects(nf_sub_relationships)
         db.session.remove()
         return nfs
index 6de702f..a273a44 100644 (file)
@@ -22,12 +22,11 @@ from mod.subscription import AdministrativeState
 
 
 class SubscriptionHandler:
-    def __init__(self, mr_pub, app, app_conf, aai_event_thread, policy_event_thread):
+    def __init__(self, mr_pub, app, app_conf, aai_event_thread):
         self.mr_pub = mr_pub
         self.app = app
         self.app_conf = app_conf
         self.aai_event_thread = aai_event_thread
-        self.policy_event_thread = policy_event_thread
 
     def execute(self):
         """
@@ -35,37 +34,73 @@ class SubscriptionHandler:
         the Subscription if a change has occurred
         """
         self.app.app_context().push()
+        self.app_conf.refresh_config()
         local_admin_state = self.app_conf.subscription.get_local_sub_admin_state()
         new_administrative_state = self.app_conf.subscription.administrativeState
         try:
             if local_admin_state == new_administrative_state:
-                logger.info('Administrative State did not change in the Config')
+                logger.info(f'Administrative State did not change in the app config: '
+                            f'{new_administrative_state}')
             else:
-                if new_administrative_state == AdministrativeState.UNLOCKED.value:
-                    self._activate(local_admin_state, new_administrative_state)
-                elif local_admin_state == AdministrativeState.PENDING.value:
-                    logger.info('Administrative State is PENDING')
-                else:
-                    self._deactivate(local_admin_state, new_administrative_state)
+                self._check_state_change(local_admin_state, new_administrative_state)
         except Exception as err:
             logger.error(f'Error occurred during the activation/deactivation process {err}',
                          exc_info=True)
 
-    def _activate(self, local_admin_state, new_administrative_state):
-        logger.info(f'Administrative State has changed from {local_admin_state} '
-                    f'to {new_administrative_state}.')
+    def _check_state_change(self, local_admin_state, new_administrative_state):
+        if local_admin_state == AdministrativeState.LOCKING.value:
+            self._check_for_failed_nfs()
+        else:
+            if new_administrative_state == AdministrativeState.UNLOCKED.value:
+                logger.info(f'Administrative State has changed from {local_admin_state} '
+                            f'to {new_administrative_state}.')
+                self._activate()
+            elif new_administrative_state == AdministrativeState.LOCKED.value:
+                logger.info(f'Administrative State has changed from {local_admin_state} '
+                            f'to {new_administrative_state}.')
+                self._deactivate()
+            else:
+                logger.error(f'Invalid AdministrativeState: {new_administrative_state}')
+
+    def _activate(self):
         nfs_in_aai = aai.get_pmsh_nfs_from_aai(self.app_conf)
-        self.app_conf.subscription.activate_subscription(nfs_in_aai, self.mr_pub,
-                                                         self.app_conf)
+        self.app_conf.subscription.create_subscription_on_nfs(nfs_in_aai, self.mr_pub,
+                                                              self.app_conf)
         self.app_conf.subscription.update_subscription_status()
-        logger.info('Start listening for new NFs on AAI-EVENT topic in MR.')
-        self.aai_event_thread.start()
-        self.policy_event_thread.start()
+        if not self.aai_event_thread.is_alive():
+            logger.info('Start polling for NF info on AAI-EVENT topic on DMaaP MR.')
+            self.aai_event_thread.start()
 
-    def _deactivate(self, local_admin_state, new_administrative_state):
-        logger.info(f'Administrative State has changed from {local_admin_state} '
-                    f'to {new_administrative_state}.')
-        self.aai_event_thread.cancel()
-        logger.info('Stop listening for NFs on AAI-EVENT topic in MR.')
-        self.app_conf.subscription.deactivate_subscription(self.mr_pub, self.app_conf)
-        self.app_conf.subscription.update_subscription_status()
+    def _deactivate(self):
+        nfs = self.app_conf.subscription.get_network_functions()
+        if nfs:
+            self.aai_event_thread.cancel()
+            logger.info('Stop listening for NFs events on AAI-EVENT topic in MR.')
+            self.app_conf.subscription.administrativeState = AdministrativeState.LOCKING.value
+            logger.info('Subscription is now LOCKING/DEACTIVATING.')
+            self.app_conf.subscription.delete_subscription_from_nfs(nfs, self.mr_pub, self.app_conf)
+            self.app_conf.subscription.update_subscription_status()
+
+    def _check_for_failed_nfs(self):
+        logger.info('Checking for DELETE_FAILED NFs before LOCKING Subscription.')
+        del_failed_nfs = self.app_conf.subscription.get_delete_failed_nfs()
+        if del_failed_nfs or self.app_conf.subscription.get_delete_pending_nfs():
+            for nf in del_failed_nfs:
+                nf_model = nf.get(nf.nf_name)
+                if nf_model.retry_count < 3:
+                    logger.info(f'Retry deletion of subscription '
+                                f'{self.app_conf.subscription.subscriptionName} '
+                                f'from NF: {nf.nf_name}')
+                    self.app_conf.subscription.delete_subscription_from_nfs([nf], self.mr_pub,
+                                                                            self.app_conf)
+                    nf.increment_retry_count()
+                else:
+                    logger.error(f'Failed to delete the subscription '
+                                 f'{self.app_conf.subscription.subscriptionName} '
+                                 f'from NF: {nf.nf_name} after {nf_model.retry_count} '
+                                 f'attempts. Removing NF from DB')
+                    nf.delete(nf_name=nf.nf_name)
+        else:
+            logger.info('Proceeding to LOCKED adminState.')
+            self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value
+            self.app_conf.subscription.update_subscription_status()
index f92fdc9..307235d 100755 (executable)
@@ -40,26 +40,25 @@ def main():
             logger.error(f'Failed to get config and create application: {e}', exc_info=True)
             sys.exit(e)
 
-        app_conf_thread = PeriodicTask(10, app_conf.refresh_config)
-        app_conf_thread.name = 'app_conf_thread'
-        app_conf_thread.start()
-
         policy_response_handler = PolicyResponseHandler(policy_mr_sub, app_conf, app)
         policy_response_handler_thread = PeriodicTask(25, policy_response_handler.poll_policy_topic)
         policy_response_handler_thread.name = 'policy_event_thread'
+        logger.info('Start polling PMSH_CL_INPUT topic on DMaaP MR.')
+        policy_response_handler_thread.start()
 
         aai_event_thread = PeriodicTask(20, process_aai_events,
                                         args=(aai_event_mr_sub, policy_mr_pub, app, app_conf))
         aai_event_thread.name = 'aai_event_thread'
+        logger.info('Start polling for NF info on AAI-EVENT topic on DMaaP MR.')
+        aai_event_thread.start()
 
-        subscription_handler = SubscriptionHandler(policy_mr_pub, app, app_conf, aai_event_thread,
-                                                   policy_response_handler_thread)
+        subscription_handler = SubscriptionHandler(policy_mr_pub, app, app_conf, aai_event_thread)
 
-        subscription_handler_thread = PeriodicTask(30, subscription_handler.execute)
+        subscription_handler_thread = PeriodicTask(20, subscription_handler.execute)
         subscription_handler_thread.name = 'sub_handler_thread'
         subscription_handler_thread.start()
 
-        periodic_tasks = [app_conf_thread, aai_event_thread, subscription_handler_thread,
+        periodic_tasks = [aai_event_thread, subscription_handler_thread,
                           policy_response_handler_thread]
 
         signal(SIGTERM, ExitHandler(periodic_tasks=periodic_tasks,
index 47a36f5..0ae1942 100755 (executable)
@@ -45,7 +45,7 @@ class AAIEventHandlerTest(BaseClassSetup):
         super().tearDownClass()
 
     @patch('mod.network_function.NetworkFunction.set_nf_model_params')
-    @patch('mod.subscription.Subscription.activate_subscription')
+    @patch('mod.subscription.Subscription.create_subscription_on_nfs')
     @patch('mod.aai_event_handler.NetworkFunction.delete')
     def test_process_aai_update_and_delete_events(self, mock_nf_delete, mock_activate_sub,
                                                   mock_set_sdnc_params):
index 62a9e16..b18f41e 100755 (executable)
@@ -113,8 +113,8 @@ class SubscriptionTest(BaseClassSetup):
     @patch('mod.subscription.Subscription.add_network_function_to_subscription')
     @patch('mod.subscription.Subscription.update_sub_nf_status')
     def test_process_activate_subscription(self, mock_update_sub_nf, mock_add_nfs):
-        self.app_conf.subscription.activate_subscription([list(self.xnfs)[0]], self.mock_mr_pub,
-                                                         self.app_conf)
+        self.app_conf.subscription.create_subscription_on_nfs([list(self.xnfs)[0]],
+                                                              self.mock_mr_pub, self.app_conf)
 
         mock_add_nfs.assert_called()
         self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
@@ -126,13 +126,13 @@ class SubscriptionTest(BaseClassSetup):
     def test_process_deactivate_subscription(self, mock_update_sub_nf, mock_get_nfs):
         self.app_conf.subscription.administrativeState = 'LOCKED'
         mock_get_nfs.return_value = [list(self.xnfs)[0]]
-        self.app_conf.subscription.deactivate_subscription(self.mock_mr_pub, self.app_conf)
+        self.app_conf.subscription.delete_subscription_from_nfs(self.xnfs, self.mock_mr_pub,
+                                                                self.app_conf)
         self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
-        mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName,
-                                              'PENDING_DELETE', list(self.xnfs)[0].nf_name)
+        self.assertEqual(mock_update_sub_nf.call_count, 3)
 
     def test_activate_subscription_exception(self):
-        self.assertRaises(Exception, self.app_conf.subscription.activate_subscription,
+        self.assertRaises(Exception, self.app_conf.subscription.create_subscription_on_nfs,
                           [list(self.xnfs)[0]], 'not_mr_pub', 'app_config')
 
     def test_prepare_subscription_event(self):
index bf72382..f77dfd1 100644 (file)
 #
 # SPDX-License-Identifier: Apache-2.0
 # ============LICENSE_END=====================================================
-from unittest.mock import patch, Mock
+from unittest.mock import patch, Mock, MagicMock
 
+from mod.api.db_models import NetworkFunctionModel
 from mod.network_function import NetworkFunction
 from mod.subscription import AdministrativeState
 from mod.subscription_handler import SubscriptionHandler
-from tests.base_setup import BaseClassSetup
+from tests.base_setup import BaseClassSetup, get_pmsh_config
 
 
 class SubscriptionHandlerTest(BaseClassSetup):
+    nfs = [
+        NetworkFunction(nf_name='pnf_1', model_invariant_id='some-id', model_version_id='some-id'),
+        NetworkFunction(nf_name='pnf_2', model_invariant_id='some-id', model_version_id='some-id')]
 
     @classmethod
     def setUpClass(cls):
@@ -32,12 +36,6 @@ class SubscriptionHandlerTest(BaseClassSetup):
     @patch('mod.pmsh_utils._MrPub')
     def setUp(self, mock_mr_pub):
         super().setUp()
-        self.nfs = [NetworkFunction(nf_name='pnf_1',
-                                    model_invariant_id='some-id',
-                                    model_version_id='some-id'),
-                    NetworkFunction(nf_name='pnf_2',
-                                    model_invariant_id='some-id',
-                                    model_version_id='some-id')]
         self.mock_mr_pub = mock_mr_pub
         self.mock_aai_event_thread = Mock()
         self.mock_policy_event_thread = Mock()
@@ -49,6 +47,7 @@ class SubscriptionHandlerTest(BaseClassSetup):
     def tearDownClass(cls):
         super().tearDownClass()
 
+    @patch('mod.pmsh_utils.AppConfig.refresh_config', MagicMock(return_value=get_pmsh_config()))
     @patch('mod.subscription.Subscription.get_local_sub_admin_state')
     @patch('mod.logger.info')
     @patch('mod.aai_client.get_pmsh_nfs_from_aai')
@@ -57,13 +56,14 @@ class SubscriptionHandlerTest(BaseClassSetup):
         mock_get_aai.return_value = self.nfs
         sub_handler = SubscriptionHandler(self.mock_mr_pub,
                                           self.app, self.app_conf,
-                                          self.mock_aai_event_thread,
-                                          self.mock_policy_event_thread)
+                                          self.mock_aai_event_thread)
         sub_handler.execute()
-        mock_logger.assert_called_with('Administrative State did not change in the Config')
+        mock_logger.assert_called_with('Administrative State did not change '
+                                       'in the app config: UNLOCKED')
 
+    @patch('mod.pmsh_utils.AppConfig.refresh_config', MagicMock(return_value=get_pmsh_config()))
     @patch('mod.subscription.Subscription.get_local_sub_admin_state')
-    @patch('mod.subscription.Subscription.activate_subscription')
+    @patch('mod.subscription.Subscription.create_subscription_on_nfs')
     @patch('mod.aai_client.get_pmsh_nfs_from_aai')
     def test_execute_change_of_state_to_unlocked(self, mock_get_aai, mock_activate_sub,
                                                  mock_get_sub_status):
@@ -72,33 +72,30 @@ class SubscriptionHandlerTest(BaseClassSetup):
         self.mock_aai_event_thread.return_value.start.return_value = 'start_method'
         sub_handler = SubscriptionHandler(self.mock_mr_pub,
                                           self.app, self.app_conf,
-                                          self.mock_aai_event_thread.return_value,
-                                          self.mock_policy_event_thread)
+                                          self.mock_aai_event_thread.return_value)
         sub_handler.execute()
         self.assertEqual(AdministrativeState.UNLOCKED.value,
                          self.app_conf.subscription.administrativeState)
         mock_activate_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf)
-        self.mock_aai_event_thread.return_value.start.assert_called()
 
+    @patch('mod.subscription.Subscription.get_network_functions', MagicMock(return_value=nfs))
+    @patch('mod.pmsh_utils.AppConfig.refresh_config', MagicMock(return_value=get_pmsh_config()))
     @patch('mod.subscription.Subscription.get_local_sub_admin_state')
-    @patch('mod.subscription.Subscription.deactivate_subscription')
-    @patch('mod.aai_client.get_pmsh_nfs_from_aai')
-    def test_execute_change_of_state_to_locked(self, mock_get_aai, mock_deactivate_sub,
-                                               mock_get_sub_status):
+    @patch('mod.subscription.Subscription.delete_subscription_from_nfs')
+    def test_execute_change_of_state_to_locked(self, mock_deactivate_sub, mock_get_sub_status):
         mock_get_sub_status.return_value = AdministrativeState.UNLOCKED.value
         self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value
         self.app_conf.subscription.update_subscription_status()
-        mock_get_aai.return_value = self.nfs
         self.mock_aai_event_thread.return_value.cancel.return_value = 'cancel_method'
         sub_handler = SubscriptionHandler(self.mock_mr_pub,
                                           self.app, self.app_conf,
-                                          self.mock_aai_event_thread.return_value,
-                                          self.mock_policy_event_thread)
+                                          self.mock_aai_event_thread.return_value)
         sub_handler.execute()
-        mock_deactivate_sub.assert_called_with(self.mock_mr_pub, self.app_conf)
+        mock_deactivate_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf)
         self.mock_aai_event_thread.return_value.cancel.assert_called()
 
-    @patch('mod.subscription.Subscription.activate_subscription')
+    @patch('mod.pmsh_utils.AppConfig.refresh_config', MagicMock(return_value=get_pmsh_config()))
+    @patch('mod.subscription.Subscription.create_subscription_on_nfs')
     @patch('mod.logger.error')
     @patch('mod.aai_client.get_pmsh_nfs_from_aai')
     def test_execute_exception(self, mock_get_aai, mock_logger, mock_activate_sub):
@@ -106,8 +103,65 @@ class SubscriptionHandlerTest(BaseClassSetup):
         mock_activate_sub.side_effect = Exception
         sub_handler = SubscriptionHandler(self.mock_mr_pub,
                                           self.app, self.app_conf,
-                                          self.mock_aai_event_thread,
-                                          self.mock_policy_event_thread)
+                                          self.mock_aai_event_thread)
         sub_handler.execute()
         mock_logger.assert_called_with('Error occurred during the activation/deactivation process ',
                                        exc_info=True)
+
+    @patch('mod.network_function.NetworkFunction.get',
+           MagicMock(return_value=NetworkFunctionModel(nf_name='pnf_1',
+                                                       model_invariant_id='some-id',
+                                                       model_version_id='some-id',
+                                                       ip_address='ip_address',
+                                                       model_name='model_name',
+                                                       sdnc_model_name='sdnc_model_name',
+                                                       sdnc_model_version='sdnc_model_version')))
+    @patch('mod.subscription.Subscription.get_delete_failed_nfs', MagicMock(return_value=nfs))
+    @patch('mod.subscription.Subscription.get_network_functions', MagicMock(return_value=nfs))
+    @patch('mod.pmsh_utils.AppConfig.refresh_config', MagicMock(return_value=get_pmsh_config()))
+    @patch('mod.subscription.Subscription.get_local_sub_admin_state')
+    @patch('mod.subscription.Subscription.delete_subscription_from_nfs')
+    @patch('mod.network_function.NetworkFunction.increment_retry_count')
+    def test_execute_change_of_state_to_locking_retry_delete(self, mock_retry_inc, mock_delete_sub,
+                                                             mock_get_sub_status):
+        mock_get_sub_status.return_value = AdministrativeState.LOCKING.value
+        sub_handler = SubscriptionHandler(self.mock_mr_pub, self.app, self.app_conf,
+                                          self.mock_aai_event_thread)
+        sub_handler.execute()
+        self.assertEqual(mock_delete_sub.call_count, 2)
+        self.assertEqual(mock_retry_inc.call_count, 2)
+
+    @patch('mod.subscription.Subscription.get_delete_failed_nfs', MagicMock(return_value=[]))
+    @patch('mod.subscription.Subscription.get_network_functions', MagicMock(return_value=nfs))
+    @patch('mod.pmsh_utils.AppConfig.refresh_config', MagicMock(return_value=get_pmsh_config()))
+    @patch('mod.subscription.Subscription.get_local_sub_admin_state')
+    @patch('mod.subscription.Subscription.update_subscription_status')
+    def test_execute_change_of_state_to_locking_success(self, mock_update_sub,
+                                                        mock_get_sub_status):
+        mock_get_sub_status.return_value = AdministrativeState.LOCKING.value
+        sub_handler = SubscriptionHandler(self.mock_mr_pub, self.app, self.app_conf,
+                                          self.mock_aai_event_thread)
+        sub_handler.execute()
+        mock_update_sub.assert_called_once()
+
+    @patch('mod.network_function.NetworkFunction.get',
+           MagicMock(return_value=NetworkFunctionModel(nf_name='pnf_1',
+                                                       model_invariant_id='some-id',
+                                                       model_version_id='some-id',
+                                                       ip_address='ip_address',
+                                                       model_name='model_name',
+                                                       sdnc_model_name='sdnc_model_name',
+                                                       sdnc_model_version='sdnc_model_version',
+                                                       retry_count=3)))
+    @patch('mod.subscription.Subscription.get_delete_failed_nfs', MagicMock(return_value=[nfs[0]]))
+    @patch('mod.subscription.Subscription.get_network_functions', MagicMock(return_value=nfs[0]))
+    @patch('mod.pmsh_utils.AppConfig.refresh_config', MagicMock(return_value=get_pmsh_config()))
+    @patch('mod.subscription.Subscription.get_local_sub_admin_state')
+    @patch('mod.network_function.NetworkFunction.delete')
+    def test_execute_change_of_state_to_locking_retry_failed(self, mock_nf_del,
+                                                             mock_get_sub_status):
+        mock_get_sub_status.return_value = AdministrativeState.LOCKING.value
+        sub_handler = SubscriptionHandler(self.mock_mr_pub, self.app, self.app_conf,
+                                          self.mock_aai_event_thread)
+        sub_handler.execute()
+        mock_nf_del.assert_called_once()