Add ability to turn on/off pdp statistics
[policy/pap.git] / main / src / main / java / org / onap / policy / pap / main / comm / PdpModifyRequestMap.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP PAP
4  * ================================================================================
5  * Copyright (C) 2019, 2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2021 Nordix Foundation.
7  * Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.pap.main.comm;
24
25 import java.time.Instant;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.Iterator;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Set;
35 import lombok.Setter;
36 import org.onap.policy.models.base.PfModelException;
37 import org.onap.policy.models.pap.concepts.PolicyNotification;
38 import org.onap.policy.models.pdp.concepts.Pdp;
39 import org.onap.policy.models.pdp.concepts.PdpGroup;
40 import org.onap.policy.models.pdp.concepts.PdpGroupFilter;
41 import org.onap.policy.models.pdp.concepts.PdpMessage;
42 import org.onap.policy.models.pdp.concepts.PdpStateChange;
43 import org.onap.policy.models.pdp.concepts.PdpStatus;
44 import org.onap.policy.models.pdp.concepts.PdpSubGroup;
45 import org.onap.policy.models.pdp.concepts.PdpUpdate;
46 import org.onap.policy.models.pdp.enums.PdpState;
47 import org.onap.policy.models.provider.PolicyModelsProvider;
48 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
49 import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper;
50 import org.onap.policy.pap.main.comm.msgdata.Request;
51 import org.onap.policy.pap.main.comm.msgdata.RequestListener;
52 import org.onap.policy.pap.main.comm.msgdata.StateChangeReq;
53 import org.onap.policy.pap.main.comm.msgdata.UpdateReq;
54 import org.onap.policy.pap.main.notification.DeploymentStatus;
55 import org.onap.policy.pap.main.notification.PolicyNotifier;
56 import org.onap.policy.pap.main.parameters.PdpModifyRequestMapParams;
57 import org.onap.policy.pap.main.parameters.RequestParams;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 /**
62  * Maps a PDP name to requests that modify PDPs.
63  */
64 public class PdpModifyRequestMap {
65     private static final Logger logger = LoggerFactory.getLogger(PdpModifyRequestMap.class);
66
67     private static final String UNEXPECTED_BROADCAST = "unexpected broadcast message: ";
68
69     /**
70      * Maps a PDP name to its outstanding requests.
71      */
72     private final Map<String, PdpRequests> pdp2requests = new HashMap<>();
73
74     /**
75      * PDP modification lock.
76      */
77     private final Object modifyLock;
78
79     /**
80      * The configuration parameters.
81      */
82     private final PdpModifyRequestMapParams params;
83
84     /**
85      * Factory for PAP DAO.
86      */
87     private final PolicyModelsProviderFactoryWrapper daoFactory;
88
89     /**
90      * Used to notify when policy updates completes.
91      */
92     private final PolicyNotifier policyNotifier;
93
94     /**
95      * Used to undeploy policies from the system, when they cannot be deployed to a PDP.
96      *
97      * <p/>
98      * Note: there's a "catch-22" here. The request map needs an undeployer, but the
99      * undeployer needs the request map. Consequently, the request map is created first,
100      * then the undeployer, and finally, this field is set.
101      */
102     @Setter
103     private PolicyUndeployer policyUndeployer;
104
105
106     /**
107      * Constructs the object.
108      *
109      * @param params configuration parameters
110      *
111      * @throws IllegalArgumentException if a required parameter is not set
112      */
113     public PdpModifyRequestMap(PdpModifyRequestMapParams params) {
114         params.validate();
115
116         this.params = params;
117         this.modifyLock = params.getModifyLock();
118         this.daoFactory = params.getDaoFactory();
119         this.policyNotifier = params.getPolicyNotifier();
120     }
121
122     /**
123      * Determines if the map contains any requests.
124      *
125      * @return {@code true} if the map is empty, {@code false} otherwise
126      */
127     public boolean isEmpty() {
128         return pdp2requests.isEmpty();
129     }
130
131     /**
132      * Stops publishing requests to the given PDP.
133      *
134      * @param pdpName PDP name
135      */
136     public void stopPublishing(String pdpName) {
137         synchronized (modifyLock) {
138             PdpRequests requests = pdp2requests.remove(pdpName);
139             if (requests != null) {
140                 requests.stopPublishing();
141             }
142         }
143     }
144
145     /**
146      * Adds a pair of requests to the map.
147      *
148      * @param update the UPDATE request or {@code null}
149      * @param stateChange the STATE-CHANGE request or {@code null}
150      */
151     public void addRequest(PdpUpdate update, PdpStateChange stateChange) {
152         if (update == null) {
153             addRequest(stateChange);
154
155         } else if (stateChange == null) {
156             addRequest(update);
157
158         } else if (stateChange.getState() == PdpState.ACTIVE) {
159             // publish update before activating
160             synchronized (modifyLock) {
161                 addRequest(update);
162                 addRequest(stateChange);
163             }
164
165         } else {
166             // deactivate before publishing update
167             synchronized (modifyLock) {
168                 addRequest(stateChange);
169                 addRequest(update);
170             }
171         }
172     }
173
174     /**
175      * Adds an UPDATE request to the map.
176      *
177      * @param update the UPDATE request or {@code null}
178      * @return the new request (this should only be used by junit tests)
179      */
180     public Request addRequest(PdpUpdate update) {
181         if (update == null) {
182             return null;
183         }
184
185         if (isBroadcast(update)) {
186             throw new IllegalArgumentException(UNEXPECTED_BROADCAST + update);
187         }
188
189         // @formatter:off
190         RequestParams reqparams = new RequestParams()
191             .setMaxRetryCount(params.getParams().getUpdateParameters().getMaxRetryCount())
192             .setTimers(params.getUpdateTimers())
193             .setModifyLock(params.getModifyLock())
194             .setPdpPublisher(params.getPdpPublisher())
195             .setResponseDispatcher(params.getResponseDispatcher());
196         // @formatter:on
197
198         String name = update.getName() + " " + PdpUpdate.class.getSimpleName();
199         var request = new UpdateReq(reqparams, name, update);
200
201         addSingleton(request);
202         return request;
203     }
204
205     /**
206      * Adds a STATE-CHANGE request to the map.
207      *
208      * @param stateChange the STATE-CHANGE request or {@code null}
209      * @return the new request (this should only be used by junit tests)
210      */
211     public Request addRequest(PdpStateChange stateChange) {
212         if (stateChange == null) {
213             return null;
214         }
215
216         if (isBroadcast(stateChange)) {
217             throw new IllegalArgumentException(UNEXPECTED_BROADCAST + stateChange);
218         }
219
220         // @formatter:off
221         RequestParams reqparams = new RequestParams()
222             .setMaxRetryCount(params.getParams().getStateChangeParameters().getMaxRetryCount())
223             .setTimers(params.getStateChangeTimers())
224             .setModifyLock(params.getModifyLock())
225             .setPdpPublisher(params.getPdpPublisher())
226             .setResponseDispatcher(params.getResponseDispatcher());
227         // @formatter:on
228
229         String name = stateChange.getName() + " " + PdpStateChange.class.getSimpleName();
230         var request = new StateChangeReq(reqparams, name, stateChange);
231
232         addSingleton(request);
233         return request;
234     }
235
236     /**
237      * Determines if a message is a broadcast message.
238      *
239      * @param message the message to examine
240      * @return {@code true} if the message is a broadcast message, {@code false} if
241      *         destined for a single PDP
242      */
243     private boolean isBroadcast(PdpMessage message) {
244         return (message.getName() == null);
245     }
246
247     /**
248      * Configures and adds a request to the map.
249      *
250      * @param request the request to be added
251      */
252     private void addSingleton(Request request) {
253
254         synchronized (modifyLock) {
255             PdpRequests requests = pdp2requests.computeIfAbsent(request.getMessage().getName(), this::makePdpRequests);
256
257             request.setListener(new SingletonListener(requests, request));
258             requests.addSingleton(request);
259         }
260     }
261
262     /**
263      * Removes expired PDPs from all active groups.
264      */
265     public void removeExpiredPdps() {
266
267         synchronized (modifyLock) {
268             logger.info("check for PDP records older than {}ms", params.getMaxPdpAgeMs());
269
270             try (PolicyModelsProvider dao = daoFactory.create()) {
271
272                 PdpGroupFilter filter = PdpGroupFilter.builder().groupState(PdpState.ACTIVE).build();
273                 List<PdpGroup> groups = dao.getFilteredPdpGroups(filter);
274                 List<PdpGroup> updates = new ArrayList<>(1);
275
276                 var status = new DeploymentStatus(dao);
277
278                 Instant minAge = Instant.now().minusMillis(params.getMaxPdpAgeMs());
279
280                 for (PdpGroup group : groups) {
281                     Set<String> pdps = removeFromGroup(minAge, group);
282                     if (!pdps.isEmpty()) {
283                         updates.add(group);
284                         status.loadByGroup(group.getName());
285                         pdps.forEach(status::deleteDeployment);
286                     }
287                 }
288
289                 if (!updates.isEmpty()) {
290                     dao.updatePdpGroups(updates);
291
292                     var notification = new PolicyNotification();
293                     status.flush(notification);
294
295                     policyNotifier.publish(notification);
296                 }
297
298             } catch (PfModelException e) {
299                 logger.warn("failed to remove expired PDPs", e);
300             }
301         }
302     }
303
304     /**
305      * Removes expired PDPs from a group.
306      *
307      * @param minAge minimum age for active PDPs
308      * @param group group from which expired PDPs should be removed
309      * @return the expired PDPs
310      */
311     private Set<String> removeFromGroup(Instant minAge, PdpGroup group) {
312         Set<String> pdps = new HashSet<>();
313         for (PdpSubGroup subgrp : group.getPdpSubgroups()) {
314             removeFromSubgroup(minAge, group, subgrp, pdps);
315         }
316
317         return pdps;
318     }
319
320     /**
321      * Removes expired PDPs from a subgroup.
322      *
323      * @param minAge minimum age for active PDPs
324      * @param group group from which to attempt to remove the PDP
325      * @param subgrp subgroup from which to attempt to remove the PDP
326      * @param pdps where to place the expired PDPs
327      */
328     private void removeFromSubgroup(Instant minAge, PdpGroup group, PdpSubGroup subgrp, Set<String> pdps) {
329
330         Iterator<Pdp> iter = subgrp.getPdpInstances().iterator();
331
332         while (iter.hasNext()) {
333             Pdp instance = iter.next();
334
335             if (instance.getLastUpdate().isBefore(minAge)) {
336                 String pdpName = instance.getInstanceId();
337                 logger.info("removed {} from group={} subgroup={}", pdpName, group.getName(), subgrp.getPdpType());
338                 iter.remove();
339                 subgrp.setCurrentInstanceCount(subgrp.getPdpInstances().size());
340                 pdps.add(pdpName);
341             }
342         }
343     }
344
345     /**
346      * Creates a new set of requests for a PDP. May be overridden by junit tests.
347      *
348      * @param pdpName PDP name
349      * @return a new set of requests
350      */
351     protected PdpRequests makePdpRequests(String pdpName) {
352         return new PdpRequests(pdpName, policyNotifier);
353     }
354
355     /**
356      * Makes a handler for PDP responses.
357      * @return a response handler
358      */
359     protected PdpStatusMessageHandler makePdpResponseHandler() {
360         return new PdpStatusMessageHandler(params.getParams(), params.isSavePdpStatistics());
361     }
362
363     /**
364      * Listener for singleton request events.
365      */
366     private class SingletonListener implements RequestListener {
367         private final PdpRequests requests;
368         private final Request request;
369         private final String pdpName;
370
371         public SingletonListener(PdpRequests requests, Request request) {
372             this.requests = requests;
373             this.request = request;
374             this.pdpName = requests.getPdpName();
375         }
376
377         @Override
378         public void failure(String responsePdpName, String reason) {
379             Collection<ToscaConceptIdentifier> undeployPolicies = requestCompleted(responsePdpName);
380             if (undeployPolicies.isEmpty()) {
381                 // nothing to undeploy
382                 return;
383             }
384
385             /*
386              * Undeploy the extra policies. Note: this will likely cause a new message to
387              * be assigned to the request, thus we must re-start it after making the
388              * change.
389              */
390             PdpMessage oldmsg = request.getMessage();
391
392             try {
393                 logger.warn("undeploy policies from {}:{} that failed to deploy: {}", oldmsg.getPdpGroup(),
394                                 oldmsg.getPdpSubgroup(), undeployPolicies);
395                 policyUndeployer.undeploy(oldmsg.getPdpGroup(), oldmsg.getPdpSubgroup(), undeployPolicies);
396             } catch (PfModelException | RuntimeException e) {
397                 logger.error("cannot undeploy policies {}", undeployPolicies, e);
398             }
399
400             if (request.getMessage() == oldmsg) {
401                 // message is unchanged - start the next request
402                 startNextRequest(request);
403             } else {
404                 // message changed - restart the request
405                 request.startPublishing();
406             }
407         }
408
409         @Override
410         public void success(String responsePdpName, PdpStatus response) {
411             requestCompleted(responsePdpName);
412
413             if (!(request instanceof UpdateReq)) {
414                 // other response types may not include the list of policies
415                 return;
416             }
417
418             /*
419              * Update PDP time stamps. Also send pdp-update and pdp-state-change, as
420              * necessary, if the response does not reflect what's in the DB.
421              */
422             var handler = makePdpResponseHandler();
423             handler.handlePdpStatus(response);
424         }
425
426         /**
427          * Handles a request completion, starting the next request, if there is one.
428          *
429          * @param responsePdpName name of the PDP provided in the response
430          * @return a list of policies to be undeployed
431          */
432         private Collection<ToscaConceptIdentifier> requestCompleted(String responsePdpName) {
433             if (!pdpName.equals(responsePdpName)) {
434                 return Collections.emptyList();
435             }
436
437             if (pdp2requests.get(pdpName) != requests) {
438                 logger.info("discard old requests for {}", responsePdpName);
439                 requests.stopPublishing();
440                 return Collections.emptyList();
441             }
442
443             if (!requests.isFirstInQueue(request)) {
444                 logger.error("request is not first in the queue {}", request.getMessage());
445                 return Collections.emptyList();
446             }
447
448             Collection<ToscaConceptIdentifier> undeployPolicies = request.getUndeployPolicies();
449             if (undeployPolicies.isEmpty()) {
450                 // nothing to undeploy - just start the next request
451                 startNextRequest(request);
452             }
453
454             return undeployPolicies;
455         }
456
457         @Override
458         public void retryCountExhausted(Request request) {
459             if (pdp2requests.get(pdpName) == requests) {
460                 requests.stopPublishing();
461                 startNextRequest(request);
462             }
463         }
464
465         /**
466          * Starts the next request associated with a PDP.
467          *
468          * @param request the request that just completed
469          */
470         private void startNextRequest(Request request) {
471             if (!requests.startNextRequest(request)) {
472                 pdp2requests.remove(pdpName, requests);
473             }
474         }
475     }
476 }