c99987542e7f765a3ff03b0c0393334661e8bebb
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.runtime.impl;
22
23 import java.io.ByteArrayInputStream;
24 import java.util.Collection;
25 import java.util.Collections;
26 import java.util.LinkedHashMap;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.LinkedBlockingQueue;
31
32 import org.onap.policy.apex.context.ContextException;
33 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
34 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
35 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
36 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
37 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
38 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
39 import org.onap.policy.apex.model.basicmodel.service.ModelService;
40 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
41 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
42 import org.onap.policy.apex.service.engine.event.ApexEvent;
43 import org.onap.policy.apex.service.engine.event.ApexPeriodicEventGenerator;
44 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
45 import org.onap.policy.apex.service.engine.runtime.EngineService;
46 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
47 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
48 import org.onap.policy.common.parameters.GroupValidationResult;
49 import org.slf4j.ext.XLogger;
50 import org.slf4j.ext.XLoggerFactory;
51
52 /**
53  * The Class EngineServiceImpl controls a thread pool that runs a set of Apex engine workers, each of which is running
54  * on an identical Apex model. This class handles the management of the engine worker instances, their threads, and
55  * event forwarding to and from the engine workers.
56  *
57  * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
58  * @author Liam Fallon (liam.fallon@ericsson.com)
59  * @author John Keeney (john.keeney@ericsson.com)
60  */
61 public final class EngineServiceImpl implements EngineService, EngineServiceEventInterface {
62     // Logging static variables
63     private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceImpl.class);
64     private static final boolean DEBUG_ENABLED = LOGGER.isDebugEnabled();
65
66     // Recurring string constants
67     private static final String ENGINE_KEY_PREAMBLE = "engine with key ";
68     private static final String NOT_FOUND_SUFFIX = " not found in engine service";
69
70     // Constants for timing
71     private static final long MAX_START_WAIT_TIME = 5000; // 5 seconds
72     private static final long MAX_STOP_WAIT_TIME = 5000; // 5 seconds
73     private static final int ENGINE_SERVICE_STOP_START_WAIT_INTERVAL = 200;
74
75     // The ID of this engine
76     private AxArtifactKey engineServiceKey = null;
77
78     // The Apex engine workers this engine service is handling
79     private final Map<AxArtifactKey, EngineService> engineWorkerMap = Collections
80                     .synchronizedMap(new LinkedHashMap<AxArtifactKey, EngineService>());
81
82     // Event queue for events being sent into the Apex engines, it used by all engines within a
83     // group.
84     private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
85
86     // Thread factory for thread management
87     private final ApplicationThreadFactory atFactory = new ApplicationThreadFactory("apex-engine-service", 512);
88
89     // Periodic event generator and its period in milliseconds
90     private ApexPeriodicEventGenerator periodicEventGenerator = null;
91     private long periodicEventPeriod;
92
93     /**
94      * This constructor instantiates engine workers and adds them to the set of engine workers to be managed. The
95      * constructor is private to prevent subclassing.
96      *
97      * @param engineServiceKey the engine service key
98      * @param incomingThreadCount the thread count, the number of engine workers to start
99      * @param periodicEventPeriod the period in milliseconds at which periodic events are generated
100      * @throws ApexException on worker instantiation errors
101      */
102     private EngineServiceImpl(final AxArtifactKey engineServiceKey, final int incomingThreadCount,
103                     final long periodicEventPeriod) {
104         LOGGER.entry(engineServiceKey, incomingThreadCount);
105
106         this.engineServiceKey = engineServiceKey;
107         this.periodicEventPeriod = periodicEventPeriod;
108
109         int threadCount = incomingThreadCount;
110         if (threadCount <= 0) {
111             // Just start one engine worker
112             threadCount = 1;
113         }
114
115         // Start engine workers
116         for (int engineCounter = 0; engineCounter < threadCount; engineCounter++) {
117             final AxArtifactKey engineWorkerKey = new AxArtifactKey(engineServiceKey.getName() + '-' + engineCounter,
118                             engineServiceKey.getVersion());
119             engineWorkerMap.put(engineWorkerKey, new EngineWorker(engineWorkerKey, queue, atFactory));
120             LOGGER.info("Created apex engine {} .", engineWorkerKey.getId());
121         }
122
123         LOGGER.info("APEX service created.");
124         LOGGER.exit();
125     }
126
127     /**
128      * Create an Apex Engine Service instance. This method does not load the policy so
129      * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} or
130      * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} must be used to load a model. This method does not
131      * start the Engine Service so {@link #start(AxArtifactKey)} or {@link #startAll()} must be used.
132      *
133      * @param config the configuration for this Apex Engine Service.
134      * @return the Engine Service instance
135      * @throws ApexException on worker instantiation errors
136      */
137     public static EngineServiceImpl create(final EngineServiceParameters config) throws ApexException {
138         if (config == null) {
139             LOGGER.warn("Engine service configuration parameters is null");
140             throw new ApexException("engine service configuration parameters is null");
141         }
142         final GroupValidationResult validation = config.validate();
143         if (!validation.isValid()) {
144             LOGGER.warn("Invalid engine service configuration parameters: {}" + validation.getResult());
145             throw new ApexException("Invalid engine service configuration parameters: " + validation);
146         }
147         final AxArtifactKey engineServiceKey = config.getEngineKey();
148         final int threadCount = config.getInstanceCount();
149
150         // Check if the Apex model specified is sane
151         if (engineServiceKey == null) {
152             LOGGER.warn("engine service key is null");
153             throw new ApexException("engine service key is null");
154         }
155
156         return new EngineServiceImpl(engineServiceKey, threadCount, config.getPeriodicEventPeriod());
157     }
158
159     /*
160      * (non-Javadoc)
161      * 
162      * @see org.onap.policy.apex.service.engine.runtime.EngineService#registerActionListener(java.lang. String,
163      * org.onap.policy.apex.service.engine.runtime.ApexEventListener)
164      */
165     @Override
166     public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
167         LOGGER.entry(apexEventListener);
168
169         // Register the Apex event listener on all engine workers, each worker will return Apex
170         // events to the listening application
171         for (final EngineService engineWorker : engineWorkerMap.values()) {
172             engineWorker.registerActionListener(listenerName, apexEventListener);
173         }
174
175         LOGGER.info("Added the action listener to the engine");
176         LOGGER.exit();
177     }
178
179     /*
180      * (non-Javadoc)
181      * 
182      * @see org.onap.policy.apex.service.engine.runtime.EngineService#deregisterActionListener(java.lang. String)
183      */
184     @Override
185     public void deregisterActionListener(final String listenerName) {
186         LOGGER.entry(listenerName);
187
188         // Register the Apex event listener on all engine workers, each worker will return Apex
189         // events to the listening application
190         for (final EngineService engineWorker : engineWorkerMap.values()) {
191             engineWorker.deregisterActionListener(listenerName);
192         }
193
194         LOGGER.info("Removed the action listener from the engine");
195         LOGGER.exit();
196     }
197
198     /*
199      * (non-Javadoc)
200      *
201      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getEngineServiceEventInterface()
202      */
203     @Override
204     public EngineServiceEventInterface getEngineServiceEventInterface() {
205         return this;
206     }
207
208     /*
209      * (non-Javadoc)
210      *
211      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getKey()
212      */
213     @Override
214     public AxArtifactKey getKey() {
215         return engineServiceKey;
216     }
217
218     /*
219      * (non-Javadoc)
220      *
221      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getInfo()
222      */
223     @Override
224     public Collection<AxArtifactKey> getEngineKeys() {
225         return engineWorkerMap.keySet();
226     }
227
228     /*
229      * (non-Javadoc)
230      *
231      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getApexModelKey()
232      */
233     @Override
234     public AxArtifactKey getApexModelKey() {
235         if (engineWorkerMap.size() == 0) {
236             return null;
237         }
238
239         return engineWorkerMap.entrySet().iterator().next().getValue().getApexModelKey();
240     }
241
242     /*
243      * (non-Javadoc)
244      *
245      * @see org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex.model.
246      * basicmodel.concepts.AxArtifactKey, java.lang.String, boolean)
247      */
248     @Override
249     public void updateModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString,
250                     final boolean forceFlag) throws ApexException {
251         // Check if the Apex model specified is sane
252         if (apexModelString == null || apexModelString.trim().length() == 0) {
253             String emptyModelMessage = "model for updating engine service with key "
254                             + incomingEngineServiceKey.getId() + " is empty";
255             LOGGER.warn(emptyModelMessage);
256             throw new ApexException(emptyModelMessage);
257         }
258
259         // Read the Apex model into memory using the Apex Model Reader
260         AxPolicyModel apexPolicyModel = null;
261         try {
262             final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
263             apexPolicyModel = modelReader.read(new ByteArrayInputStream(apexModelString.getBytes()));
264         } catch (final ApexModelException e) {
265             String message = "failed to unmarshal the apex model on engine service " + incomingEngineServiceKey.getId();
266             LOGGER.error(message, e);
267             throw new ApexException(message, e);
268         }
269
270         if (apexPolicyModel == null) {
271             String message = "apex model null on engine service " + incomingEngineServiceKey.getId();
272             LOGGER.error(message);
273             throw new ApexException(message);
274         }
275
276         // Update the model
277         updateModel(incomingEngineServiceKey, apexPolicyModel, forceFlag);
278
279         LOGGER.exit();
280     }
281
282     /*
283      * (non-Javadoc)
284      *
285      * @see org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex.model.
286      * basicmodel.concepts.AxArtifactKey, org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel, boolean)
287      */
288     @Override
289     public void updateModel(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
290                     final boolean forceFlag) throws ApexException {
291         LOGGER.entry(incomingEngineServiceKey);
292
293         // Check if the Apex model specified is sane
294         if (apexModel == null) {
295             LOGGER.warn("model for updating on engine service with key " + incomingEngineServiceKey.getId()
296                             + " is null");
297             throw new ApexException("model for updating on engine service with key " + incomingEngineServiceKey.getId()
298                             + " is null");
299         }
300
301         // Check if the key on the update request is correct
302         if (!this.engineServiceKey.equals(incomingEngineServiceKey)) {
303             LOGGER.warn("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
304                             + engineServiceKey.getId() + " of this engine service");
305             throw new ApexException("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
306                             + engineServiceKey.getId() + " of this engine service");
307         }
308
309         // Check model compatibility
310         if (ModelService.existsModel(AxPolicyModel.class)) {
311             // The current policy model may or may not be defined
312             final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
313             if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
314                 handleIncompatibility(apexModel, forceFlag, currentModel);
315             }
316         }
317
318         if (!isStopped()) {
319             stopEngines(incomingEngineServiceKey);
320         }
321
322         // Update the engines
323         for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
324             LOGGER.info("Registering apex model on engine {}", engineWorkerEntry.getKey().getId());
325             engineWorkerEntry.getValue().updateModel(engineWorkerEntry.getKey(), apexModel, forceFlag);
326         }
327
328         // start all engines on this engine service if it was not stopped before the update
329         startAll();
330         final long starttime = System.currentTimeMillis();
331         while (!isStarted() && System.currentTimeMillis() - starttime < MAX_START_WAIT_TIME) {
332             ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
333         }
334         // Check if all engines are running
335         final StringBuilder notRunningEngineIdBuilder = new StringBuilder();
336         for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
337             if (engineWorkerEntry.getValue().getState() != AxEngineState.READY
338                             && engineWorkerEntry.getValue().getState() != AxEngineState.EXECUTING) {
339                 notRunningEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
340                 notRunningEngineIdBuilder.append('(');
341                 notRunningEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
342                 notRunningEngineIdBuilder.append(") ");
343             }
344         }
345         if (notRunningEngineIdBuilder.length() > 0) {
346             final String errorString = "engine start error on model update on engine service with key "
347                             + incomingEngineServiceKey.getId() + ", engines not running are: "
348                             + notRunningEngineIdBuilder.toString().trim();
349             LOGGER.warn(errorString);
350             throw new ApexException(errorString);
351         }
352
353         LOGGER.exit();
354     }
355
356     /**
357      * Stop engines for a model update.
358      * @param incomingEngineServiceKey the engine service key for the engines that are to be stopped
359      * @throws ApexException on errors stopping engines
360      */
361     private void stopEngines(final AxArtifactKey incomingEngineServiceKey) throws ApexException {
362         // Stop all engines on this engine service
363         stop();
364         final long stoptime = System.currentTimeMillis();
365         while (!isStopped() && System.currentTimeMillis() - stoptime < MAX_STOP_WAIT_TIME) {
366             ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
367         }
368         // Check if all engines are stopped
369         final StringBuilder notStoppedEngineIdBuilder = new StringBuilder();
370         for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
371             if (engineWorkerEntry.getValue().getState() != AxEngineState.STOPPED) {
372                 notStoppedEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
373                 notStoppedEngineIdBuilder.append('(');
374                 notStoppedEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
375                 notStoppedEngineIdBuilder.append(") ");
376             }
377         }
378         if (notStoppedEngineIdBuilder.length() > 0) {
379             final String errorString = "cannot update model on engine service with key "
380                             + incomingEngineServiceKey.getId() + ", engines not stopped after " + MAX_STOP_WAIT_TIME
381                             + "ms are: " + notStoppedEngineIdBuilder.toString().trim();
382             LOGGER.warn(errorString);
383             throw new ApexException(errorString);
384         }
385     }
386
387     /**
388      * Issue compatibility warning or error message.
389      * @param apexModel The model name
390      * @param forceFlag true if we are forcing the update
391      * @param currentModel the existing model that is loaded
392      * @throws ContextException on compatibility errors
393      */
394     private void handleIncompatibility(final AxPolicyModel apexModel, final boolean forceFlag,
395                     final AxPolicyModel currentModel) throws ContextException {
396         if (forceFlag) {
397             LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
398                             + "\" is not a compatible model update from the existing engine model with key \""
399                             + currentModel.getKey().getId() + "\"");
400         } else {
401             throw new ContextException("apex model update failed, supplied model with key \""
402                             + apexModel.getKey().getId()
403                             + "\" is not a compatible model update from the existing engine model with key \""
404                             + currentModel.getKey().getId() + "\"");
405         }
406     }
407
408     /*
409      * (non-Javadoc)
410      *
411      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getState()
412      */
413     @Override
414     public AxEngineState getState() {
415         // If one worker is running then we are running, otherwise we are stopped
416         for (final EngineService engine : engineWorkerMap.values()) {
417             if (engine.getState() != AxEngineState.STOPPED) {
418                 return AxEngineState.EXECUTING;
419             }
420         }
421
422         return AxEngineState.STOPPED;
423     }
424
425     /*
426      * (non-Javadoc)
427      *
428      * @see org.onap.policy.apex.service.engine.runtime.EngineService#startAll()
429      */
430     @Override
431     public void startAll() throws ApexException {
432         for (final EngineService engine : engineWorkerMap.values()) {
433             start(engine.getKey());
434         }
435
436         // Check if periodic events should be turned on
437         if (periodicEventPeriod > 0) {
438             startPeriodicEvents(periodicEventPeriod);
439         }
440     }
441
442     /*
443      * (non-Javadoc)
444      *
445      * @see org.onap.policy.apex.service.engine.runtime.EngineService#start(org.onap.policy.apex.core.model.
446      * concepts.AxArtifactKey)
447      */
448     @Override
449     public void start(final AxArtifactKey engineKey) throws ApexException {
450         LOGGER.entry(engineKey);
451
452         // Check if we have this key on our map
453         if (!engineWorkerMap.containsKey(engineKey)) {
454             String message = ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX;
455             LOGGER.warn(message);
456             throw new ApexException(message);
457         }
458
459         // Start the engine
460         engineWorkerMap.get(engineKey).start(engineKey);
461
462         LOGGER.exit(engineKey);
463     }
464
465     /*
466      * (non-Javadoc)
467      *
468      * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop()
469      */
470     @Override
471     public void stop() throws ApexException {
472         LOGGER.entry();
473
474         // Stop each engine
475         for (final EngineService engine : engineWorkerMap.values()) {
476             if (engine.getState() != AxEngineState.STOPPED) {
477                 engine.stop();
478             }
479         }
480
481         LOGGER.exit();
482     }
483
484     /*
485      * (non-Javadoc)
486      *
487      * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop(org.onap.policy.apex.core.model.
488      * concepts.AxArtifactKey)
489      */
490     @Override
491     public void stop(final AxArtifactKey engineKey) throws ApexException {
492         LOGGER.entry(engineKey);
493
494         // Check if we have this key on our map
495         if (!engineWorkerMap.containsKey(engineKey)) {
496             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
497             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
498         }
499
500         // Stop the engine
501         engineWorkerMap.get(engineKey).stop(engineKey);
502
503         LOGGER.exit(engineKey);
504     }
505
506     /*
507      * (non-Javadoc)
508      *
509      * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear()
510      */
511     @Override
512     public void clear() throws ApexException {
513         LOGGER.entry();
514
515         // Stop each engine
516         for (final EngineService engine : engineWorkerMap.values()) {
517             if (engine.getState() == AxEngineState.STOPPED) {
518                 engine.clear();
519             }
520         }
521
522         LOGGER.exit();
523     }
524
525     /*
526      * (non-Javadoc)
527      *
528      * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear(org.onap.policy.apex.core.model.
529      * concepts.AxArtifactKey)
530      */
531     @Override
532     public void clear(final AxArtifactKey engineKey) throws ApexException {
533         LOGGER.entry(engineKey);
534
535         // Check if we have this key on our map
536         if (!engineWorkerMap.containsKey(engineKey)) {
537             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
538             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
539         }
540
541         // Clear the engine
542         if (engineWorkerMap.get(engineKey).getState() == AxEngineState.STOPPED) {
543             engineWorkerMap.get(engineKey).stop(engineKey);
544         }
545
546         LOGGER.exit(engineKey);
547     }
548
549     /**
550      * Check all engines are started.
551      *
552      * @return true if <i>all</i> engines are started
553      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted()
554      */
555     @Override
556     public boolean isStarted() {
557         for (final EngineService engine : engineWorkerMap.values()) {
558             if (!engine.isStarted()) {
559                 return false;
560             }
561         }
562         return true;
563     }
564
565     /*
566      * (non-Javadoc)
567      *
568      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted(org.onap.policy.apex.model.
569      * basicmodel.concepts.AxArtifactKey)
570      */
571     @Override
572     public boolean isStarted(final AxArtifactKey engineKey) {
573         // Check if we have this key on our map
574         if (!engineWorkerMap.containsKey(engineKey)) {
575             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
576         }
577         return engineWorkerMap.get(engineKey).isStarted();
578     }
579
580     /**
581      * Check all engines are stopped.
582      *
583      * @return true if <i>all</i> engines are stopped
584      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped()
585      */
586     @Override
587     public boolean isStopped() {
588         for (final EngineService engine : engineWorkerMap.values()) {
589             if (!engine.isStopped()) {
590                 return false;
591             }
592         }
593         return true;
594     }
595
596     /*
597      * (non-Javadoc)
598      *
599      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped(org.onap.policy.apex.model.
600      * basicmodel.concepts.AxArtifactKey)
601      */
602     @Override
603     public boolean isStopped(final AxArtifactKey engineKey) {
604         // Check if we have this key on our map
605         if (!engineWorkerMap.containsKey(engineKey)) {
606             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
607         }
608         return engineWorkerMap.get(engineKey).isStopped();
609     }
610
611     /*
612      * (non-Javadoc)
613      *
614      * @see org.onap.policy.apex.service.engine.runtime.EngineService#startPeriodicEvents(long)
615      */
616     @Override
617     public void startPeriodicEvents(final long period) throws ApexException {
618         // Check if periodic events are already started
619         if (periodicEventGenerator != null) {
620             String message = "Peiodic event geneation already running on engine " + engineServiceKey.getId() + ", "
621                             + periodicEventGenerator.toString();
622             LOGGER.warn(message);
623             throw new ApexException(message);
624         }
625
626         // Set up periodic event execution, its a Java Timer/TimerTask
627         periodicEventGenerator = new ApexPeriodicEventGenerator(this.getEngineServiceEventInterface(), period);
628
629         // Record the periodic event period because it may have been set over the Web Socket admin
630         // interface
631         this.periodicEventPeriod = period;
632     }
633
634     /*
635      * (non-Javadoc)
636      *
637      * @see org.onap.policy.apex.service.engine.runtime.EngineService#stopPeriodicEvents()
638      */
639     @Override
640     public void stopPeriodicEvents() throws ApexException {
641         // Check if periodic events are already started
642         if (periodicEventGenerator == null) {
643             LOGGER.warn("Peiodic event geneation not running on engine " + engineServiceKey.getId());
644             throw new ApexException("Peiodic event geneation not running on engine " + engineServiceKey.getId());
645         }
646
647         // Stop periodic events
648         periodicEventGenerator.cancel();
649         periodicEventGenerator = null;
650     }
651
652     /*
653      * (non-Javadoc)
654      *
655      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getStatus(org.onap.policy.apex.core.model
656      * .concepts.AxArtifactKey)
657      */
658     @Override
659     public String getStatus(final AxArtifactKey engineKey) throws ApexException {
660         // Check if we have this key on our map
661         if (!engineWorkerMap.containsKey(engineKey)) {
662             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
663             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
664         }
665
666         // Return the information for this worker
667         return engineWorkerMap.get(engineKey).getStatus(engineKey);
668     }
669
670     /*
671      * (non-Javadoc)
672      *
673      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getRuntimeInfo(org.onap.policy.apex.core.
674      * model.concepts.AxArtifactKey)
675      */
676     @Override
677     public String getRuntimeInfo(final AxArtifactKey engineKey) throws ApexException {
678         // Check if we have this key on our map
679         if (!engineWorkerMap.containsKey(engineKey)) {
680             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
681             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
682         }
683
684         // Return the information for this worker
685         return engineWorkerMap.get(engineKey).getRuntimeInfo(engineKey);
686     }
687
688     /*
689      * (non-Javadoc)
690      *
691      * @see org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface#sendEvent(org.onap.policy.
692      * apex.service.engine.event.ApexEvent)
693      */
694     @Override
695     public void sendEvent(final ApexEvent event) {
696         // Check if we have this key on our map
697         if (getState() == AxEngineState.STOPPED) {
698             LOGGER.warn("event " + event.getName() + " not processed, no engines on engine service "
699                             + engineServiceKey.getId() + " are running");
700             return;
701         }
702
703         if (event == null) {
704             LOGGER.warn("Null events cannot be processed, in engine service " + engineServiceKey.getId());
705             return;
706         }
707
708         if (DEBUG_ENABLED) {
709             LOGGER.debug("Forwarding Apex Event {} to the processing engine", event);
710         }
711
712         // Add the incoming event to the queue, the next available worker will process it
713         queue.add(event);
714     }
715 }