Allow xacml-pdp to use kafka
[policy/xacml-pdp.git] / main / src / main / java / org / onap / policy / pdpx / main / startstop / XacmlPdpActivator.java
index e74ab9b..d62b6f4 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.policy.pdpx.main.startstop;
 
-import java.util.Arrays;
+import java.util.List;
 import lombok.Getter;
 import lombok.Setter;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException;
 import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
-import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClientException;
 import org.onap.policy.common.endpoints.http.client.HttpClient;
 import org.onap.policy.common.endpoints.http.client.HttpClientConfigException;
 import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
@@ -42,9 +43,9 @@ import org.onap.policy.pdpx.main.comm.XacmlPdpHearbeatPublisher;
 import org.onap.policy.pdpx.main.comm.listeners.XacmlPdpStateChangeListener;
 import org.onap.policy.pdpx.main.comm.listeners.XacmlPdpUpdateListener;
 import org.onap.policy.pdpx.main.parameters.XacmlPdpParameterGroup;
-import org.onap.policy.pdpx.main.rest.XacmlPdpAafFilter;
 import org.onap.policy.pdpx.main.rest.XacmlPdpApplicationManager;
 import org.onap.policy.pdpx.main.rest.XacmlPdpRestController;
+import org.onap.policy.pdpx.main.rest.XacmlPdpServiceFilter;
 import org.onap.policy.pdpx.main.rest.XacmlPdpStatisticsManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,16 +60,20 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
     private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpActivator.class);
 
     private static final String[] MSG_TYPE_NAMES = {"messageName"};
-    private static final String TOPIC = "POLICY-PDP-PAP";
+    private static final String TOPIC = "policy-pdp-pap";
 
     @Getter
     @Setter
     private static XacmlPdpActivator current = null;
-    private final XacmlPdpRestServer restServer;
 
     // The parameters of this policy xacml pdp activator
     private final XacmlPdpParameterGroup xacmlPdpParameterGroup;
 
+    /**
+     * POLICY-PDP-PAP client.
+     */
+    private final BidirectionalTopicClient topicClient;
+
     /**
      * Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then
      * dispatches them to appropriate listener.
@@ -91,6 +96,7 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
         final TopicSinkClient sinkClient;
         final XacmlState state;
 
+        XacmlPdpRestServer restServer;
         try {
             HttpClient apiClient = HttpClientFactoryInstance.getClientFactory().build(apiClientParams);
 
@@ -108,8 +114,11 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
             this.xacmlPdpParameterGroup = xacmlPdpParameterGroup;
             this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
 
-            sinkClient = new TopicSinkClient(TOPIC);
-            heartbeat = new XacmlPdpHearbeatPublisher(sinkClient, state);
+            topicClient = new BidirectionalTopicClient(TOPIC, TOPIC);
+            sinkClient = new TopicSinkClient(topicClient.getSink());
+
+            heartbeat = new XacmlPdpHearbeatPublisher(topicClient,
+                            xacmlPdpParameterGroup.getProbeHeartbeatTopicSec() * 1000, state);
 
             /*
              * since the dispatcher isn't registered with the topic yet, we can go ahead
@@ -120,10 +129,12 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
             msgDispatcher.register(PdpMessageType.PDP_UPDATE.name(),
                             new XacmlPdpUpdateListener(sinkClient, state, heartbeat, appmgr));
 
+            XacmlPdpServiceFilter.disableApi();
+
             restServer = new XacmlPdpRestServer(xacmlPdpParameterGroup.getRestServerParameters(),
-                    XacmlPdpAafFilter.class, XacmlPdpRestController.class);
+                                List.of(XacmlPdpServiceFilter.class), List.of(XacmlPdpRestController.class));
 
-        } catch (RuntimeException | TopicSinkClientException | HttpClientConfigException e) {
+        } catch (RuntimeException | HttpClientConfigException | BidirectionalTopicClientException e) {
             throw new PolicyXacmlPdpRuntimeException(e.getMessage(), e);
         }
 
@@ -151,6 +162,9 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
             heartbeat::terminate);
 
         // @formatter:on
+        addAction("REST Server",
+            restServer::start,
+            restServer::stop);
     }
 
     /*
@@ -192,43 +206,31 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
      * Registers the dispatcher with the topic source(s).
      */
     private void registerMsgDispatcher() {
-        for (TopicSource source : TopicEndpointManager.getManager().getTopicSources(Arrays.asList(TOPIC))) {
-            source.register(msgDispatcher);
-        }
+        topicClient.getSource().register(msgDispatcher);
     }
 
     /**
      * Unregisters the dispatcher from the topic source(s).
      */
     private void unregisterMsgDispatcher() {
-        for (TopicSource source : TopicEndpointManager.getManager().getTopicSources(Arrays.asList(TOPIC))) {
-            source.unregister(msgDispatcher);
-        }
+        topicClient.getSource().unregister(msgDispatcher);
     }
 
     /**
      * Start the xacmlpdp rest controller.
      */
-    public void startXacmlRestController() {
-        if (isXacmlRestControllerAlive()) {
-            LOGGER.info("Xacml rest controller already running");
-        } else {
-            restServer.start();
-        }
+    public void enableApi() {
+        XacmlPdpServiceFilter.enableApi();
     }
 
     /**
      * Stop the xacmlpdp rest controller.
      */
-    public void stopXacmlRestController() {
-        if (isXacmlRestControllerAlive()) {
-            restServer.stop();
-        } else {
-            LOGGER.info("Xacml rest controller already stopped");
-        }
+    public void disableApi() {
+        XacmlPdpServiceFilter.disableApi();
     }
 
-    public boolean isXacmlRestControllerAlive() {
-        return restServer.isAlive();
+    public boolean isApiEnabled() {
+        return XacmlPdpServiceFilter.isApiEnabled();
     }
 }