Fix xacml pdp registration issue 83/85983/5
authorMichael Mokry <michael.mokry@att.com>
Mon, 22 Apr 2019 16:36:20 +0000 (11:36 -0500)
committerMichael Mokry <michael.mokry@att.com>
Thu, 25 Apr 2019 12:43:34 +0000 (07:43 -0500)
1. Added appliesTo() check to state change and PDP update listeners.
2. Made changes to record the extracted pdpUpdate values.
3. Removed pdpState instance variable from heartbeat publisher
4. Split XacmlPdpMessage.updateInternalStatus() into two methods to handle
PdpStateChange and PdpUpdate messages separately
5. Cleaned up unused imports

Change-Id: I76c68d925e46d475c086bd5b86851ea44d821b28
Issue-ID: POLICY-1673
Signed-off-by: Michael Mokry <michael.mokry@att.com>
main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpHearbeatPublisher.java
main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpMessage.java
main/src/main/java/org/onap/policy/pdpx/main/comm/XacmlPdpUpdatePublisher.java
main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpStateChangeListener.java
main/src/main/java/org/onap/policy/pdpx/main/comm/listeners/XacmlPdpUpdateListener.java
main/src/main/java/org/onap/policy/pdpx/main/startstop/XacmlPdpActivator.java

index 0dc8bf5..8ffccbe 100644 (file)
@@ -23,8 +23,6 @@ package org.onap.policy.pdpx.main.comm;
 import java.util.Timer;
 import java.util.TimerTask;
 import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
-import org.onap.policy.models.pdp.concepts.PdpStateChange;
-import org.onap.policy.models.pdp.enums.PdpState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,11 +32,8 @@ public class XacmlPdpHearbeatPublisher extends TimerTask {
 
     private Timer timer;
     private XacmlPdpMessage heartbeatMessage;
-    private Object message;
     private static TopicSinkClient topicSinkClient;
     private static volatile  boolean alive = false;
-    public static PdpState pdpState;
-
 
     /**
      * Constructor for instantiating XacmlPdpPublisher.
@@ -46,11 +41,9 @@ public class XacmlPdpHearbeatPublisher extends TimerTask {
      * @param message of the PDP
      * @param topicSinkClient used to send heartbeat message
      */
-    public XacmlPdpHearbeatPublisher(TopicSinkClient topicSinkClient, PdpStateChange message) {
-        this.message = message;
-        this.pdpState = message.getState();
+    public XacmlPdpHearbeatPublisher(TopicSinkClient topicSinkClient, XacmlPdpMessage message ) {
         this.topicSinkClient = topicSinkClient;
-        this.heartbeatMessage = new XacmlPdpMessage();
+        this.heartbeatMessage = message;
         timer = new Timer(false);
         timer.scheduleAtFixedRate(this, 0, 60000); // time interval temp hard coded now but will be parameterized
         setAlive(true);
@@ -58,7 +51,7 @@ public class XacmlPdpHearbeatPublisher extends TimerTask {
 
     @Override
     public void run() {
-        topicSinkClient.send(heartbeatMessage.formatHeartbeatMessage((PdpStateChange) message));
+        topicSinkClient.send(heartbeatMessage.formatPdpStatusMessage());
         LOGGER.info("Sending Xacml PDP heartbeat to the PAP");
     }
 
@@ -71,11 +64,6 @@ public class XacmlPdpHearbeatPublisher extends TimerTask {
         setAlive(false);
     }
 
-    public void updateInternalState(PdpState state) {
-        ((PdpStateChange) this.message).setState(state);
-        this.pdpState = state;
-    }
-
     public static boolean isAlive() {
         return alive;
     }
index 1253ff2..802d735 100644 (file)
@@ -22,6 +22,7 @@
 
 package org.onap.policy.pdpx.main.comm;
 
+import lombok.Getter;
 import org.onap.policy.common.utils.network.NetworkUtil;
 import org.onap.policy.models.pdp.concepts.PdpStateChange;
 import org.onap.policy.models.pdp.concepts.PdpStatus;
@@ -33,20 +34,25 @@ import org.onap.policy.pdpx.main.startstop.XacmlPdpActivator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@Getter
 public class XacmlPdpMessage {
 
     // The logger for this class
     private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpMessage.class);
+    private String pdpGroup;
+    private String pdpSubGroup;
+    private PdpState pdpState;
+    private String pdpName = NetworkUtil.getHostname();
 
     /**
-     * Method used to format the status message.
+     * Method used to format the initial registration status message.
      *
      * @param state of the PDP
      * @return status message of the PDP
      */
-    public PdpStatus formatStatusMessage(PdpState state) {
+    public PdpStatus formatInitialStatusMessage(PdpState state) {
         PdpStatus status = new PdpStatus();
-        status.setName(NetworkUtil.getHostname());
+        status.setName(pdpName);
 
         if (XacmlPdpActivator.getCurrent().isAlive()) {
             status.setHealthy(PdpHealthStatus.HEALTHY);
@@ -65,14 +71,13 @@ public class XacmlPdpMessage {
     }
 
     /**
-     * Method used to format the heartbeat status message.
+     * Method used to format the PdpStatus message for heartbeat and PDP Updates.
      *
-     * @param message PdpStateChange message received from the PAP
      * @return status message of the PDP
      */
-    public PdpStatus formatHeartbeatMessage(PdpStateChange message) {
+    public PdpStatus formatPdpStatusMessage() {
         PdpStatus status = new PdpStatus();
-        status.setName(NetworkUtil.getHostname());
+        status.setName(pdpName);
 
         if (XacmlPdpActivator.getCurrent().isAlive()) {
             status.setHealthy(PdpHealthStatus.HEALTHY);
@@ -81,37 +86,29 @@ public class XacmlPdpMessage {
         }
 
         status.setPdpType("xacml");
-        status.setState(message.getState());
-        status.setPdpGroup(message.getPdpGroup());
-        status.setPdpSubgroup(message.getPdpSubgroup());
+        status.setState(pdpState);
+        status.setPdpGroup(pdpGroup);
+        status.setPdpSubgroup(pdpSubGroup);
         status.setSupportedPolicyTypes(XacmlPdpApplicationManager.getToscaPolicyTypeIdents());
+        status.setPolicies(XacmlPdpApplicationManager.getToscaPolicyIdentifiers());
 
         return status;
     }
 
     /**
-     * Method used to format the PdpUpdate message.
-     *
-     * @param message PdpUpdate message that was received from the PAP
-     * @return status message of the PDP
+     * Method used to update PDP status attributes from PdpStateChange.
      */
-    public PdpStatus formatPdpUpdateMessage(PdpUpdate message, PdpState state) {
-        PdpStatus status = new PdpStatus();
-        status.setName(NetworkUtil.getHostname());
-
-        if (XacmlPdpActivator.getCurrent().isAlive()) {
-            status.setHealthy(PdpHealthStatus.HEALTHY);
-        } else {
-            status.setHealthy(PdpHealthStatus.NOT_HEALTHY);
-        }
-
-        status.setPdpType("xacml");
-        status.setState(state);
-        status.setPdpGroup(message.getPdpGroup());
-        status.setPdpSubgroup(message.getPdpSubgroup());
-        status.setSupportedPolicyTypes(XacmlPdpApplicationManager.getToscaPolicyTypeIdents());
-        status.setPolicies(XacmlPdpApplicationManager.getToscaPolicyIdentifiers());
+    public void updateInternalStatus(PdpStateChange message) {
+        pdpGroup = message.getPdpGroup();
+        pdpSubGroup = message.getPdpSubgroup();
+        pdpState = message.getState();
+    }
 
-        return status;
+    /**
+     * Method used to update PDP status attributes from PdpUpdate.
+     */
+    public void updateInternalStatus(PdpUpdate message) {
+        pdpGroup = message.getPdpGroup();
+        pdpSubGroup = message.getPdpSubgroup();
     }
 }
index 4c9d0c2..54d9cf6 100644 (file)
@@ -44,7 +44,8 @@ public class XacmlPdpUpdatePublisher {
      * @param message Incoming message
      * @param client TopicSinkClient
      */
-    public static void handlePdpUpdate(PdpUpdate message, TopicSinkClient client) {
+    public static void handlePdpUpdate(PdpUpdate message, TopicSinkClient client,
+            XacmlPdpMessage updatePdpMessage) {
 
         if (!message.getPolicies().isEmpty() || message.getPolicies() != null) {
 
@@ -70,8 +71,8 @@ public class XacmlPdpUpdatePublisher {
             }
         }
 
-        XacmlPdpMessage updatePdpMessage = new XacmlPdpMessage();
-        PdpStatus statusMessage = updatePdpMessage.formatPdpUpdateMessage(message, XacmlPdpHearbeatPublisher.pdpState);
+        updatePdpMessage.updateInternalStatus(message);
+        PdpStatus statusMessage = updatePdpMessage.formatPdpStatusMessage();
         sendPdpUpdate(statusMessage, client);
     }
 
index 84572d9..3102edb 100644 (file)
@@ -40,40 +40,44 @@ public class XacmlPdpStateChangeListener extends ScoListener<PdpStateChange> {
     private TopicSinkClient client;
 
     private XacmlPdpHearbeatPublisher heartbeat;
+    private XacmlPdpMessage pdpInternalStatus;
 
     /**
      * Constructs the object.
      *
      * @param client used to send back response after receiving state change message
      */
-    public XacmlPdpStateChangeListener(TopicSinkClient client) {
+    public XacmlPdpStateChangeListener(TopicSinkClient client, XacmlPdpMessage pdpStatusMessage) {
         super(PdpStateChange.class);
         PdpStateChange message = new PdpStateChange();
         message.setState(PdpState.PASSIVE);
-        heartbeat = new XacmlPdpHearbeatPublisher(client, message);
+        this.pdpInternalStatus = pdpStatusMessage;
         this.client = client;
+        this.heartbeat = new XacmlPdpHearbeatPublisher(client, pdpStatusMessage);
     }
 
     @Override
     public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, PdpStateChange message) {
 
-        XacmlPdpMessage newMessage = new XacmlPdpMessage();
         try {
-            PdpStatus newStatus = newMessage.formatStatusMessage(message.getState());
 
-            // Send State Change Status to PAP
-            if (!client.send(newStatus)) {
-                LOGGER.error("failed to send to topic sink {}", client.getTopic());
-                throw new TopicSinkClientException("failed to send to topic sink " + client.getTopic());
-            }
+            if (message.appliesTo(pdpInternalStatus.getPdpName(), pdpInternalStatus.getPdpGroup(),
+                    pdpInternalStatus.getPdpSubGroup())) {
 
-            // Update the heartbeat internal state if publisher is running else create new publisher
-            if (XacmlPdpHearbeatPublisher.isAlive()) {
-                heartbeat.updateInternalState(message.getState());
-            } else {
-                heartbeat = new XacmlPdpHearbeatPublisher(client, message);
-            }
+                pdpInternalStatus.updateInternalStatus(message);
+                PdpStatus newStatus = pdpInternalStatus.formatPdpStatusMessage();
 
+                // Send State Change Status to PAP
+                if (!client.send(newStatus)) {
+                    LOGGER.error("failed to send to topic sink {}", client.getTopic());
+                    throw new TopicSinkClientException("failed to send to topic sink " + client.getTopic());
+                }
+
+                // Starte new heartbeat if publisher is NOT alive
+                if (!XacmlPdpHearbeatPublisher.isAlive()) {
+                    heartbeat = new XacmlPdpHearbeatPublisher(client, pdpInternalStatus);
+                }
+            }
         } catch (final Exception e) {
             LOGGER.error("failed to handle the PDP State Change message.", e);
         }
index 69f96a0..01d1916 100644 (file)
@@ -25,6 +25,7 @@ import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
 import org.onap.policy.common.endpoints.listeners.ScoListener;
 import org.onap.policy.common.utils.coder.StandardCoderObject;
 import org.onap.policy.models.pdp.concepts.PdpUpdate;
+import org.onap.policy.pdpx.main.comm.XacmlPdpMessage;
 import org.onap.policy.pdpx.main.comm.XacmlPdpUpdatePublisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,15 +35,17 @@ public class XacmlPdpUpdateListener extends ScoListener<PdpUpdate> {
     private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpStateChangeListener.class);
 
     private TopicSinkClient client;
+    private XacmlPdpMessage pdpInternalStatus;
 
     /**
      * Constructs the object.
      *
      * @param client used to send back response after receiving state change message
      */
-    public XacmlPdpUpdateListener(TopicSinkClient client) {
+    public XacmlPdpUpdateListener(TopicSinkClient client, XacmlPdpMessage pdpStatusMessage) {
         super(PdpUpdate.class);
         this.client = client;
+        this.pdpInternalStatus = pdpStatusMessage;
     }
 
     @Override
@@ -51,7 +54,12 @@ public class XacmlPdpUpdateListener extends ScoListener<PdpUpdate> {
         try {
 
             LOGGER.info("PDP update message has been received from the PAP - {}", message.toString());
-            XacmlPdpUpdatePublisher.handlePdpUpdate(message, client);
+
+            if (message.appliesTo(pdpInternalStatus.getPdpName(), pdpInternalStatus.getPdpGroup(),
+                    pdpInternalStatus.getPdpSubGroup())) {
+
+                XacmlPdpUpdatePublisher.handlePdpUpdate(message, client, pdpInternalStatus);
+            }
 
         } catch (final Exception e) {
             LOGGER.error("failed to handle the PDP Update message.", e);
index 70dd2c4..4db11d0 100644 (file)
@@ -104,13 +104,13 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
         TopicEndpoint.manager.addTopicSources(topicProperties);
 
         try {
-            TopicSinkClient sinkClient = new TopicSinkClient(TOPIC);
+            final TopicSinkClient sinkClient = new TopicSinkClient(TOPIC);
+            this.message = new XacmlPdpMessage();
             this.xacmlPdpParameterGroup = xacmlPdpParameterGroup;
             this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
-            this.pdpStateChangeListener = new XacmlPdpStateChangeListener(sinkClient);
-            this.pdpUpdateListener = new XacmlPdpUpdateListener(sinkClient);
+            this.pdpStateChangeListener = new XacmlPdpStateChangeListener(sinkClient, message);
+            this.pdpUpdateListener = new XacmlPdpUpdateListener(sinkClient, message);
             this.register = new XacmlPdpPapRegistration(sinkClient);
-            this.message = new XacmlPdpMessage();
         } catch (RuntimeException | TopicSinkClientException e) {
             throw new PolicyXacmlPdpRuntimeException(e.getMessage(), e);
         }
@@ -154,10 +154,10 @@ public class XacmlPdpActivator extends ServiceManagerContainer {
 
         addAction("Initial Registration with PAP",
             () -> {
-                register.pdpRegistration(message.formatStatusMessage(PdpState.PASSIVE));
+                register.pdpRegistration(message.formatInitialStatusMessage(PdpState.PASSIVE));
             },
             () -> {
-                register.pdpRegistration(message.formatStatusMessage(PdpState.TERMINATED));
+                register.pdpRegistration(message.formatInitialStatusMessage(PdpState.TERMINATED));
             });
         // @formatter:on