2 # ============LICENSE_START=======================================================
4 # ================================================================================
5 # Copyright (C) 2019 Nokia. All rights reserved.
6 # ================================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END=========================================================
26 from kafka import KafkaProducer
29 logging.basicConfig(filename='netopeer_change_saver.log', level=logging.DEBUG)
35 class OperationType(Enum):
36 CREATED = sr.SR_OP_CREATED
37 DELETED = sr.SR_OP_DELETED
38 MODIFIED = sr.SR_OP_MODIFIED
39 MOVED = sr.SR_OP_MOVED
42 def module_change_callback(session, name, event, private_ctx):
43 if sr.SR_EV_APPLY == event:
44 change_path = "/{}:*".format(name)
45 changes = session.get_changes_iter(change_path)
46 change = session.get_change_next(changes)
49 process_change(change)
50 change = session.get_change_next(changes)
52 logging.exception("Exception occured")
57 def process_change(change):
59 message = {"type": OperationType(change.oper()).name}
61 message["old"] = {"path": change.old_val().xpath(), "value": change.old_val().val_to_string()}
63 message["new"] = {"path": change.new_val().xpath(), "value": change.new_val().val_to_string()}
67 def send_message(message):
68 logging.debug("Message to kafka : %s", message)
69 response = kafka_producer.send(topic, message)
70 logging.info(response.get(timeout=90))
73 def create_producer(server):
74 for i in range(10): # pylint: disable=W0612
76 return KafkaProducer(bootstrap_servers=server, value_serializer=lambda v: json.dumps(v).encode('utf-8'))
79 raise Exception("Could not connect to kafka server")
82 def print_current_config(kafka_session, module):
83 name = "/{}:*//*".format(module)
84 logging.info("Retrieving current config for %s module", name)
85 values = kafka_session.get_items(name)
86 for i in range(values.val_cnt()):
87 logging.info(values.val(i).to_string())
90 if __name__ == "__main__":
92 module_name = sys.argv[1]
93 bootstrap_servers = sys.argv[2]
95 connection = sr.Connection("example_application2")
96 session = sr.Session(connection)
97 subscribe = sr.Subscribe(session)
98 subscribe.module_change_subscribe(module_name, module_change_callback)
100 print_current_config(session, module_name)
102 kafka_producer = create_producer(bootstrap_servers)
105 except Exception as e:
106 logging.exception("Exception occured")