Add Kafka support 11/120011/6
authorBogumil Zebek <bogumil.zebek@nokia.com>
Tue, 30 Mar 2021 13:01:37 +0000 (15:01 +0200)
committerZebek Bogumil <bogumil.zebek@nokia.com>
Fri, 2 Apr 2021 05:39:48 +0000 (07:39 +0200)
- send changes on Kafka topic
- add endpoint for fetching changes from Kafka

Signed-off-by: Bogumil Zebek <bogumil.zebek@nokia.com>
Issue-ID: INT-1869
Signed-off-by: Zebek Bogumil <bogumil.zebek@nokia.com>
Change-Id: I349fdc4295659fc69407b5b1138281e2673f7938

18 files changed:
Dockerfile
pom.xml
src/python/netconf_change_listener_application.py
src/python/netconf_rest_application.py
src/python/netconf_server/kafka_consumer_factory.py [new file with mode: 0644]
src/python/netconf_server/netconf_app_configuration.py
src/python/netconf_server/netconf_change_listener.py
src/python/netconf_server/netconf_change_listener_factory.py
src/python/netconf_server/netconf_kafka_client.py
src/python/netconf_server/netconf_kafka_message_factory.py [new file with mode: 0644]
src/python/netconf_server/netconf_rest_server.py
src/python/netconf_server/sysrepo_interface/config_change_subscriber.py
src/python/netconf_server/sysrepo_interface/sysrepo_message_model.py [new file with mode: 0644]
src/python/requirements.txt
src/python/test-requirements.txt
src/python/tests/unit/test_netconf_change_listener.py [new file with mode: 0644]
src/python/tests/unit/test_netconf_change_listener_factory.py [moved from src/python/tests/unit/test_netconf_chang_listener.py with 61% similarity]
src/python/tests/unit/test_netconf_kafka_message_factory.py [new file with mode: 0644]

index 1303b40..2ac0c5f 100644 (file)
@@ -20,6 +20,6 @@ RUN mkdir -p /resources/certs && \
 ENV ENABLE_TLS=false
 ENV KAFKA_HOST_NAME="kafka1"
 ENV KAFKA_PORT=9092
-ENV KAFKA_TOPIC="config:1:1"
+ENV KAFKA_TOPIC="config"
 
 ENTRYPOINT ["./scripts/set-up-netopeer.sh", "/resources/models", "/resources/certs"]
diff --git a/pom.xml b/pom.xml
index 3071906..b161eff 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                             </arguments>
                         </configuration>
                     </execution>
+                    <execution>
+                        <id>python-clean</id>
+                        <phase>clean</phase>
+                        <goals>
+                            <goal>exec</goal>
+                        </goals>
+                        <configuration>
+                            <workingDirectory>./src/python</workingDirectory>
+                            <executable>rm</executable>
+                            <arguments>
+                                <argument>-rf</argument>
+                                <argument>.tox</argument>
+                            </arguments>
+                        </configuration>
+                    </execution>
                 </executions>
             </plugin>
         </plugins>
index 34b00b3..400ff3d 100644 (file)
@@ -21,7 +21,6 @@ import asyncio
 import sys
 import logging
 
-from typing import Tuple
 
 from netconf_server.netconf_app_configuration import NetconfAppConfiguration
 
@@ -33,7 +32,7 @@ from netconf_server.sysrepo_interface.sysrepo_client import SysrepoClient
 
 logging.basicConfig(
     handlers=[logging.StreamHandler(), logging.FileHandler("/logs/netconf_change_listener.log")],
-    level=logging.DEBUG
+    level=logging.INFO
 )
 logger = logging.getLogger("netconf_change_listener")
 
@@ -43,9 +42,9 @@ def run_server_forever(session, connection, change_listener: NetconfChangeListen
     asyncio.get_event_loop().run_forever()
 
 
-def create_change_listener(module_configuration_file_path: str) -> NetconfChangeListener:
-    configuration = SysrepoConfigurationLoader.load_configuration(module_configuration_file_path)
-    return NetconfChangeListenerFactory(configuration.models_to_subscribe_to).create()
+def create_change_listener(app_configuration: NetconfAppConfiguration) -> NetconfChangeListener:
+    configuration = SysrepoConfigurationLoader.load_configuration(app_configuration.module_configuration_file_path)
+    return NetconfChangeListenerFactory(configuration.models_to_subscribe_to, app_configuration).create()
 
 
 if __name__ == "__main__":
@@ -54,7 +53,7 @@ if __name__ == "__main__":
     if app_configuration:
         logger.info("Netconf change listener application configuration: {}".format(app_configuration))
         try:
-            netconf_change_listener = create_change_listener(app_configuration.module_configuration_file_path)
+            netconf_change_listener = create_change_listener(app_configuration)
             SysrepoClient().run_in_session(run_server_forever, netconf_change_listener)
         except ConfigLoadingException:
             logger.error("File to load configuration from file %s" % app_configuration.module_configuration_file_path)
index 0a040c9..b13a084 100644 (file)
@@ -21,6 +21,7 @@ import sys
 import logging
 
 from netconf_server.netconf_app_configuration import NetconfAppConfiguration
+from netconf_server.netconf_kafka_client import provide_configured_kafka_client
 from netconf_server.netconf_rest_server import NetconfRestServer
 from netconf_server.sysrepo_configuration.sysrepo_configuration_manager import SysrepoConfigurationManager
 
@@ -28,14 +29,18 @@ from netconf_server.sysrepo_interface.sysrepo_client import SysrepoClient
 
 logging.basicConfig(
     handlers=[logging.StreamHandler(), logging.FileHandler("/logs/netconf_rest_application.log")],
-    level=logging.DEBUG
+    level=logging.INFO
 )
 logger = logging.getLogger("netconf_rest_application")
 
 
 def start_rest_server(session, connection, server_rest: NetconfRestServer, netconf_app_configuration: NetconfAppConfiguration):
     sysrepo_cfg_manager = create_conf_manager(session, connection)
-    server_rest.start(sysrepo_cfg_manager, netconf_app_configuration)
+    kafka_client = provide_configured_kafka_client(
+        netconf_app_configuration.kafka_host_name,
+        netconf_app_configuration.kafka_port
+    )
+    server_rest.start(sysrepo_cfg_manager, kafka_client, netconf_app_configuration.kafka_topic)
 
 
 def create_rest_server() -> NetconfRestServer:
diff --git a/src/python/netconf_server/kafka_consumer_factory.py b/src/python/netconf_server/kafka_consumer_factory.py
new file mode 100644 (file)
index 0000000..332cd21
--- /dev/null
@@ -0,0 +1,35 @@
+###
+# ============LICENSE_START=======================================================
+# Netconf Server
+# ================================================================================
+# Copyright (C) 2021 Nokia. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+from json import loads
+
+from kafka import KafkaConsumer
+
+STANDARD_CHARSETS_UTF8 = 'utf-8'
+
+
+def provide_kafka_consumer(topic: str, server: str) -> KafkaConsumer:
+    return KafkaConsumer(topic,
+                         consumer_timeout_ms=1000,
+                         group_id='netconf-group',
+                         auto_offset_reset='earliest',
+                         enable_auto_commit=False,
+                         bootstrap_servers=[server],
+                         value_deserializer=lambda x: loads(x.decode(STANDARD_CHARSETS_UTF8))
+                         )
index 0e25a41..1aaf12c 100644 (file)
@@ -38,6 +38,7 @@ class NetconfAppConfiguration(object):
         self.kafka_topic = kafka_topic
 
     def __str__(self):
-        return "NetconfAppConfiguration[configuration_file -> '{}', kafka_host_name -> '{}', kafka_port -> '{}', kafka_topic -> '{}']"\
+        return "NetconfAppConfiguration[configuration_file -> '{}', " \
+               "kafka_host_name -> '{}', kafka_port -> '{}', kafka_topic -> '{}']"\
             .format(self.module_configuration_file_path, self.kafka_host_name, self.kafka_port, self.kafka_topic)
 
index 44910d1..628f757 100644 (file)
 ###
 import logging
 
+from kafka.producer.future import FutureRecordMetadata
+
+
+from netconf_server.netconf_kafka_client import NetconfKafkaClient
+from netconf_server.netconf_kafka_message_factory import NetconfKafkaMessageFactory
 from netconf_server.sysrepo_interface.config_change_data import ConfigChangeData
+from netconf_server.sysrepo_interface.sysrepo_message_model import SysrepoMessage
 
 logger = logging.getLogger(__name__)
 
 
 class NetconfChangeListener(object):
 
-    def __init__(self, subscriptions: list):
+    def __init__(self, subscriptions: list, kafka_client: NetconfKafkaClient, topic: str):
         self.subscriptions = subscriptions
+        self.kafka_client = kafka_client
+        self.topic = topic
 
     def run(self, session):
         for subscription in self.subscriptions:
-            subscription.callback_function = self.__on_module_configuration_change
+            subscription.callback_function = self._on_module_configuration_change
             subscription.subscribe_on_model_change(session)
 
+    def _on_module_configuration_change(self, config_change_data: ConfigChangeData):
+        logger.info("Received module changed: {} , {} ".format(config_change_data.event, config_change_data.changes))
+        if config_change_data.event != "done":
+            self._send_change_to_kafka(config_change_data)
+
+    def _send_change_to_kafka(self, config_change_data):
+        for change in config_change_data.changes:
+            try:
+                kafka_message = NetconfChangeListener._create_kafka_message(change)
+                logging.info("Sending message '{}' to Kafka '{}' topic".format(kafka_message, self.topic))
+                response = self.kafka_client.send(self.topic, kafka_message)  # type: FutureRecordMetadata
+                logging.info("Response from Kafka: {}".format(response.get(timeout=1)))
+
+            except Exception as e:
+                logger.error("Exception occurred during handling of sysrepo config change", e)
+        logger.info("Module changes sent to Kafka. Operation finished.")
+
     @staticmethod
-    def __on_module_configuration_change(config_change_data: ConfigChangeData):
-        logger.info("Received module changed: %s , %s " % (config_change_data.event, config_change_data.changes))
+    def _create_kafka_message(change):
+        return NetconfKafkaMessageFactory.create(SysrepoMessage(change))
index fa5e071..9b9ff24 100644 (file)
 ###
 import logging
 
+
+from netconf_server.netconf_app_configuration import NetconfAppConfiguration
 from netconf_server.netconf_change_listener import NetconfChangeListener
+from netconf_server.netconf_kafka_client import NetconfKafkaClient, provide_configured_kafka_client
 from netconf_server.sysrepo_interface.config_change_subscriber import ConfigChangeSubscriber
 
 logger = logging.getLogger(__name__)
@@ -27,8 +30,9 @@ logger = logging.getLogger(__name__)
 
 class NetconfChangeListenerFactory(object):
 
-    def __init__(self, modules_to_subscribe_names: list):
+    def __init__(self, modules_to_subscribe_names: list, app_configuration: NetconfAppConfiguration):
         self.modules_to_subscribe_names = modules_to_subscribe_names
+        self.app_configuration = app_configuration
 
     def create(self) -> NetconfChangeListener:
         subscriptions = list()
@@ -36,5 +40,13 @@ class NetconfChangeListenerFactory(object):
             subscriptions.append(
                 ConfigChangeSubscriber(module_name)
             )
-        return NetconfChangeListener(subscriptions)
+        kafka_client = NetconfChangeListenerFactory._try_to_create_kafka_client(
+            self.app_configuration.kafka_host_name,
+            self.app_configuration.kafka_port
+        )
+
+        return NetconfChangeListener(subscriptions, kafka_client, self.app_configuration.kafka_topic)
 
+    @staticmethod
+    def _try_to_create_kafka_client(kafka_host_name: str, kafka_port: int):
+        return provide_configured_kafka_client(kafka_host_name, kafka_port)  # type: NetconfKafkaClient
index b0effc3..8687802 100644 (file)
 # ============LICENSE_END=========================================================
 ###
 import logging
-from json import dumps, loads
-from typing import Callable
+from json import dumps
+from typing import Callable, Any
 
 from kafka import KafkaProducer, KafkaConsumer
+from kafka.errors import NoBrokersAvailable
 from kafka.producer.future import FutureRecordMetadata
+from retry import retry
+
+from netconf_server.kafka_consumer_factory import provide_kafka_consumer
 
 STANDARD_CHARSETS_UTF8 = 'utf-8'
 
-logger = logging.getLogger("netconf_kafka_client")
+logger = logging.getLogger(__name__)
 
 
-def provide_kafka_consumer(topic: str, server: str) -> KafkaConsumer:
-    return KafkaConsumer(topic,
-                         consumer_timeout_ms=1000,
-                         group_id='netconf-group',
-                         auto_offset_reset='earliest',
-                         enable_auto_commit=True,
-                         bootstrap_servers=[server],
-                         value_deserializer=lambda x: loads(x.decode(STANDARD_CHARSETS_UTF8))
-                         )
+@retry(NoBrokersAvailable, tries=3, delay=5)
+def provide_configured_kafka_client(kafka_host_name, kafka_port):
+    return NetconfKafkaClient.create(
+        host=kafka_host_name,
+        port=kafka_port
+    )  # type: NetconfKafkaClient
 
 
 class NetconfKafkaClient(object):
 
     @staticmethod
-    def create(host: str, port: int) -> object:
+    def create(host: str, port: int):
         server = "{}:{}".format(host, port)
         producer = KafkaProducer(
             bootstrap_servers=server,
@@ -59,7 +60,7 @@ class NetconfKafkaClient(object):
         self._producer = producer
         self._get_kafka_consumer = get_kafka_consumer_func
 
-    def send(self, topic: str, value: str) -> FutureRecordMetadata:
+    def send(self, topic: str, value: Any) -> FutureRecordMetadata:
         return self._producer.send(
             topic=topic,
             value=value
@@ -72,7 +73,7 @@ class NetconfKafkaClient(object):
         consumer = self._get_kafka_consumer(topic)
         for message in consumer:
             message_value = message.value
-            logger.debug("Fetched config change %s" % message_value)
+            logger.info("Fetched config change %s" % message_value)
             messages.append(message_value)
 
         return messages
diff --git a/src/python/netconf_server/netconf_kafka_message_factory.py b/src/python/netconf_server/netconf_kafka_message_factory.py
new file mode 100644 (file)
index 0000000..e39556e
--- /dev/null
@@ -0,0 +1,54 @@
+###
+# ============LICENSE_START=======================================================
+# Netconf Server
+# ================================================================================
+# Copyright (C) 2021 Nokia. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+import logging
+
+from netconf_server.sysrepo_interface.sysrepo_message_model import SysrepoMessage
+
+logger = logging.getLogger(__name__)
+
+
+class NetconfKafkaMessageFactory(object):
+
+    @classmethod
+    def create(cls, change: SysrepoMessage) -> dict:
+        message = {}
+        if change and change.is_modified():
+            logger.debug("Parsing change modified")
+            message = cls._create_modified_message(change)
+        elif change and change.is_created():
+            logger.debug("Parsing change created")
+            message = cls._create_created_message(change)
+        return message
+
+    @classmethod
+    def _create_created_message(cls, change: SysrepoMessage) -> dict:
+        message = {"type": "ChangeCreated"}
+        if change.value():
+            message["new"] = {"path": change.xpath(), "value": change.value()}
+        return message
+
+    @classmethod
+    def _create_modified_message(cls, change: SysrepoMessage) -> dict:
+        message = {"type": "ChangeModified"}
+        if change.prev_val():
+            message["old"] = {"path": change.xpath(), "value": change.prev_val()}
+        if change.value():
+            message["new"] = {"path": change.xpath(), "value": change.value()}
+        return message
index 8568454..edaa6e8 100644 (file)
@@ -18,9 +18,8 @@
 # ============LICENSE_END=========================================================
 ###
 
-from flask import Flask, logging, make_response, Response, request
+from flask import Flask, logging, make_response, Response, request, jsonify
 
-from netconf_server.netconf_app_configuration import NetconfAppConfiguration
 from netconf_server.netconf_kafka_client import NetconfKafkaClient
 from netconf_server.sysrepo_configuration.sysrepo_configuration_manager import SysrepoConfigurationManager
 
@@ -29,7 +28,8 @@ class NetconfRestServer:
     _rest_server: Flask = Flask("server")
     logger = logging.create_logger(_rest_server)
     _configuration_manager: SysrepoConfigurationManager
-    _app_configuration: NetconfAppConfiguration
+    _kafka_topic: str
+    _kafka_client: NetconfKafkaClient
 
     def __init__(self, host='0.0.0.0', port=6555):
         self._host = host
@@ -37,9 +37,11 @@ class NetconfRestServer:
 
     def start(self,
               configuration_manager: SysrepoConfigurationManager,
-              netconf_app_configuration: NetconfAppConfiguration):
+              kafka_client: NetconfKafkaClient,
+              kafka_topic: str):
         NetconfRestServer._configuration_manager = configuration_manager
-        NetconfRestServer._app_configuration = netconf_app_configuration
+        NetconfRestServer._kafka_client = kafka_client
+        NetconfRestServer._kafka_topic = kafka_topic
         Flask.run(
             NetconfRestServer._rest_server,
             host=self._host,
@@ -54,22 +56,11 @@ class NetconfRestServer:
     @staticmethod
     @_rest_server.route("/readiness")
     def _readiness_check():
-        try:
-            NetconfRestServer.__try_connect_to_kafka()
+        if NetconfRestServer._kafka_client:
             return Response('Ready', status=200)
-        except Exception as e:
-            NetconfRestServer.logger.error("Unable to create a Kafka client", e)
+        else:
             return Response('Not Ready', status=503)
 
-    # if Kafka is up & running and hostname with port is proper, then client will be created; otherwise
-    # an error will be reported
-    @staticmethod
-    def __try_connect_to_kafka():
-        NetconfKafkaClient.create(
-            host=NetconfRestServer._app_configuration.kafka_host_name,
-            port=NetconfRestServer._app_configuration.kafka_port
-        )
-
     @staticmethod
     @_rest_server.route("/change_config/<path:module_name>", methods=['POST'])
     def _change_config(module_name):
@@ -77,13 +68,18 @@ class NetconfRestServer:
         NetconfRestServer._configuration_manager.change_configuration(config_data, module_name)
         return NetconfRestServer.__create_http_response(202, "Accepted")
 
+    @staticmethod
+    @_rest_server.route("/change_history")
+    def _change_history():
+        history = NetconfRestServer._kafka_client.get_all_messages_from(NetconfRestServer._kafka_topic)
+        return jsonify(history), 200
+
     @staticmethod
     @_rest_server.route("/get_config/<path:module_name>", methods=['GET'])
     def _get_config(module_name):
         data = NetconfRestServer._configuration_manager.get_configuration(module_name)
         return NetconfRestServer.__create_http_response(200, data)
 
-
     @staticmethod
     def __create_http_response(code, message):
         return make_response(
index faa8254..e3e09f0 100644 (file)
@@ -22,7 +22,7 @@ import logging
 
 from netconf_server.sysrepo_interface.config_change_data import ConfigChangeData
 
-logger = logging.getLogger("sysrep_config_change_subscriber")
+logger = logging.getLogger(__name__)
 
 
 class ConfigChangeSubscriber(object):
diff --git a/src/python/netconf_server/sysrepo_interface/sysrepo_message_model.py b/src/python/netconf_server/sysrepo_interface/sysrepo_message_model.py
new file mode 100644 (file)
index 0000000..e21a9b5
--- /dev/null
@@ -0,0 +1,42 @@
+###
+# ============LICENSE_START=======================================================
+# Netconf Server
+# ================================================================================
+# Copyright (C) 2021 Nokia. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+
+import sysrepo
+
+
+class SysrepoMessage(object):
+
+    def __init__(self, change):
+        self._change = change
+
+    def is_modified(self):
+        return isinstance(self._change, sysrepo.ChangeModified)
+
+    def is_created(self):
+        return isinstance(self._change, sysrepo.ChangeCreated)
+
+    def value(self):
+        return self._change.value
+
+    def xpath(self):
+        return self._change.xpath
+
+    def prev_val(self):
+        return self._change.prev_val
index eb07c40..1654c50 100644 (file)
@@ -21,3 +21,4 @@
 sysrepo==0.4.2
 Flask==1.1.1
 kafka-python==2.0.2
+retry==0.9.2
index 4983061..75de922 100644 (file)
@@ -20,4 +20,5 @@
 
 pytest==6.2.2
 kafka-python==2.0.2
+retry==0.9.2
 
diff --git a/src/python/tests/unit/test_netconf_change_listener.py b/src/python/tests/unit/test_netconf_change_listener.py
new file mode 100644 (file)
index 0000000..c9e11d3
--- /dev/null
@@ -0,0 +1,74 @@
+###
+# ============LICENSE_START=======================================================
+# Netconf Server
+# ================================================================================
+# Copyright (C) 2021 Nokia. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+import unittest
+from unittest.mock import MagicMock
+
+import sys
+
+# we need to mock sysrepo library. It is not possible to install it in the newest version of the Linux distribution
+from netconf_server.sysrepo_interface.config_change_data import ConfigChangeData
+
+sys.modules['sysrepo'] = MagicMock()
+from netconf_server.netconf_change_listener import NetconfChangeListener
+
+KAFKA_TOPIC = "config"
+
+
+class TestNetconfChangeListener(unittest.TestCase):
+    def test_should_subscribe_on_run(self):
+        # given
+        subscriber1 = MagicMock()
+        subscriber2 = MagicMock()
+        kafka_client = MagicMock()
+        session = MagicMock()
+        netconf_change_listener = NetconfChangeListener([subscriber1, subscriber2], kafka_client, KAFKA_TOPIC)
+
+        # when
+        netconf_change_listener.run(session)
+
+        # then
+        subscriber1.subscribe_on_model_change.assert_called_once()
+        self.assertEqual(subscriber1.callback_function, netconf_change_listener._on_module_configuration_change)
+        subscriber2.subscribe_on_model_change.assert_called_once()
+        self.assertEqual(subscriber2.callback_function, netconf_change_listener._on_module_configuration_change)
+
+    def test_should_send_two_changes_at_kafka(self):
+        # given
+        subscriber1 = MagicMock()
+        subscriber2 = MagicMock()
+        kafka_client = MagicMock()
+        netconf_change_listener = NetconfChangeListener([subscriber1, subscriber2], kafka_client, KAFKA_TOPIC)
+        NetconfChangeListener._create_kafka_message = lambda _: MagicMock()
+
+        # when
+        netconf_change_listener._on_module_configuration_change(
+            ConfigChangeData(
+                event="event",
+                req_id=1,
+                changes=[MagicMock(), MagicMock()]
+            )
+        )
+
+        # then
+        self.assertEqual(kafka_client.send.call_count, 2)
+
+
+if __name__ == '__main__':
+    unittest.main()
 import unittest
 from unittest.mock import MagicMock
 
+import sys
+
+# we need to mock sysrepo library. It is not possible to install it in the newest version of the Linux distribution
+sys.modules['sysrepo'] = MagicMock()
+
+from netconf_server.netconf_app_configuration import NetconfAppConfiguration
 from netconf_server.netconf_change_listener_factory import NetconfChangeListenerFactory
 from tests.mocs.mocked_session import MockedSession
 
 
-class TestNetconfServer(unittest.TestCase):
+class TestNetconfChangeListenerFactory(unittest.TestCase):
 
     def test_should_create_and_run_netconf_server_with_one_model(self):
         # given
         modules_to_subscribe_names = ["test"]
-        server = NetconfChangeListenerFactory(modules_to_subscribe_names).create()
+        factory = TestNetconfChangeListenerFactory._given_netconf_change_listener_factory(modules_to_subscribe_names)
+        server = factory.create()
         session = MockedSession()
         session.subscribe_module_change = MagicMock()
 
@@ -42,7 +49,8 @@ class TestNetconfServer(unittest.TestCase):
     def test_should_create_and_run_netconf_server_with_multiple_models(self):
         # given
         modules_to_subscribe_names = ["test", "test2", "test3"]
-        server = NetconfChangeListenerFactory(modules_to_subscribe_names).create()
+        factory = TestNetconfChangeListenerFactory._given_netconf_change_listener_factory(modules_to_subscribe_names)
+        server = factory.create()
         session = MockedSession()
         session.subscribe_module_change = MagicMock()
 
@@ -51,3 +59,12 @@ class TestNetconfServer(unittest.TestCase):
 
         # then
         self.assertEqual(session.subscribe_module_change.call_count, 3)
+
+    @staticmethod
+    def _given_netconf_change_listener_factory(modules_to_subscribe_names: list) -> NetconfChangeListenerFactory:
+        app_configuration, _ = NetconfAppConfiguration.get_configuration(
+            ["../models", "models-configuration.ini", "127.0.0.1", "9092",
+             "kafka1"])  # type: NetconfAppConfiguration, str
+        factory = NetconfChangeListenerFactory(modules_to_subscribe_names, app_configuration)
+        NetconfChangeListenerFactory._try_to_create_kafka_client = lambda host, port: MagicMock()
+        return factory
diff --git a/src/python/tests/unit/test_netconf_kafka_message_factory.py b/src/python/tests/unit/test_netconf_kafka_message_factory.py
new file mode 100644 (file)
index 0000000..b899bd9
--- /dev/null
@@ -0,0 +1,75 @@
+from collections import namedtuple
+from unittest import TestCase
+from unittest.mock import MagicMock
+
+import sys
+
+# we need to mock sysrepo library. It is not possible to install it in the newest version of the Linux distribution
+sys.modules['sysrepo'] = MagicMock()
+
+from netconf_server.sysrepo_interface.sysrepo_message_model import SysrepoMessage
+
+from netconf_server.netconf_kafka_message_factory import NetconfKafkaMessageFactory
+
+SYSREPO_MESSAGE_MODEL = namedtuple('SM', ['value', 'xpath', 'prev_val'])
+
+
+class TestNetconfKafkaMessageFactory(TestCase):
+    def test_should_return_empty_dict_when_sysrepo_message_is_none(self):
+        # when
+        actual = NetconfKafkaMessageFactory.create(None)
+
+        # then
+        self.assertEqual({}, actual)
+
+    def test_should_prepare_message_for_sysrepo_message_with_status_change_created(self):
+        # given
+        s = SYSREPO_MESSAGE_MODEL(44, '/pnf-simulator:config/itemValue1', None)
+
+        sysrepo_message = SysrepoMessage(s)
+        sysrepo_message.is_modified = lambda: False
+        sysrepo_message.is_created = lambda: True
+
+        # when
+        actual = NetconfKafkaMessageFactory.create(sysrepo_message)
+
+        # then
+        self.assertEqual(
+            {'type': 'ChangeCreated', 'new': {'path': '/pnf-simulator:config/itemValue1', 'value': 44}},
+            actual
+        )
+
+    def test_should_prepare_message_for_sysrepo_message_with_status_change_modified_no_old_value(self):
+        # given
+        s = SYSREPO_MESSAGE_MODEL(45, '/pnf-simulator:config/itemValue1', None)
+
+        sysrepo_message = SysrepoMessage(s)
+        sysrepo_message.is_modified = lambda: True
+        sysrepo_message.is_created = lambda: False
+
+        # when
+        actual = NetconfKafkaMessageFactory.create(sysrepo_message)
+
+        # then
+        self.assertEqual(
+            {'type': 'ChangeModified', 'new': {'path': '/pnf-simulator:config/itemValue1', 'value': 45}},
+            actual
+        )
+
+    def test_should_prepare_message_for_sysrepo_message_with_status_change_modified_old_value_exists(self):
+        # given
+        s = SYSREPO_MESSAGE_MODEL(45, '/pnf-simulator:config/itemValue1', 44)
+
+        sysrepo_message = SysrepoMessage(s)
+        sysrepo_message.is_modified = lambda: True
+        sysrepo_message.is_created = lambda: False
+
+        # when
+        actual = NetconfKafkaMessageFactory.create(sysrepo_message)
+
+        # then
+        self.assertEqual(
+            {'type': 'ChangeModified', 'old': {'path': '/pnf-simulator:config/itemValue1', 'value': 44}, 'new': {'path': '/pnf-simulator:config/itemValue1', 'value': 45}},
+            actual
+        )
+