Changes for checkstyle 8.32
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / main / ApexActivator.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.service.engine.main;
23
24 import java.io.IOException;
25 import java.util.LinkedHashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.stream.Stream;
30 import lombok.Getter;
31 import lombok.Setter;
32 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
33 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
34 import org.onap.policy.apex.model.basicmodel.service.ModelService;
35 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
36 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
37 import org.onap.policy.apex.model.policymodel.handling.PolicyModelMerger;
38 import org.onap.policy.apex.service.engine.engdep.EngDepMessagingService;
39 import org.onap.policy.apex.service.engine.event.ApexEventException;
40 import org.onap.policy.apex.service.engine.runtime.EngineService;
41 import org.onap.policy.apex.service.engine.runtime.impl.EngineServiceImpl;
42 import org.onap.policy.apex.service.parameters.ApexParameters;
43 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
44 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
45 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
46 import org.onap.policy.common.parameters.ParameterService;
47 import org.onap.policy.common.utils.resources.TextFileUtils;
48 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier;
49 import org.slf4j.ext.XLogger;
50 import org.slf4j.ext.XLoggerFactory;
51
52 /**
53  * This class wraps an Apex engine so that it can be activated as a complete service together with all its context,
54  * executor, and event plugins.
55  *
56  * @author Liam Fallon (liam.fallon@ericsson.com)
57  */
58 public class ApexActivator {
59     private static final String APEX_ENGINE_FAILED_MSG = "Apex engine failed to start as a service";
60
61     // The logger for this class
62     private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexActivator.class);
63
64     // The parameters of the Apex activator when running with multiple policies
65     @Getter
66     @Setter
67     private Map<ToscaPolicyIdentifier, ApexParameters> apexParametersMap;
68
69     @Getter
70     Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap;
71
72     // Event unmarshalers are used to receive events asynchronously into Apex
73     private final Map<String, ApexEventUnmarshaller> unmarshallerMap = new LinkedHashMap<>();
74
75     // Event marshalers are used to send events asynchronously from Apex
76     private final Map<String, ApexEventMarshaller> marshallerMap = new LinkedHashMap<>();
77
78     // The engine service handler holds the references to the engine and its EngDep deployment
79     // interface. It also acts as a receiver for asynchronous
80     // and synchronous events from the engine.
81     private ApexEngineServiceHandler engineServiceHandler = null;
82
83     // The engine service
84     private EngineService apexEngineService;
85
86     /**
87      * Instantiate the activator for the Apex engine as a complete service.
88      *
89      * @param parametersMap the apex parameters map for the Apex service
90      */
91     public ApexActivator(Map<ToscaPolicyIdentifier, ApexParameters> parametersMap) {
92         apexParametersMap = parametersMap;
93     }
94
95     /**
96      * Initialize the Apex engine as a complete service.
97      *
98      * @throws ApexActivatorException on errors in initializing the engine
99      */
100     public void initialize() throws ApexActivatorException {
101         LOGGER.debug("Apex engine starting as a service . . .");
102
103         try {
104             ApexParameters apexParameters = apexParametersMap.values().iterator().next();
105             // totalInstanceCount is the sum of instance counts required as per each policy
106             int totalInstanceCount = apexParametersMap.values().stream()
107                 .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum();
108             apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount);
109             instantiateEngine(apexParameters);
110             setUpModelMarhsallerAndUnmarshaller(apexParameters);
111         } catch (final Exception e) {
112             LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
113             throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
114         }
115
116         LOGGER.debug("Apex engine started as a service");
117     }
118
119     private void setUpModelMarhsallerAndUnmarshaller(ApexParameters apexParameters) throws IOException, ApexException {
120         policyModelsMap = new LinkedHashMap<>();
121         Map<String, EventHandlerParameters> inputParametersMap = new LinkedHashMap<>();
122         Map<String, EventHandlerParameters> outputParametersMap = new LinkedHashMap<>();
123
124         for (Entry<ToscaPolicyIdentifier, ApexParameters> apexParamsEntry : apexParametersMap.entrySet()) {
125             ApexParameters apexParams = apexParamsEntry.getValue();
126             boolean duplicateInputParameterExist =
127                 apexParams.getEventInputParameters().keySet().stream().anyMatch(inputParametersMap::containsKey);
128             boolean duplicateOutputParameterExist =
129                 apexParams.getEventOutputParameters().keySet().stream().anyMatch(outputParametersMap::containsKey);
130             if (duplicateInputParameterExist || duplicateOutputParameterExist) {
131                 LOGGER.error("I/O Parameters for {}:{} has duplicates. So this policy is not executed.",
132                     apexParamsEntry.getKey().getName(), apexParamsEntry.getKey().getVersion());
133                 apexParametersMap.remove(apexParamsEntry.getKey());
134                 continue;
135             }
136             inputParametersMap.putAll(apexParams.getEventInputParameters());
137             outputParametersMap.putAll(apexParams.getEventOutputParameters());
138             // Check if a policy model file has been specified
139             if (apexParams.getEngineServiceParameters().getPolicyModelFileName() != null) {
140                 LOGGER.debug("deploying policy model in \"{}\" to the apex engines . . .",
141                     apexParams.getEngineServiceParameters().getPolicyModelFileName());
142
143                 final String policyModelString =
144                     TextFileUtils.getTextFileAsString(apexParams.getEngineServiceParameters().getPolicyModelFileName());
145                 AxPolicyModel policyModel = EngineServiceImpl
146                     .createModel(apexParams.getEngineServiceParameters().getEngineKey(), policyModelString);
147                 policyModelsMap.put(apexParamsEntry.getKey(), policyModel);
148             }
149         }
150         AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap);
151
152         // Set the policy model in the engine
153         apexEngineService.updateModel(apexParameters.getEngineServiceParameters().getEngineKey(), finalPolicyModel,
154             true);
155
156         setUpMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap,
157             outputParametersMap);
158
159         // Wire up pairings between marhsallers and unmarshallers
160         setUpMarshalerPairings(inputParametersMap);
161
162         // Start event processing
163         startUnmarshallers(inputParametersMap);
164     }
165
166     private AxPolicyModel aggregatePolicyModels(Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap) {
167         Map.Entry<ToscaPolicyIdentifier, AxPolicyModel> firstEntry = policyModelsMap.entrySet().iterator().next();
168         ToscaPolicyIdentifier tempId = new ToscaPolicyIdentifier(firstEntry.getKey());
169         AxPolicyModel tempModel = new AxPolicyModel(firstEntry.getValue());
170         Stream<Entry<ToscaPolicyIdentifier, AxPolicyModel>> policyModelStream =
171             policyModelsMap.entrySet().stream().skip(1);
172         Entry<ToscaPolicyIdentifier, AxPolicyModel> finalPolicyModelEntry =
173             policyModelStream.reduce(firstEntry, ((entry1, entry2) -> {
174                 try {
175                     entry1.setValue(
176                         PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true));
177                 } catch (ApexModelException exc) {
178                     LOGGER.error("Policy model for {} : {} is having duplicates. So this policy is not executed.",
179                         entry2.getKey().getName(), entry2.getKey().getVersion(), exc);
180                     apexParametersMap.remove(entry2.getKey());
181                     policyModelsMap.remove(entry2.getKey());
182                 }
183                 return entry1;
184             }));
185         AxPolicyModel finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.getValue());
186         policyModelsMap.put(tempId, tempModel); // put back the original first entry into the policyModelsMap
187         return finalPolicyModel;
188     }
189
190     private void setUpMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters,
191         Map<String, EventHandlerParameters> inputParametersMap, Map<String, EventHandlerParameters> outputParametersMap)
192         throws ApexEventException {
193         // Producer parameters specify what event marshalers to handle events leaving Apex are
194         // set up and how they are set up
195         for (Entry<String, EventHandlerParameters> outputParameters : outputParametersMap.entrySet()) {
196             final ApexEventMarshaller marshaller = new ApexEventMarshaller(outputParameters.getKey(),
197                 engineServiceParameters, outputParameters.getValue());
198             marshaller.init();
199             apexEngineService.registerActionListener(outputParameters.getKey(), marshaller);
200             marshallerMap.put(outputParameters.getKey(), marshaller);
201         }
202
203         // Consumer parameters specify what event unmarshalers to handle events coming into Apex
204         // are set up and how they are set up
205         for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
206             final ApexEventUnmarshaller unmarshaller = new ApexEventUnmarshaller(inputParameters.getKey(),
207                 engineServiceParameters, inputParameters.getValue());
208             unmarshallerMap.put(inputParameters.getKey(), unmarshaller);
209             unmarshaller.init(engineServiceHandler);
210         }
211     }
212
213     private void instantiateEngine(ApexParameters apexParameters) throws ApexException {
214         if (null != apexEngineService
215             && apexEngineService.getKey().equals(apexParameters.getEngineServiceParameters().getEngineKey())) {
216             throw new ApexException("Apex Engine already initialized.");
217         }
218         // Create engine with specified thread count
219         LOGGER.debug("starting apex engine service . . .");
220         apexEngineService = EngineServiceImpl.create(apexParameters.getEngineServiceParameters());
221
222         // Instantiate and start the messaging service for Deployment
223         LOGGER.debug("starting apex deployment service . . .");
224         final EngDepMessagingService engDepService = new EngDepMessagingService(apexEngineService,
225             apexParameters.getEngineServiceParameters().getDeploymentPort());
226         engDepService.start();
227
228         // Create the engine holder to hold the engine's references and act as an event receiver
229         engineServiceHandler = new ApexEngineServiceHandler(apexEngineService, engDepService);
230     }
231
232     /**
233      * Set up unmarshaler/marshaler pairing for synchronized event handling. We only need to traverse the unmarshalers
234      * because the unmarshalers and marshalers are paired one to one uniquely so if we find a synchronized unmarshaler
235      * we'll also find its paired marshaler
236      * 
237      * @param inputParametersMap the apex parameters
238      */
239     private void setUpMarshalerPairings(Map<String, EventHandlerParameters> inputParametersMap) {
240         for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
241             final ApexEventUnmarshaller unmarshaller = unmarshallerMap.get(inputParameters.getKey());
242
243             // Pair up peered unmarshalers and marshalers
244             for (final EventHandlerPeeredMode peeredMode : EventHandlerPeeredMode.values()) {
245                 // Check if the unmarshaler is synchronized with a marshaler
246                 if (inputParameters.getValue().isPeeredMode(peeredMode)) {
247                     // Find the unmarshaler and marshaler
248                     final ApexEventMarshaller peeredMarshaler =
249                         marshallerMap.get(inputParameters.getValue().getPeer(peeredMode));
250
251                     // Connect the unmarshaler and marshaler
252                     unmarshaller.connectMarshaler(peeredMode, peeredMarshaler);
253                 }
254             }
255         }
256     }
257
258     /**
259      * Start up event processing, this happens once all marshaller to unmarshaller wiring has been done.
260      *
261      * @param inputParametersMap the apex parameters
262      */
263     private void startUnmarshallers(Map<String, EventHandlerParameters> inputParametersMap) {
264         for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
265             unmarshallerMap.get(inputParameters.getKey()).start();
266         }
267     }
268
269     /**
270      * Updates the APEX Engine with the model created from new Policies.
271      *
272      * @param apexParamsMap the apex parameters map for the Apex service
273      * @throws ApexException on errors
274      */
275     public void updateModel(Map<ToscaPolicyIdentifier, ApexParameters> apexParamsMap) throws ApexException {
276         try {
277             shutdownMarshallerAndUnmarshaller();
278             ApexParameters apexParameters = apexParamsMap.values().iterator().next();
279             setUpModelMarhsallerAndUnmarshaller(apexParameters);
280         } catch (final Exception e) {
281             LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
282             throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
283         }
284     }
285
286     /**
287      * Get the Apex engine worker stats.
288      */
289     public List<AxEngineModel> getEngineStats() {
290         List<AxEngineModel> engineStats = null;
291         if (apexEngineService != null) {
292             engineStats = apexEngineService.getEngineStats();
293         }
294         return engineStats;
295     }
296
297     /**
298      * Terminate the Apex engine.
299      *
300      * @throws ApexException on termination errors
301      */
302     public void terminate() throws ApexException {
303         // Shut down all marshalers and unmarshalers
304         shutdownMarshallerAndUnmarshaller();
305
306         // Check if the engine service handler has been shut down already
307         if (engineServiceHandler != null) {
308             engineServiceHandler.terminate();
309             engineServiceHandler = null;
310         }
311
312         // Clear the services
313         ModelService.clear();
314         ParameterService.clear();
315     }
316
317     /**
318      * Shuts down all marshallers and unmarshallers.
319      */
320     private void shutdownMarshallerAndUnmarshaller() {
321         marshallerMap.values().forEach(ApexEventMarshaller::stop);
322         marshallerMap.clear();
323         unmarshallerMap.values().forEach(ApexEventUnmarshaller::stop);
324         unmarshallerMap.clear();
325     }
326 }