fddbcb79f373db6829fce7d1288987d45a2c84ef
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / main / ApexActivator.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.service.engine.main;
23
24 import java.io.IOException;
25 import java.util.LinkedHashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.stream.Stream;
30
31 import lombok.Getter;
32 import lombok.Setter;
33
34 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
35 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
36 import org.onap.policy.apex.model.basicmodel.service.ModelService;
37 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
38 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
39 import org.onap.policy.apex.model.policymodel.handling.PolicyModelMerger;
40 import org.onap.policy.apex.service.engine.engdep.EngDepMessagingService;
41 import org.onap.policy.apex.service.engine.event.ApexEventException;
42 import org.onap.policy.apex.service.engine.runtime.EngineService;
43 import org.onap.policy.apex.service.engine.runtime.impl.EngineServiceImpl;
44 import org.onap.policy.apex.service.parameters.ApexParameters;
45 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
46 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
47 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
48 import org.onap.policy.common.parameters.ParameterService;
49 import org.onap.policy.common.utils.resources.TextFileUtils;
50 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier;
51 import org.slf4j.ext.XLogger;
52 import org.slf4j.ext.XLoggerFactory;
53
54 /**
55  * This class wraps an Apex engine so that it can be activated as a complete service together with all its context,
56  * executor, and event plugins.
57  *
58  * @author Liam Fallon (liam.fallon@ericsson.com)
59  */
60 public class ApexActivator {
61     private static final String APEX_ENGINE_FAILED_MSG = "Apex engine failed to start as a service";
62
63     // The logger for this class
64     private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexActivator.class);
65
66     // The parameters of the Apex activator when running with multiple policies
67     @Getter
68     @Setter
69     private Map<ToscaPolicyIdentifier, ApexParameters> apexParametersMap;
70
71     @Getter
72     Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap;
73
74     // Event unmarshalers are used to receive events asynchronously into Apex
75     private final Map<String, ApexEventUnmarshaller> unmarshallerMap = new LinkedHashMap<>();
76
77     // Event marshalers are used to send events asynchronously from Apex
78     private final Map<String, ApexEventMarshaller> marshallerMap = new LinkedHashMap<>();
79
80     // The engine service handler holds the references to the engine and its EngDep deployment
81     // interface. It also acts as a receiver for asynchronous
82     // and synchronous events from the engine.
83     private ApexEngineServiceHandler engineServiceHandler = null;
84
85     // The engine service
86     private EngineService apexEngineService;
87
88     /**
89      * Instantiate the activator for the Apex engine as a complete service.
90      *
91      * @param parametersMap the apex parameters map for the Apex service
92      */
93     public ApexActivator(Map<ToscaPolicyIdentifier, ApexParameters> parametersMap) {
94         apexParametersMap = parametersMap;
95     }
96
97     /**
98      * Initialize the Apex engine as a complete service.
99      *
100      * @throws ApexActivatorException on errors in initializing the engine
101      */
102     public void initialize() throws ApexActivatorException {
103         LOGGER.debug("Apex engine starting as a service . . .");
104
105         try {
106             ApexParameters apexParameters = apexParametersMap.values().iterator().next();
107             // totalInstanceCount is the sum of instance counts required as per each policy
108             int totalInstanceCount = apexParametersMap.values().stream()
109                 .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum();
110             apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount);
111             instantiateEngine(apexParameters);
112             setUpModelMarhsallerAndUnmarshaller(apexParameters);
113         } catch (final Exception e) {
114             LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
115             throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
116         }
117
118         LOGGER.debug("Apex engine started as a service");
119     }
120
121     private void setUpModelMarhsallerAndUnmarshaller(ApexParameters apexParameters) throws IOException, ApexException {
122         policyModelsMap = new LinkedHashMap<>();
123         Map<String, EventHandlerParameters> inputParametersMap = new LinkedHashMap<>();
124         Map<String, EventHandlerParameters> outputParametersMap = new LinkedHashMap<>();
125
126         for (Entry<ToscaPolicyIdentifier, ApexParameters> apexParamsEntry : apexParametersMap.entrySet()) {
127             ApexParameters apexParams = apexParamsEntry.getValue();
128             boolean duplicateInputParameterExist =
129                 apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey);
130             boolean duplicateOutputParameterExist =
131                 apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey);
132             if (duplicateInputParameterExist || duplicateOutputParameterExist) {
133                 LOGGER.error("I/O Parameters for {}:{} has duplicates. So this policy is not executed.",
134                     apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion());
135                 apexParametersMap.remove(apexParamsEntry.getKey());
136                 continue;
137             }
138             inputParametersMap.putAll(apexParams.getEventInputParameters());
139             outputParametersMap.putAll(apexParams.getEventOutputParameters());
140             // Check if a policy model file has been specified
141             if (apexParams.getEngineServiceParameters().getPolicyModelFileName() != null) {
142                 LOGGER.debug("deploying policy model in \"{}\" to the apex engines . . .",
143                     apexParams.getEngineServiceParameters().getPolicyModelFileName());
144
145                 final String policyModelString =
146                     TextFileUtils.getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName());
147                 AxPolicyModel policyModel = EngineServiceImpl
148                     .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString);
149                 policyModelsMap.put(apexParamsEntry.getKey(), policyModel);
150             }
151         }
152         AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap);
153
154         // Set the policy model in the engine
155         apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), finalPolicyModel,
156             true);
157
158         setUpMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap,
159             outputParametersMap);
160
161         // Wire up pairings between marhsallers and unmarshallers
162         setUpMarshalerPairings(inputParametersMap);
163
164         // Start event processing
165         startUnmarshallers(inputParametersMap);
166     }
167
168     private AxPolicyModel aggregatePolicyModels(Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap) {
169         Map.Entry<ToscaPolicyIdentifier, AxPolicyModel> firstEntry = policyModelsMap.entrySet().iterator().next();
170         ToscaPolicyIdentifier tempId = new ToscaPolicyIdentifier(firstEntry.getKey());
171         AxPolicyModel tempModel = new AxPolicyModel(firstEntry.getValue());
172         Stream<Entry<ToscaPolicyIdentifier, AxPolicyModel>> policyModelStream =
173             policyModelsMap.entrySet().stream().skip(1);
174         Entry<ToscaPolicyIdentifier, AxPolicyModel> finalPolicyModelEntry =
175             policyModelStream.reduce(firstEntry, ((entry1, entry2) -> {
176                 try {
177                     entry1.setValue(
178                         PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true));
179                 } catch (ApexModelException exc) {
180                     LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.",
181                         entry2.getKey().getName(), entry2.getKey().getVersion(), exc);
182                     apexParametersMap.remove(entry2.getKey());
183                     policyModelsMap.remove(entry2.getKey());
184                 }
185                 return entry1;
186             }));
187         AxPolicyModel finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.getValue());
188         policyModelsMap.put(tempId, tempModel); // put back the original first entry into the policyModelsMap
189         return finalPolicyModel;
190     }
191
192     private void setUpMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters,
193         Map<String, EventHandlerParameters> inputParametersMap, Map<String, EventHandlerParameters> outputParametersMap)
194         throws ApexEventException {
195         // Producer parameters specify what event marshalers to handle events leaving Apex are
196         // set up and how they are set up
197         for (Entry<String, EventHandlerParameters> outputParameters : outputParametersMap.entrySet()) {
198             final ApexEventMarshaller marshaller = new ApexEventMarshaller(outputParameters.getKey(),
199                 engineServiceParameters, outputParameters.getValue());
200             marshaller.init();
201             apexEngineService.registerActionListener(outputParameters.getKey(), marshaller);
202             marshallerMap.put(outputParameters.getKey(), marshaller);
203         }
204
205         // Consumer parameters specify what event unmarshalers to handle events coming into Apex
206         // are set up and how they are set up
207         for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
208             final ApexEventUnmarshaller unmarshaller = new ApexEventUnmarshaller(inputParameters.getKey(),
209                 engineServiceParameters, inputParameters.getValue());
210             unmarshallerMap.put(inputParameters.getKey(), unmarshaller);
211             unmarshaller.init(engineServiceHandler);
212         }
213     }
214
215     private void instantiateEngine(ApexParameters apexParameters) throws ApexException {
216         if (null != apexEngineService
217             && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) {
218             throw new ApexException("Apex Engine already initialized.");
219         }
220         // Create engine with specified thread count
221         LOGGER.debug("starting apex engine service . . .");
222         apexEngineService = EngineServiceImpl.create(apexParameters.getEngineServiceParameters());
223
224         // Instantiate and start the messaging service for Deployment
225         LOGGER.debug("starting apex deployment service . . .");
226         final EngDepMessagingService engDepService = new EngDepMessagingService(apexEngineService,
227             apexParameters.getEngineServiceParameters().getDeploymentPort());
228         engDepService.start();
229
230         // Create the engine holder to hold the engine's references and act as an event receiver
231         engineServiceHandler = new ApexEngineServiceHandler(apexEngineService, engDepService);
232     }
233
234     /**
235      * Set up unmarshaler/marshaler pairing for synchronized event handling. We only need to traverse the unmarshalers
236      * because the unmarshalers and marshalers are paired one to one uniquely so if we find a synchronized unmarshaler
237      * we'll also find its paired marshaler
238      * 
239      * @param inputParametersMap the apex parameters
240      */
241     private void setUpMarshalerPairings(Map<String, EventHandlerParameters> inputParametersMap) {
242         for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
243             final ApexEventUnmarshaller unmarshaller = unmarshallerMap.get(inputParameters.getKey());
244
245             // Pair up peered unmarshalers and marshalers
246             for (final EventHandlerPeeredMode peeredMode : EventHandlerPeeredMode.values()) {
247                 // Check if the unmarshaler is synchronized with a marshaler
248                 if (inputParameters.getValue().isPeeredMode(peeredMode)) {
249                     // Find the unmarshaler and marshaler
250                     final ApexEventMarshaller peeredMarshaler =
251                         marshallerMap.get(inputParameters.getValue().getPeer(peeredMode));
252
253                     // Connect the unmarshaler and marshaler
254                     unmarshaller.connectMarshaler(peeredMode, peeredMarshaler);
255                 }
256             }
257         }
258     }
259
260     /**
261      * Start up event processing, this happens once all marshaller to unmarshaller wiring has been done.
262      *
263      * @param inputParametersMap the apex parameters
264      */
265     private void startUnmarshallers(Map<String, EventHandlerParameters> inputParametersMap) {
266         for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
267             unmarshallerMap.get(inputParameters.getKey()).start();
268         }
269     }
270
271     /**
272      * Updates the APEX Engine with the model created from new Policies.
273      *
274      * @param apexParamsMap the apex parameters map for the Apex service
275      * @throws ApexException on errors
276      */
277     public void updateModel(Map<ToscaPolicyIdentifier, ApexParameters> apexParamsMap) throws ApexException {
278         try {
279             shutdownMarshallerAndUnmarshaller();
280             ApexParameters apexParameters = apexParamsMap.values().iterator().next();
281             setUpModelMarhsallerAndUnmarshaller(apexParameters);
282         } catch (final Exception e) {
283             LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
284             throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
285         }
286     }
287
288     /**
289      * Get the Apex engine worker stats.
290      */
291     public List<AxEngineModel> getEngineStats() {
292         List<AxEngineModel> engineStats = null;
293         if (apexEngineService != null) {
294             engineStats = apexEngineService.getEngineStats();
295         }
296         return engineStats;
297     }
298
299     /**
300      * Terminate the Apex engine.
301      *
302      * @throws ApexException on termination errors
303      */
304     public void terminate() throws ApexException {
305         // Shut down all marshalers and unmarshalers
306         shutdownMarshallerAndUnmarshaller();
307
308         // Check if the engine service handler has been shut down already
309         if (engineServiceHandler != null) {
310             engineServiceHandler.terminate();
311             engineServiceHandler = null;
312         }
313
314         // Clear the services
315         ModelService.clear();
316         ParameterService.clear();
317     }
318
319     /**
320      * Shuts down all marshallers and unmarshallers.
321      */
322     private void shutdownMarshallerAndUnmarshaller() {
323         marshallerMap.values().forEach(ApexEventMarshaller::stop);
324         marshallerMap.clear();
325         unmarshallerMap.values().forEach(ApexEventUnmarshaller::stop);
326         unmarshallerMap.clear();
327     }
328 }