/*-
 * ============LICENSE_START=======================================================
 *  Copyright (C) 2019-2021 Nordix Foundation.
 *  Modifications Copyright (C) 2019-2021 AT&T Intellectual Property.
 *  Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */
23 package org.onap.policy.pap.main.comm;
import java.sql.SQLIntegrityConstraintViolationException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.eclipse.persistence.exceptions.EclipseLinkException;
import org.onap.policy.models.base.PfModelException;
import org.onap.policy.models.pdp.concepts.Pdp;
import org.onap.policy.models.pdp.concepts.PdpGroup;
import org.onap.policy.models.pdp.concepts.PdpGroupFilter;
import org.onap.policy.models.pdp.concepts.PdpStatistics;
import org.onap.policy.models.pdp.concepts.PdpStatus;
import org.onap.policy.models.pdp.concepts.PdpSubGroup;
import org.onap.policy.models.pdp.enums.PdpState;
import org.onap.policy.models.provider.PolicyModelsProvider;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
import org.onap.policy.pap.main.PolicyPapException;
import org.onap.policy.pap.main.parameters.PdpParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Handler for PDP Status messages which either represent registration or heartbeat.
 *
 * @author Ram Krishna Verma (ram.krishna.verma@est.tech)
 */
60 public class PdpStatusMessageHandler extends PdpMessageGenerator {
61 private static final Logger LOGGER = LoggerFactory.getLogger(PdpStatusMessageHandler.class);
63 private final PdpParameters params;
65 private final boolean savePdpStatistics;
68 * List to store policies present in db.
70 List<ToscaPolicy> policies = new LinkedList<>();
73 * List to store policies to be deployed (heartbeat).
75 Map<ToscaConceptIdentifier, ToscaPolicy> policiesToBeDeployed = new HashMap<>();
78 * List to store policies to be undeployed (heartbeat).
80 List<ToscaConceptIdentifier> policiesToBeUndeployed = new LinkedList<>();
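/**
 * Constructs the object.
 *
 * @param params PDP parameters
 * @param savePdpStatistics {@code true} if the statistics reported in heartbeats are to be validated and saved
 */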
// NOTE(review): original lines 88-90 are absent from this listing. "params" is a final field that
// handlePdpStatus() reads, so it is presumably assigned in the missing lines (possibly after a
// super(...) call) - confirm against the complete file before editing this constructor.
87 public PdpStatusMessageHandler(PdpParameters params, boolean savePdpStatistics) {
91 this.savePdpStatistics = savePdpStatistics;
95 * Handles the PdpStatus message coming from various PDP's.
97 * @param message the PdpStatus message
99 public void handlePdpStatus(final PdpStatus message) {
100 if (message.getPolicies() == null) {
101 message.setPolicies(Collections.emptyList());
104 long diffms = System.currentTimeMillis() - message.getTimestampMs();
105 if (diffms > params.getMaxMessageAgeMs()) {
106 long diffsec = TimeUnit.SECONDS.convert(diffms, TimeUnit.MILLISECONDS);
107 LOGGER.info("discarding status message from {} age {}s", message.getName(), diffsec);
111 synchronized (updateLock) {
112 try (PolicyModelsProvider databaseProvider = modelProviderWrapper.create()) {
113 if (message.getPdpSubgroup() == null) {
114 handlePdpRegistration(message, databaseProvider);
116 handlePdpHeartbeat(message, databaseProvider);
118 } catch (final PolicyPapException exp) {
119 LOGGER.error("Operation Failed", exp);
120 } catch (final Exception exp) {
121 if (isDuplicateKeyException(exp)) {
123 * this is to be expected, if multiple PAPs are processing the same
124 * heartbeat at a time, thus we log the exception at a trace level
125 * instead of an error level.
127 LOGGER.info("Failed updating PDP information for {} - may have been added by another PAP",
129 LOGGER.trace("Failed updating PDP information for {}", message.getName(), exp);
131 LOGGER.error("Failed connecting to database provider", exp);
138 * Determines if the exception indicates a duplicate key.
140 * @param thrown exception to check
141 * @return {@code true} if the exception occurred due to a duplicate key
143 protected static boolean isDuplicateKeyException(Throwable thrown) {
144 while (thrown != null) {
145 if (thrown instanceof SQLIntegrityConstraintViolationException) {
149 if (thrown instanceof EclipseLinkException) {
150 EclipseLinkException ele = (EclipseLinkException) thrown;
151 if (isDuplicateKeyException(ele.getInternalException())) {
156 thrown = thrown.getCause();
162 private void handlePdpRegistration(final PdpStatus message, final PolicyModelsProvider databaseProvider)
163 throws PfModelException, PolicyPapException {
164 if (!findAndUpdatePdpGroup(message, databaseProvider)) {
165 final var errorMessage = "Failed to register PDP. No matching PdpGroup/SubGroup Found - ";
166 LOGGER.debug("{}{}", errorMessage, message);
167 throw new PolicyPapException(errorMessage + message);
171 private boolean findAndUpdatePdpGroup(final PdpStatus message, final PolicyModelsProvider databaseProvider)
172 throws PfModelException {
173 var pdpGroupFound = false;
174 final PdpGroupFilter filter =
175 PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build();
177 final List<PdpGroup> pdpGroups = databaseProvider.getFilteredPdpGroups(filter);
178 if (!pdpGroups.isEmpty()) {
179 pdpGroupFound = registerPdp(message, databaseProvider, pdpGroups.get(0));
181 return pdpGroupFound;
184 private boolean registerPdp(final PdpStatus message, final PolicyModelsProvider databaseProvider,
185 final PdpGroup finalizedPdpGroup) throws PfModelException {
186 Optional<PdpSubGroup> subGroup;
187 var pdpGroupFound = false;
188 subGroup = findPdpSubGroup(message, finalizedPdpGroup);
190 if (subGroup.isPresent()) {
191 policies = getToscaPolicies(subGroup.get(), databaseProvider);
192 policiesToBeDeployed = policies.stream().collect(Collectors
193 .toMap(ToscaPolicy::getIdentifier, policy -> policy));
194 policiesToBeUndeployed = null;
196 LOGGER.debug("Found pdpGroup - {}, going for registration of PDP - {}", finalizedPdpGroup, message);
197 Optional<Pdp> pdp = findPdpInstance(message, subGroup.get());
198 if (pdp.isPresent()) {
199 updatePdpHealthStatus(message, subGroup.get(), pdp.get(), finalizedPdpGroup, databaseProvider);
201 updatePdpSubGroup(finalizedPdpGroup, subGroup.get(), message, databaseProvider);
203 sendPdpMessage(finalizedPdpGroup.getName(), subGroup.get(), message.getName(), null, databaseProvider);
204 pdpGroupFound = true;
206 return pdpGroupFound;
209 private void updatePdpSubGroup(final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup, final PdpStatus message,
210 final PolicyModelsProvider databaseProvider) throws PfModelException {
212 final var pdpInstance = new Pdp();
213 pdpInstance.setInstanceId(message.getName());
214 pdpInstance.setPdpState(PdpState.ACTIVE);
215 pdpInstance.setHealthy(message.getHealthy());
216 pdpInstance.setMessage(message.getDescription());
217 pdpInstance.setLastUpdate(Instant.now());
218 pdpSubGroup.getPdpInstances().add(pdpInstance);
220 pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() + 1);
222 databaseProvider.updatePdpSubGroup(pdpGroup.getName(), pdpSubGroup);
224 LOGGER.debug("Updated PdpSubGroup in DB - {} belonging to PdpGroup - {}", pdpSubGroup, pdpGroup.getName());
227 private void handlePdpHeartbeat(final PdpStatus message, final PolicyModelsProvider databaseProvider)
228 throws PfModelException {
230 final PdpGroupFilter filter =
231 PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build();
232 final List<PdpGroup> pdpGroups = databaseProvider.getFilteredPdpGroups(filter);
233 if (!pdpGroups.isEmpty()) {
234 var pdpGroup = pdpGroups.get(0);
235 Optional<PdpSubGroup> pdpSubgroup = findPdpSubGroup(message, pdpGroup);
236 if (pdpSubgroup.isPresent()) {
237 Optional<Pdp> pdpInstance = findPdpInstance(message, pdpSubgroup.get());
238 if (pdpInstance.isPresent()) {
239 processPdpDetails(message, pdpSubgroup.get(), pdpInstance.get(), pdpGroup, databaseProvider);
241 LOGGER.debug("PdpInstance not Found in DB. Sending Pdp for registration - {}", message);
242 registerPdp(message, databaseProvider, pdpGroup);
248 private Optional<PdpSubGroup> findPdpSubGroup(final PdpStatus message, final PdpGroup pdpGroup) {
249 PdpSubGroup pdpSubgroup = null;
250 for (final PdpSubGroup subGroup : pdpGroup.getPdpSubgroups()) {
251 if (message.getPdpType().equals(subGroup.getPdpType())) {
252 pdpSubgroup = subGroup;
256 return Optional.ofNullable(pdpSubgroup);
259 private Optional<Pdp> findPdpInstance(final PdpStatus message, final PdpSubGroup subGroup) {
260 Pdp pdpInstance = null;
261 for (final Pdp pdpInstanceDetails : subGroup.getPdpInstances()) {
262 if (pdpInstanceDetails.getInstanceId().equals(message.getName())) {
263 pdpInstance = pdpInstanceDetails;
267 return Optional.ofNullable(pdpInstance);
270 private void processPdpDetails(final PdpStatus message, final PdpSubGroup pdpSubGroup, final Pdp pdpInstance,
271 final PdpGroup pdpGroup, final PolicyModelsProvider databaseProvider)
272 throws PfModelException {
274 policies = getToscaPolicies(pdpSubGroup, databaseProvider);
276 Map<ToscaConceptIdentifier, ToscaPolicy> policyMap =
277 policies.stream().collect(Collectors.toMap(ToscaPolicy::getIdentifier, policy -> policy));
279 // policies that the PDP already has (-) all
280 policiesToBeUndeployed = message.getPolicies().stream().filter(policyId -> !policyMap.containsKey(policyId))
281 .collect(Collectors.toList());
283 // all (-) policies that the PDP already has
284 policiesToBeDeployed = policyMap;
285 policiesToBeDeployed.keySet().removeAll(message.getPolicies());
287 if (PdpState.TERMINATED.equals(message.getState())) {
288 processPdpTermination(pdpSubGroup, pdpInstance, pdpGroup, databaseProvider);
289 } else if (validatePdpDetails(message, pdpGroup, pdpSubGroup, pdpInstance)) {
290 LOGGER.debug("PdpInstance details are correct. Saving current state in DB - {}", pdpInstance);
291 updatePdpHealthStatus(message, pdpSubGroup, pdpInstance, pdpGroup, databaseProvider);
293 if (savePdpStatistics) {
294 processPdpStatistics(message, pdpSubGroup, pdpInstance, pdpGroup, databaseProvider);
296 LOGGER.debug("Not processing PdpStatistics - {}", message.getStatistics());
299 LOGGER.debug("PdpInstance details are not correct. Sending PdpUpdate message - {}", pdpInstance);
300 LOGGER.debug("Policy list in DB - {}. Policy list in heartbeat - {}", pdpSubGroup.getPolicies(),
301 message.getPolicies());
302 updatePdpHealthStatus(message, pdpSubGroup, pdpInstance, pdpGroup, databaseProvider);
303 sendPdpMessage(pdpGroup.getName(), pdpSubGroup, pdpInstance.getInstanceId(), pdpInstance.getPdpState(),
308 private void processPdpStatistics(final PdpStatus message, final PdpSubGroup pdpSubGroup, final Pdp pdpInstance,
309 final PdpGroup pdpGroup, final PolicyModelsProvider databaseProvider) throws PfModelException {
310 if (validatePdpStatisticsDetails(message, pdpInstance, pdpGroup, pdpSubGroup)) {
311 LOGGER.debug("PdpStatistics details are correct. Saving current statistics in DB - {}",
312 message.getStatistics());
313 createPdpStatistics(message.getStatistics(), databaseProvider);
315 LOGGER.debug("PdpStatistics details are not correct - {}", message.getStatistics());
319 private void processPdpTermination(final PdpSubGroup pdpSubGroup, final Pdp pdpInstance, final PdpGroup pdpGroup,
320 final PolicyModelsProvider databaseProvider) throws PfModelException {
321 pdpSubGroup.getPdpInstances().remove(pdpInstance);
322 pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() - 1);
323 databaseProvider.updatePdpSubGroup(pdpGroup.getName(), pdpSubGroup);
325 LOGGER.debug("Deleted PdpInstance - {} belonging to PdpSubGroup - {} and PdpGroup - {}", pdpInstance,
326 pdpSubGroup, pdpGroup);
329 private boolean validatePdpDetails(final PdpStatus message, final PdpGroup pdpGroup, final PdpSubGroup subGroup,
330 final Pdp pdpInstanceDetails) {
332 * "EqualsBuilder" is a bit of a misnomer, as it uses containsAll() to check policies. Nevertheless, it does the
333 * job and provides a convenient way to build a bunch of comparisons.
335 return new EqualsBuilder().append(message.getPdpGroup(), pdpGroup.getName())
336 .append(message.getPdpSubgroup(), subGroup.getPdpType())
337 .append(message.getPdpType(), subGroup.getPdpType())
338 .append(message.getState(), pdpInstanceDetails.getPdpState())
339 .append(message.getPolicies().containsAll(subGroup.getPolicies()), true)
340 .append(subGroup.getPolicies().containsAll(message.getPolicies()), true).build();
343 private boolean validatePdpStatisticsDetails(final PdpStatus message, final Pdp pdpInstanceDetails,
344 final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup) {
345 if (message.getStatistics() != null) {
346 return new EqualsBuilder()
347 .append(message.getStatistics().getPdpInstanceId(), pdpInstanceDetails.getInstanceId())
348 .append(message.getStatistics().getPdpGroupName(), pdpGroup.getName())
349 .append(message.getStatistics().getPdpSubGroupName(), pdpSubGroup.getPdpType())
350 .append(message.getStatistics().getPolicyDeployCount() < 0, false)
351 .append(message.getStatistics().getPolicyDeployFailCount() < 0, false)
352 .append(message.getStatistics().getPolicyDeploySuccessCount() < 0, false)
353 .append(message.getStatistics().getPolicyUndeployCount() < 0, false)
354 .append(message.getStatistics().getPolicyUndeployFailCount() < 0, false)
355 .append(message.getStatistics().getPolicyUndeploySuccessCount() < 0, false)
356 .append(message.getStatistics().getPolicyExecutedCount() < 0, false)
357 .append(message.getStatistics().getPolicyExecutedFailCount() < 0, false)
358 .append(message.getStatistics().getPolicyExecutedSuccessCount() < 0, false).build();
360 LOGGER.debug("PdpStatistics is null");
365 private void updatePdpHealthStatus(final PdpStatus message, final PdpSubGroup pdpSubgroup, final Pdp pdpInstance,
366 final PdpGroup pdpGroup, final PolicyModelsProvider databaseProvider) throws PfModelException {
367 pdpInstance.setHealthy(message.getHealthy());
368 pdpInstance.setMessage(message.getDescription());
369 pdpInstance.setLastUpdate(Instant.now());
370 databaseProvider.updatePdp(pdpGroup.getName(), pdpSubgroup.getPdpType(), pdpInstance);
372 LOGGER.debug("Updated Pdp in DB - {}", pdpInstance);
375 private void createPdpStatistics(final PdpStatistics pdpStatistics, final PolicyModelsProvider databaseProvider)
376 throws PfModelException {
377 databaseProvider.createPdpStatistics(Arrays.asList(pdpStatistics));
378 LOGGER.debug("Created PdpStatistics in DB - {}", pdpStatistics);
381 private void sendPdpMessage(final String pdpGroupName, final PdpSubGroup subGroup, final String pdpInstanceId,
382 final PdpState pdpState, final PolicyModelsProvider databaseProvider)
383 throws PfModelException {
384 final List<ToscaPolicy> polsToBeDeployed = new LinkedList<>(policiesToBeDeployed.values());
385 final var pdpUpdatemessage =
386 createPdpUpdateMessage(pdpGroupName, subGroup, pdpInstanceId,
387 polsToBeDeployed, policiesToBeUndeployed);
388 final var pdpStateChangeMessage =
389 createPdpStateChangeMessage(pdpGroupName, subGroup, pdpInstanceId, pdpState);
390 updateDeploymentStatus(pdpGroupName, subGroup.getPdpType(), pdpInstanceId, pdpStateChangeMessage.getState(),
391 databaseProvider, pdpUpdatemessage.getPoliciesToBeDeployed());
393 requestMap.addRequest(pdpUpdatemessage, pdpStateChangeMessage);
394 LOGGER.debug("Sent PdpUpdate message - {}", pdpUpdatemessage);
395 LOGGER.debug("Sent PdpStateChange message - {}", pdpStateChangeMessage);