import org.onap.policy.models.pdp.concepts.PdpStateChange;
import org.onap.policy.models.pdp.concepts.PdpStatus;
import org.onap.policy.models.pdp.enums.PdpState;
-import org.onap.policy.pdpx.main.comm.XacmlPdpHeartbeatPublisher;
+import org.onap.policy.pdpx.main.comm.XacmlPdpHearbeatPublisher;
import org.onap.policy.pdpx.main.comm.XacmlPdpMessage;
-import org.onap.policy.pdpx.main.startstop.XacmlPdpActivator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private TopicSinkClient client;
- private XacmlPdpHeartbeatPublisher heartbeat;
+ private XacmlPdpHearbeatPublisher heartbeat;
+ private XacmlPdpMessage pdpInternalStatus;
/**
* Constructs the object.
*
* @param client used to send back response after receiving state change message
*/
- public XacmlPdpStateChangeListener(TopicSinkClient client) {
+ public XacmlPdpStateChangeListener(TopicSinkClient client, XacmlPdpMessage pdpStatusMessage) {
super(PdpStateChange.class);
- heartbeat = new XacmlPdpHeartbeatPublisher(client, PdpState.PASSIVE);
+ PdpStateChange message = new PdpStateChange();
+ message.setState(PdpState.PASSIVE);
+ this.pdpInternalStatus = pdpStatusMessage;
this.client = client;
+ this.heartbeat = new XacmlPdpHearbeatPublisher(client, pdpStatusMessage);
}
@Override
public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, PdpStateChange message) {
- XacmlPdpMessage newMessage = new XacmlPdpMessage();
try {
- PdpStatus newStatus = newMessage.formatStatusMessage(message.getState());
- // Send State Change Status to PAP
- if (!client.send(newStatus)) {
- LOGGER.error("failed to send to topic sink " + client.getTopic());
- throw new TopicSinkClientException("failed to send to topic sink " + client.getTopic());
- }
+ if (message.appliesTo(pdpInternalStatus.getPdpName(), pdpInternalStatus.getPdpGroup(),
+ pdpInternalStatus.getPdpSubGroup())) {
- // Update the heartbeat internal state if publisher is running else create new publisher
- if (XacmlPdpHeartbeatPublisher.isAlive()) {
- heartbeat.updateInternalState(message.getState());
- } else {
- heartbeat = new XacmlPdpHeartbeatPublisher(client, message.getState());
- }
+ pdpInternalStatus.updateInternalStatus(message);
+ PdpStatus newStatus = pdpInternalStatus.formatPdpStatusMessage();
+ // Send State Change Status to PAP
+ if (!client.send(newStatus)) {
+ LOGGER.error("failed to send to topic sink {}", client.getTopic());
+ throw new TopicSinkClientException("failed to send to topic sink " + client.getTopic());
+ }
+
+ // Starte new heartbeat if publisher is NOT alive
+ if (!XacmlPdpHearbeatPublisher.isAlive()) {
+ heartbeat = new XacmlPdpHearbeatPublisher(client, pdpInternalStatus);
+ }
+ }
} catch (final Exception e) {
LOGGER.error("failed to handle the PDP State Change message.", e);
}