__copyright__ = "Copyright (C) 2020 Nordix Foundation"
__license__ = "Apache 2.0"
+import os
import time
-from concurrent.futures import ThreadPoolExecutor
from threading import Timer
import sysrepo as sr
+from loguru import logger
YANG_MODULE_NAME = 'pnf-sw-upgrade'
+XPATH_CTX = sr.Xpath_Ctx()
+PAUSE_TO_LOCK = 0.5
+
#
# ----- BEGIN Finite State Machine definitions -----
#
ST_ACTIVATION_IN_PROGRESS = 'ACTIVATION_IN_PROGRESS'
ST_ACTIVATION_COMPLETED = 'ACTIVATION_COMPLETED'
-# Timeout used for timed transitions
-TO_DOWNLOAD = 7
-TO_ACTIVATION = 7
+# Timeouts used for timed transitions
+SWUG_TIMED_TRANSITION_TO = int(os.environ.get("SWUG_TIMED_TRANSITION_TO", "7"))
+TO_DOWNLOAD = SWUG_TIMED_TRANSITION_TO
+TO_ACTIVATION = SWUG_TIMED_TRANSITION_TO
def timestamper(sess, key_id):
}
}
+
#
# ----- END Finite State Machine definitions -----
#
try:
print_current_config(sess, YANG_MODULE_NAME)
except Exception as e:
- print(e)
+ logger.error(e)
sr.global_loop()
- print("Application exit requested, exiting.")
+ logger.info("Application exit requested, exiting.")
except Exception as e:
- print(e)
+ logger.error(e)
# Function to be called for subscribed client of given session whenever configuration changes.
def module_change_cb(sess, module_name, event, private_ctx):
- try:
- conn = private_ctx
- change_path = xpath_of(None, 'action')
- it = sess.get_changes_iter(change_path)
- while True:
- change = sess.get_change_next(it)
- if change is None:
- break
- handle_change(conn, change.oper(), change.old_val(), change.new_val())
- except Exception as e:
- print(e)
+ if event == sr.SR_EV_APPLY:
+ try:
+ conn = private_ctx
+ change_path = xpath_of(None, 'action')
+ it = sess.get_changes_iter(change_path)
+ while True:
+ change = sess.get_change_next(it)
+ if change is None:
+ break
+ op = change.oper()
+ if op in (sr.SR_OP_CREATED, sr.SR_OP_MODIFIED):
+ handle_trigger_action(conn, sess, change.new_val())
+ except Exception as e:
+ logger.error(e)
return sr.SR_ERR_OK
# It does so by loading all the items of a session and printing them out.
def print_current_config(session, module_name):
select_xpath = f"/{module_name}:*//*"
-
values = session.get_items(select_xpath)
-
- if values is not None:
- print("========== BEGIN CONFIG ==========")
+ if values:
+ logger.info("========== BEGIN CONFIG ==========")
for i in range(values.val_cnt()):
- print(values.val(i).to_string(), end='')
- print("=========== END CONFIG ===========")
+ logger.info(values.val(i).to_string().strip())
+ logger.info("=========== END CONFIG ===========")
-def handle_change(conn, op, old_val, new_val):
+def handle_trigger_action(conn, sess, action_val):
"""
Handle individual changes on the model.
"""
- if op == sr.SR_OP_CREATED:
- print("CREATED: %s" % new_val.to_string())
- xpath = new_val.xpath()
- last_node = xpath_ctx.last_node(xpath)
- # Warning: 'key_value' modifies 'xpath'!
- key_id = xpath_ctx.key_value(xpath, 'upgrade-package', 'id')
- if key_id and last_node == 'action':
- executor.submit(execute_action, conn, key_id, new_val.data().get_enum())
- elif op == sr.SR_OP_DELETED:
- print("DELETED: %s" % old_val.to_string())
- elif op == sr.SR_OP_MODIFIED:
- print("MODIFIED: %s to %s" % (old_val.to_string(), new_val.to_string()))
- elif op == sr.SR_OP_MOVED:
- print("MOVED: %s after %s" % (new_val.xpath(), old_val.xpath()))
-
-
-def execute_action(conn, key_id, action):
- sess = sr.Session(conn)
- try:
+ logger.info("CREATED/MODIFIED: %s" % action_val.to_string())
+ xpath = action_val.xpath()
+ last_node = XPATH_CTX.last_node(xpath)
+ # Warning: 'key_value' modifies 'xpath'!
+ key_id = XPATH_CTX.key_value(xpath, 'upgrade-package', 'id')
+ if key_id and last_node == 'action':
+ action = action_val.data().get_enum()
cur_state = sess.get_item(xpath_of(key_id, 'current-status')).data().get_enum()
next_state_str = STATE_MACHINE[cur_state]['transitions'].get(action, None)
if next_state_str:
- handle_set_state(conn, key_id, next_state_str)
- sess.delete_item(xpath_of(key_id, 'action'))
- sess.commit()
- finally:
- sess.session_stop()
+ Timer(PAUSE_TO_LOCK, try_change_state, (conn, key_id, next_state_str)).start()
-def handle_set_state(conn, key_id, state_str):
+def try_change_state(conn, key_id, state_str):
sess = sr.Session(conn)
try:
- state = sr.Val(state_str, sr.SR_ENUM_T)
- sess.set_item(xpath_of(key_id, 'current-status'), state)
- on_enter = STATE_MACHINE[state_str].get('on_enter', None)
- if on_enter:
- # noinspection PyCallingNonCallable
- on_enter(sess, key_id)
- sess.commit()
+ try:
+ sess.lock_module(YANG_MODULE_NAME)
+ except RuntimeError:
+ logger.warning(f"Retrying after {PAUSE_TO_LOCK}s")
+ Timer(PAUSE_TO_LOCK, try_change_state, (conn, key_id, state_str)).start()
+ return
+ try:
+ state = sr.Val(state_str, sr.SR_ENUM_T)
+ sess.set_item(xpath_of(key_id, 'current-status'), state)
+ on_enter = STATE_MACHINE[state_str].get('on_enter', None)
+ if callable(on_enter):
+ on_enter(sess, key_id)
+ sess.commit()
+ finally:
+ sess.unlock_module(YANG_MODULE_NAME)
delay, next_state_str = STATE_MACHINE[state_str].get('timed_transition', [0, None])
if delay:
- Timer(delay, handle_set_state, (conn, key_id, next_state_str)).start()
+ Timer(delay, try_change_state, (conn, key_id, next_state_str)).start()
finally:
sess.session_stop()
if __name__ == '__main__':
- xpath_ctx = sr.Xpath_Ctx()
- executor = ThreadPoolExecutor(max_workers=2)
- main()
+ main()
\ No newline at end of file