Remove expired PDPs
[policy/pap.git] / main / src / main / java / org / onap / policy / pap / main / comm / PdpModifyRequestMap.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP PAP
4  * ================================================================================
5  * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2021 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.pap.main.comm;
23
24 import java.time.Instant;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Set;
34 import lombok.Setter;
35 import org.onap.policy.models.base.PfModelException;
36 import org.onap.policy.models.pap.concepts.PolicyNotification;
37 import org.onap.policy.models.pdp.concepts.Pdp;
38 import org.onap.policy.models.pdp.concepts.PdpGroup;
39 import org.onap.policy.models.pdp.concepts.PdpGroupFilter;
40 import org.onap.policy.models.pdp.concepts.PdpMessage;
41 import org.onap.policy.models.pdp.concepts.PdpStateChange;
42 import org.onap.policy.models.pdp.concepts.PdpSubGroup;
43 import org.onap.policy.models.pdp.concepts.PdpUpdate;
44 import org.onap.policy.models.pdp.enums.PdpState;
45 import org.onap.policy.models.provider.PolicyModelsProvider;
46 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
47 import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper;
48 import org.onap.policy.pap.main.comm.msgdata.Request;
49 import org.onap.policy.pap.main.comm.msgdata.RequestListener;
50 import org.onap.policy.pap.main.comm.msgdata.StateChangeReq;
51 import org.onap.policy.pap.main.comm.msgdata.UpdateReq;
52 import org.onap.policy.pap.main.notification.DeploymentStatus;
53 import org.onap.policy.pap.main.notification.PolicyNotifier;
54 import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
55 import org.onap.policy.pap.main.parameters.RequestParams;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 /**
60  * Maps a PDP name to requests that modify PDPs.
61  */
62 public class PdpModifyRequestMap {
63     private static final Logger logger = LoggerFactory.getLogger(PdpModifyRequestMap.class);
64
65     private static final String UNEXPECTED_BROADCAST = "unexpected broadcast message: ";
66
67     /**
68      * Maps a PDP name to its outstanding requests.
69      */
70     private final Map<String, PdpRequests> pdp2requests = new HashMap<>();
71
72     /**
73      * PDP modification lock.
74      */
75     private final Object modifyLock;
76
77     /**
78      * The configuration parameters.
79      */
80     private final PdpModifyRequestMapParams params;
81
82     /**
83      * Factory for PAP DAO.
84      */
85     private final PolicyModelsProviderFactoryWrapper daoFactory;
86
87     /**
88      * Used to notify when policy updates completes.
89      */
90     private final PolicyNotifier policyNotifier;
91
92     /**
93      * Used to undeploy policies from the system, when they cannot be deployed to a PDP.
94      *
95      * <p/>
96      * Note: there's a "catch-22" here. The request map needs an undeployer, but the
97      * undeployer needs the request map. Consequently, the request map is created first,
98      * then the undeployer, and finally, this field is set.
99      */
100     @Setter
101     private PolicyUndeployer policyUndeployer;
102
103
104     /**
105      * Constructs the object.
106      *
107      * @param params configuration parameters
108      *
109      * @throws IllegalArgumentException if a required parameter is not set
110      */
111     public PdpModifyRequestMap(PdpModifyRequestMapParams params) {
112         params.validate();
113
114         this.params = params;
115         this.modifyLock = params.getModifyLock();
116         this.daoFactory = params.getDaoFactory();
117         this.policyNotifier = params.getPolicyNotifier();
118     }
119
120     /**
121      * Determines if the map contains any requests.
122      *
123      * @return {@code true} if the map is empty, {@code false} otherwise
124      */
125     public boolean isEmpty() {
126         return pdp2requests.isEmpty();
127     }
128
129     /**
130      * Stops publishing requests to the given PDP.
131      *
132      * @param pdpName PDP name
133      */
134     public void stopPublishing(String pdpName) {
135         synchronized (modifyLock) {
136             PdpRequests requests = pdp2requests.remove(pdpName);
137             if (requests != null) {
138                 requests.stopPublishing();
139             }
140         }
141     }
142
143     /**
144      * Adds a pair of requests to the map.
145      *
146      * @param update the UPDATE request or {@code null}
147      * @param stateChange the STATE-CHANGE request or {@code null}
148      */
149     public void addRequest(PdpUpdate update, PdpStateChange stateChange) {
150         if (update == null) {
151             addRequest(stateChange);
152
153         } else if (stateChange == null) {
154             addRequest(update);
155
156         } else if (stateChange.getState() == PdpState.ACTIVE) {
157             // publish update before activating
158             synchronized (modifyLock) {
159                 addRequest(update);
160                 addRequest(stateChange);
161             }
162
163         } else {
164             // deactivate before publishing update
165             synchronized (modifyLock) {
166                 addRequest(stateChange);
167                 addRequest(update);
168             }
169         }
170     }
171
172     /**
173      * Adds an UPDATE request to the map.
174      *
175      * @param update the UPDATE request or {@code null}
176      * @return the new request (this should only be used by junit tests)
177      */
178     public Request addRequest(PdpUpdate update) {
179         if (update == null) {
180             return null;
181         }
182
183         if (isBroadcast(update)) {
184             throw new IllegalArgumentException(UNEXPECTED_BROADCAST + update);
185         }
186
187         // @formatter:off
188         RequestParams reqparams = new RequestParams()
189             .setMaxRetryCount(params.getParams().getUpdateParameters().getMaxRetryCount())
190             .setTimers(params.getUpdateTimers())
191             .setModifyLock(params.getModifyLock())
192             .setPdpPublisher(params.getPdpPublisher())
193             .setResponseDispatcher(params.getResponseDispatcher());
194         // @formatter:on
195
196         String name = update.getName() + " " + PdpUpdate.class.getSimpleName();
197         var request = new UpdateReq(reqparams, name, update);
198
199         addSingleton(request);
200         return request;
201     }
202
203     /**
204      * Adds a STATE-CHANGE request to the map.
205      *
206      * @param stateChange the STATE-CHANGE request or {@code null}
207      * @return the new request (this should only be used by junit tests)
208      */
209     public Request addRequest(PdpStateChange stateChange) {
210         if (stateChange == null) {
211             return null;
212         }
213
214         if (isBroadcast(stateChange)) {
215             throw new IllegalArgumentException(UNEXPECTED_BROADCAST + stateChange);
216         }
217
218         // @formatter:off
219         RequestParams reqparams = new RequestParams()
220             .setMaxRetryCount(params.getParams().getStateChangeParameters().getMaxRetryCount())
221             .setTimers(params.getStateChangeTimers())
222             .setModifyLock(params.getModifyLock())
223             .setPdpPublisher(params.getPdpPublisher())
224             .setResponseDispatcher(params.getResponseDispatcher());
225         // @formatter:on
226
227         String name = stateChange.getName() + " " + PdpStateChange.class.getSimpleName();
228         var request = new StateChangeReq(reqparams, name, stateChange);
229
230         addSingleton(request);
231         return request;
232     }
233
234     /**
235      * Determines if a message is a broadcast message.
236      *
237      * @param message the message to examine
238      * @return {@code true} if the message is a broadcast message, {@code false} if
239      *         destined for a single PDP
240      */
241     private boolean isBroadcast(PdpMessage message) {
242         return (message.getName() == null);
243     }
244
245     /**
246      * Configures and adds a request to the map.
247      *
248      * @param request the request to be added
249      */
250     private void addSingleton(Request request) {
251
252         synchronized (modifyLock) {
253             PdpRequests requests = pdp2requests.computeIfAbsent(request.getMessage().getName(), this::makePdpRequests);
254
255             request.setListener(new SingletonListener(requests, request));
256             requests.addSingleton(request);
257         }
258     }
259
260     /**
261      * Removes expired PDPs from all active groups.
262      */
263     public void removeExpiredPdps() {
264
265         synchronized (modifyLock) {
266             logger.info("check for PDP records older than {}ms", params.getMaxPdpAgeMs());
267
268             try (PolicyModelsProvider dao = daoFactory.create()) {
269
270                 PdpGroupFilter filter = PdpGroupFilter.builder().groupState(PdpState.ACTIVE).build();
271                 List<PdpGroup> groups = dao.getFilteredPdpGroups(filter);
272                 List<PdpGroup> updates = new ArrayList<>(1);
273
274                 var status = new DeploymentStatus(dao);
275
276                 Instant minAge = Instant.now().minusMillis(params.getMaxPdpAgeMs());
277
278                 for (PdpGroup group : groups) {
279                     Set<String> pdps = removeFromGroup(minAge, group);
280                     if (!pdps.isEmpty()) {
281                         updates.add(group);
282                         status.loadByGroup(group.getName());
283                         pdps.forEach(status::deleteDeployment);
284                     }
285                 }
286
287                 if (!updates.isEmpty()) {
288                     dao.updatePdpGroups(updates);
289
290                     var notification = new PolicyNotification();
291                     status.flush(notification);
292
293                     policyNotifier.publish(notification);
294                 }
295
296             } catch (PfModelException e) {
297                 logger.warn("failed to remove expired PDPs", e);
298             }
299         }
300     }
301
302     /**
303      * Removes expired PDPs from a group.
304      *
305      * @param minAge minimum age for active PDPs
306      * @param group group from which expired PDPs should be removed
307      * @return the expired PDPs
308      */
309     private Set<String> removeFromGroup(Instant minAge, PdpGroup group) {
310         Set<String> pdps = new HashSet<>();
311         for (PdpSubGroup subgrp : group.getPdpSubgroups()) {
312             removeFromSubgroup(minAge, group, subgrp, pdps);
313         }
314
315         return pdps;
316     }
317
318     /**
319      * Removes expired PDPs from a subgroup.
320      *
321      * @param minAge minimum age for active PDPs
322      * @param group group from which to attempt to remove the PDP
323      * @param subgrp subgroup from which to attempt to remove the PDP
324      * @param pdps where to place the expired PDPs
325      */
326     private void removeFromSubgroup(Instant minAge, PdpGroup group, PdpSubGroup subgrp, Set<String> pdps) {
327
328         Iterator<Pdp> iter = subgrp.getPdpInstances().iterator();
329
330         while (iter.hasNext()) {
331             Pdp instance = iter.next();
332
333             if (instance.getLastUpdate().isBefore(minAge)) {
334                 String pdpName = instance.getInstanceId();
335                 logger.info("removed {} from group={} subgroup={}", pdpName, group.getName(), subgrp.getPdpType());
336                 iter.remove();
337                 subgrp.setCurrentInstanceCount(subgrp.getPdpInstances().size());
338                 pdps.add(pdpName);
339             }
340         }
341     }
342
343     /**
344      * Creates a new set of requests for a PDP. May be overridden by junit tests.
345      *
346      * @param pdpName PDP name
347      * @return a new set of requests
348      */
349     protected PdpRequests makePdpRequests(String pdpName) {
350         return new PdpRequests(pdpName, policyNotifier);
351     }
352
353     /**
354      * Listener for singleton request events.
355      */
356     private class SingletonListener implements RequestListener {
357         private final PdpRequests requests;
358         private final Request request;
359         private final String pdpName;
360
361         public SingletonListener(PdpRequests requests, Request request) {
362             this.requests = requests;
363             this.request = request;
364             this.pdpName = requests.getPdpName();
365         }
366
367         @Override
368         public void failure(String responsePdpName, String reason) {
369             Collection<ToscaConceptIdentifier> undeployPolicies = requestCompleted(responsePdpName);
370             if (undeployPolicies.isEmpty()) {
371                 // nothing to undeploy
372                 return;
373             }
374
375             /*
376              * Undeploy the extra policies. Note: this will likely cause a new message to
377              * be assigned to the request, thus we must re-start it after making the
378              * change.
379              */
380             PdpMessage oldmsg = request.getMessage();
381
382             try {
383                 logger.warn("undeploy policies from {}:{} that failed to deploy: {}", oldmsg.getPdpGroup(),
384                                 oldmsg.getPdpSubgroup(), undeployPolicies);
385                 policyUndeployer.undeploy(oldmsg.getPdpGroup(), oldmsg.getPdpSubgroup(), undeployPolicies);
386             } catch (PfModelException | RuntimeException e) {
387                 logger.error("cannot undeploy policies {}", undeployPolicies, e);
388             }
389
390             if (request.getMessage() == oldmsg) {
391                 // message is unchanged - start the next request
392                 startNextRequest(request);
393             } else {
394                 // message changed - restart the request
395                 request.startPublishing();
396             }
397         }
398
399         @Override
400         public void success(String responsePdpName) {
401             requestCompleted(responsePdpName);
402         }
403
404         /**
405          * Handles a request completion, starting the next request, if there is one.
406          *
407          * @param responsePdpName name of the PDP provided in the response
408          * @return a list of policies to be undeployed
409          */
410         private Collection<ToscaConceptIdentifier> requestCompleted(String responsePdpName) {
411             if (!pdpName.equals(responsePdpName)) {
412                 return Collections.emptyList();
413             }
414
415             if (pdp2requests.get(pdpName) != requests) {
416                 logger.info("discard old requests for {}", responsePdpName);
417                 requests.stopPublishing();
418                 return Collections.emptyList();
419             }
420
421             if (!requests.isFirstInQueue(request)) {
422                 logger.error("request is not first in the queue {}", request.getMessage());
423                 return Collections.emptyList();
424             }
425
426             Collection<ToscaConceptIdentifier> undeployPolicies = request.getUndeployPolicies();
427             if (undeployPolicies.isEmpty()) {
428                 // nothing to undeploy - just start the next request
429                 startNextRequest(request);
430             }
431
432             return undeployPolicies;
433         }
434
435         @Override
436         public void retryCountExhausted(Request request) {
437             if (pdp2requests.get(pdpName) == requests) {
438                 requests.stopPublishing();
439                 startNextRequest(request);
440             }
441         }
442
443         /**
444          * Starts the next request associated with a PDP.
445          *
446          * @param request the request that just completed
447          */
448         private void startNextRequest(Request request) {
449             if (!requests.startNextRequest(request)) {
450                 pdp2requests.remove(pdpName, requests);
451             }
452         }
453     }
454 }