X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=plans%2Fusecases-pnf-sw-upgrade%2Fpnf-sw-upgrade%2Fsimulators%2Fpnfsim%2Fpnf-sw-upgrade%2Fsubscriber.py;fp=plans%2Fusecases%2Fpnf-sw-upgrade%2Fsimulators%2Fpnfsim%2Fpnf-sw-upgrade%2Fsubscriber.py;h=e8a81594b23065a16ace468c7c69f6fe6765cf6f;hb=0387b3b7a9dd64cb062bb224083a8bae28b6b042;hp=810fe453e306952c770ab22a61544c8a6ea69177;hpb=f93a9e6d67b8cb59e64362c44f13f5ddc2cbe112;p=integration%2Fcsit.git diff --git a/plans/usecases/pnf-sw-upgrade/simulators/pnfsim/pnf-sw-upgrade/subscriber.py b/plans/usecases-pnf-sw-upgrade/pnf-sw-upgrade/simulators/pnfsim/pnf-sw-upgrade/subscriber.py similarity index 67% rename from plans/usecases/pnf-sw-upgrade/simulators/pnfsim/pnf-sw-upgrade/subscriber.py rename to plans/usecases-pnf-sw-upgrade/pnf-sw-upgrade/simulators/pnfsim/pnf-sw-upgrade/subscriber.py index 810fe453..e8a81594 100755 --- a/plans/usecases/pnf-sw-upgrade/simulators/pnfsim/pnf-sw-upgrade/subscriber.py +++ b/plans/usecases-pnf-sw-upgrade/pnf-sw-upgrade/simulators/pnfsim/pnf-sw-upgrade/subscriber.py @@ -22,14 +22,18 @@ __author__ = "Eliezio Oliveira " __copyright__ = "Copyright (C) 2020 Nordix Foundation" __license__ = "Apache 2.0" +import os import time -from concurrent.futures import ThreadPoolExecutor from threading import Timer import sysrepo as sr +from loguru import logger YANG_MODULE_NAME = 'pnf-sw-upgrade' +XPATH_CTX = sr.Xpath_Ctx() +PAUSE_TO_LOCK = 0.5 + # # ----- BEGIN Finite State Machine definitions ----- # @@ -48,9 +52,10 @@ ST_DOWNLOAD_COMPLETED = 'DOWNLOAD_COMPLETED' ST_ACTIVATION_IN_PROGRESS = 'ACTIVATION_IN_PROGRESS' ST_ACTIVATION_COMPLETED = 'ACTIVATION_COMPLETED' -# Timeout used for timed transitions -TO_DOWNLOAD = 7 -TO_ACTIVATION = 7 +# Timeouts used for timed transitions +SWUG_TIMED_TRANSITION_TO = int(os.environ.get("SWUG_TIMED_TRANSITION_TO", "7")) +TO_DOWNLOAD = SWUG_TIMED_TRANSITION_TO +TO_ACTIVATION = SWUG_TIMED_TRANSITION_TO def timestamper(sess, key_id): @@ -101,6 +106,7 @@ STATE_MACHINE = { } } + # # ----- END Finite State Machine definitions ----- # @@ -117,28 +123,31 @@ def main(): try: print_current_config(sess, YANG_MODULE_NAME) except Exception as e: - print(e) + logger.error(e) sr.global_loop() - print("Application exit requested, exiting.") + logger.info("Application exit requested, exiting.") except Exception as e: - print(e) + logger.error(e) # Function to be called for subscribed client of given session whenever configuration changes. def module_change_cb(sess, module_name, event, private_ctx): - try: - conn = private_ctx - change_path = xpath_of(None, 'action') - it = sess.get_changes_iter(change_path) - while True: - change = sess.get_change_next(it) - if change is None: - break - handle_change(conn, change.oper(), change.old_val(), change.new_val()) - except Exception as e: - print(e) + if event == sr.SR_EV_APPLY: + try: + conn = private_ctx + change_path = xpath_of(None, 'action') + it = sess.get_changes_iter(change_path) + while True: + change = sess.get_change_next(it) + if change is None: + break + op = change.oper() + if op in (sr.SR_OP_CREATED, sr.SR_OP_MODIFIED): + handle_trigger_action(conn, sess, change.new_val()) + except Exception as e: + logger.error(e) return sr.SR_ERR_OK @@ -146,67 +155,55 @@ def module_change_cb(sess, module_name, event, private_ctx): # It does so by loading all the items of a session and printing them out. def print_current_config(session, module_name): select_xpath = f"/{module_name}:*//*" - values = session.get_items(select_xpath) - - if values is not None: - print("========== BEGIN CONFIG ==========") + if values: + logger.info("========== BEGIN CONFIG ==========") for i in range(values.val_cnt()): - print(values.val(i).to_string(), end='') - print("=========== END CONFIG ===========") + logger.info(values.val(i).to_string().strip()) + logger.info("=========== END CONFIG ===========") -def handle_change(conn, op, old_val, new_val): +def handle_trigger_action(conn, sess, action_val): """ Handle individual changes on the model. """ - if op == sr.SR_OP_CREATED: - print("CREATED: %s" % new_val.to_string()) - xpath = new_val.xpath() - last_node = xpath_ctx.last_node(xpath) - # Warning: 'key_value' modifies 'xpath'! - key_id = xpath_ctx.key_value(xpath, 'upgrade-package', 'id') - if key_id and last_node == 'action': - executor.submit(execute_action, conn, key_id, new_val.data().get_enum()) - elif op == sr.SR_OP_DELETED: - print("DELETED: %s" % old_val.to_string()) - elif op == sr.SR_OP_MODIFIED: - print("MODIFIED: %s to %s" % (old_val.to_string(), new_val.to_string())) - elif op == sr.SR_OP_MOVED: - print("MOVED: %s after %s" % (new_val.xpath(), old_val.xpath())) - - -def execute_action(conn, key_id, action): - sess = sr.Session(conn) - try: + logger.info("CREATED/MODIFIED: %s" % action_val.to_string()) + xpath = action_val.xpath() + last_node = XPATH_CTX.last_node(xpath) + # Warning: 'key_value' modifies 'xpath'! + key_id = XPATH_CTX.key_value(xpath, 'upgrade-package', 'id') + if key_id and last_node == 'action': + action = action_val.data().get_enum() cur_state = sess.get_item(xpath_of(key_id, 'current-status')).data().get_enum() next_state_str = STATE_MACHINE[cur_state]['transitions'].get(action, None) if next_state_str: - handle_set_state(conn, key_id, next_state_str) - sess.delete_item(xpath_of(key_id, 'action')) - sess.commit() - finally: - sess.session_stop() + Timer(PAUSE_TO_LOCK, try_change_state, (conn, key_id, next_state_str)).start() -def handle_set_state(conn, key_id, state_str): +def try_change_state(conn, key_id, state_str): sess = sr.Session(conn) try: - state = sr.Val(state_str, sr.SR_ENUM_T) - sess.set_item(xpath_of(key_id, 'current-status'), state) - on_enter = STATE_MACHINE[state_str].get('on_enter', None) - if on_enter: - # noinspection PyCallingNonCallable - on_enter(sess, key_id) - sess.commit() + try: + sess.lock_module(YANG_MODULE_NAME) + except RuntimeError: + logger.warning(f"Retrying after {PAUSE_TO_LOCK}s") + Timer(PAUSE_TO_LOCK, try_change_state, (conn, key_id, state_str)).start() + return + try: + state = sr.Val(state_str, sr.SR_ENUM_T) + sess.set_item(xpath_of(key_id, 'current-status'), state) + on_enter = STATE_MACHINE[state_str].get('on_enter', None) + if callable(on_enter): + on_enter(sess, key_id) + sess.commit() + finally: + sess.unlock_module(YANG_MODULE_NAME) delay, next_state_str = STATE_MACHINE[state_str].get('timed_transition', [0, None]) if delay: - Timer(delay, handle_set_state, (conn, key_id, next_state_str)).start() + Timer(delay, try_change_state, (conn, key_id, next_state_str)).start() finally: sess.session_stop() if __name__ == '__main__': - xpath_ctx = sr.Xpath_Ctx() - executor = ThreadPoolExecutor(max_workers=2) - main() + main() \ No newline at end of file