[PMSH] Bug fix to include ip in event
[dcaegen2/services.git] / components / pm-subscription-handler / pmsh_service / pmsh_service_main.py
index 8245466..f92fdc9 100755 (executable)
 # SPDX-License-Identifier: Apache-2.0
 # ============LICENSE_END=====================================================
 import sys
-import threading
+from signal import signal, SIGTERM
 
-import mod.aai_client as aai
-import mod.pmsh_logging as logger
-from mod import db, create_app, launch_api_server
+from mod import db, create_app, launch_api_server, logger
 from mod.aai_event_handler import process_aai_events
-from mod.config_handler import ConfigHandler
+from mod.exit_handler import ExitHandler
 from mod.pmsh_utils import AppConfig, PeriodicTask
-from mod.subscription import Subscription, AdministrativeState
-
-
-def subscription_processor(config_handler, administrative_state, mr_pub, app,
-                           mr_aai_event_subscriber):
-    """
-    Checks for changes of administrative state in config and proceeds to process
-    the Subscription if a change has occurred
-
-    Args:
-        config_handler (ConfigHandler): Configuration Handler used to get config
-        administrative_state (str): The administrative state
-        mr_pub (_MrPub): MR publisher
-        app (db): DB application
-        mr_aai_event_subscriber (_MrSub): AAI events MR subscriber
-    """
-    app.app_context().push()
-    config = config_handler.get_config()
-    app_conf = AppConfig(**config['config'])
-    new_administrative_state = config['policy']['subscription']['administrativeState']
-    polling_period = 30.0
-
-    try:
-        if administrative_state == new_administrative_state:
-            logger.debug('Administrative State did not change in the Config')
-        else:
-            logger.debug(f'Administrative State changed from "{administrative_state}" "to '
-                         f'"{new_administrative_state}".')
-            sub, nfs = aai.get_pmsh_subscription_data(config)
-            sub.process_subscription(nfs, mr_pub, app_conf)
-            aai_event_thread = PeriodicTask(10, process_aai_events, args=(
-                mr_aai_event_subscriber, sub, mr_pub, app, app_conf))
-
-            if new_administrative_state == AdministrativeState.UNLOCKED.value:
-                logger.debug('Listening to AAI-EVENT topic in MR.')
-                aai_event_thread.start()
-            else:
-                logger.debug('Stopping to listen to AAI-EVENT topic in MR.')
-                aai_event_thread.cancel()
-
-    except Exception as err:
-        logger.debug(f'Error occurred during the activation/deactivation process {err}')
-
-    threading.Timer(polling_period, subscription_processor,
-                    [config_handler, new_administrative_state, mr_pub, app,
-                     mr_aai_event_subscriber]).start()
+from mod.policy_response_handler import PolicyResponseHandler
+from mod.subscription_handler import SubscriptionHandler
 
 
 def main():
     try:
-        config_handler = ConfigHandler()
-        config = config_handler.get_config()
-        app_conf = AppConfig(**config['config'])
-        app = create_app()
-        app.app_context().push()
-        db.create_all(app=app)
-        sub, nfs = aai.get_pmsh_subscription_data(config)
-        mr_pub = app_conf.get_mr_pub('policy_pm_publisher')
-        mr_sub = app_conf.get_mr_sub('policy_pm_subscriber')
-        mr_aai_event_subscriber = app_conf.get_mr_sub('aai_subscriber')
-        initial_start_delay = 5.0
-
-        administrative_state = AdministrativeState.LOCKED.value
-        subscription_in_db = Subscription.get(sub.subscriptionName)
-        if subscription_in_db is not None:
-            administrative_state = subscription_in_db.status
-
-        threading.Timer(initial_start_delay, subscription_processor,
-                        [config_handler, administrative_state, mr_pub,
-                         app, mr_aai_event_subscriber]).start()
-
-        threading.Timer(20.0, mr_sub.poll_policy_topic, [sub.subscriptionName, app]).start()
-
+        try:
+            app = create_app()
+            app.app_context().push()
+            db.create_all(app=app)
+            app_conf = AppConfig()
+            policy_mr_pub = app_conf.get_mr_pub('policy_pm_publisher')
+            policy_mr_sub = app_conf.get_mr_sub('policy_pm_subscriber')
+            aai_event_mr_sub = app_conf.get_mr_sub('aai_subscriber')
+        except Exception as e:
+            logger.error(f'Failed to get config and create application: {e}', exc_info=True)
+            sys.exit(e)
+
+        app_conf_thread = PeriodicTask(10, app_conf.refresh_config)
+        app_conf_thread.name = 'app_conf_thread'
+        app_conf_thread.start()
+
+        policy_response_handler = PolicyResponseHandler(policy_mr_sub, app_conf, app)
+        policy_response_handler_thread = PeriodicTask(25, policy_response_handler.poll_policy_topic)
+        policy_response_handler_thread.name = 'policy_event_thread'
+
+        aai_event_thread = PeriodicTask(20, process_aai_events,
+                                        args=(aai_event_mr_sub, policy_mr_pub, app, app_conf))
+        aai_event_thread.name = 'aai_event_thread'
+
+        subscription_handler = SubscriptionHandler(policy_mr_pub, app, app_conf, aai_event_thread,
+                                                   policy_response_handler_thread)
+
+        subscription_handler_thread = PeriodicTask(30, subscription_handler.execute)
+        subscription_handler_thread.name = 'sub_handler_thread'
+        subscription_handler_thread.start()
+
+        periodic_tasks = [app_conf_thread, aai_event_thread, subscription_handler_thread,
+                          policy_response_handler_thread]
+
+        signal(SIGTERM, ExitHandler(periodic_tasks=periodic_tasks,
+                                    app_conf=app_conf, subscription_handler=subscription_handler))
         launch_api_server(app_conf)
 
     except Exception as e:
-        logger.debug(f'Failed to Init PMSH: {e}')
+        logger.error(f'Failed to initialise PMSH: {e}', exc_info=True)
         sys.exit(e)