Remove GroupValidationResult
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / runtime / impl / EngineServiceImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
5  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.apex.service.engine.runtime.impl;
24
25 import java.io.ByteArrayInputStream;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.LinkedHashMap;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Map.Entry;
33 import java.util.concurrent.BlockingQueue;
34 import java.util.concurrent.LinkedBlockingQueue;
35 import org.onap.policy.apex.context.ContextException;
36 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
37 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
38 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
39 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
40 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
41 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
42 import org.onap.policy.apex.model.basicmodel.service.ModelService;
43 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
44 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
45 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
46 import org.onap.policy.apex.service.engine.event.ApexEvent;
47 import org.onap.policy.apex.service.engine.event.ApexPeriodicEventGenerator;
48 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
49 import org.onap.policy.apex.service.engine.runtime.EngineService;
50 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
51 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
52 import org.onap.policy.common.parameters.ValidationResult;
53 import org.slf4j.ext.XLogger;
54 import org.slf4j.ext.XLoggerFactory;
55
56 /**
57  * The Class EngineServiceImpl controls a thread pool that runs a set of Apex engine workers, each of which is running
58  * on an identical Apex model. This class handles the management of the engine worker instances, their threads, and
59  * event forwarding to and from the engine workers.
60  *
61  * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
62  * @author Liam Fallon (liam.fallon@ericsson.com)
63  * @author John Keeney (john.keeney@ericsson.com)
64  */
65 public final class EngineServiceImpl implements EngineService, EngineServiceEventInterface {
66     // Logging static variables
67     private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceImpl.class);
68     private static final boolean DEBUG_ENABLED = LOGGER.isDebugEnabled();
69
70     // Recurring string constants
71     private static final String ENGINE_KEY_PREAMBLE = "engine with key ";
72     private static final String NOT_FOUND_SUFFIX = " not found in engine service";
73     private static final String ENGINE_KEY_NOT_SPECIFIED = "engine key must be specified and may not be null";
74
75     // Constants for timing
76     private static final long MAX_START_WAIT_TIME = 5000; // 5 seconds
77     private static final long MAX_STOP_WAIT_TIME = 5000; // 5 seconds
78     private static final int ENGINE_SERVICE_STOP_START_WAIT_INTERVAL = 200;
79
80     // The ID of this engine
81     private AxArtifactKey engineServiceKey = null;
82
83     // The Apex engine workers this engine service is handling
84     private final Map<AxArtifactKey, EngineWorker> engineWorkerMap = Collections
85                     .synchronizedMap(new LinkedHashMap<AxArtifactKey, EngineWorker>());
86
87     // Event queue for events being sent into the Apex engines, it used by all engines within a
88     // group.
89     private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
90
91     // Thread factory for thread management
92     private final ApplicationThreadFactory atFactory = new ApplicationThreadFactory("apex-engine-service", 512);
93
94     // Periodic event generator and its period in milliseconds
95     private ApexPeriodicEventGenerator periodicEventGenerator = null;
96     private long periodicEventPeriod;
97
98     /**
99      * This constructor instantiates engine workers and adds them to the set of engine workers to be managed. The
100      * constructor is private to prevent subclassing.
101      *
102      * @param engineServiceKey the engine service key
103      * @param threadCount the thread count, the number of engine workers to start
104      * @param periodicEventPeriod the period in milliseconds at which periodic events are generated
105      * @throws ApexException on worker instantiation errors
106      */
107     private EngineServiceImpl(final AxArtifactKey engineServiceKey, final int threadCount,
108                     final long periodicEventPeriod) {
109         LOGGER.entry(engineServiceKey, threadCount);
110
111         this.engineServiceKey = engineServiceKey;
112         this.periodicEventPeriod = periodicEventPeriod;
113
114         // Start engine workers
115         for (int engineCounter = 0; engineCounter < threadCount; engineCounter++) {
116             final AxArtifactKey engineWorkerKey = new AxArtifactKey(engineServiceKey.getName() + '-' + engineCounter,
117                             engineServiceKey.getVersion());
118             engineWorkerMap.put(engineWorkerKey, new EngineWorker(engineWorkerKey, queue, atFactory));
119             LOGGER.info("Created apex engine {} .", engineWorkerKey.getId());
120         }
121
122         LOGGER.info("APEX service created.");
123         LOGGER.exit();
124     }
125
126     /**
127      * Create an Apex Engine Service instance. This method does not load the policy so
128      * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} or
129      * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} must be used to load a model. This method does not
130      * start the Engine Service so {@link #start(AxArtifactKey)} or {@link #startAll()} must be used.
131      *
132      * @param config the configuration for this Apex Engine Service.
133      * @return the Engine Service instance
134      * @throws ApexException on worker instantiation errors
135      */
136     public static EngineServiceImpl create(final EngineServiceParameters config) throws ApexException {
137         if (config == null) {
138             LOGGER.warn("Engine service configuration parameters is null");
139             throw new ApexException("engine service configuration parameters are null");
140         }
141
142         final ValidationResult validation = config.validate();
143         if (!validation.isValid()) {
144             LOGGER.warn("Invalid engine service configuration parameters: {}" + validation.getResult());
145             throw new ApexException("Invalid engine service configuration parameters: " + validation);
146         }
147
148         final AxArtifactKey engineServiceKey = config.getEngineKey();
149         final int threadCount = config.getInstanceCount();
150
151         return new EngineServiceImpl(engineServiceKey, threadCount, config.getPeriodicEventPeriod());
152     }
153
154     /**
155      * {@inheritDoc}.
156      */
157     @Override
158     public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
159         LOGGER.entry(apexEventListener);
160
161         if (listenerName == null) {
162             String message = "listener name must be specified and may not be null";
163             LOGGER.warn(message);
164             return;
165         }
166
167         if (apexEventListener == null) {
168             String message = "apex event listener must be specified and may not be null";
169             LOGGER.warn(message);
170             return;
171         }
172
173         // Register the Apex event listener on all engine workers, each worker will return Apex
174         // events to the listening application
175         for (final EngineService engineWorker : engineWorkerMap.values()) {
176             engineWorker.registerActionListener(listenerName, apexEventListener);
177         }
178
179         LOGGER.info("Added the action listener to the engine");
180         LOGGER.exit();
181     }
182
183     /**
184      * {@inheritDoc}.
185      */
186     @Override
187     public void deregisterActionListener(final String listenerName) {
188         LOGGER.entry(listenerName);
189
190         // Register the Apex event listener on all engine workers, each worker will return Apex
191         // events to the listening application
192         for (final EngineService engineWorker : engineWorkerMap.values()) {
193             engineWorker.deregisterActionListener(listenerName);
194         }
195
196         LOGGER.info("Removed the action listener from the engine");
197         LOGGER.exit();
198     }
199
200     /**
201      * {@inheritDoc}.
202      */
203     @Override
204     public EngineServiceEventInterface getEngineServiceEventInterface() {
205         return this;
206     }
207
208     /**
209      * {@inheritDoc}.
210      */
211     @Override
212     public AxArtifactKey getKey() {
213         return engineServiceKey;
214     }
215
216     /**
217      * {@inheritDoc}.
218      */
219     @Override
220     public Collection<AxArtifactKey> getEngineKeys() {
221         return engineWorkerMap.keySet();
222     }
223
224     /**
225      * {@inheritDoc}.
226      */
227     @Override
228     public AxArtifactKey getApexModelKey() {
229         if (engineWorkerMap.size() == 0) {
230             return null;
231         }
232
233         return engineWorkerMap.entrySet().iterator().next().getValue().getApexModelKey();
234     }
235
236     /**
237     * Method to create model.
238     *
239     * @param incomingEngineServiceKey incoming engine service key
240     * @param apexModelString apex model string
241     * @return apexPolicyModel the policy model
242     * @throws ApexException apex exception
243     */
244     public static AxPolicyModel createModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString)
245         throws ApexException {
246         // Check if the engine service key specified is sane
247         if (incomingEngineServiceKey == null) {
248             String message = ENGINE_KEY_NOT_SPECIFIED;
249             LOGGER.warn(message);
250             throw new ApexException(message);
251         }
252
253         // Check if the Apex model specified is sane
254         if (apexModelString == null || apexModelString.trim().length() == 0) {
255             String emptyModelMessage = "model for updating engine service with key "
256                             + incomingEngineServiceKey.getId() + " is empty";
257             LOGGER.warn(emptyModelMessage);
258             throw new ApexException(emptyModelMessage);
259         }
260
261         // Read the Apex model into memory using the Apex Model Reader
262         AxPolicyModel apexPolicyModel = null;
263         try {
264             final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
265             apexPolicyModel = modelReader.read(new ByteArrayInputStream(apexModelString.getBytes()));
266         } catch (final ApexModelException e) {
267             String message = "failed to unmarshal the apex model on engine service " + incomingEngineServiceKey.getId();
268             LOGGER.error(message, e);
269             throw new ApexException(message, e);
270         }
271         return apexPolicyModel;
272     }
273
274     /**
275      * {@inheritDoc}.
276      */
277     @Override
278     public void updateModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString,
279                     final boolean forceFlag) throws ApexException {
280         AxPolicyModel apexPolicyModel = createModel(incomingEngineServiceKey, apexModelString);
281
282         // Update the model
283         updateModel(incomingEngineServiceKey, apexPolicyModel, forceFlag);
284
285         LOGGER.exit();
286     }
287
288     /**
289      * {@inheritDoc}.
290      */
291     @Override
292     public void updateModel(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
293                     final boolean forceFlag) throws ApexException {
294         LOGGER.entry(incomingEngineServiceKey);
295
296         // Check if the engine service key specified is sane
297         if (incomingEngineServiceKey == null) {
298             String message = ENGINE_KEY_NOT_SPECIFIED;
299             LOGGER.warn(message);
300             throw new ApexException(message);
301         }
302
303         // Check if the Apex model specified is sane
304         if (apexModel == null) {
305             LOGGER.warn("model for updating on engine service with key " + incomingEngineServiceKey.getId()
306                             + " is null");
307             throw new ApexException("model for updating on engine service with key " + incomingEngineServiceKey.getId()
308                             + " is null");
309         }
310
311         // Check if the key on the update request is correct
312         if (!this.engineServiceKey.equals(incomingEngineServiceKey)) {
313             LOGGER.warn("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
314                             + engineServiceKey.getId() + " of this engine service");
315             throw new ApexException("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
316                             + engineServiceKey.getId() + " of this engine service");
317         }
318
319         // Check model compatibility
320         if (ModelService.existsModel(AxPolicyModel.class)) {
321             // The current policy model may or may not be defined
322             final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
323             if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
324                 handleIncompatibility(apexModel, forceFlag, currentModel);
325             }
326         }
327
328         executeModelUpdate(incomingEngineServiceKey, apexModel, forceFlag);
329
330         LOGGER.exit();
331     }
332
333     /**
334      * Execute the model update on the engine instances.
335      *
336      * @param incomingEngineServiceKey the engine service key to update
337      * @param apexModel the model to update the engines with
338      * @param forceFlag if true, ignore compatibility problems
339      * @throws ApexException on model update errors
340      */
341     private void executeModelUpdate(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
342                     final boolean forceFlag) throws ApexException {
343
344         if (!isStopped()) {
345             stopEngines(incomingEngineServiceKey);
346         }
347
348         // Update the engines
349         boolean isSubsequentInstance = false;
350         for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
351             LOGGER.info("Registering apex model on engine {}", engineWorkerEntry.getKey().getId());
352             EngineWorker engineWorker = engineWorkerEntry.getValue();
353             if (isSubsequentInstance) {
354                 // set subsequentInstance flag as true if the current engine worker instance is not the first one
355                 // first engine instance will have this flag as false
356                 engineWorker.setSubsequentInstance(true);
357             }
358             engineWorker.updateModel(engineWorkerEntry.getKey(), apexModel, forceFlag);
359             isSubsequentInstance = true;
360         }
361
362         // start all engines on this engine service if it was not stopped before the update
363         startAll();
364         final long starttime = System.currentTimeMillis();
365         while (!isStarted() && System.currentTimeMillis() - starttime < MAX_START_WAIT_TIME) {
366             ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
367         }
368         // Check if all engines are running
369         final StringBuilder notRunningEngineIdBuilder = new StringBuilder();
370         for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
371             if (engineWorkerEntry.getValue().getState() != AxEngineState.READY
372                             && engineWorkerEntry.getValue().getState() != AxEngineState.EXECUTING) {
373                 notRunningEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
374                 notRunningEngineIdBuilder.append('(');
375                 notRunningEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
376                 notRunningEngineIdBuilder.append(") ");
377             }
378         }
379         if (notRunningEngineIdBuilder.length() > 0) {
380             final String errorString = "engine start error on model update on engine service with key "
381                             + incomingEngineServiceKey.getId() + ", engines not running are: "
382                             + notRunningEngineIdBuilder.toString().trim();
383             LOGGER.warn(errorString);
384             throw new ApexException(errorString);
385         }
386     }
387
388     /**
389      * Stop engines for a model update.
390      * @param incomingEngineServiceKey the engine service key for the engines that are to be stopped
391      * @throws ApexException on errors stopping engines
392      */
393     private void stopEngines(final AxArtifactKey incomingEngineServiceKey) throws ApexException {
394         // Stop all engines on this engine service
395         stop();
396         final long stoptime = System.currentTimeMillis();
397         while (!isStopped() && System.currentTimeMillis() - stoptime < MAX_STOP_WAIT_TIME) {
398             ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
399         }
400         // Check if all engines are stopped
401         final StringBuilder notStoppedEngineIdBuilder = new StringBuilder();
402         for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
403             if (engineWorkerEntry.getValue().getState() != AxEngineState.STOPPED) {
404                 notStoppedEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
405                 notStoppedEngineIdBuilder.append('(');
406                 notStoppedEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
407                 notStoppedEngineIdBuilder.append(") ");
408             }
409         }
410         if (notStoppedEngineIdBuilder.length() > 0) {
411             final String errorString = "cannot update model on engine service with key "
412                             + incomingEngineServiceKey.getId() + ", engines not stopped after " + MAX_STOP_WAIT_TIME
413                             + "ms are: " + notStoppedEngineIdBuilder.toString().trim();
414             LOGGER.warn(errorString);
415             throw new ApexException(errorString);
416         }
417     }
418
419     /**
420      * Issue compatibility warning or error message.
421      * @param apexModel The model name
422      * @param forceFlag true if we are forcing the update
423      * @param currentModel the existing model that is loaded
424      * @throws ContextException on compatibility errors
425      */
426     private void handleIncompatibility(final AxPolicyModel apexModel, final boolean forceFlag,
427                     final AxPolicyModel currentModel) throws ContextException {
428         if (forceFlag) {
429             LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
430                             + "\" is not a compatible model update from the existing engine model with key \""
431                             + currentModel.getKey().getId() + "\"");
432         } else {
433             throw new ContextException("apex model update failed, supplied model with key \""
434                             + apexModel.getKey().getId()
435                             + "\" is not a compatible model update from the existing engine model with key \""
436                             + currentModel.getKey().getId() + "\"");
437         }
438     }
439
440     /**
441      * {@inheritDoc}.
442      */
443     @Override
444     public AxEngineState getState() {
445         // If one worker is running then we are running, otherwise we are stopped
446         for (final EngineService engine : engineWorkerMap.values()) {
447             if (engine.getState() != AxEngineState.STOPPED) {
448                 return AxEngineState.EXECUTING;
449             }
450         }
451
452         return AxEngineState.STOPPED;
453     }
454
455     /**
456      * {@inheritDoc}.
457      */
458     @Override
459     public void startAll() throws ApexException {
460         for (final EngineService engine : engineWorkerMap.values()) {
461             start(engine.getKey());
462         }
463     }
464
465     /**
466      * {@inheritDoc}.
467      */
468     @Override
469     public void start(final AxArtifactKey engineKey) throws ApexException {
470         LOGGER.entry(engineKey);
471
472         if (engineKey == null) {
473             String message = ENGINE_KEY_NOT_SPECIFIED;
474             LOGGER.warn(message);
475             throw new ApexException(message);
476         }
477
478         // Check if we have this key on our map
479         if (!engineWorkerMap.containsKey(engineKey)) {
480             String message = ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX;
481             LOGGER.warn(message);
482             throw new ApexException(message);
483         }
484
485         // Start the engine
486         engineWorkerMap.get(engineKey).start(engineKey);
487
488         // Check if periodic events should be turned on
489         if (periodicEventPeriod > 0) {
490             startPeriodicEvents(periodicEventPeriod);
491         }
492
493         LOGGER.exit(engineKey);
494     }
495
496     /**
497      * {@inheritDoc}.
498      */
499     @Override
500     public void stop() throws ApexException {
501         LOGGER.entry();
502
503         if (periodicEventGenerator != null) {
504             periodicEventGenerator.cancel();
505             periodicEventGenerator = null;
506         }
507
508         // Stop each engine
509         for (final EngineService engine : engineWorkerMap.values()) {
510             if (engine.getState() != AxEngineState.STOPPED) {
511                 engine.stop();
512             }
513         }
514
515         LOGGER.exit();
516     }
517
518     /**
519      * {@inheritDoc}.
520      */
521     @Override
522     public void stop(final AxArtifactKey engineKey) throws ApexException {
523         LOGGER.entry(engineKey);
524
525         if (engineKey == null) {
526             String message = ENGINE_KEY_NOT_SPECIFIED;
527             LOGGER.warn(message);
528             throw new ApexException(message);
529         }
530
531         // Check if we have this key on our map
532         if (!engineWorkerMap.containsKey(engineKey)) {
533             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
534             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
535         }
536
537         // Stop the engine
538         engineWorkerMap.get(engineKey).stop(engineKey);
539
540         LOGGER.exit(engineKey);
541     }
542
543     /**
544      * {@inheritDoc}.
545      */
546     @Override
547     public void clear() throws ApexException {
548         LOGGER.entry();
549
550         // Stop each engine
551         for (final EngineService engine : engineWorkerMap.values()) {
552             if (engine.getState() == AxEngineState.STOPPED) {
553                 engine.clear();
554             }
555         }
556
557         LOGGER.exit();
558     }
559
560     /**
561      * {@inheritDoc}.
562      */
563     @Override
564     public void clear(final AxArtifactKey engineKey) throws ApexException {
565         LOGGER.entry(engineKey);
566
567         if (engineKey == null) {
568             String message = ENGINE_KEY_NOT_SPECIFIED;
569             LOGGER.warn(message);
570             throw new ApexException(message);
571         }
572
573         // Check if we have this key on our map
574         if (!engineWorkerMap.containsKey(engineKey)) {
575             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
576             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
577         }
578
579         // Clear the engine
580         if (engineWorkerMap.get(engineKey).getState() == AxEngineState.STOPPED) {
581             engineWorkerMap.get(engineKey).stop(engineKey);
582         }
583
584         LOGGER.exit(engineKey);
585     }
586
587     /**
588      * Check all engines are started.
589      *
590      * @return true if <i>all</i> engines are started
591      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted()
592      */
593     @Override
594     public boolean isStarted() {
595         for (final EngineService engine : engineWorkerMap.values()) {
596             if (!engine.isStarted()) {
597                 return false;
598             }
599         }
600         return true;
601     }
602
603     /**
604      * {@inheritDoc}.
605      */
606     @Override
607     public boolean isStarted(final AxArtifactKey engineKey) {
608         if (engineKey == null) {
609             String message = ENGINE_KEY_NOT_SPECIFIED;
610             LOGGER.warn(message);
611             return false;
612         }
613
614         // Check if we have this key on our map
615         if (!engineWorkerMap.containsKey(engineKey)) {
616             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
617             return false;
618         }
619         return engineWorkerMap.get(engineKey).isStarted();
620     }
621
622     /**
623      * Check all engines are stopped.
624      *
625      * @return true if <i>all</i> engines are stopped
626      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped()
627      */
628     @Override
629     public boolean isStopped() {
630         for (final EngineService engine : engineWorkerMap.values()) {
631             if (!engine.isStopped()) {
632                 return false;
633             }
634         }
635         return true;
636     }
637
638     /**
639      * {@inheritDoc}.
640      */
641     @Override
642     public boolean isStopped(final AxArtifactKey engineKey) {
643         if (engineKey == null) {
644             String message = ENGINE_KEY_NOT_SPECIFIED;
645             LOGGER.warn(message);
646             return true;
647         }
648
649         // Check if we have this key on our map
650         if (!engineWorkerMap.containsKey(engineKey)) {
651             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
652             return true;
653         }
654         return engineWorkerMap.get(engineKey).isStopped();
655     }
656
657     /**
658      * {@inheritDoc}.
659      */
660     @Override
661     public void startPeriodicEvents(final long period) throws ApexException {
662         // Check if periodic events are already started
663         if (periodicEventGenerator != null) {
664             String message = "Peiodic event geneation already running on engine " + engineServiceKey.getId() + ", "
665                             + periodicEventGenerator.toString();
666             LOGGER.warn(message);
667             throw new ApexException(message);
668         }
669
670         // Set up periodic event execution, its a Java Timer/TimerTask
671         periodicEventGenerator = new ApexPeriodicEventGenerator(this.getEngineServiceEventInterface(), period);
672
673         // Record the periodic event period because it may have been set over the Web Socket admin
674         // interface
675         this.periodicEventPeriod = period;
676     }
677
678     /**
679      * {@inheritDoc}.
680      */
681     @Override
682     public void stopPeriodicEvents() throws ApexException {
683         // Check if periodic events are already started
684         if (periodicEventGenerator == null) {
685             LOGGER.warn("Peiodic event geneation not running on engine " + engineServiceKey.getId());
686             throw new ApexException("Peiodic event geneation not running on engine " + engineServiceKey.getId());
687         }
688
689         // Stop periodic events
690         periodicEventGenerator.cancel();
691         periodicEventGenerator = null;
692         periodicEventPeriod = 0;
693     }
694
695     /**
696      * {@inheritDoc}.
697      */
698     @Override
699     public String getStatus(final AxArtifactKey engineKey) throws ApexException {
700         if (engineKey == null) {
701             String message = ENGINE_KEY_NOT_SPECIFIED;
702             LOGGER.warn(message);
703             throw new ApexException(message);
704         }
705
706         // Check if we have this key on our map
707         if (!engineWorkerMap.containsKey(engineKey)) {
708             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
709             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
710         }
711         // Return the information for this worker
712         return engineWorkerMap.get(engineKey).getStatus(engineKey);
713     }
714
715     /**
716      * {@inheritDoc}.
717      *
718      */
719     @Override
720     public List<AxEngineModel> getEngineStats() {
721         List<AxEngineModel> engineStats = new ArrayList<>();
722         for (final EngineService engine : engineWorkerMap.values()) {
723             engineStats.addAll(engine.getEngineStats());
724         }
725         return engineStats;
726     }
727
728     /**
729      * {@inheritDoc}.
730      */
731     @Override
732     public String getRuntimeInfo(final AxArtifactKey engineKey) throws ApexException {
733         if (engineKey == null) {
734             String message = ENGINE_KEY_NOT_SPECIFIED;
735             LOGGER.warn(message);
736             throw new ApexException(message);
737         }
738
739         // Check if we have this key on our map
740         if (!engineWorkerMap.containsKey(engineKey)) {
741             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
742             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
743         }
744
745         // Return the information for this worker
746         return engineWorkerMap.get(engineKey).getRuntimeInfo(engineKey);
747     }
748
749     /**
750      * {@inheritDoc}.
751      */
752     @Override
753     public void sendEvent(final ApexEvent event) {
754         if (event == null) {
755             LOGGER.warn("Null events cannot be processed, in engine service " + engineServiceKey.getId());
756             return;
757         }
758
759         // Check if we have this key on our map
760         if (getState() == AxEngineState.STOPPED) {
761             LOGGER.warn("event " + event.getName() + " not processed, no engines on engine service "
762                             + engineServiceKey.getId() + " are running");
763             return;
764         }
765
766         if (DEBUG_ENABLED) {
767             LOGGER.debug("Forwarding Apex Event {} to the processing engine", event);
768         }
769
770         // Add the incoming event to the queue, the next available worker will process it
771         queue.add(event);
772     }
773 }