1. Added appliesTo() check to state change and PDP update listeners.
2. Made changes to record the extracted pdpUpdate values.
3. Removed pdpState instance variable from heartbeat publisher
4. Split XacmlPdpMessage.updateInternalStatus() into two methods to handle
PdpStateChange and PdpUpdate messages separately
5. Cleaned up unused imports
Change-Id: I76c68d925e46d475c086bd5b86851ea44d821b28
Issue-ID: POLICY-1673
Signed-off-by: Michael Mokry <michael.mokry@att.com>
import java.util.Timer;
import java.util.TimerTask;
import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
-import org.onap.policy.models.pdp.concepts.PdpStateChange;
-import org.onap.policy.models.pdp.enums.PdpState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private Timer timer;
private XacmlPdpMessage heartbeatMessage;
- private Object message;
private static TopicSinkClient topicSinkClient;
private static volatile boolean alive = false;
- public static PdpState pdpState;
-
/**
* Constructor for instantiating XacmlPdpPublisher.
* @param message of the PDP
* @param topicSinkClient used to send heartbeat message
*/
- public XacmlPdpHearbeatPublisher(TopicSinkClient topicSinkClient, PdpStateChange message) {
- this.message = message;
- this.pdpState = message.getState();
+ public XacmlPdpHearbeatPublisher(TopicSinkClient topicSinkClient, XacmlPdpMessage message ) {
this.topicSinkClient = topicSinkClient;
- this.heartbeatMessage = new XacmlPdpMessage();
+ this.heartbeatMessage = message;
timer = new Timer(false);
timer.scheduleAtFixedRate(this, 0, 60000); // time interval temp hard coded now but will be parameterized
setAlive(true);
@Override
public void run() {
- topicSinkClient.send(heartbeatMessage.formatHeartbeatMessage((PdpStateChange) message));
+ topicSinkClient.send(heartbeatMessage.formatPdpStatusMessage());
LOGGER.info("Sending Xacml PDP heartbeat to the PAP");
}
setAlive(false);
}
- public void updateInternalState(PdpState state) {
- ((PdpStateChange) this.message).setState(state);
- this.pdpState = state;
- }
-
public static boolean isAlive() {
return alive;
}
package org.onap.policy.pdpx.main.comm;
+import lombok.Getter;
import org.onap.policy.common.utils.network.NetworkUtil;
import org.onap.policy.models.pdp.concepts.PdpStateChange;
import org.onap.policy.models.pdp.concepts.PdpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@Getter
public class XacmlPdpMessage {
// The logger for this class
private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpMessage.class);
+ private String pdpGroup;
+ private String pdpSubGroup;
+ private PdpState pdpState;
+ private String pdpName = NetworkUtil.getHostname();
/**
- * Method used to format the status message.
+ * Method used to format the initial registration status message.
*
* @param state of the PDP
* @return status message of the PDP
*/
- public PdpStatus formatStatusMessage(PdpState state) {
+ public PdpStatus formatInitialStatusMessage(PdpState state) {
PdpStatus status = new PdpStatus();
- status.setName(NetworkUtil.getHostname());
+ status.setName(pdpName);
if (XacmlPdpActivator.getCurrent().isAlive()) {
status.setHealthy(PdpHealthStatus.HEALTHY);
}
/**
- * Method used to format the heartbeat status message.
+ * Method used to format the PdpStatus message for heartbeat and PDP Updates.
*
- * @param message PdpStateChange message received from the PAP
* @return status message of the PDP
*/
- public PdpStatus formatHeartbeatMessage(PdpStateChange message) {
+ public PdpStatus formatPdpStatusMessage() {
PdpStatus status = new PdpStatus();
- status.setName(NetworkUtil.getHostname());
+ status.setName(pdpName);
if (XacmlPdpActivator.getCurrent().isAlive()) {
status.setHealthy(PdpHealthStatus.HEALTHY);
}
status.setPdpType("xacml");
- status.setState(message.getState());
- status.setPdpGroup(message.getPdpGroup());
- status.setPdpSubgroup(message.getPdpSubgroup());
+ status.setState(pdpState);
+ status.setPdpGroup(pdpGroup);
+ status.setPdpSubgroup(pdpSubGroup);
status.setSupportedPolicyTypes(XacmlPdpApplicationManager.getToscaPolicyTypeIdents());
+ status.setPolicies(XacmlPdpApplicationManager.getToscaPolicyIdentifiers());
return status;
}
/**
- * Method used to format the PdpUpdate message.
- *
- * @param message PdpUpdate message that was received from the PAP
- * @return status message of the PDP
+ * Method used to update PDP status attributes from PdpStateChange.
*/
- public PdpStatus formatPdpUpdateMessage(PdpUpdate message, PdpState state) {
- PdpStatus status = new PdpStatus();
- status.setName(NetworkUtil.getHostname());
-
- if (XacmlPdpActivator.getCurrent().isAlive()) {
- status.setHealthy(PdpHealthStatus.HEALTHY);
- } else {
- status.setHealthy(PdpHealthStatus.NOT_HEALTHY);
- }
-
- status.setPdpType("xacml");
- status.setState(state);
- status.setPdpGroup(message.getPdpGroup());
- status.setPdpSubgroup(message.getPdpSubgroup());
- status.setSupportedPolicyTypes(XacmlPdpApplicationManager.getToscaPolicyTypeIdents());
- status.setPolicies(XacmlPdpApplicationManager.getToscaPolicyIdentifiers());
+ public void updateInternalStatus(PdpStateChange message) {
+ pdpGroup = message.getPdpGroup();
+ pdpSubGroup = message.getPdpSubgroup();
+ pdpState = message.getState();
+ }
- return status;
+ /**
+ * Method used to update PDP status attributes from PdpUpdate.
+ */
+ public void updateInternalStatus(PdpUpdate message) {
+ pdpGroup = message.getPdpGroup();
+ pdpSubGroup = message.getPdpSubgroup();
}
}
* @param message Incoming message
* @param client TopicSinkClient
*/
- public static void handlePdpUpdate(PdpUpdate message, TopicSinkClient client) {
+ public static void handlePdpUpdate(PdpUpdate message, TopicSinkClient client,
+ XacmlPdpMessage updatePdpMessage) {
if (!message.getPolicies().isEmpty() || message.getPolicies() != null) {
}
}
- XacmlPdpMessage updatePdpMessage = new XacmlPdpMessage();
- PdpStatus statusMessage = updatePdpMessage.formatPdpUpdateMessage(message, XacmlPdpHearbeatPublisher.pdpState);
+ updatePdpMessage.updateInternalStatus(message);
+ PdpStatus statusMessage = updatePdpMessage.formatPdpStatusMessage();
sendPdpUpdate(statusMessage, client);
}
private TopicSinkClient client;
private XacmlPdpHearbeatPublisher heartbeat;
+ private XacmlPdpMessage pdpInternalStatus;
/**
* Constructs the object.
*
* @param client used to send back response after receiving state change message
*/
- public XacmlPdpStateChangeListener(TopicSinkClient client) {
+ public XacmlPdpStateChangeListener(TopicSinkClient client, XacmlPdpMessage pdpStatusMessage) {
super(PdpStateChange.class);
PdpStateChange message = new PdpStateChange();
message.setState(PdpState.PASSIVE);
- heartbeat = new XacmlPdpHearbeatPublisher(client, message);
+ this.pdpInternalStatus = pdpStatusMessage;
this.client = client;
+ this.heartbeat = new XacmlPdpHearbeatPublisher(client, pdpStatusMessage);
}
@Override
public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, PdpStateChange message) {
- XacmlPdpMessage newMessage = new XacmlPdpMessage();
try {
- PdpStatus newStatus = newMessage.formatStatusMessage(message.getState());
- // Send State Change Status to PAP
- if (!client.send(newStatus)) {
- LOGGER.error("failed to send to topic sink {}", client.getTopic());
- throw new TopicSinkClientException("failed to send to topic sink " + client.getTopic());
- }
+ if (message.appliesTo(pdpInternalStatus.getPdpName(), pdpInternalStatus.getPdpGroup(),
+ pdpInternalStatus.getPdpSubGroup())) {
- // Update the heartbeat internal state if publisher is running else create new publisher
- if (XacmlPdpHearbeatPublisher.isAlive()) {
- heartbeat.updateInternalState(message.getState());
- } else {
- heartbeat = new XacmlPdpHearbeatPublisher(client, message);
- }
+ pdpInternalStatus.updateInternalStatus(message);
+ PdpStatus newStatus = pdpInternalStatus.formatPdpStatusMessage();
+ // Send State Change Status to PAP
+ if (!client.send(newStatus)) {
+ LOGGER.error("failed to send to topic sink {}", client.getTopic());
+ throw new TopicSinkClientException("failed to send to topic sink " + client.getTopic());
+ }
+
+ // Starte new heartbeat if publisher is NOT alive
+ if (!XacmlPdpHearbeatPublisher.isAlive()) {
+ heartbeat = new XacmlPdpHearbeatPublisher(client, pdpInternalStatus);
+ }
+ }
} catch (final Exception e) {
LOGGER.error("failed to handle the PDP State Change message.", e);
}
import org.onap.policy.common.endpoints.listeners.ScoListener;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.onap.policy.models.pdp.concepts.PdpUpdate;
+import org.onap.policy.pdpx.main.comm.XacmlPdpMessage;
import org.onap.policy.pdpx.main.comm.XacmlPdpUpdatePublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpStateChangeListener.class);
private TopicSinkClient client;
+ private XacmlPdpMessage pdpInternalStatus;
/**
* Constructs the object.
*
* @param client used to send back response after receiving state change message
*/
- public XacmlPdpUpdateListener(TopicSinkClient client) {
+ public XacmlPdpUpdateListener(TopicSinkClient client, XacmlPdpMessage pdpStatusMessage) {
super(PdpUpdate.class);
this.client = client;
+ this.pdpInternalStatus = pdpStatusMessage;
}
@Override
try {
LOGGER.info("PDP update message has been received from the PAP - {}", message.toString());
- XacmlPdpUpdatePublisher.handlePdpUpdate(message, client);
+
+ if (message.appliesTo(pdpInternalStatus.getPdpName(), pdpInternalStatus.getPdpGroup(),
+ pdpInternalStatus.getPdpSubGroup())) {
+
+ XacmlPdpUpdatePublisher.handlePdpUpdate(message, client, pdpInternalStatus);
+ }
} catch (final Exception e) {
LOGGER.error("failed to handle the PDP Update message.", e);
TopicEndpoint.manager.addTopicSources(topicProperties);
try {
- TopicSinkClient sinkClient = new TopicSinkClient(TOPIC);
+ final TopicSinkClient sinkClient = new TopicSinkClient(TOPIC);
+ this.message = new XacmlPdpMessage();
this.xacmlPdpParameterGroup = xacmlPdpParameterGroup;
this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
- this.pdpStateChangeListener = new XacmlPdpStateChangeListener(sinkClient);
- this.pdpUpdateListener = new XacmlPdpUpdateListener(sinkClient);
+ this.pdpStateChangeListener = new XacmlPdpStateChangeListener(sinkClient, message);
+ this.pdpUpdateListener = new XacmlPdpUpdateListener(sinkClient, message);
this.register = new XacmlPdpPapRegistration(sinkClient);
- this.message = new XacmlPdpMessage();
} catch (RuntimeException | TopicSinkClientException e) {
throw new PolicyXacmlPdpRuntimeException(e.getMessage(), e);
}
addAction("Initial Registration with PAP",
() -> {
- register.pdpRegistration(message.formatStatusMessage(PdpState.PASSIVE));
+ register.pdpRegistration(message.formatInitialStatusMessage(PdpState.PASSIVE));
},
() -> {
- register.pdpRegistration(message.formatStatusMessage(PdpState.TERMINATED));
+ register.pdpRegistration(message.formatInitialStatusMessage(PdpState.TERMINATED));
});
// @formatter:on