2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2021 Nordix Foundation.
7 * Modifications Copyright (C) 2021-2022 Bell Canada. All rights reserved.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.pap.main.comm;
25 import java.time.Instant;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.Iterator;
32 import java.util.List;
35 import org.onap.policy.models.base.PfModelException;
36 import org.onap.policy.models.pap.concepts.PolicyNotification;
37 import org.onap.policy.models.pdp.concepts.Pdp;
38 import org.onap.policy.models.pdp.concepts.PdpGroup;
39 import org.onap.policy.models.pdp.concepts.PdpGroupFilter;
40 import org.onap.policy.models.pdp.concepts.PdpMessage;
41 import org.onap.policy.models.pdp.concepts.PdpStateChange;
42 import org.onap.policy.models.pdp.concepts.PdpStatus;
43 import org.onap.policy.models.pdp.concepts.PdpSubGroup;
44 import org.onap.policy.models.pdp.concepts.PdpUpdate;
45 import org.onap.policy.models.pdp.enums.PdpState;
46 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
47 import org.onap.policy.pap.main.comm.msgdata.Request;
48 import org.onap.policy.pap.main.comm.msgdata.RequestListener;
49 import org.onap.policy.pap.main.comm.msgdata.StateChangeReq;
50 import org.onap.policy.pap.main.comm.msgdata.UpdateReq;
51 import org.onap.policy.pap.main.notification.DeploymentStatus;
52 import org.onap.policy.pap.main.notification.PolicyNotifier;
53 import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
54 import org.onap.policy.pap.main.parameters.RequestParams;
55 import org.onap.policy.pap.main.service.PdpGroupService;
56 import org.onap.policy.pap.main.service.PolicyStatusService;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59 import org.springframework.stereotype.Component;
62 * Maps a PDP name to requests that modify PDPs.
65 public class PdpModifyRequestMap {
66 private static final Logger logger = LoggerFactory.getLogger(PdpModifyRequestMap.class);
68 private static final String UNEXPECTED_BROADCAST = "unexpected broadcast message: ";
71 * Maps a PDP name to its outstanding requests.
73 private final Map<String, PdpRequests> pdp2requests = new HashMap<>();
76 * PDP modification lock.
78 private Object modifyLock;
81 * The configuration parameters.
83 private PdpModifyRequestMapParams params;
86 * Used to notify when policy updates completes.
88 private final PolicyNotifier policyNotifier;
91 * Used to undeploy policies from the system, when they cannot be deployed to a PDP.
94 * Note: The request map needs an undeployer during creation, and the undeployer
95 * needs the request map when it's initialize method is called.
97 private final PolicyUndeployer policyUndeployer;
99 private final PdpGroupService pdpGroupService;
101 private final PolicyStatusService policyStatusService;
103 private final PdpStatusMessageHandler pdpStatusMessageHandler;
106 * Constructs the object.
108 * @param pdpGroupService the pdpGroupService
109 * @param policyStatusService the policyStatusService
110 * @param pdpStatusMessageHandler the pdpStatusMessageHandler
111 * @param policyUndeployer the policyUndeployer
112 * @param policyNotifier the policyNotifier
114 public PdpModifyRequestMap(PdpGroupService pdpGroupService, PolicyStatusService policyStatusService,
115 PdpStatusMessageHandler pdpStatusMessageHandler, PolicyUndeployer policyUndeployer,
116 PolicyNotifier policyNotifier) {
117 this.pdpGroupService = pdpGroupService;
118 this.policyStatusService = policyStatusService;
119 this.pdpStatusMessageHandler = pdpStatusMessageHandler;
120 this.policyUndeployer = policyUndeployer;
121 this.policyNotifier = policyNotifier;
125 * Initializes the requestMap.
127 * @param params the parameters.
129 public void initialize(PdpModifyRequestMapParams params) {
132 this.params = params;
133 this.modifyLock = params.getModifyLock();
137 * Determines if the map contains any requests.
139 * @return {@code true} if the map is empty, {@code false} otherwise
141 public boolean isEmpty() {
142 return pdp2requests.isEmpty();
146 * Stops publishing requests to the given PDP.
148 * @param pdpName PDP name
150 public void stopPublishing(String pdpName) {
151 synchronized (modifyLock) {
152 PdpRequests requests = pdp2requests.remove(pdpName);
153 if (requests != null) {
154 requests.stopPublishing();
160 * Adds a pair of requests to the map.
162 * @param update the UPDATE request or {@code null}
163 * @param stateChange the STATE-CHANGE request or {@code null}
165 public void addRequest(PdpUpdate update, PdpStateChange stateChange) {
166 if (update == null) {
167 addRequest(stateChange);
169 } else if (stateChange == null) {
172 } else if (stateChange.getState() == PdpState.ACTIVE) {
173 // publish update before activating
174 synchronized (modifyLock) {
176 addRequest(stateChange);
180 // deactivate before publishing update
181 synchronized (modifyLock) {
182 addRequest(stateChange);
189 * Adds an UPDATE request to the map.
191 * @param update the UPDATE request or {@code null}
192 * @return the new request (this should only be used by junit tests)
194 public Request addRequest(PdpUpdate update) {
195 if (update == null) {
199 if (isBroadcast(update)) {
200 throw new IllegalArgumentException(UNEXPECTED_BROADCAST + update);
204 RequestParams reqparams = new RequestParams()
205 .setMaxRetryCount(params.getParams().getUpdateParameters().getMaxRetryCount())
206 .setTimers(params.getUpdateTimers())
207 .setModifyLock(params.getModifyLock())
208 .setPdpPublisher(params.getPdpPublisher())
209 .setResponseDispatcher(params.getResponseDispatcher());
212 String name = update.getName() + " " + PdpUpdate.class.getSimpleName();
213 var request = new UpdateReq(reqparams, name, update);
215 addSingleton(request);
220 * Adds a STATE-CHANGE request to the map.
222 * @param stateChange the STATE-CHANGE request or {@code null}
223 * @return the new request (this should only be used by junit tests)
225 public Request addRequest(PdpStateChange stateChange) {
226 if (stateChange == null) {
230 if (isBroadcast(stateChange)) {
231 throw new IllegalArgumentException(UNEXPECTED_BROADCAST + stateChange);
235 RequestParams reqparams = new RequestParams()
236 .setMaxRetryCount(params.getParams().getStateChangeParameters().getMaxRetryCount())
237 .setTimers(params.getStateChangeTimers())
238 .setModifyLock(params.getModifyLock())
239 .setPdpPublisher(params.getPdpPublisher())
240 .setResponseDispatcher(params.getResponseDispatcher());
243 String name = stateChange.getName() + " " + PdpStateChange.class.getSimpleName();
244 var request = new StateChangeReq(reqparams, name, stateChange);
246 addSingleton(request);
251 * Determines if a message is a broadcast message.
253 * @param message the message to examine
254 * @return {@code true} if the message is a broadcast message, {@code false} if
255 * destined for a single PDP
257 private boolean isBroadcast(PdpMessage message) {
258 return (message.getName() == null);
262 * Configures and adds a request to the map.
264 * @param request the request to be added
266 private void addSingleton(Request request) {
268 synchronized (modifyLock) {
269 PdpRequests requests = pdp2requests.computeIfAbsent(request.getMessage().getName(), this::makePdpRequests);
271 request.setListener(new SingletonListener(requests, request));
272 requests.addSingleton(request);
277 * Removes expired PDPs from all active groups.
279 public void removeExpiredPdps() {
281 synchronized (modifyLock) {
282 logger.info("check for PDP records older than {}ms", params.getMaxPdpAgeMs());
286 PdpGroupFilter filter = PdpGroupFilter.builder().groupState(PdpState.ACTIVE).build();
287 List<PdpGroup> groups = pdpGroupService.getFilteredPdpGroups(filter);
288 List<PdpGroup> updates = new ArrayList<>(1);
290 var status = new DeploymentStatus(policyStatusService);
292 Instant minAge = Instant.now().minusMillis(params.getMaxPdpAgeMs());
294 for (PdpGroup group : groups) {
295 Set<String> pdps = removeFromGroup(minAge, group);
296 if (!pdps.isEmpty()) {
298 status.loadByGroup(group.getName());
299 pdps.forEach(status::deleteDeployment);
303 if (!updates.isEmpty()) {
304 pdpGroupService.updatePdpGroups(updates);
306 var notification = new PolicyNotification();
307 status.flush(notification);
309 policyNotifier.publish(notification);
312 } catch (RuntimeException e) {
313 logger.warn("failed to remove expired PDPs", e);
319 * Removes expired PDPs from a group.
321 * @param minAge minimum age for active PDPs
322 * @param group group from which expired PDPs should be removed
323 * @return the expired PDPs
325 private Set<String> removeFromGroup(Instant minAge, PdpGroup group) {
326 Set<String> pdps = new HashSet<>();
327 for (PdpSubGroup subgrp : group.getPdpSubgroups()) {
328 removeFromSubgroup(minAge, group, subgrp, pdps);
335 * Removes expired PDPs from a subgroup.
337 * @param minAge minimum age for active PDPs
338 * @param group group from which to attempt to remove the PDP
339 * @param subgrp subgroup from which to attempt to remove the PDP
340 * @param pdps where to place the expired PDPs
342 private void removeFromSubgroup(Instant minAge, PdpGroup group, PdpSubGroup subgrp, Set<String> pdps) {
344 Iterator<Pdp> iter = subgrp.getPdpInstances().iterator();
346 while (iter.hasNext()) {
347 Pdp instance = iter.next();
349 if (instance.getLastUpdate().isBefore(minAge)) {
350 String pdpName = instance.getInstanceId();
351 logger.info("removed {} from group={} subgroup={}", pdpName, group.getName(), subgrp.getPdpType());
353 subgrp.setCurrentInstanceCount(subgrp.getPdpInstances().size());
360 * Creates a new set of requests for a PDP. May be overridden by junit tests.
362 * @param pdpName PDP name
363 * @return a new set of requests
365 protected PdpRequests makePdpRequests(String pdpName) {
366 return new PdpRequests(pdpName, policyNotifier);
370 * Listener for singleton request events.
372 private class SingletonListener implements RequestListener {
373 private final PdpRequests requests;
374 private final Request request;
375 private final String pdpName;
377 public SingletonListener(PdpRequests requests, Request request) {
378 this.requests = requests;
379 this.request = request;
380 this.pdpName = requests.getPdpName();
384 public void failure(String responsePdpName, String reason) {
385 Collection<ToscaConceptIdentifier> undeployPolicies = requestCompleted(responsePdpName);
386 if (undeployPolicies.isEmpty()) {
387 // nothing to undeploy
392 * Undeploy the extra policies. Note: this will likely cause a new message to
393 * be assigned to the request, thus we must re-start it after making the
396 PdpMessage oldmsg = request.getMessage();
399 logger.warn("undeploy policies from {}:{} that failed to deploy: {}", oldmsg.getPdpGroup(),
400 oldmsg.getPdpSubgroup(), undeployPolicies);
401 policyUndeployer.undeploy(oldmsg.getPdpGroup(), oldmsg.getPdpSubgroup(), undeployPolicies);
402 } catch (PfModelException | RuntimeException e) {
403 logger.error("cannot undeploy policies {}", undeployPolicies, e);
406 if (request.getMessage() == oldmsg) {
407 // message is unchanged - start the next request
408 startNextRequest(request);
410 // message changed - restart the request
411 request.startPublishing();
416 public void success(String responsePdpName, PdpStatus response) {
417 requestCompleted(responsePdpName);
419 if (!(request instanceof UpdateReq)) {
420 // other response types may not include the list of policies
425 * Update PDP time stamps. Also send pdp-update and pdp-state-change, as
426 * necessary, if the response does not reflect what's in the DB.
428 pdpStatusMessageHandler.handlePdpStatus(response);
432 * Handles a request completion, starting the next request, if there is one.
434 * @param responsePdpName name of the PDP provided in the response
435 * @return a list of policies to be undeployed
437 private Collection<ToscaConceptIdentifier> requestCompleted(String responsePdpName) {
438 if (!pdpName.equals(responsePdpName)) {
439 return Collections.emptyList();
442 if (pdp2requests.get(pdpName) != requests) {
443 logger.info("discard old requests for {}", responsePdpName);
444 requests.stopPublishing();
445 return Collections.emptyList();
448 if (!requests.isFirstInQueue(request)) {
449 logger.error("request is not first in the queue {}", request.getMessage());
450 return Collections.emptyList();
453 Collection<ToscaConceptIdentifier> undeployPolicies = request.getUndeployPolicies();
454 if (undeployPolicies.isEmpty()) {
455 // nothing to undeploy - just start the next request
456 startNextRequest(request);
459 return undeployPolicies;
463 public void retryCountExhausted(Request request) {
464 if (pdp2requests.get(pdpName) == requests) {
465 requests.stopPublishing();
466 startNextRequest(request);
471 * Starts the next request associated with a PDP.
473 * @param request the request that just completed
475 private void startNextRequest(Request request) {
476 if (!requests.startNextRequest(request)) {
477 pdp2requests.remove(pdpName, requests);