2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019-2020 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.apex.service.engine.main;
24 import java.io.IOException;
25 import java.util.LinkedHashMap;
26 import java.util.List;
28 import java.util.Map.Entry;
29 import java.util.stream.Stream;
32 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
33 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
34 import org.onap.policy.apex.model.basicmodel.service.ModelService;
35 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
36 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
37 import org.onap.policy.apex.model.policymodel.handling.PolicyModelMerger;
38 import org.onap.policy.apex.service.engine.engdep.EngDepMessagingService;
39 import org.onap.policy.apex.service.engine.event.ApexEventException;
40 import org.onap.policy.apex.service.engine.runtime.EngineService;
41 import org.onap.policy.apex.service.engine.runtime.impl.EngineServiceImpl;
42 import org.onap.policy.apex.service.parameters.ApexParameters;
43 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
44 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
45 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
46 import org.onap.policy.common.parameters.ParameterService;
47 import org.onap.policy.common.utils.resources.TextFileUtils;
48 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier;
49 import org.slf4j.ext.XLogger;
50 import org.slf4j.ext.XLoggerFactory;
53 * This class wraps an Apex engine so that it can be activated as a complete service together with all its context,
54 * executor, and event plugins.
56 * @author Liam Fallon (liam.fallon@ericsson.com)
58 public class ApexActivator {
59 private static final String APEX_ENGINE_FAILED_MSG = "Apex engine failed to start as a service";
61 // The logger for this class
62 private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexActivator.class);
64 // The parameters of the Apex activator when running with multiple policies
67 private Map<ToscaPolicyIdentifier, ApexParameters> apexParametersMap;
70 Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap;
72 // Event unmarshalers are used to receive events asynchronously into Apex
73 private final Map<String, ApexEventUnmarshaller> unmarshallerMap = new LinkedHashMap<>();
75 // Event marshalers are used to send events asynchronously from Apex
76 private final Map<String, ApexEventMarshaller> marshallerMap = new LinkedHashMap<>();
78 // The engine service handler holds the references to the engine and its EngDep deployment
79 // interface. It also acts as a receiver for asynchronous
80 // and synchronous events from the engine.
81 private ApexEngineServiceHandler engineServiceHandler = null;
84 private EngineService apexEngineService;
/**
 * Creates an activator that runs the supplied policies as one complete Apex service.
 *
 * @param parametersMap the Apex parameters of each policy, keyed by policy identifier; the map is kept by
 *        reference, not copied
 */
public ApexActivator(Map<ToscaPolicyIdentifier, ApexParameters> parametersMap) {
    this.apexParametersMap = parametersMap;
}
96 * Initialize the Apex engine as a complete service.
98 * @throws ApexActivatorException on errors in initializing the engine
100 public void initialize() throws ApexActivatorException {
101 LOGGER.debug("Apex engine starting as a service . . .");
104 ApexParameters apexParameters = apexParametersMap.values().iterator().next();
105 // totalInstanceCount is the sum of instance counts required as per each policy
106 int totalInstanceCount = apexParametersMap.values().stream()
107 .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum();
108 apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount);
109 instantiateEngine(apexParameters);
110 setUpModelMarhsallerAndUnmarshaller(apexParameters);
111 } catch (final Exception e) {
112 LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
113 throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
116 LOGGER.debug("Apex engine started as a service");
119 private void setUpModelMarhsallerAndUnmarshaller(ApexParameters apexParameters) throws IOException, ApexException {
120 policyModelsMap = new LinkedHashMap<>();
121 Map<String, EventHandlerParameters> inputParametersMap = new LinkedHashMap<>();
122 Map<String, EventHandlerParameters> outputParametersMap = new LinkedHashMap<>();
124 for (Entry<ToscaPolicyIdentifier, ApexParameters> apexParamsEntry : apexParametersMap.entrySet()) {
125 ApexParameters apexParams = apexParamsEntry.getValue();
126 boolean duplicateInputParameterExist =
127 apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey);
128 boolean duplicateOutputParameterExist =
129 apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey);
130 if (duplicateInputParameterExist || duplicateOutputParameterExist) {
131 LOGGER.error("I/O Parameters for {}:{} has duplicates. So this policy is not executed.",
132 apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion());
133 apexParametersMap.remove(apexParamsEntry.getKey());
136 inputParametersMap.putAll(apexParams.getEventInputParameters());
137 outputParametersMap.putAll(apexParams.getEventOutputParameters());
138 // Check if a policy model file has been specified
139 if (apexParams.getEngineServiceParameters().getPolicyModelFileName() != null) {
140 LOGGER.debug("deploying policy model in \"{}\" to the apex engines . . .",
141 apexParams.getEngineServiceParameters().getPolicyModelFileName());
143 final String policyModelString =
144 TextFileUtils.getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName());
145 AxPolicyModel policyModel = EngineServiceImpl
146 .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString);
147 policyModelsMap.put(apexParamsEntry.getKey(), policyModel);
150 AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap);
152 // Set the policy model in the engine
153 apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), finalPolicyModel,
156 setUpMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap,
157 outputParametersMap);
159 // Wire up pairings between marhsallers and unmarshallers
160 setUpMarshalerPairings(inputParametersMap);
162 // Start event processing
163 startUnmarshallers(inputParametersMap);
166 private AxPolicyModel aggregatePolicyModels(Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap) {
167 Map.Entry<ToscaPolicyIdentifier, AxPolicyModel> firstEntry = policyModelsMap.entrySet().iterator().next();
168 ToscaPolicyIdentifier tempId = new ToscaPolicyIdentifier(firstEntry.getKey());
169 AxPolicyModel tempModel = new AxPolicyModel(firstEntry.getValue());
170 Stream<Entry<ToscaPolicyIdentifier, AxPolicyModel>> policyModelStream =
171 policyModelsMap.entrySet().stream().skip(1);
172 Entry<ToscaPolicyIdentifier, AxPolicyModel> finalPolicyModelEntry =
173 policyModelStream.reduce(firstEntry, ((entry1, entry2) -> {
176 PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true));
177 } catch (ApexModelException exc) {
178 LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.",
179 entry2.getKey().getName(), entry2.getKey().getVersion(), exc);
180 apexParametersMap.remove(entry2.getKey());
181 policyModelsMap.remove(entry2.getKey());
185 AxPolicyModel finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.getValue());
186 policyModelsMap.put(tempId, tempModel); // put back the original first entry into the policyModelsMap
187 return finalPolicyModel;
190 private void setUpMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters,
191 Map<String, EventHandlerParameters> inputParametersMap, Map<String, EventHandlerParameters> outputParametersMap)
192 throws ApexEventException {
193 // Producer parameters specify what event marshalers to handle events leaving Apex are
194 // set up and how they are set up
195 for (Entry<String, EventHandlerParameters> outputParameters : outputParametersMap.entrySet()) {
196 final ApexEventMarshaller marshaller = new ApexEventMarshaller(outputParameters.getKey(),
197 engineServiceParameters, outputParameters.getValue());
199 apexEngineService.registerActionListener(outputParameters.getKey(), marshaller);
200 marshallerMap.put(outputParameters.getKey(), marshaller);
203 // Consumer parameters specify what event unmarshalers to handle events coming into Apex
204 // are set up and how they are set up
205 for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
206 final ApexEventUnmarshaller unmarshaller = new ApexEventUnmarshaller(inputParameters.getKey(),
207 engineServiceParameters, inputParameters.getValue());
208 unmarshallerMap.put(inputParameters.getKey(), unmarshaller);
209 unmarshaller.init(engineServiceHandler);
213 private void instantiateEngine(ApexParameters apexParameters) throws ApexException {
214 if (null != apexEngineService
215 && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) {
216 throw new ApexException("Apex Engine already initialized.");
218 // Create engine with specified thread count
219 LOGGER.debug("starting apex engine service . . .");
220 apexEngineService = EngineServiceImpl.create(apexParameters.getEngineServiceParameters());
222 // Instantiate and start the messaging service for Deployment
223 LOGGER.debug("starting apex deployment service . . .");
224 final EngDepMessagingService engDepService = new EngDepMessagingService(apexEngineService,
225 apexParameters.getEngineServiceParameters().getDeploymentPort());
226 engDepService.start();
228 // Create the engine holder to hold the engine's references and act as an event receiver
229 engineServiceHandler = new ApexEngineServiceHandler(apexEngineService, engDepService);
233 * Set up unmarshaler/marshaler pairing for synchronized event handling. We only need to traverse the unmarshalers
234 * because the unmarshalers and marshalers are paired one to one uniquely so if we find a synchronized unmarshaler
235 * we'll also find its paired marshaler
237 * @param inputParametersMap the apex parameters
239 private void setUpMarshalerPairings(Map<String, EventHandlerParameters> inputParametersMap) {
240 for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
241 final ApexEventUnmarshaller unmarshaller = unmarshallerMap.get(inputParameters.getKey());
243 // Pair up peered unmarshalers and marshalers
244 for (final EventHandlerPeeredMode peeredMode : EventHandlerPeeredMode.values()) {
245 // Check if the unmarshaler is synchronized with a marshaler
246 if (inputParameters.getValue().isPeeredMode(peeredMode)) {
247 // Find the unmarshaler and marshaler
248 final ApexEventMarshaller peeredMarshaler =
249 marshallerMap.get(inputParameters.getValue().getPeer(peeredMode));
251 // Connect the unmarshaler and marshaler
252 unmarshaller.connectMarshaler(peeredMode, peeredMarshaler);
259 * Start up event processing, this happens once all marshaller to unmarshaller wiring has been done.
261 * @param inputParametersMap the apex parameters
263 private void startUnmarshallers(Map<String, EventHandlerParameters> inputParametersMap) {
264 for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
265 unmarshallerMap.get(inputParameters.getKey()).start();
270 * Updates the APEX Engine with the model created from new Policies.
272 * @param apexParamsMap the apex parameters map for the Apex service
273 * @throws ApexException on errors
275 public void updateModel(Map<ToscaPolicyIdentifier, ApexParameters> apexParamsMap) throws ApexException {
277 shutdownMarshallerAndUnmarshaller();
278 ApexParameters apexParameters = apexParamsMap.values().iterator().next();
279 setUpModelMarhsallerAndUnmarshaller(apexParameters);
280 } catch (final Exception e) {
281 LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
282 throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
287 * Get the Apex engine worker stats.
289 public List<AxEngineModel> getEngineStats() {
290 List<AxEngineModel> engineStats = null;
291 if (apexEngineService != null) {
292 engineStats = apexEngineService.getEngineStats();
298 * Terminate the Apex engine.
300 * @throws ApexException on termination errors
302 public void terminate() throws ApexException {
303 // Shut down all marshalers and unmarshalers
304 shutdownMarshallerAndUnmarshaller();
306 // Check if the engine service handler has been shut down already
307 if (engineServiceHandler != null) {
308 engineServiceHandler.terminate();
309 engineServiceHandler = null;
312 // Clear the services
313 ModelService.clear();
314 ParameterService.clear();
318 * Shuts down all marshallers and unmarshallers.
320 private void shutdownMarshallerAndUnmarshaller() {
321 marshallerMap.values().forEach(ApexEventMarshaller::stop);
322 marshallerMap.clear();
323 unmarshallerMap.values().forEach(ApexEventUnmarshaller::stop);
324 unmarshallerMap.clear();