f5e36e86474ae11a5d3551a358c1cdff0b21dcd1
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.service.engine.runtime.impl;
23
24 import java.io.ByteArrayInputStream;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.LinkedHashMap;
28 import java.util.Map;
29 import java.util.Map.Entry;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import org.onap.policy.apex.context.ContextException;
33 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
34 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
35 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
36 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
37 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
38 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
39 import org.onap.policy.apex.model.basicmodel.service.ModelService;
40 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
41 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
42 import org.onap.policy.apex.service.engine.event.ApexEvent;
43 import org.onap.policy.apex.service.engine.event.ApexPeriodicEventGenerator;
44 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
45 import org.onap.policy.apex.service.engine.runtime.EngineService;
46 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
47 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
48 import org.onap.policy.common.parameters.GroupValidationResult;
49 import org.slf4j.ext.XLogger;
50 import org.slf4j.ext.XLoggerFactory;
51
52 /**
53  * The Class EngineServiceImpl controls a thread pool that runs a set of Apex engine workers, each of which is running
54  * on an identical Apex model. This class handles the management of the engine worker instances, their threads, and
55  * event forwarding to and from the engine workers.
56  *
57  * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
58  * @author Liam Fallon (liam.fallon@ericsson.com)
59  * @author John Keeney (john.keeney@ericsson.com)
60  */
61 public final class EngineServiceImpl implements EngineService, EngineServiceEventInterface {
62     // Logging static variables
63     private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceImpl.class);
64     private static final boolean DEBUG_ENABLED = LOGGER.isDebugEnabled();
65
66     // Recurring string constants
67     private static final String ENGINE_KEY_PREAMBLE = "engine with key ";
68     private static final String NOT_FOUND_SUFFIX = " not found in engine service";
69     private static final String ENGINE_KEY_NOT_SPECIFIED = "engine key must be specified and may not be null";
70
71     // Constants for timing
72     private static final long MAX_START_WAIT_TIME = 5000; // 5 seconds
73     private static final long MAX_STOP_WAIT_TIME = 5000; // 5 seconds
74     private static final int ENGINE_SERVICE_STOP_START_WAIT_INTERVAL = 200;
75
76     // The ID of this engine
77     private AxArtifactKey engineServiceKey = null;
78
79     // The Apex engine workers this engine service is handling
80     private final Map<AxArtifactKey, EngineWorker> engineWorkerMap = Collections
81                     .synchronizedMap(new LinkedHashMap<AxArtifactKey, EngineWorker>());
82
83     // Event queue for events being sent into the Apex engines, it used by all engines within a
84     // group.
85     private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
86
87     // Thread factory for thread management
88     private final ApplicationThreadFactory atFactory = new ApplicationThreadFactory("apex-engine-service", 512);
89
90     // Periodic event generator and its period in milliseconds
91     private ApexPeriodicEventGenerator periodicEventGenerator = null;
92     private long periodicEventPeriod;
93
94     /**
95      * This constructor instantiates engine workers and adds them to the set of engine workers to be managed. The
96      * constructor is private to prevent subclassing.
97      *
98      * @param engineServiceKey the engine service key
99      * @param threadCount the thread count, the number of engine workers to start
100      * @param periodicEventPeriod the period in milliseconds at which periodic events are generated
101      * @throws ApexException on worker instantiation errors
102      */
103     private EngineServiceImpl(final AxArtifactKey engineServiceKey, final int threadCount,
104                     final long periodicEventPeriod) {
105         LOGGER.entry(engineServiceKey, threadCount);
106
107         this.engineServiceKey = engineServiceKey;
108         this.periodicEventPeriod = periodicEventPeriod;
109
110         // Start engine workers
111         for (int engineCounter = 0; engineCounter < threadCount; engineCounter++) {
112             final AxArtifactKey engineWorkerKey = new AxArtifactKey(engineServiceKey.getName() + '-' + engineCounter,
113                             engineServiceKey.getVersion());
114             engineWorkerMap.put(engineWorkerKey, new EngineWorker(engineWorkerKey, queue, atFactory));
115             LOGGER.info("Created apex engine {} .", engineWorkerKey.getId());
116         }
117
118         LOGGER.info("APEX service created.");
119         LOGGER.exit();
120     }
121
122     /**
123      * Create an Apex Engine Service instance. This method does not load the policy so
124      * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} or
125      * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} must be used to load a model. This method does not
126      * start the Engine Service so {@link #start(AxArtifactKey)} or {@link #startAll()} must be used.
127      *
128      * @param config the configuration for this Apex Engine Service.
129      * @return the Engine Service instance
130      * @throws ApexException on worker instantiation errors
131      */
132     public static EngineServiceImpl create(final EngineServiceParameters config) throws ApexException {
133         if (config == null) {
134             LOGGER.warn("Engine service configuration parameters is null");
135             throw new ApexException("engine service configuration parameters are null");
136         }
137
138         final GroupValidationResult validation = config.validate();
139         if (!validation.isValid()) {
140             LOGGER.warn("Invalid engine service configuration parameters: {}" + validation.getResult());
141             throw new ApexException("Invalid engine service configuration parameters: " + validation);
142         }
143
144         final AxArtifactKey engineServiceKey = config.getEngineKey();
145         final int threadCount = config.getInstanceCount();
146
147         return new EngineServiceImpl(engineServiceKey, threadCount, config.getPeriodicEventPeriod());
148     }
149
150     /**
151      * {@inheritDoc}.
152      */
153     @Override
154     public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
155         LOGGER.entry(apexEventListener);
156
157         if (listenerName == null) {
158             String message = "listener name must be specified and may not be null";
159             LOGGER.warn(message);
160             return;
161         }
162
163         if (apexEventListener == null) {
164             String message = "apex event listener must be specified and may not be null";
165             LOGGER.warn(message);
166             return;
167         }
168
169         // Register the Apex event listener on all engine workers, each worker will return Apex
170         // events to the listening application
171         for (final EngineService engineWorker : engineWorkerMap.values()) {
172             engineWorker.registerActionListener(listenerName, apexEventListener);
173         }
174
175         LOGGER.info("Added the action listener to the engine");
176         LOGGER.exit();
177     }
178
179     /**
180      * {@inheritDoc}.
181      */
182     @Override
183     public void deregisterActionListener(final String listenerName) {
184         LOGGER.entry(listenerName);
185
186         // Register the Apex event listener on all engine workers, each worker will return Apex
187         // events to the listening application
188         for (final EngineService engineWorker : engineWorkerMap.values()) {
189             engineWorker.deregisterActionListener(listenerName);
190         }
191
192         LOGGER.info("Removed the action listener from the engine");
193         LOGGER.exit();
194     }
195
196     /**
197      * {@inheritDoc}.
198      */
199     @Override
200     public EngineServiceEventInterface getEngineServiceEventInterface() {
201         return this;
202     }
203
204     /**
205      * {@inheritDoc}.
206      */
207     @Override
208     public AxArtifactKey getKey() {
209         return engineServiceKey;
210     }
211
212     /**
213      * {@inheritDoc}.
214      */
215     @Override
216     public Collection<AxArtifactKey> getEngineKeys() {
217         return engineWorkerMap.keySet();
218     }
219
220     /**
221      * {@inheritDoc}.
222      */
223     @Override
224     public AxArtifactKey getApexModelKey() {
225         if (engineWorkerMap.size() == 0) {
226             return null;
227         }
228
229         return engineWorkerMap.entrySet().iterator().next().getValue().getApexModelKey();
230     }
231
232     /**
233     * Method to create model.
234     *
235     * @param incomingEngineServiceKey incoming engine service key
236     * @param apexModelString apex model string
237     * @return apexPolicyModel the policy model
238     * @throws ApexException apex exception
239     */
240     public static AxPolicyModel createModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString)
241         throws ApexException {
242         // Check if the engine service key specified is sane
243         if (incomingEngineServiceKey == null) {
244             String message = ENGINE_KEY_NOT_SPECIFIED;
245             LOGGER.warn(message);
246             throw new ApexException(message);
247         }
248
249         // Check if the Apex model specified is sane
250         if (apexModelString == null || apexModelString.trim().length() == 0) {
251             String emptyModelMessage = "model for updating engine service with key "
252                             + incomingEngineServiceKey.getId() + " is empty";
253             LOGGER.warn(emptyModelMessage);
254             throw new ApexException(emptyModelMessage);
255         }
256
257         // Read the Apex model into memory using the Apex Model Reader
258         AxPolicyModel apexPolicyModel = null;
259         try {
260             final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
261             apexPolicyModel = modelReader.read(new ByteArrayInputStream(apexModelString.getBytes()));
262         } catch (final ApexModelException e) {
263             String message = "failed to unmarshal the apex model on engine service " + incomingEngineServiceKey.getId();
264             LOGGER.error(message, e);
265             throw new ApexException(message, e);
266         }
267         return apexPolicyModel;
268     }
269
270     /**
271      * {@inheritDoc}.
272      */
273     @Override
274     public void updateModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString,
275                     final boolean forceFlag) throws ApexException {
276         AxPolicyModel apexPolicyModel = createModel(incomingEngineServiceKey, apexModelString);
277
278         // Update the model
279         updateModel(incomingEngineServiceKey, apexPolicyModel, forceFlag);
280
281         LOGGER.exit();
282     }
283
284     /**
285      * {@inheritDoc}.
286      */
287     @Override
288     public void updateModel(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
289                     final boolean forceFlag) throws ApexException {
290         LOGGER.entry(incomingEngineServiceKey);
291
292         // Check if the engine service key specified is sane
293         if (incomingEngineServiceKey == null) {
294             String message = ENGINE_KEY_NOT_SPECIFIED;
295             LOGGER.warn(message);
296             throw new ApexException(message);
297         }
298
299         // Check if the Apex model specified is sane
300         if (apexModel == null) {
301             LOGGER.warn("model for updating on engine service with key " + incomingEngineServiceKey.getId()
302                             + " is null");
303             throw new ApexException("model for updating on engine service with key " + incomingEngineServiceKey.getId()
304                             + " is null");
305         }
306
307         // Check if the key on the update request is correct
308         if (!this.engineServiceKey.equals(incomingEngineServiceKey)) {
309             LOGGER.warn("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
310                             + engineServiceKey.getId() + " of this engine service");
311             throw new ApexException("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
312                             + engineServiceKey.getId() + " of this engine service");
313         }
314
315         // Check model compatibility
316         if (ModelService.existsModel(AxPolicyModel.class)) {
317             // The current policy model may or may not be defined
318             final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
319             if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
320                 handleIncompatibility(apexModel, forceFlag, currentModel);
321             }
322         }
323
324         executeModelUpdate(incomingEngineServiceKey, apexModel, forceFlag);
325
326         LOGGER.exit();
327     }
328
329     /**
330      * Execute the model update on the engine instances.
331      *
332      * @param incomingEngineServiceKey the engine service key to update
333      * @param apexModel the model to update the engines with
334      * @param forceFlag if true, ignore compatibility problems
335      * @throws ApexException on model update errors
336      */
337     private void executeModelUpdate(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
338                     final boolean forceFlag) throws ApexException {
339
340         if (!isStopped()) {
341             stopEngines(incomingEngineServiceKey);
342         }
343
344         // Update the engines
345         boolean isSubsequentInstance = false;
346         for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
347             LOGGER.info("Registering apex model on engine {}", engineWorkerEntry.getKey().getId());
348             EngineWorker engineWorker = engineWorkerEntry.getValue();
349             if (isSubsequentInstance) {
350                 // set subsequentInstance flag as true if the current engine worker instance is not the first one
351                 // first engine instance will have this flag as false
352                 engineWorker.setSubsequentInstance(true);
353             }
354             engineWorker.updateModel(engineWorkerEntry.getKey(), apexModel, forceFlag);
355             isSubsequentInstance = true;
356         }
357
358         // start all engines on this engine service if it was not stopped before the update
359         startAll();
360         final long starttime = System.currentTimeMillis();
361         while (!isStarted() && System.currentTimeMillis() - starttime < MAX_START_WAIT_TIME) {
362             ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
363         }
364         // Check if all engines are running
365         final StringBuilder notRunningEngineIdBuilder = new StringBuilder();
366         for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
367             if (engineWorkerEntry.getValue().getState() != AxEngineState.READY
368                             && engineWorkerEntry.getValue().getState() != AxEngineState.EXECUTING) {
369                 notRunningEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
370                 notRunningEngineIdBuilder.append('(');
371                 notRunningEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
372                 notRunningEngineIdBuilder.append(") ");
373             }
374         }
375         if (notRunningEngineIdBuilder.length() > 0) {
376             final String errorString = "engine start error on model update on engine service with key "
377                             + incomingEngineServiceKey.getId() + ", engines not running are: "
378                             + notRunningEngineIdBuilder.toString().trim();
379             LOGGER.warn(errorString);
380             throw new ApexException(errorString);
381         }
382     }
383
384     /**
385      * Stop engines for a model update.
386      * @param incomingEngineServiceKey the engine service key for the engines that are to be stopped
387      * @throws ApexException on errors stopping engines
388      */
389     private void stopEngines(final AxArtifactKey incomingEngineServiceKey) throws ApexException {
390         // Stop all engines on this engine service
391         stop();
392         final long stoptime = System.currentTimeMillis();
393         while (!isStopped() && System.currentTimeMillis() - stoptime < MAX_STOP_WAIT_TIME) {
394             ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
395         }
396         // Check if all engines are stopped
397         final StringBuilder notStoppedEngineIdBuilder = new StringBuilder();
398         for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
399             if (engineWorkerEntry.getValue().getState() != AxEngineState.STOPPED) {
400                 notStoppedEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
401                 notStoppedEngineIdBuilder.append('(');
402                 notStoppedEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
403                 notStoppedEngineIdBuilder.append(") ");
404             }
405         }
406         if (notStoppedEngineIdBuilder.length() > 0) {
407             final String errorString = "cannot update model on engine service with key "
408                             + incomingEngineServiceKey.getId() + ", engines not stopped after " + MAX_STOP_WAIT_TIME
409                             + "ms are: " + notStoppedEngineIdBuilder.toString().trim();
410             LOGGER.warn(errorString);
411             throw new ApexException(errorString);
412         }
413     }
414
415     /**
416      * Issue compatibility warning or error message.
417      * @param apexModel The model name
418      * @param forceFlag true if we are forcing the update
419      * @param currentModel the existing model that is loaded
420      * @throws ContextException on compatibility errors
421      */
422     private void handleIncompatibility(final AxPolicyModel apexModel, final boolean forceFlag,
423                     final AxPolicyModel currentModel) throws ContextException {
424         if (forceFlag) {
425             LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
426                             + "\" is not a compatible model update from the existing engine model with key \""
427                             + currentModel.getKey().getId() + "\"");
428         } else {
429             throw new ContextException("apex model update failed, supplied model with key \""
430                             + apexModel.getKey().getId()
431                             + "\" is not a compatible model update from the existing engine model with key \""
432                             + currentModel.getKey().getId() + "\"");
433         }
434     }
435
436     /**
437      * {@inheritDoc}.
438      */
439     @Override
440     public AxEngineState getState() {
441         // If one worker is running then we are running, otherwise we are stopped
442         for (final EngineService engine : engineWorkerMap.values()) {
443             if (engine.getState() != AxEngineState.STOPPED) {
444                 return AxEngineState.EXECUTING;
445             }
446         }
447
448         return AxEngineState.STOPPED;
449     }
450
451     /**
452      * {@inheritDoc}.
453      */
454     @Override
455     public void startAll() throws ApexException {
456         for (final EngineService engine : engineWorkerMap.values()) {
457             start(engine.getKey());
458         }
459     }
460
461     /**
462      * {@inheritDoc}.
463      */
464     @Override
465     public void start(final AxArtifactKey engineKey) throws ApexException {
466         LOGGER.entry(engineKey);
467
468         if (engineKey == null) {
469             String message = ENGINE_KEY_NOT_SPECIFIED;
470             LOGGER.warn(message);
471             throw new ApexException(message);
472         }
473
474         // Check if we have this key on our map
475         if (!engineWorkerMap.containsKey(engineKey)) {
476             String message = ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX;
477             LOGGER.warn(message);
478             throw new ApexException(message);
479         }
480
481         // Start the engine
482         engineWorkerMap.get(engineKey).start(engineKey);
483
484         // Check if periodic events should be turned on
485         if (periodicEventPeriod > 0) {
486             startPeriodicEvents(periodicEventPeriod);
487         }
488
489         LOGGER.exit(engineKey);
490     }
491
492     /**
493      * {@inheritDoc}.
494      */
495     @Override
496     public void stop() throws ApexException {
497         LOGGER.entry();
498
499         if (periodicEventGenerator != null) {
500             periodicEventGenerator.cancel();
501             periodicEventGenerator = null;
502         }
503
504         // Stop each engine
505         for (final EngineService engine : engineWorkerMap.values()) {
506             if (engine.getState() != AxEngineState.STOPPED) {
507                 engine.stop();
508             }
509         }
510
511         LOGGER.exit();
512     }
513
514     /**
515      * {@inheritDoc}.
516      */
517     @Override
518     public void stop(final AxArtifactKey engineKey) throws ApexException {
519         LOGGER.entry(engineKey);
520
521         if (engineKey == null) {
522             String message = ENGINE_KEY_NOT_SPECIFIED;
523             LOGGER.warn(message);
524             throw new ApexException(message);
525         }
526
527         // Check if we have this key on our map
528         if (!engineWorkerMap.containsKey(engineKey)) {
529             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
530             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
531         }
532
533         // Stop the engine
534         engineWorkerMap.get(engineKey).stop(engineKey);
535
536         LOGGER.exit(engineKey);
537     }
538
539     /**
540      * {@inheritDoc}.
541      */
542     @Override
543     public void clear() throws ApexException {
544         LOGGER.entry();
545
546         // Stop each engine
547         for (final EngineService engine : engineWorkerMap.values()) {
548             if (engine.getState() == AxEngineState.STOPPED) {
549                 engine.clear();
550             }
551         }
552
553         LOGGER.exit();
554     }
555
556     /**
557      * {@inheritDoc}.
558      */
559     @Override
560     public void clear(final AxArtifactKey engineKey) throws ApexException {
561         LOGGER.entry(engineKey);
562
563         if (engineKey == null) {
564             String message = ENGINE_KEY_NOT_SPECIFIED;
565             LOGGER.warn(message);
566             throw new ApexException(message);
567         }
568
569         // Check if we have this key on our map
570         if (!engineWorkerMap.containsKey(engineKey)) {
571             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
572             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
573         }
574
575         // Clear the engine
576         if (engineWorkerMap.get(engineKey).getState() == AxEngineState.STOPPED) {
577             engineWorkerMap.get(engineKey).stop(engineKey);
578         }
579
580         LOGGER.exit(engineKey);
581     }
582
583     /**
584      * Check all engines are started.
585      *
586      * @return true if <i>all</i> engines are started
587      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted()
588      */
589     @Override
590     public boolean isStarted() {
591         for (final EngineService engine : engineWorkerMap.values()) {
592             if (!engine.isStarted()) {
593                 return false;
594             }
595         }
596         return true;
597     }
598
599     /**
600      * {@inheritDoc}.
601      */
602     @Override
603     public boolean isStarted(final AxArtifactKey engineKey) {
604         if (engineKey == null) {
605             String message = ENGINE_KEY_NOT_SPECIFIED;
606             LOGGER.warn(message);
607             return false;
608         }
609
610         // Check if we have this key on our map
611         if (!engineWorkerMap.containsKey(engineKey)) {
612             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
613             return false;
614         }
615         return engineWorkerMap.get(engineKey).isStarted();
616     }
617
618     /**
619      * Check all engines are stopped.
620      *
621      * @return true if <i>all</i> engines are stopped
622      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped()
623      */
624     @Override
625     public boolean isStopped() {
626         for (final EngineService engine : engineWorkerMap.values()) {
627             if (!engine.isStopped()) {
628                 return false;
629             }
630         }
631         return true;
632     }
633
634     /**
635      * {@inheritDoc}.
636      */
637     @Override
638     public boolean isStopped(final AxArtifactKey engineKey) {
639         if (engineKey == null) {
640             String message = ENGINE_KEY_NOT_SPECIFIED;
641             LOGGER.warn(message);
642             return true;
643         }
644
645         // Check if we have this key on our map
646         if (!engineWorkerMap.containsKey(engineKey)) {
647             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
648             return true;
649         }
650         return engineWorkerMap.get(engineKey).isStopped();
651     }
652
653     /**
654      * {@inheritDoc}.
655      */
656     @Override
657     public void startPeriodicEvents(final long period) throws ApexException {
658         // Check if periodic events are already started
659         if (periodicEventGenerator != null) {
660             String message = "Peiodic event geneation already running on engine " + engineServiceKey.getId() + ", "
661                             + periodicEventGenerator.toString();
662             LOGGER.warn(message);
663             throw new ApexException(message);
664         }
665
666         // Set up periodic event execution, its a Java Timer/TimerTask
667         periodicEventGenerator = new ApexPeriodicEventGenerator(this.getEngineServiceEventInterface(), period);
668
669         // Record the periodic event period because it may have been set over the Web Socket admin
670         // interface
671         this.periodicEventPeriod = period;
672     }
673
674     /**
675      * {@inheritDoc}.
676      */
677     @Override
678     public void stopPeriodicEvents() throws ApexException {
679         // Check if periodic events are already started
680         if (periodicEventGenerator == null) {
681             LOGGER.warn("Peiodic event geneation not running on engine " + engineServiceKey.getId());
682             throw new ApexException("Peiodic event geneation not running on engine " + engineServiceKey.getId());
683         }
684
685         // Stop periodic events
686         periodicEventGenerator.cancel();
687         periodicEventGenerator = null;
688         periodicEventPeriod = 0;
689     }
690
691     /**
692      * {@inheritDoc}.
693      */
694     @Override
695     public String getStatus(final AxArtifactKey engineKey) throws ApexException {
696         if (engineKey == null) {
697             String message = ENGINE_KEY_NOT_SPECIFIED;
698             LOGGER.warn(message);
699             throw new ApexException(message);
700         }
701
702         // Check if we have this key on our map
703         if (!engineWorkerMap.containsKey(engineKey)) {
704             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
705             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
706         }
707
708         // Return the information for this worker
709         return engineWorkerMap.get(engineKey).getStatus(engineKey);
710     }
711
712     /**
713      * {@inheritDoc}.
714      */
715     @Override
716     public String getRuntimeInfo(final AxArtifactKey engineKey) throws ApexException {
717         if (engineKey == null) {
718             String message = ENGINE_KEY_NOT_SPECIFIED;
719             LOGGER.warn(message);
720             throw new ApexException(message);
721         }
722
723         // Check if we have this key on our map
724         if (!engineWorkerMap.containsKey(engineKey)) {
725             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
726             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
727         }
728
729         // Return the information for this worker
730         return engineWorkerMap.get(engineKey).getRuntimeInfo(engineKey);
731     }
732
733     /**
734      * {@inheritDoc}.
735      */
736     @Override
737     public void sendEvent(final ApexEvent event) {
738         if (event == null) {
739             LOGGER.warn("Null events cannot be processed, in engine service " + engineServiceKey.getId());
740             return;
741         }
742
743         // Check if we have this key on our map
744         if (getState() == AxEngineState.STOPPED) {
745             LOGGER.warn("event " + event.getName() + " not processed, no engines on engine service "
746                             + engineServiceKey.getId() + " are running");
747             return;
748         }
749
750         if (DEBUG_ENABLED) {
751             LOGGER.debug("Forwarding Apex Event {} to the processing engine", event);
752         }
753
754         // Add the incoming event to the queue, the next available worker will process it
755         queue.add(event);
756     }
757 }