import org.onap.policy.common.parameters.ParameterService;
import org.onap.policy.common.utils.services.Registry;
import org.onap.policy.common.utils.services.ServiceManagerContainer;
+import org.onap.policy.models.pap.concepts.PolicyNotification;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
import org.onap.policy.models.pdp.concepts.PdpStatus;
import org.onap.policy.models.pdp.enums.PdpMessageType;
import org.onap.policy.pap.main.PapConstants;
import org.onap.policy.pap.main.comm.PdpTracker;
import org.onap.policy.pap.main.comm.Publisher;
import org.onap.policy.pap.main.comm.TimerManager;
+import org.onap.policy.pap.main.notification.PolicyNotifier;
import org.onap.policy.pap.main.parameters.PapParameterGroup;
import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
import org.onap.policy.pap.main.parameters.PdpParameters;
final Object pdpUpdateLock = new Object();
final PdpParameters pdpParams = papParameterGroup.getPdpParameters();
- final AtomicReference<Publisher> pdpPub = new AtomicReference<>();
+ final AtomicReference<Publisher<PdpMessage>> pdpPub = new AtomicReference<>();
+ final AtomicReference<Publisher<PolicyNotification>> notifyPub = new AtomicReference<>();
final AtomicReference<TimerManager> pdpUpdTimers = new AtomicReference<>();
final AtomicReference<TimerManager> pdpStChgTimers = new AtomicReference<>();
final AtomicReference<TimerManager> heartBeatTimers = new AtomicReference<>();
final AtomicReference<PolicyModelsProviderFactoryWrapper> daoFactory = new AtomicReference<>();
final AtomicReference<PdpModifyRequestMap> requestMap = new AtomicReference<>();
final AtomicReference<RestServer> restServer = new AtomicReference<>();
+ final AtomicReference<PolicyNotifier> notifier = new AtomicReference<>();
// @formatter:off
addAction("PAP parameters",
addAction("PDP publisher",
() -> {
- pdpPub.set(new Publisher(PapConstants.TOPIC_POLICY_PDP_PAP));
+ pdpPub.set(new Publisher<>(PapConstants.TOPIC_POLICY_PDP_PAP));
startThread(pdpPub.get());
},
() -> pdpPub.get().stop());
+ addAction("Policy Notification publisher",
+ () -> {
+ notifyPub.set(new Publisher<>(PapConstants.TOPIC_POLICY_NOTIFICATION));
+ startThread(notifyPub.get());
+ notifier.set(new PolicyNotifier(notifyPub.get()));
+ },
+ () -> notifyPub.get().stop());
+
+ addAction("Policy Notifier",
+ () -> Registry.register(PapConstants.REG_POLICY_NOTIFIER, notifier.get()),
+ () -> Registry.unregister(PapConstants.REG_POLICY_NOTIFIER));
+
addAction("PDP heart beat timers",
() -> {
long maxWaitHeartBeatMs = MAX_MISSED_HEARTBEATS * pdpParams.getHeartBeatMs();
.setDaoFactory(daoFactory.get())
.setModifyLock(pdpUpdateLock)
.setParams(pdpParams)
- .setPublisher(pdpPub.get())
+ .setPolicyNotifier(notifier.get())
+ .setPdpPublisher(pdpPub.get())
.setResponseDispatcher(reqIdDispatcher)
.setStateChangeTimers(pdpStChgTimers.get())
.setUpdateTimers(pdpUpdTimers.get())));