2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019-2020 Nordix Foundation.
5 * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * SPDX-License-Identifier: Apache-2.0
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.apex.service.engine.runtime.impl;
25 import java.io.ByteArrayInputStream;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.LinkedHashMap;
30 import java.util.List;
32 import java.util.Map.Entry;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.LinkedBlockingQueue;
35 import org.onap.policy.apex.context.ContextException;
36 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
37 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
38 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
39 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
40 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
41 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
42 import org.onap.policy.apex.model.basicmodel.service.ModelService;
43 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
44 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
45 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
46 import org.onap.policy.apex.service.engine.event.ApexEvent;
47 import org.onap.policy.apex.service.engine.event.ApexPeriodicEventGenerator;
48 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
49 import org.onap.policy.apex.service.engine.runtime.EngineService;
50 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
51 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
52 import org.onap.policy.common.parameters.ValidationResult;
53 import org.slf4j.ext.XLogger;
54 import org.slf4j.ext.XLoggerFactory;
57 * The Class EngineServiceImpl controls a thread pool that runs a set of Apex engine workers, each of which is running
58 * on an identical Apex model. This class handles the management of the engine worker instances, their threads, and
59 * event forwarding to and from the engine workers.
61 * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
62 * @author Liam Fallon (liam.fallon@ericsson.com)
63 * @author John Keeney (john.keeney@ericsson.com)
65 public final class EngineServiceImpl implements EngineService, EngineServiceEventInterface {
66 // Logging static variables
67 private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceImpl.class);
68 private static final boolean DEBUG_ENABLED = LOGGER.isDebugEnabled();
70 // Recurring string constants
71 private static final String ENGINE_KEY_PREAMBLE = "engine with key ";
72 private static final String NOT_FOUND_SUFFIX = " not found in engine service";
73 private static final String ENGINE_KEY_NOT_SPECIFIED = "engine key must be specified and may not be null";
75 // Constants for timing
76 private static final long MAX_START_WAIT_TIME = 5000; // 5 seconds
77 private static final long MAX_STOP_WAIT_TIME = 5000; // 5 seconds
78 private static final int ENGINE_SERVICE_STOP_START_WAIT_INTERVAL = 200;
80 // The ID of this engine
81 private AxArtifactKey engineServiceKey = null;
83 // The Apex engine workers this engine service is handling
84 private final Map<AxArtifactKey, EngineWorker> engineWorkerMap = Collections
85 .synchronizedMap(new LinkedHashMap<AxArtifactKey, EngineWorker>());
87 // Event queue for events being sent into the Apex engines; it is shared by all engine workers of this service
89 private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
91 // Thread factory for thread management
92 private final ApplicationThreadFactory atFactory = new ApplicationThreadFactory("apex-engine-service", 512);
94 // Periodic event generator and its period in milliseconds
95 private ApexPeriodicEventGenerator periodicEventGenerator = null;
96 private long periodicEventPeriod;
/**
 * Instantiates the engine workers and adds them to the set of engine workers to be managed. The constructor is
 * private; instances are obtained through {@link #create(EngineServiceParameters)}.
 *
 * @param engineServiceKey the engine service key
 * @param threadCount the thread count, the number of engine workers to create
 * @param periodicEventPeriod the period in milliseconds at which periodic events are generated
 */
private EngineServiceImpl(final AxArtifactKey engineServiceKey, final int threadCount,
    final long periodicEventPeriod) {
    LOGGER.entry(engineServiceKey, threadCount);

    this.engineServiceKey = engineServiceKey;
    this.periodicEventPeriod = periodicEventPeriod;

    // Create one worker per requested thread; each worker key is "<service name>-<index>" at the service version,
    // and all workers share the same event queue and thread factory
    for (int engineCounter = 0; engineCounter < threadCount; engineCounter++) {
        final AxArtifactKey engineWorkerKey = new AxArtifactKey(engineServiceKey.getName() + '-' + engineCounter,
            engineServiceKey.getVersion());
        engineWorkerMap.put(engineWorkerKey, new EngineWorker(engineWorkerKey, queue, atFactory));
        LOGGER.info("Created apex engine {} .", engineWorkerKey.getId());
    }

    LOGGER.info("APEX service created.");
    LOGGER.exit();
}
127 * Create an Apex Engine Service instance. This method does not load the policy so
128 * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} or
129 * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} must be used to load a model. This method does not
130 * start the Engine Service so {@link #start(AxArtifactKey)} or {@link #startAll()} must be used.
132 * @param config the configuration for this Apex Engine Service.
133 * @return the Engine Service instance
134 * @throws ApexException on worker instantiation errors
136 public static EngineServiceImpl create(final EngineServiceParameters config) throws ApexException {
137 if (config == null) {
138 LOGGER.warn("Engine service configuration parameters is null");
139 throw new ApexException("engine service configuration parameters are null");
142 final ValidationResult validation = config.validate();
143 if (!validation.isValid()) {
144 LOGGER.warn("Invalid engine service configuration parameters: {}" + validation.getResult());
145 throw new ApexException("Invalid engine service configuration parameters: " + validation);
148 final AxArtifactKey engineServiceKey = config.getEngineKey();
149 final int threadCount = config.getInstanceCount();
151 return new EngineServiceImpl(engineServiceKey, threadCount, config.getPeriodicEventPeriod());
158 public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
159 LOGGER.entry(apexEventListener);
161 if (listenerName == null) {
162 String message = "listener name must be specified and may not be null";
163 LOGGER.warn(message);
167 if (apexEventListener == null) {
168 String message = "apex event listener must be specified and may not be null";
169 LOGGER.warn(message);
173 // Register the Apex event listener on all engine workers, each worker will return Apex
174 // events to the listening application
175 for (final EngineService engineWorker : engineWorkerMap.values()) {
176 engineWorker.registerActionListener(listenerName, apexEventListener);
179 LOGGER.info("Added the action listener to the engine");
187 public void deregisterActionListener(final String listenerName) {
188 LOGGER.entry(listenerName);
190 // Register the Apex event listener on all engine workers, each worker will return Apex
191 // events to the listening application
192 for (final EngineService engineWorker : engineWorkerMap.values()) {
193 engineWorker.deregisterActionListener(listenerName);
196 LOGGER.info("Removed the action listener from the engine");
204 public EngineServiceEventInterface getEngineServiceEventInterface() {
212 public AxArtifactKey getKey() {
213 return engineServiceKey;
220 public Collection<AxArtifactKey> getEngineKeys() {
221 return engineWorkerMap.keySet();
228 public AxArtifactKey getApexModelKey() {
229 if (engineWorkerMap.size() == 0) {
233 return engineWorkerMap.entrySet().iterator().next().getValue().getApexModelKey();
237 * Method to create model.
239 * @param incomingEngineServiceKey incoming engine service key
240 * @param apexModelString apex model string
241 * @return apexPolicyModel the policy model
242 * @throws ApexException apex exception
244 public static AxPolicyModel createModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString)
245 throws ApexException {
246 // Check if the engine service key specified is sane
247 if (incomingEngineServiceKey == null) {
248 String message = ENGINE_KEY_NOT_SPECIFIED;
249 LOGGER.warn(message);
250 throw new ApexException(message);
253 // Check if the Apex model specified is sane
254 if (apexModelString == null || apexModelString.trim().length() == 0) {
255 String emptyModelMessage = "model for updating engine service with key "
256 + incomingEngineServiceKey.getId() + " is empty";
257 LOGGER.warn(emptyModelMessage);
258 throw new ApexException(emptyModelMessage);
261 // Read the Apex model into memory using the Apex Model Reader
262 AxPolicyModel apexPolicyModel = null;
264 final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
265 apexPolicyModel = modelReader.read(new ByteArrayInputStream(apexModelString.getBytes()));
266 } catch (final ApexModelException e) {
267 String message = "failed to unmarshal the apex model on engine service " + incomingEngineServiceKey.getId();
268 LOGGER.error(message, e);
269 throw new ApexException(message, e);
271 return apexPolicyModel;
278 public void updateModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString,
279 final boolean forceFlag) throws ApexException {
280 AxPolicyModel apexPolicyModel = createModel(incomingEngineServiceKey, apexModelString);
283 updateModel(incomingEngineServiceKey, apexPolicyModel, forceFlag);
292 public void updateModel(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
293 final boolean forceFlag) throws ApexException {
294 LOGGER.entry(incomingEngineServiceKey);
296 // Check if the engine service key specified is sane
297 if (incomingEngineServiceKey == null) {
298 String message = ENGINE_KEY_NOT_SPECIFIED;
299 LOGGER.warn(message);
300 throw new ApexException(message);
303 // Check if the Apex model specified is sane
304 if (apexModel == null) {
305 LOGGER.warn("model for updating on engine service with key " + incomingEngineServiceKey.getId()
307 throw new ApexException("model for updating on engine service with key " + incomingEngineServiceKey.getId()
311 // Check if the key on the update request is correct
312 if (!this.engineServiceKey.equals(incomingEngineServiceKey)) {
313 LOGGER.warn("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
314 + engineServiceKey.getId() + " of this engine service");
315 throw new ApexException("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
316 + engineServiceKey.getId() + " of this engine service");
319 // Check model compatibility
320 if (ModelService.existsModel(AxPolicyModel.class)) {
321 // The current policy model may or may not be defined
322 final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
323 if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
324 handleIncompatibility(apexModel, forceFlag, currentModel);
328 executeModelUpdate(incomingEngineServiceKey, apexModel, forceFlag);
334 * Execute the model update on the engine instances.
336 * @param incomingEngineServiceKey the engine service key to update
337 * @param apexModel the model to update the engines with
338 * @param forceFlag if true, ignore compatibility problems
339 * @throws ApexException on model update errors
341 private void executeModelUpdate(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
342 final boolean forceFlag) throws ApexException {
345 stopEngines(incomingEngineServiceKey);
348 // Update the engines
349 boolean isSubsequentInstance = false;
350 for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
351 LOGGER.info("Registering apex model on engine {}", engineWorkerEntry.getKey().getId());
352 EngineWorker engineWorker = engineWorkerEntry.getValue();
353 if (isSubsequentInstance) {
354 // set subsequentInstance flag as true if the current engine worker instance is not the first one
355 // first engine instance will have this flag as false
356 engineWorker.setSubsequentInstance(true);
358 engineWorker.updateModel(engineWorkerEntry.getKey(), apexModel, forceFlag);
359 isSubsequentInstance = true;
362 // start all engines on this engine service if it was not stopped before the update
364 final long starttime = System.currentTimeMillis();
365 while (!isStarted() && System.currentTimeMillis() - starttime < MAX_START_WAIT_TIME) {
366 ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
368 // Check if all engines are running
369 final StringBuilder notRunningEngineIdBuilder = new StringBuilder();
370 for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
371 if (engineWorkerEntry.getValue().getState() != AxEngineState.READY
372 && engineWorkerEntry.getValue().getState() != AxEngineState.EXECUTING) {
373 notRunningEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
374 notRunningEngineIdBuilder.append('(');
375 notRunningEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
376 notRunningEngineIdBuilder.append(") ");
379 if (notRunningEngineIdBuilder.length() > 0) {
380 final String errorString = "engine start error on model update on engine service with key "
381 + incomingEngineServiceKey.getId() + ", engines not running are: "
382 + notRunningEngineIdBuilder.toString().trim();
383 LOGGER.warn(errorString);
384 throw new ApexException(errorString);
389 * Stop engines for a model update.
390 * @param incomingEngineServiceKey the engine service key for the engines that are to be stopped
391 * @throws ApexException on errors stopping engines
393 private void stopEngines(final AxArtifactKey incomingEngineServiceKey) throws ApexException {
394 // Stop all engines on this engine service
396 final long stoptime = System.currentTimeMillis();
397 while (!isStopped() && System.currentTimeMillis() - stoptime < MAX_STOP_WAIT_TIME) {
398 ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
400 // Check if all engines are stopped
401 final StringBuilder notStoppedEngineIdBuilder = new StringBuilder();
402 for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
403 if (engineWorkerEntry.getValue().getState() != AxEngineState.STOPPED) {
404 notStoppedEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
405 notStoppedEngineIdBuilder.append('(');
406 notStoppedEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
407 notStoppedEngineIdBuilder.append(") ");
410 if (notStoppedEngineIdBuilder.length() > 0) {
411 final String errorString = "cannot update model on engine service with key "
412 + incomingEngineServiceKey.getId() + ", engines not stopped after " + MAX_STOP_WAIT_TIME
413 + "ms are: " + notStoppedEngineIdBuilder.toString().trim();
414 LOGGER.warn(errorString);
415 throw new ApexException(errorString);
420 * Issue compatibility warning or error message.
421 * @param apexModel The model name
422 * @param forceFlag true if we are forcing the update
423 * @param currentModel the existing model that is loaded
424 * @throws ContextException on compatibility errors
426 private void handleIncompatibility(final AxPolicyModel apexModel, final boolean forceFlag,
427 final AxPolicyModel currentModel) throws ContextException {
429 LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
430 + "\" is not a compatible model update from the existing engine model with key \""
431 + currentModel.getKey().getId() + "\"");
433 throw new ContextException("apex model update failed, supplied model with key \""
434 + apexModel.getKey().getId()
435 + "\" is not a compatible model update from the existing engine model with key \""
436 + currentModel.getKey().getId() + "\"");
444 public AxEngineState getState() {
445 // If one worker is running then we are running, otherwise we are stopped
446 for (final EngineService engine : engineWorkerMap.values()) {
447 if (engine.getState() != AxEngineState.STOPPED) {
448 return AxEngineState.EXECUTING;
452 return AxEngineState.STOPPED;
459 public void startAll() throws ApexException {
460 for (final EngineService engine : engineWorkerMap.values()) {
461 start(engine.getKey());
469 public void start(final AxArtifactKey engineKey) throws ApexException {
470 LOGGER.entry(engineKey);
472 if (engineKey == null) {
473 String message = ENGINE_KEY_NOT_SPECIFIED;
474 LOGGER.warn(message);
475 throw new ApexException(message);
478 // Check if we have this key on our map
479 if (!engineWorkerMap.containsKey(engineKey)) {
480 String message = ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX;
481 LOGGER.warn(message);
482 throw new ApexException(message);
486 engineWorkerMap.get(engineKey).start(engineKey);
488 // Check if periodic events should be turned on
489 if (periodicEventPeriod > 0) {
490 startPeriodicEvents(periodicEventPeriod);
493 LOGGER.exit(engineKey);
500 public void stop() throws ApexException {
503 if (periodicEventGenerator != null) {
504 periodicEventGenerator.cancel();
505 periodicEventGenerator = null;
509 for (final EngineService engine : engineWorkerMap.values()) {
510 if (engine.getState() != AxEngineState.STOPPED) {
522 public void stop(final AxArtifactKey engineKey) throws ApexException {
523 LOGGER.entry(engineKey);
525 if (engineKey == null) {
526 String message = ENGINE_KEY_NOT_SPECIFIED;
527 LOGGER.warn(message);
528 throw new ApexException(message);
531 // Check if we have this key on our map
532 if (!engineWorkerMap.containsKey(engineKey)) {
533 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
534 throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
538 engineWorkerMap.get(engineKey).stop(engineKey);
540 LOGGER.exit(engineKey);
547 public void clear() throws ApexException {
551 for (final EngineService engine : engineWorkerMap.values()) {
552 if (engine.getState() == AxEngineState.STOPPED) {
564 public void clear(final AxArtifactKey engineKey) throws ApexException {
565 LOGGER.entry(engineKey);
567 if (engineKey == null) {
568 String message = ENGINE_KEY_NOT_SPECIFIED;
569 LOGGER.warn(message);
570 throw new ApexException(message);
573 // Check if we have this key on our map
574 if (!engineWorkerMap.containsKey(engineKey)) {
575 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
576 throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
580 if (engineWorkerMap.get(engineKey).getState() == AxEngineState.STOPPED) {
581 engineWorkerMap.get(engineKey).stop(engineKey);
584 LOGGER.exit(engineKey);
588 * Check all engines are started.
590 * @return true if <i>all</i> engines are started
591 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted()
594 public boolean isStarted() {
595 for (final EngineService engine : engineWorkerMap.values()) {
596 if (!engine.isStarted()) {
607 public boolean isStarted(final AxArtifactKey engineKey) {
608 if (engineKey == null) {
609 String message = ENGINE_KEY_NOT_SPECIFIED;
610 LOGGER.warn(message);
614 // Check if we have this key on our map
615 if (!engineWorkerMap.containsKey(engineKey)) {
616 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
619 return engineWorkerMap.get(engineKey).isStarted();
623 * Check all engines are stopped.
625 * @return true if <i>all</i> engines are stopped
626 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped()
629 public boolean isStopped() {
630 for (final EngineService engine : engineWorkerMap.values()) {
631 if (!engine.isStopped()) {
642 public boolean isStopped(final AxArtifactKey engineKey) {
643 if (engineKey == null) {
644 String message = ENGINE_KEY_NOT_SPECIFIED;
645 LOGGER.warn(message);
649 // Check if we have this key on our map
650 if (!engineWorkerMap.containsKey(engineKey)) {
651 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
654 return engineWorkerMap.get(engineKey).isStopped();
661 public void startPeriodicEvents(final long period) throws ApexException {
662 // Check if periodic events are already started
663 if (periodicEventGenerator != null) {
664 String message = "Peiodic event geneation already running on engine " + engineServiceKey.getId() + ", "
665 + periodicEventGenerator.toString();
666 LOGGER.warn(message);
667 throw new ApexException(message);
670 // Set up periodic event execution, its a Java Timer/TimerTask
671 periodicEventGenerator = new ApexPeriodicEventGenerator(this.getEngineServiceEventInterface(), period);
673 // Record the periodic event period because it may have been set over the Web Socket admin
675 this.periodicEventPeriod = period;
682 public void stopPeriodicEvents() throws ApexException {
683 // Check if periodic events are already started
684 if (periodicEventGenerator == null) {
685 LOGGER.warn("Peiodic event geneation not running on engine " + engineServiceKey.getId());
686 throw new ApexException("Peiodic event geneation not running on engine " + engineServiceKey.getId());
689 // Stop periodic events
690 periodicEventGenerator.cancel();
691 periodicEventGenerator = null;
692 periodicEventPeriod = 0;
699 public String getStatus(final AxArtifactKey engineKey) throws ApexException {
700 if (engineKey == null) {
701 String message = ENGINE_KEY_NOT_SPECIFIED;
702 LOGGER.warn(message);
703 throw new ApexException(message);
706 // Check if we have this key on our map
707 if (!engineWorkerMap.containsKey(engineKey)) {
708 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
709 throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
711 // Return the information for this worker
712 return engineWorkerMap.get(engineKey).getStatus(engineKey);
720 public List<AxEngineModel> getEngineStats() {
721 List<AxEngineModel> engineStats = new ArrayList<>();
722 for (final EngineService engine : engineWorkerMap.values()) {
723 engineStats.addAll(engine.getEngineStats());
732 public String getRuntimeInfo(final AxArtifactKey engineKey) throws ApexException {
733 if (engineKey == null) {
734 String message = ENGINE_KEY_NOT_SPECIFIED;
735 LOGGER.warn(message);
736 throw new ApexException(message);
739 // Check if we have this key on our map
740 if (!engineWorkerMap.containsKey(engineKey)) {
741 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
742 throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
745 // Return the information for this worker
746 return engineWorkerMap.get(engineKey).getRuntimeInfo(engineKey);
753 public void sendEvent(final ApexEvent event) {
755 LOGGER.warn("Null events cannot be processed, in engine service " + engineServiceKey.getId());
759 // Check if we have this key on our map
760 if (getState() == AxEngineState.STOPPED) {
761 LOGGER.warn("event " + event.getName() + " not processed, no engines on engine service "
762 + engineServiceKey.getId() + " are running");
767 LOGGER.debug("Forwarding Apex Event {} to the processing engine", event);
770 // Add the incoming event to the queue, the next available worker will process it