a558b9946b578ab2c50c41ac17c61fafb4c0bfdd
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.service.engine.runtime.impl;
23
24 import java.io.ByteArrayInputStream;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.LinkedHashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Map.Entry;
32 import java.util.concurrent.BlockingQueue;
33 import java.util.concurrent.LinkedBlockingQueue;
34 import org.onap.policy.apex.context.ContextException;
35 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
36 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
37 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
38 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
39 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
40 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
41 import org.onap.policy.apex.model.basicmodel.service.ModelService;
42 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
43 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
44 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
45 import org.onap.policy.apex.service.engine.event.ApexEvent;
46 import org.onap.policy.apex.service.engine.event.ApexPeriodicEventGenerator;
47 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
48 import org.onap.policy.apex.service.engine.runtime.EngineService;
49 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
50 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
51 import org.onap.policy.common.parameters.GroupValidationResult;
52 import org.slf4j.ext.XLogger;
53 import org.slf4j.ext.XLoggerFactory;
54
55 /**
56  * The Class EngineServiceImpl controls a thread pool that runs a set of Apex engine workers, each of which is running
57  * on an identical Apex model. This class handles the management of the engine worker instances, their threads, and
58  * event forwarding to and from the engine workers.
59  *
60  * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
61  * @author Liam Fallon (liam.fallon@ericsson.com)
62  * @author John Keeney (john.keeney@ericsson.com)
63  */
64 public final class EngineServiceImpl implements EngineService, EngineServiceEventInterface {
65     // Logging static variables
66     private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceImpl.class);
67     private static final boolean DEBUG_ENABLED = LOGGER.isDebugEnabled();
68
69     // Recurring string constants
70     private static final String ENGINE_KEY_PREAMBLE = "engine with key ";
71     private static final String NOT_FOUND_SUFFIX = " not found in engine service";
72     private static final String ENGINE_KEY_NOT_SPECIFIED = "engine key must be specified and may not be null";
73
74     // Constants for timing
75     private static final long MAX_START_WAIT_TIME = 5000; // 5 seconds
76     private static final long MAX_STOP_WAIT_TIME = 5000; // 5 seconds
77     private static final int ENGINE_SERVICE_STOP_START_WAIT_INTERVAL = 200;
78
79     // The ID of this engine
80     private AxArtifactKey engineServiceKey = null;
81
82     // The Apex engine workers this engine service is handling
83     private final Map<AxArtifactKey, EngineWorker> engineWorkerMap = Collections
84                     .synchronizedMap(new LinkedHashMap<AxArtifactKey, EngineWorker>());
85
86     // Event queue for events being sent into the Apex engines, it used by all engines within a
87     // group.
88     private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
89
90     // Thread factory for thread management
91     private final ApplicationThreadFactory atFactory = new ApplicationThreadFactory("apex-engine-service", 512);
92
93     // Periodic event generator and its period in milliseconds
94     private ApexPeriodicEventGenerator periodicEventGenerator = null;
95     private long periodicEventPeriod;
96
97     /**
98      * This constructor instantiates engine workers and adds them to the set of engine workers to be managed. The
99      * constructor is private to prevent subclassing.
100      *
101      * @param engineServiceKey the engine service key
102      * @param threadCount the thread count, the number of engine workers to start
103      * @param periodicEventPeriod the period in milliseconds at which periodic events are generated
104      * @throws ApexException on worker instantiation errors
105      */
106     private EngineServiceImpl(final AxArtifactKey engineServiceKey, final int threadCount,
107                     final long periodicEventPeriod) {
108         LOGGER.entry(engineServiceKey, threadCount);
109
110         this.engineServiceKey = engineServiceKey;
111         this.periodicEventPeriod = periodicEventPeriod;
112
113         // Start engine workers
114         for (int engineCounter = 0; engineCounter < threadCount; engineCounter++) {
115             final AxArtifactKey engineWorkerKey = new AxArtifactKey(engineServiceKey.getName() + '-' + engineCounter,
116                             engineServiceKey.getVersion());
117             engineWorkerMap.put(engineWorkerKey, new EngineWorker(engineWorkerKey, queue, atFactory));
118             LOGGER.info("Created apex engine {} .", engineWorkerKey.getId());
119         }
120
121         LOGGER.info("APEX service created.");
122         LOGGER.exit();
123     }
124
125     /**
126      * Create an Apex Engine Service instance. This method does not load the policy so
127      * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} or
128      * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} must be used to load a model. This method does not
129      * start the Engine Service so {@link #start(AxArtifactKey)} or {@link #startAll()} must be used.
130      *
131      * @param config the configuration for this Apex Engine Service.
132      * @return the Engine Service instance
133      * @throws ApexException on worker instantiation errors
134      */
135     public static EngineServiceImpl create(final EngineServiceParameters config) throws ApexException {
136         if (config == null) {
137             LOGGER.warn("Engine service configuration parameters is null");
138             throw new ApexException("engine service configuration parameters are null");
139         }
140
141         final GroupValidationResult validation = config.validate();
142         if (!validation.isValid()) {
143             LOGGER.warn("Invalid engine service configuration parameters: {}" + validation.getResult());
144             throw new ApexException("Invalid engine service configuration parameters: " + validation);
145         }
146
147         final AxArtifactKey engineServiceKey = config.getEngineKey();
148         final int threadCount = config.getInstanceCount();
149
150         return new EngineServiceImpl(engineServiceKey, threadCount, config.getPeriodicEventPeriod());
151     }
152
153     /**
154      * {@inheritDoc}.
155      */
156     @Override
157     public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
158         LOGGER.entry(apexEventListener);
159
160         if (listenerName == null) {
161             String message = "listener name must be specified and may not be null";
162             LOGGER.warn(message);
163             return;
164         }
165
166         if (apexEventListener == null) {
167             String message = "apex event listener must be specified and may not be null";
168             LOGGER.warn(message);
169             return;
170         }
171
172         // Register the Apex event listener on all engine workers, each worker will return Apex
173         // events to the listening application
174         for (final EngineService engineWorker : engineWorkerMap.values()) {
175             engineWorker.registerActionListener(listenerName, apexEventListener);
176         }
177
178         LOGGER.info("Added the action listener to the engine");
179         LOGGER.exit();
180     }
181
182     /**
183      * {@inheritDoc}.
184      */
185     @Override
186     public void deregisterActionListener(final String listenerName) {
187         LOGGER.entry(listenerName);
188
189         // Register the Apex event listener on all engine workers, each worker will return Apex
190         // events to the listening application
191         for (final EngineService engineWorker : engineWorkerMap.values()) {
192             engineWorker.deregisterActionListener(listenerName);
193         }
194
195         LOGGER.info("Removed the action listener from the engine");
196         LOGGER.exit();
197     }
198
199     /**
200      * {@inheritDoc}.
201      */
202     @Override
203     public EngineServiceEventInterface getEngineServiceEventInterface() {
204         return this;
205     }
206
207     /**
208      * {@inheritDoc}.
209      */
210     @Override
211     public AxArtifactKey getKey() {
212         return engineServiceKey;
213     }
214
215     /**
216      * {@inheritDoc}.
217      */
218     @Override
219     public Collection<AxArtifactKey> getEngineKeys() {
220         return engineWorkerMap.keySet();
221     }
222
223     /**
224      * {@inheritDoc}.
225      */
226     @Override
227     public AxArtifactKey getApexModelKey() {
228         if (engineWorkerMap.size() == 0) {
229             return null;
230         }
231
232         return engineWorkerMap.entrySet().iterator().next().getValue().getApexModelKey();
233     }
234
235     /**
236     * Method to create model.
237     *
238     * @param incomingEngineServiceKey incoming engine service key
239     * @param apexModelString apex model string
240     * @return apexPolicyModel the policy model
241     * @throws ApexException apex exception
242     */
243     public static AxPolicyModel createModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString)
244         throws ApexException {
245         // Check if the engine service key specified is sane
246         if (incomingEngineServiceKey == null) {
247             String message = ENGINE_KEY_NOT_SPECIFIED;
248             LOGGER.warn(message);
249             throw new ApexException(message);
250         }
251
252         // Check if the Apex model specified is sane
253         if (apexModelString == null || apexModelString.trim().length() == 0) {
254             String emptyModelMessage = "model for updating engine service with key "
255                             + incomingEngineServiceKey.getId() + " is empty";
256             LOGGER.warn(emptyModelMessage);
257             throw new ApexException(emptyModelMessage);
258         }
259
260         // Read the Apex model into memory using the Apex Model Reader
261         AxPolicyModel apexPolicyModel = null;
262         try {
263             final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
264             apexPolicyModel = modelReader.read(new ByteArrayInputStream(apexModelString.getBytes()));
265         } catch (final ApexModelException e) {
266             String message = "failed to unmarshal the apex model on engine service " + incomingEngineServiceKey.getId();
267             LOGGER.error(message, e);
268             throw new ApexException(message, e);
269         }
270         return apexPolicyModel;
271     }
272
273     /**
274      * {@inheritDoc}.
275      */
276     @Override
277     public void updateModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString,
278                     final boolean forceFlag) throws ApexException {
279         AxPolicyModel apexPolicyModel = createModel(incomingEngineServiceKey, apexModelString);
280
281         // Update the model
282         updateModel(incomingEngineServiceKey, apexPolicyModel, forceFlag);
283
284         LOGGER.exit();
285     }
286
287     /**
288      * {@inheritDoc}.
289      */
290     @Override
291     public void updateModel(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
292                     final boolean forceFlag) throws ApexException {
293         LOGGER.entry(incomingEngineServiceKey);
294
295         // Check if the engine service key specified is sane
296         if (incomingEngineServiceKey == null) {
297             String message = ENGINE_KEY_NOT_SPECIFIED;
298             LOGGER.warn(message);
299             throw new ApexException(message);
300         }
301
302         // Check if the Apex model specified is sane
303         if (apexModel == null) {
304             LOGGER.warn("model for updating on engine service with key " + incomingEngineServiceKey.getId()
305                             + " is null");
306             throw new ApexException("model for updating on engine service with key " + incomingEngineServiceKey.getId()
307                             + " is null");
308         }
309
310         // Check if the key on the update request is correct
311         if (!this.engineServiceKey.equals(incomingEngineServiceKey)) {
312             LOGGER.warn("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
313                             + engineServiceKey.getId() + " of this engine service");
314             throw new ApexException("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
315                             + engineServiceKey.getId() + " of this engine service");
316         }
317
318         // Check model compatibility
319         if (ModelService.existsModel(AxPolicyModel.class)) {
320             // The current policy model may or may not be defined
321             final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
322             if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
323                 handleIncompatibility(apexModel, forceFlag, currentModel);
324             }
325         }
326
327         executeModelUpdate(incomingEngineServiceKey, apexModel, forceFlag);
328
329         LOGGER.exit();
330     }
331
332     /**
333      * Execute the model update on the engine instances.
334      *
335      * @param incomingEngineServiceKey the engine service key to update
336      * @param apexModel the model to update the engines with
337      * @param forceFlag if true, ignore compatibility problems
338      * @throws ApexException on model update errors
339      */
340     private void executeModelUpdate(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
341                     final boolean forceFlag) throws ApexException {
342
343         if (!isStopped()) {
344             stopEngines(incomingEngineServiceKey);
345         }
346
347         // Update the engines
348         boolean isSubsequentInstance = false;
349         for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
350             LOGGER.info("Registering apex model on engine {}", engineWorkerEntry.getKey().getId());
351             EngineWorker engineWorker = engineWorkerEntry.getValue();
352             if (isSubsequentInstance) {
353                 // set subsequentInstance flag as true if the current engine worker instance is not the first one
354                 // first engine instance will have this flag as false
355                 engineWorker.setSubsequentInstance(true);
356             }
357             engineWorker.updateModel(engineWorkerEntry.getKey(), apexModel, forceFlag);
358             isSubsequentInstance = true;
359         }
360
361         // start all engines on this engine service if it was not stopped before the update
362         startAll();
363         final long starttime = System.currentTimeMillis();
364         while (!isStarted() && System.currentTimeMillis() - starttime < MAX_START_WAIT_TIME) {
365             ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
366         }
367         // Check if all engines are running
368         final StringBuilder notRunningEngineIdBuilder = new StringBuilder();
369         for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
370             if (engineWorkerEntry.getValue().getState() != AxEngineState.READY
371                             && engineWorkerEntry.getValue().getState() != AxEngineState.EXECUTING) {
372                 notRunningEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
373                 notRunningEngineIdBuilder.append('(');
374                 notRunningEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
375                 notRunningEngineIdBuilder.append(") ");
376             }
377         }
378         if (notRunningEngineIdBuilder.length() > 0) {
379             final String errorString = "engine start error on model update on engine service with key "
380                             + incomingEngineServiceKey.getId() + ", engines not running are: "
381                             + notRunningEngineIdBuilder.toString().trim();
382             LOGGER.warn(errorString);
383             throw new ApexException(errorString);
384         }
385     }
386
387     /**
388      * Stop engines for a model update.
389      * @param incomingEngineServiceKey the engine service key for the engines that are to be stopped
390      * @throws ApexException on errors stopping engines
391      */
392     private void stopEngines(final AxArtifactKey incomingEngineServiceKey) throws ApexException {
393         // Stop all engines on this engine service
394         stop();
395         final long stoptime = System.currentTimeMillis();
396         while (!isStopped() && System.currentTimeMillis() - stoptime < MAX_STOP_WAIT_TIME) {
397             ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
398         }
399         // Check if all engines are stopped
400         final StringBuilder notStoppedEngineIdBuilder = new StringBuilder();
401         for (final Entry<AxArtifactKey, EngineWorker> engineWorkerEntry : engineWorkerMap.entrySet()) {
402             if (engineWorkerEntry.getValue().getState() != AxEngineState.STOPPED) {
403                 notStoppedEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
404                 notStoppedEngineIdBuilder.append('(');
405                 notStoppedEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
406                 notStoppedEngineIdBuilder.append(") ");
407             }
408         }
409         if (notStoppedEngineIdBuilder.length() > 0) {
410             final String errorString = "cannot update model on engine service with key "
411                             + incomingEngineServiceKey.getId() + ", engines not stopped after " + MAX_STOP_WAIT_TIME
412                             + "ms are: " + notStoppedEngineIdBuilder.toString().trim();
413             LOGGER.warn(errorString);
414             throw new ApexException(errorString);
415         }
416     }
417
418     /**
419      * Issue compatibility warning or error message.
420      * @param apexModel The model name
421      * @param forceFlag true if we are forcing the update
422      * @param currentModel the existing model that is loaded
423      * @throws ContextException on compatibility errors
424      */
425     private void handleIncompatibility(final AxPolicyModel apexModel, final boolean forceFlag,
426                     final AxPolicyModel currentModel) throws ContextException {
427         if (forceFlag) {
428             LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
429                             + "\" is not a compatible model update from the existing engine model with key \""
430                             + currentModel.getKey().getId() + "\"");
431         } else {
432             throw new ContextException("apex model update failed, supplied model with key \""
433                             + apexModel.getKey().getId()
434                             + "\" is not a compatible model update from the existing engine model with key \""
435                             + currentModel.getKey().getId() + "\"");
436         }
437     }
438
439     /**
440      * {@inheritDoc}.
441      */
442     @Override
443     public AxEngineState getState() {
444         // If one worker is running then we are running, otherwise we are stopped
445         for (final EngineService engine : engineWorkerMap.values()) {
446             if (engine.getState() != AxEngineState.STOPPED) {
447                 return AxEngineState.EXECUTING;
448             }
449         }
450
451         return AxEngineState.STOPPED;
452     }
453
454     /**
455      * {@inheritDoc}.
456      */
457     @Override
458     public void startAll() throws ApexException {
459         for (final EngineService engine : engineWorkerMap.values()) {
460             start(engine.getKey());
461         }
462     }
463
464     /**
465      * {@inheritDoc}.
466      */
467     @Override
468     public void start(final AxArtifactKey engineKey) throws ApexException {
469         LOGGER.entry(engineKey);
470
471         if (engineKey == null) {
472             String message = ENGINE_KEY_NOT_SPECIFIED;
473             LOGGER.warn(message);
474             throw new ApexException(message);
475         }
476
477         // Check if we have this key on our map
478         if (!engineWorkerMap.containsKey(engineKey)) {
479             String message = ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX;
480             LOGGER.warn(message);
481             throw new ApexException(message);
482         }
483
484         // Start the engine
485         engineWorkerMap.get(engineKey).start(engineKey);
486
487         // Check if periodic events should be turned on
488         if (periodicEventPeriod > 0) {
489             startPeriodicEvents(periodicEventPeriod);
490         }
491
492         LOGGER.exit(engineKey);
493     }
494
495     /**
496      * {@inheritDoc}.
497      */
498     @Override
499     public void stop() throws ApexException {
500         LOGGER.entry();
501
502         if (periodicEventGenerator != null) {
503             periodicEventGenerator.cancel();
504             periodicEventGenerator = null;
505         }
506
507         // Stop each engine
508         for (final EngineService engine : engineWorkerMap.values()) {
509             if (engine.getState() != AxEngineState.STOPPED) {
510                 engine.stop();
511             }
512         }
513
514         LOGGER.exit();
515     }
516
517     /**
518      * {@inheritDoc}.
519      */
520     @Override
521     public void stop(final AxArtifactKey engineKey) throws ApexException {
522         LOGGER.entry(engineKey);
523
524         if (engineKey == null) {
525             String message = ENGINE_KEY_NOT_SPECIFIED;
526             LOGGER.warn(message);
527             throw new ApexException(message);
528         }
529
530         // Check if we have this key on our map
531         if (!engineWorkerMap.containsKey(engineKey)) {
532             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
533             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
534         }
535
536         // Stop the engine
537         engineWorkerMap.get(engineKey).stop(engineKey);
538
539         LOGGER.exit(engineKey);
540     }
541
542     /**
543      * {@inheritDoc}.
544      */
545     @Override
546     public void clear() throws ApexException {
547         LOGGER.entry();
548
549         // Stop each engine
550         for (final EngineService engine : engineWorkerMap.values()) {
551             if (engine.getState() == AxEngineState.STOPPED) {
552                 engine.clear();
553             }
554         }
555
556         LOGGER.exit();
557     }
558
559     /**
560      * {@inheritDoc}.
561      */
562     @Override
563     public void clear(final AxArtifactKey engineKey) throws ApexException {
564         LOGGER.entry(engineKey);
565
566         if (engineKey == null) {
567             String message = ENGINE_KEY_NOT_SPECIFIED;
568             LOGGER.warn(message);
569             throw new ApexException(message);
570         }
571
572         // Check if we have this key on our map
573         if (!engineWorkerMap.containsKey(engineKey)) {
574             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
575             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
576         }
577
578         // Clear the engine
579         if (engineWorkerMap.get(engineKey).getState() == AxEngineState.STOPPED) {
580             engineWorkerMap.get(engineKey).stop(engineKey);
581         }
582
583         LOGGER.exit(engineKey);
584     }
585
586     /**
587      * Check all engines are started.
588      *
589      * @return true if <i>all</i> engines are started
590      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted()
591      */
592     @Override
593     public boolean isStarted() {
594         for (final EngineService engine : engineWorkerMap.values()) {
595             if (!engine.isStarted()) {
596                 return false;
597             }
598         }
599         return true;
600     }
601
602     /**
603      * {@inheritDoc}.
604      */
605     @Override
606     public boolean isStarted(final AxArtifactKey engineKey) {
607         if (engineKey == null) {
608             String message = ENGINE_KEY_NOT_SPECIFIED;
609             LOGGER.warn(message);
610             return false;
611         }
612
613         // Check if we have this key on our map
614         if (!engineWorkerMap.containsKey(engineKey)) {
615             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
616             return false;
617         }
618         return engineWorkerMap.get(engineKey).isStarted();
619     }
620
621     /**
622      * Check all engines are stopped.
623      *
624      * @return true if <i>all</i> engines are stopped
625      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped()
626      */
627     @Override
628     public boolean isStopped() {
629         for (final EngineService engine : engineWorkerMap.values()) {
630             if (!engine.isStopped()) {
631                 return false;
632             }
633         }
634         return true;
635     }
636
637     /**
638      * {@inheritDoc}.
639      */
640     @Override
641     public boolean isStopped(final AxArtifactKey engineKey) {
642         if (engineKey == null) {
643             String message = ENGINE_KEY_NOT_SPECIFIED;
644             LOGGER.warn(message);
645             return true;
646         }
647
648         // Check if we have this key on our map
649         if (!engineWorkerMap.containsKey(engineKey)) {
650             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
651             return true;
652         }
653         return engineWorkerMap.get(engineKey).isStopped();
654     }
655
656     /**
657      * {@inheritDoc}.
658      */
659     @Override
660     public void startPeriodicEvents(final long period) throws ApexException {
661         // Check if periodic events are already started
662         if (periodicEventGenerator != null) {
663             String message = "Peiodic event geneation already running on engine " + engineServiceKey.getId() + ", "
664                             + periodicEventGenerator.toString();
665             LOGGER.warn(message);
666             throw new ApexException(message);
667         }
668
669         // Set up periodic event execution, its a Java Timer/TimerTask
670         periodicEventGenerator = new ApexPeriodicEventGenerator(this.getEngineServiceEventInterface(), period);
671
672         // Record the periodic event period because it may have been set over the Web Socket admin
673         // interface
674         this.periodicEventPeriod = period;
675     }
676
677     /**
678      * {@inheritDoc}.
679      */
680     @Override
681     public void stopPeriodicEvents() throws ApexException {
682         // Check if periodic events are already started
683         if (periodicEventGenerator == null) {
684             LOGGER.warn("Peiodic event geneation not running on engine " + engineServiceKey.getId());
685             throw new ApexException("Peiodic event geneation not running on engine " + engineServiceKey.getId());
686         }
687
688         // Stop periodic events
689         periodicEventGenerator.cancel();
690         periodicEventGenerator = null;
691         periodicEventPeriod = 0;
692     }
693
694     /**
695      * {@inheritDoc}.
696      */
697     @Override
698     public String getStatus(final AxArtifactKey engineKey) throws ApexException {
699         if (engineKey == null) {
700             String message = ENGINE_KEY_NOT_SPECIFIED;
701             LOGGER.warn(message);
702             throw new ApexException(message);
703         }
704
705         // Check if we have this key on our map
706         if (!engineWorkerMap.containsKey(engineKey)) {
707             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
708             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
709         }
710         // Return the information for this worker
711         return engineWorkerMap.get(engineKey).getStatus(engineKey);
712     }
713
714     /**
715      * {@inheritDoc}.
716      *
717      */
718     @Override
719     public List<AxEngineModel> getEngineStats() {
720         List<AxEngineModel> engineStats = new ArrayList<>();
721         for (final EngineService engine : engineWorkerMap.values()) {
722             engineStats.addAll(engine.getEngineStats());
723         }
724         return engineStats;
725     }
726
727     /**
728      * {@inheritDoc}.
729      */
730     @Override
731     public String getRuntimeInfo(final AxArtifactKey engineKey) throws ApexException {
732         if (engineKey == null) {
733             String message = ENGINE_KEY_NOT_SPECIFIED;
734             LOGGER.warn(message);
735             throw new ApexException(message);
736         }
737
738         // Check if we have this key on our map
739         if (!engineWorkerMap.containsKey(engineKey)) {
740             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
741             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
742         }
743
744         // Return the information for this worker
745         return engineWorkerMap.get(engineKey).getRuntimeInfo(engineKey);
746     }
747
748     /**
749      * {@inheritDoc}.
750      */
751     @Override
752     public void sendEvent(final ApexEvent event) {
753         if (event == null) {
754             LOGGER.warn("Null events cannot be processed, in engine service " + engineServiceKey.getId());
755             return;
756         }
757
758         // Check if we have this key on our map
759         if (getState() == AxEngineState.STOPPED) {
760             LOGGER.warn("event " + event.getName() + " not processed, no engines on engine service "
761                             + engineServiceKey.getId() + " are running");
762             return;
763         }
764
765         if (DEBUG_ENABLED) {
766             LOGGER.debug("Forwarding Apex Event {} to the processing engine", event);
767         }
768
769         // Add the incoming event to the queue, the next available worker will process it
770         queue.add(event);
771     }
772 }