b8e361e1e6e8b2b99a5fe8f8902112150dcce341
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019-2021 Nordix Foundation.
4  *  Modifications Copyright (C) 2021-2022 Bell Canada. All rights reserved.
5  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.apex.services.onappf.handler;
24
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.HashSet;
28 import java.util.List;
29 import java.util.Set;
30 import java.util.stream.Collectors;
31 import org.onap.policy.apex.service.engine.main.ApexPolicyStatisticsManager;
32 import org.onap.policy.apex.services.onappf.ApexStarterConstants;
33 import org.onap.policy.apex.services.onappf.comm.PdpStatusPublisher;
34 import org.onap.policy.apex.services.onappf.exception.ApexStarterException;
35 import org.onap.policy.common.endpoints.event.comm.TopicSink;
36 import org.onap.policy.common.utils.services.Registry;
37 import org.onap.policy.models.pdp.concepts.PdpResponseDetails;
38 import org.onap.policy.models.pdp.concepts.PdpStatus;
39 import org.onap.policy.models.pdp.concepts.PdpUpdate;
40 import org.onap.policy.models.pdp.enums.PdpResponseStatus;
41 import org.onap.policy.models.pdp.enums.PdpState;
42 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
43 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
44 import org.onap.policy.models.tosca.authorative.concepts.ToscaWithTypeAndObjectProperties;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 /**
49  * This class supports the handling of pdp update messages.
50  *
51  * @author Ajith Sreekumar (ajith.sreekumar@est.tech)
52  */
53 public class PdpUpdateMessageHandler {
54
55     private static final Logger LOGGER = LoggerFactory.getLogger(PdpUpdateMessageHandler.class);
56
57     /**
58      * Method which handles a pdp update event from PAP.
59      *
60      * @param pdpUpdateMsg pdp update message
61      */
62     public void handlePdpUpdateEvent(final PdpUpdate pdpUpdateMsg) {
63         final var pdpMessageHandler = new PdpMessageHandler();
64         final var pdpStatusContext = Registry.get(ApexStarterConstants.REG_PDP_STATUS_OBJECT, PdpStatus.class);
65         PdpResponseDetails pdpResponseDetails = null;
66         if (pdpUpdateMsg.appliesTo(pdpStatusContext.getName(), pdpStatusContext.getPdpGroup(),
67                 pdpStatusContext.getPdpSubgroup())) {
68             if (checkIfAlreadyHandled(pdpUpdateMsg, pdpStatusContext)) {
69                 pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(),
70                         PdpResponseStatus.SUCCESS, "Pdp already updated");
71             } else {
72                 pdpResponseDetails = handlePdpUpdate(pdpUpdateMsg, pdpMessageHandler, pdpStatusContext);
73             }
74             final var pdpStatusPublisherTemp =
75                     Registry.get(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER, PdpStatusPublisher.class);
76             final var pdpStatus = pdpMessageHandler.createPdpStatusFromContext();
77             pdpStatus.setResponse(pdpResponseDetails);
78             pdpStatus.setDescription("Pdp status response message for PdpUpdate");
79             pdpStatusPublisherTemp.send(pdpStatus);
80         }
81     }
82
83     /**
84      * Method to do pdp update.
85      *
86      * @param pdpUpdateMsg the pdp update message
87      * @param pdpMessageHandler the message handler
88      * @param pdpStatusContext the pdp status in memory
89      * @return pdpResponseDetails the pdp response
90      */
91     private PdpResponseDetails handlePdpUpdate(final PdpUpdate pdpUpdateMsg, final PdpMessageHandler pdpMessageHandler,
92         final PdpStatus pdpStatusContext) {
93         PdpResponseDetails pdpResponseDetails = null;
94         final var pdpStatusPublisher =
95                         Registry.get(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER, PdpStatusPublisher.class);
96         if (null != pdpUpdateMsg.getPdpHeartbeatIntervalMs() && pdpUpdateMsg.getPdpHeartbeatIntervalMs() > 0
97                 && pdpStatusPublisher.getInterval() != pdpUpdateMsg.getPdpHeartbeatIntervalMs()) {
98             updateInterval(pdpUpdateMsg.getPdpHeartbeatIntervalMs());
99         }
100         pdpStatusContext.setPdpGroup(pdpUpdateMsg.getPdpGroup());
101         pdpStatusContext.setPdpSubgroup(pdpUpdateMsg.getPdpSubgroup());
102         @SuppressWarnings("unchecked")
103         List<ToscaPolicy> policies = Registry.getOrDefault(ApexStarterConstants.REG_APEX_TOSCA_POLICY_LIST,
104                 List.class, new ArrayList<>());
105         policies.addAll(pdpUpdateMsg.getPoliciesToBeDeployed());
106         policies.removeIf(policy -> pdpUpdateMsg.getPoliciesToBeUndeployed().contains(policy.getIdentifier()));
107         Set<ToscaConceptIdentifier> policiesInDeployment = policies.stream().map(ToscaPolicy::getIdentifier)
108                 .collect(Collectors.toSet());
109         policiesInDeployment.removeAll(pdpUpdateMsg.getPoliciesToBeUndeployed());
110         pdpStatusContext.setPolicies(new ArrayList<>(policiesInDeployment));
111         Registry.registerOrReplace(ApexStarterConstants.REG_APEX_TOSCA_POLICY_LIST,
112                 policies);
113         if (pdpStatusContext.getState().equals(PdpState.ACTIVE)) {
114             pdpResponseDetails = startOrStopApexEngineBasedOnPolicies(pdpUpdateMsg, pdpMessageHandler);
115
116             var apexEngineHandler =
117                 Registry.getOrDefault(ApexStarterConstants.REG_APEX_ENGINE_HANDLER, ApexEngineHandler.class, null);
118             // in hearbeat while in active state, only the policies which are running should be there.
119             // if some policy fails, that shouldn't go in the heartbeat.
120             // If no policies are running, then the policy list in the heartbeat can be empty
121             if (null != apexEngineHandler && apexEngineHandler.isApexEngineRunning()) {
122                 var runningPolicies = apexEngineHandler.getRunningPolicies();
123                 pdpStatusContext.setPolicies(runningPolicies);
124                 policies.removeIf(policy -> !runningPolicies.contains(policy.getIdentifier()));
125             } else {
126                 pdpStatusContext.setPolicies(Collections.emptyList());
127                 policies.clear();
128             }
129         }
130         if (null == pdpResponseDetails) {
131             pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(),
132                     PdpResponseStatus.SUCCESS, "Pdp update successful.");
133         }
134         return pdpResponseDetails;
135     }
136
137     /**
138      * Method to start or stop apex engine based on the list of policies received from pap. When current state is
139      * active, if PAP sends PdpUpdate with empty policies list, stop apex engine, or, if there is a change in policies,
140      * stop the current running policies and the deploy the new ones.
141      *
142      * @param pdpUpdateMsg the pdp update message from pap
143      * @param pdpMessageHandler pdp message handler
144      * @return pdpResponseDetails the pdp response
145      */
146     private PdpResponseDetails startOrStopApexEngineBasedOnPolicies(final PdpUpdate pdpUpdateMsg,
147             final PdpMessageHandler pdpMessageHandler) {
148         PdpResponseDetails pdpResponseDetails = null;
149         ApexEngineHandler apexEngineHandler = null;
150         try {
151             apexEngineHandler = Registry.get(ApexStarterConstants.REG_APEX_ENGINE_HANDLER);
152         } catch (final IllegalArgumentException e) {
153             LOGGER.debug("ApenEngineHandler not in registry.", e);
154         }
155         if (null != apexEngineHandler
156             && pdpUpdateMsg.getPoliciesToBeUndeployed().containsAll(apexEngineHandler.getRunningPolicies())
157             && pdpUpdateMsg.getPoliciesToBeDeployed().isEmpty()) {
158             pdpResponseDetails = stopApexEngineBasedOnPolicies(pdpUpdateMsg, pdpMessageHandler, apexEngineHandler);
159         } else {
160             pdpResponseDetails = startApexEngineBasedOnPolicies(pdpUpdateMsg, pdpMessageHandler, apexEngineHandler);
161         }
162         return pdpResponseDetails;
163     }
164
165     private PdpResponseDetails stopApexEngineBasedOnPolicies(final PdpUpdate pdpUpdateMsg,
166         final PdpMessageHandler pdpMessageHandler, ApexEngineHandler apexEngineHandler) {
167         PdpResponseDetails pdpResponseDetails = null;
168         if (null != apexEngineHandler && apexEngineHandler.isApexEngineRunning()) {
169             List<ToscaConceptIdentifier> runningPolicies = apexEngineHandler.getRunningPolicies();
170             try {
171                 apexEngineHandler.shutdown();
172                 runningPolicies = apexEngineHandler.getRunningPolicies();
173                 pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(),
174                     PdpResponseStatus.SUCCESS, "Pdp update successful. No policies are running.");
175             } catch (final ApexStarterException e) {
176                 LOGGER.error("Pdp update failed as the policies couldn't be undeployed.", e);
177                 pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(),
178                         PdpResponseStatus.FAIL, "Pdp update failed as the policies couldn't be undeployed.");
179             }
180             updateDeploymentCounts(runningPolicies, pdpUpdateMsg);
181         }
182         return pdpResponseDetails;
183     }
184
185     private PdpResponseDetails startApexEngineBasedOnPolicies(final PdpUpdate pdpUpdateMsg,
186         final PdpMessageHandler pdpMessageHandler, ApexEngineHandler apexEngineHandler) {
187         PdpResponseDetails pdpResponseDetails = null;
188         List<ToscaConceptIdentifier> runningPolicies = new ArrayList<>();
189         try {
190             if (null != apexEngineHandler && apexEngineHandler.isApexEngineRunning()) {
191                 apexEngineHandler.updateApexEngine(pdpUpdateMsg.getPoliciesToBeDeployed(),
192                     pdpUpdateMsg.getPoliciesToBeUndeployed());
193                 runningPolicies = apexEngineHandler.getRunningPolicies();
194             } else {
195                 apexEngineHandler = new ApexEngineHandler(pdpUpdateMsg.getPoliciesToBeDeployed());
196                 Registry.registerOrReplace(ApexStarterConstants.REG_APEX_ENGINE_HANDLER, apexEngineHandler);
197             }
198             if (apexEngineHandler.isApexEngineRunning()) {
199                 pdpResponseDetails =
200                     populateResponseForEngineInitiation(pdpUpdateMsg, pdpMessageHandler, apexEngineHandler);
201                 runningPolicies = apexEngineHandler.getRunningPolicies();
202             } else {
203                 pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(),
204                     PdpResponseStatus.FAIL, "Apex engine failed to start.");
205             }
206         } catch (final ApexStarterException e) {
207             LOGGER.error("Apex engine service running failed. ", e);
208             pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(),
209                     PdpResponseStatus.FAIL, "Apex engine service running failed. " + e.getMessage());
210         }
211         updateDeploymentCounts(runningPolicies, pdpUpdateMsg);
212         return pdpResponseDetails;
213     }
214
215     private PdpResponseDetails populateResponseForEngineInitiation(final PdpUpdate pdpUpdateMsg,
216         final PdpMessageHandler pdpMessageHandler, ApexEngineHandler apexEngineHandler) {
217         PdpResponseDetails pdpResponseDetails;
218         Set<ToscaConceptIdentifier> runningPolicies = new HashSet<>(apexEngineHandler.getRunningPolicies());
219         List<ToscaConceptIdentifier> policiesToBeDeployed =
220             pdpMessageHandler.getToscaPolicyIdentifiers(pdpUpdateMsg.getPoliciesToBeDeployed());
221         List<ToscaConceptIdentifier> policiesToBeUndeployed = pdpUpdateMsg.getPoliciesToBeUndeployed();
222         if (runningPolicies.containsAll(policiesToBeDeployed)
223             && !containsAny(runningPolicies, policiesToBeUndeployed)) {
224             var message = new StringBuilder("Apex engine started. ");
225             if (!policiesToBeDeployed.isEmpty()) {
226                 message.append("Deployed policies are: ");
227                 for (ToscaConceptIdentifier policy : policiesToBeDeployed) {
228                     message.append(policy.getName()).append(":").append(policy.getVersion()).append("  ");
229                 }
230             }
231             if (!policiesToBeUndeployed.isEmpty()) {
232                 message.append("Undeployed policies are: ");
233                 for (ToscaConceptIdentifier policy : policiesToBeUndeployed) {
234                     message.append(policy.getName()).append(":").append(policy.getVersion()).append("  ");
235                 }
236             }
237             pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(),
238                 PdpResponseStatus.SUCCESS, message.toString());
239         } else {
240             var message =
241                 new StringBuilder("Apex engine started. But, only the following polices are running - ");
242             for (ToscaConceptIdentifier policy : runningPolicies) {
243                 message.append(policy.getName()).append(":").append(policy.getVersion()).append("  ");
244             }
245             message.append(". Other policies failed execution. Please see the logs for more details.");
246             pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(),
247                 PdpResponseStatus.SUCCESS, message.toString());
248         }
249         return pdpResponseDetails;
250     }
251
252     /**
253      * Method checks if the Pdp update message is already handled by checking the values in the context.
254      *
255      * @param pdpUpdateMsg pdp update message received from pap
256      * @param pdpStatusContext values saved in context memory
257      * @return boolean flag which tells if the information is same or not
258      */
259     private boolean checkIfAlreadyHandled(final PdpUpdate pdpUpdateMsg, final PdpStatus pdpStatusContext) {
260         return null != pdpStatusContext.getPdpGroup()
261             && pdpStatusContext.getPdpGroup().equals(pdpUpdateMsg.getPdpGroup())
262             && null != pdpStatusContext.getPdpSubgroup()
263             && pdpStatusContext.getPdpSubgroup().equals(pdpUpdateMsg.getPdpSubgroup())
264             && null != pdpStatusContext.getPolicies()
265             && pdpStatusContext.getPolicies()
266                 .containsAll(new PdpMessageHandler().getToscaPolicyIdentifiers(pdpUpdateMsg.getPoliciesToBeDeployed()))
267             && !containsAny(new HashSet<>(pdpStatusContext.getPolicies()), pdpUpdateMsg.getPoliciesToBeUndeployed());
268     }
269
270     /**
271      * Method to update the time interval used by the timer task.
272      *
273      * @param interval time interval received in the pdp update message from pap
274      */
275     public void updateInterval(final long interval) {
276         final var pdpStatusPublisher =
277                         Registry.get(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER, PdpStatusPublisher.class);
278         pdpStatusPublisher.terminate();
279         final List<TopicSink> topicSinks = Registry.get(ApexStarterConstants.REG_APEX_PDP_TOPIC_SINKS);
280         Registry.registerOrReplace(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER,
281                 new PdpStatusPublisher(topicSinks, interval));
282     }
283
284     /**
285      * Checks if one list contains any element of another.
286      *
287      * @param listToCheckWith list to check contents of
288      * @param listToCheckAgainst list to check against other list for similarities
289      * @return boolean flag which tells if lists share same elements or not
290      */
291     private boolean containsAny(Set<ToscaConceptIdentifier> listToCheckWith,
292             List<ToscaConceptIdentifier> listToCheckAgainst) {
293         return listToCheckAgainst.stream().anyMatch(listToCheckWith::contains);
294     }
295
296     /**
297      * Update count values for deployment actions (deploy and undeploy) when applicable.
298      * @param runningPolicies the policies running in apex engine
299      * @param pdpUpdateMsg the pdp update message from pap
300      */
301     private void updateDeploymentCounts(final List<ToscaConceptIdentifier> runningPolicies,
302         final PdpUpdate pdpUpdateMsg) {
303         final var statisticsManager = ApexPolicyStatisticsManager.getInstanceFromRegistry();
304         if (statisticsManager != null && runningPolicies != null) {
305             if (pdpUpdateMsg.getPoliciesToBeDeployed() != null && !pdpUpdateMsg.getPoliciesToBeDeployed().isEmpty()) {
306                 var policiesToDeploy = pdpUpdateMsg.getPoliciesToBeDeployed().stream()
307                     .map(ToscaWithTypeAndObjectProperties::getIdentifier).collect(Collectors.toList());
308
309                 var policiesSuccessfullyDeployed = new ArrayList<>(policiesToDeploy);
310                 policiesSuccessfullyDeployed.retainAll(runningPolicies);
311                 policiesSuccessfullyDeployed.forEach(policy -> statisticsManager.updatePolicyDeployCounter(true));
312
313                 var policiesFailedToDeploy =  new ArrayList<>(policiesToDeploy);
314                 policiesFailedToDeploy.removeIf(runningPolicies::contains);
315                 policiesFailedToDeploy.forEach(policy -> statisticsManager.updatePolicyDeployCounter(false));
316             }
317
318             var policiesToUndeploy = pdpUpdateMsg.getPoliciesToBeUndeployed();
319             if (policiesToUndeploy != null && !policiesToUndeploy.isEmpty()) {
320                 var policiesFailedToUndeploy = new ArrayList<>(policiesToUndeploy);
321                 policiesFailedToUndeploy.retainAll(runningPolicies);
322                 policiesFailedToUndeploy.forEach(policy -> statisticsManager.updatePolicyUndeployCounter(false));
323
324                 var policiesSuccessfullyUndeployed =  new ArrayList<>(policiesToUndeploy);
325                 policiesSuccessfullyUndeployed.removeIf(runningPolicies::contains);
326                 policiesSuccessfullyUndeployed.forEach(policy -> statisticsManager.updatePolicyUndeployCounter(true));
327             }
328         }
329     }
330 }