Removing deprecated DMAAP library
[policy/drools-pdp.git] / feature-pooling-messages / src / main / java / org / onap / policy / drools / pooling / PoolingManagerImpl.java
@@ -3,6 +3,7 @@
  * ONAP
  * ================================================================================
  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -98,7 +99,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
     /**
      * Manager for the internal DMaaP topic.
      */
-    private final DmaapManager dmaapMgr;
+    private final TopicMessageManager topicMessageManager;
 
     /**
      * Lock used while updating {@link #current}. In general, public methods must use
@@ -128,7 +129,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
     private ScheduledThreadPoolExecutor scheduler = null;
 
     /**
-     * Constructs the manager, initializing all of the data structures.
+     * Constructs the manager, initializing all the data structures.
      *
      * @param host name/uuid of this host
      * @param controller controller with which this is associated
@@ -146,7 +147,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
         try {
             this.serializer = new Serializer();
             this.topic = props.getPoolingTopic();
-            this.dmaapMgr = makeDmaapManager(props.getPoolingTopic());
+            this.topicMessageManager = makeTopicMessagesManager(props.getPoolingTopic());
             this.current = new IdleState(this);
 
             logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
@@ -179,7 +180,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
     public void beforeStart() {
         synchronized (curLocker) {
             if (scheduler == null) {
-                dmaapMgr.startPublisher();
+                topicMessageManager.startPublisher();
 
                 logger.debug("make scheduler thread for topic {}", getTopic());
                 scheduler = makeScheduler();
@@ -204,7 +205,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
     public void afterStart() {
         synchronized (curLocker) {
             if (current instanceof IdleState) {
-                dmaapMgr.startConsumer(this);
+                topicMessageManager.startConsumer(this);
                 changeState(new StartState(this));
             }
         }
@@ -223,7 +224,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
 
             if (!(current instanceof IdleState)) {
                 changeState(new IdleState(this));
-                dmaapMgr.stopConsumer(this);
+                topicMessageManager.stopConsumer(this);
                 publishAdmin(new Offline(getHost()));
             }
 
@@ -246,7 +247,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
              * stop the publisher, but allow time for any Offline message to be
              * transmitted
              */
-            dmaapMgr.stopPublisher(properties.getOfflinePubWaitMs());
+            topicMessageManager.stopPublisher(properties.getOfflinePubWaitMs());
         }
     }
 
@@ -326,7 +327,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
             msg.checkValidity();
 
             String txt = serializer.encodeMsg(msg);
-            dmaapMgr.publish(txt);
+            topicMessageManager.publish(txt);
 
         } catch (JsonParseException e) {
             logger.error("failed to serialize message for topic {} channel {}", topic, channel, e);
@@ -363,7 +364,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
      * need to be forwarded, thus in that case, they are decoded and forwarded.
      *
      * <p>On the other hand, if the controller is not locked, then we just return immediately
-     * and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
+     * and let {@link #beforeInsert(String, Object)  beforeInsert()} handle
      * it instead, as it already has the decoded message.
      *
      * @param topic2 topic
@@ -600,11 +601,11 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
      * Creates a DMaaP manager.
      *
      * @param topic name of the internal DMaaP topic
-     * @return a new DMaaP manager
+     * @return a new topic messages manager
      * @throws PoolingFeatureException if an error occurs
      */
-    protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
-        return new DmaapManager(topic);
+    protected TopicMessageManager makeTopicMessagesManager(String topic) throws PoolingFeatureException {
+        return new TopicMessageManager(topic);
     }
 
     /**
@@ -636,7 +637,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
      * @param event event text to be decoded
      * @return the decoded event
      * @throws IllegalArgumentException illegal argument
-     * @throw UnsupportedOperationException unsupported operation
+     * @throws UnsupportedOperationException unsupported operation
      * @throws IllegalStateException illegal state
      */
     protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {