070261dbdd69f0055b040834ec629a5f0addb5a3
[policy/pap.git] / main / src / main / java / org / onap / policy / pap / main / comm / PdpStatusMessageHandler.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019-2021 Nordix Foundation.
4  *  Modifications Copyright (C) 2019-2021 AT&T Intellectual Property.
5  *  Modifications Copyright (C) 2021-2022 Bell Canada. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.pap.main.comm;
24
25 import java.sql.SQLIntegrityConstraintViolationException;
26 import java.time.Instant;
27 import java.util.Arrays;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Optional;
34 import java.util.concurrent.TimeUnit;
35 import java.util.stream.Collectors;
36 import org.apache.commons.lang3.builder.EqualsBuilder;
37 import org.eclipse.persistence.exceptions.EclipseLinkException;
38 import org.onap.policy.models.base.PfModelException;
39 import org.onap.policy.models.pdp.concepts.Pdp;
40 import org.onap.policy.models.pdp.concepts.PdpGroup;
41 import org.onap.policy.models.pdp.concepts.PdpGroupFilter;
42 import org.onap.policy.models.pdp.concepts.PdpStatistics;
43 import org.onap.policy.models.pdp.concepts.PdpStatus;
44 import org.onap.policy.models.pdp.concepts.PdpSubGroup;
45 import org.onap.policy.models.pdp.enums.PdpState;
46 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
47 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
48 import org.onap.policy.pap.main.PolicyPapException;
49 import org.onap.policy.pap.main.parameters.PapParameterGroup;
50 import org.onap.policy.pap.main.parameters.PdpParameters;
51 import org.onap.policy.pap.main.service.PdpGroupService;
52 import org.onap.policy.pap.main.service.PdpStatisticsService;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55 import org.springframework.stereotype.Component;
56
57
58 /**
59  * Handler for PDP Status messages which either represent registration or heart beat.
60  *
61  * @author Ram Krishna Verma (ram.krishna.verma@est.tech)
62  */
63
64 @Component
65 public class PdpStatusMessageHandler extends PdpMessageGenerator {
66     private static final Logger LOGGER = LoggerFactory.getLogger(PdpStatusMessageHandler.class);
67
68     private final PdpParameters params;
69
70     private final boolean savePdpStatistics;
71
72     private final PdpGroupService pdpGroupService;
73
74     private final PdpStatisticsService pdpStatisticsService;
75
76     /**
77      * List to store policies present in db.
78      */
79     List<ToscaPolicy> policies = new LinkedList<>();
80
81     /**
82      * List to store policies to be deployed (heartbeat).
83      */
84     Map<ToscaConceptIdentifier, ToscaPolicy> policiesToBeDeployed = new HashMap<>();
85
86     /**
87      * List to store policies to be undeployed (heartbeat).
88      */
89     List<ToscaConceptIdentifier> policiesToBeUndeployed = new LinkedList<>();
90
91     /**
92      * Constructs the object.
93      *
94      * @param parameterGroup the parameterGroup
95      * @param pdpGroupService the pdpGroupService
96      * @param pdpStatisticsService the pdpStatisticsService
97      */
98     public PdpStatusMessageHandler(PapParameterGroup parameterGroup, PdpGroupService pdpGroupService,
99         PdpStatisticsService pdpStatisticsService) {
100         super(true);
101         this.params = parameterGroup.getPdpParameters();
102         this.savePdpStatistics = parameterGroup.isSavePdpStatisticsInDb();
103         this.pdpGroupService = pdpGroupService;
104         this.pdpStatisticsService = pdpStatisticsService;
105     }
106
107     /**
108      * Handles the PdpStatus message coming from various PDP's.
109      *
110      * @param message the PdpStatus message
111      */
112     public void handlePdpStatus(final PdpStatus message) {
113         if (message.getPolicies() == null) {
114             message.setPolicies(Collections.emptyList());
115         }
116
117         long diffms = System.currentTimeMillis() - message.getTimestampMs();
118         if (diffms > params.getMaxMessageAgeMs()) {
119             long diffsec = TimeUnit.SECONDS.convert(diffms, TimeUnit.MILLISECONDS);
120             LOGGER.info("discarding status message from {} age {}s", message.getName(), diffsec);
121             return;
122         }
123
124         synchronized (updateLock) {
125             try {
126                 if (message.getPdpSubgroup() == null) {
127                     handlePdpRegistration(message);
128                 } else {
129                     handlePdpHeartbeat(message);
130                 }
131             } catch (final PolicyPapException exp) {
132                 LOGGER.error("Operation Failed", exp);
133             } catch (final Exception exp) {
134                 if (isDuplicateKeyException(exp)) {
135                     /*
136                      * this is to be expected, if multiple PAPs are processing the same
137                      * heartbeat at a time, thus we log the exception at a trace level
138                      * instead of an error level.
139                      */
140                     LOGGER.info("Failed updating PDP information for {} - may have been added by another PAP",
141                                     message.getName());
142                     LOGGER.trace("Failed updating PDP information for {}", message.getName(), exp);
143                 } else {
144                     LOGGER.error("Failed connecting to database provider", exp);
145                 }
146             }
147         }
148     }
149
150     /**
151      * Determines if the exception indicates a duplicate key.
152      *
153      * @param thrown exception to check
154      * @return {@code true} if the exception occurred due to a duplicate key
155      */
156     protected static boolean isDuplicateKeyException(Throwable thrown) {
157         while (thrown != null) {
158             if (thrown instanceof SQLIntegrityConstraintViolationException) {
159                 return true;
160             }
161
162             if (thrown instanceof EclipseLinkException) {
163                 EclipseLinkException ele = (EclipseLinkException) thrown;
164                 if (isDuplicateKeyException(ele.getInternalException())) {
165                     return true;
166                 }
167             }
168
169             thrown = thrown.getCause();
170         }
171
172         return false;
173     }
174
175     private void handlePdpRegistration(final PdpStatus message) throws PfModelException, PolicyPapException {
176         if (!findAndUpdatePdpGroup(message)) {
177             final var errorMessage = "Failed to register PDP. No matching PdpGroup/SubGroup Found - ";
178             LOGGER.debug("{}{}", errorMessage, message);
179             throw new PolicyPapException(errorMessage + message);
180         }
181     }
182
183     private boolean findAndUpdatePdpGroup(final PdpStatus message)
184             throws PfModelException {
185         var pdpGroupFound = false;
186         final PdpGroupFilter filter =
187                 PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build();
188
189         final List<PdpGroup> pdpGroups = pdpGroupService.getFilteredPdpGroups(filter);
190         if (!pdpGroups.isEmpty()) {
191             pdpGroupFound = registerPdp(message, pdpGroups.get(0));
192         }
193         return pdpGroupFound;
194     }
195
196     private boolean registerPdp(final PdpStatus message, final PdpGroup finalizedPdpGroup) throws PfModelException {
197         Optional<PdpSubGroup> subGroup;
198         var pdpGroupFound = false;
199         subGroup = findPdpSubGroup(message, finalizedPdpGroup);
200
201         if (subGroup.isPresent()) {
202             policies = getToscaPolicies(subGroup.get());
203             policiesToBeDeployed = policies.stream().collect(Collectors
204                     .toMap(ToscaPolicy::getIdentifier, policy -> policy));
205             policiesToBeUndeployed = null;
206
207             LOGGER.debug("Found pdpGroup - {}, going for registration of PDP - {}", finalizedPdpGroup, message);
208             Optional<Pdp> pdp = findPdpInstance(message, subGroup.get());
209             if (pdp.isPresent()) {
210                 updatePdpHealthStatus(message, subGroup.get(), pdp.get(), finalizedPdpGroup);
211             } else {
212                 updatePdpSubGroup(finalizedPdpGroup, subGroup.get(), message);
213             }
214             sendPdpMessage(finalizedPdpGroup.getName(), subGroup.get(), message.getName(), null);
215             pdpGroupFound = true;
216         }
217         return pdpGroupFound;
218     }
219
220     private void updatePdpSubGroup(final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup, final PdpStatus message) {
221
222         final var pdpInstance = new Pdp();
223         pdpInstance.setInstanceId(message.getName());
224         pdpInstance.setPdpState(PdpState.ACTIVE);
225         pdpInstance.setHealthy(message.getHealthy());
226         pdpInstance.setMessage(message.getDescription());
227         pdpInstance.setLastUpdate(Instant.now());
228         pdpSubGroup.getPdpInstances().add(pdpInstance);
229
230         pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() + 1);
231
232         pdpGroupService.updatePdpSubGroup(pdpGroup.getName(), pdpSubGroup);
233
234         LOGGER.debug("Updated PdpSubGroup in DB - {} belonging to PdpGroup - {}", pdpSubGroup, pdpGroup.getName());
235     }
236
237     private void handlePdpHeartbeat(final PdpStatus message) throws PfModelException {
238
239         final PdpGroupFilter filter =
240                 PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build();
241         final List<PdpGroup> pdpGroups = pdpGroupService.getFilteredPdpGroups(filter);
242         if (!pdpGroups.isEmpty()) {
243             var pdpGroup = pdpGroups.get(0);
244             Optional<PdpSubGroup> pdpSubgroup = findPdpSubGroup(message, pdpGroup);
245             if (pdpSubgroup.isPresent()) {
246                 Optional<Pdp> pdpInstance = findPdpInstance(message, pdpSubgroup.get());
247                 if (pdpInstance.isPresent()) {
248                     processPdpDetails(message, pdpSubgroup.get(), pdpInstance.get(), pdpGroup);
249                 } else {
250                     LOGGER.debug("PdpInstance not Found in DB. Sending Pdp for registration - {}", message);
251                     registerPdp(message, pdpGroup);
252                 }
253             }
254         }
255     }
256
257     private Optional<PdpSubGroup> findPdpSubGroup(final PdpStatus message, final PdpGroup pdpGroup) {
258         PdpSubGroup pdpSubgroup = null;
259         for (final PdpSubGroup subGroup : pdpGroup.getPdpSubgroups()) {
260             if (message.getPdpType().equals(subGroup.getPdpType())) {
261                 pdpSubgroup = subGroup;
262                 break;
263             }
264         }
265         return Optional.ofNullable(pdpSubgroup);
266     }
267
268     private Optional<Pdp> findPdpInstance(final PdpStatus message, final PdpSubGroup subGroup) {
269         Pdp pdpInstance = null;
270         for (final Pdp pdpInstanceDetails : subGroup.getPdpInstances()) {
271             if (pdpInstanceDetails.getInstanceId().equals(message.getName())) {
272                 pdpInstance = pdpInstanceDetails;
273                 break;
274             }
275         }
276         return Optional.ofNullable(pdpInstance);
277     }
278
279     private void processPdpDetails(final PdpStatus message, final PdpSubGroup pdpSubGroup, final Pdp pdpInstance,
280             final PdpGroup pdpGroup) throws PfModelException {
281         // all policies
282         policies = getToscaPolicies(pdpSubGroup);
283
284         Map<ToscaConceptIdentifier, ToscaPolicy> policyMap =
285                         policies.stream().collect(Collectors.toMap(ToscaPolicy::getIdentifier, policy -> policy));
286
287         // policies that the PDP already has (-) all
288         policiesToBeUndeployed = message.getPolicies().stream().filter(policyId -> !policyMap.containsKey(policyId))
289                         .collect(Collectors.toList());
290
291         // all (-) policies that the PDP already has
292         policiesToBeDeployed = policyMap;
293         policiesToBeDeployed.keySet().removeAll(message.getPolicies());
294
295         if (PdpState.TERMINATED.equals(message.getState())) {
296             processPdpTermination(pdpSubGroup, pdpInstance, pdpGroup);
297         } else if (validatePdpDetails(message, pdpGroup, pdpSubGroup, pdpInstance)) {
298             LOGGER.debug("PdpInstance details are correct. Saving current state in DB - {}", pdpInstance);
299             updatePdpHealthStatus(message, pdpSubGroup, pdpInstance, pdpGroup);
300
301             if (savePdpStatistics) {
302                 processPdpStatistics(message, pdpSubGroup, pdpInstance, pdpGroup);
303             } else {
304                 LOGGER.debug("Not processing PdpStatistics - {}", message.getStatistics());
305             }
306         } else {
307             LOGGER.debug("PdpInstance details are not correct. Sending PdpUpdate message - {}", pdpInstance);
308             LOGGER.debug("Policy list in DB - {}. Policy list in heartbeat - {}", pdpSubGroup.getPolicies(),
309                 message.getPolicies());
310             updatePdpHealthStatus(message, pdpSubGroup, pdpInstance, pdpGroup);
311             sendPdpMessage(pdpGroup.getName(), pdpSubGroup, pdpInstance.getInstanceId(), pdpInstance.getPdpState());
312         }
313     }
314
315     private void processPdpStatistics(final PdpStatus message, final PdpSubGroup pdpSubGroup, final Pdp pdpInstance,
316                     final PdpGroup pdpGroup) {
317         if (validatePdpStatisticsDetails(message, pdpInstance, pdpGroup, pdpSubGroup)) {
318             LOGGER.debug("PdpStatistics details are correct. Saving current statistics in DB - {}",
319                     message.getStatistics());
320             createPdpStatistics(message.getStatistics());
321         } else {
322             LOGGER.debug("PdpStatistics details are not correct - {}", message.getStatistics());
323         }
324     }
325
326     private void processPdpTermination(final PdpSubGroup pdpSubGroup, final Pdp pdpInstance, final PdpGroup pdpGroup) {
327         pdpSubGroup.getPdpInstances().remove(pdpInstance);
328         pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() - 1);
329         pdpGroupService.updatePdpSubGroup(pdpGroup.getName(), pdpSubGroup);
330
331         LOGGER.debug("Deleted PdpInstance - {} belonging to PdpSubGroup - {} and PdpGroup - {}", pdpInstance,
332                 pdpSubGroup, pdpGroup);
333     }
334
335     private boolean validatePdpDetails(final PdpStatus message, final PdpGroup pdpGroup, final PdpSubGroup subGroup,
336             final Pdp pdpInstanceDetails) {
337         /*
338          * "EqualsBuilder" is a bit of a misnomer, as it uses containsAll() to check policies. Nevertheless, it does the
339          * job and provides a convenient way to build a bunch of comparisons.
340          */
341         return new EqualsBuilder().append(message.getPdpGroup(), pdpGroup.getName())
342                 .append(message.getPdpSubgroup(), subGroup.getPdpType())
343                 .append(message.getPdpType(), subGroup.getPdpType())
344                 .append(message.getState(), pdpInstanceDetails.getPdpState())
345                 .append(message.getPolicies().containsAll(subGroup.getPolicies()), true)
346                 .append(subGroup.getPolicies().containsAll(message.getPolicies()), true).build();
347     }
348
349     private boolean validatePdpStatisticsDetails(final PdpStatus message, final Pdp pdpInstanceDetails,
350             final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup) {
351         if (message.getStatistics() != null) {
352             return new EqualsBuilder()
353                     .append(message.getStatistics().getPdpInstanceId(), pdpInstanceDetails.getInstanceId())
354                     .append(message.getStatistics().getPdpGroupName(), pdpGroup.getName())
355                     .append(message.getStatistics().getPdpSubGroupName(), pdpSubGroup.getPdpType())
356                     .append(message.getStatistics().getPolicyDeployCount() < 0, false)
357                     .append(message.getStatistics().getPolicyDeployFailCount() < 0, false)
358                     .append(message.getStatistics().getPolicyDeploySuccessCount() < 0, false)
359                     .append(message.getStatistics().getPolicyUndeployCount() < 0, false)
360                     .append(message.getStatistics().getPolicyUndeployFailCount() < 0, false)
361                     .append(message.getStatistics().getPolicyUndeploySuccessCount() < 0, false)
362                     .append(message.getStatistics().getPolicyExecutedCount() < 0, false)
363                     .append(message.getStatistics().getPolicyExecutedFailCount() < 0, false)
364                     .append(message.getStatistics().getPolicyExecutedSuccessCount() < 0, false).build();
365         } else {
366             LOGGER.debug("PdpStatistics is null");
367             return false;
368         }
369     }
370
371     private void updatePdpHealthStatus(final PdpStatus message, final PdpSubGroup pdpSubgroup, final Pdp pdpInstance,
372             final PdpGroup pdpGroup) {
373         pdpInstance.setHealthy(message.getHealthy());
374         pdpInstance.setMessage(message.getDescription());
375         pdpInstance.setLastUpdate(Instant.now());
376         pdpGroupService.updatePdp(pdpGroup.getName(), pdpSubgroup.getPdpType(), pdpInstance);
377
378         LOGGER.debug("Updated Pdp in DB - {}", pdpInstance);
379     }
380
381     private void createPdpStatistics(final PdpStatistics pdpStatistics) {
382         pdpStatisticsService.createPdpStatistics(Arrays.asList(pdpStatistics));
383         LOGGER.debug("Created PdpStatistics in DB - {}", pdpStatistics);
384     }
385
386     private void sendPdpMessage(final String pdpGroupName, final PdpSubGroup subGroup, final String pdpInstanceId,
387             final PdpState pdpState) {
388         final List<ToscaPolicy> polsToBeDeployed = new LinkedList<>(policiesToBeDeployed.values());
389         final var pdpUpdatemessage =
390             createPdpUpdateMessage(pdpGroupName, subGroup, pdpInstanceId,
391                         polsToBeDeployed, policiesToBeUndeployed);
392         final var pdpStateChangeMessage =
393             createPdpStateChangeMessage(pdpGroupName, subGroup, pdpInstanceId, pdpState);
394         updateDeploymentStatus(pdpGroupName, subGroup.getPdpType(), pdpInstanceId, pdpStateChangeMessage.getState(),
395             pdpUpdatemessage.getPoliciesToBeDeployed());
396
397         requestMap.addRequest(pdpUpdatemessage, pdpStateChangeMessage);
398         LOGGER.debug("Sent PdpUpdate message - {}", pdpUpdatemessage);
399         LOGGER.debug("Sent PdpStateChange message - {}", pdpStateChangeMessage);
400     }
401 }