Fix sonars in policy-pap
[policy/pap.git] / main / src / main / java / org / onap / policy / pap / main / comm / PdpStatusMessageHandler.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019-2020 Nordix Foundation.
4  *  Modifications Copyright (C) 2019-2021 AT&T Intellectual Property.
5  *  Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.pap.main.comm;
24
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Optional;
31 import java.util.concurrent.TimeUnit;
32 import java.util.stream.Collectors;
33 import org.apache.commons.lang3.builder.EqualsBuilder;
34 import org.onap.policy.common.utils.services.Registry;
35 import org.onap.policy.models.base.PfModelException;
36 import org.onap.policy.models.pdp.concepts.Pdp;
37 import org.onap.policy.models.pdp.concepts.PdpGroup;
38 import org.onap.policy.models.pdp.concepts.PdpGroupFilter;
39 import org.onap.policy.models.pdp.concepts.PdpStatistics;
40 import org.onap.policy.models.pdp.concepts.PdpStatus;
41 import org.onap.policy.models.pdp.concepts.PdpSubGroup;
42 import org.onap.policy.models.pdp.enums.PdpState;
43 import org.onap.policy.models.provider.PolicyModelsProvider;
44 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
45 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
46 import org.onap.policy.pap.main.PapConstants;
47 import org.onap.policy.pap.main.PolicyPapException;
48 import org.onap.policy.pap.main.parameters.PdpParameters;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52
53 /**
54  * Handler for PDP Status messages which either represent registration or heart beat.
55  *
56  * @author Ram Krishna Verma (ram.krishna.verma@est.tech)
57  */
58 public class PdpStatusMessageHandler extends PdpMessageGenerator {
59     private static final Logger LOGGER = LoggerFactory.getLogger(PdpStatusMessageHandler.class);
60
61     private final PdpParameters params;
62
63     /**
64      * List to store policies present in db.
65      */
66     List<ToscaPolicy> policies = new LinkedList<>();
67
68     /**
69      * List to store policies to be deployed (heartbeat).
70      */
71     Map<ToscaConceptIdentifier, ToscaPolicy> policiesToBeDeployed = new HashMap<>();
72
73     /**
74      * List to store policies to be undeployed (heartbeat).
75      */
76     List<ToscaConceptIdentifier> policiesToBeUndeployed = new LinkedList<>();
77
78     /**
79      * Constructs the object.
80      *
81      * @param params PDP parameters
82      */
83     public PdpStatusMessageHandler(PdpParameters params) {
84         super(true);
85         this.params = params;
86     }
87
88     /**
89      * Handles the PdpStatus message coming from various PDP's.
90      *
91      * @param message the PdpStatus message
92      */
93     public void handlePdpStatus(final PdpStatus message) {
94         long diffms = System.currentTimeMillis() - message.getTimestampMs();
95         if (diffms > params.getMaxMessageAgeMs()) {
96             long diffsec = TimeUnit.SECONDS.convert(diffms, TimeUnit.MILLISECONDS);
97             LOGGER.info("discarding status message from {} age {}s", message.getName(), diffsec);
98             return;
99         }
100
101         synchronized (updateLock) {
102             try (PolicyModelsProvider databaseProvider = modelProviderWrapper.create()) {
103                 if (message.getPdpSubgroup() == null) {
104                     handlePdpRegistration(message, databaseProvider);
105                 } else {
106                     handlePdpHeartbeat(message, databaseProvider);
107                 }
108                 /*
109                  * Indicate that a heart beat was received from the PDP. This is invoked only if handleXxx() does not
110                  * throw an exception.
111                  */
112                 if (message.getName() != null) {
113                     final var pdpTracker = (PdpTracker) Registry.get(PapConstants.REG_PDP_TRACKER);
114                     pdpTracker.add(message.getName());
115                 }
116             } catch (final PolicyPapException exp) {
117                 LOGGER.error("Operation Failed", exp);
118             } catch (final Exception exp) {
119                 LOGGER.error("Failed connecting to database provider", exp);
120             }
121         }
122     }
123
124     private void handlePdpRegistration(final PdpStatus message, final PolicyModelsProvider databaseProvider)
125             throws PfModelException, PolicyPapException {
126         if (!findAndUpdatePdpGroup(message, databaseProvider)) {
127             final var errorMessage = "Failed to register PDP. No matching PdpGroup/SubGroup Found - ";
128             LOGGER.debug("{}{}", errorMessage, message);
129             throw new PolicyPapException(errorMessage + message);
130         }
131     }
132
133     private boolean findAndUpdatePdpGroup(final PdpStatus message, final PolicyModelsProvider databaseProvider)
134             throws PfModelException {
135         var pdpGroupFound = false;
136         final PdpGroupFilter filter =
137                 PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build();
138
139         final List<PdpGroup> pdpGroups = databaseProvider.getFilteredPdpGroups(filter);
140         if (!pdpGroups.isEmpty()) {
141             pdpGroupFound = registerPdp(message, databaseProvider, pdpGroups.get(0));
142         }
143         return pdpGroupFound;
144     }
145
146     private boolean registerPdp(final PdpStatus message, final PolicyModelsProvider databaseProvider,
147             final PdpGroup finalizedPdpGroup) throws PfModelException {
148         Optional<PdpSubGroup> subGroup;
149         var pdpGroupFound = false;
150         subGroup = findPdpSubGroup(message, finalizedPdpGroup);
151
152         if (subGroup.isPresent()) {
153             policies = getToscaPolicies(subGroup.get(), databaseProvider);
154             policiesToBeDeployed = policies.stream().collect(Collectors
155                     .toMap(ToscaPolicy::getIdentifier, policy -> policy));
156             policiesToBeUndeployed = null;
157
158             LOGGER.debug("Found pdpGroup - {}, going for registration of PDP - {}", finalizedPdpGroup, message);
159             if (!findPdpInstance(message, subGroup.get()).isPresent()) {
160                 updatePdpSubGroup(finalizedPdpGroup, subGroup.get(), message, databaseProvider);
161             }
162             sendPdpMessage(finalizedPdpGroup.getName(), subGroup.get(), message.getName(), null, databaseProvider);
163             pdpGroupFound = true;
164         }
165         return pdpGroupFound;
166     }
167
168     private void updatePdpSubGroup(final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup, final PdpStatus message,
169             final PolicyModelsProvider databaseProvider) throws PfModelException {
170
171         final var pdpInstance = new Pdp();
172         pdpInstance.setInstanceId(message.getName());
173         pdpInstance.setPdpState(PdpState.ACTIVE);
174         pdpInstance.setHealthy(message.getHealthy());
175         pdpInstance.setMessage(message.getDescription());
176         pdpSubGroup.getPdpInstances().add(pdpInstance);
177
178         pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() + 1);
179
180         databaseProvider.updatePdpSubGroup(pdpGroup.getName(), pdpSubGroup);
181
182         LOGGER.debug("Updated PdpSubGroup in DB - {} belonging to PdpGroup - {}", pdpSubGroup, pdpGroup);
183     }
184
185     private void handlePdpHeartbeat(final PdpStatus message, final PolicyModelsProvider databaseProvider)
186             throws PfModelException {
187
188         final PdpGroupFilter filter =
189                 PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build();
190         final List<PdpGroup> pdpGroups = databaseProvider.getFilteredPdpGroups(filter);
191         if (!pdpGroups.isEmpty()) {
192             var pdpGroup = pdpGroups.get(0);
193             Optional<PdpSubGroup> pdpSubgroup = findPdpSubGroup(message, pdpGroup);
194             if (pdpSubgroup.isPresent()) {
195                 Optional<Pdp> pdpInstance = findPdpInstance(message, pdpSubgroup.get());
196                 if (pdpInstance.isPresent()) {
197                     processPdpDetails(message, pdpSubgroup.get(), pdpInstance.get(), pdpGroup, databaseProvider);
198                 } else {
199                     LOGGER.debug("PdpInstance not Found in DB. Sending Pdp for registration - {}", message);
200                     registerPdp(message, databaseProvider, pdpGroup);
201                 }
202             }
203         }
204     }
205
206     private Optional<PdpSubGroup> findPdpSubGroup(final PdpStatus message, final PdpGroup pdpGroup) {
207         PdpSubGroup pdpSubgroup = null;
208         for (final PdpSubGroup subGroup : pdpGroup.getPdpSubgroups()) {
209             if (message.getPdpType().equals(subGroup.getPdpType())) {
210                 pdpSubgroup = subGroup;
211                 break;
212             }
213         }
214         return Optional.ofNullable(pdpSubgroup);
215     }
216
217     private Optional<Pdp> findPdpInstance(final PdpStatus message, final PdpSubGroup subGroup) {
218         Pdp pdpInstance = null;
219         for (final Pdp pdpInstanceDetails : subGroup.getPdpInstances()) {
220             if (pdpInstanceDetails.getInstanceId().equals(message.getName())) {
221                 pdpInstance = pdpInstanceDetails;
222                 break;
223             }
224         }
225         return Optional.ofNullable(pdpInstance);
226     }
227
228     private void processPdpDetails(final PdpStatus message, final PdpSubGroup pdpSubGroup, final Pdp pdpInstance,
229             final PdpGroup pdpGroup, final PolicyModelsProvider databaseProvider)
230                     throws PfModelException {
231         // all policies
232         policies = getToscaPolicies(pdpSubGroup, databaseProvider);
233
234         policiesToBeDeployed =
235             policies.stream().collect(Collectors.toMap(ToscaPolicy::getIdentifier, policy -> policy));
236         // all (-) policies that the PDP already has
237         policiesToBeDeployed.keySet().removeAll(message.getPolicies());
238
239         // policies that the PDP already has (-) all
240         policiesToBeUndeployed = new LinkedList<>(message.getPolicies());
241         policiesToBeUndeployed.removeAll(policies.stream().map(ToscaPolicy::getIdentifier)
242                 .collect(Collectors.toList()));
243
244         if (PdpState.TERMINATED.equals(message.getState())) {
245             processPdpTermination(pdpSubGroup, pdpInstance, pdpGroup, databaseProvider);
246         } else if (validatePdpDetails(message, pdpGroup, pdpSubGroup, pdpInstance)) {
247             LOGGER.debug("PdpInstance details are correct. Saving current state in DB - {}", pdpInstance);
248             updatePdpHealthStatus(message, pdpSubGroup, pdpInstance, pdpGroup, databaseProvider);
249
250             if (validatePdpStatisticsDetails(message, pdpInstance, pdpGroup, pdpSubGroup)) {
251                 LOGGER.debug("PdpStatistics details are correct. Saving current statistics in DB - {}",
252                         message.getStatistics());
253                 createPdpStatistics(message.getStatistics(), databaseProvider);
254             } else {
255                 LOGGER.debug("PdpStatistics details are not correct - {}", message.getStatistics());
256             }
257         } else {
258             LOGGER.debug("PdpInstance details are not correct. Sending PdpUpdate message - {}", pdpInstance);
259             LOGGER.debug("Policy list in DB - {}. Policy list in heartbeat - {}", pdpSubGroup.getPolicies(),
260                 message.getPolicies());
261             sendPdpMessage(pdpGroup.getName(), pdpSubGroup, pdpInstance.getInstanceId(), pdpInstance.getPdpState(),
262                 databaseProvider);
263         }
264     }
265
266     private void processPdpTermination(final PdpSubGroup pdpSubGroup, final Pdp pdpInstance, final PdpGroup pdpGroup,
267             final PolicyModelsProvider databaseProvider) throws PfModelException {
268         pdpSubGroup.getPdpInstances().remove(pdpInstance);
269         pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() - 1);
270         databaseProvider.updatePdpSubGroup(pdpGroup.getName(), pdpSubGroup);
271
272         LOGGER.debug("Deleted PdpInstance - {} belonging to PdpSubGroup - {} and PdpGroup - {}", pdpInstance,
273                 pdpSubGroup, pdpGroup);
274     }
275
276     private boolean validatePdpDetails(final PdpStatus message, final PdpGroup pdpGroup, final PdpSubGroup subGroup,
277             final Pdp pdpInstanceDetails) {
278         /*
279          * "EqualsBuilder" is a bit of a misnomer, as it uses containsAll() to check policies. Nevertheless, it does the
280          * job and provides a convenient way to build a bunch of comparisons.
281          */
282         return new EqualsBuilder().append(message.getPdpGroup(), pdpGroup.getName())
283                 .append(message.getPdpSubgroup(), subGroup.getPdpType())
284                 .append(message.getPdpType(), subGroup.getPdpType())
285                 .append(message.getState(), pdpInstanceDetails.getPdpState())
286                 .append(message.getPolicies().containsAll(subGroup.getPolicies()), true)
287                 .append(subGroup.getPolicies().containsAll(message.getPolicies()), true).build();
288     }
289
290     private boolean validatePdpStatisticsDetails(final PdpStatus message, final Pdp pdpInstanceDetails,
291             final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup) {
292         if (message.getStatistics() != null) {
293             return new EqualsBuilder()
294                     .append(message.getStatistics().getPdpInstanceId(), pdpInstanceDetails.getInstanceId())
295                     .append(message.getStatistics().getPdpGroupName(), pdpGroup.getName())
296                     .append(message.getStatistics().getPdpSubGroupName(), pdpSubGroup.getPdpType())
297                     .append(message.getStatistics().getPolicyDeployCount() < 0, false)
298                     .append(message.getStatistics().getPolicyDeployFailCount() < 0, false)
299                     .append(message.getStatistics().getPolicyDeploySuccessCount() < 0, false)
300                     .append(message.getStatistics().getPolicyExecutedCount() < 0, false)
301                     .append(message.getStatistics().getPolicyExecutedFailCount() < 0, false)
302                     .append(message.getStatistics().getPolicyExecutedSuccessCount() < 0, false).build();
303         } else {
304             LOGGER.debug("PdpStatistics is null");
305             return false;
306         }
307     }
308
309     private void updatePdpHealthStatus(final PdpStatus message, final PdpSubGroup pdpSubgroup, final Pdp pdpInstance,
310             final PdpGroup pdpGroup, final PolicyModelsProvider databaseProvider) throws PfModelException {
311         pdpInstance.setHealthy(message.getHealthy());
312         databaseProvider.updatePdp(pdpGroup.getName(), pdpSubgroup.getPdpType(), pdpInstance);
313
314         LOGGER.debug("Updated Pdp in DB - {}", pdpInstance);
315     }
316
317     private void createPdpStatistics(final PdpStatistics pdpStatistics, final PolicyModelsProvider databaseProvider)
318             throws PfModelException {
319         databaseProvider.createPdpStatistics(Arrays.asList(pdpStatistics));
320         LOGGER.debug("Created PdpStatistics in DB - {}", pdpStatistics);
321     }
322
323     private void sendPdpMessage(final String pdpGroupName, final PdpSubGroup subGroup, final String pdpInstanceId,
324             final PdpState pdpState, final PolicyModelsProvider databaseProvider)
325                     throws PfModelException {
326         final List<ToscaPolicy> polsToBeDeployed = new LinkedList<>(policiesToBeDeployed.values());
327         final var pdpUpdatemessage =
328             createPdpUpdateMessage(pdpGroupName, subGroup, pdpInstanceId, policies,
329                         polsToBeDeployed, policiesToBeUndeployed);
330         final var pdpStateChangeMessage =
331             createPdpStateChangeMessage(pdpGroupName, subGroup, pdpInstanceId, pdpState);
332         updateDeploymentStatus(pdpGroupName, subGroup.getPdpType(), pdpInstanceId, pdpStateChangeMessage.getState(),
333             databaseProvider, pdpUpdatemessage.getPolicies());
334
335         requestMap.addRequest(pdpUpdatemessage, pdpStateChangeMessage);
336         LOGGER.debug("Sent PdpUpdate message - {}", pdpUpdatemessage);
337         LOGGER.debug("Sent PdpStateChange message - {}", pdpStateChangeMessage);
338     }
339 }