package org.onap.policy.pdpx.main.startstop;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClientException;
+import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
import org.onap.policy.common.parameters.ParameterService;
-import org.onap.policy.pdpx.main.PolicyXacmlPdpException;
+import org.onap.policy.common.utils.services.ServiceManagerContainer;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.models.pdp.enums.PdpMessageType;
+import org.onap.policy.pdpx.main.PolicyXacmlPdpRuntimeException;
+import org.onap.policy.pdpx.main.XacmlState;
+import org.onap.policy.pdpx.main.comm.XacmlPdpHearbeatPublisher;
+import org.onap.policy.pdpx.main.comm.listeners.XacmlPdpStateChangeListener;
+import org.onap.policy.pdpx.main.comm.listeners.XacmlPdpUpdateListener;
import org.onap.policy.pdpx.main.parameters.XacmlPdpParameterGroup;
+import org.onap.policy.pdpx.main.rest.XacmlPdpApplicationManager;
import org.onap.policy.pdpx.main.rest.XacmlPdpRestServer;
+import org.onap.policy.pdpx.main.rest.XacmlPdpStatisticsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* This class wraps a distributor so that it can be activated as a complete service together with
* all its xacml pdp and forwarding handlers.
*/
-public class XacmlPdpActivator {
+public class XacmlPdpActivator extends ServiceManagerContainer {
+
// The logger for this class
private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpActivator.class);
- // The parameters of this policy xacml pdp activator
- private final XacmlPdpParameterGroup xacmlPdpParameterGroup;
+ private static final String[] MSG_TYPE_NAMES = {"messageName"};
+ private static final String TOPIC = "POLICY-PDP-PAP";
- private static boolean alive = false;
+ @Getter
+ @Setter
+ private static XacmlPdpActivator current = null;
- private XacmlPdpRestServer restServer;
+ // The parameters of this policy xacml pdp activator
+ private final XacmlPdpParameterGroup xacmlPdpParameterGroup;
/**
- * Instantiate the activator for policy xacml pdp as a complete service.
- *
- * @param xacmlPdpParameterGroup the parameters for the xacml pdp service
+ * The XACML PDP REST API server.
*/
- public XacmlPdpActivator(final XacmlPdpParameterGroup xacmlPdpParameterGroup) {
- this.xacmlPdpParameterGroup = xacmlPdpParameterGroup;
- }
+ private XacmlPdpRestServer restServer;
/**
- * Initialize xacml pdp as a complete service.
- *
- * @throws PolicyXacmlPdpException on errors in initializing the service
+ * Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then
+ * dispatches them to appropriate listener.
*/
- public void initialize() throws PolicyXacmlPdpException {
- LOGGER.debug("Policy xacml pdp starting as a service . . .");
- startXacmlPdpRestServer();
- registerToParameterService(xacmlPdpParameterGroup);
- XacmlPdpActivator.setAlive(true);
- LOGGER.debug("Policy xacml pdp started as a service");
- }
+ private final MessageTypeDispatcher msgDispatcher;
/**
- * Starts the xacml pdp rest server using configuration parameters.
+ * Instantiate the activator for policy xacml pdp as a complete service.
*
- * @throws PolicyXacmlPdpException if server start fails
+ * @param xacmlPdpParameterGroup the parameters for the xacml pdp service
+ * @param topicProperties properties used to configure the topics
*/
- private void startXacmlPdpRestServer() throws PolicyXacmlPdpException {
- xacmlPdpParameterGroup.getRestServerParameters().setName(xacmlPdpParameterGroup.getName());
- restServer = new XacmlPdpRestServer(xacmlPdpParameterGroup.getRestServerParameters());
- if (!restServer.start()) {
- throw new PolicyXacmlPdpException("Failed to start xacml pdp rest server. Check log for more details...");
+ public XacmlPdpActivator(final XacmlPdpParameterGroup xacmlPdpParameterGroup, Properties topicProperties) {
+ LOGGER.info("Activator initializing using {} and {}", xacmlPdpParameterGroup, topicProperties);
+
+ TopicEndpoint.manager.addTopicSinks(topicProperties);
+ TopicEndpoint.manager.addTopicSources(topicProperties);
+
+ final XacmlPdpHearbeatPublisher heartbeat;
+ final TopicSinkClient sinkClient;
+ final XacmlState state;
+
+ try {
+ XacmlPdpApplicationManager appmgr =
+ new XacmlPdpApplicationManager(Paths.get(xacmlPdpParameterGroup.getApplicationPath()));
+ XacmlPdpApplicationManager.setCurrent(appmgr);
+
+ XacmlPdpStatisticsManager stats = new XacmlPdpStatisticsManager();
+ XacmlPdpStatisticsManager.setCurrent(stats);
+ stats.setTotalPolicyTypesCount(appmgr.getPolicyTypeCount());
+
+ state = new XacmlState(appmgr);
+
+ this.xacmlPdpParameterGroup = xacmlPdpParameterGroup;
+ this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+
+ sinkClient = new TopicSinkClient(TOPIC);
+ heartbeat = new XacmlPdpHearbeatPublisher(sinkClient, state);
+
+ /*
+ * since the dispatcher isn't registered with the topic yet, we can go ahead
+ * and register the listeners with it.
+ */
+ msgDispatcher.register(PdpMessageType.PDP_STATE_CHANGE.name(),
+ new XacmlPdpStateChangeListener(sinkClient, state));
+ msgDispatcher.register(PdpMessageType.PDP_UPDATE.name(),
+ new XacmlPdpUpdateListener(sinkClient, state, heartbeat, appmgr));
+
+ } catch (RuntimeException | TopicSinkClientException e) {
+ throw new PolicyXacmlPdpRuntimeException(e.getMessage(), e);
}
+
+ xacmlPdpParameterGroup.getRestServerParameters().setName(xacmlPdpParameterGroup.getName());
+
+ // @formatter:off
+ addAction("XACML PDP parameters",
+ () -> ParameterService.register(xacmlPdpParameterGroup),
+ () -> ParameterService.deregister(xacmlPdpParameterGroup.getName()));
+
+ addAction("Message Dispatcher",
+ this::registerMsgDispatcher,
+ this::unregisterMsgDispatcher);
+
+ addAction("topics",
+ TopicEndpoint.manager::start,
+ TopicEndpoint.manager::shutdown);
+
+ addAction("Terminate PDP",
+ () -> { },
+ () -> sendTerminateMessage(sinkClient, state));
+ // initial heart beats act as registration messages
+ addAction("Heartbeat Publisher",
+ heartbeat::start,
+ heartbeat::terminate);
+
+ addAction("Create REST server",
+ () -> restServer = new XacmlPdpRestServer(xacmlPdpParameterGroup.getRestServerParameters()),
+ () -> restServer = null);
+
+ addAction("REST server",
+ () -> restServer.start(),
+ () -> restServer.stop());
+
+ // @formatter:on
}
- /**
- * Terminate policy xacml pdp.
- *
- * @throws PolicyXacmlPdpException on termination errors
+ /*
+ * Method used to send a terminate message to the PAP.
*/
- public void terminate() throws PolicyXacmlPdpException {
- try {
- deregisterToParameterService(xacmlPdpParameterGroup);
- XacmlPdpActivator.setAlive(false);
-
- // Stop the xacml pdp rest server
- restServer.stop();
- } catch (final Exception exp) {
- LOGGER.error("Policy xacml pdp service termination failed", exp);
- throw new PolicyXacmlPdpException(exp.getMessage(), exp);
- }
+ private void sendTerminateMessage(TopicSinkClient sinkClient, XacmlState state) {
+ PdpStatus terminateStatus = state.terminatePdpMessage();
+ sinkClient.send(terminateStatus);
}
/**
}
/**
- * Returns the alive status of xacml pdp service.
- *
- * @return the alive
+ * Registers the dispatcher with the topic source(s).
*/
- public static boolean isAlive() {
- return alive;
+ private void registerMsgDispatcher() {
+ for (TopicSource source : TopicEndpoint.manager.getTopicSources(Arrays.asList(TOPIC))) {
+ source.register(msgDispatcher);
+ }
}
/**
- * Change the alive status of xacml pdp service.
- *
- * @param status the status
+ * Unregisters the dispatcher from the topic source(s).
*/
- private static void setAlive(final boolean status) {
- alive = status;
+ private void unregisterMsgDispatcher() {
+ for (TopicSource source : TopicEndpoint.manager.getTopicSources(Arrays.asList(TOPIC))) {
+ source.unregister(msgDispatcher);
+ }
}
}