[PMSH] Improve Failure Handling 99/109399/4
authorERIMROB <robertas.rimkus@est.tech>
Fri, 19 Jun 2020 12:09:44 +0000 (13:09 +0100)
committerERIMROB <robertas.rimkus@est.tech>
Tue, 30 Jun 2020 01:15:21 +0000 (02:15 +0100)
Signed-off-by: ERIMROB <robertas.rimkus@est.tech>
Change-Id: I15d338321957a293e9f444a10cf3bb06f4212f3e
Issue-ID: DCAEGEN2-2157

components/pm-subscription-handler/pmsh_service/mod/aai_event_handler.py
components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
components/pm-subscription-handler/pmsh_service/mod/policy_response_handler.py
components/pm-subscription-handler/pmsh_service/mod/subscription.py
components/pm-subscription-handler/pmsh_service/mod/subscription_handler.py
components/pm-subscription-handler/tests/test_pmsh_utils.py
components/pm-subscription-handler/tests/test_policy_response_handler.py
components/pm-subscription-handler/tests/test_subscription.py

index 96f5143..b3957bc 100755 (executable)
@@ -19,6 +19,8 @@
 import json
 from enum import Enum
 
+from requests import HTTPError
+
 from mod import logger
 from mod.network_function import NetworkFunction
 
@@ -44,22 +46,26 @@ def process_aai_events(mr_sub, mr_pub, app, app_conf):
         app_conf (AppConfig): the application configuration.
     """
     app.app_context().push()
-    aai_events = mr_sub.get_from_topic('AAI-EVENT')
-
-    if aai_events is not None and len(aai_events) != 0:
-        for entry in aai_events:
-            logger.debug(f'AAI-EVENT entry: {entry}')
-            entry = json.loads(entry)
-            event_header = entry['event-header']
-            aai_xnf = entry['entity']
-            action = event_header['action']
-            entity_type = event_header['entity-type']
-            xnf_name = aai_xnf['pnf-name'] if entity_type == XNFType.PNF.value else aai_xnf[
-                'vnf-name']
-            new_status = aai_xnf['orchestration-status']
+    try:
+        aai_events = mr_sub.get_from_topic('AAI-EVENT')
+        if len(aai_events) != 0:
+            for entry in aai_events:
+                logger.debug(f'AAI-EVENT entry: {entry}')
+                entry = json.loads(entry)
+                event_header = entry['event-header']
+                aai_xnf = entry['entity']
+                action = event_header['action']
+                entity_type = event_header['entity-type']
+                xnf_name = aai_xnf['pnf-name'] if entity_type == XNFType.PNF.value else aai_xnf[
+                    'vnf-name']
+                new_status = aai_xnf['orchestration-status']
 
-            if app_conf.nf_filter.is_nf_in_filter(xnf_name, new_status):
-                _process_event(action, new_status, xnf_name, mr_pub, app_conf)
+                if app_conf.nf_filter.is_nf_in_filter(xnf_name, new_status):
+                    _process_event(action, new_status, xnf_name, mr_pub, app_conf)
+    except HTTPError as e:
+        logger.debug(f'Failed to fetch AAI-EVENT messages from MR {e}')
+    except Exception as e:
+        logger.debug(f'Exception trying to process AAI events {e}')
 
 
 def _process_event(action, new_status, xnf_name, mr_pub, app_conf):
index fb6a519..354d6b8 100755 (executable)
@@ -200,12 +200,13 @@ class _MrPub(_DmaapMrClient):
                        'InvocationID': kwargs['invocation_id'],
                        'RequestID': kwargs['request_id']
                        }
+            logger.info(f'Attempting to publish event to {self.topic_url}')
             response = session.post(self.topic_url, headers=headers,
                                     auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json,
                                     verify=False)
             response.raise_for_status()
         except Exception as e:
-            logger.debug(e)
+            logger.error(f'Failed to publish message to MR topic: {e}')
             raise
 
     def publish_subscription_event_data(self, subscription, xnf_name, app_conf):
@@ -222,6 +223,7 @@ class _MrPub(_DmaapMrClient):
             self.publish_to_topic(subscription_event)
         except Exception as e:
             logger.debug(f'pmsh_utils.publish_subscription_event_data : {e}')
+            raise
 
 
 class _MrSub(_DmaapMrClient):
@@ -241,9 +243,8 @@ class _MrSub(_DmaapMrClient):
             timeout: The request timeout value in mSec.
 
         Returns:
-            list[str]: the json response from DMaaP Message Router topic, else None.
+            list[str]: the json response from DMaaP Message Router topic.
         """
-        topic_data = None
         try:
             session = requests.Session()
             headers = {'accept': 'application/json', 'content-type': 'application/json',
@@ -256,10 +257,10 @@ class _MrSub(_DmaapMrClient):
                                    verify=False)
             response.raise_for_status()
             if response.ok:
-                topic_data = response.json()
+                return response.json()
         except Exception as e:
             logger.error(f'Failed to fetch message from MR: {e}')
-        return topic_data
+            raise
 
 
 class PeriodicTask(Timer):
index 2b917ce..8a99382 100644 (file)
@@ -18,8 +18,6 @@
 
 import json
 
-from tenacity import retry, wait_fixed, retry_if_exception_type
-
 from mod import logger
 from mod.network_function import NetworkFunction
 from mod.subscription import Subscription, AdministrativeState, subscription_nf_states
@@ -42,7 +40,6 @@ class PolicyResponseHandler:
         self.app_conf = app_conf
         self.app = app
 
-    @retry(wait=wait_fixed(5), retry=retry_if_exception_type(Exception))
     def poll_policy_topic(self):
         """
         This method polls MR for response from policy. It checks whether the message is for the
@@ -62,7 +59,7 @@ class PolicyResponseHandler:
                     self._handle_response(self.app_conf.subscription.subscriptionName,
                                           administrative_state, nf_name, response_message)
         except Exception as err:
-            raise Exception(f'Error trying to poll policy response topic on MR: {err}')
+            logger.error(f'Error trying to poll policy response topic on MR: {err}')
 
     @staticmethod
     def _handle_response(subscription_name, administrative_state, nf_name, response_message):
@@ -82,4 +79,5 @@ class PolicyResponseHandler:
             policy_response_handle_functions[administrative_state][response_message](
                 subscription_name=subscription_name, status=sub_nf_status, nf_name=nf_name)
         except Exception as err:
-            raise Exception(f'Error changing nf_sub status in the DB: {err}')
+            logger.error(f'Error changing nf_sub status in the DB: {err}')
+            raise
index d6b17cd..b623cbd 100755 (executable)
@@ -17,8 +17,6 @@
 # ============LICENSE_END=====================================================
 from enum import Enum
 
-from tenacity import retry, retry_if_exception_type, wait_exponential, stop_after_attempt
-
 from mod import db, logger
 from mod.api.db_models import SubscriptionModel, NfSubRelationalModel, NetworkFunctionModel
 from mod.network_function import NetworkFunction
@@ -198,8 +196,6 @@ class Subscription:
             logger.debug(f'Failed to delete subscription: {self.subscriptionName} '
                          f'and it\'s relations from the DB: {e}')
 
-    @retry(wait=wait_exponential(multiplier=1, min=30, max=120), stop=stop_after_attempt(3),
-           retry=retry_if_exception_type(Exception))
     def process_subscription(self, nfs, mr_pub, app_conf):
         action = 'Deactivate'
         sub_nf_state = SubNfState.PENDING_DELETE.value
index add8be4..112f994 100644 (file)
@@ -42,10 +42,10 @@ class SubscriptionHandler:
             if self.administrative_state == new_administrative_state:
                 logger.info('Administrative State did not change in the Config')
             else:
-                logger.info(f'Administrative State has changed from {self.administrative_state} '
-                            f'to {new_administrative_state}.')
                 self.current_nfs = aai.get_pmsh_nfs_from_aai(self.app_conf)
                 self.current_sub = self.app_conf.subscription
+                logger.info(f'Administrative State has changed from {self.administrative_state} '
+                            f'to {new_administrative_state}.')
                 self.administrative_state = new_administrative_state
                 self.current_sub.process_subscription(self.current_nfs, self.mr_pub, self.app_conf)
 
index cfb78de..1ea27a7 100644 (file)
@@ -137,8 +137,8 @@ class PmshUtilsTestCase(TestCase):
                       'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/'
                       'dcae_pmsh_cg/1?timeout=1000',
                       json={"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=400)
-        mr_topic_data = policy_mr_sub.get_from_topic(1)
-        self.assertIsNone(mr_topic_data)
+        with self.assertRaises(Exception):
+            policy_mr_sub.get_from_topic(1)
 
     def test_get_db_connection_url_success(self):
         self.env = EnvironmentVarGuard()
index 582f0bc..9dd73ee 100644 (file)
@@ -20,8 +20,6 @@ import os
 from unittest import TestCase
 from unittest.mock import patch
 
-from tenacity import stop_after_attempt
-
 from mod.api.db_models import SubscriptionModel
 from mod.network_function import NetworkFunction
 from mod.pmsh_utils import AppConfig
@@ -128,11 +126,11 @@ class PolicyResponseHandlerTest(TestCase):
 
         mock_handle_response.assert_not_called()
 
+    @patch('mod.logger.error')
     @patch('mod.subscription.Subscription.get')
-    def test_poll_policy_topic_exception(self, mock_get_sub):
+    def test_poll_policy_topic_exception(self, mock_get_sub, mock_logger):
         self.mock_policy_mr_sub.get_from_topic.return_value = 'wrong_return'
         mock_get_sub.return_value = SubscriptionModel(subscription_name='ExtraPM-All-gNB-R2B',
                                                       status=AdministrativeState.UNLOCKED.value)
-        self.policy_response_handler.poll_policy_topic.retry.stop = stop_after_attempt(1)
-
-        self.assertRaises(Exception, self.policy_response_handler.poll_policy_topic)
+        self.policy_response_handler.poll_policy_topic()
+        mock_logger.assert_called()
index 95db6b2..1543afe 100755 (executable)
@@ -22,7 +22,6 @@ from unittest import TestCase
 from unittest.mock import patch
 
 from requests import Session
-from tenacity import stop_after_attempt
 
 import mod.aai_client as aai_client
 from mod import db, create_app
@@ -159,7 +158,6 @@ class SubscriptionTest(TestCase):
     @patch('mod.subscription.Subscription.update_subscription_status')
     def test_process_activate_subscription(self, mock_update_sub_status,
                                            mock_update_sub_nf, mock_add_nfs):
-        self.app_conf.subscription.process_subscription.retry.stop = stop_after_attempt(1)
         self.app_conf.subscription.process_subscription([list(self.xnfs)[0]], self.mock_mr_pub,
                                                         self.app_conf)
 
@@ -174,7 +172,6 @@ class SubscriptionTest(TestCase):
     def test_process_deactivate_subscription(self, mock_update_sub_status,
                                              mock_update_sub_nf):
         self.app_conf.subscription.administrativeState = 'LOCKED'
-        self.app_conf.subscription.process_subscription.retry.stop = stop_after_attempt(1)
         self.app_conf.subscription.process_subscription([list(self.xnfs)[0]], self.mock_mr_pub,
                                                         self.app_conf)
 
@@ -184,7 +181,6 @@ class SubscriptionTest(TestCase):
         mock_update_sub_status.assert_called()
 
     def test_process_subscription_exception(self):
-        self.app_conf.subscription.process_subscription.retry.stop = stop_after_attempt(1)
         self.assertRaises(Exception, self.app_conf.subscription.process_subscription,
                           [list(self.xnfs)[0]], 'not_mr_pub', 'app_config')