2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.apex.service.engine.runtime.impl;
24 import java.io.ByteArrayInputStream;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.LinkedHashMap;
29 import java.util.Map.Entry;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import org.onap.policy.apex.context.ContextException;
33 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
34 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
35 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
36 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
37 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
38 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
39 import org.onap.policy.apex.model.basicmodel.service.ModelService;
40 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
41 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
42 import org.onap.policy.apex.service.engine.event.ApexEvent;
43 import org.onap.policy.apex.service.engine.event.ApexPeriodicEventGenerator;
44 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
45 import org.onap.policy.apex.service.engine.runtime.EngineService;
46 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
47 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
48 import org.onap.policy.common.parameters.GroupValidationResult;
49 import org.slf4j.ext.XLogger;
50 import org.slf4j.ext.XLoggerFactory;
53 * The Class EngineServiceImpl controls a thread pool that runs a set of Apex engine workers, each of which is running
54 * on an identical Apex model. This class handles the management of the engine worker instances, their threads, and
55 * event forwarding to and from the engine workers.
57 * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
58 * @author Liam Fallon (liam.fallon@ericsson.com)
59 * @author John Keeney (john.keeney@ericsson.com)
61 public final class EngineServiceImpl implements EngineService, EngineServiceEventInterface {
62 // Logging static variables
63 private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceImpl.class);
64 private static final boolean DEBUG_ENABLED = LOGGER.isDebugEnabled();
66 // Recurring string constants
67 private static final String ENGINE_KEY_PREAMBLE = "engine with key ";
68 private static final String NOT_FOUND_SUFFIX = " not found in engine service";
69 private static final String ENGINE_KEY_NOT_SPECIFIED = "engine key must be specified and may not be null";
71 // Constants for timing
72 private static final long MAX_START_WAIT_TIME = 5000; // 5 seconds
73 private static final long MAX_STOP_WAIT_TIME = 5000; // 5 seconds
74 private static final int ENGINE_SERVICE_STOP_START_WAIT_INTERVAL = 200;
76 // The ID of this engine
77 private AxArtifactKey engineServiceKey = null;
79 // The Apex engine workers this engine service is handling
80 private final Map<AxArtifactKey, EngineWorker> engineWorkerMap = Collections
81 .synchronizedMap(new LinkedHashMap<AxArtifactKey, EngineWorker>());
83 // Event queue for events being sent into the Apex engines, it used by all engines within a
85 private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
87 // Thread factory for thread management
88 private final ApplicationThreadFactory atFactory = new ApplicationThreadFactory("apex-engine-service", 512);
90 // Periodic event generator and its period in milliseconds
91 private ApexPeriodicEventGenerator periodicEventGenerator = null;
92 private long periodicEventPeriod;
95 * This constructor instantiates engine workers and adds them to the set of engine workers to be managed. The
96 * constructor is private to prevent subclassing.
98 * @param engineServiceKey the engine service key
99 * @param threadCount the thread count, the number of engine workers to start
100 * @param periodicEventPeriod the period in milliseconds at which periodic events are generated
101 * @throws ApexException on worker instantiation errors
103 private EngineServiceImpl(final AxArtifactKey engineServiceKey, final int threadCount,
104 final long periodicEventPeriod) {
105 LOGGER.entry(engineServiceKey, threadCount);
107 this.engineServiceKey = engineServiceKey;
108 this.periodicEventPeriod = periodicEventPeriod;
110 // Start engine workers
111 for (int engineCounter = 0; engineCounter < threadCount; engineCounter++) {
112 final AxArtifactKey engineWorkerKey = new AxArtifactKey(engineServiceKey.getName() + '-' + engineCounter,
113 engineServiceKey.getVersion());
114 engineWorkerMap.put(engineWorkerKey, new EngineWorker(engineWorkerKey, queue, atFactory));
115 LOGGER.info("Created apex engine {} .", engineWorkerKey.getId());
118 LOGGER.info("APEX service created.");
123 * Create an Apex Engine Service instance. This method does not load the policy so
124 * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} or
125 * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} must be used to load a model. This method does not
126 * start the Engine Service so {@link #start(AxArtifactKey)} or {@link #startAll()} must be used.
128 * @param config the configuration for this Apex Engine Service.
129 * @return the Engine Service instance
130 * @throws ApexException on worker instantiation errors
132 public static EngineServiceImpl create(final EngineServiceParameters config) throws ApexException {
133 if (config == null) {
134 LOGGER.warn("Engine service configuration parameters is null");
135 throw new ApexException("engine service configuration parameters are null");
138 final GroupValidationResult validation = config.validate();
139 if (!validation.isValid()) {
140 LOGGER.warn("Invalid engine service configuration parameters: {}" + validation.getResult());
141 throw new ApexException("Invalid engine service configuration parameters: " + validation);
144 final AxArtifactKey engineServiceKey = config.getEngineKey();
145 final int threadCount = config.getInstanceCount();
147 return new EngineServiceImpl(engineServiceKey, threadCount, config.getPeriodicEventPeriod());
154 public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
155 LOGGER.entry(apexEventListener);
157 if (listenerName == null) {
158 String message = "listener name must be specified and may not be null";
159 LOGGER.warn(message);
163 if (apexEventListener == null) {
164 String message = "apex event listener must be specified and may not be null";
165 LOGGER.warn(message);
169 // Register the Apex event listener on all engine workers, each worker will return Apex
170 // events to the listening application
171 for (final EngineService engineWorker : engineWorkerMap.values()) {
172 engineWorker.registerActionListener(listenerName, apexEventListener);
175 LOGGER.info("Added the action listener to the engine");
183 public void deregisterActionListener(final String listenerName) {
184 LOGGER.entry(listenerName);
186 // Register the Apex event listener on all engine workers, each worker will return Apex
187 // events to the listening application
188 for (final EngineService engineWorker : engineWorkerMap.values()) {
189 engineWorker.deregisterActionListener(listenerName);
192 LOGGER.info("Removed the action listener from the engine");
200 public EngineServiceEventInterface getEngineServiceEventInterface() {
208 public AxArtifactKey getKey() {
209 return engineServiceKey;
216 public Collection<AxArtifactKey> getEngineKeys() {
217 return engineWorkerMap.keySet();
224 public AxArtifactKey getApexModelKey() {
225 if (engineWorkerMap.size() == 0) {
229 return engineWorkerMap.entrySet().iterator().next().getValue().getApexModelKey();
233 * Method to create model.
235 * @param incomingEngineServiceKey incoming engine service key
236 * @param apexModelString apex model string
237 * @return apexPolicyModel the policy model
238 * @throws ApexException apex exception
240 public static AxPolicyModel createModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString)
241 throws ApexException {
242 // Check if the engine service key specified is sane
243 if (incomingEngineServiceKey == null) {
244 String message = ENGINE_KEY_NOT_SPECIFIED;
245 LOGGER.warn(message);
246 throw new ApexException(message);
249 // Check if the Apex model specified is sane
250 if (apexModelString == null || apexModelString.trim().length() == 0) {
251 String emptyModelMessage = "model for updating engine service with key "
252 + incomingEngineServiceKey.getId() + " is empty";
253 LOGGER.warn(emptyModelMessage);
254 throw new ApexException(emptyModelMessage);
257 // Read the Apex model into memory using the Apex Model Reader
258 AxPolicyModel apexPolicyModel = null;
260 final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
261 apexPolicyModel = modelReader.read(new ByteArrayInputStream(apexModelString.getBytes()));
262 } catch (final ApexModelException e) {
263 String message = "failed to unmarshal the apex model on engine service " + incomingEngineServiceKey.getId();
264 LOGGER.error(message, e);
265 throw new ApexException(message, e);
267 return apexPolicyModel;
274 public void updateModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString,
275 final boolean forceFlag) throws ApexException {
276 AxPolicyModel apexPolicyModel = createModel(incomingEngineServiceKey, apexModelString);
279 updateModel(incomingEngineServiceKey, apexPolicyModel, forceFlag);
288 public void updateModel(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
289 final boolean forceFlag) throws ApexException {
290 LOGGER.entry(incomingEngineServiceKey);
292 // Check if the engine service key specified is sane
293 if (incomingEngineServiceKey == null) {
294 String message = ENGINE_KEY_NOT_SPECIFIED;
295 LOGGER.warn(message);
296 throw new ApexException(message);
299 // Check if the Apex model specified is sane
300 if (apexModel == null) {
301 LOGGER.warn("model for updating on engine service with key " + incomingEngineServiceKey.getId()
303 throw new ApexException("model for updating on engine service with key " + incomingEngineServiceKey.getId()
307 // Check if the key on the update request is correct
308 if (!this.engineServiceKey.equals(incomingEngineServiceKey)) {
309 LOGGER.warn("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
310 + engineServiceKey.getId() + " of this engine service");
311 throw new ApexException("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
312 + engineServiceKey.getId() + " of this engine service");
315 // Check model compatibility
316 if (ModelService.existsModel(AxPolicyModel.class)) {
317 // The current policy model may or may not be defined
318 final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
319 if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
320 handleIncompatibility(apexModel, forceFlag, currentModel);
324 executeModelUpdate(incomingEngineServiceKey, apexModel, forceFlag);
330 * Execute the model update on the engine instances.
332 * @param incomingEngineServiceKey the engine service key to update
333 * @param apexModel the model to update the engines with
334 * @param forceFlag if true, ignore compatibility problems
335 * @throws ApexException on model update errors
337 private void executeModelUpdate(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
338 final boolean forceFlag) throws ApexException {
341 stopEngines(incomingEngineServiceKey);
344 // Update the engines
345 boolean isSubsequentInstance = false;
346 for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
347 LOGGER.info("Registering apex model on engine {}", engineWorkerEntry.getKey().getId());
348 EngineWorker engineWorker = engineWorkerEntry.getValue();
349 if (isSubsequentInstance) {
350 // set subsequentInstance flag as true if the current engine worker instance is not the first one
351 // first engine instance will have this flag as false
352 engineWorker.setSubsequentInstance(true);
354 engineWorker.updateModel(engineWorkerEntry.getKey(), apexModel, forceFlag);
355 isSubsequentInstance = true;
358 // start all engines on this engine service if it was not stopped before the update
360 final long starttime = System.currentTimeMillis();
361 while (!isStarted() && System.currentTimeMillis() - starttime < MAX_START_WAIT_TIME) {
362 ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
364 // Check if all engines are running
365 final StringBuilder notRunningEngineIdBuilder = new StringBuilder();
366 for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
367 if (engineWorkerEntry.getValue().getState() != AxEngineState.READY
368 && engineWorkerEntry.getValue().getState() != AxEngineState.EXECUTING) {
369 notRunningEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
370 notRunningEngineIdBuilder.append('(');
371 notRunningEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
372 notRunningEngineIdBuilder.append(") ");
375 if (notRunningEngineIdBuilder.length() > 0) {
376 final String errorString = "engine start error on model update on engine service with key "
377 + incomingEngineServiceKey.getId() + ", engines not running are: "
378 + notRunningEngineIdBuilder.toString().trim();
379 LOGGER.warn(errorString);
380 throw new ApexException(errorString);
385 * Stop engines for a model update.
386 * @param incomingEngineServiceKey the engine service key for the engines that are to be stopped
387 * @throws ApexException on errors stopping engines
389 private void stopEngines(final AxArtifactKey incomingEngineServiceKey) throws ApexException {
390 // Stop all engines on this engine service
392 final long stoptime = System.currentTimeMillis();
393 while (!isStopped() && System.currentTimeMillis() - stoptime < MAX_STOP_WAIT_TIME) {
394 ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
396 // Check if all engines are stopped
397 final StringBuilder notStoppedEngineIdBuilder = new StringBuilder();
398 for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
399 if (engineWorkerEntry.getValue().getState() != AxEngineState.STOPPED) {
400 notStoppedEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
401 notStoppedEngineIdBuilder.append('(');
402 notStoppedEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
403 notStoppedEngineIdBuilder.append(") ");
406 if (notStoppedEngineIdBuilder.length() > 0) {
407 final String errorString = "cannot update model on engine service with key "
408 + incomingEngineServiceKey.getId() + ", engines not stopped after " + MAX_STOP_WAIT_TIME
409 + "ms are: " + notStoppedEngineIdBuilder.toString().trim();
410 LOGGER.warn(errorString);
411 throw new ApexException(errorString);
416 * Issue compatibility warning or error message.
417 * @param apexModel The model name
418 * @param forceFlag true if we are forcing the update
419 * @param currentModel the existing model that is loaded
420 * @throws ContextException on compatibility errors
422 private void handleIncompatibility(final AxPolicyModel apexModel, final boolean forceFlag,
423 final AxPolicyModel currentModel) throws ContextException {
425 LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
426 + "\" is not a compatible model update from the existing engine model with key \""
427 + currentModel.getKey().getId() + "\"");
429 throw new ContextException("apex model update failed, supplied model with key \""
430 + apexModel.getKey().getId()
431 + "\" is not a compatible model update from the existing engine model with key \""
432 + currentModel.getKey().getId() + "\"");
440 public AxEngineState getState() {
441 // If one worker is running then we are running, otherwise we are stopped
442 for (final EngineService engine : engineWorkerMap.values()) {
443 if (engine.getState() != AxEngineState.STOPPED) {
444 return AxEngineState.EXECUTING;
448 return AxEngineState.STOPPED;
455 public void startAll() throws ApexException {
456 for (final EngineService engine : engineWorkerMap.values()) {
457 start(engine.getKey());
465 public void start(final AxArtifactKey engineKey) throws ApexException {
466 LOGGER.entry(engineKey);
468 if (engineKey == null) {
469 String message = ENGINE_KEY_NOT_SPECIFIED;
470 LOGGER.warn(message);
471 throw new ApexException(message);
474 // Check if we have this key on our map
475 if (!engineWorkerMap.containsKey(engineKey)) {
476 String message = ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX;
477 LOGGER.warn(message);
478 throw new ApexException(message);
482 engineWorkerMap.get(engineKey).start(engineKey);
484 // Check if periodic events should be turned on
485 if (periodicEventPeriod > 0) {
486 startPeriodicEvents(periodicEventPeriod);
489 LOGGER.exit(engineKey);
496 public void stop() throws ApexException {
499 if (periodicEventGenerator != null) {
500 periodicEventGenerator.cancel();
501 periodicEventGenerator = null;
505 for (final EngineService engine : engineWorkerMap.values()) {
506 if (engine.getState() != AxEngineState.STOPPED) {
518 public void stop(final AxArtifactKey engineKey) throws ApexException {
519 LOGGER.entry(engineKey);
521 if (engineKey == null) {
522 String message = ENGINE_KEY_NOT_SPECIFIED;
523 LOGGER.warn(message);
524 throw new ApexException(message);
527 // Check if we have this key on our map
528 if (!engineWorkerMap.containsKey(engineKey)) {
529 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
530 throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
534 engineWorkerMap.get(engineKey).stop(engineKey);
536 LOGGER.exit(engineKey);
543 public void clear() throws ApexException {
547 for (final EngineService engine : engineWorkerMap.values()) {
548 if (engine.getState() == AxEngineState.STOPPED) {
560 public void clear(final AxArtifactKey engineKey) throws ApexException {
561 LOGGER.entry(engineKey);
563 if (engineKey == null) {
564 String message = ENGINE_KEY_NOT_SPECIFIED;
565 LOGGER.warn(message);
566 throw new ApexException(message);
569 // Check if we have this key on our map
570 if (!engineWorkerMap.containsKey(engineKey)) {
571 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
572 throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
576 if (engineWorkerMap.get(engineKey).getState() == AxEngineState.STOPPED) {
577 engineWorkerMap.get(engineKey).stop(engineKey);
580 LOGGER.exit(engineKey);
584 * Check all engines are started.
586 * @return true if <i>all</i> engines are started
587 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted()
590 public boolean isStarted() {
591 for (final EngineService engine : engineWorkerMap.values()) {
592 if (!engine.isStarted()) {
603 public boolean isStarted(final AxArtifactKey engineKey) {
604 if (engineKey == null) {
605 String message = ENGINE_KEY_NOT_SPECIFIED;
606 LOGGER.warn(message);
610 // Check if we have this key on our map
611 if (!engineWorkerMap.containsKey(engineKey)) {
612 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
615 return engineWorkerMap.get(engineKey).isStarted();
619 * Check all engines are stopped.
621 * @return true if <i>all</i> engines are stopped
622 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped()
625 public boolean isStopped() {
626 for (final EngineService engine : engineWorkerMap.values()) {
627 if (!engine.isStopped()) {
638 public boolean isStopped(final AxArtifactKey engineKey) {
639 if (engineKey == null) {
640 String message = ENGINE_KEY_NOT_SPECIFIED;
641 LOGGER.warn(message);
645 // Check if we have this key on our map
646 if (!engineWorkerMap.containsKey(engineKey)) {
647 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
650 return engineWorkerMap.get(engineKey).isStopped();
657 public void startPeriodicEvents(final long period) throws ApexException {
658 // Check if periodic events are already started
659 if (periodicEventGenerator != null) {
660 String message = "Peiodic event geneation already running on engine " + engineServiceKey.getId() + ", "
661 + periodicEventGenerator.toString();
662 LOGGER.warn(message);
663 throw new ApexException(message);
666 // Set up periodic event execution, its a Java Timer/TimerTask
667 periodicEventGenerator = new ApexPeriodicEventGenerator(this.getEngineServiceEventInterface(), period);
669 // Record the periodic event period because it may have been set over the Web Socket admin
671 this.periodicEventPeriod = period;
678 public void stopPeriodicEvents() throws ApexException {
679 // Check if periodic events are already started
680 if (periodicEventGenerator == null) {
681 LOGGER.warn("Peiodic event geneation not running on engine " + engineServiceKey.getId());
682 throw new ApexException("Peiodic event geneation not running on engine " + engineServiceKey.getId());
685 // Stop periodic events
686 periodicEventGenerator.cancel();
687 periodicEventGenerator = null;
688 periodicEventPeriod = 0;
695 public String getStatus(final AxArtifactKey engineKey) throws ApexException {
696 if (engineKey == null) {
697 String message = ENGINE_KEY_NOT_SPECIFIED;
698 LOGGER.warn(message);
699 throw new ApexException(message);
702 // Check if we have this key on our map
703 if (!engineWorkerMap.containsKey(engineKey)) {
704 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
705 throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
708 // Return the information for this worker
709 return engineWorkerMap.get(engineKey).getStatus(engineKey);
716 public String getRuntimeInfo(final AxArtifactKey engineKey) throws ApexException {
717 if (engineKey == null) {
718 String message = ENGINE_KEY_NOT_SPECIFIED;
719 LOGGER.warn(message);
720 throw new ApexException(message);
723 // Check if we have this key on our map
724 if (!engineWorkerMap.containsKey(engineKey)) {
725 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
726 throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
729 // Return the information for this worker
730 return engineWorkerMap.get(engineKey).getRuntimeInfo(engineKey);
737 public void sendEvent(final ApexEvent event) {
739 LOGGER.warn("Null events cannot be processed, in engine service " + engineServiceKey.getId());
743 // Check if we have this key on our map
744 if (getState() == AxEngineState.STOPPED) {
745 LOGGER.warn("event " + event.getName() + " not processed, no engines on engine service "
746 + engineServiceKey.getId() + " are running");
751 LOGGER.debug("Forwarding Apex Event {} to the processing engine", event);
754 // Add the incoming event to the queue, the next available worker will process it