APEX standalone support for ToscaPolicy format
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / main / ApexActivator.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
5  *  Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.apex.service.engine.main;
24
25 import java.util.AbstractMap;
26 import java.util.ArrayList;
27 import java.util.LinkedHashMap;
28 import java.util.LinkedHashSet;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Map.Entry;
32 import java.util.Optional;
33 import java.util.Set;
34 import java.util.stream.Collectors;
35 import lombok.Getter;
36 import lombok.Setter;
37 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
38 import org.onap.policy.apex.model.basicmodel.concepts.ApexRuntimeException;
39 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
40 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
41 import org.onap.policy.apex.model.basicmodel.service.ModelService;
42 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
43 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
44 import org.onap.policy.apex.model.policymodel.handling.PolicyModelMerger;
45 import org.onap.policy.apex.service.engine.event.ApexEventException;
46 import org.onap.policy.apex.service.engine.runtime.EngineService;
47 import org.onap.policy.apex.service.engine.runtime.impl.EngineServiceImpl;
48 import org.onap.policy.apex.service.parameters.ApexParameters;
49 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
50 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
51 import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
52 import org.onap.policy.common.parameters.ParameterService;
53 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier;
54 import org.slf4j.ext.XLogger;
55 import org.slf4j.ext.XLoggerFactory;
56
57 /**
58  * This class wraps an Apex engine so that it can be activated as a complete
59  * service together with all its context, executor, and event plugins.
60  *
61  * @author Liam Fallon (liam.fallon@ericsson.com)
62  */
63 public class ApexActivator {
64     private static final String APEX_ENGINE_FAILED_MSG = "Apex engine failed to start as a service";
65
66     // The logger for this class
67     private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexActivator.class);
68
69     // The parameters of the Apex activator when running with multiple policies
70     @Getter
71     @Setter
72     private Map<ToscaPolicyIdentifier, ApexParameters> apexParametersMap;
73
74     @Getter
75     Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap;
76
77     // Event unmarshalers are used to receive events asynchronously into Apex
78     private final Map<String, ApexEventUnmarshaller> unmarshallerMap = new LinkedHashMap<>();
79
80     // Event marshalers are used to send events asynchronously from Apex
81     private final Map<String, ApexEventMarshaller> marshallerMap = new LinkedHashMap<>();
82
83     // The engine service handler holds the references to the engine and its EngDep
84     // deployment
85     // interface. It also acts as a receiver for asynchronous
86     // and synchronous events from the engine.
87     private ApexEngineServiceHandler engineServiceHandler = null;
88
89     // The engine service
90     private EngineService apexEngineService;
91     private AxArtifactKey engineKey;
92
93     /**
94      * Instantiate the activator for the Apex engine as a complete service.
95      *
96      * @param parametersMap the apex parameters map for the Apex service
97      */
98     public ApexActivator(Map<ToscaPolicyIdentifier, ApexParameters> parametersMap) {
99         apexParametersMap = parametersMap;
100     }
101
102     /**
103      * Initialize the Apex engine as a complete service.
104      *
105      * @throws ApexActivatorException on errors in initializing the engine
106      */
107     public void initialize() throws ApexActivatorException {
108         LOGGER.debug("Apex engine starting as a service . . .");
109
110         try {
111             ApexParameters apexParameters = apexParametersMap.values().iterator().next();
112             // totalInstanceCount is the sum of instance counts required as per each policy
113             int totalInstanceCount = apexParametersMap.values().stream()
114                 .mapToInt(p -> p.getEngineServiceParameters().getInstanceCount()).sum();
115             apexParameters.getEngineServiceParameters().setInstanceCount(totalInstanceCount);
116             engineKey = apexParameters.getEngineServiceParameters().getEngineKey();
117             instantiateEngine(apexParameters);
118             setUpModelMarshallerAndUnmarshaller(apexParameters);
119         } catch (final Exception e) {
120             LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
121             throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
122         }
123
124         LOGGER.debug("Apex engine started as a service");
125     }
126
127     private void setUpModelMarshallerAndUnmarshaller(ApexParameters apexParameters) throws ApexException {
128         policyModelsMap = new LinkedHashMap<>();
129         Map<String, EventHandlerParameters> inputParametersMap = new LinkedHashMap<>();
130         Map<String, EventHandlerParameters> outputParametersMap = new LinkedHashMap<>();
131         Set<Entry<ToscaPolicyIdentifier, ApexParameters>> apexParamsEntrySet = new LinkedHashSet<>(
132             apexParametersMap.entrySet());
133         apexParamsEntrySet.stream().forEach(apexParamsEntry -> {
134             ApexParameters apexParams = apexParamsEntry.getValue();
135             List<String> duplicateInputParameters = new ArrayList<>(apexParams.getEventInputParameters().keySet());
136             duplicateInputParameters.retainAll(inputParametersMap.keySet());
137             List<String> duplicateOutputParameters = new ArrayList<>(apexParams.getEventOutputParameters().keySet());
138             duplicateOutputParameters.retainAll(outputParametersMap.keySet());
139
140             if (!(duplicateInputParameters.isEmpty() && duplicateOutputParameters.isEmpty())) {
141                 LOGGER.error("I/O Parameters {}/{} for {}:{} are duplicates. So this policy is not executed.",
142                     duplicateInputParameters, duplicateOutputParameters, apexParamsEntry.getKey().getName(),
143                     apexParamsEntry.getKey().getVersion());
144                 apexParametersMap.remove(apexParamsEntry.getKey());
145                 return;
146             }
147
148             inputParametersMap.putAll(apexParams.getEventInputParameters());
149             outputParametersMap.putAll(apexParams.getEventOutputParameters());
150             // Check if a policy model file has been specified
151             if (apexParams.getEngineServiceParameters().getPolicyModel() != null) {
152                 LOGGER.debug("deploying policy model to the apex engines . . .");
153                 try {
154                     final String policyModelString = apexParams.getEngineServiceParameters().getPolicyModel();
155                     AxPolicyModel policyModel = EngineServiceImpl.createModel(engineKey, policyModelString);
156                     policyModelsMap.put(apexParamsEntry.getKey(), policyModel);
157                 } catch (ApexException e) {
158                     throw new ApexRuntimeException("Failed to create the apex model.", e);
159                 }
160             }
161         });
162         AxPolicyModel finalPolicyModel = aggregatePolicyModels(policyModelsMap);
163
164         // Set the policy model in the engine
165         apexEngineService.updateModel(engineKey, finalPolicyModel, true);
166
167         handleExistingMarshallerAndUnmarshaller(inputParametersMap, outputParametersMap);
168         setUpNewMarshallerAndUnmarshaller(apexParameters.getEngineServiceParameters(), inputParametersMap,
169             outputParametersMap);
170
171         // Wire up pairings between marhsallers and unmarshallers
172         setUpMarshalerPairings(inputParametersMap);
173
174         // Start event processing
175         startUnmarshallers(inputParametersMap);
176     }
177
178     private AxPolicyModel aggregatePolicyModels(Map<ToscaPolicyIdentifier, AxPolicyModel> policyModelsMap) {
179         // Doing a deep copy so that original values in policyModelsMap is retained
180         // after reduction operation
181         Set<Entry<ToscaPolicyIdentifier, AxPolicyModel>> policyModelsEntries = policyModelsMap.entrySet().stream()
182             .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue())).collect(Collectors.toSet());
183         Optional<Entry<ToscaPolicyIdentifier, AxPolicyModel>> finalPolicyModelEntry = policyModelsEntries.stream()
184             .reduce((entry1, entry2) -> {
185                 try {
186                     entry1.setValue(
187                         PolicyModelMerger.getMergedPolicyModel(entry1.getValue(), entry2.getValue(), true, true));
188                 } catch (ApexModelException exc) {
189                     LOGGER.error("Policy model for {} : {} has duplicates. So this policy is not executed.",
190                         entry2.getKey().getName(), entry2.getKey().getVersion(), exc);
191                     apexParametersMap.remove(entry2.getKey());
192                     policyModelsMap.remove(entry2.getKey());
193                 }
194                 return entry1;
195             });
196         AxPolicyModel finalPolicyModel = null;
197         if (finalPolicyModelEntry.isPresent()) {
198             finalPolicyModel = new AxPolicyModel(finalPolicyModelEntry.get().getValue());
199         }
200         return finalPolicyModel;
201     }
202
203     private void setUpNewMarshallerAndUnmarshaller(EngineServiceParameters engineServiceParameters,
204         Map<String, EventHandlerParameters> inputParametersMap, Map<String, EventHandlerParameters> outputParametersMap)
205         throws ApexEventException {
206
207         // Producer parameters specify what event marshalers to handle events leaving
208         // Apex are
209         // set up and how they are set up
210         for (Entry<String, EventHandlerParameters> outputParameters : outputParametersMap.entrySet()) {
211             final ApexEventMarshaller marshaller = new ApexEventMarshaller(outputParameters.getKey(),
212                 engineServiceParameters, outputParameters.getValue());
213             marshaller.init();
214             apexEngineService.registerActionListener(outputParameters.getKey(), marshaller);
215             marshallerMap.put(outputParameters.getKey(), marshaller);
216         }
217
218         // Consumer parameters specify what event unmarshalers to handle events coming
219         // into Apex
220         // are set up and how they are set up
221         for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
222             final ApexEventUnmarshaller unmarshaller = new ApexEventUnmarshaller(inputParameters.getKey(),
223                 engineServiceParameters, inputParameters.getValue());
224             unmarshallerMap.put(inputParameters.getKey(), unmarshaller);
225             unmarshaller.init(engineServiceHandler);
226         }
227     }
228
229     private void handleExistingMarshallerAndUnmarshaller(Map<String, EventHandlerParameters> inputParametersMap,
230         Map<String, EventHandlerParameters> outputParametersMap) {
231         // stop and remove any marshaller/unmarshaller that is part of a policy that is
232         // undeployed
233         marshallerMap.entrySet().stream()
234             .filter(marshallerEntry -> !outputParametersMap.containsKey(marshallerEntry.getKey()))
235             .forEach(marshallerEntry -> marshallerEntry.getValue().stop());
236         marshallerMap.keySet().removeIf(marshallerKey -> !outputParametersMap.containsKey(marshallerKey));
237         unmarshallerMap.entrySet().stream()
238             .filter(unmarshallerEntry -> !inputParametersMap.containsKey(unmarshallerEntry.getKey()))
239             .forEach(unmarshallerEntry -> unmarshallerEntry.getValue().stop());
240         unmarshallerMap.keySet().removeIf(unmarshallerKey -> !inputParametersMap.containsKey(unmarshallerKey));
241
242         // If a marshaller/unmarshaller is already initialized, they don't need to be
243         // reinitialized during model update.
244         outputParametersMap.keySet().removeIf(marshallerMap::containsKey);
245         inputParametersMap.keySet().removeIf(unmarshallerMap::containsKey);
246     }
247
248     private void instantiateEngine(ApexParameters apexParameters) throws ApexException {
249         if (null != apexEngineService && apexEngineService.getKey().equals(engineKey)) {
250             throw new ApexException("Apex Engine already initialized.");
251         }
252         // Create engine with specified thread count
253         LOGGER.debug("starting apex engine service . . .");
254         apexEngineService = EngineServiceImpl.create(apexParameters.getEngineServiceParameters());
255
256         // Create the engine holder to hold the engine's references and act as an event
257         // receiver
258         engineServiceHandler = new ApexEngineServiceHandler(apexEngineService);
259     }
260
261     /**
262      * Set up unmarshaler/marshaler pairing for synchronized event handling. We only
263      * need to traverse the unmarshalers because the unmarshalers and marshalers are
264      * paired one to one uniquely so if we find a synchronized unmarshaler we'll
265      * also find its paired marshaler
266      *
267      * @param inputParametersMap the apex parameters
268      */
269     private void setUpMarshalerPairings(Map<String, EventHandlerParameters> inputParametersMap) {
270         for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
271             final ApexEventUnmarshaller unmarshaller = unmarshallerMap.get(inputParameters.getKey());
272
273             // Pair up peered unmarshalers and marshalers
274             for (final EventHandlerPeeredMode peeredMode : EventHandlerPeeredMode.values()) {
275                 // Check if the unmarshaler is synchronized with a marshaler
276                 if (inputParameters.getValue().isPeeredMode(peeredMode)) {
277                     // Find the unmarshaler and marshaler
278                     final ApexEventMarshaller peeredMarshaler = marshallerMap
279                         .get(inputParameters.getValue().getPeer(peeredMode));
280
281                     // Connect the unmarshaler and marshaler
282                     unmarshaller.connectMarshaler(peeredMode, peeredMarshaler);
283                 }
284             }
285         }
286     }
287
288     /**
289      * Start up event processing, this happens once all marshaller to unmarshaller
290      * wiring has been done.
291      *
292      * @param inputParametersMap the apex parameters
293      */
294     private void startUnmarshallers(Map<String, EventHandlerParameters> inputParametersMap) {
295         for (final Entry<String, EventHandlerParameters> inputParameters : inputParametersMap.entrySet()) {
296             unmarshallerMap.get(inputParameters.getKey()).start();
297         }
298     }
299
300     /**
301      * Updates the APEX Engine with the model created from new Policies.
302      *
303      * @param apexParamsMap the apex parameters map for the Apex service
304      * @throws ApexException on errors
305      */
306     public void updateModel(Map<ToscaPolicyIdentifier, ApexParameters> apexParamsMap) throws ApexException {
307         try {
308             ApexParameters apexParameters = apexParamsMap.values().iterator().next();
309             setUpModelMarshallerAndUnmarshaller(apexParameters);
310         } catch (final Exception e) {
311             LOGGER.debug(APEX_ENGINE_FAILED_MSG, e);
312             throw new ApexActivatorException(APEX_ENGINE_FAILED_MSG, e);
313         }
314     }
315
316     /**
317      * Get the Apex engine worker stats.
318      */
319     public List<AxEngineModel> getEngineStats() {
320         List<AxEngineModel> engineStats = null;
321         if (apexEngineService != null) {
322             engineStats = apexEngineService.getEngineStats();
323         }
324         return engineStats;
325     }
326
327     /**
328      * Terminate the Apex engine.
329      *
330      * @throws ApexException on termination errors
331      */
332     public void terminate() throws ApexException {
333         // Shut down all marshalers and unmarshalers
334         shutdownMarshallerAndUnmarshaller();
335
336         // Check if the engine service handler has been shut down already
337         if (engineServiceHandler != null) {
338             engineServiceHandler.terminate();
339             engineServiceHandler = null;
340         }
341
342         // Clear the services
343         ModelService.clear();
344         ParameterService.clear();
345     }
346
347     /**
348      * Shuts down all marshallers and unmarshallers.
349      */
350     private void shutdownMarshallerAndUnmarshaller() {
351         marshallerMap.values().forEach(ApexEventMarshaller::stop);
352         marshallerMap.clear();
353         unmarshallerMap.values().forEach(ApexEventUnmarshaller::stop);
354         unmarshallerMap.clear();
355     }
356 }