* ============LICENSE_START=======================================================
* ONAP PAP
* ================================================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2021 Nordix Foundation.
+ * Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.policy.pap.main.comm;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import lombok.Setter;
import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.pap.concepts.PolicyNotification;
import org.onap.policy.models.pdp.concepts.Pdp;
import org.onap.policy.models.pdp.concepts.PdpGroup;
import org.onap.policy.models.pdp.concepts.PdpGroupFilter;
import org.onap.policy.models.pdp.concepts.PdpMessage;
import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
import org.onap.policy.models.pdp.concepts.PdpSubGroup;
import org.onap.policy.models.pdp.concepts.PdpUpdate;
import org.onap.policy.models.pdp.enums.PdpState;
import org.onap.policy.models.provider.PolicyModelsProvider;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper;
import org.onap.policy.pap.main.comm.msgdata.Request;
import org.onap.policy.pap.main.comm.msgdata.RequestListener;
import org.onap.policy.pap.main.comm.msgdata.StateChangeReq;
import org.onap.policy.pap.main.comm.msgdata.UpdateReq;
+import org.onap.policy.pap.main.notification.DeploymentStatus;
+import org.onap.policy.pap.main.notification.PolicyNotifier;
import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
import org.onap.policy.pap.main.parameters.RequestParams;
import org.slf4j.Logger;
*/
private final PolicyModelsProviderFactoryWrapper daoFactory;
+ /**
+ * Used to notify when policy updates completes.
+ */
+ private final PolicyNotifier policyNotifier;
+
+ /**
+ * Used to undeploy policies from the system, when they cannot be deployed to a PDP.
+ *
+ * <p/>
+ * Note: there's a "catch-22" here. The request map needs an undeployer, but the
+ * undeployer needs the request map. Consequently, the request map is created first,
+ * then the undeployer, and finally, this field is set.
+ */
+ @Setter
+ private PolicyUndeployer policyUndeployer;
+
/**
* Constructs the object.
this.params = params;
this.modifyLock = params.getModifyLock();
this.daoFactory = params.getDaoFactory();
+ this.policyNotifier = params.getPolicyNotifier();
+ }
+
+ /**
+ * Determines if the map contains any requests.
+ *
+ * @return {@code true} if the map is empty, {@code false} otherwise
+ */
+ public boolean isEmpty() {
+ return pdp2requests.isEmpty();
}
/**
if (update == null) {
addRequest(stateChange);
- } else {
+ } else if (stateChange == null) {
+ addRequest(update);
+
+ } else if (stateChange.getState() == PdpState.ACTIVE) {
+ // publish update before activating
synchronized (modifyLock) {
addRequest(update);
addRequest(stateChange);
}
+
+ } else {
+ // deactivate before publishing update
+ synchronized (modifyLock) {
+ addRequest(stateChange);
+ addRequest(update);
+ }
}
}
* Adds an UPDATE request to the map.
*
* @param update the UPDATE request or {@code null}
+ * @return the new request (this should only be used by junit tests)
*/
- public void addRequest(PdpUpdate update) {
+ public Request addRequest(PdpUpdate update) {
if (update == null) {
- return;
+ return null;
}
if (isBroadcast(update)) {
.setMaxRetryCount(params.getParams().getUpdateParameters().getMaxRetryCount())
.setTimers(params.getUpdateTimers())
.setModifyLock(params.getModifyLock())
- .setPublisher(params.getPublisher())
+ .setPdpPublisher(params.getPdpPublisher())
.setResponseDispatcher(params.getResponseDispatcher());
// @formatter:on
String name = update.getName() + " " + PdpUpdate.class.getSimpleName();
- UpdateReq request = new UpdateReq(reqparams, name, update);
+ var request = new UpdateReq(reqparams, name, update);
addSingleton(request);
+ return request;
}
/**
* Adds a STATE-CHANGE request to the map.
*
* @param stateChange the STATE-CHANGE request or {@code null}
+ * @return the new request (this should only be used by junit tests)
*/
- public void addRequest(PdpStateChange stateChange) {
+ public Request addRequest(PdpStateChange stateChange) {
if (stateChange == null) {
- return;
+ return null;
}
if (isBroadcast(stateChange)) {
.setMaxRetryCount(params.getParams().getStateChangeParameters().getMaxRetryCount())
.setTimers(params.getStateChangeTimers())
.setModifyLock(params.getModifyLock())
- .setPublisher(params.getPublisher())
+ .setPdpPublisher(params.getPdpPublisher())
.setResponseDispatcher(params.getResponseDispatcher());
// @formatter:on
String name = stateChange.getName() + " " + PdpStateChange.class.getSimpleName();
- StateChangeReq request = new StateChangeReq(reqparams, name, stateChange);
+ var request = new StateChangeReq(reqparams, name, stateChange);
addSingleton(request);
+ return request;
}
/**
}
/**
- * Starts the next request associated with a PDP.
- *
- * @param requests current set of requests
- * @param request the request that just completed
- */
- private void startNextRequest(PdpRequests requests, Request request) {
- if (!requests.startNextRequest(request)) {
- pdp2requests.remove(requests.getPdpName(), requests);
- }
- }
-
- /**
- * Disables a PDP by removing it from its subgroup and then sending it a PASSIVE
- * request.
- *
- * @param requests the requests associated with the PDP to be disabled
+ * Removes expired PDPs from all active groups.
*/
- private void disablePdp(PdpRequests requests) {
-
- // remove the requests from the map
- if (!pdp2requests.remove(requests.getPdpName(), requests)) {
- // don't have the info we need to disable it
- logger.warn("no requests with which to disable {}", requests.getPdpName());
- return;
- }
-
- logger.warn("disabling {}", requests.getPdpName());
+ public void removeExpiredPdps() {
- requests.stopPublishing();
+ synchronized (modifyLock) {
+ logger.info("check for PDP records older than {}ms", params.getMaxPdpAgeMs());
- // don't do anything if we don't have a group
- String name = requests.getLastGroupName();
- if (name == null) {
- logger.warn("no group with which to disable {}", requests.getPdpName());
- return;
- }
+ try (PolicyModelsProvider dao = daoFactory.create()) {
- // remove the PDP from the group
- removeFromGroup(requests.getPdpName(), name);
+ PdpGroupFilter filter = PdpGroupFilter.builder().groupState(PdpState.ACTIVE).build();
+ List<PdpGroup> groups = dao.getFilteredPdpGroups(filter);
+ List<PdpGroup> updates = new ArrayList<>(1);
- // send the state change
- PdpStateChange change = new PdpStateChange();
- change.setName(requests.getPdpName());
- change.setState(PdpState.PASSIVE);
- addRequest(change);
- }
+ var status = new DeploymentStatus(dao);
- /**
- * Removes a PDP from its group.
- *
- * @param pdpName name of the PDP to be removed
- * @param groupName name of the group from which it should be removed
- */
- private void removeFromGroup(String pdpName, String groupName) {
+ Instant minAge = Instant.now().minusMillis(params.getMaxPdpAgeMs());
- try (PolicyModelsProvider dao = daoFactory.create()) {
+ for (PdpGroup group : groups) {
+ Set<String> pdps = removeFromGroup(minAge, group);
+ if (!pdps.isEmpty()) {
+ updates.add(group);
+ status.loadByGroup(group.getName());
+ pdps.forEach(status::deleteDeployment);
+ }
+ }
- PdpGroupFilter filter = PdpGroupFilter.builder().name(groupName).groupState(PdpState.ACTIVE)
- .version(PdpGroupFilter.LATEST_VERSION).build();
+ if (!updates.isEmpty()) {
+ dao.updatePdpGroups(updates);
- List<PdpGroup> groups = dao.getFilteredPdpGroups(filter);
- if (groups.isEmpty()) {
- return;
- }
+ var notification = new PolicyNotification();
+ status.flush(notification);
- PdpGroup group = groups.get(0);
-
- for (PdpSubGroup subgrp : group.getPdpSubgroups()) {
- if (removeFromSubgroup(pdpName, group, subgrp)) {
- dao.updatePdpGroups(Collections.singletonList(group));
- return;
+ policyNotifier.publish(notification);
}
+
+ } catch (PfModelException e) {
+ logger.warn("failed to remove expired PDPs", e);
}
+ }
+ }
- } catch (PfModelException e) {
- logger.info("unable to remove PDP {} from subgroup", pdpName, e);
+ /**
+ * Removes expired PDPs from a group.
+ *
+ * @param minAge minimum age for active PDPs
+ * @param group group from which expired PDPs should be removed
+ * @return the expired PDPs
+ */
+ private Set<String> removeFromGroup(Instant minAge, PdpGroup group) {
+ Set<String> pdps = new HashSet<>();
+ for (PdpSubGroup subgrp : group.getPdpSubgroups()) {
+ removeFromSubgroup(minAge, group, subgrp, pdps);
}
+
+ return pdps;
}
/**
- * Removes a PDP from a subgroup.
+ * Removes expired PDPs from a subgroup.
*
- * @param pdpName name of the PDP to be removed
+ * @param minAge minimum age for active PDPs
* @param group group from which to attempt to remove the PDP
* @param subgrp subgroup from which to attempt to remove the PDP
- * @return {@code true} if the PDP was removed, {@code false} if the PDP was not in
- * the group
- * @throws PfModelException if a DB error occurs
+ * @param pdps where to place the expired PDPs
*/
- private boolean removeFromSubgroup(String pdpName, PdpGroup group, PdpSubGroup subgrp) throws PfModelException {
+ private void removeFromSubgroup(Instant minAge, PdpGroup group, PdpSubGroup subgrp, Set<String> pdps) {
Iterator<Pdp> iter = subgrp.getPdpInstances().iterator();
while (iter.hasNext()) {
Pdp instance = iter.next();
- if (pdpName.equals(instance.getInstanceId())) {
- logger.info("removed {} from group={} version={} subgroup={}", pdpName, group.getName(),
- group.getVersion(), subgrp.getPdpType());
+ if (instance.getLastUpdate().isBefore(minAge)) {
+ String pdpName = instance.getInstanceId();
+ logger.info("removed {} from group={} subgroup={}", pdpName, group.getName(), subgrp.getPdpType());
iter.remove();
subgrp.setCurrentInstanceCount(subgrp.getPdpInstances().size());
- return true;
+ pdps.add(pdpName);
}
}
-
- return false;
}
/**
* @return a new set of requests
*/
protected PdpRequests makePdpRequests(String pdpName) {
- return new PdpRequests(pdpName);
+ return new PdpRequests(pdpName, policyNotifier);
+ }
+
+ /**
+ * Makes a handler for PDP responses.
+ * @return a response handler
+ */
+ protected PdpStatusMessageHandler makePdpResponseHandler() {
+ return new PdpStatusMessageHandler(params.getParams(), params.isSavePdpStatistics());
}
/**
private class SingletonListener implements RequestListener {
private final PdpRequests requests;
private final Request request;
+ private final String pdpName;
public SingletonListener(PdpRequests requests, Request request) {
this.requests = requests;
this.request = request;
+ this.pdpName = requests.getPdpName();
}
@Override
- public void failure(String pdpName, String reason) {
- if (requests.getPdpName().equals(pdpName)) {
- disablePdp(requests);
+ public void failure(String responsePdpName, String reason) {
+ Collection<ToscaConceptIdentifier> undeployPolicies = requestCompleted(responsePdpName);
+ if (undeployPolicies.isEmpty()) {
+ // nothing to undeploy
+ return;
+ }
+
+ /*
+ * Undeploy the extra policies. Note: this will likely cause a new message to
+ * be assigned to the request, thus we must re-start it after making the
+ * change.
+ */
+ PdpMessage oldmsg = request.getMessage();
+
+ try {
+ logger.warn("undeploy policies from {}:{} that failed to deploy: {}", oldmsg.getPdpGroup(),
+ oldmsg.getPdpSubgroup(), undeployPolicies);
+ policyUndeployer.undeploy(oldmsg.getPdpGroup(), oldmsg.getPdpSubgroup(), undeployPolicies);
+ } catch (PfModelException | RuntimeException e) {
+ logger.error("cannot undeploy policies {}", undeployPolicies, e);
+ }
+
+ if (request.getMessage() == oldmsg) {
+ // message is unchanged - start the next request
+ startNextRequest(request);
+ } else {
+ // message changed - restart the request
+ request.startPublishing();
}
}
@Override
- public void success(String pdpName) {
- if (requests.getPdpName().equals(pdpName)) {
- if (pdp2requests.get(requests.getPdpName()) == requests) {
- startNextRequest(requests, request);
-
- } else {
- logger.info("discard old requests for {}", pdpName);
- requests.stopPublishing();
- }
+ public void success(String responsePdpName, PdpStatus response) {
+ requestCompleted(responsePdpName);
+
+ if (!(request instanceof UpdateReq)) {
+ // other response types may not include the list of policies
+ return;
}
+
+ /*
+ * Update PDP time stamps. Also send pdp-update and pdp-state-change, as
+ * necessary, if the response does not reflect what's in the DB.
+ */
+ var handler = makePdpResponseHandler();
+ handler.handlePdpStatus(response);
+ }
+
+ /**
+ * Handles a request completion, starting the next request, if there is one.
+ *
+ * @param responsePdpName name of the PDP provided in the response
+ * @return a list of policies to be undeployed
+ */
+ private Collection<ToscaConceptIdentifier> requestCompleted(String responsePdpName) {
+ if (!pdpName.equals(responsePdpName)) {
+ return Collections.emptyList();
+ }
+
+ if (pdp2requests.get(pdpName) != requests) {
+ logger.info("discard old requests for {}", responsePdpName);
+ requests.stopPublishing();
+ return Collections.emptyList();
+ }
+
+ if (!requests.isFirstInQueue(request)) {
+ logger.error("request is not first in the queue {}", request.getMessage());
+ return Collections.emptyList();
+ }
+
+ Collection<ToscaConceptIdentifier> undeployPolicies = request.getUndeployPolicies();
+ if (undeployPolicies.isEmpty()) {
+ // nothing to undeploy - just start the next request
+ startNextRequest(request);
+ }
+
+ return undeployPolicies;
}
@Override
- public void retryCountExhausted() {
- disablePdp(requests);
+ public void retryCountExhausted(Request request) {
+ if (pdp2requests.get(pdpName) == requests) {
+ requests.stopPublishing();
+ startNextRequest(request);
+ }
+ }
+
+ /**
+ * Starts the next request associated with a PDP.
+ *
+ * @param request the request that just completed
+ */
+ private void startNextRequest(Request request) {
+ if (!requests.startNextRequest(request)) {
+ pdp2requests.remove(pdpName, requests);
+ }
}
}
}