2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2021 Nordix Foundation.
7 * Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.pap.main.comm;
25 import java.time.Instant;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.Iterator;
32 import java.util.List;
36 import org.onap.policy.models.base.PfModelException;
37 import org.onap.policy.models.pap.concepts.PolicyNotification;
38 import org.onap.policy.models.pdp.concepts.Pdp;
39 import org.onap.policy.models.pdp.concepts.PdpGroup;
40 import org.onap.policy.models.pdp.concepts.PdpGroupFilter;
41 import org.onap.policy.models.pdp.concepts.PdpMessage;
42 import org.onap.policy.models.pdp.concepts.PdpStateChange;
43 import org.onap.policy.models.pdp.concepts.PdpStatus;
44 import org.onap.policy.models.pdp.concepts.PdpSubGroup;
45 import org.onap.policy.models.pdp.concepts.PdpUpdate;
46 import org.onap.policy.models.pdp.enums.PdpState;
47 import org.onap.policy.models.provider.PolicyModelsProvider;
48 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
49 import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper;
50 import org.onap.policy.pap.main.comm.msgdata.Request;
51 import org.onap.policy.pap.main.comm.msgdata.RequestListener;
52 import org.onap.policy.pap.main.comm.msgdata.StateChangeReq;
53 import org.onap.policy.pap.main.comm.msgdata.UpdateReq;
54 import org.onap.policy.pap.main.notification.DeploymentStatus;
55 import org.onap.policy.pap.main.notification.PolicyNotifier;
56 import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
57 import org.onap.policy.pap.main.parameters.RequestParams;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
62 * Maps a PDP name to requests that modify PDPs.
64 public class PdpModifyRequestMap {
65 private static final Logger logger = LoggerFactory.getLogger(PdpModifyRequestMap.class);
67 private static final String UNEXPECTED_BROADCAST = "unexpected broadcast message: ";
70 * Maps a PDP name to its outstanding requests.
72 private final Map<String, PdpRequests> pdp2requests = new HashMap<>();
75 * PDP modification lock.
77 private final Object modifyLock;
80 * The configuration parameters.
82 private final PdpModifyRequestMapParams params;
85 * Factory for PAP DAO.
87 private final PolicyModelsProviderFactoryWrapper daoFactory;
90 * Used to notify when policy updates completes.
92 private final PolicyNotifier policyNotifier;
95 * Used to undeploy policies from the system, when they cannot be deployed to a PDP.
98 * Note: there's a "catch-22" here. The request map needs an undeployer, but the
99 * undeployer needs the request map. Consequently, the request map is created first,
100 * then the undeployer, and finally, this field is set.
103 private PolicyUndeployer policyUndeployer;
107 * Constructs the object.
109 * @param params configuration parameters
111 * @throws IllegalArgumentException if a required parameter is not set
113 public PdpModifyRequestMap(PdpModifyRequestMapParams params) {
116 this.params = params;
117 this.modifyLock = params.getModifyLock();
118 this.daoFactory = params.getDaoFactory();
119 this.policyNotifier = params.getPolicyNotifier();
123 * Determines if the map contains any requests.
125 * @return {@code true} if the map is empty, {@code false} otherwise
127 public boolean isEmpty() {
128 return pdp2requests.isEmpty();
132 * Stops publishing requests to the given PDP.
134 * @param pdpName PDP name
136 public void stopPublishing(String pdpName) {
137 synchronized (modifyLock) {
138 PdpRequests requests = pdp2requests.remove(pdpName);
139 if (requests != null) {
140 requests.stopPublishing();
146 * Adds a pair of requests to the map.
148 * @param update the UPDATE request or {@code null}
149 * @param stateChange the STATE-CHANGE request or {@code null}
151 public void addRequest(PdpUpdate update, PdpStateChange stateChange) {
152 if (update == null) {
153 addRequest(stateChange);
155 } else if (stateChange == null) {
158 } else if (stateChange.getState() == PdpState.ACTIVE) {
159 // publish update before activating
160 synchronized (modifyLock) {
162 addRequest(stateChange);
166 // deactivate before publishing update
167 synchronized (modifyLock) {
168 addRequest(stateChange);
175 * Adds an UPDATE request to the map.
177 * @param update the UPDATE request or {@code null}
178 * @return the new request (this should only be used by junit tests)
180 public Request addRequest(PdpUpdate update) {
181 if (update == null) {
185 if (isBroadcast(update)) {
186 throw new IllegalArgumentException(UNEXPECTED_BROADCAST + update);
190 RequestParams reqparams = new RequestParams()
191 .setMaxRetryCount(params.getParams().getUpdateParameters().getMaxRetryCount())
192 .setTimers(params.getUpdateTimers())
193 .setModifyLock(params.getModifyLock())
194 .setPdpPublisher(params.getPdpPublisher())
195 .setResponseDispatcher(params.getResponseDispatcher());
198 String name = update.getName() + " " + PdpUpdate.class.getSimpleName();
199 var request = new UpdateReq(reqparams, name, update);
201 addSingleton(request);
206 * Adds a STATE-CHANGE request to the map.
208 * @param stateChange the STATE-CHANGE request or {@code null}
209 * @return the new request (this should only be used by junit tests)
211 public Request addRequest(PdpStateChange stateChange) {
212 if (stateChange == null) {
216 if (isBroadcast(stateChange)) {
217 throw new IllegalArgumentException(UNEXPECTED_BROADCAST + stateChange);
221 RequestParams reqparams = new RequestParams()
222 .setMaxRetryCount(params.getParams().getStateChangeParameters().getMaxRetryCount())
223 .setTimers(params.getStateChangeTimers())
224 .setModifyLock(params.getModifyLock())
225 .setPdpPublisher(params.getPdpPublisher())
226 .setResponseDispatcher(params.getResponseDispatcher());
229 String name = stateChange.getName() + " " + PdpStateChange.class.getSimpleName();
230 var request = new StateChangeReq(reqparams, name, stateChange);
232 addSingleton(request);
237 * Determines if a message is a broadcast message.
239 * @param message the message to examine
240 * @return {@code true} if the message is a broadcast message, {@code false} if
241 * destined for a single PDP
243 private boolean isBroadcast(PdpMessage message) {
244 return (message.getName() == null);
248 * Configures and adds a request to the map.
250 * @param request the request to be added
252 private void addSingleton(Request request) {
254 synchronized (modifyLock) {
255 PdpRequests requests = pdp2requests.computeIfAbsent(request.getMessage().getName(), this::makePdpRequests);
257 request.setListener(new SingletonListener(requests, request));
258 requests.addSingleton(request);
263 * Removes expired PDPs from all active groups.
265 public void removeExpiredPdps() {
267 synchronized (modifyLock) {
268 logger.info("check for PDP records older than {}ms", params.getMaxPdpAgeMs());
270 try (PolicyModelsProvider dao = daoFactory.create()) {
272 PdpGroupFilter filter = PdpGroupFilter.builder().groupState(PdpState.ACTIVE).build();
273 List<PdpGroup> groups = dao.getFilteredPdpGroups(filter);
274 List<PdpGroup> updates = new ArrayList<>(1);
276 var status = new DeploymentStatus(dao);
278 Instant minAge = Instant.now().minusMillis(params.getMaxPdpAgeMs());
280 for (PdpGroup group : groups) {
281 Set<String> pdps = removeFromGroup(minAge, group);
282 if (!pdps.isEmpty()) {
284 status.loadByGroup(group.getName());
285 pdps.forEach(status::deleteDeployment);
289 if (!updates.isEmpty()) {
290 dao.updatePdpGroups(updates);
292 var notification = new PolicyNotification();
293 status.flush(notification);
295 policyNotifier.publish(notification);
298 } catch (PfModelException e) {
299 logger.warn("failed to remove expired PDPs", e);
305 * Removes expired PDPs from a group.
307 * @param minAge minimum age for active PDPs
308 * @param group group from which expired PDPs should be removed
309 * @return the expired PDPs
311 private Set<String> removeFromGroup(Instant minAge, PdpGroup group) {
312 Set<String> pdps = new HashSet<>();
313 for (PdpSubGroup subgrp : group.getPdpSubgroups()) {
314 removeFromSubgroup(minAge, group, subgrp, pdps);
321 * Removes expired PDPs from a subgroup.
323 * @param minAge minimum age for active PDPs
324 * @param group group from which to attempt to remove the PDP
325 * @param subgrp subgroup from which to attempt to remove the PDP
326 * @param pdps where to place the expired PDPs
328 private void removeFromSubgroup(Instant minAge, PdpGroup group, PdpSubGroup subgrp, Set<String> pdps) {
330 Iterator<Pdp> iter = subgrp.getPdpInstances().iterator();
332 while (iter.hasNext()) {
333 Pdp instance = iter.next();
335 if (instance.getLastUpdate().isBefore(minAge)) {
336 String pdpName = instance.getInstanceId();
337 logger.info("removed {} from group={} subgroup={}", pdpName, group.getName(), subgrp.getPdpType());
339 subgrp.setCurrentInstanceCount(subgrp.getPdpInstances().size());
346 * Creates a new set of requests for a PDP. May be overridden by junit tests.
348 * @param pdpName PDP name
349 * @return a new set of requests
351 protected PdpRequests makePdpRequests(String pdpName) {
352 return new PdpRequests(pdpName, policyNotifier);
356 * Makes a handler for PDP responses.
357 * @return a response handler
359 protected PdpStatusMessageHandler makePdpResponseHandler() {
360 return new PdpStatusMessageHandler(params.getParams(), params.isSavePdpStatistics());
364 * Listener for singleton request events.
366 private class SingletonListener implements RequestListener {
367 private final PdpRequests requests;
368 private final Request request;
369 private final String pdpName;
371 public SingletonListener(PdpRequests requests, Request request) {
372 this.requests = requests;
373 this.request = request;
374 this.pdpName = requests.getPdpName();
378 public void failure(String responsePdpName, String reason) {
379 Collection<ToscaConceptIdentifier> undeployPolicies = requestCompleted(responsePdpName);
380 if (undeployPolicies.isEmpty()) {
381 // nothing to undeploy
386 * Undeploy the extra policies. Note: this will likely cause a new message to
387 * be assigned to the request, thus we must re-start it after making the
390 PdpMessage oldmsg = request.getMessage();
393 logger.warn("undeploy policies from {}:{} that failed to deploy: {}", oldmsg.getPdpGroup(),
394 oldmsg.getPdpSubgroup(), undeployPolicies);
395 policyUndeployer.undeploy(oldmsg.getPdpGroup(), oldmsg.getPdpSubgroup(), undeployPolicies);
396 } catch (PfModelException | RuntimeException e) {
397 logger.error("cannot undeploy policies {}", undeployPolicies, e);
400 if (request.getMessage() == oldmsg) {
401 // message is unchanged - start the next request
402 startNextRequest(request);
404 // message changed - restart the request
405 request.startPublishing();
410 public void success(String responsePdpName, PdpStatus response) {
411 requestCompleted(responsePdpName);
413 if (!(request instanceof UpdateReq)) {
414 // other response types may not include the list of policies
419 * Update PDP time stamps. Also send pdp-update and pdp-state-change, as
420 * necessary, if the response does not reflect what's in the DB.
422 var handler = makePdpResponseHandler();
423 handler.handlePdpStatus(response);
427 * Handles a request completion, starting the next request, if there is one.
429 * @param responsePdpName name of the PDP provided in the response
430 * @return a list of policies to be undeployed
432 private Collection<ToscaConceptIdentifier> requestCompleted(String responsePdpName) {
433 if (!pdpName.equals(responsePdpName)) {
434 return Collections.emptyList();
437 if (pdp2requests.get(pdpName) != requests) {
438 logger.info("discard old requests for {}", responsePdpName);
439 requests.stopPublishing();
440 return Collections.emptyList();
443 if (!requests.isFirstInQueue(request)) {
444 logger.error("request is not first in the queue {}", request.getMessage());
445 return Collections.emptyList();
448 Collection<ToscaConceptIdentifier> undeployPolicies = request.getUndeployPolicies();
449 if (undeployPolicies.isEmpty()) {
450 // nothing to undeploy - just start the next request
451 startNextRequest(request);
454 return undeployPolicies;
458 public void retryCountExhausted(Request request) {
459 if (pdp2requests.get(pdpName) == requests) {
460 requests.stopPublishing();
461 startNextRequest(request);
466 * Starts the next request associated with a PDP.
468 * @param request the request that just completed
470 private void startNextRequest(Request request) {
471 if (!requests.startNextRequest(request)) {
472 pdp2requests.remove(pdpName, requests);