Merge "Drools-apps CSIT randomly fails deploying policies"
[integration/csit.git] / plans / usecases / pnf-sw-upgrade / simulators / pnfsim / pnf-sw-upgrade / subscriber.py
1 #!/usr/bin/env python3
2
3 # ============LICENSE_START=======================================================
4 #  Copyright (C) 2020 Nordix Foundation.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #      http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18 # SPDX-License-Identifier: Apache-2.0
19 # ============LICENSE_END=========================================================
20
21 __author__ = "Eliezio Oliveira <eliezio.oliveira@est.tech>"
22 __copyright__ = "Copyright (C) 2020 Nordix Foundation"
23 __license__ = "Apache 2.0"
24
25 import time
26 from concurrent.futures import ThreadPoolExecutor
27 from threading import Timer
28
29 import sysrepo as sr
30
31 YANG_MODULE_NAME = 'pnf-sw-upgrade'
32
33 #
34 # ----- BEGIN Finite State Machine definitions -----
35 #
36
37 # Actions
38 ACT_PRE_CHECK = 'PRE_CHECK'
39 ACT_DOWNLOAD_NE_SW = 'DOWNLOAD_NE_SW'
40 ACT_ACTIVATE_NE_SW = 'ACTIVATE_NE_SW'
41 ACT_CANCEL = 'CANCEL'
42
43 # States
44 ST_CREATED = 'CREATED'
45 ST_INITIALIZED = 'INITIALIZED'
46 ST_DOWNLOAD_IN_PROGRESS = 'DOWNLOAD_IN_PROGRESS'
47 ST_DOWNLOAD_COMPLETED = 'DOWNLOAD_COMPLETED'
48 ST_ACTIVATION_IN_PROGRESS = 'ACTIVATION_IN_PROGRESS'
49 ST_ACTIVATION_COMPLETED = 'ACTIVATION_COMPLETED'
50
51 # Timeout used for timed transitions
52 TO_DOWNLOAD = 7
53 TO_ACTIVATION = 7
54
55
56 def timestamper(sess, key_id):
57     xpath = xpath_of(key_id, 'state-change-time')
58     now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
59     state = sr.Val(now, sr.SR_STRING_T)
60     sess.set_item(xpath, state)
61
62
63 def xpath_of(key_id, leaf_id):
64     selector = "[id='{0}']".format(key_id) if key_id else ''
65     return "/%s:software-upgrade/upgrade-package%s/%s" % (YANG_MODULE_NAME, selector, leaf_id)
66
67
68 """
69 The finite state machine (FSM) is represented as a dictionary where the current state is the key, and its value is
70 an object (also represented as a dictionary) with the following optional attributes:
71
72 - on_enter: a function called when FSM enters this state;
73 - transitions: a dictionary mapping every acceptable action to the target state;
74 - timed_transition: a pair for a timed transition that will automatically occur after a given interval.
75 """
76 STATE_MACHINE = {
77     ST_CREATED: {
78         'transitions': {ACT_PRE_CHECK: ST_INITIALIZED}
79     },
80     ST_INITIALIZED: {
81         'on_enter': timestamper,
82         'transitions': {ACT_DOWNLOAD_NE_SW: ST_DOWNLOAD_IN_PROGRESS}
83     },
84     ST_DOWNLOAD_IN_PROGRESS: {
85         'on_enter': timestamper,
86         'timed_transition': (TO_DOWNLOAD, ST_DOWNLOAD_COMPLETED),
87         'transitions': {ACT_CANCEL: ST_INITIALIZED}
88     },
89     ST_DOWNLOAD_COMPLETED: {
90         'on_enter': timestamper,
91         'transitions': {ACT_ACTIVATE_NE_SW: ST_ACTIVATION_IN_PROGRESS}
92     },
93     ST_ACTIVATION_IN_PROGRESS: {
94         'on_enter': timestamper,
95         'timed_transition': (TO_ACTIVATION, ST_ACTIVATION_COMPLETED),
96         'transitions': {ACT_CANCEL: ST_DOWNLOAD_COMPLETED}
97     },
98     ST_ACTIVATION_COMPLETED: {
99         'on_enter': timestamper,
100         'transitions': {ACT_ACTIVATE_NE_SW: ST_ACTIVATION_IN_PROGRESS}
101     }
102 }
103
104 #
105 # ----- END Finite State Machine definitions -----
106 #
107
108
109 def main():
110     try:
111         conn = sr.Connection(YANG_MODULE_NAME)
112         sess = sr.Session(conn)
113         subscribe = sr.Subscribe(sess)
114
115         subscribe.module_change_subscribe(YANG_MODULE_NAME, module_change_cb, conn)
116
117         try:
118             print_current_config(sess, YANG_MODULE_NAME)
119         except Exception as e:
120             print(e)
121
122         sr.global_loop()
123
124         print("Application exit requested, exiting.")
125     except Exception as e:
126         print(e)
127
128
129 # Function to be called for subscribed client of given session whenever configuration changes.
130 def module_change_cb(sess, module_name, event, private_ctx):
131     try:
132         conn = private_ctx
133         change_path = xpath_of(None, 'action')
134         it = sess.get_changes_iter(change_path)
135         while True:
136             change = sess.get_change_next(it)
137             if change is None:
138                 break
139             handle_change(conn, change.oper(), change.old_val(), change.new_val())
140     except Exception as e:
141         print(e)
142     return sr.SR_ERR_OK
143
144
145 # Function to print current configuration state.
146 # It does so by loading all the items of a session and printing them out.
147 def print_current_config(session, module_name):
148     select_xpath = f"/{module_name}:*//*"
149
150     values = session.get_items(select_xpath)
151
152     if values is not None:
153         print("========== BEGIN CONFIG ==========")
154         for i in range(values.val_cnt()):
155             print(values.val(i).to_string(), end='')
156         print("=========== END CONFIG ===========")
157
158
159 def handle_change(conn, op, old_val, new_val):
160     """
161     Handle individual changes on the model.
162     """
163     if op == sr.SR_OP_CREATED:
164         print("CREATED: %s" % new_val.to_string())
165         xpath = new_val.xpath()
166         last_node = xpath_ctx.last_node(xpath)
167         # Warning: 'key_value' modifies 'xpath'!
168         key_id = xpath_ctx.key_value(xpath, 'upgrade-package', 'id')
169         if key_id and last_node == 'action':
170             executor.submit(execute_action, conn, key_id, new_val.data().get_enum())
171     elif op == sr.SR_OP_DELETED:
172         print("DELETED: %s" % old_val.to_string())
173     elif op == sr.SR_OP_MODIFIED:
174         print("MODIFIED: %s to %s" % (old_val.to_string(), new_val.to_string()))
175     elif op == sr.SR_OP_MOVED:
176         print("MOVED: %s after %s" % (new_val.xpath(), old_val.xpath()))
177
178
179 def execute_action(conn, key_id, action):
180     sess = sr.Session(conn)
181     try:
182         cur_state = sess.get_item(xpath_of(key_id, 'current-status')).data().get_enum()
183         next_state_str = STATE_MACHINE[cur_state]['transitions'].get(action, None)
184         if next_state_str:
185             handle_set_state(conn, key_id, next_state_str)
186         sess.delete_item(xpath_of(key_id, 'action'))
187         sess.commit()
188     finally:
189         sess.session_stop()
190
191
192 def handle_set_state(conn, key_id, state_str):
193     sess = sr.Session(conn)
194     try:
195         state = sr.Val(state_str, sr.SR_ENUM_T)
196         sess.set_item(xpath_of(key_id, 'current-status'), state)
197         on_enter = STATE_MACHINE[state_str].get('on_enter', None)
198         if on_enter:
199             # noinspection PyCallingNonCallable
200             on_enter(sess, key_id)
201         sess.commit()
202         delay, next_state_str = STATE_MACHINE[state_str].get('timed_transition', [0, None])
203         if delay:
204             Timer(delay, handle_set_state, (conn, key_id, next_state_str)).start()
205     finally:
206         sess.session_stop()
207
208
209 if __name__ == '__main__':
210     xpath_ctx = sr.Xpath_Ctx()
211     executor = ThreadPoolExecutor(max_workers=2)
212     main()