Apply lower case to any topics to be compatible with Kafka. 40/136840/2
authoradheli.tavares <adheli.tavares@est.tech>
Thu, 14 Dec 2023 10:05:49 +0000 (10:05 +0000)
committeradheli.tavares <adheli.tavares@est.tech>
Mon, 18 Dec 2023 17:36:43 +0000 (17:36 +0000)
Issue-ID: POLICY-4402
Change-Id: Iebaec5f52a1fa0feb881ccfcb5319bc8a951b496
Signed-off-by: adheli.tavares <adheli.tavares@est.tech>
16 files changed:
integrity-audit/src/main/resources/META-INF/persistence.xml
integrity-monitor/src/main/resources/META-INF/persistence.xml
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicSink.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/BusTopicSink.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSinkFactory.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/IndexedKafkaTopicSourceFactory.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/KafkaTopicSourceFactory.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineKafkaTopicSink.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedKafkaTopicSource.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/TopicBase.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientException.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClient.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientException.java

index 3a7fdd7..63fea7b 100644 (file)
@@ -37,6 +37,7 @@
 
     <persistence-unit name="integrityAuditPU" transaction-type="RESOURCE_LOCAL">
         <!-- For operational use -->
+        <provider>org.hibernate.jpa.HibernatePersistenceProvider</provider>
         <class>org.onap.policy.common.ia.jpa.IntegrityAuditEntity</class>
         <shared-cache-mode>NONE</shared-cache-mode>
         <properties>
index 0adaae9..beff143 100644 (file)
@@ -25,6 +25,7 @@
     xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd">
     <persistence-unit name="schemaPU" transaction-type="RESOURCE_LOCAL">
     <!-- Limited use for generating the DB and schema files for imtest DB - uses hibernate -->
+        <provider>org.hibernate.jpa.HibernatePersistenceProvider</provider>
         <class>org.onap.policy.common.im.jpa.ImTestEntity</class>
         <class>org.onap.policy.common.im.jpa.StateManagementEntity</class>
         <class>org.onap.policy.common.im.jpa.ForwardProgressEntity</class>
index f36dfa3..b67756e 100644 (file)
@@ -3,6 +3,7 @@
  * policy-endpoints
  * ================================================================================
  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -32,9 +33,9 @@ public interface TopicSink extends Topic {
      * 
      * @return true if the send operation succeeded, false otherwise
      * @throws IllegalArgumentException an invalid message has been provided
-     * @throws IllegalStateException the entity is in an state that prevents
+     * @throws IllegalStateException the entity is in a state that prevents
      *         it from sending messages, for example, locked or stopped.
      */
-    public boolean send(String message);
+    boolean send(String message);
 
 }
index e77beea..5ca8773 100644 (file)
@@ -3,6 +3,7 @@
  * policy-endpoints
  * ================================================================================
  * Copyright (C) 2017, 2019 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -32,12 +33,12 @@ public interface BusTopicSink extends ApiKeyEnabled, TopicSink {
      *
      * @param partitionKey the partition key
      */
-    public void setPartitionKey(String partitionKey);
+    void setPartitionKey(String partitionKey);
 
     /**
      * Return the partition key in used by the system to publish messages.
      *
      * @return the partition key
      */
-    public String getPartitionKey();
+    String getPartitionKey();
 }
index 23aaabd..f913926 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022-2023 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -42,7 +42,7 @@ class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory {
     /**
      * Logger.
      */
-    private static Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSinkFactory.class);
+    private static final Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSinkFactory.class);
 
     /**
      * KAFKA Topic Name Index.
@@ -98,7 +98,7 @@ class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory {
         List<KafkaTopicSink> newKafkaTopicSinks = new ArrayList<>();
         synchronized (this) {
             for (String topic : COMMA_SPACE_PAT.split(writeTopics)) {
-                addTopic(newKafkaTopicSinks, topic, properties);
+                addTopic(newKafkaTopicSinks, topic.toLowerCase(), properties);
             }
             return newKafkaTopicSinks;
         }
@@ -113,7 +113,8 @@ class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory {
         String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic;
 
         var props = new PropertyUtils(properties, topicPrefix,
-            (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
+            (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic sink {} ",
+                this, name, value, topic));
 
         String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
         if (StringUtils.isBlank(servers)) {
index 1d586e4..151d8f6 100644 (file)
@@ -42,7 +42,7 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory {
     /**
      * Logger.
      */
-    private static Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSourceFactory.class);
+    private static final Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSourceFactory.class);
 
     /**
      * KAFKA Topic Name Index.
@@ -84,7 +84,7 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory {
         List<KafkaTopicSource> newKafkaTopicSources = new ArrayList<>();
         synchronized (this) {
             for (String topic : COMMA_SPACE_PAT.split(readTopics)) {
-                addTopic(newKafkaTopicSources, topic, properties);
+                addTopic(newKafkaTopicSources, topic.toLowerCase(), properties);
             }
         }
         return newKafkaTopicSources;
@@ -110,11 +110,12 @@ class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory {
         String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic;
 
         var props = new PropertyUtils(properties, topicPrefix,
-            (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
+            (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic source {} ",
+                this, name, value, topic));
 
         String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
         if (StringUtils.isBlank(servers)) {
-            logger.error("{}: no KAFKA servers configured for sink {}", this, topic);
+            logger.error("{}: no KAFKA servers configured for source {}", this, topic);
             return;
         }
 
index 8cb51df..e5642da 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022-2023 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -28,11 +28,11 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
 public interface KafkaTopicSourceFactory {
 
     /**
-     * Creates an Kafka Topic Source based on properties files.
+     * Creates a Kafka Topic Source based on properties files.
      *
      * @param properties Properties containing initialization values
      *
-     * @return an Kafka Topic Source
+     * @return a Kafka Topic Source
      * @throws IllegalArgumentException if invalid parameters are present
      */
     List<KafkaTopicSource> build(Properties properties);
@@ -41,7 +41,7 @@ public interface KafkaTopicSourceFactory {
      * Instantiates a new Kafka Topic Source.
      *
      * @param busTopicParams parameters object
-     * @return an Kafka Topic Source
+     * @return a Kafka Topic Source
      */
     KafkaTopicSource build(BusTopicParams busTopicParams);
 
@@ -51,13 +51,13 @@ public interface KafkaTopicSourceFactory {
      * @param servers list of servers
      * @param topic topic name
      *
-     * @return an Kafka Topic Source
+     * @return a Kafka Topic Source
      * @throws IllegalArgumentException if invalid parameters are present
      */
     KafkaTopicSource build(List<String> servers, String topic);
 
     /**
-     * Destroys an Kafka Topic Source based on a topic.
+     * Destroys a Kafka Topic Source based on a topic.
      *
      * @param topic topic name
      * @throws IllegalArgumentException if invalid parameters are present
@@ -70,10 +70,10 @@ public interface KafkaTopicSourceFactory {
     void destroy();
 
     /**
-     * Gets an Kafka Topic Source based on topic name.
+     * Gets a Kafka Topic Source based on topic name.
      *
      * @param topic the topic name
-     * @return an Kafka Topic Source with topic name
+     * @return a Kafka Topic Source with topic name
      * @throws IllegalArgumentException if an invalid topic is provided
      * @throws IllegalStateException if the Kafka Topic Source is an incorrect state
      */
index f9537f5..d6fa21b 100644 (file)
@@ -4,7 +4,7 @@
  * ================================================================================
  * Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved.
  * Modifications Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019, 2023 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -168,6 +168,26 @@ public class BusTopicParams {
         return additionalProps != null;
     }
 
+    public void setEffectiveTopic(String effectiveTopic) {
+        this.effectiveTopic = topicToLowerCase(effectiveTopic);
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topicToLowerCase(topic);
+    }
+
+    public String getEffectiveTopic() {
+        return topicToLowerCase(effectiveTopic);
+    }
+
+    public String getTopic() {
+        return topicToLowerCase(topic);
+    }
+
+    private String topicToLowerCase(String topic) {
+        return (topic == null || topic.isEmpty()) ? topic : topic.toLowerCase();
+    }
+
     @NoArgsConstructor(access = AccessLevel.PRIVATE)
     public static class TopicParamsBuilder {
 
@@ -179,12 +199,12 @@ public class BusTopicParams {
         }
 
         public TopicParamsBuilder topic(String topic) {
-            this.params.topic = topic;
+            this.params.setTopic(topic);
             return this;
         }
 
         public TopicParamsBuilder effectiveTopic(String effectiveTopic) {
-            this.params.effectiveTopic = effectiveTopic;
+            this.params.setEffectiveTopic(effectiveTopic);
             return this;
         }
 
@@ -306,7 +326,6 @@ public class BusTopicParams {
             this.params.serializationProvider = serializationProvider;
             return this;
         }
-
     }
 }
 
index 27ed5e7..02626d3 100644 (file)
@@ -5,6 +5,7 @@
  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
 * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -158,28 +159,6 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
         this.stop();
     }
 
-    @Override
-    protected boolean anyNullOrEmpty(String... args) {
-        for (String arg : args) {
-            if (arg == null || arg.isEmpty()) {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    @Override
-    protected boolean allNullOrEmpty(String... args) {
-        for (String arg : args) {
-            if (!(arg == null || arg.isEmpty())) {
-                return false;
-            }
-        }
-
-        return true;
-    }
-
     @Override
     public String toString() {
         return "InlineBusTopicSink [partitionId=" + partitionKey + ", alive=" + alive + ", publisher=" + publisher
index 6574d40..f605de9 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022-2023 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -33,12 +33,12 @@ public class InlineKafkaTopicSink extends InlineBusTopicSink implements KafkaTop
     /**
      * Logger.
      */
-    private static Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class);
+    private static final Logger logger = LoggerFactory.getLogger(InlineKafkaTopicSink.class);
 
-    protected Map<String, String> additionalProps = null;
+    protected Map<String, String> additionalProps;
 
     /**
-     * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains below mentioned
+     * Argument-based KAFKA Topic Writer instantiation. BusTopicParams contains the below
      * attributes.
      *
      * <p>servers              list of KAFKA servers available for publishing
index 2a651ee..713b4fd 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- * Copyright (C) 2022 Nordix Foundation.
+ * Copyright (C) 2022-2023 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -24,7 +24,7 @@ import org.onap.policy.common.endpoints.event.comm.Topic;
 import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicSource;
 
 /**
- * This topic source implementation specializes in reading messages over an Kafka Bus topic source and
+ * This topic source implementation specializes in reading messages over a Kafka Bus topic source and
  * notifying its listeners.
  */
 public class SingleThreadedKafkaTopicSource extends SingleThreadedBusTopicSource implements KafkaTopicSource {
index 3372e0a..c63fbcc 100644 (file)
@@ -117,8 +117,8 @@ public abstract class TopicBase implements Topic {
         }
 
         this.servers = servers;
-        this.topic = topic;
-        this.effectiveTopic = effectiveTopicCopy;
+        this.topic = topic.toLowerCase();
+        this.effectiveTopic = effectiveTopicCopy.toLowerCase();
     }
 
     @Override
index 2a9f144..b5d5390 100644 (file)
@@ -3,6 +3,7 @@
  * ONAP
  * ================================================================================
  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 package org.onap.policy.common.endpoints.event.comm.client;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import lombok.Getter;
+import org.jetbrains.annotations.NotNull;
 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
@@ -55,9 +58,9 @@ public class BidirectionalTopicClient {
     private final CommInfrastructure sourceTopicCommInfrastructure;
 
     /**
-     * Used when checking whether or not a message sent on the sink topic can be received
+     * Used when checking whether a message sent on the sink topic can be received
      * on the source topic. When a matching message is received on the incoming topic,
-     * {@code true} is placed on the queue. If {@link #stop()} is called or the waiting
+     * {@code true} is placed on the queue. If {@link #stopWaiting()} is called or the waiting
      * thread is interrupted, then {@code false} is placed on the queue. Whenever a value
      * is pulled from the queue, it is immediately placed back on the queue.
      */
@@ -72,8 +75,8 @@ public class BidirectionalTopicClient {
      * @throws BidirectionalTopicClientException if either topic does not exist
      */
     public BidirectionalTopicClient(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException {
-        this.sinkTopic = sinkTopic;
-        this.sourceTopic = sourceTopic;
+        this.sinkTopic = sinkTopic.toLowerCase();
+        this.sourceTopic = sourceTopic.toLowerCase();
 
         // init sinkClient
         List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic);
@@ -86,7 +89,7 @@ public class BidirectionalTopicClient {
         this.sink = sinks.get(0);
 
         // init source
-        List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic));
+        List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Collections.singletonList(sourceTopic));
         if (sources.isEmpty()) {
             throw new BidirectionalTopicClientException("no sources for topic: " + sourceTopic);
         } else if (sources.size() > 1) {
@@ -116,7 +119,7 @@ public class BidirectionalTopicClient {
     }
 
     /**
-     * Determines whether or not the topic is ready (i.e., {@link #awaitReady(Object)} has
+     * Determines whether the topic is ready (i.e., {@link #awaitReady(Object, long)} has
      * previously returned {@code true}).
      *
      * @return {@code true}, if the topic is ready to send and receive
@@ -129,7 +132,7 @@ public class BidirectionalTopicClient {
      * Waits for the bidirectional topic to become "ready" by publishing a message on the
      * sink topic and awaiting receipt of the message on the source topic. If the message
      * is not received within a few seconds, then it tries again. This process is
-     * continued until the message is received, {@link #stop()} is called, or this thread
+     * continued until the message is received, {@link #stopWaiting()} is called, or this thread
      * is interrupted. Once this returns, subsequent calls will return immediately, always
      * with the same value.
      *
@@ -150,24 +153,7 @@ public class BidirectionalTopicClient {
         final String messageText = coder.encode(message);
 
         // class of message to be decoded
-        @SuppressWarnings("unchecked")
-        final Class<? extends T> clazz = (Class<? extends T>) message.getClass();
-
-        // create a listener to detect when a matching message is received
-        final TopicListener listener = (infra, topic, msg) -> {
-            try {
-                T incoming = decode(msg, clazz);
-
-                if (message.equals(incoming)) {
-                    logger.info("topic {} is ready; found matching message {}", topic, incoming);
-                    checkerQueue.add(Boolean.TRUE);
-                }
-
-            } catch (CoderException e) {
-                logger.warn("cannot decode message from topic {}", topic, e);
-                decodeFailed();
-            }
-        };
+        final TopicListener listener = getTopicListener(message);
 
         source.register(listener);
 
@@ -193,6 +179,28 @@ public class BidirectionalTopicClient {
         return checkerQueue.peek();
     }
 
+    @NotNull
+    private <T> TopicListener getTopicListener(T message) {
+        @SuppressWarnings("unchecked")
+        final Class<? extends T> clazz = (Class<? extends T>) message.getClass();
+
+        // create a listener to detect when a matching message is received
+        return (infra, topic, msg) -> {
+            try {
+                T incoming = decode(msg, clazz);
+
+                if (message.equals(incoming)) {
+                    logger.info("topic {} is ready; found matching message {}", topic, incoming);
+                    checkerQueue.add(Boolean.TRUE);
+                }
+
+            } catch (CoderException e) {
+                logger.warn("cannot decode message from topic {}", topic, e);
+                decodeFailed();
+            }
+        };
+    }
+
     /**
      * Stops any listeners that are currently stuck in {@link #awaitReady(Object)} by
      * adding {@code false} to the queue.
index 3a5d727..1037d3a 100644 (file)
@@ -3,6 +3,7 @@
  * ONAP
  * ================================================================================
  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.policy.common.endpoints.event.comm.client;
 
+import java.io.Serial;
+
 /**
  * Exception thrown by BidirectionalTopicClient class.
  */
 public class BidirectionalTopicClientException extends Exception {
+    @Serial
     private static final long serialVersionUID = 1L;
 
     public BidirectionalTopicClientException() {
index 9f8b3c0..5f49ea3 100644 (file)
@@ -56,9 +56,9 @@ public class TopicSinkClient {
      * @throws TopicSinkClientException if the topic does not exist
      */
     public TopicSinkClient(final String topic) throws TopicSinkClientException {
-        final List<TopicSink> lst = getTopicSinks(topic);
+        final List<TopicSink> lst = getTopicSinks(topic.toLowerCase());
         if (lst.isEmpty()) {
-            throw new TopicSinkClientException("no sinks for topic: " + topic);
+            throw new TopicSinkClientException("no sinks for topic: " + topic.toLowerCase());
         }
 
         this.sink = lst.get(0);
index 608393b..431d4f3 100644 (file)
@@ -3,7 +3,7 @@
  * ONAP PAP
  * ================================================================================
  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019 Nordix Foundation.
+ * Modifications Copyright (C) 2019, 2023 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.policy.common.endpoints.event.comm.client;
 
+import java.io.Serial;
+
 /**
  * Exception thrown by TopicSink client classes.
  */
 public class TopicSinkClientException extends Exception {
+    @Serial
     private static final long serialVersionUID = 1L;
 
     public TopicSinkClientException() {