2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019-2020 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.apex.service.engine.main;
24 import java.io.IOException;
25 import java.util.LinkedHashMap;
26 import java.util.List;
28 import java.util.Map.Entry;
29 import java.util.stream.Stream;
34 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
35 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
36 import org.onap.policy.apex.model.basicmodel.service.ModelService;
37 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
38 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
39 import org.onap.policy.apex.model.policymodel.handling.PolicyModelMerger;
40 import org.onap.policy.apex.service.engine.engdep.EngDepMessagingService;
41 import org.onap.policy.apex.service.engine.event.ApexEventException;
42 import org.onap.policy.apex.service.engine.runtime.EngineService;
43 import org.onap.policy.apex.service.engine.runtime.impl.EngineServiceImpl;
44 import org.onap.policy.apex.service.parameters.ApexParameters;
45 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
46 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
47 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
48 import org.onap.policy.common.parameters.ParameterService;
49 import org.onap.policy.common.utils.resources.TextFileUtils;
50 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier;
51 import org.slf4j.ext.XLogger;
52 import org.slf4j.ext.XLoggerFactory;
55 * This class wraps an Apex engine so that it can be activated as a complete service together with all its context,
56 * executor, and event plugins.
58 * @author Liam Fallon (liam.fallon@ericsson.com)
60 public class ApexActivator {
// Message that is logged and used as the exception message when initialize() or updateModel() fails
61 private static final String APEX_ENGINE_FAILED_MSG = "Apex engine failed to start as a service";
63 // The logger for this class
64 private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexActivator.class);
66 // The parameters of the Apex activator when running with multiple policies, keyed by policy identifier
69 private Map<ToscaPolicyIdentifier, ApexParameters> apexParametersMap;
// The policy models created from the policy model files, keyed by policy identifier; the map is recreated
// every time the model, marshallers and unmarshallers are set up
72 Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap;
74 // Event unmarshallers are used to receive events asynchronously into Apex, keyed by event input parameter key
75 private final Map<String, ApexEventUnmarshaller> unmarshallerMap = new LinkedHashMap<>();
77 // Event marshallers are used to send events asynchronously from Apex, keyed by event output parameter key
78 private final Map<String, ApexEventMarshaller> marshallerMap = new LinkedHashMap<>();
80 // The engine service handler holds the references to the engine and its EngDep deployment
81 // interface. It also acts as a receiver for asynchronous
82 // and synchronous events from the engine. It is created in instantiateEngine() and reset to null in terminate().
83 private ApexEngineServiceHandler engineServiceHandler = null;
// The engine service, created in instantiateEngine() from the engine service parameters
86 private EngineService apexEngineService;
/**
 * Creates an activator that runs the Apex engine as a complete service.
 *
 * @param parametersMap the Apex parameters for the service, keyed by policy identifier
 */
public ApexActivator(Map<ToscaPolicyIdentifier, ApexParameters> parametersMap) {
    this.apexParametersMap = parametersMap;
}
98 * Initialize the Apex engine as a complete service.
100 * @throws ApexActivatorException on errors in initializing the engine
102 public void initialize() throws ApexActivatorException {
103 LOGGER.debug("Apex engine starting as a service . . .");
106 ApexParameters apexParameters = apexParametersMap.values().iterator().next();
107 // totalInstanceCount is the sum of instance counts required as per each policy
108 int totalInstanceCount = apexParametersMap.values().stream()
109 .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum();
110 apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount);
111 instantiateEngine(apexParameters);
112 setUpModelMarhsallerAndUnmarshaller(apexParameters);
113 } catch (final Exception e) {
114 LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
115 throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
118 LOGGER.debug("Apex engine started as a service");
121 private void setUpModelMarhsallerAndUnmarshaller(ApexParameters apexParameters) throws IOException, ApexException {
122 policyModelsMap = new LinkedHashMap<>();
123 Map<String, EventHandlerParameters> inputParametersMap = new LinkedHashMap<>();
124 Map<String, EventHandlerParameters> outputParametersMap = new LinkedHashMap<>();
126 for (Entry<ToscaPolicyIdentifier, ApexParameters> apexParamsEntry : apexParametersMap.entrySet()) {
127 ApexParameters apexParams = apexParamsEntry.getValue();
128 boolean duplicateInputParameterExist =
129 apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey);
130 boolean duplicateOutputParameterExist =
131 apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey);
132 if (duplicateInputParameterExist || duplicateOutputParameterExist) {
133 LOGGER.error("I/O Parameters for {}:{} has duplicates. So this policy is not executed.",
134 apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion());
135 apexParametersMap.remove(apexParamsEntry.getKey());
138 inputParametersMap.putAll(apexParams.getEventInputParameters());
139 outputParametersMap.putAll(apexParams.getEventOutputParameters());
140 // Check if a policy model file has been specified
141 if (apexParams.getEngineServiceParameters().getPolicyModelFileName() != null) {
142 LOGGER.debug("deploying policy model in \"{}\" to the apex engines . . .",
143 apexParams.getEngineServiceParameters().getPolicyModelFileName());
145 final String policyModelString =
146 TextFileUtils.getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName());
147 AxPolicyModel policyModel = EngineServiceImpl
148 .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString);
149 policyModelsMap.put(apexParamsEntry.getKey(), policyModel);
152 AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap);
154 // Set the policy model in the engine
155 apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), finalPolicyModel,
158 setUpMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap,
159 outputParametersMap);
161 // Wire up pairings between marhsallers and unmarshallers
162 setUpMarshalerPairings(inputParametersMap);
164 // Start event processing
165 startUnmarshallers(inputParametersMap);
168 private AxPolicyModel aggregatePolicyModels(Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap) {
169 Map.Entry<ToscaPolicyIdentifier, AxPolicyModel> firstEntry = policyModelsMap.entrySet().iterator().next();
170 ToscaPolicyIdentifier tempId = new ToscaPolicyIdentifier(firstEntry.getKey());
171 AxPolicyModel tempModel = new AxPolicyModel(firstEntry.getValue());
172 Stream<Entry<ToscaPolicyIdentifier, AxPolicyModel>> policyModelStream =
173 policyModelsMap.entrySet().stream().skip(1);
174 Entry<ToscaPolicyIdentifier, AxPolicyModel> finalPolicyModelEntry =
175 policyModelStream.reduce(firstEntry, ((entry1, entry2) -> {
178 PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true));
179 } catch (ApexModelException exc) {
180 LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.",
181 entry2.getKey().getName(), entry2.getKey().getVersion(), exc);
182 apexParametersMap.remove(entry2.getKey());
183 policyModelsMap.remove(entry2.getKey());
187 AxPolicyModel finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.getValue());
188 policyModelsMap.put(tempId, tempModel); // put back the original first entry into the policyModelsMap
189 return finalPolicyModel;
/**
 * Creates an event marshaller for every entry of the output parameters map and an event unmarshaller for every
 * entry of the input parameters map.
 *
 * <p>Each marshaller is registered with the engine service as an action listener and stored in
 * {@code marshallerMap}; each unmarshaller is stored in {@code unmarshallerMap} and initialised with the engine
 * service handler. In all cases the map key of the parameters entry is used as the key.
 *
 * @param engineServiceParameters the engine service parameters handed to every marshaller and unmarshaller
 * @param inputParametersMap the event input parameters, one unmarshaller is created per entry
 * @param outputParametersMap the event output parameters, one marshaller is created per entry
 * @throws ApexEventException if setting up a marshaller or unmarshaller fails
 */
192 private void setUpMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters,
193 Map<String, EventHandlerParameters> inputParametersMap, Map<String, EventHandlerParameters> outputParametersMap)
194 throws ApexEventException {
195 // Producer parameters specify which event marshallers are set up to handle events leaving Apex
196 // and how they are set up
197 for (Entry<String, EventHandlerParameters> outputParameters : outputParametersMap.entrySet()) {
198 final ApexEventMarshaller marshaller = new ApexEventMarshaller(outputParameters.getKey(),
199 engineServiceParameters, outputParameters.getValue());
// The marshaller listens for actions of the engine service under its parameter key
201 apexEngineService.registerActionListener(outputParameters.getKey(), marshaller);
202 marshallerMap.put(outputParameters.getKey(), marshaller);
205 // Consumer parameters specify which event unmarshallers are set up to handle events coming into Apex
206 // and how they are set up
207 for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
208 final ApexEventUnmarshaller unmarshaller = new ApexEventUnmarshaller(inputParameters.getKey(),
209 engineServiceParameters, inputParameters.getValue());
210 unmarshallerMap.put(inputParameters.getKey(), unmarshaller);
// The unmarshaller is stored before it is initialised with the engine service handler
211 unmarshaller.init(engineServiceHandler);
215 private void instantiateEngine(ApexParameters apexParameters) throws ApexException {
216 if (null != apexEngineService
217 && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) {
218 throw new ApexException("Apex Engine already initialized.");
220 // Create engine with specified thread count
221 LOGGER.debug("starting apex engine service . . .");
222 apexEngineService = EngineServiceImpl.create(apexParameters.getEngineServiceParameters());
224 // Instantiate and start the messaging service for Deployment
225 LOGGER.debug("starting apex deployment service . . .");
226 final EngDepMessagingService engDepService = new EngDepMessagingService(apexEngineService,
227 apexParameters.getEngineServiceParameters().getDeploymentPort());
228 engDepService.start();
230 // Create the engine holder to hold the engine's references and act as an event receiver
231 engineServiceHandler = new ApexEngineServiceHandler(apexEngineService, engDepService);
235 * Set up unmarshaler/marshaler pairing for synchronized event handling. We only need to traverse the unmarshalers
236 * because the unmarshalers and marshalers are paired one to one uniquely so if we find a synchronized unmarshaler
237 * we'll also find its paired marshaler
239 * @param inputParametersMap the apex parameters
241 private void setUpMarshalerPairings(Map<String, EventHandlerParameters> inputParametersMap) {
242 for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
243 final ApexEventUnmarshaller unmarshaller = unmarshallerMap.get(inputParameters.getKey());
245 // Pair up peered unmarshalers and marshalers
246 for (final EventHandlerPeeredMode peeredMode : EventHandlerPeeredMode.values()) {
247 // Check if the unmarshaler is synchronized with a marshaler
248 if (inputParameters.getValue().isPeeredMode(peeredMode)) {
249 // Find the unmarshaler and marshaler
250 final ApexEventMarshaller peeredMarshaler =
251 marshallerMap.get(inputParameters.getValue().getPeer(peeredMode));
253 // Connect the unmarshaler and marshaler
254 unmarshaller.connectMarshaler(peeredMode, peeredMarshaler);
261 * Start up event processing, this happens once all marshaller to unmarshaller wiring has been done.
263 * @param inputParametersMap the apex parameters
265 private void startUnmarshallers(Map<String, EventHandlerParameters> inputParametersMap) {
266 for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
267 unmarshallerMap.get(inputParameters.getKey()).start();
272 * Updates the APEX Engine with the model created from new Policies.
274 * @param apexParamsMap the apex parameters map for the Apex service
275 * @throws ApexException on errors
277 public void updateModel(Map<ToscaPolicyIdentifier, ApexParameters> apexParamsMap) throws ApexException {
279 shutdownMarshallerAndUnmarshaller();
280 ApexParameters apexParameters = apexParamsMap.values().iterator().next();
281 setUpModelMarhsallerAndUnmarshaller(apexParameters);
282 } catch (final Exception e) {
283 LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
284 throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
289 * Get the Apex engine worker stats.
291 public List<AxEngineModel> getEngineStats() {
292 List<AxEngineModel> engineStats = null;
293 if (apexEngineService != null) {
294 engineStats = apexEngineService.getEngineStats();
300 * Terminate the Apex engine.
302 * @throws ApexException on termination errors
304 public void terminate() throws ApexException {
305 // Shut down all marshalers and unmarshalers
306 shutdownMarshallerAndUnmarshaller();
308 // Check if the engine service handler has been shut down already
309 if (engineServiceHandler != null) {
310 engineServiceHandler.terminate();
311 engineServiceHandler = null;
314 // Clear the services
315 ModelService.clear();
316 ParameterService.clear();
320 * Shuts down all marshallers and unmarshallers.
322 private void shutdownMarshallerAndUnmarshaller() {
323 marshallerMap.values().forEach(ApexEventMarshaller::stop);
324 marshallerMap.clear();
325 unmarshallerMap.values().forEach(ApexEventUnmarshaller::stop);
326 unmarshallerMap.clear();