[PMSH] Update sub object on activate 63/116863/4
authorefiacor <fiachra.corcoran@est.tech>
Wed, 13 Jan 2021 16:12:57 +0000 (16:12 +0000)
committerefiacor <fiachra.corcoran@est.tech>
Tue, 26 Jan 2021 16:47:26 +0000 (16:47 +0000)
Signed-off-by: efiacor <fiachra.corcoran@est.tech>
Change-Id: Id9418301e0cb4d373339b9b3e3476f7db5770f3e
Issue-ID: DCAEGEN2-2152

components/pm-subscription-handler/Changelog.md
components/pm-subscription-handler/pmsh_service/mod/exit_handler.py
components/pm-subscription-handler/pmsh_service/mod/network_function.py
components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
components/pm-subscription-handler/pmsh_service/mod/subscription.py
components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
components/pm-subscription-handler/tests/test_pmsh_utils.py
components/pm-subscription-handler/tests/test_subscription_handler.py

index d0fb4c0..8988508 100755 (executable)
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 * Bug fix prevent sub threads from crashing permanently (DCAEGEN2-2501)
 * Added Resource Name (model-name) to filter (DCAEGEN2-2402)
 * Added retry mechanism for DELETE_FAILED subscriptions on given NFs (DCAEGEN2-2152)
+* Added func to update the subscription object on ACTIVATE/UNLOCK (DCAEGEN2-2152)
 
 ## [1.1.2]
 ### Changed
index aed8630..1ea83e5 100755 (executable)
@@ -1,5 +1,5 @@
 # ============LICENSE_START===================================================
-#  Copyright (C) 2020 Nordix Foundation.
+#  Copyright (C) 2020-2021 Nordix Foundation.
 # ============================================================================
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -39,10 +39,7 @@ class ExitHandler:
     def __call__(self, sig_num, frame):
         logger.info('Graceful shutdown of PMSH initiated.')
         logger.debug(f'ExitHandler was called with signal number: {sig_num}.')
-        for thread in self.periodic_tasks:
-            if thread.name == 'aai_event_thread':
-                logger.info(f'Cancelling thread {thread.name}')
-                thread.cancel()
+        self.subscription_handler.stop_aai_event_thread()
         current_sub = self.app_conf.subscription
         if current_sub.administrativeState == AdministrativeState.UNLOCKED.value:
             try:
index 0265635..83130a8 100755 (executable)
@@ -1,5 +1,5 @@
 # ============LICENSE_START===================================================
-#  Copyright (C) 2020 Nordix Foundation.
+#  Copyright (C) 2020-2021 Nordix Foundation.
 # ============================================================================
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,7 +18,6 @@
 
 import re
 
-import mod.aai_client
 from mod import logger, db
 from mod.api.db_models import NetworkFunctionModel
 
@@ -89,8 +88,9 @@ class NetworkFunction:
     def set_nf_model_params(self, app_conf):
         params_set = True
         try:
-            sdnc_model_data = mod.aai_client.get_aai_model_data(app_conf, self.model_invariant_id,
-                                                                self.model_version_id, self.nf_name)
+            from mod.aai_client import get_aai_model_data
+            sdnc_model_data = get_aai_model_data(app_conf, self.model_invariant_id,
+                                                 self.model_version_id, self.nf_name)
 
             try:
                 self.sdnc_model_name = sdnc_model_data['sdnc-model-name']
index c6be38d..1883413 100755 (executable)
@@ -1,5 +1,5 @@
 # ============LICENSE_START===================================================
-#  Copyright (C) 2019-2020 Nordix Foundation.
+#  Copyright (C) 2019-2021 Nordix Foundation.
 # ============================================================================
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -62,10 +62,7 @@ class AppConfig:
     INSTANCE = None
 
     def __init__(self):
-        try:
-            conf = self._get_pmsh_config()
-        except Exception:
-            raise
+        conf = self._get_pmsh_config()
         self.aaf_creds = {'aaf_id': conf['config'].get('aaf_identity'),
                           'aaf_pass': conf['config'].get('aaf_password')}
         self.enable_tls = conf['config'].get('enable_tls')
@@ -85,7 +82,8 @@ class AppConfig:
         return AppConfig.INSTANCE
 
     @mdc_handler
-    @retry(wait=wait_fixed(5), stop=stop_after_attempt(5), retry=retry_if_exception_type(Exception))
+    @retry(wait=wait_fixed(5), stop=stop_after_attempt(5),
+           retry=retry_if_exception_type(ValueError))
     def _get_pmsh_config(self, **kwargs):
         """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response
         is received, it retries after 2 seconds for 5 times before raising an exception.
@@ -97,13 +95,13 @@ class AppConfig:
             Exception: If any error occurred pulling configuration from Config binding service.
         """
         try:
-            logger.info('Fetching PMSH Configuration from CBS.')
+            logger.info('Attempting to fetch PMSH Configuration from CBS.')
             config = get_all()
             logger.info(f'Successfully fetched PMSH config from CBS: {config}')
             return config
-        except Exception as err:
-            logger.error(f'Failed to get config from CBS: {err}', exc_info=True)
-            raise Exception
+        except Exception as e:
+            logger.error(f'Failed to get config from CBS: {e}', exc_info=True)
+            raise ValueError(e)
 
     def refresh_config(self):
         """
@@ -114,11 +112,11 @@ class AppConfig:
         """
         try:
             app_conf = self._get_pmsh_config()
-            self.subscription.administrativeState = app_conf['policy']['subscription'][
-                'administrativeState']
+            self.subscription = Subscription(**app_conf['policy']['subscription'])
             logger.info("AppConfig data has been refreshed")
-        except ValueError or Exception as e:
-            logger.error(f'Failed to refresh AppConfig: {e}', exc_info=True)
+        except Exception:
+            logger.error('Failed to refresh PMSH AppConfig')
+            raise
 
     def get_mr_sub(self, sub_name):
         """
@@ -235,6 +233,7 @@ class _MrPub(_DmaapMrClient):
             self.publish_to_topic(subscription_event)
         except Exception as e:
             logger.error(f'Failed to publish to topic {self.topic_url}: {e}', exc_info=True)
+            raise e
 
 
 class _MrSub(_DmaapMrClient):
index 8443c9d..fdc1394 100755 (executable)
@@ -71,6 +71,12 @@ class Subscription:
         self.measurementGroups = kwargs.get('measurementGroups')
         self.create()
 
+    def update_sub_params(self, admin_state, file_based_gp, file_location, meas_groups):
+        self.administrativeState = admin_state
+        self.fileBasedGP = file_based_gp
+        self.fileLocation = file_location
+        self.measurementGroups = meas_groups
+
     def create(self):
         """ Creates a subscription database entry
 
index a273a44..f50f5ab 100644 (file)
@@ -1,5 +1,5 @@
 # ============LICENSE_START===================================================
-#  Copyright (C) 2020 Nordix Foundation.
+#  Copyright (C) 2020-2021 Nordix Foundation.
 # ============================================================================
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # SPDX-License-Identifier: Apache-2.0
 # ============LICENSE_END=====================================================
 
-import mod.aai_client as aai
-from mod import logger
+from mod import logger, aai_client
+from mod.aai_event_handler import process_aai_events
+from mod.pmsh_utils import PeriodicTask
 from mod.subscription import AdministrativeState
 
 
 class SubscriptionHandler:
-    def __init__(self, mr_pub, app, app_conf, aai_event_thread):
+    def __init__(self, mr_pub, aai_sub, app, app_conf):
         self.mr_pub = mr_pub
+        self.aai_sub = aai_sub
         self.app = app
         self.app_conf = app_conf
-        self.aai_event_thread = aai_event_thread
+        self.aai_event_thread = None
 
     def execute(self):
         """
@@ -34,53 +36,69 @@ class SubscriptionHandler:
         the Subscription if a change has occurred
         """
         self.app.app_context().push()
-        self.app_conf.refresh_config()
-        local_admin_state = self.app_conf.subscription.get_local_sub_admin_state()
-        new_administrative_state = self.app_conf.subscription.administrativeState
         try:
-            if local_admin_state == new_administrative_state:
-                logger.info(f'Administrative State did not change in the app config: '
-                            f'{new_administrative_state}')
+            local_admin_state = self.app_conf.subscription.get_local_sub_admin_state()
+            if local_admin_state == AdministrativeState.LOCKING.value:
+                self._check_for_failed_nfs()
             else:
-                self._check_state_change(local_admin_state, new_administrative_state)
+                self.app_conf.refresh_config()
+                new_administrative_state = self.app_conf.subscription.administrativeState
+                if local_admin_state == new_administrative_state:
+                    logger.info(f'Administrative State did not change in the app config: '
+                                f'{new_administrative_state}')
+                else:
+                    self._check_state_change(local_admin_state, new_administrative_state)
         except Exception as err:
             logger.error(f'Error occurred during the activation/deactivation process {err}',
                          exc_info=True)
 
     def _check_state_change(self, local_admin_state, new_administrative_state):
-        if local_admin_state == AdministrativeState.LOCKING.value:
-            self._check_for_failed_nfs()
+        if new_administrative_state == AdministrativeState.UNLOCKED.value:
+            logger.info(f'Administrative State has changed from {local_admin_state} '
+                        f'to {new_administrative_state}.')
+            self._activate(new_administrative_state)
+        elif new_administrative_state == AdministrativeState.LOCKED.value:
+            logger.info(f'Administrative State has changed from {local_admin_state} '
+                        f'to {new_administrative_state}.')
+            self._deactivate()
         else:
-            if new_administrative_state == AdministrativeState.UNLOCKED.value:
-                logger.info(f'Administrative State has changed from {local_admin_state} '
-                            f'to {new_administrative_state}.')
-                self._activate()
-            elif new_administrative_state == AdministrativeState.LOCKED.value:
-                logger.info(f'Administrative State has changed from {local_admin_state} '
-                            f'to {new_administrative_state}.')
-                self._deactivate()
-            else:
-                logger.error(f'Invalid AdministrativeState: {new_administrative_state}')
+            raise Exception(f'Invalid AdministrativeState: {new_administrative_state}')
 
-    def _activate(self):
-        nfs_in_aai = aai.get_pmsh_nfs_from_aai(self.app_conf)
+    def _activate(self, new_administrative_state):
+        self._start_aai_event_thread()
+        self.app_conf.subscription.update_sub_params(new_administrative_state,
+                                                     self.app_conf.subscription.fileBasedGP,
+                                                     self.app_conf.subscription.fileLocation,
+                                                     self.app_conf.subscription.measurementGroups)
+        nfs_in_aai = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
         self.app_conf.subscription.create_subscription_on_nfs(nfs_in_aai, self.mr_pub,
                                                               self.app_conf)
         self.app_conf.subscription.update_subscription_status()
-        if not self.aai_event_thread.is_alive():
-            logger.info('Start polling for NF info on AAI-EVENT topic on DMaaP MR.')
-            self.aai_event_thread.start()
 
     def _deactivate(self):
         nfs = self.app_conf.subscription.get_network_functions()
         if nfs:
-            self.aai_event_thread.cancel()
-            logger.info('Stop listening for NFs events on AAI-EVENT topic in MR.')
+            self.stop_aai_event_thread()
             self.app_conf.subscription.administrativeState = AdministrativeState.LOCKING.value
             logger.info('Subscription is now LOCKING/DEACTIVATING.')
             self.app_conf.subscription.delete_subscription_from_nfs(nfs, self.mr_pub, self.app_conf)
             self.app_conf.subscription.update_subscription_status()
 
+    def _start_aai_event_thread(self):
+        logger.info('Starting polling for NF info on AAI-EVENT topic on DMaaP MR.')
+        self.aai_event_thread = PeriodicTask(20, process_aai_events, args=(self.aai_sub,
+                                                                           self.mr_pub,
+                                                                           self.app,
+                                                                           self.app_conf))
+        self.aai_event_thread.name = 'aai_event_thread'
+        self.aai_event_thread.start()
+
+    def stop_aai_event_thread(self):
+        if self.aai_event_thread is not None:
+            self.aai_event_thread.cancel()
+            self.aai_event_thread = None
+            logger.info('Stopping polling for NFs events on AAI-EVENT topic in MR.')
+
     def _check_for_failed_nfs(self):
         logger.info('Checking for DELETE_FAILED NFs before LOCKING Subscription.')
         del_failed_nfs = self.app_conf.subscription.get_delete_failed_nfs()
index 307235d..4f2ca4a 100755 (executable)
@@ -1,5 +1,5 @@
 # ============LICENSE_START===================================================
-#  Copyright (C) 2019-2020 Nordix Foundation.
+#  Copyright (C) 2019-2021 Nordix Foundation.
 # ============================================================================
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -19,7 +19,6 @@ import sys
 from signal import signal, SIGTERM
 
 from mod import db, create_app, launch_api_server, logger
-from mod.aai_event_handler import process_aai_events
 from mod.exit_handler import ExitHandler
 from mod.pmsh_utils import AppConfig, PeriodicTask
 from mod.policy_response_handler import PolicyResponseHandler
@@ -46,20 +45,12 @@ def main():
         logger.info('Start polling PMSH_CL_INPUT topic on DMaaP MR.')
         policy_response_handler_thread.start()
 
-        aai_event_thread = PeriodicTask(20, process_aai_events,
-                                        args=(aai_event_mr_sub, policy_mr_pub, app, app_conf))
-        aai_event_thread.name = 'aai_event_thread'
-        logger.info('Start polling for NF info on AAI-EVENT topic on DMaaP MR.')
-        aai_event_thread.start()
-
-        subscription_handler = SubscriptionHandler(policy_mr_pub, app, app_conf, aai_event_thread)
-
+        subscription_handler = SubscriptionHandler(policy_mr_pub, aai_event_mr_sub, app, app_conf)
         subscription_handler_thread = PeriodicTask(20, subscription_handler.execute)
         subscription_handler_thread.name = 'sub_handler_thread'
         subscription_handler_thread.start()
 
-        periodic_tasks = [aai_event_thread, subscription_handler_thread,
-                          policy_response_handler_thread]
+        periodic_tasks = [subscription_handler_thread, policy_response_handler_thread]
 
         signal(SIGTERM, ExitHandler(periodic_tasks=periodic_tasks,
                                     app_conf=app_conf, subscription_handler=subscription_handler))
index 1bc039d..602253b 100644 (file)
@@ -24,7 +24,6 @@ from tenacity import RetryError
 
 from mod import get_db_connection_url
 from mod.network_function import NetworkFunction
-from mod.pmsh_utils import AppConfig
 from tests.base_setup import BaseClassSetup
 from tests.base_setup import get_pmsh_config
 
@@ -139,9 +138,7 @@ class PmshUtilsTestCase(BaseClassSetup):
     @patch('mod.logger.error')
     @patch('mod.pmsh_utils.get_all')
     def test_refresh_config_fail(self, mock_cbs_client_get_all, mock_logger):
-        mock_cbs_client_get_all.return_value = get_pmsh_config()
-        self.app_conf = AppConfig()
-        mock_cbs_client_get_all.side_effect = Exception
+        mock_cbs_client_get_all.side_effect = ValueError
         with self.assertRaises(RetryError):
             self.app_conf.refresh_config()
-        mock_logger.assert_called_with('Failed to get config from CBS: ', exc_info=True)
+        mock_logger.assert_called_with('Failed to refresh PMSH AppConfig')
index f77dfd1..31dd094 100644 (file)
@@ -1,5 +1,5 @@
 # ============LICENSE_START===================================================
-#  Copyright (C) 2020 Nordix Foundation.
+#  Copyright (C) 2020-2021 Nordix Foundation.
 # ============================================================================
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -33,11 +33,12 @@ class SubscriptionHandlerTest(BaseClassSetup):
     def setUpClass(cls):
         super().setUpClass()
 
+    @patch('mod.pmsh_utils._MrSub')
     @patch('mod.pmsh_utils._MrPub')
-    def setUp(self, mock_mr_pub):
+    def setUp(self, mock_mr_pub, mock_mr_sub):
         super().setUp()
         self.mock_mr_pub = mock_mr_pub
-        self.mock_aai_event_thread = Mock()
+        self.mock_mr_sub = mock_mr_sub
         self.mock_policy_event_thread = Mock()
 
     def tearDown(self):
@@ -55,12 +56,13 @@ class SubscriptionHandlerTest(BaseClassSetup):
         mock_get_sub_status.return_value = AdministrativeState.UNLOCKED.value
         mock_get_aai.return_value = self.nfs
         sub_handler = SubscriptionHandler(self.mock_mr_pub,
-                                          self.app, self.app_conf,
-                                          self.mock_aai_event_thread)
+                                          self.mock_mr_sub, self.app, self.app_conf)
         sub_handler.execute()
         mock_logger.assert_called_with('Administrative State did not change '
                                        'in the app config: UNLOCKED')
 
+    @patch('mod.subscription_handler.SubscriptionHandler._start_aai_event_thread',
+           MagicMock())
     @patch('mod.pmsh_utils.AppConfig.refresh_config', MagicMock(return_value=get_pmsh_config()))
     @patch('mod.subscription.Subscription.get_local_sub_admin_state')
     @patch('mod.subscription.Subscription.create_subscription_on_nfs')
@@ -69,10 +71,8 @@ class SubscriptionHandlerTest(BaseClassSetup):
                                                  mock_get_sub_status):
         mock_get_aai.return_value = self.nfs
         mock_get_sub_status.return_value = AdministrativeState.LOCKED.value
-        self.mock_aai_event_thread.return_value.start.return_value = 'start_method'
-        sub_handler = SubscriptionHandler(self.mock_mr_pub,
-                                          self.app, self.app_conf,
-                                          self.mock_aai_event_thread.return_value)
+        sub_handler = SubscriptionHandler(self.mock_mr_pub, self.mock_mr_sub, self.app,
+                                          self.app_conf)
         sub_handler.execute()
         self.assertEqual(AdministrativeState.UNLOCKED.value,
                          self.app_conf.subscription.administrativeState)
@@ -86,14 +86,12 @@ class SubscriptionHandlerTest(BaseClassSetup):
         mock_get_sub_status.return_value = AdministrativeState.UNLOCKED.value
         self.app_conf.subscription.administrativeState = AdministrativeState.LOCKED.value
         self.app_conf.subscription.update_subscription_status()
-        self.mock_aai_event_thread.return_value.cancel.return_value = 'cancel_method'
-        sub_handler = SubscriptionHandler(self.mock_mr_pub,
-                                          self.app, self.app_conf,
-                                          self.mock_aai_event_thread.return_value)
+        sub_handler = SubscriptionHandler(self.mock_mr_pub, self.mock_mr_sub, self.app,
+                                          self.app_conf)
         sub_handler.execute()
         mock_deactivate_sub.assert_called_with(self.nfs, self.mock_mr_pub, self.app_conf)
-        self.mock_aai_event_thread.return_value.cancel.assert_called()
 
+    @patch('mod.subscription_handler.SubscriptionHandler._start_aai_event_thread', MagicMock())
     @patch('mod.pmsh_utils.AppConfig.refresh_config', MagicMock(return_value=get_pmsh_config()))
     @patch('mod.subscription.Subscription.create_subscription_on_nfs')
     @patch('mod.logger.error')
@@ -101,9 +99,8 @@ class SubscriptionHandlerTest(BaseClassSetup):
     def test_execute_exception(self, mock_get_aai, mock_logger, mock_activate_sub):
         mock_get_aai.return_value = self.nfs
         mock_activate_sub.side_effect = Exception
-        sub_handler = SubscriptionHandler(self.mock_mr_pub,
-                                          self.app, self.app_conf,
-                                          self.mock_aai_event_thread)
+        sub_handler = SubscriptionHandler(self.mock_mr_pub, self.mock_mr_sub, self.app,
+                                          self.app_conf)
         sub_handler.execute()
         mock_logger.assert_called_with('Error occurred during the activation/deactivation process ',
                                        exc_info=True)
@@ -125,8 +122,8 @@ class SubscriptionHandlerTest(BaseClassSetup):
     def test_execute_change_of_state_to_locking_retry_delete(self, mock_retry_inc, mock_delete_sub,
                                                              mock_get_sub_status):
         mock_get_sub_status.return_value = AdministrativeState.LOCKING.value
-        sub_handler = SubscriptionHandler(self.mock_mr_pub, self.app, self.app_conf,
-                                          self.mock_aai_event_thread)
+        sub_handler = SubscriptionHandler(self.mock_mr_pub, self.mock_mr_sub, self.app,
+                                          self.app_conf)
         sub_handler.execute()
         self.assertEqual(mock_delete_sub.call_count, 2)
         self.assertEqual(mock_retry_inc.call_count, 2)
@@ -139,8 +136,8 @@ class SubscriptionHandlerTest(BaseClassSetup):
     def test_execute_change_of_state_to_locking_success(self, mock_update_sub,
                                                         mock_get_sub_status):
         mock_get_sub_status.return_value = AdministrativeState.LOCKING.value
-        sub_handler = SubscriptionHandler(self.mock_mr_pub, self.app, self.app_conf,
-                                          self.mock_aai_event_thread)
+        sub_handler = SubscriptionHandler(self.mock_mr_pub, self.mock_mr_sub, self.app,
+                                          self.app_conf)
         sub_handler.execute()
         mock_update_sub.assert_called_once()
 
@@ -161,7 +158,7 @@ class SubscriptionHandlerTest(BaseClassSetup):
     def test_execute_change_of_state_to_locking_retry_failed(self, mock_nf_del,
                                                              mock_get_sub_status):
         mock_get_sub_status.return_value = AdministrativeState.LOCKING.value
-        sub_handler = SubscriptionHandler(self.mock_mr_pub, self.app, self.app_conf,
-                                          self.mock_aai_event_thread)
+        sub_handler = SubscriptionHandler(self.mock_mr_pub, self.mock_mr_sub, self.app,
+                                          self.app_conf)
         sub_handler.execute()
         mock_nf_del.assert_called_once()