Seperating usecase test suite dependencies
[integration/csit.git] / plans / usecases-pnf-sw-upgrade / pnf-sw-upgrade / simulators / pnfsim / pnf-sw-upgrade / subscriber.py
@@ -22,14 +22,18 @@ __author__ = "Eliezio Oliveira <eliezio.oliveira@est.tech>"
 __copyright__ = "Copyright (C) 2020 Nordix Foundation"
 __license__ = "Apache 2.0"
 
+import os
 import time
-from concurrent.futures import ThreadPoolExecutor
 from threading import Timer
 
 import sysrepo as sr
+from loguru import logger
 
 YANG_MODULE_NAME = 'pnf-sw-upgrade'
 
+XPATH_CTX = sr.Xpath_Ctx()
+PAUSE_TO_LOCK = 0.5
+
 #
 # ----- BEGIN Finite State Machine definitions -----
 #
@@ -48,9 +52,10 @@ ST_DOWNLOAD_COMPLETED = 'DOWNLOAD_COMPLETED'
 ST_ACTIVATION_IN_PROGRESS = 'ACTIVATION_IN_PROGRESS'
 ST_ACTIVATION_COMPLETED = 'ACTIVATION_COMPLETED'
 
-# Timeout used for timed transitions
-TO_DOWNLOAD = 7
-TO_ACTIVATION = 7
+# Timeouts used for timed transitions
+SWUG_TIMED_TRANSITION_TO = int(os.environ.get("SWUG_TIMED_TRANSITION_TO", "7"))
+TO_DOWNLOAD = SWUG_TIMED_TRANSITION_TO
+TO_ACTIVATION = SWUG_TIMED_TRANSITION_TO
 
 
 def timestamper(sess, key_id):
@@ -101,6 +106,7 @@ STATE_MACHINE = {
     }
 }
 
+
 #
 # ----- END Finite State Machine definitions -----
 #
@@ -117,28 +123,31 @@ def main():
         try:
             print_current_config(sess, YANG_MODULE_NAME)
         except Exception as e:
-            print(e)
+            logger.error(e)
 
         sr.global_loop()
 
-        print("Application exit requested, exiting.")
+        logger.info("Application exit requested, exiting.")
     except Exception as e:
-        print(e)
+        logger.error(e)
 
 
 # Function to be called for subscribed client of given session whenever configuration changes.
 def module_change_cb(sess, module_name, event, private_ctx):
-    try:
-        conn = private_ctx
-        change_path = xpath_of(None, 'action')
-        it = sess.get_changes_iter(change_path)
-        while True:
-            change = sess.get_change_next(it)
-            if change is None:
-                break
-            handle_change(conn, change.oper(), change.old_val(), change.new_val())
-    except Exception as e:
-        print(e)
+    if event == sr.SR_EV_APPLY:
+        try:
+            conn = private_ctx
+            change_path = xpath_of(None, 'action')
+            it = sess.get_changes_iter(change_path)
+            while True:
+                change = sess.get_change_next(it)
+                if change is None:
+                    break
+                op = change.oper()
+                if op in (sr.SR_OP_CREATED, sr.SR_OP_MODIFIED):
+                    handle_trigger_action(conn, sess, change.new_val())
+        except Exception as e:
+            logger.error(e)
     return sr.SR_ERR_OK
 
 
@@ -146,67 +155,55 @@ def module_change_cb(sess, module_name, event, private_ctx):
 # It does so by loading all the items of a session and printing them out.
 def print_current_config(session, module_name):
     select_xpath = f"/{module_name}:*//*"
-
     values = session.get_items(select_xpath)
-
-    if values is not None:
-        print("========== BEGIN CONFIG ==========")
+    if values:
+        logger.info("========== BEGIN CONFIG ==========")
         for i in range(values.val_cnt()):
-            print(values.val(i).to_string(), end='')
-        print("=========== END CONFIG ===========")
+            logger.info(values.val(i).to_string().strip())
+        logger.info("=========== END CONFIG ===========")
 
 
-def handle_change(conn, op, old_val, new_val):
+def handle_trigger_action(conn, sess, action_val):
     """
     Handle individual changes on the model.
     """
-    if op == sr.SR_OP_CREATED:
-        print("CREATED: %s" % new_val.to_string())
-        xpath = new_val.xpath()
-        last_node = xpath_ctx.last_node(xpath)
-        # Warning: 'key_value' modifies 'xpath'!
-        key_id = xpath_ctx.key_value(xpath, 'upgrade-package', 'id')
-        if key_id and last_node == 'action':
-            executor.submit(execute_action, conn, key_id, new_val.data().get_enum())
-    elif op == sr.SR_OP_DELETED:
-        print("DELETED: %s" % old_val.to_string())
-    elif op == sr.SR_OP_MODIFIED:
-        print("MODIFIED: %s to %s" % (old_val.to_string(), new_val.to_string()))
-    elif op == sr.SR_OP_MOVED:
-        print("MOVED: %s after %s" % (new_val.xpath(), old_val.xpath()))
-
-
-def execute_action(conn, key_id, action):
-    sess = sr.Session(conn)
-    try:
+    logger.info("CREATED/MODIFIED: %s" % action_val.to_string())
+    xpath = action_val.xpath()
+    last_node = XPATH_CTX.last_node(xpath)
+    # Warning: 'key_value' modifies 'xpath'!
+    key_id = XPATH_CTX.key_value(xpath, 'upgrade-package', 'id')
+    if key_id and last_node == 'action':
+        action = action_val.data().get_enum()
         cur_state = sess.get_item(xpath_of(key_id, 'current-status')).data().get_enum()
         next_state_str = STATE_MACHINE[cur_state]['transitions'].get(action, None)
         if next_state_str:
-            handle_set_state(conn, key_id, next_state_str)
-        sess.delete_item(xpath_of(key_id, 'action'))
-        sess.commit()
-    finally:
-        sess.session_stop()
+            Timer(PAUSE_TO_LOCK, try_change_state, (conn, key_id, next_state_str)).start()
 
 
-def handle_set_state(conn, key_id, state_str):
+def try_change_state(conn, key_id, state_str):
     sess = sr.Session(conn)
     try:
-        state = sr.Val(state_str, sr.SR_ENUM_T)
-        sess.set_item(xpath_of(key_id, 'current-status'), state)
-        on_enter = STATE_MACHINE[state_str].get('on_enter', None)
-        if on_enter:
-            # noinspection PyCallingNonCallable
-            on_enter(sess, key_id)
-        sess.commit()
+        try:
+            sess.lock_module(YANG_MODULE_NAME)
+        except RuntimeError:
+            logger.warning(f"Retrying after {PAUSE_TO_LOCK}s")
+            Timer(PAUSE_TO_LOCK, try_change_state, (conn, key_id, state_str)).start()
+            return
+        try:
+            state = sr.Val(state_str, sr.SR_ENUM_T)
+            sess.set_item(xpath_of(key_id, 'current-status'), state)
+            on_enter = STATE_MACHINE[state_str].get('on_enter', None)
+            if callable(on_enter):
+                on_enter(sess, key_id)
+            sess.commit()
+        finally:
+            sess.unlock_module(YANG_MODULE_NAME)
         delay, next_state_str = STATE_MACHINE[state_str].get('timed_transition', [0, None])
         if delay:
-            Timer(delay, handle_set_state, (conn, key_id, next_state_str)).start()
+            Timer(delay, try_change_state, (conn, key_id, next_state_str)).start()
     finally:
         sess.session_stop()
 
 
 if __name__ == '__main__':
-    xpath_ctx = sr.Xpath_Ctx()
-    executor = ThreadPoolExecutor(max_workers=2)
-    main()
+    main()
\ No newline at end of file