import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.onap.policy.models.pdp.concepts.PdpStateChange;
import org.onap.policy.models.pdp.concepts.PdpStatus;
-import org.onap.policy.models.pdp.enums.PdpState;
-import org.onap.policy.pdpx.main.comm.XacmlPdpHearbeatPublisher;
-import org.onap.policy.pdpx.main.comm.XacmlPdpMessage;
+import org.onap.policy.pdpx.main.XacmlState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOGGER = LoggerFactory.getLogger(XacmlPdpStateChangeListener.class);
- private TopicSinkClient client;
+ private final TopicSinkClient client;
- private XacmlPdpHearbeatPublisher heartbeat;
- private XacmlPdpMessage pdpInternalStatus;
+ private final XacmlState state;
/**
* Constructs the object.
*
* @param client used to send back response after receiving state change message
+ * @param state tracks the state of this PDP
*/
- public XacmlPdpStateChangeListener(TopicSinkClient client, XacmlPdpMessage pdpStatusMessage) {
+ public XacmlPdpStateChangeListener(TopicSinkClient client, XacmlState state) {
super(PdpStateChange.class);
- PdpStateChange message = new PdpStateChange();
- message.setState(PdpState.PASSIVE);
- this.pdpInternalStatus = pdpStatusMessage;
this.client = client;
- this.heartbeat = new XacmlPdpHearbeatPublisher(client, pdpStatusMessage);
+ this.state = state;
}
@Override
public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, PdpStateChange message) {
try {
+ if (!state.shouldHandle(message)) {
+ LOGGER.debug("PDP State Change message discarded - {}", message);
+ return;
+ }
- if (message.appliesTo(pdpInternalStatus.getPdpName(), pdpInternalStatus.getPdpGroup(),
- pdpInternalStatus.getPdpSubGroup())) {
-
- pdpInternalStatus.updateInternalStatus(message);
- PdpStatus newStatus = pdpInternalStatus.formatPdpStatusMessage();
-
- // Send State Change Status to PAP
- if (!client.send(newStatus)) {
- LOGGER.error("failed to send to topic sink {}", client.getTopic());
- throw new TopicSinkClientException("failed to send to topic sink " + client.getTopic());
- }
+ LOGGER.info("PDP State Change message has been received from the PAP - {}", message);
+ PdpStatus newStatus = state.updateInternalState(message);
- // Starte new heartbeat if publisher is NOT alive
- if (!XacmlPdpHearbeatPublisher.isAlive()) {
- heartbeat = new XacmlPdpHearbeatPublisher(client, pdpInternalStatus);
- }
+ // Send State Change Status to PAP
+ if (!client.send(newStatus)) {
+ LOGGER.error("failed to send to topic sink {}", client.getTopic());
+ throw new TopicSinkClientException("failed to send to topic sink " + client.getTopic());
}
+
} catch (final Exception e) {
LOGGER.error("failed to handle the PDP State Change message.", e);
}