* ============LICENSE_START=======================================================
* ONAP PAP
* ================================================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2021 Nordix Foundation.
+ * Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.policy.pap.main.comm;
+import java.time.Instant;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import lombok.Setter;
+import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.pap.concepts.PolicyNotification;
+import org.onap.policy.models.pdp.concepts.Pdp;
+import org.onap.policy.models.pdp.concepts.PdpGroup;
+import org.onap.policy.models.pdp.concepts.PdpGroupFilter;
+import org.onap.policy.models.pdp.concepts.PdpMessage;
import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.models.pdp.concepts.PdpSubGroup;
import org.onap.policy.models.pdp.concepts.PdpUpdate;
-import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
-import org.onap.policy.pap.main.comm.msgdata.StateChangeData;
-import org.onap.policy.pap.main.comm.msgdata.UpdateData;
+import org.onap.policy.models.pdp.enums.PdpState;
+import org.onap.policy.models.provider.PolicyModelsProvider;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper;
+import org.onap.policy.pap.main.comm.msgdata.Request;
+import org.onap.policy.pap.main.comm.msgdata.RequestListener;
+import org.onap.policy.pap.main.comm.msgdata.StateChangeReq;
+import org.onap.policy.pap.main.comm.msgdata.UpdateReq;
+import org.onap.policy.pap.main.notification.DeploymentStatus;
+import org.onap.policy.pap.main.notification.PolicyNotifier;
import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
+import org.onap.policy.pap.main.parameters.RequestParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Maps a PDP name to requests that modify PDPs.
*/
public class PdpModifyRequestMap {
+ private static final Logger logger = LoggerFactory.getLogger(PdpModifyRequestMap.class);
+
+ private static final String UNEXPECTED_BROADCAST = "unexpected broadcast message: ";
/**
- * Maps a PDP name to its request data. An entry is removed once all of the requests
- * within the data have been completed.
+ * Maps a PDP name to its outstanding requests.
*/
- private final Map<String, ModifyReqData> name2data = new HashMap<>();
+ private final Map<String, PdpRequests> pdp2requests = new HashMap<>();
/**
* PDP modification lock.
private final PdpModifyRequestMapParams params;
/**
- * Constructs the data.
+ * Factory for PAP DAO.
+ */
+ private final PolicyModelsProviderFactoryWrapper daoFactory;
+
+ /**
+ * Used to notify when policy updates completes.
+ */
+ private final PolicyNotifier policyNotifier;
+
+ /**
+ * Used to undeploy policies from the system, when they cannot be deployed to a PDP.
+ *
+ * <p/>
+ * Note: there's a "catch-22" here. The request map needs an undeployer, but the
+ * undeployer needs the request map. Consequently, the request map is created first,
+ * then the undeployer, and finally, this field is set.
+ */
+ @Setter
+ private PolicyUndeployer policyUndeployer;
+
+
+ /**
+ * Constructs the object.
*
* @param params configuration parameters
*
this.params = params;
this.modifyLock = params.getModifyLock();
+ this.daoFactory = params.getDaoFactory();
+ this.policyNotifier = params.getPolicyNotifier();
}
/**
- * Adds an UPDATE request to the map.
+ * Determines if the map contains any requests.
*
- * @param update the UPDATE request or {@code null}
+ * @return {@code true} if the map is empty, {@code false} otherwise
*/
- public void addRequest(PdpUpdate update) {
- addRequest(update, null);
+ public boolean isEmpty() {
+ return pdp2requests.isEmpty();
}
/**
- * Adds STATE-CHANGE request to the map.
+ * Stops publishing requests to the given PDP.
*
- * @param stateChange the STATE-CHANGE request or {@code null}
+ * @param pdpName PDP name
*/
- public void addRequest(PdpStateChange stateChange) {
- addRequest(null, stateChange);
+ public void stopPublishing(String pdpName) {
+ synchronized (modifyLock) {
+ PdpRequests requests = pdp2requests.remove(pdpName);
+ if (requests != null) {
+ requests.stopPublishing();
+ }
+ }
}
/**
* @param stateChange the STATE-CHANGE request or {@code null}
*/
public void addRequest(PdpUpdate update, PdpStateChange stateChange) {
- if (update == null && stateChange == null) {
- return;
- }
+ if (update == null) {
+ addRequest(stateChange);
- synchronized (modifyLock) {
- String pdpName = getPdpName(update, stateChange);
+ } else if (stateChange == null) {
+ addRequest(update);
- ModifyReqData data = name2data.get(pdpName);
- if (data != null) {
- // update the existing request
- data.add(update);
- data.add(stateChange);
+ } else if (stateChange.getState() == PdpState.ACTIVE) {
+ // publish update before activating
+ synchronized (modifyLock) {
+ addRequest(update);
+ addRequest(stateChange);
+ }
- } else {
- data = makeRequestData(update, stateChange);
- name2data.put(pdpName, data);
- data.startPublishing();
+ } else {
+ // deactivate before publishing update
+ synchronized (modifyLock) {
+ addRequest(stateChange);
+ addRequest(update);
}
}
}
/**
- * Gets the PDP name from two requests.
+ * Adds an UPDATE request to the map.
*
- * @param update the update request, or {@code null}
- * @param stateChange the state-change request, or {@code null}
- * @return the PDP name, or {@code null} if both requests are {@code null}
+ * @param update the UPDATE request or {@code null}
+ * @return the new request (this should only be used by junit tests)
*/
- private static String getPdpName(PdpUpdate update, PdpStateChange stateChange) {
- String pdpName;
+ public Request addRequest(PdpUpdate update) {
+ if (update == null) {
+ return null;
+ }
- if (update != null) {
- if ((pdpName = update.getName()) == null) {
- throw new IllegalArgumentException("missing name in " + update);
- }
+ if (isBroadcast(update)) {
+ throw new IllegalArgumentException(UNEXPECTED_BROADCAST + update);
+ }
- if (stateChange != null && !pdpName.equals(stateChange.getName())) {
- throw new IllegalArgumentException(
- "name " + stateChange.getName() + " does not match " + pdpName + " " + stateChange);
- }
+ // @formatter:off
+ RequestParams reqparams = new RequestParams()
+ .setMaxRetryCount(params.getParams().getUpdateParameters().getMaxRetryCount())
+ .setTimers(params.getUpdateTimers())
+ .setModifyLock(params.getModifyLock())
+ .setPdpPublisher(params.getPdpPublisher())
+ .setResponseDispatcher(params.getResponseDispatcher());
+ // @formatter:on
- } else {
- if ((pdpName = stateChange.getName()) == null) {
- throw new IllegalArgumentException("missing name in " + stateChange);
- }
- }
+ String name = update.getName() + " " + PdpUpdate.class.getSimpleName();
+ var request = new UpdateReq(reqparams, name, update);
- return pdpName;
+ addSingleton(request);
+ return request;
}
/**
- * Determines if two requests are the "same", which is does not necessarily mean
- * "equals".
+ * Adds a STATE-CHANGE request to the map.
*
- * @param first first request to check
- * @param second second request to check
- * @return {@code true} if the requests are the "same", {@code false} otherwise
+ * @param stateChange the STATE-CHANGE request or {@code null}
+ * @return the new request (this should only be used by junit tests)
*/
- protected static boolean isSame(PdpUpdate first, PdpUpdate second) {
- if (first.getPolicies().size() != second.getPolicies().size()) {
- return false;
+ public Request addRequest(PdpStateChange stateChange) {
+ if (stateChange == null) {
+ return null;
}
- if (!first.getPdpGroup().equals(second.getPdpGroup())) {
- return false;
+ if (isBroadcast(stateChange)) {
+ throw new IllegalArgumentException(UNEXPECTED_BROADCAST + stateChange);
}
- if (!first.getPdpSubgroup().equals(second.getPdpSubgroup())) {
- return false;
- }
+ // @formatter:off
+ RequestParams reqparams = new RequestParams()
+ .setMaxRetryCount(params.getParams().getStateChangeParameters().getMaxRetryCount())
+ .setTimers(params.getStateChangeTimers())
+ .setModifyLock(params.getModifyLock())
+ .setPdpPublisher(params.getPdpPublisher())
+ .setResponseDispatcher(params.getResponseDispatcher());
+ // @formatter:on
- // see if the other has any policies that this does not have
- ArrayList<ToscaPolicy> lst = new ArrayList<>(second.getPolicies());
- lst.removeAll(first.getPolicies());
+ String name = stateChange.getName() + " " + PdpStateChange.class.getSimpleName();
+ var request = new StateChangeReq(reqparams, name, stateChange);
- return lst.isEmpty();
+ addSingleton(request);
+ return request;
}
/**
- * Determines if two requests are the "same", which is does not necessarily mean
- * "equals".
+ * Determines if a message is a broadcast message.
*
- * @param first first request to check
- * @param second second request to check
- * @return {@code true} if this update subsumes the other, {@code false} otherwise
+ * @param message the message to examine
+ * @return {@code true} if the message is a broadcast message, {@code false} if
+ * destined for a single PDP
*/
- protected static boolean isSame(PdpStateChange first, PdpStateChange second) {
- return (first.getState() == second.getState());
+ private boolean isBroadcast(PdpMessage message) {
+ return (message.getName() == null);
}
/**
- * Request data, which contains an UPDATE or a STATE-CHANGE request, or both. The
- * UPDATE is always published before the STATE-CHANGE. In addition, both requests may
- * be changed at any point, possibly triggering a restart of the publishing.
+ * Configures and adds a request to the map.
+ *
+ * @param request the request to be added
*/
- public class ModifyReqData extends RequestData {
+ private void addSingleton(Request request) {
- /**
- * The UPDATE message to be published, or {@code null}.
- */
- private PdpUpdate update;
+ synchronized (modifyLock) {
+ PdpRequests requests = pdp2requests.computeIfAbsent(request.getMessage().getName(), this::makePdpRequests);
- /**
- * The STATE-CHANGE message to be published, or {@code null}.
- */
- private PdpStateChange stateChange;
+ request.setListener(new SingletonListener(requests, request));
+ requests.addSingleton(request);
+ }
+ }
+ /**
+ * Removes expired PDPs from all active groups.
+ */
+ public void removeExpiredPdps() {
- /**
- * Constructs the object.
- *
- * @param newUpdate the UPDATE message to be sent, or {@code null}
- * @param newState the STATE-CHANGE message to be sent, or {@code null}
- */
- public ModifyReqData(PdpUpdate newUpdate, PdpStateChange newState) {
- super(params);
+ synchronized (modifyLock) {
+ logger.info("check for PDP records older than {}ms", params.getMaxPdpAgeMs());
- if (newUpdate != null) {
- this.stateChange = newState;
- setName(newUpdate.getName());
- update = newUpdate;
- configure(new ModUpdateData(newUpdate));
+ try (PolicyModelsProvider dao = daoFactory.create()) {
- } else {
- this.update = null;
- setName(newState.getName());
- stateChange = newState;
- configure(new ModStateChangeData(newState));
- }
- }
+ PdpGroupFilter filter = PdpGroupFilter.builder().groupState(PdpState.ACTIVE).build();
+ List<PdpGroup> groups = dao.getFilteredPdpGroups(filter);
+ List<PdpGroup> updates = new ArrayList<>(1);
- /**
- * Determines if this request is still in the map.
- */
- @Override
- protected boolean isActive() {
- return (name2data.get(getName()) == this);
- }
+ var status = new DeploymentStatus(dao);
- /**
- * Removes this request from the map.
- */
- @Override
- protected void allCompleted() {
- name2data.remove(getName(), this);
- }
+ Instant minAge = Instant.now().minusMillis(params.getMaxPdpAgeMs());
- /**
- * Adds an UPDATE to the request data, replacing any existing UPDATE, if
- * appropriate. If the UPDATE is replaced, then publishing is restarted.
- *
- * @param newRequest the new UPDATE request
- */
- private void add(PdpUpdate newRequest) {
- if (newRequest == null) {
- return;
- }
-
- synchronized (modifyLock) {
- if (update != null && isSame(update, newRequest)) {
- // already have this update - discard it
- return;
+ for (PdpGroup group : groups) {
+ Set<String> pdps = removeFromGroup(minAge, group);
+ if (!pdps.isEmpty()) {
+ updates.add(group);
+ status.loadByGroup(group.getName());
+ pdps.forEach(status::deleteDeployment);
+ }
}
- // must restart from scratch
- stopPublishing();
+ if (!updates.isEmpty()) {
+ dao.updatePdpGroups(updates);
- update = newRequest;
- configure(new ModUpdateData(newRequest));
+ var notification = new PolicyNotification();
+ status.flush(notification);
- startPublishing();
+ policyNotifier.publish(notification);
+ }
+
+ } catch (PfModelException e) {
+ logger.warn("failed to remove expired PDPs", e);
}
}
+ }
- /**
- * Adds a STATE-CHANGE to the request data, replacing any existing UPDATE, if
- * appropriate. If the STATE-CHANGE is replaced, and we're currently publishing
- * the STATE-CHANGE, then publishing is restarted.
- *
- * @param newRequest the new STATE-CHANGE request
- */
- private void add(PdpStateChange newRequest) {
- if (newRequest == null) {
- return;
- }
+ /**
+ * Removes expired PDPs from a group.
+ *
+ * @param minAge minimum age for active PDPs
+ * @param group group from which expired PDPs should be removed
+ * @return the expired PDPs
+ */
+ private Set<String> removeFromGroup(Instant minAge, PdpGroup group) {
+ Set<String> pdps = new HashSet<>();
+ for (PdpSubGroup subgrp : group.getPdpSubgroups()) {
+ removeFromSubgroup(minAge, group, subgrp, pdps);
+ }
- synchronized (modifyLock) {
- if (stateChange != null && isSame(stateChange, newRequest)) {
- // already have this update - discard it
- return;
- }
+ return pdps;
+ }
- if (getWrapper() instanceof StateChangeData) {
- // we were publishing STATE-CHANGE, thus must restart it
- stopPublishing();
+ /**
+ * Removes expired PDPs from a subgroup.
+ *
+ * @param minAge minimum age for active PDPs
+ * @param group group from which to attempt to remove the PDP
+ * @param subgrp subgroup from which to attempt to remove the PDP
+ * @param pdps where to place the expired PDPs
+ */
+ private void removeFromSubgroup(Instant minAge, PdpGroup group, PdpSubGroup subgrp, Set<String> pdps) {
- stateChange = newRequest;
- configure(new ModStateChangeData(newRequest));
+ Iterator<Pdp> iter = subgrp.getPdpInstances().iterator();
- startPublishing();
+ while (iter.hasNext()) {
+ Pdp instance = iter.next();
- } else {
- // haven't started publishing STATE-CHANGE yet, just replace it
- stateChange = newRequest;
- }
+ if (instance.getLastUpdate().isBefore(minAge)) {
+ String pdpName = instance.getInstanceId();
+ logger.info("removed {} from group={} subgroup={}", pdpName, group.getName(), subgrp.getPdpType());
+ iter.remove();
+ subgrp.setCurrentInstanceCount(subgrp.getPdpInstances().size());
+ pdps.add(pdpName);
}
}
+ }
- /**
- * Indicates that the retry count was exhausted.
- */
- protected void retryCountExhausted() {
- // remove this request data from the PDP request map
- allCompleted();
-
- // TODO what to do?
- }
+ /**
+ * Creates a new set of requests for a PDP. May be overridden by junit tests.
+ *
+ * @param pdpName PDP name
+ * @return a new set of requests
+ */
+ protected PdpRequests makePdpRequests(String pdpName) {
+ return new PdpRequests(pdpName, policyNotifier);
+ }
- /**
- * Indicates that a response did not match the data.
- *
- * @param reason the reason for the mismatch
- */
- protected void mismatch(String reason) {
- // remove this request data from the PDP request map
- allCompleted();
+ /**
+ * Makes a handler for PDP responses.
+ * @return a response handler
+ */
+ protected PdpStatusMessageHandler makePdpResponseHandler() {
+ return new PdpStatusMessageHandler(params.getParams(), params.isSavePdpStatistics());
+ }
- // TODO what to do?
+ /**
+ * Listener for singleton request events.
+ */
+ private class SingletonListener implements RequestListener {
+ private final PdpRequests requests;
+ private final Request request;
+ private final String pdpName;
+
+ public SingletonListener(PdpRequests requests, Request request) {
+ this.requests = requests;
+ this.request = request;
+ this.pdpName = requests.getPdpName();
}
- /**
- * Wraps an UPDATE.
- */
- private class ModUpdateData extends UpdateData {
+ @Override
+ public void failure(String responsePdpName, String reason) {
+ Collection<ToscaConceptIdentifier> undeployPolicies = requestCompleted(responsePdpName);
+ if (undeployPolicies.isEmpty()) {
+ // nothing to undeploy
+ return;
+ }
- public ModUpdateData(PdpUpdate message) {
- super(message, params);
+ /*
+ * Undeploy the extra policies. Note: this will likely cause a new message to
+ * be assigned to the request, thus we must re-start it after making the
+ * change.
+ */
+ PdpMessage oldmsg = request.getMessage();
+
+ try {
+ logger.warn("undeploy policies from {}:{} that failed to deploy: {}", oldmsg.getPdpGroup(),
+ oldmsg.getPdpSubgroup(), undeployPolicies);
+ policyUndeployer.undeploy(oldmsg.getPdpGroup(), oldmsg.getPdpSubgroup(), undeployPolicies);
+ } catch (PfModelException | RuntimeException e) {
+ logger.error("cannot undeploy policies {}", undeployPolicies, e);
}
- @Override
- public void mismatch(String reason) {
- ModifyReqData.this.mismatch(reason);
+ if (request.getMessage() == oldmsg) {
+ // message is unchanged - start the next request
+ startNextRequest(request);
+ } else {
+ // message changed - restart the request
+ request.startPublishing();
}
+ }
- @Override
- public void completed() {
- if (stateChange == null) {
- // no STATE-CHANGE request - we're done
- allCompleted();
+ @Override
+ public void success(String responsePdpName, PdpStatus response) {
+ requestCompleted(responsePdpName);
- } else {
- // now process the STATE-CHANGE request
- configure(new ModStateChangeData(stateChange));
- startPublishing();
- }
+ if (!(request instanceof UpdateReq)) {
+ // other response types may not include the list of policies
+ return;
}
+
+ /*
+ * Update PDP time stamps. Also send pdp-update and pdp-state-change, as
+ * necessary, if the response does not reflect what's in the DB.
+ */
+ var handler = makePdpResponseHandler();
+ handler.handlePdpStatus(response);
}
/**
- * Wraps a STATE-CHANGE.
+ * Handles a request completion, starting the next request, if there is one.
+ *
+ * @param responsePdpName name of the PDP provided in the response
+ * @return a list of policies to be undeployed
*/
- private class ModStateChangeData extends StateChangeData {
+ private Collection<ToscaConceptIdentifier> requestCompleted(String responsePdpName) {
+ if (!pdpName.equals(responsePdpName)) {
+ return Collections.emptyList();
+ }
- public ModStateChangeData(PdpStateChange message) {
- super(message, params);
+ if (pdp2requests.get(pdpName) != requests) {
+ logger.info("discard old requests for {}", responsePdpName);
+ requests.stopPublishing();
+ return Collections.emptyList();
}
- @Override
- public void mismatch(String reason) {
- ModifyReqData.this.mismatch(reason);
+ if (!requests.isFirstInQueue(request)) {
+ logger.error("request is not first in the queue {}", request.getMessage());
+ return Collections.emptyList();
}
- @Override
- public void completed() {
- allCompleted();
+ Collection<ToscaConceptIdentifier> undeployPolicies = request.getUndeployPolicies();
+ if (undeployPolicies.isEmpty()) {
+ // nothing to undeploy - just start the next request
+ startNextRequest(request);
}
+
+ return undeployPolicies;
}
- }
- // these may be overridden by junit tests
+ @Override
+ public void retryCountExhausted(Request request) {
+ if (pdp2requests.get(pdpName) == requests) {
+ requests.stopPublishing();
+ startNextRequest(request);
+ }
+ }
- protected ModifyReqData makeRequestData(PdpUpdate update, PdpStateChange stateChange) {
- return new ModifyReqData(update, stateChange);
+ /**
+ * Starts the next request associated with a PDP.
+ *
+ * @param request the request that just completed
+ */
+ private void startNextRequest(Request request) {
+ if (!requests.startNextRequest(request)) {
+ pdp2requests.remove(pdpName, requests);
+ }
+ }
}
}