2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2019-2021 Nordix Foundation.
4 * Modifications Copyright (C) 2020-2021 Bell Canada. All rights reserved.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.apex.services.onappf.handler;
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.HashSet;
28 import java.util.LinkedHashMap;
29 import java.util.List;
32 import java.util.stream.Collectors;
33 import org.onap.policy.apex.core.engine.EngineParameters;
34 import org.onap.policy.apex.core.engine.TaskParameters;
35 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
36 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
37 import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInfo;
38 import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInformation;
39 import org.onap.policy.apex.model.basicmodel.service.ModelService;
40 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
41 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
42 import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchema;
43 import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchemas;
44 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
45 import org.onap.policy.apex.model.eventmodel.concepts.AxEvent;
46 import org.onap.policy.apex.model.eventmodel.concepts.AxEvents;
47 import org.onap.policy.apex.model.policymodel.concepts.AxPolicies;
48 import org.onap.policy.apex.model.policymodel.concepts.AxPolicy;
49 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
50 import org.onap.policy.apex.model.policymodel.concepts.AxTask;
51 import org.onap.policy.apex.model.policymodel.concepts.AxTasks;
52 import org.onap.policy.apex.service.engine.main.ApexMain;
53 import org.onap.policy.apex.service.parameters.ApexParameterConstants;
54 import org.onap.policy.apex.service.parameters.ApexParameters;
55 import org.onap.policy.apex.services.onappf.exception.ApexStarterException;
56 import org.onap.policy.common.parameters.ParameterService;
57 import org.onap.policy.common.utils.coder.CoderException;
58 import org.onap.policy.common.utils.coder.StandardCoder;
59 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
60 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
61 import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate;
62 import org.onap.policy.models.tosca.authorative.concepts.ToscaTopologyTemplate;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
67 * This class instantiates the Apex Engine based on instruction from PAP.
69 * @author Ajith Sreekumar (ajith.sreekumar@est.tech)
71 public class ApexEngineHandler {
73 private static final Logger LOGGER = LoggerFactory.getLogger(ApexEngineHandler.class);
75 private Map<ToscaConceptIdentifier, ApexMain> apexMainMap = new LinkedHashMap<>();
78 * Constructs the object. Extracts the config and model files from each policy and instantiates the apex engine.
80 * @param policies the list of policies
81 * @throws ApexStarterException if the apex engine instantiation failed using the policies passed
83 public ApexEngineHandler(List<ToscaPolicy> policies) throws ApexStarterException {
84 LOGGER.debug("Starting apex engine.");
85 initiateApexEngineForPolicies(policies);
89 * Updates the Apex Engine with the policy model created from new list of policies.
91 * @param policies the list of policies
92 * @throws ApexStarterException if the apex engine instantiation failed using the policies passed
94 public void updateApexEngine(List<ToscaPolicy> policies) throws ApexStarterException {
95 List<ToscaConceptIdentifier> runningPolicies = getRunningPolicies();
96 List<ToscaPolicy> policiesToDeploy = policies.stream()
97 .filter(policy -> !runningPolicies.contains(policy.getIdentifier())).collect(Collectors.toList());
98 List<ToscaConceptIdentifier> policiesToUnDeploy = runningPolicies.stream()
99 .filter(polId -> policies.stream().noneMatch(policy -> policy.getIdentifier().equals(polId)))
100 .collect(Collectors.toList());
101 Map<ToscaConceptIdentifier, ApexMain> undeployedPoliciesMainMap = new LinkedHashMap<>();
102 policiesToUnDeploy.forEach(policyId -> {
103 ApexMain apexMain = apexMainMap.get(policyId);
106 undeployedPoliciesMainMap.put(policyId, apexMain);
107 apexMainMap.remove(policyId);
108 } catch (ApexException e) {
109 LOGGER.error("Shutting down policy {} failed", policyId, e);
112 if (!undeployedPoliciesMainMap.isEmpty()) {
113 updateModelAndParameterServices(undeployedPoliciesMainMap);
115 if (!policiesToDeploy.isEmpty()) {
116 initiateApexEngineForPolicies(policiesToDeploy);
118 if (apexMainMap.isEmpty()) {
119 ModelService.clear();
120 ParameterService.clear();
125 * Clear the corresponding items from ModelService and ParameterService.
127 * @param undeployedPoliciesMainMap the policies that are undeployed
129 private void updateModelAndParameterServices(Map<ToscaConceptIdentifier, ApexMain> undeployedPoliciesMainMap) {
130 Set<String> inputParamKeysToRetain = new HashSet<>();
131 Set<String> outputParamKeysToRetain = new HashSet<>();
132 List<TaskParameters> taskParametersToRetain = new ArrayList<>();
133 List<String> executorParamKeysToRetain = new ArrayList<>();
134 List<String> schemaParamKeysToRetain = new ArrayList<>();
136 List<AxArtifactKey> keyInfoKeystoRetain = new ArrayList<>();
137 List<AxArtifactKey> schemaKeystoRetain = new ArrayList<>();
138 List<AxArtifactKey> eventKeystoRetain = new ArrayList<>();
139 List<AxArtifactKey> albumKeystoRetain = new ArrayList<>();
140 List<AxArtifactKey> taskKeystoRetain = new ArrayList<>();
141 List<AxArtifactKey> policyKeystoRetain = new ArrayList<>();
143 apexMainMap.values().forEach(main -> {
144 inputParamKeysToRetain.addAll(main.getApexParameters().getEventInputParameters().keySet());
145 outputParamKeysToRetain.addAll(main.getApexParameters().getEventOutputParameters().keySet());
146 taskParametersToRetain.addAll(
147 main.getApexParameters().getEngineServiceParameters().getEngineParameters().getTaskParameters());
148 executorParamKeysToRetain.addAll(main.getApexParameters().getEngineServiceParameters().getEngineParameters()
149 .getExecutorParameterMap().keySet());
150 schemaParamKeysToRetain.addAll(main.getApexParameters().getEngineServiceParameters().getEngineParameters()
151 .getContextParameters().getSchemaParameters().getSchemaHelperParameterMap().keySet());
154 .addAll(main.getActivator().getPolicyModel().getKeyInformation().getKeyInfoMap().keySet());
155 schemaKeystoRetain.addAll(main.getActivator().getPolicyModel().getSchemas().getSchemasMap().keySet());
156 eventKeystoRetain.addAll(main.getActivator().getPolicyModel().getEvents().getEventMap().keySet());
157 albumKeystoRetain.addAll(main.getActivator().getPolicyModel().getAlbums().getAlbumsMap().keySet());
158 taskKeystoRetain.addAll(main.getActivator().getPolicyModel().getTasks().getTaskMap().keySet());
159 policyKeystoRetain.addAll(main.getActivator().getPolicyModel().getPolicies().getPolicyMap().keySet());
161 for (ApexMain main : undeployedPoliciesMainMap.values()) {
162 handleParametersRemoval(inputParamKeysToRetain, outputParamKeysToRetain, taskParametersToRetain,
163 executorParamKeysToRetain, schemaParamKeysToRetain, main);
165 if (null != main.getActivator() && null != main.getActivator().getPolicyModel()) {
166 handleAxConceptsRemoval(keyInfoKeystoRetain, schemaKeystoRetain, eventKeystoRetain, albumKeystoRetain,
167 taskKeystoRetain, policyKeystoRetain, main);
172 private void handleParametersRemoval(Set<String> inputParamKeysToRetain, Set<String> outputParamKeysToRetain,
173 List<TaskParameters> taskParametersToRetain, List<String> executorParamKeysToRetain,
174 List<String> schemaParamKeysToRetain, ApexMain main) {
175 ApexParameters existingParameters = ParameterService.get(ApexParameterConstants.MAIN_GROUP_NAME);
176 List<String> eventInputParamKeysToRemove = main.getApexParameters().getEventInputParameters().keySet().stream()
177 .filter(key -> !inputParamKeysToRetain.contains(key)).collect(Collectors.toList());
178 List<String> eventOutputParamKeysToRemove = main.getApexParameters().getEventOutputParameters().keySet()
179 .stream().filter(key -> !outputParamKeysToRetain.contains(key)).collect(Collectors.toList());
180 eventInputParamKeysToRemove.forEach(existingParameters.getEventInputParameters()::remove);
181 eventOutputParamKeysToRemove.forEach(existingParameters.getEventOutputParameters()::remove);
182 EngineParameters engineParameters = main.getApexParameters().getEngineServiceParameters().getEngineParameters();
183 final List<TaskParameters> taskParametersToRemove = engineParameters.getTaskParameters().stream()
184 .filter(taskParameter -> !taskParametersToRetain.contains(taskParameter)).collect(Collectors.toList());
185 final List<String> executorParamKeysToRemove = engineParameters.getExecutorParameterMap().keySet().stream()
186 .filter(key -> !executorParamKeysToRetain.contains(key)).collect(Collectors.toList());
187 final List<String> schemaParamKeysToRemove =
188 engineParameters.getContextParameters().getSchemaParameters().getSchemaHelperParameterMap().keySet()
189 .stream().filter(key -> !schemaParamKeysToRetain.contains(key)).collect(Collectors.toList());
190 EngineParameters aggregatedEngineParameters =
191 existingParameters.getEngineServiceParameters().getEngineParameters();
192 aggregatedEngineParameters.getTaskParameters().removeAll(taskParametersToRemove);
193 executorParamKeysToRemove.forEach(aggregatedEngineParameters.getExecutorParameterMap()::remove);
194 schemaParamKeysToRemove.forEach(aggregatedEngineParameters.getContextParameters().getSchemaParameters()
195 .getSchemaHelperParameterMap()::remove);
198 private void handleAxConceptsRemoval(List<AxArtifactKey> keyInfoKeystoRetain,
199 List<AxArtifactKey> schemaKeystoRetain, List<AxArtifactKey> eventKeystoRetain,
200 List<AxArtifactKey> albumKeystoRetain, List<AxArtifactKey> taskKeystoRetain,
201 List<AxArtifactKey> policyKeystoRetain, ApexMain main) {
202 final AxPolicyModel policyModel = main.getActivator().getPolicyModel();
203 final List<AxArtifactKey> keyInfoKeystoRemove = policyModel.getKeyInformation().getKeyInfoMap().keySet()
204 .stream().filter(key -> !keyInfoKeystoRetain.contains(key)).collect(Collectors.toList());
205 final List<AxArtifactKey> schemaKeystoRemove = policyModel.getSchemas().getSchemasMap().keySet().stream()
206 .filter(key -> !schemaKeystoRetain.contains(key)).collect(Collectors.toList());
207 final List<AxArtifactKey> eventKeystoRemove = policyModel.getEvents().getEventMap().keySet().stream()
208 .filter(key -> !eventKeystoRetain.contains(key)).collect(Collectors.toList());
209 final List<AxArtifactKey> albumKeystoRemove = policyModel.getAlbums().getAlbumsMap().keySet().stream()
210 .filter(key -> !albumKeystoRetain.contains(key)).collect(Collectors.toList());
211 final List<AxArtifactKey> taskKeystoRemove = policyModel.getTasks().getTaskMap().keySet().stream()
212 .filter(key -> !taskKeystoRetain.contains(key)).collect(Collectors.toList());
213 final List<AxArtifactKey> policyKeystoRemove = policyModel.getPolicies().getPolicyMap().keySet().stream()
214 .filter(key -> !policyKeystoRetain.contains(key)).collect(Collectors.toList());
216 final Map<AxArtifactKey, AxKeyInfo> keyInfoMap = ModelService.getModel(AxKeyInformation.class).getKeyInfoMap();
217 final Map<AxArtifactKey, AxContextSchema> schemasMap =
218 ModelService.getModel(AxContextSchemas.class).getSchemasMap();
219 final Map<AxArtifactKey, AxEvent> eventMap = ModelService.getModel(AxEvents.class).getEventMap();
220 final Map<AxArtifactKey, AxContextAlbum> albumsMap =
221 ModelService.getModel(AxContextAlbums.class).getAlbumsMap();
222 final Map<AxArtifactKey, AxTask> taskMap = ModelService.getModel(AxTasks.class).getTaskMap();
223 final Map<AxArtifactKey, AxPolicy> policyMap = ModelService.getModel(AxPolicies.class).getPolicyMap();
225 keyInfoKeystoRemove.forEach(keyInfoMap::remove);
226 schemaKeystoRemove.forEach(schemasMap::remove);
227 eventKeystoRemove.forEach(eventMap::remove);
228 albumKeystoRemove.forEach(albumsMap::remove);
229 taskKeystoRemove.forEach(taskMap::remove);
230 policyKeystoRemove.forEach(policyMap::remove);
233 private void initiateApexEngineForPolicies(List<ToscaPolicy> policies)
234 throws ApexStarterException {
235 Map<ToscaConceptIdentifier, ApexMain> failedPoliciesMainMap = new LinkedHashMap<>();
236 for (ToscaPolicy policy : policies) {
237 String policyName = policy.getIdentifier().getName();
238 final StandardCoder standardCoder = new StandardCoder();
239 ToscaServiceTemplate toscaServiceTemplate = new ToscaServiceTemplate();
240 ToscaTopologyTemplate toscaTopologyTemplate = new ToscaTopologyTemplate();
241 toscaTopologyTemplate.setPolicies(List.of(Map.of(policyName, policy)));
242 toscaServiceTemplate.setToscaTopologyTemplate(toscaTopologyTemplate);
245 file = File.createTempFile(policyName, ".json");
246 standardCoder.encode(file, toscaServiceTemplate);
247 } catch (CoderException | IOException e) {
248 throw new ApexStarterException(e);
250 final String[] apexArgs = {"-p", file.getAbsolutePath()};
251 LOGGER.info("Starting apex engine for policy {}", policy.getIdentifier());
252 ApexMain apexMain = new ApexMain(apexArgs);
253 if (apexMain.isAlive()) {
254 apexMainMap.put(policy.getIdentifier(), apexMain);
256 failedPoliciesMainMap.put(policy.getIdentifier(), apexMain);
257 LOGGER.error("Execution of policy {} failed", policy.getIdentifier());
260 if (apexMainMap.isEmpty()) {
261 ModelService.clear();
262 ParameterService.clear();
263 throw new ApexStarterException("Apex Engine failed to start.");
264 } else if (failedPoliciesMainMap.size() > 0) {
265 updateModelAndParameterServices(failedPoliciesMainMap);
266 if (failedPoliciesMainMap.size() == policies.size()) {
267 throw new ApexStarterException("Updating the APEX engine with new policies failed.");
273 * Method to get the APEX engine statistics.
275 public List<AxEngineModel> getEngineStats() {
276 // engineStats from all the apexMain instances running individual tosca policies are combined here.
277 return apexMainMap.values().stream().filter(apexMain -> (null != apexMain && apexMain.isAlive()))
278 .flatMap(m -> m.getEngineStats().stream()).collect(Collectors.toList());
282 * Method to check whether the apex engine is running or not.
284 public boolean isApexEngineRunning() {
285 return apexMainMap.values().stream().anyMatch(apexMain -> (null != apexMain && apexMain.isAlive()));
289 * Method that return the list of running policies in the apex engine.
291 public List<ToscaConceptIdentifier> getRunningPolicies() {
292 return new ArrayList<>(apexMainMap.keySet());
296 * Method to shut down the apex engine.
298 public void shutdown() throws ApexStarterException {
300 LOGGER.debug("Shutting down apex engine.");
301 for (ApexMain apexMain : apexMainMap.values()) {
305 ModelService.clear();
306 ParameterService.clear();
307 } catch (final ApexException e) {
308 throw new ApexStarterException(e);