Fix Python linting issues in Python scripts
[integration.git] / test / mocks / pnfsimulator / netconfsimulator / netconf / netopeer_change_saver.py
1 ###
2 # ============LICENSE_START=======================================================
3 # Simulator
4 # ================================================================================
5 # Copyright (C) 2019 Nokia. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
19 ###
20
21 import sysrepo as sr
22 import sys
23 import json
24 import time
25 import logging
26 from kafka import KafkaProducer
27 from enum import Enum
28
29 logging.basicConfig(filename='netopeer_change_saver.log', level=logging.DEBUG)
30
31 kafka_producer = None
32 topic = "config"
33
34
35 class OperationType(Enum):
36     CREATED = sr.SR_OP_CREATED
37     DELETED = sr.SR_OP_DELETED
38     MODIFIED = sr.SR_OP_MODIFIED
39     MOVED = sr.SR_OP_MOVED
40
41
42 def module_change_callback(session, name, event, private_ctx):
43     if sr.SR_EV_APPLY == event:
44         change_path = "/{}:*".format(name)
45         changes = session.get_changes_iter(change_path)
46         change = session.get_change_next(changes)
47         while change:
48             try:
49                 process_change(change)
50                 change = session.get_change_next(changes)
51             except Exception:
52                 logging.exception("Exception occured")
53
54     return sr.SR_ERR_OK
55
56
57 def process_change(change):
58     if change:
59         message = {"type": OperationType(change.oper()).name}
60         if change.old_val():
61             message["old"] = {"path": change.old_val().xpath(), "value": change.old_val().val_to_string()}
62         if change.new_val():
63             message["new"] = {"path": change.new_val().xpath(), "value": change.new_val().val_to_string()}
64         send_message(message)
65
66
67 def send_message(message):
68     logging.debug("Message to kafka : %s", message)
69     response = kafka_producer.send(topic, message)
70     logging.info(response.get(timeout=90))
71
72
73 def create_producer(server):
74     for i in range(10): # pylint: disable=W0612
75         try:
76             return KafkaProducer(bootstrap_servers=server, value_serializer=lambda v: json.dumps(v).encode('utf-8'))
77         except Exception:
78             time.sleep(15)
79     raise Exception("Could not connect to kafka server")
80
81
82 def print_current_config(kafka_session, module):
83     name = "/{}:*//*".format(module)
84     logging.info("Retrieving current config for %s module", name)
85     values = kafka_session.get_items(name)
86     for i in range(values.val_cnt()):
87         logging.info(values.val(i).to_string())
88
89
90 if __name__ == "__main__":
91     try:
92         module_name = sys.argv[1]
93         bootstrap_servers = sys.argv[2]
94         topic = sys.argv[3]
95         connection = sr.Connection("example_application2")
96         session = sr.Session(connection)
97         subscribe = sr.Subscribe(session)
98         subscribe.module_change_subscribe(module_name, module_change_callback)
99
100         print_current_config(session, module_name)
101
102         kafka_producer = create_producer(bootstrap_servers)
103
104         sr.global_loop()
105     except Exception as e:
106         logging.exception("Exception occured")
107         raise e