04fb8e3896a82fb31dcaaf27a6eb252f80766de9
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.runtime.impl;
22
23 import java.io.ByteArrayInputStream;
24 import java.util.Collection;
25 import java.util.Collections;
26 import java.util.LinkedHashMap;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.LinkedBlockingQueue;
31
32 import org.onap.policy.apex.context.ContextException;
33 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
34 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
35 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
36 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
37 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
38 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
39 import org.onap.policy.apex.model.basicmodel.service.ModelService;
40 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
41 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
42 import org.onap.policy.apex.service.engine.event.ApexEvent;
43 import org.onap.policy.apex.service.engine.event.ApexPeriodicEventGenerator;
44 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
45 import org.onap.policy.apex.service.engine.runtime.EngineService;
46 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
47 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
48 import org.onap.policy.common.parameters.GroupValidationResult;
49 import org.slf4j.ext.XLogger;
50 import org.slf4j.ext.XLoggerFactory;
51
52 /**
53  * The Class EngineServiceImpl controls a thread pool that runs a set of Apex engine workers, each of which is running
54  * on an identical Apex model. This class handles the management of the engine worker instances, their threads, and
55  * event forwarding to and from the engine workers.
56  *
57  * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
58  * @author Liam Fallon (liam.fallon@ericsson.com)
59  * @author John Keeney (john.keeney@ericsson.com)
60  */
61 public final class EngineServiceImpl implements EngineService, EngineServiceEventInterface {
62     // Logging static variables
63     private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceImpl.class);
64     private static final boolean DEBUG_ENABLED = LOGGER.isDebugEnabled();
65
66     // Constants for timing
67     private static final long MAX_START_WAIT_TIME = 5000; // 5 seconds
68     private static final long MAX_STOP_WAIT_TIME = 5000; // 5 seconds
69     private static final int ENGINE_SERVICE_STOP_START_WAIT_INTERVAL = 200;
70
71     // The ID of this engine
72     private AxArtifactKey engineServiceKey = null;
73
74     // The Apex engine workers this engine service is handling
75     private final Map<AxArtifactKey, EngineService> engineWorkerMap = Collections
76                     .synchronizedMap(new LinkedHashMap<AxArtifactKey, EngineService>());
77
78     // Event queue for events being sent into the Apex engines, it used by all engines within a
79     // group.
80     private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
81
82     // Thread factory for thread management
83     private final ApplicationThreadFactory atFactory = new ApplicationThreadFactory("apex-engine-service", 512);
84
85     // Periodic event generator and its period in milliseconds
86     private ApexPeriodicEventGenerator periodicEventGenerator = null;
87     private long periodicEventPeriod;
88
89     /**
90      * This constructor instantiates engine workers and adds them to the set of engine workers to be managed. The
91      * constructor is private to prevent subclassing.
92      *
93      * @param engineServiceKey the engine service key
94      * @param incomingThreadCount the thread count, the number of engine workers to start
95      * @param periodicEventPeriod the period in milliseconds at which periodic events are generated
96      * @throws ApexException on worker instantiation errors
97      */
98     private EngineServiceImpl(final AxArtifactKey engineServiceKey, final int incomingThreadCount,
99                     final long periodicEventPeriod) throws ApexException {
100         LOGGER.entry(engineServiceKey, incomingThreadCount);
101
102         this.engineServiceKey = engineServiceKey;
103         this.periodicEventPeriod = periodicEventPeriod;
104
105         int threadCount = incomingThreadCount;
106         if (threadCount <= 0) {
107             // Just start one engine worker
108             threadCount = 1;
109         }
110
111         // Start engine workers
112         for (int engineCounter = 0; engineCounter < threadCount; engineCounter++) {
113             final AxArtifactKey engineWorkerKey = new AxArtifactKey(engineServiceKey.getName() + '-' + engineCounter,
114                             engineServiceKey.getVersion());
115             engineWorkerMap.put(engineWorkerKey, new EngineWorker(engineWorkerKey, queue, atFactory));
116             LOGGER.info("Created apex engine {} .", engineWorkerKey.getId());
117         }
118
119         LOGGER.info("APEX service created.");
120         LOGGER.exit();
121     }
122
123     /**
124      * Create an Apex Engine Service instance. This method is deprecated and will be removed in the next version.
125      *
126      * @param engineServiceKey the engine service key
127      * @param threadCount the thread count, the number of engine workers to start
128      * @return the Engine Service instance
129      * @throws ApexException on worker instantiation errors
130      * @deprecated Do not use this version. Use {@link #create(EngineServiceParameters)}
131      */
132     @Deprecated
133     public static EngineServiceImpl create(final AxArtifactKey engineServiceKey, final int threadCount)
134                     throws ApexException {
135         // Check if the Apex model specified is sane
136         if (engineServiceKey == null) {
137             LOGGER.warn("engine service key is null");
138             throw new ApexException("engine service key is null");
139         }
140         return new EngineServiceImpl(engineServiceKey, threadCount, 0);
141     }
142
143     /**
144      * Create an Apex Engine Service instance. This method does not load the policy so
145      * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} or
146      * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} must be used to load a model. This method does not
147      * start the Engine Service so {@link #start(AxArtifactKey)} or {@link #startAll()} must be used.
148      *
149      * @param config the configuration for this Apex Engine Service.
150      * @return the Engine Service instance
151      * @throws ApexException on worker instantiation errors
152      */
153     public static EngineServiceImpl create(final EngineServiceParameters config) throws ApexException {
154         if (config == null) {
155             LOGGER.warn("Engine service configuration parameters is null");
156             throw new ApexException("engine service configuration parameters is null");
157         }
158         final GroupValidationResult validation = config.validate();
159         if (!validation.isValid()) {
160             LOGGER.warn("Invalid engine service configuration parameters: {}" + validation.getResult());
161             throw new ApexException("Invalid engine service configuration parameters: " + validation);
162         }
163         final AxArtifactKey engineServiceKey = config.getEngineKey();
164         final int threadCount = config.getInstanceCount();
165
166         // Check if the Apex model specified is sane
167         if (engineServiceKey == null) {
168             LOGGER.warn("engine service key is null");
169             throw new ApexException("engine service key is null");
170         }
171
172         return new EngineServiceImpl(engineServiceKey, threadCount, config.getPeriodicEventPeriod());
173     }
174
175     /*
176      * (non-Javadoc)
177      * 
178      * @see org.onap.policy.apex.service.engine.runtime.EngineService#registerActionListener(java.lang. String,
179      * org.onap.policy.apex.service.engine.runtime.ApexEventListener)
180      */
181     @Override
182     public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
183         LOGGER.entry(apexEventListener);
184
185         // Register the Apex event listener on all engine workers, each worker will return Apex
186         // events to the listening application
187         for (final EngineService engineWorker : engineWorkerMap.values()) {
188             engineWorker.registerActionListener(listenerName, apexEventListener);
189         }
190
191         LOGGER.info("Added the action listener to the engine");
192         LOGGER.exit();
193     }
194
195     /*
196      * (non-Javadoc)
197      * 
198      * @see org.onap.policy.apex.service.engine.runtime.EngineService#deregisterActionListener(java.lang. String)
199      */
200     @Override
201     public void deregisterActionListener(final String listenerName) {
202         LOGGER.entry(listenerName);
203
204         // Register the Apex event listener on all engine workers, each worker will return Apex
205         // events to the listening application
206         for (final EngineService engineWorker : engineWorkerMap.values()) {
207             engineWorker.deregisterActionListener(listenerName);
208         }
209
210         LOGGER.info("Removed the action listener from the engine");
211         LOGGER.exit();
212     }
213
214     /*
215      * (non-Javadoc)
216      *
217      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getEngineServiceEventInterface()
218      */
219     @Override
220     public EngineServiceEventInterface getEngineServiceEventInterface() {
221         return this;
222     }
223
224     /*
225      * (non-Javadoc)
226      *
227      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getKey()
228      */
229     @Override
230     public AxArtifactKey getKey() {
231         return engineServiceKey;
232     }
233
234     /*
235      * (non-Javadoc)
236      *
237      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getInfo()
238      */
239     @Override
240     public Collection<AxArtifactKey> getEngineKeys() {
241         return engineWorkerMap.keySet();
242     }
243
244     /*
245      * (non-Javadoc)
246      *
247      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getApexModelKey()
248      */
249     @Override
250     public AxArtifactKey getApexModelKey() {
251         if (engineWorkerMap.size() == 0) {
252             return null;
253         }
254
255         return engineWorkerMap.entrySet().iterator().next().getValue().getApexModelKey();
256     }
257
258     /*
259      * (non-Javadoc)
260      *
261      * @see org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex.model.
262      * basicmodel.concepts.AxArtifactKey, java.lang.String, boolean)
263      */
264     @Override
265     public void updateModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString,
266                     final boolean forceFlag) throws ApexException {
267         // Check if the Apex model specified is sane
268         if (apexModelString == null || apexModelString.trim().length() == 0) {
269             LOGGER.warn("model for updating on engine service with key " + incomingEngineServiceKey.getId()
270                             + " is empty");
271             throw new ApexException("model for updating on engine service with key " + incomingEngineServiceKey.getId()
272                             + " is empty");
273         }
274
275         // Read the Apex model into memory using the Apex Model Reader
276         AxPolicyModel apexPolicyModel = null;
277         try {
278             final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
279             apexPolicyModel = modelReader.read(new ByteArrayInputStream(apexModelString.getBytes()));
280         } catch (final ApexModelException e) {
281             LOGGER.error("failed to unmarshal the apex model on engine service " + incomingEngineServiceKey.getId(), e);
282             throw new ApexException(
283                             "failed to unmarshal the apex model on engine service " + incomingEngineServiceKey.getId(),
284                             e);
285         }
286
287         if (apexPolicyModel == null) {
288             LOGGER.error("apex model null on engine service " + incomingEngineServiceKey.getId());
289             throw new ApexException("apex model null on engine service " + incomingEngineServiceKey.getId());
290         }
291
292         // Update the model
293         updateModel(incomingEngineServiceKey, apexPolicyModel, forceFlag);
294
295         LOGGER.exit();
296     }
297
298     /*
299      * (non-Javadoc)
300      *
301      * @see org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex.model.
302      * basicmodel.concepts.AxArtifactKey, org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel, boolean)
303      */
304     @Override
305     public void updateModel(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
306                     final boolean forceFlag) throws ApexException {
307         LOGGER.entry(incomingEngineServiceKey);
308
309         // Check if the Apex model specified is sane
310         if (apexModel == null) {
311             LOGGER.warn("model for updating on engine service with key " + incomingEngineServiceKey.getId()
312                             + " is null");
313             throw new ApexException("model for updating on engine service with key " + incomingEngineServiceKey.getId()
314                             + " is null");
315         }
316
317         // Check if the key on the update request is correct
318         if (!this.engineServiceKey.equals(incomingEngineServiceKey)) {
319             LOGGER.warn("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
320                             + engineServiceKey.getId() + " of this engine service");
321             throw new ApexException("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
322                             + engineServiceKey.getId() + " of this engine service");
323         }
324
325         // Check model compatibility
326         if (ModelService.existsModel(AxPolicyModel.class)) {
327             // The current policy model may or may not be defined
328             final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
329             if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
330                 if (forceFlag) {
331                     LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
332                                     + "\" is not a compatible model update from the existing engine model with key \""
333                                     + currentModel.getKey().getId() + "\"");
334                 } else {
335                     throw new ContextException("apex model update failed, supplied model with key \""
336                                     + apexModel.getKey().getId()
337                                     + "\" is not a compatible model update from the existing engine model with key \""
338                                     + currentModel.getKey().getId() + "\"");
339                 }
340             }
341         }
342
343         if (!isStopped()) {
344             // Stop all engines on this engine service
345             stop();
346             final long stoptime = System.currentTimeMillis();
347             while (!isStopped() && System.currentTimeMillis() - stoptime < MAX_STOP_WAIT_TIME) {
348                 ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
349             }
350             // Check if all engines are stopped
351             final StringBuilder notStoppedEngineIdBuilder = new StringBuilder();
352             for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
353                 if (engineWorkerEntry.getValue().getState() != AxEngineState.STOPPED) {
354                     notStoppedEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
355                     notStoppedEngineIdBuilder.append('(');
356                     notStoppedEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
357                     notStoppedEngineIdBuilder.append(") ");
358                 }
359             }
360             if (notStoppedEngineIdBuilder.length() > 0) {
361                 final String errorString = "cannot update model on engine service with key "
362                                 + incomingEngineServiceKey.getId() + ", engines not stopped after " + MAX_STOP_WAIT_TIME
363                                 + "ms are: " + notStoppedEngineIdBuilder.toString().trim();
364                 LOGGER.warn(errorString);
365                 throw new ApexException(errorString);
366             }
367         }
368
369         // Update the engines
370         for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
371             LOGGER.info("Registering apex model on engine {}", engineWorkerEntry.getKey().getId());
372             engineWorkerEntry.getValue().updateModel(engineWorkerEntry.getKey(), apexModel, forceFlag);
373         }
374
375         // start all engines on this engine service if it was not stopped before the update
376         startAll();
377         final long starttime = System.currentTimeMillis();
378         while (!isStarted() && System.currentTimeMillis() - starttime < MAX_START_WAIT_TIME) {
379             ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
380         }
381         // Check if all engines are running
382         final StringBuilder notRunningEngineIdBuilder = new StringBuilder();
383         for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
384             if (engineWorkerEntry.getValue().getState() != AxEngineState.READY
385                             && engineWorkerEntry.getValue().getState() != AxEngineState.EXECUTING) {
386                 notRunningEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
387                 notRunningEngineIdBuilder.append('(');
388                 notRunningEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
389                 notRunningEngineIdBuilder.append(") ");
390             }
391         }
392         if (notRunningEngineIdBuilder.length() > 0) {
393             final String errorString = "engine start error on model update on engine service with key "
394                             + incomingEngineServiceKey.getId() + ", engines not running are: "
395                             + notRunningEngineIdBuilder.toString().trim();
396             LOGGER.warn(errorString);
397             throw new ApexException(errorString);
398         }
399
400         LOGGER.exit();
401     }
402
403     /*
404      * (non-Javadoc)
405      *
406      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getState()
407      */
408     @Override
409     public AxEngineState getState() {
410         // If one worker is running then we are running, otherwise we are stopped
411         for (final EngineService engine : engineWorkerMap.values()) {
412             if (engine.getState() != AxEngineState.STOPPED) {
413                 return AxEngineState.EXECUTING;
414             }
415         }
416
417         return AxEngineState.STOPPED;
418     }
419
420     /*
421      * (non-Javadoc)
422      *
423      * @see org.onap.policy.apex.service.engine.runtime.EngineService#startAll()
424      */
425     @Override
426     public void startAll() throws ApexException {
427         for (final EngineService engine : engineWorkerMap.values()) {
428             start(engine.getKey());
429         }
430
431         // Check if periodic events should be turned on
432         if (periodicEventPeriod > 0) {
433             startPeriodicEvents(periodicEventPeriod);
434         }
435     }
436
437     /*
438      * (non-Javadoc)
439      *
440      * @see org.onap.policy.apex.service.engine.runtime.EngineService#start(org.onap.policy.apex.core.model.
441      * concepts.AxArtifactKey)
442      */
443     @Override
444     public void start(final AxArtifactKey engineKey) throws ApexException {
445         LOGGER.entry(engineKey);
446
447         // Check if we have this key on our map
448         if (!engineWorkerMap.containsKey(engineKey)) {
449             LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
450             throw new ApexException("engine with key " + engineKey.getId() + " not found in engine service");
451         }
452
453         // Start the engine
454         engineWorkerMap.get(engineKey).start(engineKey);
455
456         LOGGER.exit(engineKey);
457     }
458
459     /*
460      * (non-Javadoc)
461      *
462      * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop()
463      */
464     @Override
465     public void stop() throws ApexException {
466         LOGGER.entry();
467
468         // Stop each engine
469         for (final EngineService engine : engineWorkerMap.values()) {
470             if (engine.getState() != AxEngineState.STOPPED) {
471                 engine.stop();
472             }
473         }
474
475         LOGGER.exit();
476     }
477
478     /*
479      * (non-Javadoc)
480      *
481      * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop(org.onap.policy.apex.core.model.
482      * concepts.AxArtifactKey)
483      */
484     @Override
485     public void stop(final AxArtifactKey engineKey) throws ApexException {
486         LOGGER.entry(engineKey);
487
488         // Check if we have this key on our map
489         if (!engineWorkerMap.containsKey(engineKey)) {
490             LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
491             throw new ApexException("engine with key " + engineKey.getId() + " not found in engine service");
492         }
493
494         // Stop the engine
495         engineWorkerMap.get(engineKey).stop(engineKey);
496
497         LOGGER.exit(engineKey);
498     }
499
500     /*
501      * (non-Javadoc)
502      *
503      * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear()
504      */
505     @Override
506     public void clear() throws ApexException {
507         LOGGER.entry();
508
509         // Stop each engine
510         for (final EngineService engine : engineWorkerMap.values()) {
511             if (engine.getState() == AxEngineState.STOPPED) {
512                 engine.clear();
513             }
514         }
515
516         LOGGER.exit();
517     }
518
519     /*
520      * (non-Javadoc)
521      *
522      * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear(org.onap.policy.apex.core.model.
523      * concepts.AxArtifactKey)
524      */
525     @Override
526     public void clear(final AxArtifactKey engineKey) throws ApexException {
527         LOGGER.entry(engineKey);
528
529         // Check if we have this key on our map
530         if (!engineWorkerMap.containsKey(engineKey)) {
531             LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
532             throw new ApexException("engine with key " + engineKey.getId() + " not found in engine service");
533         }
534
535         // Clear the engine
536         if (engineWorkerMap.get(engineKey).getState() == AxEngineState.STOPPED) {
537             engineWorkerMap.get(engineKey).stop(engineKey);
538         }
539
540         LOGGER.exit(engineKey);
541     }
542
543     /**
544      * Check all engines are started.
545      *
546      * @return true if <i>all</i> engines are started
547      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted()
548      */
549     @Override
550     public boolean isStarted() {
551         for (final EngineService engine : engineWorkerMap.values()) {
552             if (!engine.isStarted()) {
553                 return false;
554             }
555         }
556         return true;
557     }
558
559     /*
560      * (non-Javadoc)
561      *
562      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted(org.onap.policy.apex.model.
563      * basicmodel.concepts.AxArtifactKey)
564      */
565     @Override
566     public boolean isStarted(final AxArtifactKey engineKey) {
567         // Check if we have this key on our map
568         if (!engineWorkerMap.containsKey(engineKey)) {
569             LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
570         }
571         return engineWorkerMap.get(engineKey).isStarted();
572     }
573
574     /**
575      * Check all engines are stopped.
576      *
577      * @return true if <i>all</i> engines are stopped
578      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped()
579      */
580     @Override
581     public boolean isStopped() {
582         for (final EngineService engine : engineWorkerMap.values()) {
583             if (!engine.isStopped()) {
584                 return false;
585             }
586         }
587         return true;
588     }
589
590     /*
591      * (non-Javadoc)
592      *
593      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped(org.onap.policy.apex.model.
594      * basicmodel.concepts.AxArtifactKey)
595      */
596     @Override
597     public boolean isStopped(final AxArtifactKey engineKey) {
598         // Check if we have this key on our map
599         if (!engineWorkerMap.containsKey(engineKey)) {
600             LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
601         }
602         return engineWorkerMap.get(engineKey).isStopped();
603     }
604
605     /*
606      * (non-Javadoc)
607      *
608      * @see org.onap.policy.apex.service.engine.runtime.EngineService#startPeriodicEvents(long)
609      */
610     @Override
611     public void startPeriodicEvents(final long period) throws ApexException {
612         // Check if periodic events are already started
613         if (periodicEventGenerator != null) {
614             LOGGER.warn("Peiodic event geneation already running on engine " + engineServiceKey.getId() + ", "
615                             + periodicEventGenerator.toString());
616             throw new ApexException("Peiodic event geneation already running on engine " + engineServiceKey.getId()
617                             + ", " + periodicEventGenerator.toString());
618         }
619
620         // Set up periodic event execution, its a Java Timer/TimerTask
621         periodicEventGenerator = new ApexPeriodicEventGenerator(this.getEngineServiceEventInterface(), period);
622
623         // Record the periodic event period because it may have been set over the Web Socket admin
624         // interface
625         this.periodicEventPeriod = period;
626     }
627
628     /*
629      * (non-Javadoc)
630      *
631      * @see org.onap.policy.apex.service.engine.runtime.EngineService#stopPeriodicEvents()
632      */
633     @Override
634     public void stopPeriodicEvents() throws ApexException {
635         // Check if periodic events are already started
636         if (periodicEventGenerator == null) {
637             LOGGER.warn("Peiodic event geneation not running on engine " + engineServiceKey.getId());
638             throw new ApexException("Peiodic event geneation not running on engine " + engineServiceKey.getId());
639         }
640
641         // Stop periodic events
642         periodicEventGenerator.cancel();
643         periodicEventGenerator = null;
644     }
645
646     /*
647      * (non-Javadoc)
648      *
649      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getStatus(org.onap.policy.apex.core.model
650      * .concepts.AxArtifactKey)
651      */
652     @Override
653     public String getStatus(final AxArtifactKey engineKey) throws ApexException {
654         // Check if we have this key on our map
655         if (!engineWorkerMap.containsKey(engineKey)) {
656             LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
657             throw new ApexException("engine with key " + engineKey.getId() + " not found in engine service");
658         }
659
660         // Return the information for this worker
661         return engineWorkerMap.get(engineKey).getStatus(engineKey);
662     }
663
664     /*
665      * (non-Javadoc)
666      *
667      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getRuntimeInfo(org.onap.policy.apex.core.
668      * model.concepts.AxArtifactKey)
669      */
670     @Override
671     public String getRuntimeInfo(final AxArtifactKey engineKey) throws ApexException {
672         // Check if we have this key on our map
673         if (!engineWorkerMap.containsKey(engineKey)) {
674             LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
675             throw new ApexException("engine with key " + engineKey.getId() + " not found in engine service");
676         }
677
678         // Return the information for this worker
679         return engineWorkerMap.get(engineKey).getRuntimeInfo(engineKey);
680     }
681
682     /*
683      * (non-Javadoc)
684      *
685      * @see org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface#sendEvent(org.onap.policy.
686      * apex.service.engine.event.ApexEvent)
687      */
688     @Override
689     public void sendEvent(final ApexEvent event) {
690         // Check if we have this key on our map
691         if (getState() == AxEngineState.STOPPED) {
692             LOGGER.warn("event " + event.getName() + " not processed, no engines on engine service "
693                             + engineServiceKey.getId() + " are running");
694             return;
695         }
696
697         if (event == null) {
698             LOGGER.warn("Null events cannot be processed, in engine service " + engineServiceKey.getId());
699             return;
700         }
701
702         if (DEBUG_ENABLED) {
703             LOGGER.debug("Forwarding Apex Event {} to the processing engine", event);
704         }
705
706         // Add the incoming event to the queue, the next available worker will process it
707         queue.add(event);
708     }
709 }