Migrate pap startup & controllers to spring boot
[policy/pap.git] / main / src / main / java / org / onap / policy / pap / main / comm / PdpStatusMessageHandler.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019-2021 Nordix Foundation.
4  *  Modifications Copyright (C) 2019-2021 AT&T Intellectual Property.
5  *  Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.pap.main.comm;
24
25 import java.sql.SQLIntegrityConstraintViolationException;
26 import java.time.Instant;
27 import java.util.Arrays;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Optional;
34 import java.util.concurrent.TimeUnit;
35 import java.util.stream.Collectors;
36 import org.apache.commons.lang3.builder.EqualsBuilder;
37 import org.eclipse.persistence.exceptions.EclipseLinkException;
38 import org.onap.policy.models.base.PfModelException;
39 import org.onap.policy.models.pdp.concepts.Pdp;
40 import org.onap.policy.models.pdp.concepts.PdpGroup;
41 import org.onap.policy.models.pdp.concepts.PdpGroupFilter;
42 import org.onap.policy.models.pdp.concepts.PdpStatistics;
43 import org.onap.policy.models.pdp.concepts.PdpStatus;
44 import org.onap.policy.models.pdp.concepts.PdpSubGroup;
45 import org.onap.policy.models.pdp.enums.PdpState;
46 import org.onap.policy.models.provider.PolicyModelsProvider;
47 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
48 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
49 import org.onap.policy.pap.main.PolicyPapException;
50 import org.onap.policy.pap.main.parameters.PdpParameters;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54
55 /**
56  * Handler for PDP Status messages which either represent registration or heart beat.
57  *
58  * @author Ram Krishna Verma (ram.krishna.verma@est.tech)
59  */
60 public class PdpStatusMessageHandler extends PdpMessageGenerator {
61     private static final Logger LOGGER = LoggerFactory.getLogger(PdpStatusMessageHandler.class);
62
63     private final PdpParameters params;
64
65     private final boolean savePdpStatistics;
66
67     /**
68      * List to store policies present in db.
69      */
70     List<ToscaPolicy> policies = new LinkedList<>();
71
72     /**
73      * List to store policies to be deployed (heartbeat).
74      */
75     Map<ToscaConceptIdentifier, ToscaPolicy> policiesToBeDeployed = new HashMap<>();
76
77     /**
78      * List to store policies to be undeployed (heartbeat).
79      */
80     List<ToscaConceptIdentifier> policiesToBeUndeployed = new LinkedList<>();
81
82     /**
83      * Constructs the object.
84      *
85      * @param params PDP parameters
86      */
87     public PdpStatusMessageHandler(PdpParameters params, boolean savePdpStatistics) {
88         super(true);
89         super.initialize();
90         this.params = params;
91         this.savePdpStatistics = savePdpStatistics;
92     }
93
94     /**
95      * Handles the PdpStatus message coming from various PDP's.
96      *
97      * @param message the PdpStatus message
98      */
99     public void handlePdpStatus(final PdpStatus message) {
100         if (message.getPolicies() == null) {
101             message.setPolicies(Collections.emptyList());
102         }
103
104         long diffms = System.currentTimeMillis() - message.getTimestampMs();
105         if (diffms > params.getMaxMessageAgeMs()) {
106             long diffsec = TimeUnit.SECONDS.convert(diffms, TimeUnit.MILLISECONDS);
107             LOGGER.info("discarding status message from {} age {}s", message.getName(), diffsec);
108             return;
109         }
110
111         synchronized (updateLock) {
112             try (PolicyModelsProvider databaseProvider = modelProviderWrapper.create()) {
113                 if (message.getPdpSubgroup() == null) {
114                     handlePdpRegistration(message, databaseProvider);
115                 } else {
116                     handlePdpHeartbeat(message, databaseProvider);
117                 }
118             } catch (final PolicyPapException exp) {
119                 LOGGER.error("Operation Failed", exp);
120             } catch (final Exception exp) {
121                 if (isDuplicateKeyException(exp)) {
122                     /*
123                      * this is to be expected, if multiple PAPs are processing the same
124                      * heartbeat at a time, thus we log the exception at a trace level
125                      * instead of an error level.
126                      */
127                     LOGGER.info("Failed updating PDP information for {} - may have been added by another PAP",
128                                     message.getName());
129                     LOGGER.trace("Failed updating PDP information for {}", message.getName(), exp);
130                 } else {
131                     LOGGER.error("Failed connecting to database provider", exp);
132                 }
133             }
134         }
135     }
136
137     /**
138      * Determines if the exception indicates a duplicate key.
139      *
140      * @param thrown exception to check
141      * @return {@code true} if the exception occurred due to a duplicate key
142      */
143     protected static boolean isDuplicateKeyException(Throwable thrown) {
144         while (thrown != null) {
145             if (thrown instanceof SQLIntegrityConstraintViolationException) {
146                 return true;
147             }
148
149             if (thrown instanceof EclipseLinkException) {
150                 EclipseLinkException ele = (EclipseLinkException) thrown;
151                 if (isDuplicateKeyException(ele.getInternalException())) {
152                     return true;
153                 }
154             }
155
156             thrown = thrown.getCause();
157         }
158
159         return false;
160     }
161
162     private void handlePdpRegistration(final PdpStatus message, final PolicyModelsProvider databaseProvider)
163             throws PfModelException, PolicyPapException {
164         if (!findAndUpdatePdpGroup(message, databaseProvider)) {
165             final var errorMessage = "Failed to register PDP. No matching PdpGroup/SubGroup Found - ";
166             LOGGER.debug("{}{}", errorMessage, message);
167             throw new PolicyPapException(errorMessage + message);
168         }
169     }
170
171     private boolean findAndUpdatePdpGroup(final PdpStatus message, final PolicyModelsProvider databaseProvider)
172             throws PfModelException {
173         var pdpGroupFound = false;
174         final PdpGroupFilter filter =
175                 PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build();
176
177         final List<PdpGroup> pdpGroups = databaseProvider.getFilteredPdpGroups(filter);
178         if (!pdpGroups.isEmpty()) {
179             pdpGroupFound = registerPdp(message, databaseProvider, pdpGroups.get(0));
180         }
181         return pdpGroupFound;
182     }
183
184     private boolean registerPdp(final PdpStatus message, final PolicyModelsProvider databaseProvider,
185             final PdpGroup finalizedPdpGroup) throws PfModelException {
186         Optional<PdpSubGroup> subGroup;
187         var pdpGroupFound = false;
188         subGroup = findPdpSubGroup(message, finalizedPdpGroup);
189
190         if (subGroup.isPresent()) {
191             policies = getToscaPolicies(subGroup.get(), databaseProvider);
192             policiesToBeDeployed = policies.stream().collect(Collectors
193                     .toMap(ToscaPolicy::getIdentifier, policy -> policy));
194             policiesToBeUndeployed = null;
195
196             LOGGER.debug("Found pdpGroup - {}, going for registration of PDP - {}", finalizedPdpGroup, message);
197             Optional<Pdp> pdp = findPdpInstance(message, subGroup.get());
198             if (pdp.isPresent()) {
199                 updatePdpHealthStatus(message, subGroup.get(), pdp.get(), finalizedPdpGroup, databaseProvider);
200             } else {
201                 updatePdpSubGroup(finalizedPdpGroup, subGroup.get(), message, databaseProvider);
202             }
203             sendPdpMessage(finalizedPdpGroup.getName(), subGroup.get(), message.getName(), null, databaseProvider);
204             pdpGroupFound = true;
205         }
206         return pdpGroupFound;
207     }
208
209     private void updatePdpSubGroup(final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup, final PdpStatus message,
210             final PolicyModelsProvider databaseProvider) throws PfModelException {
211
212         final var pdpInstance = new Pdp();
213         pdpInstance.setInstanceId(message.getName());
214         pdpInstance.setPdpState(PdpState.ACTIVE);
215         pdpInstance.setHealthy(message.getHealthy());
216         pdpInstance.setMessage(message.getDescription());
217         pdpInstance.setLastUpdate(Instant.now());
218         pdpSubGroup.getPdpInstances().add(pdpInstance);
219
220         pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() + 1);
221
222         databaseProvider.updatePdpSubGroup(pdpGroup.getName(), pdpSubGroup);
223
224         LOGGER.debug("Updated PdpSubGroup in DB - {} belonging to PdpGroup - {}", pdpSubGroup, pdpGroup.getName());
225     }
226
227     private void handlePdpHeartbeat(final PdpStatus message, final PolicyModelsProvider databaseProvider)
228             throws PfModelException {
229
230         final PdpGroupFilter filter =
231                 PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build();
232         final List<PdpGroup> pdpGroups = databaseProvider.getFilteredPdpGroups(filter);
233         if (!pdpGroups.isEmpty()) {
234             var pdpGroup = pdpGroups.get(0);
235             Optional<PdpSubGroup> pdpSubgroup = findPdpSubGroup(message, pdpGroup);
236             if (pdpSubgroup.isPresent()) {
237                 Optional<Pdp> pdpInstance = findPdpInstance(message, pdpSubgroup.get());
238                 if (pdpInstance.isPresent()) {
239                     processPdpDetails(message, pdpSubgroup.get(), pdpInstance.get(), pdpGroup, databaseProvider);
240                 } else {
241                     LOGGER.debug("PdpInstance not Found in DB. Sending Pdp for registration - {}", message);
242                     registerPdp(message, databaseProvider, pdpGroup);
243                 }
244             }
245         }
246     }
247
248     private Optional<PdpSubGroup> findPdpSubGroup(final PdpStatus message, final PdpGroup pdpGroup) {
249         PdpSubGroup pdpSubgroup = null;
250         for (final PdpSubGroup subGroup : pdpGroup.getPdpSubgroups()) {
251             if (message.getPdpType().equals(subGroup.getPdpType())) {
252                 pdpSubgroup = subGroup;
253                 break;
254             }
255         }
256         return Optional.ofNullable(pdpSubgroup);
257     }
258
259     private Optional<Pdp> findPdpInstance(final PdpStatus message, final PdpSubGroup subGroup) {
260         Pdp pdpInstance = null;
261         for (final Pdp pdpInstanceDetails : subGroup.getPdpInstances()) {
262             if (pdpInstanceDetails.getInstanceId().equals(message.getName())) {
263                 pdpInstance = pdpInstanceDetails;
264                 break;
265             }
266         }
267         return Optional.ofNullable(pdpInstance);
268     }
269
270     private void processPdpDetails(final PdpStatus message, final PdpSubGroup pdpSubGroup, final Pdp pdpInstance,
271             final PdpGroup pdpGroup, final PolicyModelsProvider databaseProvider)
272                     throws PfModelException {
273         // all policies
274         policies = getToscaPolicies(pdpSubGroup, databaseProvider);
275
276         Map<ToscaConceptIdentifier, ToscaPolicy> policyMap =
277                         policies.stream().collect(Collectors.toMap(ToscaPolicy::getIdentifier, policy -> policy));
278
279         // policies that the PDP already has (-) all
280         policiesToBeUndeployed = message.getPolicies().stream().filter(policyId -> !policyMap.containsKey(policyId))
281                         .collect(Collectors.toList());
282
283         // all (-) policies that the PDP already has
284         policiesToBeDeployed = policyMap;
285         policiesToBeDeployed.keySet().removeAll(message.getPolicies());
286
287         if (PdpState.TERMINATED.equals(message.getState())) {
288             processPdpTermination(pdpSubGroup, pdpInstance, pdpGroup, databaseProvider);
289         } else if (validatePdpDetails(message, pdpGroup, pdpSubGroup, pdpInstance)) {
290             LOGGER.debug("PdpInstance details are correct. Saving current state in DB - {}", pdpInstance);
291             updatePdpHealthStatus(message, pdpSubGroup, pdpInstance, pdpGroup, databaseProvider);
292
293             if (savePdpStatistics) {
294                 processPdpStatistics(message, pdpSubGroup, pdpInstance, pdpGroup, databaseProvider);
295             } else {
296                 LOGGER.debug("Not processing PdpStatistics - {}", message.getStatistics());
297             }
298         } else {
299             LOGGER.debug("PdpInstance details are not correct. Sending PdpUpdate message - {}", pdpInstance);
300             LOGGER.debug("Policy list in DB - {}. Policy list in heartbeat - {}", pdpSubGroup.getPolicies(),
301                 message.getPolicies());
302             updatePdpHealthStatus(message, pdpSubGroup, pdpInstance, pdpGroup, databaseProvider);
303             sendPdpMessage(pdpGroup.getName(), pdpSubGroup, pdpInstance.getInstanceId(), pdpInstance.getPdpState(),
304                 databaseProvider);
305         }
306     }
307
308     private void processPdpStatistics(final PdpStatus message, final PdpSubGroup pdpSubGroup, final Pdp pdpInstance,
309                     final PdpGroup pdpGroup, final PolicyModelsProvider databaseProvider) throws PfModelException {
310         if (validatePdpStatisticsDetails(message, pdpInstance, pdpGroup, pdpSubGroup)) {
311             LOGGER.debug("PdpStatistics details are correct. Saving current statistics in DB - {}",
312                     message.getStatistics());
313             createPdpStatistics(message.getStatistics(), databaseProvider);
314         } else {
315             LOGGER.debug("PdpStatistics details are not correct - {}", message.getStatistics());
316         }
317     }
318
319     private void processPdpTermination(final PdpSubGroup pdpSubGroup, final Pdp pdpInstance, final PdpGroup pdpGroup,
320             final PolicyModelsProvider databaseProvider) throws PfModelException {
321         pdpSubGroup.getPdpInstances().remove(pdpInstance);
322         pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() - 1);
323         databaseProvider.updatePdpSubGroup(pdpGroup.getName(), pdpSubGroup);
324
325         LOGGER.debug("Deleted PdpInstance - {} belonging to PdpSubGroup - {} and PdpGroup - {}", pdpInstance,
326                 pdpSubGroup, pdpGroup);
327     }
328
329     private boolean validatePdpDetails(final PdpStatus message, final PdpGroup pdpGroup, final PdpSubGroup subGroup,
330             final Pdp pdpInstanceDetails) {
331         /*
332          * "EqualsBuilder" is a bit of a misnomer, as it uses containsAll() to check policies. Nevertheless, it does the
333          * job and provides a convenient way to build a bunch of comparisons.
334          */
335         return new EqualsBuilder().append(message.getPdpGroup(), pdpGroup.getName())
336                 .append(message.getPdpSubgroup(), subGroup.getPdpType())
337                 .append(message.getPdpType(), subGroup.getPdpType())
338                 .append(message.getState(), pdpInstanceDetails.getPdpState())
339                 .append(message.getPolicies().containsAll(subGroup.getPolicies()), true)
340                 .append(subGroup.getPolicies().containsAll(message.getPolicies()), true).build();
341     }
342
343     private boolean validatePdpStatisticsDetails(final PdpStatus message, final Pdp pdpInstanceDetails,
344             final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup) {
345         if (message.getStatistics() != null) {
346             return new EqualsBuilder()
347                     .append(message.getStatistics().getPdpInstanceId(), pdpInstanceDetails.getInstanceId())
348                     .append(message.getStatistics().getPdpGroupName(), pdpGroup.getName())
349                     .append(message.getStatistics().getPdpSubGroupName(), pdpSubGroup.getPdpType())
350                     .append(message.getStatistics().getPolicyDeployCount() < 0, false)
351                     .append(message.getStatistics().getPolicyDeployFailCount() < 0, false)
352                     .append(message.getStatistics().getPolicyDeploySuccessCount() < 0, false)
353                     .append(message.getStatistics().getPolicyUndeployCount() < 0, false)
354                     .append(message.getStatistics().getPolicyUndeployFailCount() < 0, false)
355                     .append(message.getStatistics().getPolicyUndeploySuccessCount() < 0, false)
356                     .append(message.getStatistics().getPolicyExecutedCount() < 0, false)
357                     .append(message.getStatistics().getPolicyExecutedFailCount() < 0, false)
358                     .append(message.getStatistics().getPolicyExecutedSuccessCount() < 0, false).build();
359         } else {
360             LOGGER.debug("PdpStatistics is null");
361             return false;
362         }
363     }
364
365     private void updatePdpHealthStatus(final PdpStatus message, final PdpSubGroup pdpSubgroup, final Pdp pdpInstance,
366             final PdpGroup pdpGroup, final PolicyModelsProvider databaseProvider) throws PfModelException {
367         pdpInstance.setHealthy(message.getHealthy());
368         pdpInstance.setMessage(message.getDescription());
369         pdpInstance.setLastUpdate(Instant.now());
370         databaseProvider.updatePdp(pdpGroup.getName(), pdpSubgroup.getPdpType(), pdpInstance);
371
372         LOGGER.debug("Updated Pdp in DB - {}", pdpInstance);
373     }
374
375     private void createPdpStatistics(final PdpStatistics pdpStatistics, final PolicyModelsProvider databaseProvider)
376             throws PfModelException {
377         databaseProvider.createPdpStatistics(Arrays.asList(pdpStatistics));
378         LOGGER.debug("Created PdpStatistics in DB - {}", pdpStatistics);
379     }
380
381     private void sendPdpMessage(final String pdpGroupName, final PdpSubGroup subGroup, final String pdpInstanceId,
382             final PdpState pdpState, final PolicyModelsProvider databaseProvider)
383                     throws PfModelException {
384         final List<ToscaPolicy> polsToBeDeployed = new LinkedList<>(policiesToBeDeployed.values());
385         final var pdpUpdatemessage =
386             createPdpUpdateMessage(pdpGroupName, subGroup, pdpInstanceId,
387                         polsToBeDeployed, policiesToBeUndeployed);
388         final var pdpStateChangeMessage =
389             createPdpStateChangeMessage(pdpGroupName, subGroup, pdpInstanceId, pdpState);
390         updateDeploymentStatus(pdpGroupName, subGroup.getPdpType(), pdpInstanceId, pdpStateChangeMessage.getState(),
391             databaseProvider, pdpUpdatemessage.getPoliciesToBeDeployed());
392
393         requestMap.addRequest(pdpUpdatemessage, pdpStateChangeMessage);
394         LOGGER.debug("Sent PdpUpdate message - {}", pdpUpdatemessage);
395         LOGGER.debug("Sent PdpStateChange message - {}", pdpStateChangeMessage);
396     }
397 }