09fb92dd366b4f42fde912ce5c3d54cf775b28c1
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2019-2021 Nordix Foundation.
4  *  Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.services.onappf.handler;
23
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Set;
29 import java.util.stream.Collectors;
30 import org.onap.policy.apex.service.engine.main.ApexPolicyStatisticsManager;
31 import org.onap.policy.apex.services.onappf.ApexStarterConstants;
32 import org.onap.policy.apex.services.onappf.comm.PdpStatusPublisher;
33 import org.onap.policy.apex.services.onappf.exception.ApexStarterException;
34 import org.onap.policy.common.endpoints.event.comm.TopicSink;
35 import org.onap.policy.common.utils.services.Registry;
36 import org.onap.policy.models.pdp.concepts.PdpResponseDetails;
37 import org.onap.policy.models.pdp.concepts.PdpStatus;
38 import org.onap.policy.models.pdp.concepts.PdpUpdate;
39 import org.onap.policy.models.pdp.enums.PdpResponseStatus;
40 import org.onap.policy.models.pdp.enums.PdpState;
41 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
42 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * This class supports the handling of pdp update messages.
48  *
49  * @author Ajith Sreekumar (ajith.sreekumar@est.tech)
50  */
51 public class PdpUpdateMessageHandler {
52
53     private static final Logger LOGGER = LoggerFactory.getLogger(PdpUpdateMessageHandler.class);
54
55     /**
56      * Method which handles a pdp update event from PAP.
57      *
58      * @param pdpUpdateMsg pdp update message
59      */
60     public void handlePdpUpdateEvent(final PdpUpdate pdpUpdateMsg) {
61         final PdpMessageHandler pdpMessageHandler = new PdpMessageHandler();
62         final PdpStatus pdpStatusContext = Registry.get(ApexStarterConstants.REG_PDP_STATUS_OBJECT, PdpStatus.class);
63         PdpResponseDetails pdpResponseDetails = null;
64         if (pdpUpdateMsg.appliesTo(pdpStatusContext.getName(), pdpStatusContext.getPdpGroup(),
65                 pdpStatusContext.getPdpSubgroup())) {
66             if (checkIfAlreadyHandled(pdpUpdateMsg, pdpStatusContext)) {
67                 pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(),
68                         PdpResponseStatus.SUCCESS, "Pdp already updated");
69             } else {
70                 pdpResponseDetails = handlePdpUpdate(pdpUpdateMsg, pdpMessageHandler, pdpStatusContext);
71             }
72             final PdpStatusPublisher pdpStatusPublisherTemp =
73                     Registry.get(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER);
74             final PdpStatus pdpStatus = pdpMessageHandler.createPdpStatusFromContext();
75             pdpStatus.setResponse(pdpResponseDetails);
76             pdpStatus.setDescription("Pdp status response message for PdpUpdate");
77             pdpStatusPublisherTemp.send(pdpStatus);
78         }
79     }
80
81     /**
82      * Method to do pdp update.
83      *
84      * @param pdpUpdateMsg the pdp update message
85      * @param pdpMessageHandler the message handler
86      * @param pdpStatusContext the pdp status in memory
87      * @return pdpResponseDetails the pdp response
88      */
89     private PdpResponseDetails handlePdpUpdate(final PdpUpdate pdpUpdateMsg, final PdpMessageHandler pdpMessageHandler,
90         final PdpStatus pdpStatusContext) {
91         PdpResponseDetails pdpResponseDetails = null;
92         final PdpStatusPublisher pdpStatusPublisher = Registry.get(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER);
93         if (null != pdpUpdateMsg.getPdpHeartbeatIntervalMs() && pdpUpdateMsg.getPdpHeartbeatIntervalMs() > 0
94                 && pdpStatusPublisher.getInterval() != pdpUpdateMsg.getPdpHeartbeatIntervalMs()) {
95             updateInterval(pdpUpdateMsg.getPdpHeartbeatIntervalMs());
96         }
97         pdpStatusContext.setPdpGroup(pdpUpdateMsg.getPdpGroup());
98         pdpStatusContext.setPdpSubgroup(pdpUpdateMsg.getPdpSubgroup());
99         @SuppressWarnings("unchecked")
100         List<ToscaPolicy> policies = Registry.getOrDefault(ApexStarterConstants.REG_APEX_TOSCA_POLICY_LIST,
101                 List.class, new ArrayList<>());
102         policies.addAll(pdpUpdateMsg.getPoliciesToBeDeployed());
103         Set<ToscaConceptIdentifier> policiesInDeployment = policies.stream().map(ToscaPolicy::getIdentifier)
104                 .collect(Collectors.toSet());
105         policiesInDeployment.removeAll(pdpUpdateMsg.getPoliciesToBeUndeployed());
106         pdpStatusContext.setPolicies(new ArrayList<>(policiesInDeployment));
107         Registry.registerOrReplace(ApexStarterConstants.REG_APEX_TOSCA_POLICY_LIST,
108                 policies);
109         if (pdpStatusContext.getState().equals(PdpState.ACTIVE)) {
110             pdpResponseDetails = startOrStopApexEngineBasedOnPolicies(pdpUpdateMsg, pdpMessageHandler);
111
112             ApexEngineHandler apexEngineHandler =
113                 Registry.getOrDefault(ApexStarterConstants.REG_APEX_ENGINE_HANDLER, ApexEngineHandler.class, null);
114             // in hearbeat while in active state, only the policies which are running should be there.
115             // if some policy fails, that shouldn't go in the heartbeat.
116             // If no policies are running, then the policy list in the heartbeat can be empty
117             if (null != apexEngineHandler && apexEngineHandler.isApexEngineRunning()) {
118                 pdpStatusContext.setPolicies(apexEngineHandler.getRunningPolicies());
119             } else {
120                 pdpStatusContext.setPolicies(Collections.emptyList());
121             }
122         }
123         if (null == pdpResponseDetails) {
124             pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(),
125                     PdpResponseStatus.SUCCESS, "Pdp update successful.");
126         }
127         return pdpResponseDetails;
128     }
129
130     /**
131      * Method to start or stop apex engine based on the list of policies received from pap. When current state is
132      * active, if PAP sends PdpUpdate with empty policies list, stop apex engine, or, if there is a change in policies,
133      * stop the current running policies and the deploy the new ones.
134      *
135      * @param pdpUpdateMsg the pdp update message from pap
136      * @param pdpMessageHandler pdp message handler
137      * @return pdpResponseDetails the pdp response
138      */
139     private PdpResponseDetails startOrStopApexEngineBasedOnPolicies(final PdpUpdate pdpUpdateMsg,
140             final PdpMessageHandler pdpMessageHandler) {
141         PdpResponseDetails pdpResponseDetails = null;
142         ApexEngineHandler apexEngineHandler = null;
143         try {
144             apexEngineHandler = Registry.get(ApexStarterConstants.REG_APEX_ENGINE_HANDLER);
145         } catch (final IllegalArgumentException e) {
146             LOGGER.debug("ApenEngineHandler not in registry.", e);
147         }
148         if (null != apexEngineHandler
149             && pdpUpdateMsg.getPoliciesToBeUndeployed().containsAll(apexEngineHandler.getRunningPolicies())
150             && pdpUpdateMsg.getPoliciesToBeDeployed().isEmpty()) {
151             pdpResponseDetails = stopApexEngineBasedOnPolicies(pdpUpdateMsg, pdpMessageHandler, apexEngineHandler);
152         } else {
153             pdpResponseDetails = startApexEngineBasedOnPolicies(pdpUpdateMsg, pdpMessageHandler, apexEngineHandler);
154         }
155         return pdpResponseDetails;
156     }
157
158     private PdpResponseDetails stopApexEngineBasedOnPolicies(final PdpUpdate pdpUpdateMsg,
159         final PdpMessageHandler pdpMessageHandler, ApexEngineHandler apexEngineHandler) {
160         PdpResponseDetails pdpResponseDetails = null;
161         if (null != apexEngineHandler && apexEngineHandler.isApexEngineRunning()) {
162             try {
163                 apexEngineHandler.shutdown();
164                 pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(),
165                     PdpResponseStatus.SUCCESS, "Pdp update successful. No policies are running.");
166             } catch (final ApexStarterException e) {
167                 LOGGER.error("Pdp update failed as the policies couldn't be undeployed.", e);
168                 pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(),
169                         PdpResponseStatus.FAIL, "Pdp update failed as the policies couldn't be undeployed.");
170             }
171             updateDeploymentCounts(pdpUpdateMsg, pdpResponseDetails);
172         }
173         return pdpResponseDetails;
174     }
175
176     private PdpResponseDetails startApexEngineBasedOnPolicies(final PdpUpdate pdpUpdateMsg,
177         final PdpMessageHandler pdpMessageHandler, ApexEngineHandler apexEngineHandler) {
178         PdpResponseDetails pdpResponseDetails = null;
179
180         try {
181             if (null != apexEngineHandler && apexEngineHandler.isApexEngineRunning()) {
182                 apexEngineHandler.updateApexEngine(pdpUpdateMsg.getPoliciesToBeDeployed(),
183                     pdpUpdateMsg.getPoliciesToBeUndeployed());
184             } else {
185                 apexEngineHandler = new ApexEngineHandler(pdpUpdateMsg.getPoliciesToBeDeployed());
186                 Registry.registerOrReplace(ApexStarterConstants.REG_APEX_ENGINE_HANDLER, apexEngineHandler);
187             }
188             if (apexEngineHandler.isApexEngineRunning()) {
189                 pdpResponseDetails =
190                     populateResponseForEngineInitiation(pdpUpdateMsg, pdpMessageHandler, apexEngineHandler);
191             } else {
192                 pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(),
193                     PdpResponseStatus.FAIL, "Apex engine failed to start.");
194             }
195         } catch (final ApexStarterException e) {
196             LOGGER.error("Apex engine service running failed. ", e);
197             pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(),
198                     PdpResponseStatus.FAIL, "Apex engine service running failed. " + e.getMessage());
199         }
200         updateDeploymentCounts(pdpUpdateMsg, pdpResponseDetails);
201         return pdpResponseDetails;
202     }
203
204     private PdpResponseDetails populateResponseForEngineInitiation(final PdpUpdate pdpUpdateMsg,
205         final PdpMessageHandler pdpMessageHandler, ApexEngineHandler apexEngineHandler) {
206         PdpResponseDetails pdpResponseDetails;
207         Set<ToscaConceptIdentifier> runningPolicies = new HashSet<>(apexEngineHandler.getRunningPolicies());
208         List<ToscaConceptIdentifier> policiesToBeDeployed =
209             pdpMessageHandler.getToscaPolicyIdentifiers(pdpUpdateMsg.getPoliciesToBeDeployed());
210         List<ToscaConceptIdentifier> policiesToBeUndeployed = pdpUpdateMsg.getPoliciesToBeUndeployed();
211         if (runningPolicies.containsAll(policiesToBeDeployed)
212             && !containsAny(runningPolicies, policiesToBeUndeployed)) {
213             StringBuilder message = new StringBuilder("Apex engine started. ");
214             if (!policiesToBeDeployed.isEmpty()) {
215                 message.append("Deployed policies are: ");
216                 for (ToscaConceptIdentifier policy : policiesToBeDeployed) {
217                     message.append(policy.getName()).append(":").append(policy.getVersion()).append("  ");
218                 }
219             }
220             if (!policiesToBeUndeployed.isEmpty()) {
221                 message.append("Undeployed policies are: ");
222                 for (ToscaConceptIdentifier policy : policiesToBeUndeployed) {
223                     message.append(policy.getName()).append(":").append(policy.getVersion()).append("  ");
224                 }
225             }
226             pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(),
227                 PdpResponseStatus.SUCCESS, message.toString());
228         } else {
229             StringBuilder message =
230                 new StringBuilder("Apex engine started. But, only the following polices are running - ");
231             for (ToscaConceptIdentifier policy : runningPolicies) {
232                 message.append(policy.getName()).append(":").append(policy.getVersion()).append("  ");
233             }
234             message.append(". Other policies failed execution. Please see the logs for more details.");
235             pdpResponseDetails = pdpMessageHandler.createPdpResonseDetails(pdpUpdateMsg.getRequestId(),
236                 PdpResponseStatus.SUCCESS, message.toString());
237         }
238         return pdpResponseDetails;
239     }
240
241     /**
242      * Method checks if the Pdp update message is already handled by checking the values in the context.
243      *
244      * @param pdpUpdateMsg pdp update message received from pap
245      * @param pdpStatusContext values saved in context memory
246      * @return boolean flag which tells if the information is same or not
247      */
248     private boolean checkIfAlreadyHandled(final PdpUpdate pdpUpdateMsg, final PdpStatus pdpStatusContext) {
249         return null != pdpStatusContext.getPdpGroup()
250             && pdpStatusContext.getPdpGroup().equals(pdpUpdateMsg.getPdpGroup())
251             && null != pdpStatusContext.getPdpSubgroup()
252             && pdpStatusContext.getPdpSubgroup().equals(pdpUpdateMsg.getPdpSubgroup())
253             && null != pdpStatusContext.getPolicies()
254             && pdpStatusContext.getPolicies()
255                 .containsAll(new PdpMessageHandler().getToscaPolicyIdentifiers(pdpUpdateMsg.getPoliciesToBeDeployed()))
256             && !containsAny(new HashSet<>(pdpStatusContext.getPolicies()), pdpUpdateMsg.getPoliciesToBeUndeployed());
257     }
258
259     /**
260      * Method to update the time interval used by the timer task.
261      *
262      * @param interval time interval received in the pdp update message from pap
263      */
264     public void updateInterval(final long interval) {
265         final PdpStatusPublisher pdpStatusPublisher = Registry.get(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER);
266         pdpStatusPublisher.terminate();
267         final List<TopicSink> topicSinks = Registry.get(ApexStarterConstants.REG_APEX_PDP_TOPIC_SINKS);
268         Registry.registerOrReplace(ApexStarterConstants.REG_PDP_STATUS_PUBLISHER,
269                 new PdpStatusPublisher(topicSinks, interval));
270     }
271
272     /**
273      * Checks if one list contains any element of another.
274      *
275      * @param listToCheckWith list to check contents of
276      * @param listToCheckAgainst list to check against other list for similarities
277      * @return boolean flag which tells if lists share same elements or not
278      */
279     private boolean containsAny(Set<ToscaConceptIdentifier> listToCheckWith,
280             List<ToscaConceptIdentifier> listToCheckAgainst) {
281         return listToCheckAgainst.stream().anyMatch(listToCheckWith::contains);
282     }
283
284     /**
285      * Update count values for deployment actions (deploy and undeploy) when applicable.
286      * @param pdpUpdateMsg the pdp update message from pap
287      * @param pdpResponseDetails the pdp response
288      */
289     private void updateDeploymentCounts(final PdpUpdate pdpUpdateMsg, PdpResponseDetails pdpResponseDetails) {
290         final ApexPolicyStatisticsManager statisticsManager =
291                 ApexPolicyStatisticsManager.getInstanceFromRegistry();
292
293         if (statisticsManager != null) {
294             if (pdpUpdateMsg.getPoliciesToBeDeployed() != null && !pdpUpdateMsg.getPoliciesToBeDeployed().isEmpty()) {
295                 statisticsManager.updatePolicyDeployCounter(
296                         pdpResponseDetails.getResponseStatus() == PdpResponseStatus.SUCCESS);
297             }
298
299             if (pdpUpdateMsg.getPoliciesToBeUndeployed() != null
300                     && !pdpUpdateMsg.getPoliciesToBeUndeployed().isEmpty()) {
301                 statisticsManager.updatePolicyUndeployCounter(
302                         pdpResponseDetails.getResponseStatus() == PdpResponseStatus.SUCCESS);
303             }
304         }
305     }
306 }