Wait for pdp-pap topic in xacml-pdp
[policy/xacml-pdp.git] / main / src / main / java / org / onap / policy / pdpx / main / startstop / XacmlPdpActivator.java
index 4dd8a9b..050d8b2 100644 (file)
 
 package org.onap.policy.pdpx.main.startstop;
 
-import java.util.Arrays;
 import lombok.Getter;
 import lombok.Setter;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException;
 import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
-import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClientException;
 import org.onap.policy.common.endpoints.http.client.HttpClient;
 import org.onap.policy.common.endpoints.http.client.HttpClientConfigException;
 import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
@@ -69,6 +68,11 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
     // The parameters of this policy xacml pdp activator
     private final XacmlPdpParameterGroup xacmlPdpParameterGroup;
 
+    /**
+     * POLICY-PDP-PAP client.
+     */
+    private BidirectionalTopicClient topicClient;
+
     /**
      * Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then
      * dispatches them to appropriate listener.
@@ -108,8 +112,11 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
             this.xacmlPdpParameterGroup = xacmlPdpParameterGroup;
             this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
 
-            sinkClient = new TopicSinkClient(TOPIC);
-            heartbeat = new XacmlPdpHearbeatPublisher(sinkClient, state);
+            topicClient = new BidirectionalTopicClient(TOPIC, TOPIC);
+            sinkClient = new TopicSinkClient(topicClient.getSink());
+
+            heartbeat = new XacmlPdpHearbeatPublisher(topicClient,
+                            xacmlPdpParameterGroup.getProbeHeartbeatTopicSec() * 1000, state);
 
             /*
              * since the dispatcher isn't registered with the topic yet, we can go ahead
@@ -123,7 +130,7 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
             restServer = new XacmlPdpRestServer(xacmlPdpParameterGroup.getRestServerParameters(),
                     XacmlPdpAafFilter.class, XacmlPdpRestController.class);
 
-        } catch (RuntimeException | TopicSinkClientException | HttpClientConfigException e) {
+        } catch (RuntimeException | HttpClientConfigException | BidirectionalTopicClientException e) {
             throw new PolicyXacmlPdpRuntimeException(e.getMessage(), e);
         }
 
@@ -197,18 +204,14 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
      * Registers the dispatcher with the topic source(s).
      */
     private void registerMsgDispatcher() {
-        for (TopicSource source : TopicEndpointManager.getManager().getTopicSources(Arrays.asList(TOPIC))) {
-            source.register(msgDispatcher);
-        }
+        topicClient.getSource().register(msgDispatcher);
     }
 
     /**
      * Unregisters the dispatcher from the topic source(s).
      */
     private void unregisterMsgDispatcher() {
-        for (TopicSource source : TopicEndpointManager.getManager().getTopicSources(Arrays.asList(TOPIC))) {
-            source.unregister(msgDispatcher);
-        }
+        topicClient.getSource().unregister(msgDispatcher);
     }
 
     /**