2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019-2020 Nordix Foundation.
4 * Modifications Copyright (C) 2019-2020 AT&T Intellectual Property.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.pap.main.comm;
24 import java.util.Arrays;
25 import java.util.List;
26 import java.util.Optional;
27 import java.util.concurrent.TimeUnit;
28 import org.apache.commons.lang3.builder.EqualsBuilder;
29 import org.onap.policy.common.utils.services.Registry;
30 import org.onap.policy.models.base.PfModelException;
31 import org.onap.policy.models.pdp.concepts.Pdp;
32 import org.onap.policy.models.pdp.concepts.PdpGroup;
33 import org.onap.policy.models.pdp.concepts.PdpGroupFilter;
34 import org.onap.policy.models.pdp.concepts.PdpStateChange;
35 import org.onap.policy.models.pdp.concepts.PdpStatistics;
36 import org.onap.policy.models.pdp.concepts.PdpStatus;
37 import org.onap.policy.models.pdp.concepts.PdpSubGroup;
38 import org.onap.policy.models.pdp.concepts.PdpUpdate;
39 import org.onap.policy.models.pdp.enums.PdpState;
40 import org.onap.policy.models.provider.PolicyModelsProvider;
41 import org.onap.policy.pap.main.PapConstants;
42 import org.onap.policy.pap.main.PolicyPapException;
43 import org.onap.policy.pap.main.parameters.PdpParameters;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
49 * Handler for PDP Status messages which either represent registration or heart beat.
51 * @author Ram Krishna Verma (ram.krishna.verma@est.tech)
53 public class PdpStatusMessageHandler extends PdpMessageGenerator {
54 private static final Logger LOGGER = LoggerFactory.getLogger(PdpStatusMessageHandler.class);
56 private final PdpParameters params;
59 * Constructs the object.
61 * @param params PDP parameters
63 public PdpStatusMessageHandler(PdpParameters params) {
69 * Handles the PdpStatus message coming from various PDP's.
71 * @param message the PdpStatus message
73 public void handlePdpStatus(final PdpStatus message) {
74 long diffms = System.currentTimeMillis() - message.getTimestampMs();
75 if (diffms > params.getMaxMessageAgeMs()) {
76 long diffsec = TimeUnit.SECONDS.convert(diffms, TimeUnit.MILLISECONDS);
77 LOGGER.info("discarding status message from {} age {}s", message.getName(), diffsec);
81 synchronized (updateLock) {
82 try (PolicyModelsProvider databaseProvider = modelProviderWrapper.create()) {
83 if (message.getPdpSubgroup() == null) {
84 handlePdpRegistration(message, databaseProvider);
86 handlePdpHeartbeat(message, databaseProvider);
89 * Indicate that a heart beat was received from the PDP. This is invoked only if handleXxx() does not
92 if (message.getName() != null) {
93 final PdpTracker pdpTracker = Registry.get(PapConstants.REG_PDP_TRACKER);
94 pdpTracker.add(message.getName());
96 } catch (final PolicyPapException exp) {
97 LOGGER.error("Operation Failed", exp);
98 } catch (final Exception exp) {
99 LOGGER.error("Failed connecting to database provider", exp);
104 private void handlePdpRegistration(final PdpStatus message, final PolicyModelsProvider databaseProvider)
105 throws PfModelException, PolicyPapException {
106 if (!findAndUpdatePdpGroup(message, databaseProvider)) {
107 final String errorMessage = "Failed to register PDP. No matching PdpGroup/SubGroup Found - ";
108 LOGGER.debug("{}{}", errorMessage, message);
109 throw new PolicyPapException(errorMessage + message);
113 private boolean findAndUpdatePdpGroup(final PdpStatus message, final PolicyModelsProvider databaseProvider)
114 throws PfModelException {
115 boolean pdpGroupFound = false;
116 final PdpGroupFilter filter =
117 PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build();
119 final List<PdpGroup> pdpGroups = databaseProvider.getFilteredPdpGroups(filter);
120 if (!pdpGroups.isEmpty()) {
121 pdpGroupFound = registerPdp(message, databaseProvider, pdpGroups.get(0));
123 return pdpGroupFound;
126 private boolean registerPdp(final PdpStatus message, final PolicyModelsProvider databaseProvider,
127 final PdpGroup finalizedPdpGroup) throws PfModelException {
128 Optional<PdpSubGroup> subGroup;
129 boolean pdpGroupFound = false;
130 subGroup = findPdpSubGroup(message, finalizedPdpGroup);
131 if (subGroup.isPresent()) {
132 LOGGER.debug("Found pdpGroup - {}, going for registration of PDP - {}", finalizedPdpGroup, message);
133 if (!findPdpInstance(message, subGroup.get()).isPresent()) {
134 updatePdpSubGroup(finalizedPdpGroup, subGroup.get(), message, databaseProvider);
136 sendPdpMessage(finalizedPdpGroup.getName(), subGroup.get(), message.getName(), null, databaseProvider);
137 pdpGroupFound = true;
139 return pdpGroupFound;
142 private void updatePdpSubGroup(final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup, final PdpStatus message,
143 final PolicyModelsProvider databaseProvider) throws PfModelException {
145 final Pdp pdpInstance = new Pdp();
146 pdpInstance.setInstanceId(message.getName());
147 pdpInstance.setPdpState(PdpState.ACTIVE);
148 pdpInstance.setHealthy(message.getHealthy());
149 pdpInstance.setMessage(message.getDescription());
150 pdpSubGroup.getPdpInstances().add(pdpInstance);
152 pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() + 1);
154 databaseProvider.updatePdpSubGroup(pdpGroup.getName(), pdpSubGroup);
156 LOGGER.debug("Updated PdpSubGroup in DB - {} belonging to PdpGroup - {}", pdpSubGroup, pdpGroup);
159 private void handlePdpHeartbeat(final PdpStatus message, final PolicyModelsProvider databaseProvider)
160 throws PfModelException {
162 final PdpGroupFilter filter =
163 PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build();
164 final List<PdpGroup> pdpGroups = databaseProvider.getFilteredPdpGroups(filter);
165 if (!pdpGroups.isEmpty()) {
166 PdpGroup pdpGroup = pdpGroups.get(0);
167 Optional<PdpSubGroup> pdpSubgroup = findPdpSubGroup(message, pdpGroup);
168 if (pdpSubgroup.isPresent()) {
169 Optional<Pdp> pdpInstance = findPdpInstance(message, pdpSubgroup.get());
170 if (pdpInstance.isPresent()) {
171 processPdpDetails(message, pdpSubgroup.get(), pdpInstance.get(), pdpGroup, databaseProvider);
173 LOGGER.debug("PdpInstance not Found in DB. Sending Pdp for registration - {}", message);
174 registerPdp(message, databaseProvider, pdpGroup);
180 private Optional<PdpSubGroup> findPdpSubGroup(final PdpStatus message, final PdpGroup pdpGroup) {
181 PdpSubGroup pdpSubgroup = null;
182 for (final PdpSubGroup subGroup : pdpGroup.getPdpSubgroups()) {
183 if (message.getPdpType().equals(subGroup.getPdpType())) {
184 pdpSubgroup = subGroup;
188 return Optional.ofNullable(pdpSubgroup);
191 private Optional<Pdp> findPdpInstance(final PdpStatus message, final PdpSubGroup subGroup) {
192 Pdp pdpInstance = null;
193 for (final Pdp pdpInstanceDetails : subGroup.getPdpInstances()) {
194 if (pdpInstanceDetails.getInstanceId().equals(message.getName())) {
195 pdpInstance = pdpInstanceDetails;
199 return Optional.ofNullable(pdpInstance);
202 private void processPdpDetails(final PdpStatus message, final PdpSubGroup pdpSubGroup, final Pdp pdpInstance,
203 final PdpGroup pdpGroup, final PolicyModelsProvider databaseProvider) throws PfModelException {
204 if (PdpState.TERMINATED.equals(message.getState())) {
205 processPdpTermination(pdpSubGroup, pdpInstance, pdpGroup, databaseProvider);
206 } else if (validatePdpDetails(message, pdpGroup, pdpSubGroup, pdpInstance)) {
207 LOGGER.debug("PdpInstance details are correct. Saving current state in DB - {}", pdpInstance);
208 updatePdpHealthStatus(message, pdpSubGroup, pdpInstance, pdpGroup, databaseProvider);
210 if (validatePdpStatisticsDetails(message, pdpInstance, pdpGroup, pdpSubGroup)) {
211 LOGGER.debug("PdpStatistics details are correct. Saving current statistics in DB - {}",
212 message.getStatistics());
213 createPdpStatistics(message.getStatistics(), databaseProvider);
215 LOGGER.debug("PdpStatistics details are not correct - {}", message.getStatistics());
218 LOGGER.debug("PdpInstance details are not correct. Sending PdpUpdate message - {}", pdpInstance);
219 sendPdpMessage(pdpGroup.getName(), pdpSubGroup, pdpInstance.getInstanceId(), pdpInstance.getPdpState(),
224 private void processPdpTermination(final PdpSubGroup pdpSubGroup, final Pdp pdpInstance, final PdpGroup pdpGroup,
225 final PolicyModelsProvider databaseProvider) throws PfModelException {
226 pdpSubGroup.getPdpInstances().remove(pdpInstance);
227 pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() - 1);
228 databaseProvider.updatePdpSubGroup(pdpGroup.getName(), pdpSubGroup);
230 LOGGER.debug("Deleted PdpInstance - {} belonging to PdpSubGroup - {} and PdpGroup - {}", pdpInstance,
231 pdpSubGroup, pdpGroup);
234 private boolean validatePdpDetails(final PdpStatus message, final PdpGroup pdpGroup, final PdpSubGroup subGroup,
235 final Pdp pdpInstanceDetails) {
237 * "EqualsBuilder" is a bit of a misnomer, as it uses containsAll() to check policies. Nevertheless, it does the
238 * job and provides a convenient way to build a bunch of comparisons.
240 return new EqualsBuilder().append(message.getPdpGroup(), pdpGroup.getName())
241 .append(message.getPdpSubgroup(), subGroup.getPdpType())
242 .append(message.getPdpType(), subGroup.getPdpType())
243 .append(message.getState(), pdpInstanceDetails.getPdpState())
244 .append(message.getPolicies().containsAll(subGroup.getPolicies()), true)
245 .append(subGroup.getPolicies().containsAll(message.getPolicies()), true).build();
248 private boolean validatePdpStatisticsDetails(final PdpStatus message, final Pdp pdpInstanceDetails,
249 final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup) {
250 if (message.getStatistics() != null) {
251 return new EqualsBuilder()
252 .append(message.getStatistics().getPdpInstanceId(), pdpInstanceDetails.getInstanceId())
253 .append(message.getStatistics().getPdpGroupName(), pdpGroup.getName())
254 .append(message.getStatistics().getPdpSubGroupName(), pdpSubGroup.getPdpType())
255 .append(message.getStatistics().getPolicyDeployCount() < 0, false)
256 .append(message.getStatistics().getPolicyDeployFailCount() < 0, false)
257 .append(message.getStatistics().getPolicyDeploySuccessCount() < 0, false)
258 .append(message.getStatistics().getPolicyExecutedCount() < 0, false)
259 .append(message.getStatistics().getPolicyExecutedFailCount() < 0, false)
260 .append(message.getStatistics().getPolicyExecutedSuccessCount() < 0, false).build();
262 LOGGER.debug("PdpStatistics is null");
267 private void updatePdpHealthStatus(final PdpStatus message, final PdpSubGroup pdpSubgroup, final Pdp pdpInstance,
268 final PdpGroup pdpGroup, final PolicyModelsProvider databaseProvider) throws PfModelException {
269 pdpInstance.setHealthy(message.getHealthy());
270 databaseProvider.updatePdp(pdpGroup.getName(), pdpSubgroup.getPdpType(), pdpInstance);
272 LOGGER.debug("Updated Pdp in DB - {}", pdpInstance);
275 private void createPdpStatistics(final PdpStatistics pdpStatistics, final PolicyModelsProvider databaseProvider)
276 throws PfModelException {
277 databaseProvider.createPdpStatistics(Arrays.asList(pdpStatistics));
278 LOGGER.debug("Created PdpStatistics in DB - {}", pdpStatistics);
281 private void sendPdpMessage(final String pdpGroupName, final PdpSubGroup subGroup, final String pdpInstanceId,
282 final PdpState pdpState, final PolicyModelsProvider databaseProvider) throws PfModelException {
283 final PdpUpdate pdpUpdatemessage =
284 createPdpUpdateMessage(pdpGroupName, subGroup, pdpInstanceId, databaseProvider);
285 final PdpStateChange pdpStateChangeMessage =
286 createPdpStateChangeMessage(pdpGroupName, subGroup, pdpInstanceId, pdpState);
287 requestMap.addRequest(pdpUpdatemessage, pdpStateChangeMessage);
288 LOGGER.debug("Sent PdpUpdate message - {}", pdpUpdatemessage);
289 LOGGER.debug("Sent PdpStateChange message - {}", pdpStateChangeMessage);