2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019-2020 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.apex.service.engine.runtime.impl;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.onap.policy.apex.context.ContextException;
import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
import org.onap.policy.apex.model.basicmodel.service.ModelService;
import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
import org.onap.policy.apex.service.engine.event.ApexEvent;
import org.onap.policy.apex.service.engine.event.ApexPeriodicEventGenerator;
import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
import org.onap.policy.apex.service.engine.runtime.EngineService;
import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
import org.onap.policy.common.parameters.GroupValidationResult;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
56 * The Class EngineServiceImpl controls a thread pool that runs a set of Apex engine workers, each of which is running
57 * on an identical Apex model. This class handles the management of the engine worker instances, their threads, and
58 * event forwarding to and from the engine workers.
60 * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
61 * @author Liam Fallon (liam.fallon@ericsson.com)
62 * @author John Keeney (john.keeney@ericsson.com)
64 public final class EngineServiceImpl implements EngineService, EngineServiceEventInterface {
65 // Logging static variables
66 private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceImpl.class);
67 private static final boolean DEBUG_ENABLED = LOGGER.isDebugEnabled();
69 // Recurring string constants
70 private static final String ENGINE_KEY_PREAMBLE = "engine with key ";
71 private static final String NOT_FOUND_SUFFIX = " not found in engine service";
72 private static final String ENGINE_KEY_NOT_SPECIFIED = "engine key must be specified and may not be null";
74 // Constants for timing
75 private static final long MAX_START_WAIT_TIME = 5000; // 5 seconds
76 private static final long MAX_STOP_WAIT_TIME = 5000; // 5 seconds
77 private static final int ENGINE_SERVICE_STOP_START_WAIT_INTERVAL = 200;
79 // The ID of this engine
80 private AxArtifactKey engineServiceKey = null;
82 // The Apex engine workers this engine service is handling
83 private final Map<AxArtifactKey, EngineWorker> engineWorkerMap = Collections
84 .synchronizedMap(new LinkedHashMap<AxArtifactKey, EngineWorker>());
86 // Event queue for events being sent into the Apex engines, it used by all engines within a
88 private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
90 // Thread factory for thread management
91 private final ApplicationThreadFactory atFactory = new ApplicationThreadFactory("apex-engine-service", 512);
93 // Periodic event generator and its period in milliseconds
94 private ApexPeriodicEventGenerator periodicEventGenerator = null;
95 private long periodicEventPeriod;
/**
 * Creates the engine service and one engine worker per requested thread, adding each worker to the set of
 * workers managed by this service. The constructor is private; instances are obtained through
 * {@link #create(EngineServiceParameters)}.
 *
 * @param engineServiceKey the engine service key
 * @param threadCount the thread count, the number of engine workers to create
 * @param periodicEventPeriod the period in milliseconds at which periodic events are generated
 */
private EngineServiceImpl(final AxArtifactKey engineServiceKey, final int threadCount,
    final long periodicEventPeriod) {
    LOGGER.entry(engineServiceKey, threadCount);

    this.engineServiceKey = engineServiceKey;
    this.periodicEventPeriod = periodicEventPeriod;

    // Each worker is keyed "<service name>-<index>" and carries the version of the engine service
    for (int workerIndex = 0; workerIndex < threadCount; workerIndex++) {
        final String workerName = engineServiceKey.getName() + '-' + workerIndex;
        final AxArtifactKey workerKey = new AxArtifactKey(workerName, engineServiceKey.getVersion());

        final EngineWorker worker = new EngineWorker(workerKey, queue, atFactory);
        engineWorkerMap.put(workerKey, worker);
        LOGGER.info("Created apex engine {} .", workerKey.getId());
    }

    LOGGER.info("APEX service created.");
    LOGGER.exit();
}
126 * Create an Apex Engine Service instance. This method does not load the policy so
127 * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} or
128 * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} must be used to load a model. This method does not
129 * start the Engine Service so {@link #start(AxArtifactKey)} or {@link #startAll()} must be used.
131 * @param config the configuration for this Apex Engine Service.
132 * @return the Engine Service instance
133 * @throws ApexException on worker instantiation errors
135 public static EngineServiceImpl create(final EngineServiceParameters config) throws ApexException {
136 if (config == null) {
137 LOGGER.warn("Engine service configuration parameters is null");
138 throw new ApexException("engine service configuration parameters are null");
141 final GroupValidationResult validation = config.validate();
142 if (!validation.isValid()) {
143 LOGGER.warn("Invalid engine service configuration parameters: {}" + validation.getResult());
144 throw new ApexException("Invalid engine service configuration parameters: " + validation);
147 final AxArtifactKey engineServiceKey = config.getEngineKey();
148 final int threadCount = config.getInstanceCount();
150 return new EngineServiceImpl(engineServiceKey, threadCount, config.getPeriodicEventPeriod());
157 public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
158 LOGGER.entry(apexEventListener);
160 if (listenerName == null) {
161 String message = "listener name must be specified and may not be null";
162 LOGGER.warn(message);
166 if (apexEventListener == null) {
167 String message = "apex event listener must be specified and may not be null";
168 LOGGER.warn(message);
172 // Register the Apex event listener on all engine workers, each worker will return Apex
173 // events to the listening application
174 for (final EngineService engineWorker : engineWorkerMap.values()) {
175 engineWorker.registerActionListener(listenerName, apexEventListener);
178 LOGGER.info("Added the action listener to the engine");
186 public void deregisterActionListener(final String listenerName) {
187 LOGGER.entry(listenerName);
189 // Register the Apex event listener on all engine workers, each worker will return Apex
190 // events to the listening application
191 for (final EngineService engineWorker : engineWorkerMap.values()) {
192 engineWorker.deregisterActionListener(listenerName);
195 LOGGER.info("Removed the action listener from the engine");
203 public EngineServiceEventInterface getEngineServiceEventInterface() {
211 public AxArtifactKey getKey() {
212 return engineServiceKey;
219 public Collection<AxArtifactKey> getEngineKeys() {
220 return engineWorkerMap.keySet();
227 public AxArtifactKey getApexModelKey() {
228 if (engineWorkerMap.size() == 0) {
232 return engineWorkerMap.entrySet().iterator().next().getValue().getApexModelKey();
236 * Method to create model.
238 * @param incomingEngineServiceKey incoming engine service key
239 * @param apexModelString apex model string
240 * @return apexPolicyModel the policy model
241 * @throws ApexException apex exception
243 public static AxPolicyModel createModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString)
244 throws ApexException {
245 // Check if the engine service key specified is sane
246 if (incomingEngineServiceKey == null) {
247 String message = ENGINE_KEY_NOT_SPECIFIED;
248 LOGGER.warn(message);
249 throw new ApexException(message);
252 // Check if the Apex model specified is sane
253 if (apexModelString == null || apexModelString.trim().length() == 0) {
254 String emptyModelMessage = "model for updating engine service with key "
255 + incomingEngineServiceKey.getId() + " is empty";
256 LOGGER.warn(emptyModelMessage);
257 throw new ApexException(emptyModelMessage);
260 // Read the Apex model into memory using the Apex Model Reader
261 AxPolicyModel apexPolicyModel = null;
263 final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
264 apexPolicyModel = modelReader.read(new ByteArrayInputStream(apexModelString.getBytes()));
265 } catch (final ApexModelException e) {
266 String message = "failed to unmarshal the apex model on engine service " + incomingEngineServiceKey.getId();
267 LOGGER.error(message, e);
268 throw new ApexException(message, e);
270 return apexPolicyModel;
277 public void updateModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString,
278 final boolean forceFlag) throws ApexException {
279 AxPolicyModel apexPolicyModel = createModel(incomingEngineServiceKey, apexModelString);
282 updateModel(incomingEngineServiceKey, apexPolicyModel, forceFlag);
291 public void updateModel(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
292 final boolean forceFlag) throws ApexException {
293 LOGGER.entry(incomingEngineServiceKey);
295 // Check if the engine service key specified is sane
296 if (incomingEngineServiceKey == null) {
297 String message = ENGINE_KEY_NOT_SPECIFIED;
298 LOGGER.warn(message);
299 throw new ApexException(message);
302 // Check if the Apex model specified is sane
303 if (apexModel == null) {
304 LOGGER.warn("model for updating on engine service with key " + incomingEngineServiceKey.getId()
306 throw new ApexException("model for updating on engine service with key " + incomingEngineServiceKey.getId()
310 // Check if the key on the update request is correct
311 if (!this.engineServiceKey.equals(incomingEngineServiceKey)) {
312 LOGGER.warn("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
313 + engineServiceKey.getId() + " of this engine service");
314 throw new ApexException("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
315 + engineServiceKey.getId() + " of this engine service");
318 // Check model compatibility
319 if (ModelService.existsModel(AxPolicyModel.class)) {
320 // The current policy model may or may not be defined
321 final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
322 if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
323 handleIncompatibility(apexModel, forceFlag, currentModel);
327 executeModelUpdate(incomingEngineServiceKey, apexModel, forceFlag);
333 * Execute the model update on the engine instances.
335 * @param incomingEngineServiceKey the engine service key to update
336 * @param apexModel the model to update the engines with
337 * @param forceFlag if true, ignore compatibility problems
338 * @throws ApexException on model update errors
340 private void executeModelUpdate(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
341 final boolean forceFlag) throws ApexException {
344 stopEngines(incomingEngineServiceKey);
347 // Update the engines
348 boolean isSubsequentInstance = false;
349 for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
350 LOGGER.info("Registering apex model on engine {}", engineWorkerEntry.getKey().getId());
351 EngineWorker engineWorker = engineWorkerEntry.getValue();
352 if (isSubsequentInstance) {
353 // set subsequentInstance flag as true if the current engine worker instance is not the first one
354 // first engine instance will have this flag as false
355 engineWorker.setSubsequentInstance(true);
357 engineWorker.updateModel(engineWorkerEntry.getKey(), apexModel, forceFlag);
358 isSubsequentInstance = true;
361 // start all engines on this engine service if it was not stopped before the update
363 final long starttime = System.currentTimeMillis();
364 while (!isStarted() && System.currentTimeMillis() - starttime < MAX_START_WAIT_TIME) {
365 ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
367 // Check if all engines are running
368 final StringBuilder notRunningEngineIdBuilder = new StringBuilder();
369 for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
370 if (engineWorkerEntry.getValue().getState() != AxEngineState.READY
371 && engineWorkerEntry.getValue().getState() != AxEngineState.EXECUTING) {
372 notRunningEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
373 notRunningEngineIdBuilder.append('(');
374 notRunningEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
375 notRunningEngineIdBuilder.append(") ");
378 if (notRunningEngineIdBuilder.length() > 0) {
379 final String errorString = "engine start error on model update on engine service with key "
380 + incomingEngineServiceKey.getId() + ", engines not running are: "
381 + notRunningEngineIdBuilder.toString().trim();
382 LOGGER.warn(errorString);
383 throw new ApexException(errorString);
388 * Stop engines for a model update.
389 * @param incomingEngineServiceKey the engine service key for the engines that are to be stopped
390 * @throws ApexException on errors stopping engines
392 private void stopEngines(final AxArtifactKey incomingEngineServiceKey) throws ApexException {
393 // Stop all engines on this engine service
395 final long stoptime = System.currentTimeMillis();
396 while (!isStopped() && System.currentTimeMillis() - stoptime < MAX_STOP_WAIT_TIME) {
397 ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
399 // Check if all engines are stopped
400 final StringBuilder notStoppedEngineIdBuilder = new StringBuilder();
401 for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
402 if (engineWorkerEntry.getValue().getState() != AxEngineState.STOPPED) {
403 notStoppedEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
404 notStoppedEngineIdBuilder.append('(');
405 notStoppedEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
406 notStoppedEngineIdBuilder.append(") ");
409 if (notStoppedEngineIdBuilder.length() > 0) {
410 final String errorString = "cannot update model on engine service with key "
411 + incomingEngineServiceKey.getId() + ", engines not stopped after " + MAX_STOP_WAIT_TIME
412 + "ms are: " + notStoppedEngineIdBuilder.toString().trim();
413 LOGGER.warn(errorString);
414 throw new ApexException(errorString);
419 * Issue compatibility warning or error message.
420 * @param apexModel The model name
421 * @param forceFlag true if we are forcing the update
422 * @param currentModel the existing model that is loaded
423 * @throws ContextException on compatibility errors
425 private void handleIncompatibility(final AxPolicyModel apexModel, final boolean forceFlag,
426 final AxPolicyModel currentModel) throws ContextException {
428 LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
429 + "\" is not a compatible model update from the existing engine model with key \""
430 + currentModel.getKey().getId() + "\"");
432 throw new ContextException("apex model update failed, supplied model with key \""
433 + apexModel.getKey().getId()
434 + "\" is not a compatible model update from the existing engine model with key \""
435 + currentModel.getKey().getId() + "\"");
443 public AxEngineState getState() {
444 // If one worker is running then we are running, otherwise we are stopped
445 for (final EngineService engine : engineWorkerMap.values()) {
446 if (engine.getState() != AxEngineState.STOPPED) {
447 return AxEngineState.EXECUTING;
451 return AxEngineState.STOPPED;
458 public void startAll() throws ApexException {
459 for (final EngineService engine : engineWorkerMap.values()) {
460 start(engine.getKey());
468 public void start(final AxArtifactKey engineKey) throws ApexException {
469 LOGGER.entry(engineKey);
471 if (engineKey == null) {
472 String message = ENGINE_KEY_NOT_SPECIFIED;
473 LOGGER.warn(message);
474 throw new ApexException(message);
477 // Check if we have this key on our map
478 if (!engineWorkerMap.containsKey(engineKey)) {
479 String message = ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX;
480 LOGGER.warn(message);
481 throw new ApexException(message);
485 engineWorkerMap.get(engineKey).start(engineKey);
487 // Check if periodic events should be turned on
488 if (periodicEventPeriod > 0) {
489 startPeriodicEvents(periodicEventPeriod);
492 LOGGER.exit(engineKey);
499 public void stop() throws ApexException {
502 if (periodicEventGenerator != null) {
503 periodicEventGenerator.cancel();
504 periodicEventGenerator = null;
508 for (final EngineService engine : engineWorkerMap.values()) {
509 if (engine.getState() != AxEngineState.STOPPED) {
521 public void stop(final AxArtifactKey engineKey) throws ApexException {
522 LOGGER.entry(engineKey);
524 if (engineKey == null) {
525 String message = ENGINE_KEY_NOT_SPECIFIED;
526 LOGGER.warn(message);
527 throw new ApexException(message);
530 // Check if we have this key on our map
531 if (!engineWorkerMap.containsKey(engineKey)) {
532 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
533 throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
537 engineWorkerMap.get(engineKey).stop(engineKey);
539 LOGGER.exit(engineKey);
546 public void clear() throws ApexException {
550 for (final EngineService engine : engineWorkerMap.values()) {
551 if (engine.getState() == AxEngineState.STOPPED) {
563 public void clear(final AxArtifactKey engineKey) throws ApexException {
564 LOGGER.entry(engineKey);
566 if (engineKey == null) {
567 String message = ENGINE_KEY_NOT_SPECIFIED;
568 LOGGER.warn(message);
569 throw new ApexException(message);
572 // Check if we have this key on our map
573 if (!engineWorkerMap.containsKey(engineKey)) {
574 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
575 throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
579 if (engineWorkerMap.get(engineKey).getState() == AxEngineState.STOPPED) {
580 engineWorkerMap.get(engineKey).stop(engineKey);
583 LOGGER.exit(engineKey);
587 * Check all engines are started.
589 * @return true if <i>all</i> engines are started
590 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted()
593 public boolean isStarted() {
594 for (final EngineService engine : engineWorkerMap.values()) {
595 if (!engine.isStarted()) {
606 public boolean isStarted(final AxArtifactKey engineKey) {
607 if (engineKey == null) {
608 String message = ENGINE_KEY_NOT_SPECIFIED;
609 LOGGER.warn(message);
613 // Check if we have this key on our map
614 if (!engineWorkerMap.containsKey(engineKey)) {
615 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
618 return engineWorkerMap.get(engineKey).isStarted();
622 * Check all engines are stopped.
624 * @return true if <i>all</i> engines are stopped
625 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped()
628 public boolean isStopped() {
629 for (final EngineService engine : engineWorkerMap.values()) {
630 if (!engine.isStopped()) {
641 public boolean isStopped(final AxArtifactKey engineKey) {
642 if (engineKey == null) {
643 String message = ENGINE_KEY_NOT_SPECIFIED;
644 LOGGER.warn(message);
648 // Check if we have this key on our map
649 if (!engineWorkerMap.containsKey(engineKey)) {
650 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
653 return engineWorkerMap.get(engineKey).isStopped();
660 public void startPeriodicEvents(final long period) throws ApexException {
661 // Check if periodic events are already started
662 if (periodicEventGenerator != null) {
663 String message = "Peiodic event geneation already running on engine " + engineServiceKey.getId() + ", "
664 + periodicEventGenerator.toString();
665 LOGGER.warn(message);
666 throw new ApexException(message);
669 // Set up periodic event execution, its a Java Timer/TimerTask
670 periodicEventGenerator = new ApexPeriodicEventGenerator(this.getEngineServiceEventInterface(), period);
672 // Record the periodic event period because it may have been set over the Web Socket admin
674 this.periodicEventPeriod = period;
681 public void stopPeriodicEvents() throws ApexException {
682 // Check if periodic events are already started
683 if (periodicEventGenerator == null) {
684 LOGGER.warn("Peiodic event geneation not running on engine " + engineServiceKey.getId());
685 throw new ApexException("Peiodic event geneation not running on engine " + engineServiceKey.getId());
688 // Stop periodic events
689 periodicEventGenerator.cancel();
690 periodicEventGenerator = null;
691 periodicEventPeriod = 0;
698 public String getStatus(final AxArtifactKey engineKey) throws ApexException {
699 if (engineKey == null) {
700 String message = ENGINE_KEY_NOT_SPECIFIED;
701 LOGGER.warn(message);
702 throw new ApexException(message);
705 // Check if we have this key on our map
706 if (!engineWorkerMap.containsKey(engineKey)) {
707 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
708 throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
710 // Return the information for this worker
711 return engineWorkerMap.get(engineKey).getStatus(engineKey);
719 public List<AxEngineModel> getEngineStats() {
720 List<AxEngineModel> engineStats = new ArrayList<>();
721 for (final EngineService engine : engineWorkerMap.values()) {
722 engineStats.addAll(engine.getEngineStats());
731 public String getRuntimeInfo(final AxArtifactKey engineKey) throws ApexException {
732 if (engineKey == null) {
733 String message = ENGINE_KEY_NOT_SPECIFIED;
734 LOGGER.warn(message);
735 throw new ApexException(message);
738 // Check if we have this key on our map
739 if (!engineWorkerMap.containsKey(engineKey)) {
740 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
741 throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
744 // Return the information for this worker
745 return engineWorkerMap.get(engineKey).getRuntimeInfo(engineKey);
752 public void sendEvent(final ApexEvent event) {
754 LOGGER.warn("Null events cannot be processed, in engine service " + engineServiceKey.getId());
758 // Check if we have this key on our map
759 if (getState() == AxEngineState.STOPPED) {
760 LOGGER.warn("event " + event.getName() + " not processed, no engines on engine service "
761 + engineServiceKey.getId() + " are running");
766 LOGGER.debug("Forwarding Apex Event {} to the processing engine", event);
769 // Add the incoming event to the queue, the next available worker will process it