d7ee15bd7636dbe23636aca26cf88a24a709e8b4
[policy/pap.git] / main / src / main / java / org / onap / policy / pap / main / comm / PdpStatusMessageHandler.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019-2020 Nordix Foundation.
4  *  Modifications Copyright (C) 2019-2020 AT&T Intellectual Property.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.pap.main.comm;
23
24 import java.util.Arrays;
25 import java.util.List;
26 import java.util.Optional;
27 import java.util.concurrent.TimeUnit;
28 import org.apache.commons.lang3.builder.EqualsBuilder;
29 import org.onap.policy.common.utils.services.Registry;
30 import org.onap.policy.models.base.PfModelException;
31 import org.onap.policy.models.pdp.concepts.Pdp;
32 import org.onap.policy.models.pdp.concepts.PdpGroup;
33 import org.onap.policy.models.pdp.concepts.PdpGroupFilter;
34 import org.onap.policy.models.pdp.concepts.PdpStateChange;
35 import org.onap.policy.models.pdp.concepts.PdpStatistics;
36 import org.onap.policy.models.pdp.concepts.PdpStatus;
37 import org.onap.policy.models.pdp.concepts.PdpSubGroup;
38 import org.onap.policy.models.pdp.concepts.PdpUpdate;
39 import org.onap.policy.models.pdp.enums.PdpState;
40 import org.onap.policy.models.provider.PolicyModelsProvider;
41 import org.onap.policy.pap.main.PapConstants;
42 import org.onap.policy.pap.main.PolicyPapException;
43 import org.onap.policy.pap.main.parameters.PdpParameters;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47
48 /**
49  * Handler for PDP Status messages which either represent registration or heart beat.
50  *
51  * @author Ram Krishna Verma (ram.krishna.verma@est.tech)
52  */
53 public class PdpStatusMessageHandler extends PdpMessageGenerator {
54     private static final Logger LOGGER = LoggerFactory.getLogger(PdpStatusMessageHandler.class);
55
56     private final PdpParameters params;
57
58     /**
59      * Constructs the object.
60      *
61      * @param params PDP parameters
62      */
63     public PdpStatusMessageHandler(PdpParameters params) {
64         super(true);
65         this.params = params;
66     }
67
68     /**
69      * Handles the PdpStatus message coming from various PDP's.
70      *
71      * @param message the PdpStatus message
72      */
73     public void handlePdpStatus(final PdpStatus message) {
74         long diffms = System.currentTimeMillis() - message.getTimestampMs();
75         if (diffms > params.getMaxMessageAgeMs()) {
76             long diffsec = TimeUnit.SECONDS.convert(diffms, TimeUnit.MILLISECONDS);
77             LOGGER.info("discarding status message from {} age {}s", message.getName(), diffsec);
78             return;
79         }
80
81         synchronized (updateLock) {
82             try (PolicyModelsProvider databaseProvider = modelProviderWrapper.create()) {
83                 if (message.getPdpSubgroup() == null) {
84                     handlePdpRegistration(message, databaseProvider);
85                 } else {
86                     handlePdpHeartbeat(message, databaseProvider);
87                 }
88                 /*
89                  * Indicate that a heart beat was received from the PDP. This is invoked only if handleXxx() does not
90                  * throw an exception.
91                  */
92                 if (message.getName() != null) {
93                     final PdpTracker pdpTracker = Registry.get(PapConstants.REG_PDP_TRACKER);
94                     pdpTracker.add(message.getName());
95                 }
96             } catch (final PolicyPapException exp) {
97                 LOGGER.error("Operation Failed", exp);
98             } catch (final Exception exp) {
99                 LOGGER.error("Failed connecting to database provider", exp);
100             }
101         }
102     }
103
104     private void handlePdpRegistration(final PdpStatus message, final PolicyModelsProvider databaseProvider)
105             throws PfModelException, PolicyPapException {
106         if (!findAndUpdatePdpGroup(message, databaseProvider)) {
107             final String errorMessage = "Failed to register PDP. No matching PdpGroup/SubGroup Found - ";
108             LOGGER.debug("{}{}", errorMessage, message);
109             throw new PolicyPapException(errorMessage + message);
110         }
111     }
112
113     private boolean findAndUpdatePdpGroup(final PdpStatus message, final PolicyModelsProvider databaseProvider)
114             throws PfModelException {
115         boolean pdpGroupFound = false;
116         final PdpGroupFilter filter =
117                 PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build();
118
119         final List<PdpGroup> pdpGroups = databaseProvider.getFilteredPdpGroups(filter);
120         if (!pdpGroups.isEmpty()) {
121             pdpGroupFound = registerPdp(message, databaseProvider, pdpGroups.get(0));
122         }
123         return pdpGroupFound;
124     }
125
126     private boolean registerPdp(final PdpStatus message, final PolicyModelsProvider databaseProvider,
127             final PdpGroup finalizedPdpGroup) throws PfModelException {
128         Optional<PdpSubGroup> subGroup;
129         boolean pdpGroupFound = false;
130         subGroup = findPdpSubGroup(message, finalizedPdpGroup);
131         if (subGroup.isPresent()) {
132             LOGGER.debug("Found pdpGroup - {}, going for registration of PDP - {}", finalizedPdpGroup, message);
133             if (!findPdpInstance(message, subGroup.get()).isPresent()) {
134                 updatePdpSubGroup(finalizedPdpGroup, subGroup.get(), message, databaseProvider);
135             }
136             sendPdpMessage(finalizedPdpGroup.getName(), subGroup.get(), message.getName(), null, databaseProvider);
137             pdpGroupFound = true;
138         }
139         return pdpGroupFound;
140     }
141
142     private void updatePdpSubGroup(final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup, final PdpStatus message,
143             final PolicyModelsProvider databaseProvider) throws PfModelException {
144
145         final Pdp pdpInstance = new Pdp();
146         pdpInstance.setInstanceId(message.getName());
147         pdpInstance.setPdpState(PdpState.ACTIVE);
148         pdpInstance.setHealthy(message.getHealthy());
149         pdpInstance.setMessage(message.getDescription());
150         pdpSubGroup.getPdpInstances().add(pdpInstance);
151
152         pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() + 1);
153
154         databaseProvider.updatePdpSubGroup(pdpGroup.getName(), pdpSubGroup);
155
156         LOGGER.debug("Updated PdpSubGroup in DB - {} belonging to PdpGroup - {}", pdpSubGroup, pdpGroup);
157     }
158
159     private void handlePdpHeartbeat(final PdpStatus message, final PolicyModelsProvider databaseProvider)
160             throws PfModelException {
161
162         final PdpGroupFilter filter =
163                 PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build();
164         final List<PdpGroup> pdpGroups = databaseProvider.getFilteredPdpGroups(filter);
165         if (!pdpGroups.isEmpty()) {
166             PdpGroup pdpGroup = pdpGroups.get(0);
167             Optional<PdpSubGroup> pdpSubgroup = findPdpSubGroup(message, pdpGroup);
168             if (pdpSubgroup.isPresent()) {
169                 Optional<Pdp> pdpInstance = findPdpInstance(message, pdpSubgroup.get());
170                 if (pdpInstance.isPresent()) {
171                     processPdpDetails(message, pdpSubgroup.get(), pdpInstance.get(), pdpGroup, databaseProvider);
172                 } else {
173                     LOGGER.debug("PdpInstance not Found in DB. Sending Pdp for registration - {}", message);
174                     registerPdp(message, databaseProvider, pdpGroup);
175                 }
176             }
177         }
178     }
179
180     private Optional<PdpSubGroup> findPdpSubGroup(final PdpStatus message, final PdpGroup pdpGroup) {
181         PdpSubGroup pdpSubgroup = null;
182         for (final PdpSubGroup subGroup : pdpGroup.getPdpSubgroups()) {
183             if (message.getPdpType().equals(subGroup.getPdpType())) {
184                 pdpSubgroup = subGroup;
185                 break;
186             }
187         }
188         return Optional.ofNullable(pdpSubgroup);
189     }
190
191     private Optional<Pdp> findPdpInstance(final PdpStatus message, final PdpSubGroup subGroup) {
192         Pdp pdpInstance = null;
193         for (final Pdp pdpInstanceDetails : subGroup.getPdpInstances()) {
194             if (pdpInstanceDetails.getInstanceId().equals(message.getName())) {
195                 pdpInstance = pdpInstanceDetails;
196                 break;
197             }
198         }
199         return Optional.ofNullable(pdpInstance);
200     }
201
202     private void processPdpDetails(final PdpStatus message, final PdpSubGroup pdpSubGroup, final Pdp pdpInstance,
203             final PdpGroup pdpGroup, final PolicyModelsProvider databaseProvider) throws PfModelException {
204         if (PdpState.TERMINATED.equals(message.getState())) {
205             processPdpTermination(pdpSubGroup, pdpInstance, pdpGroup, databaseProvider);
206         } else if (validatePdpDetails(message, pdpGroup, pdpSubGroup, pdpInstance)) {
207             LOGGER.debug("PdpInstance details are correct. Saving current state in DB - {}", pdpInstance);
208             updatePdpHealthStatus(message, pdpSubGroup, pdpInstance, pdpGroup, databaseProvider);
209
210             if (validatePdpStatisticsDetails(message, pdpInstance, pdpGroup, pdpSubGroup)) {
211                 LOGGER.debug("PdpStatistics details are correct. Saving current statistics in DB - {}",
212                         message.getStatistics());
213                 createPdpStatistics(message.getStatistics(), databaseProvider);
214             } else {
215                 LOGGER.debug("PdpStatistics details are not correct - {}", message.getStatistics());
216             }
217         } else {
218             LOGGER.debug("PdpInstance details are not correct. Sending PdpUpdate message - {}", pdpInstance);
219             sendPdpMessage(pdpGroup.getName(), pdpSubGroup, pdpInstance.getInstanceId(), pdpInstance.getPdpState(),
220                     databaseProvider);
221         }
222     }
223
224     private void processPdpTermination(final PdpSubGroup pdpSubGroup, final Pdp pdpInstance, final PdpGroup pdpGroup,
225             final PolicyModelsProvider databaseProvider) throws PfModelException {
226         pdpSubGroup.getPdpInstances().remove(pdpInstance);
227         pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() - 1);
228         databaseProvider.updatePdpSubGroup(pdpGroup.getName(), pdpSubGroup);
229
230         LOGGER.debug("Deleted PdpInstance - {} belonging to PdpSubGroup - {} and PdpGroup - {}", pdpInstance,
231                 pdpSubGroup, pdpGroup);
232     }
233
234     private boolean validatePdpDetails(final PdpStatus message, final PdpGroup pdpGroup, final PdpSubGroup subGroup,
235             final Pdp pdpInstanceDetails) {
236         /*
237          * "EqualsBuilder" is a bit of a misnomer, as it uses containsAll() to check policies. Nevertheless, it does the
238          * job and provides a convenient way to build a bunch of comparisons.
239          */
240         return new EqualsBuilder().append(message.getPdpGroup(), pdpGroup.getName())
241                 .append(message.getPdpSubgroup(), subGroup.getPdpType())
242                 .append(message.getPdpType(), subGroup.getPdpType())
243                 .append(message.getState(), pdpInstanceDetails.getPdpState())
244                 .append(message.getPolicies().containsAll(subGroup.getPolicies()), true)
245                 .append(subGroup.getPolicies().containsAll(message.getPolicies()), true).build();
246     }
247
248     private boolean validatePdpStatisticsDetails(final PdpStatus message, final Pdp pdpInstanceDetails,
249             final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup) {
250         if (message.getStatistics() != null) {
251             return new EqualsBuilder()
252                     .append(message.getStatistics().getPdpInstanceId(), pdpInstanceDetails.getInstanceId())
253                     .append(message.getStatistics().getPdpGroupName(), pdpGroup.getName())
254                     .append(message.getStatistics().getPdpSubGroupName(), pdpSubGroup.getPdpType())
255                     .append(message.getStatistics().getPolicyDeployCount() < 0, false)
256                     .append(message.getStatistics().getPolicyDeployFailCount() < 0, false)
257                     .append(message.getStatistics().getPolicyDeploySuccessCount() < 0, false)
258                     .append(message.getStatistics().getPolicyExecutedCount() < 0, false)
259                     .append(message.getStatistics().getPolicyExecutedFailCount() < 0, false)
260                     .append(message.getStatistics().getPolicyExecutedSuccessCount() < 0, false).build();
261         } else {
262             LOGGER.debug("PdpStatistics is null");
263             return false;
264         }
265     }
266
267     private void updatePdpHealthStatus(final PdpStatus message, final PdpSubGroup pdpSubgroup, final Pdp pdpInstance,
268             final PdpGroup pdpGroup, final PolicyModelsProvider databaseProvider) throws PfModelException {
269         pdpInstance.setHealthy(message.getHealthy());
270         databaseProvider.updatePdp(pdpGroup.getName(), pdpSubgroup.getPdpType(), pdpInstance);
271
272         LOGGER.debug("Updated Pdp in DB - {}", pdpInstance);
273     }
274
275     private void createPdpStatistics(final PdpStatistics pdpStatistics, final PolicyModelsProvider databaseProvider)
276             throws PfModelException {
277         databaseProvider.createPdpStatistics(Arrays.asList(pdpStatistics));
278         LOGGER.debug("Created PdpStatistics in DB - {}", pdpStatistics);
279     }
280
281     private void sendPdpMessage(final String pdpGroupName, final PdpSubGroup subGroup, final String pdpInstanceId,
282             final PdpState pdpState, final PolicyModelsProvider databaseProvider) throws PfModelException {
283         final PdpUpdate pdpUpdatemessage =
284                 createPdpUpdateMessage(pdpGroupName, subGroup, pdpInstanceId, databaseProvider);
285         final PdpStateChange pdpStateChangeMessage =
286                 createPdpStateChangeMessage(pdpGroupName, subGroup, pdpInstanceId, pdpState);
287         requestMap.addRequest(pdpUpdatemessage, pdpStateChangeMessage);
288         LOGGER.debug("Sent PdpUpdate message - {}", pdpUpdatemessage);
289         LOGGER.debug("Sent PdpStateChange message - {}", pdpStateChangeMessage);
290     }
291 }