2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019-2021 Nordix Foundation.
4 * Modifications Copyright (C) 2019-2021 AT&T Intellectual Property.
5 * Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * SPDX-License-Identifier: Apache-2.0
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.pap.main.comm;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.LinkedList;
28 import java.util.List;
30 import java.util.Optional;
31 import java.util.concurrent.TimeUnit;
32 import java.util.stream.Collectors;
33 import org.apache.commons.lang3.builder.EqualsBuilder;
34 import org.onap.policy.common.utils.services.Registry;
35 import org.onap.policy.models.base.PfModelException;
36 import org.onap.policy.models.pdp.concepts.Pdp;
37 import org.onap.policy.models.pdp.concepts.PdpGroup;
38 import org.onap.policy.models.pdp.concepts.PdpGroupFilter;
39 import org.onap.policy.models.pdp.concepts.PdpStatistics;
40 import org.onap.policy.models.pdp.concepts.PdpStatus;
41 import org.onap.policy.models.pdp.concepts.PdpSubGroup;
42 import org.onap.policy.models.pdp.enums.PdpState;
43 import org.onap.policy.models.provider.PolicyModelsProvider;
44 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
45 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
46 import org.onap.policy.pap.main.PapConstants;
47 import org.onap.policy.pap.main.PolicyPapException;
48 import org.onap.policy.pap.main.parameters.PdpParameters;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
54 * Handler for PDP Status messages which either represent registration or heart beat.
56 * @author Ram Krishna Verma (ram.krishna.verma@est.tech)
58 public class PdpStatusMessageHandler extends PdpMessageGenerator {
59 private static final Logger LOGGER = LoggerFactory.getLogger(PdpStatusMessageHandler.class);
61 private final PdpParameters params;
64 * List to store policies present in db.
66 List<ToscaPolicy> policies = new LinkedList<>();
69 * List to store policies to be deployed (heartbeat).
71 Map<ToscaConceptIdentifier, ToscaPolicy> policiesToBeDeployed = new HashMap<>();
74 * List to store policies to be undeployed (heartbeat).
76 List<ToscaConceptIdentifier> policiesToBeUndeployed = new LinkedList<>();
79 * Constructs the object.
81 * @param params PDP parameters
83 public PdpStatusMessageHandler(PdpParameters params) {
89 * Handles the PdpStatus message coming from various PDP's.
91 * @param message the PdpStatus message
93 public void handlePdpStatus(final PdpStatus message) {
94 long diffms = System.currentTimeMillis() - message.getTimestampMs();
95 if (diffms > params.getMaxMessageAgeMs()) {
96 long diffsec = TimeUnit.SECONDS.convert(diffms, TimeUnit.MILLISECONDS);
97 LOGGER.info("discarding status message from {} age {}s", message.getName(), diffsec);
101 synchronized (updateLock) {
102 try (PolicyModelsProvider databaseProvider = modelProviderWrapper.create()) {
103 if (message.getPdpSubgroup() == null) {
104 handlePdpRegistration(message, databaseProvider);
106 handlePdpHeartbeat(message, databaseProvider);
109 * Indicate that a heart beat was received from the PDP. This is invoked only if handleXxx() does not
110 * throw an exception.
112 if (message.getName() != null) {
113 final var pdpTracker = (PdpTracker) Registry.get(PapConstants.REG_PDP_TRACKER);
114 pdpTracker.add(message.getName());
116 } catch (final PolicyPapException exp) {
117 LOGGER.error("Operation Failed", exp);
118 } catch (final Exception exp) {
119 LOGGER.error("Failed connecting to database provider", exp);
124 private void handlePdpRegistration(final PdpStatus message, final PolicyModelsProvider databaseProvider)
125 throws PfModelException, PolicyPapException {
126 if (!findAndUpdatePdpGroup(message, databaseProvider)) {
127 final var errorMessage = "Failed to register PDP. No matching PdpGroup/SubGroup Found - ";
128 LOGGER.debug("{}{}", errorMessage, message);
129 throw new PolicyPapException(errorMessage + message);
133 private boolean findAndUpdatePdpGroup(final PdpStatus message, final PolicyModelsProvider databaseProvider)
134 throws PfModelException {
135 var pdpGroupFound = false;
136 final PdpGroupFilter filter =
137 PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build();
139 final List<PdpGroup> pdpGroups = databaseProvider.getFilteredPdpGroups(filter);
140 if (!pdpGroups.isEmpty()) {
141 pdpGroupFound = registerPdp(message, databaseProvider, pdpGroups.get(0));
143 return pdpGroupFound;
146 private boolean registerPdp(final PdpStatus message, final PolicyModelsProvider databaseProvider,
147 final PdpGroup finalizedPdpGroup) throws PfModelException {
148 Optional<PdpSubGroup> subGroup;
149 var pdpGroupFound = false;
150 subGroup = findPdpSubGroup(message, finalizedPdpGroup);
152 if (subGroup.isPresent()) {
153 policies = getToscaPolicies(subGroup.get(), databaseProvider);
154 policiesToBeDeployed = policies.stream().collect(Collectors
155 .toMap(ToscaPolicy::getIdentifier, policy -> policy));
156 policiesToBeUndeployed = null;
158 LOGGER.debug("Found pdpGroup - {}, going for registration of PDP - {}", finalizedPdpGroup, message);
159 if (!findPdpInstance(message, subGroup.get()).isPresent()) {
160 updatePdpSubGroup(finalizedPdpGroup, subGroup.get(), message, databaseProvider);
162 sendPdpMessage(finalizedPdpGroup.getName(), subGroup.get(), message.getName(), null, databaseProvider);
163 pdpGroupFound = true;
165 return pdpGroupFound;
168 private void updatePdpSubGroup(final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup, final PdpStatus message,
169 final PolicyModelsProvider databaseProvider) throws PfModelException {
171 final var pdpInstance = new Pdp();
172 pdpInstance.setInstanceId(message.getName());
173 pdpInstance.setPdpState(PdpState.ACTIVE);
174 pdpInstance.setHealthy(message.getHealthy());
175 pdpInstance.setMessage(message.getDescription());
176 pdpSubGroup.getPdpInstances().add(pdpInstance);
178 pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() + 1);
180 databaseProvider.updatePdpSubGroup(pdpGroup.getName(), pdpSubGroup);
182 LOGGER.debug("Updated PdpSubGroup in DB - {} belonging to PdpGroup - {}", pdpSubGroup, pdpGroup);
185 private void handlePdpHeartbeat(final PdpStatus message, final PolicyModelsProvider databaseProvider)
186 throws PfModelException {
188 final PdpGroupFilter filter =
189 PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build();
190 final List<PdpGroup> pdpGroups = databaseProvider.getFilteredPdpGroups(filter);
191 if (!pdpGroups.isEmpty()) {
192 var pdpGroup = pdpGroups.get(0);
193 Optional<PdpSubGroup> pdpSubgroup = findPdpSubGroup(message, pdpGroup);
194 if (pdpSubgroup.isPresent()) {
195 Optional<Pdp> pdpInstance = findPdpInstance(message, pdpSubgroup.get());
196 if (pdpInstance.isPresent()) {
197 processPdpDetails(message, pdpSubgroup.get(), pdpInstance.get(), pdpGroup, databaseProvider);
199 LOGGER.debug("PdpInstance not Found in DB. Sending Pdp for registration - {}", message);
200 registerPdp(message, databaseProvider, pdpGroup);
206 private Optional<PdpSubGroup> findPdpSubGroup(final PdpStatus message, final PdpGroup pdpGroup) {
207 PdpSubGroup pdpSubgroup = null;
208 for (final PdpSubGroup subGroup : pdpGroup.getPdpSubgroups()) {
209 if (message.getPdpType().equals(subGroup.getPdpType())) {
210 pdpSubgroup = subGroup;
214 return Optional.ofNullable(pdpSubgroup);
217 private Optional<Pdp> findPdpInstance(final PdpStatus message, final PdpSubGroup subGroup) {
218 Pdp pdpInstance = null;
219 for (final Pdp pdpInstanceDetails : subGroup.getPdpInstances()) {
220 if (pdpInstanceDetails.getInstanceId().equals(message.getName())) {
221 pdpInstance = pdpInstanceDetails;
225 return Optional.ofNullable(pdpInstance);
228 private void processPdpDetails(final PdpStatus message, final PdpSubGroup pdpSubGroup, final Pdp pdpInstance,
229 final PdpGroup pdpGroup, final PolicyModelsProvider databaseProvider)
230 throws PfModelException {
232 policies = getToscaPolicies(pdpSubGroup, databaseProvider);
234 policiesToBeDeployed =
235 policies.stream().collect(Collectors.toMap(ToscaPolicy::getIdentifier, policy -> policy));
236 // all (-) policies that the PDP already has
237 policiesToBeDeployed.keySet().removeAll(message.getPolicies());
239 // policies that the PDP already has (-) all
240 policiesToBeUndeployed = new LinkedList<>(message.getPolicies());
241 policiesToBeUndeployed.removeAll(policies.stream().map(ToscaPolicy::getIdentifier)
242 .collect(Collectors.toList()));
244 if (PdpState.TERMINATED.equals(message.getState())) {
245 processPdpTermination(pdpSubGroup, pdpInstance, pdpGroup, databaseProvider);
246 } else if (validatePdpDetails(message, pdpGroup, pdpSubGroup, pdpInstance)) {
247 LOGGER.debug("PdpInstance details are correct. Saving current state in DB - {}", pdpInstance);
248 updatePdpHealthStatus(message, pdpSubGroup, pdpInstance, pdpGroup, databaseProvider);
250 if (validatePdpStatisticsDetails(message, pdpInstance, pdpGroup, pdpSubGroup)) {
251 LOGGER.debug("PdpStatistics details are correct. Saving current statistics in DB - {}",
252 message.getStatistics());
253 createPdpStatistics(message.getStatistics(), databaseProvider);
255 LOGGER.debug("PdpStatistics details are not correct - {}", message.getStatistics());
258 LOGGER.debug("PdpInstance details are not correct. Sending PdpUpdate message - {}", pdpInstance);
259 LOGGER.debug("Policy list in DB - {}. Policy list in heartbeat - {}", pdpSubGroup.getPolicies(),
260 message.getPolicies());
261 sendPdpMessage(pdpGroup.getName(), pdpSubGroup, pdpInstance.getInstanceId(), pdpInstance.getPdpState(),
266 private void processPdpTermination(final PdpSubGroup pdpSubGroup, final Pdp pdpInstance, final PdpGroup pdpGroup,
267 final PolicyModelsProvider databaseProvider) throws PfModelException {
268 pdpSubGroup.getPdpInstances().remove(pdpInstance);
269 pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() - 1);
270 databaseProvider.updatePdpSubGroup(pdpGroup.getName(), pdpSubGroup);
272 LOGGER.debug("Deleted PdpInstance - {} belonging to PdpSubGroup - {} and PdpGroup - {}", pdpInstance,
273 pdpSubGroup, pdpGroup);
276 private boolean validatePdpDetails(final PdpStatus message, final PdpGroup pdpGroup, final PdpSubGroup subGroup,
277 final Pdp pdpInstanceDetails) {
279 * "EqualsBuilder" is a bit of a misnomer, as it uses containsAll() to check policies. Nevertheless, it does the
280 * job and provides a convenient way to build a bunch of comparisons.
282 return new EqualsBuilder().append(message.getPdpGroup(), pdpGroup.getName())
283 .append(message.getPdpSubgroup(), subGroup.getPdpType())
284 .append(message.getPdpType(), subGroup.getPdpType())
285 .append(message.getState(), pdpInstanceDetails.getPdpState())
286 .append(message.getPolicies().containsAll(subGroup.getPolicies()), true)
287 .append(subGroup.getPolicies().containsAll(message.getPolicies()), true).build();
290 private boolean validatePdpStatisticsDetails(final PdpStatus message, final Pdp pdpInstanceDetails,
291 final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup) {
292 if (message.getStatistics() != null) {
293 return new EqualsBuilder()
294 .append(message.getStatistics().getPdpInstanceId(), pdpInstanceDetails.getInstanceId())
295 .append(message.getStatistics().getPdpGroupName(), pdpGroup.getName())
296 .append(message.getStatistics().getPdpSubGroupName(), pdpSubGroup.getPdpType())
297 .append(message.getStatistics().getPolicyDeployCount() < 0, false)
298 .append(message.getStatistics().getPolicyDeployFailCount() < 0, false)
299 .append(message.getStatistics().getPolicyDeploySuccessCount() < 0, false)
300 .append(message.getStatistics().getPolicyExecutedCount() < 0, false)
301 .append(message.getStatistics().getPolicyExecutedFailCount() < 0, false)
302 .append(message.getStatistics().getPolicyExecutedSuccessCount() < 0, false).build();
304 LOGGER.debug("PdpStatistics is null");
309 private void updatePdpHealthStatus(final PdpStatus message, final PdpSubGroup pdpSubgroup, final Pdp pdpInstance,
310 final PdpGroup pdpGroup, final PolicyModelsProvider databaseProvider) throws PfModelException {
311 pdpInstance.setHealthy(message.getHealthy());
312 databaseProvider.updatePdp(pdpGroup.getName(), pdpSubgroup.getPdpType(), pdpInstance);
314 LOGGER.debug("Updated Pdp in DB - {}", pdpInstance);
317 private void createPdpStatistics(final PdpStatistics pdpStatistics, final PolicyModelsProvider databaseProvider)
318 throws PfModelException {
319 databaseProvider.createPdpStatistics(Arrays.asList(pdpStatistics));
320 LOGGER.debug("Created PdpStatistics in DB - {}", pdpStatistics);
323 private void sendPdpMessage(final String pdpGroupName, final PdpSubGroup subGroup, final String pdpInstanceId,
324 final PdpState pdpState, final PolicyModelsProvider databaseProvider)
325 throws PfModelException {
326 final List<ToscaPolicy> polsToBeDeployed = new LinkedList<>(policiesToBeDeployed.values());
327 final var pdpUpdatemessage =
328 createPdpUpdateMessage(pdpGroupName, subGroup, pdpInstanceId,
329 polsToBeDeployed, policiesToBeUndeployed);
330 final var pdpStateChangeMessage =
331 createPdpStateChangeMessage(pdpGroupName, subGroup, pdpInstanceId, pdpState);
332 updateDeploymentStatus(pdpGroupName, subGroup.getPdpType(), pdpInstanceId, pdpStateChangeMessage.getState(),
333 databaseProvider, pdpUpdatemessage.getPoliciesToBeDeployed());
335 requestMap.addRequest(pdpUpdatemessage, pdpStateChangeMessage);
336 LOGGER.debug("Sent PdpUpdate message - {}", pdpUpdatemessage);
337 LOGGER.debug("Sent PdpStateChange message - {}", pdpStateChangeMessage);