2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019-2021,2023 Nordix Foundation.
4 * Modifications Copyright (C) 2019-2021 AT&T Intellectual Property.
5 * Modifications Copyright (C) 2021-2023 Bell Canada. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * SPDX-License-Identifier: Apache-2.0
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.pap.main.comm;
25 import java.sql.SQLIntegrityConstraintViolationException;
26 import java.time.Instant;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.LinkedList;
30 import java.util.List;
32 import java.util.Optional;
33 import java.util.concurrent.TimeUnit;
34 import java.util.stream.Collectors;
35 import org.apache.commons.lang3.builder.EqualsBuilder;
36 import org.onap.policy.models.base.PfModelException;
37 import org.onap.policy.models.pdp.concepts.Pdp;
38 import org.onap.policy.models.pdp.concepts.PdpGroup;
39 import org.onap.policy.models.pdp.concepts.PdpGroupFilter;
40 import org.onap.policy.models.pdp.concepts.PdpStatus;
41 import org.onap.policy.models.pdp.concepts.PdpSubGroup;
42 import org.onap.policy.models.pdp.enums.PdpState;
43 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
44 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
45 import org.onap.policy.pap.main.PolicyPapException;
46 import org.onap.policy.pap.main.parameters.PapParameterGroup;
47 import org.onap.policy.pap.main.parameters.PdpParameters;
48 import org.onap.policy.pap.main.service.PdpGroupService;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import org.springframework.stereotype.Component;
55 * Handler for PDP Status messages which either represent registration or heart beat.
57 * @author Ram Krishna Verma (ram.krishna.verma@est.tech)
61 public class PdpStatusMessageHandler extends PdpMessageGenerator {
62 private static final Logger LOGGER = LoggerFactory.getLogger(PdpStatusMessageHandler.class);
64 private final PdpParameters params;
66 private final PdpGroupService pdpGroupService;
69 * List to store policies present in db.
71 List<ToscaPolicy> policies = new LinkedList<>();
74 * List to store policies to be deployed (heartbeat).
76 Map<ToscaConceptIdentifier, ToscaPolicy> policiesToBeDeployed = new HashMap<>();
79 * List to store policies to be undeployed (heartbeat).
81 List<ToscaConceptIdentifier> policiesToBeUndeployed = new LinkedList<>();
84 * Constructs the object.
86 * @param parameterGroup the parameterGroup
87 * @param pdpGroupService the pdpGroupService
89 public PdpStatusMessageHandler(PapParameterGroup parameterGroup, PdpGroupService pdpGroupService) {
91 this.params = parameterGroup.getPdpParameters();
92 this.pdpGroupService = pdpGroupService;
96 * Handles the PdpStatus message coming from various PDP's.
98 * @param message the PdpStatus message
100 public void handlePdpStatus(final PdpStatus message) {
101 if (message.getPolicies() == null) {
102 message.setPolicies(Collections.emptyList());
105 long diffms = System.currentTimeMillis() - message.getTimestampMs();
106 if (diffms > params.getMaxMessageAgeMs()) {
107 long diffsec = TimeUnit.SECONDS.convert(diffms, TimeUnit.MILLISECONDS);
108 LOGGER.info("discarding status message from {} age {}s", message.getName(), diffsec);
112 synchronized (updateLock) {
114 if (message.getPdpSubgroup() == null) {
115 handlePdpRegistration(message);
117 handlePdpHeartbeat(message);
119 } catch (final PolicyPapException exp) {
120 LOGGER.error("Operation Failed", exp);
121 } catch (final Exception exp) {
122 if (isDuplicateKeyException(exp, Exception.class)) {
124 * this is to be expected, if multiple PAPs are processing the same
125 * heartbeat at a time, thus we log the exception at a trace level
126 * instead of an error level.
128 LOGGER.info("Failed updating PDP information for {} - may have been added by another PAP",
130 LOGGER.trace("Failed updating PDP information for {}", message.getName(), exp);
132 LOGGER.error("Failed connecting to database provider", exp);
139 * Determines if the exception indicates a duplicate key.
141 * @param thrown exception to check
142 * @param exceptionClazz the class to check against
143 * @return {@code true} if the exception occurred due to a duplicate key
145 protected static boolean isDuplicateKeyException(Throwable thrown, Class<? extends Throwable> exceptionClazz) {
146 while (thrown != null) {
147 if (thrown instanceof SQLIntegrityConstraintViolationException) {
151 if (exceptionClazz.isInstance(thrown) && isDuplicateKeyException(thrown.getCause(), exceptionClazz)) {
155 thrown = thrown.getCause();
161 private void handlePdpRegistration(final PdpStatus message) throws PfModelException, PolicyPapException {
162 if (!findAndUpdatePdpGroup(message)) {
163 final var errorMessage = "Failed to register PDP. No matching PdpGroup/SubGroup Found - ";
164 LOGGER.debug("{}{}", errorMessage, message);
165 throw new PolicyPapException(errorMessage + message);
169 private boolean findAndUpdatePdpGroup(final PdpStatus message)
170 throws PfModelException {
171 var pdpGroupFound = false;
172 final PdpGroupFilter filter =
173 PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build();
175 final List<PdpGroup> pdpGroups = pdpGroupService.getFilteredPdpGroups(filter);
176 if (!pdpGroups.isEmpty()) {
177 pdpGroupFound = registerPdp(message, pdpGroups.get(0));
179 return pdpGroupFound;
182 private boolean registerPdp(final PdpStatus message, final PdpGroup finalizedPdpGroup) throws PfModelException {
183 Optional<PdpSubGroup> subGroup;
184 var pdpGroupFound = false;
185 subGroup = findPdpSubGroup(message, finalizedPdpGroup);
187 if (subGroup.isPresent()) {
188 policies = getToscaPolicies(subGroup.get());
189 policiesToBeDeployed = policies.stream().collect(Collectors
190 .toMap(ToscaPolicy::getIdentifier, policy -> policy));
191 policiesToBeUndeployed = null;
193 LOGGER.debug("Found pdpGroup - {}, going for registration of PDP - {}", finalizedPdpGroup, message);
194 Optional<Pdp> pdp = findPdpInstance(message, subGroup.get());
195 if (pdp.isPresent()) {
196 updatePdpHealthStatus(message, subGroup.get(), pdp.get(), finalizedPdpGroup);
198 updatePdpSubGroup(finalizedPdpGroup, subGroup.get(), message);
200 sendPdpMessage(finalizedPdpGroup.getName(), subGroup.get(), message.getName(), null);
201 pdpGroupFound = true;
203 return pdpGroupFound;
206 private void updatePdpSubGroup(final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup, final PdpStatus message) {
208 final var pdpInstance = new Pdp();
209 pdpInstance.setInstanceId(message.getName());
210 pdpInstance.setPdpState(PdpState.ACTIVE);
211 pdpInstance.setHealthy(message.getHealthy());
212 pdpInstance.setMessage(message.getDescription());
213 pdpInstance.setLastUpdate(Instant.now());
214 pdpSubGroup.getPdpInstances().add(pdpInstance);
216 pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() + 1);
218 pdpGroupService.updatePdpSubGroup(pdpGroup.getName(), pdpSubGroup);
220 LOGGER.debug("Updated PdpSubGroup in DB - {} belonging to PdpGroup - {}", pdpSubGroup, pdpGroup.getName());
223 private void handlePdpHeartbeat(final PdpStatus message) throws PfModelException {
225 final PdpGroupFilter filter =
226 PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build();
227 final List<PdpGroup> pdpGroups = pdpGroupService.getFilteredPdpGroups(filter);
228 if (!pdpGroups.isEmpty()) {
229 var pdpGroup = pdpGroups.get(0);
230 Optional<PdpSubGroup> pdpSubgroup = findPdpSubGroup(message, pdpGroup);
231 if (pdpSubgroup.isPresent()) {
232 Optional<Pdp> pdpInstance = findPdpInstance(message, pdpSubgroup.get());
233 if (pdpInstance.isPresent()) {
234 processPdpDetails(message, pdpSubgroup.get(), pdpInstance.get(), pdpGroup);
236 LOGGER.debug("PdpInstance not Found in DB. Sending Pdp for registration - {}", message);
237 registerPdp(message, pdpGroup);
243 private Optional<PdpSubGroup> findPdpSubGroup(final PdpStatus message, final PdpGroup pdpGroup) {
244 PdpSubGroup pdpSubgroup = null;
245 for (final PdpSubGroup subGroup : pdpGroup.getPdpSubgroups()) {
246 if (message.getPdpType().equals(subGroup.getPdpType())) {
247 pdpSubgroup = subGroup;
251 return Optional.ofNullable(pdpSubgroup);
254 private Optional<Pdp> findPdpInstance(final PdpStatus message, final PdpSubGroup subGroup) {
255 Pdp pdpInstance = null;
256 for (final Pdp pdpInstanceDetails : subGroup.getPdpInstances()) {
257 if (pdpInstanceDetails.getInstanceId().equals(message.getName())) {
258 pdpInstance = pdpInstanceDetails;
262 return Optional.ofNullable(pdpInstance);
265 private void processPdpDetails(final PdpStatus message, final PdpSubGroup pdpSubGroup, final Pdp pdpInstance,
266 final PdpGroup pdpGroup) throws PfModelException {
268 policies = getToscaPolicies(pdpSubGroup);
270 Map<ToscaConceptIdentifier, ToscaPolicy> policyMap =
271 policies.stream().collect(Collectors.toMap(ToscaPolicy::getIdentifier, policy -> policy));
273 // policies that the PDP already has (-) all
274 policiesToBeUndeployed = message.getPolicies().stream().filter(policyId -> !policyMap.containsKey(policyId))
275 .collect(Collectors.toList());
277 // all (-) policies that the PDP already has
278 policiesToBeDeployed = policyMap;
279 policiesToBeDeployed.keySet().removeAll(message.getPolicies());
281 if (PdpState.TERMINATED.equals(message.getState())) {
282 processPdpTermination(pdpSubGroup, pdpInstance, pdpGroup);
283 } else if (validatePdpDetails(message, pdpGroup, pdpSubGroup, pdpInstance)) {
284 LOGGER.debug("PdpInstance details are correct. Saving current state in DB - {}", pdpInstance);
285 updatePdpHealthStatus(message, pdpSubGroup, pdpInstance, pdpGroup);
287 LOGGER.debug("PdpInstance details are not correct. Sending PdpUpdate message - {}", pdpInstance);
288 LOGGER.debug("Policy list in DB - {}. Policy list in heartbeat - {}", pdpSubGroup.getPolicies(),
289 message.getPolicies());
290 updatePdpHealthStatus(message, pdpSubGroup, pdpInstance, pdpGroup);
291 sendPdpMessage(pdpGroup.getName(), pdpSubGroup, pdpInstance.getInstanceId(), pdpInstance.getPdpState());
295 private void processPdpTermination(final PdpSubGroup pdpSubGroup, final Pdp pdpInstance, final PdpGroup pdpGroup) {
296 pdpSubGroup.getPdpInstances().remove(pdpInstance);
297 pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() - 1);
298 pdpGroupService.updatePdpSubGroup(pdpGroup.getName(), pdpSubGroup);
300 LOGGER.debug("Deleted PdpInstance - {} belonging to PdpSubGroup - {} and PdpGroup - {}", pdpInstance,
301 pdpSubGroup, pdpGroup);
304 private boolean validatePdpDetails(final PdpStatus message, final PdpGroup pdpGroup, final PdpSubGroup subGroup,
305 final Pdp pdpInstanceDetails) {
307 * "EqualsBuilder" is a bit of a misnomer, as it uses containsAll() to check policies. Nevertheless, it does the
308 * job and provides a convenient way to build a bunch of comparisons.
310 return new EqualsBuilder().append(message.getPdpGroup(), pdpGroup.getName())
311 .append(message.getPdpSubgroup(), subGroup.getPdpType())
312 .append(message.getPdpType(), subGroup.getPdpType())
313 .append(message.getState(), pdpInstanceDetails.getPdpState())
314 .append(message.getPolicies().containsAll(subGroup.getPolicies()), true)
315 .append(subGroup.getPolicies().containsAll(message.getPolicies()), true).build();
318 private void updatePdpHealthStatus(final PdpStatus message, final PdpSubGroup pdpSubgroup, final Pdp pdpInstance,
319 final PdpGroup pdpGroup) {
320 pdpInstance.setHealthy(message.getHealthy());
321 pdpInstance.setMessage(message.getDescription());
322 pdpInstance.setLastUpdate(Instant.now());
323 pdpGroupService.updatePdp(pdpGroup.getName(), pdpSubgroup.getPdpType(), pdpInstance);
325 LOGGER.debug("Updated Pdp in DB - {}", pdpInstance);
328 private void sendPdpMessage(final String pdpGroupName, final PdpSubGroup subGroup, final String pdpInstanceId,
329 final PdpState pdpState) {
330 final List<ToscaPolicy> polsToBeDeployed = new LinkedList<>(policiesToBeDeployed.values());
331 final var pdpUpdatemessage =
332 createPdpUpdateMessage(pdpGroupName, subGroup, pdpInstanceId,
333 polsToBeDeployed, policiesToBeUndeployed);
334 final var pdpStateChangeMessage =
335 createPdpStateChangeMessage(pdpGroupName, subGroup, pdpInstanceId, pdpState);
336 updateDeploymentStatus(pdpGroupName, subGroup.getPdpType(), pdpInstanceId, pdpStateChangeMessage.getState(),
337 pdpUpdatemessage.getPoliciesToBeDeployed());
339 requestMap.addRequest(pdpUpdatemessage, pdpStateChangeMessage);
340 LOGGER.debug("Sent PdpUpdate message - {}", pdpUpdatemessage);
341 LOGGER.debug("Sent PdpStateChange message - {}", pdpStateChangeMessage);