Allow xacml-pdp to use kafka
[policy/xacml-pdp.git] / main / src / main / java / org / onap / policy / pdpx / main / startstop / XacmlPdpActivator.java
index 4db11d0..d62b6f4 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.policy.pdpx.main.startstop;
 
-import java.util.Arrays;
-import java.util.Properties;
-
-import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import java.util.List;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClient;
+import org.onap.policy.common.endpoints.event.comm.client.BidirectionalTopicClientException;
 import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
-import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClientException;
+import org.onap.policy.common.endpoints.http.client.HttpClient;
+import org.onap.policy.common.endpoints.http.client.HttpClientConfigException;
+import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
 import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
+import org.onap.policy.common.endpoints.parameters.RestClientParameters;
 import org.onap.policy.common.parameters.ParameterService;
 import org.onap.policy.common.utils.services.ServiceManagerContainer;
 import org.onap.policy.models.pdp.concepts.PdpStatus;
-import org.onap.policy.models.pdp.concepts.PdpUpdate;
 import org.onap.policy.models.pdp.enums.PdpMessageType;
-import org.onap.policy.models.pdp.enums.PdpState;
 import org.onap.policy.pdpx.main.PolicyXacmlPdpRuntimeException;
-import org.onap.policy.pdpx.main.comm.XacmlPdpMessage;
-import org.onap.policy.pdpx.main.comm.XacmlPdpPapRegistration;
+import org.onap.policy.pdpx.main.XacmlState;
+import org.onap.policy.pdpx.main.comm.XacmlPdpHearbeatPublisher;
 import org.onap.policy.pdpx.main.comm.listeners.XacmlPdpStateChangeListener;
 import org.onap.policy.pdpx.main.comm.listeners.XacmlPdpUpdateListener;
 import org.onap.policy.pdpx.main.parameters.XacmlPdpParameterGroup;
-import org.onap.policy.pdpx.main.rest.XacmlPdpRestServer;
+import org.onap.policy.pdpx.main.rest.XacmlPdpApplicationManager;
+import org.onap.policy.pdpx.main.rest.XacmlPdpRestController;
+import org.onap.policy.pdpx.main.rest.XacmlPdpServiceFilter;
+import org.onap.policy.pdpx.main.rest.XacmlPdpStatisticsManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,15 +60,19 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
     private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpActivator.class);
 
     private static final String[] MSG_TYPE_NAMES = {"messageName"};
-    private static final String TOPIC = "POLICY-PDP-PAP";
+    private static final String TOPIC = "policy-pdp-pap";
+
+    @Getter
+    @Setter
+    private static XacmlPdpActivator current = null;
 
     // The parameters of this policy xacml pdp activator
     private final XacmlPdpParameterGroup xacmlPdpParameterGroup;
 
     /**
-     * The XACML PDP REST API server.
+     * POLICY-PDP-PAP client.
      */
-    private XacmlPdpRestServer restServer;
+    private final BidirectionalTopicClient topicClient;
 
     /**
      * Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then
@@ -71,97 +81,98 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
     private final MessageTypeDispatcher msgDispatcher;
 
     /**
-     * Listens for {@link PdpStateChange} messages from the PAP.
+     * Instantiate the activator for policy xacml pdp as a complete service.
+     *
+     * @param xacmlPdpParameterGroup the parameters for the xacml pdp service
      */
-    private final XacmlPdpStateChangeListener pdpStateChangeListener;
+    public XacmlPdpActivator(final XacmlPdpParameterGroup xacmlPdpParameterGroup) {
+        LOGGER.info("Activator initializing using {}", xacmlPdpParameterGroup);
 
-    /**
-     * Listens for {@link PdpUpdate} messages from the PAP.
-     */
-    private final XacmlPdpUpdateListener pdpUpdateListener;
+        RestClientParameters apiClientParams = xacmlPdpParameterGroup.getPolicyApiParameters();
 
-    /**
-     * The current activator.
-     */
-    private static XacmlPdpActivator current = null;
+        TopicEndpointManager.getManager().addTopics(xacmlPdpParameterGroup.getTopicParameterGroup());
 
-    private volatile boolean alive = false;
+        final XacmlPdpHearbeatPublisher heartbeat;
+        final TopicSinkClient sinkClient;
+        final XacmlState state;
 
-    private XacmlPdpPapRegistration register;
+        XacmlPdpRestServer restServer;
+        try {
+            HttpClient apiClient = HttpClientFactoryInstance.getClientFactory().build(apiClientParams);
 
-    private XacmlPdpMessage message;
+            var appmgr = new XacmlPdpApplicationManager(xacmlPdpParameterGroup.getApplicationParameters(),
+                                            apiClient);
+            XacmlPdpApplicationManager.setCurrent(appmgr);
 
-    /**
-     * Instantiate the activator for policy xacml pdp as a complete service.
-     *
-     * @param xacmlPdpParameterGroup the parameters for the xacml pdp service
-     * @param topicProperties properties used to configure the topics
-     */
-    public XacmlPdpActivator(final XacmlPdpParameterGroup xacmlPdpParameterGroup, Properties topicProperties) {
-        LOGGER.info("Activator initializing using {} and {}", xacmlPdpParameterGroup, topicProperties);
+            var stats = new XacmlPdpStatisticsManager();
+            XacmlPdpStatisticsManager.setCurrent(stats);
+            stats.setTotalPolicyTypesCount(appmgr.getPolicyTypeCount());
+            stats.setTotalPolicyCount(appmgr.getPolicyCount());
 
-        TopicEndpoint.manager.addTopicSinks(topicProperties);
-        TopicEndpoint.manager.addTopicSources(topicProperties);
+            state = new XacmlState(appmgr, xacmlPdpParameterGroup.getPdpGroup(), xacmlPdpParameterGroup.getPdpType());
 
-        try {
-            final TopicSinkClient sinkClient = new TopicSinkClient(TOPIC);
-            this.message = new XacmlPdpMessage();
             this.xacmlPdpParameterGroup = xacmlPdpParameterGroup;
             this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
-            this.pdpStateChangeListener = new XacmlPdpStateChangeListener(sinkClient, message);
-            this.pdpUpdateListener = new XacmlPdpUpdateListener(sinkClient, message);
-            this.register = new XacmlPdpPapRegistration(sinkClient);
-        } catch (RuntimeException | TopicSinkClientException e) {
+
+            topicClient = new BidirectionalTopicClient(TOPIC, TOPIC);
+            sinkClient = new TopicSinkClient(topicClient.getSink());
+
+            heartbeat = new XacmlPdpHearbeatPublisher(topicClient,
+                            xacmlPdpParameterGroup.getProbeHeartbeatTopicSec() * 1000, state);
+
+            /*
+             * since the dispatcher isn't registered with the topic yet, we can go ahead
+             * and register the listeners with it.
+             */
+            msgDispatcher.register(PdpMessageType.PDP_STATE_CHANGE.name(),
+                            new XacmlPdpStateChangeListener(sinkClient, state));
+            msgDispatcher.register(PdpMessageType.PDP_UPDATE.name(),
+                            new XacmlPdpUpdateListener(sinkClient, state, heartbeat, appmgr));
+
+            XacmlPdpServiceFilter.disableApi();
+
+            restServer = new XacmlPdpRestServer(xacmlPdpParameterGroup.getRestServerParameters(),
+                                List.of(XacmlPdpServiceFilter.class), List.of(XacmlPdpRestController.class));
+
+        } catch (RuntimeException | HttpClientConfigException | BidirectionalTopicClientException e) {
             throw new PolicyXacmlPdpRuntimeException(e.getMessage(), e);
         }
 
         xacmlPdpParameterGroup.getRestServerParameters().setName(xacmlPdpParameterGroup.getName());
 
         // @formatter:off
-        addAction("XACML PDP parameters", () -> ParameterService.register(xacmlPdpParameterGroup),
+        addAction("XACML PDP parameters",
+            () -> ParameterService.register(xacmlPdpParameterGroup),
             () -> ParameterService.deregister(xacmlPdpParameterGroup.getName()));
 
-        addAction("PdpStateChange Dispatcher",
-            () -> msgDispatcher.register(PdpMessageType.PDP_STATE_CHANGE.name(), this.pdpStateChangeListener),
-            () -> msgDispatcher.unregister(PdpMessageType.PDP_STATE_CHANGE.name()));
-
-        addAction("PdpUpdate Dispatcher",
-            () -> msgDispatcher.register(PdpMessageType.PDP_UPDATE.name(), this.pdpUpdateListener),
-            () -> msgDispatcher.unregister(PdpMessageType.PDP_UPDATE.name()));
-
         addAction("Message Dispatcher",
-            () -> registerMsgDispatcher(),
-            () -> unregisterMsgDispatcher());
+            this::registerMsgDispatcher,
+            this::unregisterMsgDispatcher);
 
         addAction("topics",
-            () -> TopicEndpoint.manager.start(),
-            () -> TopicEndpoint.manager.shutdown());
-
-        addAction("Create REST server",
-            () -> {
-                restServer = new XacmlPdpRestServer(xacmlPdpParameterGroup.getRestServerParameters(),
-                        xacmlPdpParameterGroup.getApplicationPath());
-            },
-            () -> {
-                restServer = null;
-            });
-
-        addAction("REST server",
-            () -> restServer.start(),
-            () -> restServer.stop());
-
-        addAction("set alive", () -> setAlive(true), () -> setAlive(false));
-
-        addAction("Initial Registration with PAP",
-            () -> {
-                register.pdpRegistration(message.formatInitialStatusMessage(PdpState.PASSIVE));
-            },
-            () -> {
-                register.pdpRegistration(message.formatInitialStatusMessage(PdpState.TERMINATED));
-            });
+            TopicEndpointManager.getManager()::start,
+            TopicEndpointManager.getManager()::shutdown);
+
+        addAction("Terminate PDP",
+            () -> { },
+            () -> sendTerminateMessage(sinkClient, state));
+        // initial heart beats act as registration messages
+        addAction("Heartbeat Publisher",
+            heartbeat::start,
+            heartbeat::terminate);
+
         // @formatter:on
+        addAction("REST Server",
+            restServer::start,
+            restServer::stop);
+    }
 
-        current = this;
+    /*
+     * Method used to send a terminate message to the PAP.
+     */
+    private void sendTerminateMessage(TopicSinkClient sinkClient, XacmlState state) {
+        PdpStatus terminateStatus = state.terminatePdpMessage();
+        sinkClient.send(terminateStatus);
     }
 
     /**
@@ -195,44 +206,31 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
      * Registers the dispatcher with the topic source(s).
      */
     private void registerMsgDispatcher() {
-        for (TopicSource source : TopicEndpoint.manager.getTopicSources(Arrays.asList(TOPIC))) {
-            source.register(msgDispatcher);
-        }
+        topicClient.getSource().register(msgDispatcher);
     }
 
     /**
      * Unregisters the dispatcher from the topic source(s).
      */
     private void unregisterMsgDispatcher() {
-        for (TopicSource source : TopicEndpoint.manager.getTopicSources(Arrays.asList(TOPIC))) {
-            source.unregister(msgDispatcher);
-        }
+        topicClient.getSource().unregister(msgDispatcher);
     }
 
     /**
-     * Returns the alive status of xacml pdp service.
-     *
-     * @return the alive
+     * Start the xacmlpdp rest controller.
      */
-    @Override
-    public boolean isAlive() {
-        return alive;
+    public void enableApi() {
+        XacmlPdpServiceFilter.enableApi();
     }
 
     /**
-     * Change the alive status of xacml pdp service.
-     *
-     * @param status the status
+     * Stop the xacmlpdp rest controller.
      */
-    private void setAlive(final boolean status) {
-        alive = status;
-    }
-
-    public static XacmlPdpActivator getCurrent() {
-        return current;
+    public void disableApi() {
+        XacmlPdpServiceFilter.disableApi();
     }
 
-    public static void setCurrent(XacmlPdpActivator current) {
-        XacmlPdpActivator.current = current;
+    public boolean isApiEnabled() {
+        return XacmlPdpServiceFilter.isApiEnabled();
     }
 }