2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019-2021 Nordix Foundation.
4 * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved.
5 * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * SPDX-License-Identifier: Apache-2.0
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.apex.services.onappf.handler;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.onap.policy.apex.core.engine.TaskParameters;
import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInfo;
import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInformation;
import org.onap.policy.apex.model.basicmodel.service.ModelService;
import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchema;
import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchemas;
import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
import org.onap.policy.apex.model.eventmodel.concepts.AxEvent;
import org.onap.policy.apex.model.eventmodel.concepts.AxEvents;
import org.onap.policy.apex.model.policymodel.concepts.AxPolicies;
import org.onap.policy.apex.model.policymodel.concepts.AxPolicy;
import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
import org.onap.policy.apex.model.policymodel.concepts.AxTask;
import org.onap.policy.apex.model.policymodel.concepts.AxTasks;
import org.onap.policy.apex.service.engine.main.ApexMain;
import org.onap.policy.apex.service.parameters.ApexParameterConstants;
import org.onap.policy.apex.service.parameters.ApexParameters;
import org.onap.policy.apex.services.onappf.exception.ApexStarterException;
import org.onap.policy.common.parameters.ParameterService;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
import org.onap.policy.models.tosca.authorative.concepts.ToscaTopologyTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This class instantiates the Apex Engine based on instructions from PAP.
 *
 * @author Ajith Sreekumar (ajith.sreekumar@est.tech)
 */
72 public class ApexEngineHandler {
74 private static final Logger LOGGER = LoggerFactory.getLogger(ApexEngineHandler.class);
76 private Map<ToscaConceptIdentifier, ApexMain> apexMainMap = new LinkedHashMap<>();
/**
 * Creates the handler and starts an apex engine for each of the supplied policies.
 *
 * @param policies the list of policies to start engines for
 * @throws ApexStarterException if the apex engine instantiation failed using the policies passed
 */
public ApexEngineHandler(List<ToscaPolicy> policies) throws ApexStarterException {
    LOGGER.debug("Starting apex engine.");
    initiateApexEngineForPolicies(policies);
}
90 * Updates the Apex Engine with the policy model created from new list of policies.
93 * @param polsToDeploy list of policies to deploy which will be modified to remove running policies
94 * @param polsToUndeploy list of policies to undeploy which will be modified to remove policies not running
95 * @throws ApexStarterException if the apex engine instantiation failed using the policies passed
97 public void updateApexEngine(List<ToscaPolicy> polsToDeploy, List<ToscaConceptIdentifier> polsToUndeploy)
98 throws ApexStarterException {
99 Set<ToscaConceptIdentifier> runningPolicies = new HashSet<>(getRunningPolicies());
100 List<ToscaPolicy> policiesToDeploy = polsToDeploy;
101 policiesToDeploy.removeIf(p -> runningPolicies.contains(p.getIdentifier()));
102 List<ToscaConceptIdentifier> policiesToUnDeploy = polsToUndeploy;
103 policiesToUnDeploy.removeIf(p -> !runningPolicies.contains(p));
104 Map<ToscaConceptIdentifier, ApexMain> undeployedPoliciesMainMap = new LinkedHashMap<>();
105 policiesToUnDeploy.forEach(policyId -> {
106 var apexMain = apexMainMap.get(policyId);
109 undeployedPoliciesMainMap.put(policyId, apexMain);
110 apexMainMap.remove(policyId);
111 } catch (ApexException e) {
112 LOGGER.error("Shutting down policy {} failed", policyId, e);
115 if (!undeployedPoliciesMainMap.isEmpty() && !apexMainMap.isEmpty()) {
116 updateModelAndParameterServices(undeployedPoliciesMainMap);
118 if (!policiesToDeploy.isEmpty()) {
119 initiateApexEngineForPolicies(policiesToDeploy);
121 if (apexMainMap.isEmpty()) {
122 ModelService.clear();
123 ParameterService.clear();
128 * Clear the corresponding items from ModelService and ParameterService.
130 * @param undeployedPoliciesMainMap the policies that are undeployed
132 private void updateModelAndParameterServices(Map<ToscaConceptIdentifier, ApexMain> undeployedPoliciesMainMap) {
133 Set<String> inputParamKeysToRetain = new HashSet<>();
134 Set<String> outputParamKeysToRetain = new HashSet<>();
135 List<TaskParameters> taskParametersToRetain = new ArrayList<>();
136 List<String> executorParamKeysToRetain = new ArrayList<>();
137 List<String> schemaParamKeysToRetain = new ArrayList<>();
139 Map<AxArtifactKey, AxKeyInfo> keyInfoMapToRetain = new HashMap<>();
140 Map<AxArtifactKey, AxContextSchema> schemaMapToRetain = new HashMap<>();
141 Map<AxArtifactKey, AxEvent> eventMapToRetain = new HashMap<>();
142 Map<AxArtifactKey, AxContextAlbum> albumMapToRetain = new HashMap<>();
143 Map<AxArtifactKey, AxTask> taskMapToRetain = new HashMap<>();
144 Map<AxArtifactKey, AxPolicy> policyMapToRetain = new HashMap<>();
146 apexMainMap.values().forEach(main -> {
147 inputParamKeysToRetain.addAll(main.getApexParameters().getEventInputParameters().keySet());
148 outputParamKeysToRetain.addAll(main.getApexParameters().getEventOutputParameters().keySet());
149 taskParametersToRetain.addAll(
150 main.getApexParameters().getEngineServiceParameters().getEngineParameters().getTaskParameters());
151 executorParamKeysToRetain.addAll(main.getApexParameters().getEngineServiceParameters().getEngineParameters()
152 .getExecutorParameterMap().keySet());
153 schemaParamKeysToRetain.addAll(main.getApexParameters().getEngineServiceParameters().getEngineParameters()
154 .getContextParameters().getSchemaParameters().getSchemaHelperParameterMap().keySet());
156 AxPolicyModel policyModel = main.getActivator().getPolicyModel();
157 keyInfoMapToRetain.putAll(policyModel.getKeyInformation().getKeyInfoMap());
158 schemaMapToRetain.putAll(policyModel.getSchemas().getSchemasMap());
159 eventMapToRetain.putAll(policyModel.getEvents().getEventMap());
160 albumMapToRetain.putAll(policyModel.getAlbums().getAlbumsMap());
161 taskMapToRetain.putAll(policyModel.getTasks().getTaskMap());
162 policyMapToRetain.putAll(policyModel.getPolicies().getPolicyMap());
164 for (ApexMain main : undeployedPoliciesMainMap.values()) {
165 if (null != main.getApexParameters()) {
166 handleParametersRemoval(inputParamKeysToRetain, outputParamKeysToRetain, taskParametersToRetain,
167 executorParamKeysToRetain, schemaParamKeysToRetain, main);
169 if (null != main.getActivator() && null != main.getActivator().getPolicyModel()) {
170 handleAxConceptsRemoval(keyInfoMapToRetain, schemaMapToRetain, eventMapToRetain, albumMapToRetain,
171 taskMapToRetain, policyMapToRetain, main);
176 private void handleParametersRemoval(Set<String> inputParamKeysToRetain, Set<String> outputParamKeysToRetain,
177 List<TaskParameters> taskParametersToRetain, List<String> executorParamKeysToRetain,
178 List<String> schemaParamKeysToRetain, ApexMain main) {
179 ApexParameters existingParameters = ParameterService.get(ApexParameterConstants.MAIN_GROUP_NAME);
180 List<String> eventInputParamKeysToRemove = main.getApexParameters().getEventInputParameters().keySet().stream()
181 .filter(key -> !inputParamKeysToRetain.contains(key)).collect(Collectors.toList());
182 List<String> eventOutputParamKeysToRemove = main.getApexParameters().getEventOutputParameters().keySet()
183 .stream().filter(key -> !outputParamKeysToRetain.contains(key)).collect(Collectors.toList());
184 eventInputParamKeysToRemove.forEach(existingParameters.getEventInputParameters()::remove);
185 eventOutputParamKeysToRemove.forEach(existingParameters.getEventOutputParameters()::remove);
186 var engineParameters = main.getApexParameters().getEngineServiceParameters().getEngineParameters();
187 final List<TaskParameters> taskParametersToRemove = engineParameters.getTaskParameters().stream()
188 .filter(taskParameter -> !taskParametersToRetain.contains(taskParameter)).collect(Collectors.toList());
189 final List<String> executorParamKeysToRemove = engineParameters.getExecutorParameterMap().keySet().stream()
190 .filter(key -> !executorParamKeysToRetain.contains(key)).collect(Collectors.toList());
191 final List<String> schemaParamKeysToRemove =
192 engineParameters.getContextParameters().getSchemaParameters().getSchemaHelperParameterMap().keySet()
193 .stream().filter(key -> !schemaParamKeysToRetain.contains(key)).collect(Collectors.toList());
194 var aggregatedEngineParameters = existingParameters.getEngineServiceParameters().getEngineParameters();
195 aggregatedEngineParameters.getTaskParameters().removeAll(taskParametersToRemove);
196 executorParamKeysToRemove.forEach(aggregatedEngineParameters.getExecutorParameterMap()::remove);
197 schemaParamKeysToRemove.forEach(aggregatedEngineParameters.getContextParameters().getSchemaParameters()
198 .getSchemaHelperParameterMap()::remove);
201 private void handleAxConceptsRemoval(Map<AxArtifactKey, AxKeyInfo> keyInfoMapToRetain,
202 Map<AxArtifactKey, AxContextSchema> schemaMapToRetain, Map<AxArtifactKey, AxEvent> eventMapToRetain,
203 Map<AxArtifactKey, AxContextAlbum> albumMapToRetain, Map<AxArtifactKey, AxTask> taskMapToRetain,
204 Map<AxArtifactKey, AxPolicy> policyMapToRetain, ApexMain main) {
205 final AxPolicyModel policyModel = main.getActivator().getPolicyModel();
206 final List<AxArtifactKey> keyInfoKeystoRemove = policyModel.getKeyInformation().getKeyInfoMap().keySet()
207 .stream().filter(key -> !keyInfoMapToRetain.containsKey(key)).collect(Collectors.toList());
208 final List<AxArtifactKey> schemaKeystoRemove = policyModel.getSchemas().getSchemasMap().keySet().stream()
209 .filter(key -> !schemaMapToRetain.containsKey(key)).collect(Collectors.toList());
210 final List<AxArtifactKey> eventKeystoRemove = policyModel.getEvents().getEventMap().keySet().stream()
211 .filter(key -> !eventMapToRetain.containsKey(key)).collect(Collectors.toList());
212 final List<AxArtifactKey> albumKeystoRemove = policyModel.getAlbums().getAlbumsMap().keySet().stream()
213 .filter(key -> !albumMapToRetain.containsKey(key)).collect(Collectors.toList());
214 final List<AxArtifactKey> taskKeystoRemove = policyModel.getTasks().getTaskMap().keySet().stream()
215 .filter(key -> !taskMapToRetain.containsKey(key)).collect(Collectors.toList());
216 final List<AxArtifactKey> policyKeystoRemove = policyModel.getPolicies().getPolicyMap().keySet().stream()
217 .filter(key -> !policyMapToRetain.containsKey(key)).collect(Collectors.toList());
219 final Map<AxArtifactKey, AxKeyInfo> keyInfoMap = ModelService.getModel(AxKeyInformation.class).getKeyInfoMap();
220 final Map<AxArtifactKey, AxContextSchema> schemasMap =
221 ModelService.getModel(AxContextSchemas.class).getSchemasMap();
222 final Map<AxArtifactKey, AxEvent> eventMap = ModelService.getModel(AxEvents.class).getEventMap();
223 final Map<AxArtifactKey, AxContextAlbum> albumsMap =
224 ModelService.getModel(AxContextAlbums.class).getAlbumsMap();
225 final Map<AxArtifactKey, AxTask> taskMap = ModelService.getModel(AxTasks.class).getTaskMap();
226 final Map<AxArtifactKey, AxPolicy> policyMap = ModelService.getModel(AxPolicies.class).getPolicyMap();
228 // replace the ModelService with the right concept definition
229 // this can get corrupted in case of deploying policies with duplicate concept keys
230 keyInfoMap.putAll(keyInfoMapToRetain);
231 schemasMap.putAll(schemaMapToRetain);
232 eventMap.putAll(eventMapToRetain);
233 albumsMap.putAll(albumMapToRetain);
234 taskMap.putAll(taskMapToRetain);
235 policyMap.putAll(policyMapToRetain);
237 keyInfoKeystoRemove.forEach(keyInfoMap::remove);
238 schemaKeystoRemove.forEach(schemasMap::remove);
239 eventKeystoRemove.forEach(eventMap::remove);
240 albumKeystoRemove.forEach(albumsMap::remove);
241 taskKeystoRemove.forEach(taskMap::remove);
242 policyKeystoRemove.forEach(policyMap::remove);
245 private void initiateApexEngineForPolicies(List<ToscaPolicy> policies)
246 throws ApexStarterException {
247 Map<ToscaConceptIdentifier, ApexMain> failedPoliciesMainMap = new LinkedHashMap<>();
248 for (ToscaPolicy policy : policies) {
249 String policyName = policy.getIdentifier().getName();
250 final var standardCoder = new StandardCoder();
251 var toscaServiceTemplate = new ToscaServiceTemplate();
252 var toscaTopologyTemplate = new ToscaTopologyTemplate();
253 toscaTopologyTemplate.setPolicies(List.of(Map.of(policyName, policy)));
254 toscaServiceTemplate.setToscaTopologyTemplate(toscaTopologyTemplate);
257 file = File.createTempFile(policyName, ".json");
258 standardCoder.encode(file, toscaServiceTemplate);
259 } catch (CoderException | IOException e) {
260 throw new ApexStarterException(e);
262 final var apexArgs = new String[] {"-p", file.getAbsolutePath()};
263 LOGGER.info("Starting apex engine for policy {}", policy.getIdentifier());
264 var apexMain = new ApexMain(apexArgs);
265 if (apexMain.isAlive()) {
266 apexMainMap.put(policy.getIdentifier(), apexMain);
268 failedPoliciesMainMap.put(policy.getIdentifier(), apexMain);
269 LOGGER.error("Execution of policy {} failed", policy.getIdentifier());
272 if (apexMainMap.isEmpty()) {
273 ModelService.clear();
274 ParameterService.clear();
275 throw new ApexStarterException("Apex Engine failed to start.");
276 } else if (failedPoliciesMainMap.size() > 0) {
277 updateModelAndParameterServices(failedPoliciesMainMap);
278 if (failedPoliciesMainMap.size() == policies.size()) {
279 throw new ApexStarterException("Updating the APEX engine with new policies failed.");
285 * Method to get the APEX engine statistics.
287 public List<AxEngineModel> getEngineStats() {
288 // engineStats from all the apexMain instances running individual tosca policies are combined here.
289 return apexMainMap.values().stream().filter(apexMain -> (null != apexMain && apexMain.isAlive()))
290 .flatMap(m -> m.getEngineStats().stream()).collect(Collectors.toList());
294 * Method to check whether the apex engine is running or not.
296 public boolean isApexEngineRunning() {
297 return apexMainMap.values().stream().anyMatch(apexMain -> (null != apexMain && apexMain.isAlive()));
301 * Method that return the list of running policies in the apex engine.
303 public List<ToscaConceptIdentifier> getRunningPolicies() {
304 return new ArrayList<>(apexMainMap.keySet());
308 * Method to shut down the apex engine.
310 public void shutdown() throws ApexStarterException {
312 LOGGER.debug("Shutting down apex engine.");
313 for (ApexMain apexMain : apexMainMap.values()) {
317 ModelService.clear();
318 ParameterService.clear();
319 } catch (final ApexException e) {
320 throw new ApexStarterException(e);