Add retry mechanism when trying to connect to kafka 72/120772/3 1.0.1
authorEdyta Krukowska <edyta.krukowska@nokia.com>
Wed, 21 Apr 2021 13:23:13 +0000 (15:23 +0200)
committerEdyta Krukowska <edyta.krukowska@nokia.com>
Thu, 22 Apr 2021 04:42:11 +0000 (06:42 +0200)
Issue-ID: INT-1869
Signed-off-by: Edyta Krukowska <edyta.krukowska@nokia.com>
Change-Id: I926344788343b946dfdfc39c1e663a03531cbfe6

Changelog.md
pom.xml
src/python/netconf_change_listener_application.py
src/python/netconf_server/netconf_change_listener_factory.py
version.properties

index 8bccd3e..a8bc977 100644 (file)
@@ -4,5 +4,9 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](http://keepachangelog.com/)
 and this project adheres to [Semantic Versioning](http://semver.org/).
 
+
+## [1.0.1] - 22/04/2021
+        - [INT-1869] (https://jira.onap.org/browse/INT-1869) Add retry mechanism when trying to connect to kafka
+
 ## [1.0.0] - 10/03/2021
         - [INT-1869] (https://jira.onap.org/browse/INT-1869) Create netconf-server
diff --git a/pom.xml b/pom.xml
index b161eff..d75ffda 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -31,7 +31,7 @@
 
     <groupId>org.onap.integration.simulators.nf-simulator.netconf-server</groupId>
     <artifactId>netconfserver</artifactId>
-    <version>1.0.0-SNAPSHOT</version>
+    <version>1.0.1-SNAPSHOT</version>
     <name>netconfserver</name>
 
     <properties>
index 400ff3d..97b2130 100644 (file)
 import asyncio
 import sys
 import logging
-
+from retry import retry
 
 from netconf_server.netconf_app_configuration import NetconfAppConfiguration
 
 from netconf_server.netconf_change_listener import NetconfChangeListener
-from netconf_server.netconf_change_listener_factory import NetconfChangeListenerFactory
+from netconf_server.netconf_change_listener_factory import NetconfChangeListenerFactory, NetconfChangeListenerException
 from netconf_server.sysrepo_configuration.sysrepo_configuration_loader import SysrepoConfigurationLoader, \
     ConfigLoadingException
 from netconf_server.sysrepo_interface.sysrepo_client import SysrepoClient
@@ -44,6 +44,11 @@ def run_server_forever(session, connection, change_listener: NetconfChangeListen
 
 def create_change_listener(app_configuration: NetconfAppConfiguration) -> NetconfChangeListener:
     configuration = SysrepoConfigurationLoader.load_configuration(app_configuration.module_configuration_file_path)
+    return subscribe_to_kafka(app_configuration, configuration)
+
+
+@retry(NetconfChangeListenerException, tries=10, delay=10)
+def subscribe_to_kafka(app_configuration, configuration):
     return NetconfChangeListenerFactory(configuration.models_to_subscribe_to, app_configuration).create()
 
 
index 9b9ff24..f023542 100644 (file)
@@ -19,7 +19,6 @@
 ###
 import logging
 
-
 from netconf_server.netconf_app_configuration import NetconfAppConfiguration
 from netconf_server.netconf_change_listener import NetconfChangeListener
 from netconf_server.netconf_kafka_client import NetconfKafkaClient, provide_configured_kafka_client
@@ -35,18 +34,25 @@ class NetconfChangeListenerFactory(object):
         self.app_configuration = app_configuration
 
     def create(self) -> NetconfChangeListener:
-        subscriptions = list()
-        for module_name in self.modules_to_subscribe_names:
-            subscriptions.append(
-                ConfigChangeSubscriber(module_name)
+        try:
+            subscriptions = list()
+            for module_name in self.modules_to_subscribe_names:
+                subscriptions.append(
+                    ConfigChangeSubscriber(module_name)
+                )
+            kafka_client = NetconfChangeListenerFactory._try_to_create_kafka_client(
+                self.app_configuration.kafka_host_name,
+                self.app_configuration.kafka_port
             )
-        kafka_client = NetconfChangeListenerFactory._try_to_create_kafka_client(
-            self.app_configuration.kafka_host_name,
-            self.app_configuration.kafka_port
-        )
 
-        return NetconfChangeListener(subscriptions, kafka_client, self.app_configuration.kafka_topic)
+            return NetconfChangeListener(subscriptions, kafka_client, self.app_configuration.kafka_topic)
+        except Exception as e:
+            raise NetconfChangeListenerException(e)
 
     @staticmethod
     def _try_to_create_kafka_client(kafka_host_name: str, kafka_port: int):
         return provide_configured_kafka_client(kafka_host_name, kafka_port)  # type: NetconfKafkaClient
+
+
+class NetconfChangeListenerException(Exception):
+    pass
index 2ddebb3..0f1f46a 100644 (file)
@@ -1,6 +1,6 @@
 major=1
 minor=0
-patch=0
+patch=1
 base_version=${major}.${minor}.${patch}
 release_version=${base_version}
 snapshot_version=${base_version}-SNAPSHOT