Fix sonar code smells
[policy/pap.git] / main / src / main / java / org / onap / policy / pap / main / comm / PdpStatusMessageHandler.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019-2021,2023 Nordix Foundation.
4  *  Modifications Copyright (C) 2019-2021 AT&T Intellectual Property.
5  *  Modifications Copyright (C) 2021-2023 Bell Canada. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.pap.main.comm;
24
25 import java.sql.SQLIntegrityConstraintViolationException;
26 import java.time.Instant;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Optional;
33 import java.util.concurrent.TimeUnit;
34 import java.util.stream.Collectors;
35 import org.apache.commons.lang3.builder.EqualsBuilder;
36 import org.onap.policy.models.base.PfModelException;
37 import org.onap.policy.models.pdp.concepts.Pdp;
38 import org.onap.policy.models.pdp.concepts.PdpGroup;
39 import org.onap.policy.models.pdp.concepts.PdpGroupFilter;
40 import org.onap.policy.models.pdp.concepts.PdpStatus;
41 import org.onap.policy.models.pdp.concepts.PdpSubGroup;
42 import org.onap.policy.models.pdp.enums.PdpState;
43 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
44 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
45 import org.onap.policy.pap.main.PolicyPapException;
46 import org.onap.policy.pap.main.parameters.PapParameterGroup;
47 import org.onap.policy.pap.main.parameters.PdpParameters;
48 import org.onap.policy.pap.main.service.PdpGroupService;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import org.springframework.stereotype.Component;
52
53
54 /**
55  * Handler for PDP Status messages which either represent registration or heart beat.
56  *
57  * @author Ram Krishna Verma (ram.krishna.verma@est.tech)
58  */
59
60 @Component
61 public class PdpStatusMessageHandler extends PdpMessageGenerator {
62     private static final Logger LOGGER = LoggerFactory.getLogger(PdpStatusMessageHandler.class);
63
64     private final PdpParameters params;
65
66     private final PdpGroupService pdpGroupService;
67
68     /**
69      * List to store policies present in db.
70      */
71     List<ToscaPolicy> policies = new LinkedList<>();
72
73     /**
     * Map of policies to be deployed (heartbeat), keyed by policy identifier.
75      */
76     Map<ToscaConceptIdentifier, ToscaPolicy> policiesToBeDeployed = new HashMap<>();
77
78     /**
79      * List to store policies to be undeployed (heartbeat).
80      */
81     List<ToscaConceptIdentifier> policiesToBeUndeployed = new LinkedList<>();
82
83     /**
84      * Constructs the object.
85      *
86      * @param parameterGroup the parameterGroup
87      * @param pdpGroupService the pdpGroupService
88      */
89     public PdpStatusMessageHandler(PapParameterGroup parameterGroup, PdpGroupService pdpGroupService) {
90         super(true);
91         this.params = parameterGroup.getPdpParameters();
92         this.pdpGroupService = pdpGroupService;
93     }
94
95     /**
96      * Handles the PdpStatus message coming from various PDP's.
97      *
98      * @param message the PdpStatus message
99      */
100     public void handlePdpStatus(final PdpStatus message) {
101         if (message.getPolicies() == null) {
102             message.setPolicies(Collections.emptyList());
103         }
104
105         long diffms = System.currentTimeMillis() - message.getTimestampMs();
106         if (diffms > params.getMaxMessageAgeMs()) {
107             long diffsec = TimeUnit.SECONDS.convert(diffms, TimeUnit.MILLISECONDS);
108             LOGGER.info("discarding status message from {} age {}s", message.getName(), diffsec);
109             return;
110         }
111
112         synchronized (updateLock) {
113             try {
114                 if (message.getPdpSubgroup() == null) {
115                     handlePdpRegistration(message);
116                 } else {
117                     handlePdpHeartbeat(message);
118                 }
119             } catch (final PolicyPapException exp) {
120                 LOGGER.error("Operation Failed", exp);
121             } catch (final Exception exp) {
122                 if (isDuplicateKeyException(exp, Exception.class)) {
123                     /*
124                      * this is to be expected, if multiple PAPs are processing the same
125                      * heartbeat at a time, thus we log the exception at a trace level
126                      * instead of an error level.
127                      */
128                     LOGGER.info("Failed updating PDP information for {} - may have been added by another PAP",
129                                     message.getName());
130                     LOGGER.trace("Failed updating PDP information for {}", message.getName(), exp);
131                 } else {
132                     LOGGER.error("Failed connecting to database provider", exp);
133                 }
134             }
135         }
136     }
137
138     /**
139      * Determines if the exception indicates a duplicate key.
140      *
141      * @param thrown exception to check
142      * @param exceptionClazz the class to check against
143      * @return {@code true} if the exception occurred due to a duplicate key
144      */
145     protected static boolean isDuplicateKeyException(Throwable thrown, Class<? extends Throwable> exceptionClazz) {
146         while (thrown != null) {
147             if (thrown instanceof SQLIntegrityConstraintViolationException) {
148                 return true;
149             }
150
151             if (exceptionClazz.isInstance(thrown) && isDuplicateKeyException(thrown.getCause(), exceptionClazz)) {
152                 return true;
153             }
154
155             thrown = thrown.getCause();
156         }
157
158         return false;
159     }
160
161     private void handlePdpRegistration(final PdpStatus message) throws PfModelException, PolicyPapException {
162         if (!findAndUpdatePdpGroup(message)) {
163             final var errorMessage = "Failed to register PDP. No matching PdpGroup/SubGroup Found - ";
164             LOGGER.debug("{}{}", errorMessage, message);
165             throw new PolicyPapException(errorMessage + message);
166         }
167     }
168
169     private boolean findAndUpdatePdpGroup(final PdpStatus message)
170             throws PfModelException {
171         var pdpGroupFound = false;
172         final PdpGroupFilter filter =
173                 PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build();
174
175         final List<PdpGroup> pdpGroups = pdpGroupService.getFilteredPdpGroups(filter);
176         if (!pdpGroups.isEmpty()) {
177             pdpGroupFound = registerPdp(message, pdpGroups.get(0));
178         }
179         return pdpGroupFound;
180     }
181
182     private boolean registerPdp(final PdpStatus message, final PdpGroup finalizedPdpGroup) throws PfModelException {
183         Optional<PdpSubGroup> subGroup;
184         var pdpGroupFound = false;
185         subGroup = findPdpSubGroup(message, finalizedPdpGroup);
186
187         if (subGroup.isPresent()) {
188             policies = getToscaPolicies(subGroup.get());
189             policiesToBeDeployed = policies.stream().collect(Collectors
190                     .toMap(ToscaPolicy::getIdentifier, policy -> policy));
191             policiesToBeUndeployed = null;
192
193             LOGGER.debug("Found pdpGroup - {}, going for registration of PDP - {}", finalizedPdpGroup, message);
194             Optional<Pdp> pdp = findPdpInstance(message, subGroup.get());
195             if (pdp.isPresent()) {
196                 updatePdpHealthStatus(message, subGroup.get(), pdp.get(), finalizedPdpGroup);
197             } else {
198                 updatePdpSubGroup(finalizedPdpGroup, subGroup.get(), message);
199             }
200             sendPdpMessage(finalizedPdpGroup.getName(), subGroup.get(), message.getName(), null);
201             pdpGroupFound = true;
202         }
203         return pdpGroupFound;
204     }
205
206     private void updatePdpSubGroup(final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup, final PdpStatus message) {
207
208         final var pdpInstance = new Pdp();
209         pdpInstance.setInstanceId(message.getName());
210         pdpInstance.setPdpState(PdpState.ACTIVE);
211         pdpInstance.setHealthy(message.getHealthy());
212         pdpInstance.setMessage(message.getDescription());
213         pdpInstance.setLastUpdate(Instant.now());
214         pdpSubGroup.getPdpInstances().add(pdpInstance);
215
216         pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() + 1);
217
218         pdpGroupService.updatePdpSubGroup(pdpGroup.getName(), pdpSubGroup);
219
220         LOGGER.debug("Updated PdpSubGroup in DB - {} belonging to PdpGroup - {}", pdpSubGroup, pdpGroup.getName());
221     }
222
223     private void handlePdpHeartbeat(final PdpStatus message) throws PfModelException {
224
225         final PdpGroupFilter filter =
226                 PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build();
227         final List<PdpGroup> pdpGroups = pdpGroupService.getFilteredPdpGroups(filter);
228         if (!pdpGroups.isEmpty()) {
229             var pdpGroup = pdpGroups.get(0);
230             Optional<PdpSubGroup> pdpSubgroup = findPdpSubGroup(message, pdpGroup);
231             if (pdpSubgroup.isPresent()) {
232                 Optional<Pdp> pdpInstance = findPdpInstance(message, pdpSubgroup.get());
233                 if (pdpInstance.isPresent()) {
234                     processPdpDetails(message, pdpSubgroup.get(), pdpInstance.get(), pdpGroup);
235                 } else {
236                     LOGGER.debug("PdpInstance not Found in DB. Sending Pdp for registration - {}", message);
237                     registerPdp(message, pdpGroup);
238                 }
239             }
240         }
241     }
242
243     private Optional<PdpSubGroup> findPdpSubGroup(final PdpStatus message, final PdpGroup pdpGroup) {
244         PdpSubGroup pdpSubgroup = null;
245         for (final PdpSubGroup subGroup : pdpGroup.getPdpSubgroups()) {
246             if (message.getPdpType().equals(subGroup.getPdpType())) {
247                 pdpSubgroup = subGroup;
248                 break;
249             }
250         }
251         return Optional.ofNullable(pdpSubgroup);
252     }
253
254     private Optional<Pdp> findPdpInstance(final PdpStatus message, final PdpSubGroup subGroup) {
255         Pdp pdpInstance = null;
256         for (final Pdp pdpInstanceDetails : subGroup.getPdpInstances()) {
257             if (pdpInstanceDetails.getInstanceId().equals(message.getName())) {
258                 pdpInstance = pdpInstanceDetails;
259                 break;
260             }
261         }
262         return Optional.ofNullable(pdpInstance);
263     }
264
265     private void processPdpDetails(final PdpStatus message, final PdpSubGroup pdpSubGroup, final Pdp pdpInstance,
266             final PdpGroup pdpGroup) throws PfModelException {
267         // all policies
268         policies = getToscaPolicies(pdpSubGroup);
269
270         Map<ToscaConceptIdentifier, ToscaPolicy> policyMap =
271                         policies.stream().collect(Collectors.toMap(ToscaPolicy::getIdentifier, policy -> policy));
272
273         // policies that the PDP already has (-) all
274         policiesToBeUndeployed = message.getPolicies().stream().filter(policyId -> !policyMap.containsKey(policyId))
275                         .collect(Collectors.toList());
276
277         // all (-) policies that the PDP already has
278         policiesToBeDeployed = policyMap;
279         policiesToBeDeployed.keySet().removeAll(message.getPolicies());
280
281         if (PdpState.TERMINATED.equals(message.getState())) {
282             processPdpTermination(pdpSubGroup, pdpInstance, pdpGroup);
283         } else if (validatePdpDetails(message, pdpGroup, pdpSubGroup, pdpInstance)) {
284             LOGGER.debug("PdpInstance details are correct. Saving current state in DB - {}", pdpInstance);
285             updatePdpHealthStatus(message, pdpSubGroup, pdpInstance, pdpGroup);
286         } else {
287             LOGGER.debug("PdpInstance details are not correct. Sending PdpUpdate message - {}", pdpInstance);
288             LOGGER.debug("Policy list in DB - {}. Policy list in heartbeat - {}", pdpSubGroup.getPolicies(),
289                 message.getPolicies());
290             updatePdpHealthStatus(message, pdpSubGroup, pdpInstance, pdpGroup);
291             sendPdpMessage(pdpGroup.getName(), pdpSubGroup, pdpInstance.getInstanceId(), pdpInstance.getPdpState());
292         }
293     }
294
295     private void processPdpTermination(final PdpSubGroup pdpSubGroup, final Pdp pdpInstance, final PdpGroup pdpGroup) {
296         pdpSubGroup.getPdpInstances().remove(pdpInstance);
297         pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() - 1);
298         pdpGroupService.updatePdpSubGroup(pdpGroup.getName(), pdpSubGroup);
299
300         LOGGER.debug("Deleted PdpInstance - {} belonging to PdpSubGroup - {} and PdpGroup - {}", pdpInstance,
301                 pdpSubGroup, pdpGroup);
302     }
303
304     private boolean validatePdpDetails(final PdpStatus message, final PdpGroup pdpGroup, final PdpSubGroup subGroup,
305             final Pdp pdpInstanceDetails) {
306         /*
307          * "EqualsBuilder" is a bit of a misnomer, as it uses containsAll() to check policies. Nevertheless, it does the
308          * job and provides a convenient way to build a bunch of comparisons.
309          */
310         return new EqualsBuilder().append(message.getPdpGroup(), pdpGroup.getName())
311                 .append(message.getPdpSubgroup(), subGroup.getPdpType())
312                 .append(message.getPdpType(), subGroup.getPdpType())
313                 .append(message.getState(), pdpInstanceDetails.getPdpState())
314                 .append(message.getPolicies().containsAll(subGroup.getPolicies()), true)
315                 .append(subGroup.getPolicies().containsAll(message.getPolicies()), true).build();
316     }
317
318     private void updatePdpHealthStatus(final PdpStatus message, final PdpSubGroup pdpSubgroup, final Pdp pdpInstance,
319             final PdpGroup pdpGroup) {
320         pdpInstance.setHealthy(message.getHealthy());
321         pdpInstance.setMessage(message.getDescription());
322         pdpInstance.setLastUpdate(Instant.now());
323         pdpGroupService.updatePdp(pdpGroup.getName(), pdpSubgroup.getPdpType(), pdpInstance);
324
325         LOGGER.debug("Updated Pdp in DB - {}", pdpInstance);
326     }
327
328     private void sendPdpMessage(final String pdpGroupName, final PdpSubGroup subGroup, final String pdpInstanceId,
329             final PdpState pdpState) {
330         final List<ToscaPolicy> polsToBeDeployed = new LinkedList<>(policiesToBeDeployed.values());
331         final var pdpUpdatemessage =
332             createPdpUpdateMessage(pdpGroupName, subGroup, pdpInstanceId,
333                         polsToBeDeployed, policiesToBeUndeployed);
334         final var pdpStateChangeMessage =
335             createPdpStateChangeMessage(pdpGroupName, subGroup, pdpInstanceId, pdpState);
336         updateDeploymentStatus(pdpGroupName, subGroup.getPdpType(), pdpInstanceId, pdpStateChangeMessage.getState(),
337             pdpUpdatemessage.getPoliciesToBeDeployed());
338
339         requestMap.addRequest(pdpUpdatemessage, pdpStateChangeMessage);
340         LOGGER.debug("Sent PdpUpdate message - {}", pdpUpdatemessage);
341         LOGGER.debug("Sent PdpStateChange message - {}", pdpStateChangeMessage);
342     }
343 }