2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019-2020 Nordix Foundation.
5 * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * SPDX-License-Identifier: Apache-2.0
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.apex.service.engine.main;
25 import java.util.AbstractMap;
26 import java.util.ArrayList;
27 import java.util.LinkedHashMap;
28 import java.util.LinkedHashSet;
29 import java.util.List;
31 import java.util.Map.Entry;
32 import java.util.Optional;
34 import java.util.stream.Collectors;
37 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
38 import org.onap.policy.apex.model.basicmodel.concepts.ApexRuntimeException;
39 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
40 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
41 import org.onap.policy.apex.model.basicmodel.service.ModelService;
42 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
43 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
44 import org.onap.policy.apex.model.policymodel.handling.PolicyModelMerger;
45 import org.onap.policy.apex.service.engine.event.ApexEventException;
46 import org.onap.policy.apex.service.engine.runtime.EngineService;
47 import org.onap.policy.apex.service.engine.runtime.impl.EngineServiceImpl;
48 import org.onap.policy.apex.service.parameters.ApexParameters;
49 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
50 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
51 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
52 import org.onap.policy.common.parameters.ParameterService;
53 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier;
54 import org.slf4j.ext.XLogger;
55 import org.slf4j.ext.XLoggerFactory;
/**
 * This class wraps an Apex engine so that it can be activated as a complete
 * service together with all its context, executor, and event plugins.
 *
 * @author Liam Fallon (liam.fallon@ericsson.com)
 */
63 public class ApexActivator {
// Message used for the log entry and the exception raised when initialization or a model update fails
private static final String APEX_ENGINE_FAILED_MSG = "Apex engine failed to start as a service";

// The logger for this class
private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexActivator.class);

// The parameters of the Apex activator when running with multiple policies,
// keyed by the identifier of the policy they belong to
private Map<ToscaPolicyIdentifier, ApexParameters> apexParametersMap;

// The policy models created from the policy parameters, keyed by policy identifier;
// rebuilt on every call to setUpModelMarshallerAndUnmarshaller()
Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap;

// Event unmarshalers are used to receive events asynchronously into Apex
private final Map<String, ApexEventUnmarshaller> unmarshallerMap = new LinkedHashMap<>();

// Event marshalers are used to send events asynchronously from Apex
private final Map<String, ApexEventMarshaller> marshallerMap = new LinkedHashMap<>();

// The engine service handler holds the references to the engine and its EngDep
// interface. It also acts as a receiver for asynchronous
// and synchronous events from the engine.
private ApexEngineServiceHandler engineServiceHandler = null;

// The engine service created by instantiateEngine() and the key it was created for
private EngineService apexEngineService;
private AxArtifactKey engineKey;
/**
 * Instantiate the activator for the Apex engine as a complete service.
 *
 * <p>The supplied map is stored by reference; it is not copied.
 *
 * @param parametersMap the apex parameters map for the Apex service
 */
public ApexActivator(Map<ToscaPolicyIdentifier, ApexParameters> parametersMap) {
    this.apexParametersMap = parametersMap;
}
103 * Initialize the Apex engine as a complete service.
105 * @throws ApexActivatorException on errors in initializing the engine
107 public void initialize() throws ApexActivatorException {
108 LOGGER.debug("Apex engine starting as a service . . .");
111 ApexParameters apexParameters = apexParametersMap.values().iterator().next();
112 // totalInstanceCount is the sum of instance counts required as per each policy
113 int totalInstanceCount = apexParametersMap.values().stream()
114 .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum();
115 apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount);
116 engineKey = apexParameters.getEngineServiceParameters().getEngineKey();
117 instantiateEngine(apexParameters);
118 setUpModelMarshallerAndUnmarshaller(apexParameters);
119 } catch (final Exception e) {
120 LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
121 throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
124 LOGGER.debug("Apex engine started as a service");
127 private void setUpModelMarshallerAndUnmarshaller(ApexParameters apexParameters) throws ApexException {
128 policyModelsMap = new LinkedHashMap<>();
129 Map<String, EventHandlerParameters> inputParametersMap = new LinkedHashMap<>();
130 Map<String, EventHandlerParameters> outputParametersMap = new LinkedHashMap<>();
131 Set<Entry<ToscaPolicyIdentifier, ApexParameters>> apexParamsEntrySet = new LinkedHashSet<>(
132 apexParametersMap.entrySet());
133 apexParamsEntrySet.stream().forEach(apexParamsEntry -> {
134 ApexParameters apexParams = apexParamsEntry.getValue();
135 List<String> duplicateInputParameters = new ArrayList<>(apexParams.getEventInputParameters().keySet());
136 duplicateInputParameters.retainAll(inputParametersMap.keySet());
137 List<String> duplicateOutputParameters = new ArrayList<>(apexParams.getEventOutputParameters().keySet());
138 duplicateOutputParameters.retainAll(outputParametersMap.keySet());
140 if (!(duplicateInputParameters.isEmpty() && duplicateOutputParameters.isEmpty())) {
141 LOGGER.error("I/O Parameters {}/{} for {}:{} are duplicates. So this policy is not executed.",
142 duplicateInputParameters, duplicateOutputParameters, apexParamsEntry.getKey().getName(),
143 apexParamsEntry.getKey().getVersion());
144 apexParametersMap.remove(apexParamsEntry.getKey());
148 inputParametersMap.putAll(apexParams.getEventInputParameters());
149 outputParametersMap.putAll(apexParams.getEventOutputParameters());
150 // Check if a policy model file has been specified
151 if (apexParams.getEngineServiceParameters().getPolicyModel() != null) {
152 LOGGER.debug("deploying policy model to the apex engines . . .");
154 final String policyModelString = apexParams.getEngineServiceParameters().getPolicyModel();
155 AxPolicyModel policyModel = EngineServiceImpl.createModel(engineKey, policyModelString);
156 policyModelsMap.put(apexParamsEntry.getKey(), policyModel);
157 } catch (ApexException e) {
158 throw new ApexRuntimeException("Failed to create the apex model.", e);
162 AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap);
164 // Set the policy model in the engine
165 apexEngineService.updateModel(engineKey, finalPolicyModel, true);
167 handleExistingMarshallerAndUnmarshaller(inputParametersMap, outputParametersMap);
168 setUpNewMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap,
169 outputParametersMap);
171 // Wire up pairings between marhsallers and unmarshallers
172 setUpMarshalerPairings(inputParametersMap);
174 // Start event processing
175 startUnmarshallers(inputParametersMap);
178 private AxPolicyModel aggregatePolicyModels(Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap) {
179 // Doing a deep copy so that original values in policyModelsMap is retained
180 // after reduction operation
181 Set<Entry<ToscaPolicyIdentifier, AxPolicyModel>> policyModelsEntries = policyModelsMap.entrySet().stream()
182 .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue())).collect(Collectors.toSet());
183 Optional<Entry<ToscaPolicyIdentifier, AxPolicyModel>> finalPolicyModelEntry = policyModelsEntries.stream()
184 .reduce((entry1, entry2) -> {
187 PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true));
188 } catch (ApexModelException exc) {
189 LOGGER.error("Policy model for {} : {} has duplicates. So this policy is not executed.",
190 entry2.getKey().getName(), entry2.getKey().getVersion(), exc);
191 apexParametersMap.remove(entry2.getKey());
192 policyModelsMap.remove(entry2.getKey());
196 AxPolicyModel finalPolicyModel = null;
197 if (finalPolicyModelEntry.isPresent()) {
198 finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.get().getValue());
200 return finalPolicyModel;
203 private void setUpNewMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters,
204 Map<String, EventHandlerParameters> inputParametersMap, Map<String, EventHandlerParameters> outputParametersMap)
205 throws ApexEventException {
207 // Producer parameters specify what event marshalers to handle events leaving
209 // set up and how they are set up
210 for (Entry<String, EventHandlerParameters> outputParameters : outputParametersMap.entrySet()) {
211 final ApexEventMarshaller marshaller = new ApexEventMarshaller(outputParameters.getKey(),
212 engineServiceParameters, outputParameters.getValue());
214 apexEngineService.registerActionListener(outputParameters.getKey(), marshaller);
215 marshallerMap.put(outputParameters.getKey(), marshaller);
218 // Consumer parameters specify what event unmarshalers to handle events coming
220 // are set up and how they are set up
221 for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
222 final ApexEventUnmarshaller unmarshaller = new ApexEventUnmarshaller(inputParameters.getKey(),
223 engineServiceParameters, inputParameters.getValue());
224 unmarshallerMap.put(inputParameters.getKey(), unmarshaller);
225 unmarshaller.init(engineServiceHandler);
229 private void handleExistingMarshallerAndUnmarshaller(Map<String, EventHandlerParameters> inputParametersMap,
230 Map<String, EventHandlerParameters> outputParametersMap) {
231 // stop and remove any marshaller/unmarshaller that is part of a policy that is
233 marshallerMap.entrySet().stream()
234 .filter(marshallerEntry -> !outputParametersMap.containsKey(marshallerEntry.getKey()))
235 .forEach(marshallerEntry -> marshallerEntry.getValue().stop());
236 marshallerMap.keySet().removeIf(marshallerKey -> !outputParametersMap.containsKey(marshallerKey));
237 unmarshallerMap.entrySet().stream()
238 .filter(unmarshallerEntry -> !inputParametersMap.containsKey(unmarshallerEntry.getKey()))
239 .forEach(unmarshallerEntry -> unmarshallerEntry.getValue().stop());
240 unmarshallerMap.keySet().removeIf(unmarshallerKey -> !inputParametersMap.containsKey(unmarshallerKey));
242 // If a marshaller/unmarshaller is already initialized, they don't need to be
243 // reinitialized during model update.
244 outputParametersMap.keySet().removeIf(marshallerMap::containsKey);
245 inputParametersMap.keySet().removeIf(unmarshallerMap::containsKey);
248 private void instantiateEngine(ApexParameters apexParameters) throws ApexException {
249 if (null != apexEngineService && apexEngineService.getKey().equals(engineKey)) {
250 throw new ApexException("Apex Engine already initialized.");
252 // Create engine with specified thread count
253 LOGGER.debug("starting apex engine service . . .");
254 apexEngineService = EngineServiceImpl.create(apexParameters.getEngineServiceParameters());
256 // Create the engine holder to hold the engine's references and act as an event
258 engineServiceHandler = new ApexEngineServiceHandler(apexEngineService);
262 * Set up unmarshaler/marshaler pairing for synchronized event handling. We only
263 * need to traverse the unmarshalers because the unmarshalers and marshalers are
264 * paired one to one uniquely so if we find a synchronized unmarshaler we'll
265 * also find its paired marshaler
267 * @param inputParametersMap the apex parameters
269 private void setUpMarshalerPairings(Map<String, EventHandlerParameters> inputParametersMap) {
270 for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
271 final ApexEventUnmarshaller unmarshaller = unmarshallerMap.get(inputParameters.getKey());
273 // Pair up peered unmarshalers and marshalers
274 for (final EventHandlerPeeredMode peeredMode : EventHandlerPeeredMode.values()) {
275 // Check if the unmarshaler is synchronized with a marshaler
276 if (inputParameters.getValue().isPeeredMode(peeredMode)) {
277 // Find the unmarshaler and marshaler
278 final ApexEventMarshaller peeredMarshaler = marshallerMap
279 .get(inputParameters.getValue().getPeer(peeredMode));
281 // Connect the unmarshaler and marshaler
282 unmarshaller.connectMarshaler(peeredMode, peeredMarshaler);
289 * Start up event processing, this happens once all marshaller to unmarshaller
290 * wiring has been done.
292 * @param inputParametersMap the apex parameters
294 private void startUnmarshallers(Map<String, EventHandlerParameters> inputParametersMap) {
295 for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
296 unmarshallerMap.get(inputParameters.getKey()).start();
301 * Updates the APEX Engine with the model created from new Policies.
303 * @param apexParamsMap the apex parameters map for the Apex service
304 * @throws ApexException on errors
306 public void updateModel(Map<ToscaPolicyIdentifier, ApexParameters> apexParamsMap) throws ApexException {
308 ApexParameters apexParameters = apexParamsMap.values().iterator().next();
309 setUpModelMarshallerAndUnmarshaller(apexParameters);
310 } catch (final Exception e) {
311 LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
312 throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
317 * Get the Apex engine worker stats.
319 public List<AxEngineModel> getEngineStats() {
320 List<AxEngineModel> engineStats = null;
321 if (apexEngineService != null) {
322 engineStats = apexEngineService.getEngineStats();
328 * Terminate the Apex engine.
330 * @throws ApexException on termination errors
332 public void terminate() throws ApexException {
333 // Shut down all marshalers and unmarshalers
334 shutdownMarshallerAndUnmarshaller();
336 // Check if the engine service handler has been shut down already
337 if (engineServiceHandler != null) {
338 engineServiceHandler.terminate();
339 engineServiceHandler = null;
342 // Clear the services
343 ModelService.clear();
344 ParameterService.clear();
348 * Shuts down all marshallers and unmarshallers.
350 private void shutdownMarshallerAndUnmarshaller() {
351 marshallerMap.values().forEach(ApexEventMarshaller::stop);
352 marshallerMap.clear();
353 unmarshallerMap.values().forEach(ApexEventUnmarshaller::stop);
354 unmarshallerMap.clear();