private TopicSinkClient client;
private XacmlPdpHearbeatPublisher heartbeat;
+ private XacmlPdpMessage pdpInternalStatus;
/**
* Constructs the object.
*
* @param client used to send back response after receiving state change message
*/
- public XacmlPdpStateChangeListener(TopicSinkClient client) {
+ public XacmlPdpStateChangeListener(TopicSinkClient client, XacmlPdpMessage pdpStatusMessage) {
super(PdpStateChange.class);
PdpStateChange message = new PdpStateChange();
message.setState(PdpState.PASSIVE);
- heartbeat = new XacmlPdpHearbeatPublisher(client, message);
+ this.pdpInternalStatus = pdpStatusMessage;
this.client = client;
+ this.heartbeat = new XacmlPdpHearbeatPublisher(client, pdpStatusMessage);
}
@Override
public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, PdpStateChange message) {
- XacmlPdpMessage newMessage = new XacmlPdpMessage();
try {
- PdpStatus newStatus = newMessage.formatStatusMessage(message.getState());
- // Send State Change Status to PAP
- if (!client.send(newStatus)) {
- LOGGER.error("failed to send to topic sink {}", client.getTopic());
- throw new TopicSinkClientException("failed to send to topic sink " + client.getTopic());
- }
+ if (message.appliesTo(pdpInternalStatus.getPdpName(), pdpInternalStatus.getPdpGroup(),
+ pdpInternalStatus.getPdpSubGroup())) {
- // Update the heartbeat internal state if publisher is running else create new publisher
- if (XacmlPdpHearbeatPublisher.isAlive()) {
- heartbeat.updateInternalState(message.getState());
- } else {
- heartbeat = new XacmlPdpHearbeatPublisher(client, message);
- }
+ pdpInternalStatus.updateInternalStatus(message);
+ PdpStatus newStatus = pdpInternalStatus.formatPdpStatusMessage();
+ // Send State Change Status to PAP
+ if (!client.send(newStatus)) {
+ LOGGER.error("failed to send to topic sink {}", client.getTopic());
+ throw new TopicSinkClientException("failed to send to topic sink " + client.getTopic());
+ }
+
+ // Starte new heartbeat if publisher is NOT alive
+ if (!XacmlPdpHearbeatPublisher.isAlive()) {
+ heartbeat = new XacmlPdpHearbeatPublisher(client, pdpInternalStatus);
+ }
+ }
} catch (final Exception e) {
LOGGER.error("failed to handle the PDP State Change message.", e);
}