ee1a5dcdfd9caa75f2738565d72e568b711c271c
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.runtime.impl;
22
23 import java.io.ByteArrayInputStream;
24 import java.util.Collection;
25 import java.util.Collections;
26 import java.util.LinkedHashMap;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.LinkedBlockingQueue;
31
32 import org.onap.policy.apex.context.ContextException;
33 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
34 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
35 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
36 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
37 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
38 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
39 import org.onap.policy.apex.model.basicmodel.service.ModelService;
40 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
41 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
42 import org.onap.policy.apex.service.engine.event.ApexEvent;
43 import org.onap.policy.apex.service.engine.event.ApexPeriodicEventGenerator;
44 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
45 import org.onap.policy.apex.service.engine.runtime.EngineService;
46 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
47 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
48 import org.onap.policy.common.parameters.GroupValidationResult;
49 import org.slf4j.ext.XLogger;
50 import org.slf4j.ext.XLoggerFactory;
51
52 /**
53  * The Class EngineServiceImpl controls a thread pool that runs a set of Apex engine workers, each of which is running
54  * on an identical Apex model. This class handles the management of the engine worker instances, their threads, and
55  * event forwarding to and from the engine workers.
56  *
57  * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
58  * @author Liam Fallon (liam.fallon@ericsson.com)
59  * @author John Keeney (john.keeney@ericsson.com)
60  */
61 public final class EngineServiceImpl implements EngineService, EngineServiceEventInterface {
62     // Logging static variables
63     private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceImpl.class);
64     private static final boolean DEBUG_ENABLED = LOGGER.isDebugEnabled();
65
66     // Recurring string constants
67     private static final String ENGINE_KEY_PREAMBLE = "engine with key ";
68     private static final String NOT_FOUND_SUFFIX = " not found in engine service";
69     private static final String ENGINE_KEY_NOT_SPECIFIED = "engine key must be specified and may not be null";
70
71     // Constants for timing
72     private static final long MAX_START_WAIT_TIME = 5000; // 5 seconds
73     private static final long MAX_STOP_WAIT_TIME = 5000; // 5 seconds
74     private static final int ENGINE_SERVICE_STOP_START_WAIT_INTERVAL = 200;
75
76     // The ID of this engine
77     private AxArtifactKey engineServiceKey = null;
78
79     // The Apex engine workers this engine service is handling
80     private final Map<AxArtifactKey, EngineService> engineWorkerMap = Collections
81                     .synchronizedMap(new LinkedHashMap<AxArtifactKey, EngineService>());
82
83     // Event queue for events being sent into the Apex engines, it used by all engines within a
84     // group.
85     private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
86
87     // Thread factory for thread management
88     private final ApplicationThreadFactory atFactory = new ApplicationThreadFactory("apex-engine-service", 512);
89
90     // Periodic event generator and its period in milliseconds
91     private ApexPeriodicEventGenerator periodicEventGenerator = null;
92     private long periodicEventPeriod;
93
94     /**
95      * This constructor instantiates engine workers and adds them to the set of engine workers to be managed. The
96      * constructor is private to prevent subclassing.
97      *
98      * @param engineServiceKey the engine service key
99      * @param threadCount the thread count, the number of engine workers to start
100      * @param periodicEventPeriod the period in milliseconds at which periodic events are generated
101      * @throws ApexException on worker instantiation errors
102      */
103     private EngineServiceImpl(final AxArtifactKey engineServiceKey, final int threadCount,
104                     final long periodicEventPeriod) {
105         LOGGER.entry(engineServiceKey, threadCount);
106
107         this.engineServiceKey = engineServiceKey;
108         this.periodicEventPeriod = periodicEventPeriod;
109
110         // Start engine workers
111         for (int engineCounter = 0; engineCounter < threadCount; engineCounter++) {
112             final AxArtifactKey engineWorkerKey = new AxArtifactKey(engineServiceKey.getName() + '-' + engineCounter,
113                             engineServiceKey.getVersion());
114             engineWorkerMap.put(engineWorkerKey, new EngineWorker(engineWorkerKey, queue, atFactory));
115             LOGGER.info("Created apex engine {} .", engineWorkerKey.getId());
116         }
117
118         LOGGER.info("APEX service created.");
119         LOGGER.exit();
120     }
121
122     /**
123      * Create an Apex Engine Service instance. This method does not load the policy so
124      * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} or
125      * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} must be used to load a model. This method does not
126      * start the Engine Service so {@link #start(AxArtifactKey)} or {@link #startAll()} must be used.
127      *
128      * @param config the configuration for this Apex Engine Service.
129      * @return the Engine Service instance
130      * @throws ApexException on worker instantiation errors
131      */
132     public static EngineServiceImpl create(final EngineServiceParameters config) throws ApexException {
133         if (config == null) {
134             LOGGER.warn("Engine service configuration parameters is null");
135             throw new ApexException("engine service configuration parameters are null");
136         }
137         
138         final GroupValidationResult validation = config.validate();
139         if (!validation.isValid()) {
140             LOGGER.warn("Invalid engine service configuration parameters: {}" + validation.getResult());
141             throw new ApexException("Invalid engine service configuration parameters: " + validation);
142         }
143         
144         final AxArtifactKey engineServiceKey = config.getEngineKey();
145         final int threadCount = config.getInstanceCount();
146
147         return new EngineServiceImpl(engineServiceKey, threadCount, config.getPeriodicEventPeriod());
148     }
149
150     /*
151      * (non-Javadoc)
152      * 
153      * @see org.onap.policy.apex.service.engine.runtime.EngineService#registerActionListener(java.lang. String,
154      * org.onap.policy.apex.service.engine.runtime.ApexEventListener)
155      */
156     @Override
157     public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
158         LOGGER.entry(apexEventListener);
159
160         if (listenerName == null) {
161             String message = "listener name must be specified and may not be null";
162             LOGGER.warn(message);
163             return;
164         }
165
166         if (apexEventListener == null) {
167             String message = "apex event listener must be specified and may not be null";
168             LOGGER.warn(message);
169             return;
170         }
171
172         // Register the Apex event listener on all engine workers, each worker will return Apex
173         // events to the listening application
174         for (final EngineService engineWorker : engineWorkerMap.values()) {
175             engineWorker.registerActionListener(listenerName, apexEventListener);
176         }
177
178         LOGGER.info("Added the action listener to the engine");
179         LOGGER.exit();
180     }
181
182     /*
183      * (non-Javadoc)
184      * 
185      * @see org.onap.policy.apex.service.engine.runtime.EngineService#deregisterActionListener(java.lang. String)
186      */
187     @Override
188     public void deregisterActionListener(final String listenerName) {
189         LOGGER.entry(listenerName);
190
191         // Register the Apex event listener on all engine workers, each worker will return Apex
192         // events to the listening application
193         for (final EngineService engineWorker : engineWorkerMap.values()) {
194             engineWorker.deregisterActionListener(listenerName);
195         }
196
197         LOGGER.info("Removed the action listener from the engine");
198         LOGGER.exit();
199     }
200
201     /*
202      * (non-Javadoc)
203      *
204      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getEngineServiceEventInterface()
205      */
206     @Override
207     public EngineServiceEventInterface getEngineServiceEventInterface() {
208         return this;
209     }
210
211     /*
212      * (non-Javadoc)
213      *
214      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getKey()
215      */
216     @Override
217     public AxArtifactKey getKey() {
218         return engineServiceKey;
219     }
220
221     /*
222      * (non-Javadoc)
223      *
224      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getInfo()
225      */
226     @Override
227     public Collection<AxArtifactKey> getEngineKeys() {
228         return engineWorkerMap.keySet();
229     }
230
231     /*
232      * (non-Javadoc)
233      *
234      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getApexModelKey()
235      */
236     @Override
237     public AxArtifactKey getApexModelKey() {
238         if (engineWorkerMap.size() == 0) {
239             return null;
240         }
241
242         return engineWorkerMap.entrySet().iterator().next().getValue().getApexModelKey();
243     }
244
245     /*
246      * (non-Javadoc)
247      *
248      * @see org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex.model.
249      * basicmodel.concepts.AxArtifactKey, java.lang.String, boolean)
250      */
251     @Override
252     public void updateModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString,
253                     final boolean forceFlag) throws ApexException {
254         // Check if the engine service key specified is sane
255         if (incomingEngineServiceKey == null) {
256             String message = ENGINE_KEY_NOT_SPECIFIED;
257             LOGGER.warn(message);
258             throw new ApexException(message);
259         }
260
261         // Check if the Apex model specified is sane
262         if (apexModelString == null || apexModelString.trim().length() == 0) {
263             String emptyModelMessage = "model for updating engine service with key "
264                             + incomingEngineServiceKey.getId() + " is empty";
265             LOGGER.warn(emptyModelMessage);
266             throw new ApexException(emptyModelMessage);
267         }
268
269         // Read the Apex model into memory using the Apex Model Reader
270         AxPolicyModel apexPolicyModel = null;
271         try {
272             final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
273             apexPolicyModel = modelReader.read(new ByteArrayInputStream(apexModelString.getBytes()));
274         } catch (final ApexModelException e) {
275             String message = "failed to unmarshal the apex model on engine service " + incomingEngineServiceKey.getId();
276             LOGGER.error(message, e);
277             throw new ApexException(message, e);
278         }
279
280         // Update the model
281         updateModel(incomingEngineServiceKey, apexPolicyModel, forceFlag);
282
283         LOGGER.exit();
284     }
285
286     /*
287      * (non-Javadoc)
288      *
289      * @see org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex.model.
290      * basicmodel.concepts.AxArtifactKey, org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel, boolean)
291      */
292     @Override
293     public void updateModel(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
294                     final boolean forceFlag) throws ApexException {
295         LOGGER.entry(incomingEngineServiceKey);
296
297         // Check if the engine service key specified is sane
298         if (incomingEngineServiceKey == null) {
299             String message = ENGINE_KEY_NOT_SPECIFIED;
300             LOGGER.warn(message);
301             throw new ApexException(message);
302         }
303
304         // Check if the Apex model specified is sane
305         if (apexModel == null) {
306             LOGGER.warn("model for updating on engine service with key " + incomingEngineServiceKey.getId()
307                             + " is null");
308             throw new ApexException("model for updating on engine service with key " + incomingEngineServiceKey.getId()
309                             + " is null");
310         }
311
312         // Check if the key on the update request is correct
313         if (!this.engineServiceKey.equals(incomingEngineServiceKey)) {
314             LOGGER.warn("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
315                             + engineServiceKey.getId() + " of this engine service");
316             throw new ApexException("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
317                             + engineServiceKey.getId() + " of this engine service");
318         }
319
320         // Check model compatibility
321         if (ModelService.existsModel(AxPolicyModel.class)) {
322             // The current policy model may or may not be defined
323             final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
324             if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
325                 handleIncompatibility(apexModel, forceFlag, currentModel);
326             }
327         }
328
329         executeModelUpdate(incomingEngineServiceKey, apexModel, forceFlag);
330
331         LOGGER.exit();
332     }
333
334     /**
335      * Execute the model update on the engine instances.
336      * 
337      * @param incomingEngineServiceKey the engine service key to update
338      * @param apexModel the model to update the engines with
339      * @param forceFlag if true, ignore compatibility problems
340      * @throws ApexException on model update errors
341      */
342     private void executeModelUpdate(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
343                     final boolean forceFlag) throws ApexException {
344         
345         if (!isStopped()) {
346             stopEngines(incomingEngineServiceKey);
347         }
348
349         // Update the engines
350         for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
351             LOGGER.info("Registering apex model on engine {}", engineWorkerEntry.getKey().getId());
352             engineWorkerEntry.getValue().updateModel(engineWorkerEntry.getKey(), apexModel, forceFlag);
353         }
354
355         // start all engines on this engine service if it was not stopped before the update
356         startAll();
357         final long starttime = System.currentTimeMillis();
358         while (!isStarted() && System.currentTimeMillis() - starttime < MAX_START_WAIT_TIME) {
359             ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
360         }
361         // Check if all engines are running
362         final StringBuilder notRunningEngineIdBuilder = new StringBuilder();
363         for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
364             if (engineWorkerEntry.getValue().getState() != AxEngineState.READY
365                             && engineWorkerEntry.getValue().getState() != AxEngineState.EXECUTING) {
366                 notRunningEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
367                 notRunningEngineIdBuilder.append('(');
368                 notRunningEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
369                 notRunningEngineIdBuilder.append(") ");
370             }
371         }
372         if (notRunningEngineIdBuilder.length() > 0) {
373             final String errorString = "engine start error on model update on engine service with key "
374                             + incomingEngineServiceKey.getId() + ", engines not running are: "
375                             + notRunningEngineIdBuilder.toString().trim();
376             LOGGER.warn(errorString);
377             throw new ApexException(errorString);
378         }
379     }
380
381     /**
382      * Stop engines for a model update.
383      * @param incomingEngineServiceKey the engine service key for the engines that are to be stopped
384      * @throws ApexException on errors stopping engines
385      */
386     private void stopEngines(final AxArtifactKey incomingEngineServiceKey) throws ApexException {
387         // Stop all engines on this engine service
388         stop();
389         final long stoptime = System.currentTimeMillis();
390         while (!isStopped() && System.currentTimeMillis() - stoptime < MAX_STOP_WAIT_TIME) {
391             ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
392         }
393         // Check if all engines are stopped
394         final StringBuilder notStoppedEngineIdBuilder = new StringBuilder();
395         for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
396             if (engineWorkerEntry.getValue().getState() != AxEngineState.STOPPED) {
397                 notStoppedEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
398                 notStoppedEngineIdBuilder.append('(');
399                 notStoppedEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
400                 notStoppedEngineIdBuilder.append(") ");
401             }
402         }
403         if (notStoppedEngineIdBuilder.length() > 0) {
404             final String errorString = "cannot update model on engine service with key "
405                             + incomingEngineServiceKey.getId() + ", engines not stopped after " + MAX_STOP_WAIT_TIME
406                             + "ms are: " + notStoppedEngineIdBuilder.toString().trim();
407             LOGGER.warn(errorString);
408             throw new ApexException(errorString);
409         }
410     }
411
412     /**
413      * Issue compatibility warning or error message.
414      * @param apexModel The model name
415      * @param forceFlag true if we are forcing the update
416      * @param currentModel the existing model that is loaded
417      * @throws ContextException on compatibility errors
418      */
419     private void handleIncompatibility(final AxPolicyModel apexModel, final boolean forceFlag,
420                     final AxPolicyModel currentModel) throws ContextException {
421         if (forceFlag) {
422             LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
423                             + "\" is not a compatible model update from the existing engine model with key \""
424                             + currentModel.getKey().getId() + "\"");
425         } else {
426             throw new ContextException("apex model update failed, supplied model with key \""
427                             + apexModel.getKey().getId()
428                             + "\" is not a compatible model update from the existing engine model with key \""
429                             + currentModel.getKey().getId() + "\"");
430         }
431     }
432
433     /*
434      * (non-Javadoc)
435      *
436      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getState()
437      */
438     @Override
439     public AxEngineState getState() {
440         // If one worker is running then we are running, otherwise we are stopped
441         for (final EngineService engine : engineWorkerMap.values()) {
442             if (engine.getState() != AxEngineState.STOPPED) {
443                 return AxEngineState.EXECUTING;
444             }
445         }
446
447         return AxEngineState.STOPPED;
448     }
449
450     /*
451      * (non-Javadoc)
452      *
453      * @see org.onap.policy.apex.service.engine.runtime.EngineService#startAll()
454      */
455     @Override
456     public void startAll() throws ApexException {
457         for (final EngineService engine : engineWorkerMap.values()) {
458             start(engine.getKey());
459         }
460     }
461
462     /*
463      * (non-Javadoc)
464      *
465      * @see org.onap.policy.apex.service.engine.runtime.EngineService#start(org.onap.policy.apex.core.model.
466      * concepts.AxArtifactKey)
467      */
468     @Override
469     public void start(final AxArtifactKey engineKey) throws ApexException {
470         LOGGER.entry(engineKey);
471
472         if (engineKey == null) {
473             String message = ENGINE_KEY_NOT_SPECIFIED;
474             LOGGER.warn(message);
475             throw new ApexException(message);
476         }
477
478         // Check if we have this key on our map
479         if (!engineWorkerMap.containsKey(engineKey)) {
480             String message = ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX;
481             LOGGER.warn(message);
482             throw new ApexException(message);
483         }
484
485         // Start the engine
486         engineWorkerMap.get(engineKey).start(engineKey);
487         
488         // Check if periodic events should be turned on
489         if (periodicEventPeriod > 0) {
490             startPeriodicEvents(periodicEventPeriod);
491         }
492
493         LOGGER.exit(engineKey);
494     }
495
496     /*
497      * (non-Javadoc)
498      *
499      * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop()
500      */
501     @Override
502     public void stop() throws ApexException {
503         LOGGER.entry();
504
505         if (periodicEventGenerator != null) {
506             periodicEventGenerator.cancel();
507             periodicEventGenerator = null;
508         }
509         
510         // Stop each engine
511         for (final EngineService engine : engineWorkerMap.values()) {
512             if (engine.getState() != AxEngineState.STOPPED) {
513                 engine.stop();
514             }
515         }
516
517         LOGGER.exit();
518     }
519
520     /*
521      * (non-Javadoc)
522      *
523      * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop(org.onap.policy.apex.core.model.
524      * concepts.AxArtifactKey)
525      */
526     @Override
527     public void stop(final AxArtifactKey engineKey) throws ApexException {
528         LOGGER.entry(engineKey);
529
530         if (engineKey == null) {
531             String message = ENGINE_KEY_NOT_SPECIFIED;
532             LOGGER.warn(message);
533             throw new ApexException(message);
534         }
535
536         // Check if we have this key on our map
537         if (!engineWorkerMap.containsKey(engineKey)) {
538             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
539             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
540         }
541
542         // Stop the engine
543         engineWorkerMap.get(engineKey).stop(engineKey);
544
545         LOGGER.exit(engineKey);
546     }
547
548     /*
549      * (non-Javadoc)
550      *
551      * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear()
552      */
553     @Override
554     public void clear() throws ApexException {
555         LOGGER.entry();
556
557         // Stop each engine
558         for (final EngineService engine : engineWorkerMap.values()) {
559             if (engine.getState() == AxEngineState.STOPPED) {
560                 engine.clear();
561             }
562         }
563
564         LOGGER.exit();
565     }
566
567     /*
568      * (non-Javadoc)
569      *
570      * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear(org.onap.policy.apex.core.model.
571      * concepts.AxArtifactKey)
572      */
573     @Override
574     public void clear(final AxArtifactKey engineKey) throws ApexException {
575         LOGGER.entry(engineKey);
576
577         if (engineKey == null) {
578             String message = ENGINE_KEY_NOT_SPECIFIED;
579             LOGGER.warn(message);
580             throw new ApexException(message);
581         }
582
583         // Check if we have this key on our map
584         if (!engineWorkerMap.containsKey(engineKey)) {
585             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
586             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
587         }
588
589         // Clear the engine
590         if (engineWorkerMap.get(engineKey).getState() == AxEngineState.STOPPED) {
591             engineWorkerMap.get(engineKey).stop(engineKey);
592         }
593
594         LOGGER.exit(engineKey);
595     }
596
597     /**
598      * Check all engines are started.
599      *
600      * @return true if <i>all</i> engines are started
601      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted()
602      */
603     @Override
604     public boolean isStarted() {
605         for (final EngineService engine : engineWorkerMap.values()) {
606             if (!engine.isStarted()) {
607                 return false;
608             }
609         }
610         return true;
611     }
612
613     /*
614      * (non-Javadoc)
615      *
616      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted(org.onap.policy.apex.model.
617      * basicmodel.concepts.AxArtifactKey)
618      */
619     @Override
620     public boolean isStarted(final AxArtifactKey engineKey) {
621         if (engineKey == null) {
622             String message = ENGINE_KEY_NOT_SPECIFIED;
623             LOGGER.warn(message);
624             return false;
625         }
626
627         // Check if we have this key on our map
628         if (!engineWorkerMap.containsKey(engineKey)) {
629             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
630             return false;
631         }
632         return engineWorkerMap.get(engineKey).isStarted();
633     }
634
635     /**
636      * Check all engines are stopped.
637      *
638      * @return true if <i>all</i> engines are stopped
639      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped()
640      */
641     @Override
642     public boolean isStopped() {
643         for (final EngineService engine : engineWorkerMap.values()) {
644             if (!engine.isStopped()) {
645                 return false;
646             }
647         }
648         return true;
649     }
650
651     /*
652      * (non-Javadoc)
653      *
654      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped(org.onap.policy.apex.model.
655      * basicmodel.concepts.AxArtifactKey)
656      */
657     @Override
658     public boolean isStopped(final AxArtifactKey engineKey) {
659         if (engineKey == null) {
660             String message = ENGINE_KEY_NOT_SPECIFIED;
661             LOGGER.warn(message);
662             return true;
663         }
664
665         // Check if we have this key on our map
666         if (!engineWorkerMap.containsKey(engineKey)) {
667             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
668             return true;
669         }
670         return engineWorkerMap.get(engineKey).isStopped();
671     }
672
673     /*
674      * (non-Javadoc)
675      *
676      * @see org.onap.policy.apex.service.engine.runtime.EngineService#startPeriodicEvents(long)
677      */
678     @Override
679     public void startPeriodicEvents(final long period) throws ApexException {
680         // Check if periodic events are already started
681         if (periodicEventGenerator != null) {
682             String message = "Peiodic event geneation already running on engine " + engineServiceKey.getId() + ", "
683                             + periodicEventGenerator.toString();
684             LOGGER.warn(message);
685             throw new ApexException(message);
686         }
687
688         // Set up periodic event execution, its a Java Timer/TimerTask
689         periodicEventGenerator = new ApexPeriodicEventGenerator(this.getEngineServiceEventInterface(), period);
690
691         // Record the periodic event period because it may have been set over the Web Socket admin
692         // interface
693         this.periodicEventPeriod = period;
694     }
695
696     /*
697      * (non-Javadoc)
698      *
699      * @see org.onap.policy.apex.service.engine.runtime.EngineService#stopPeriodicEvents()
700      */
701     @Override
702     public void stopPeriodicEvents() throws ApexException {
703         // Check if periodic events are already started
704         if (periodicEventGenerator == null) {
705             LOGGER.warn("Peiodic event geneation not running on engine " + engineServiceKey.getId());
706             throw new ApexException("Peiodic event geneation not running on engine " + engineServiceKey.getId());
707         }
708
709         // Stop periodic events
710         periodicEventGenerator.cancel();
711         periodicEventGenerator = null;
712         periodicEventPeriod = 0;
713     }
714
715     /*
716      * (non-Javadoc)
717      *
718      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getStatus(org.onap.policy.apex.core.model
719      * .concepts.AxArtifactKey)
720      */
721     @Override
722     public String getStatus(final AxArtifactKey engineKey) throws ApexException {
723         if (engineKey == null) {
724             String message = ENGINE_KEY_NOT_SPECIFIED;
725             LOGGER.warn(message);
726             throw new ApexException(message);
727         }
728
729         // Check if we have this key on our map
730         if (!engineWorkerMap.containsKey(engineKey)) {
731             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
732             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
733         }
734
735         // Return the information for this worker
736         return engineWorkerMap.get(engineKey).getStatus(engineKey);
737     }
738
739     /*
740      * (non-Javadoc)
741      *
742      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getRuntimeInfo(org.onap.policy.apex.core.
743      * model.concepts.AxArtifactKey)
744      */
745     @Override
746     public String getRuntimeInfo(final AxArtifactKey engineKey) throws ApexException {
747         if (engineKey == null) {
748             String message = ENGINE_KEY_NOT_SPECIFIED;
749             LOGGER.warn(message);
750             throw new ApexException(message);
751         }
752
753         // Check if we have this key on our map
754         if (!engineWorkerMap.containsKey(engineKey)) {
755             LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
756             throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
757         }
758
759         // Return the information for this worker
760         return engineWorkerMap.get(engineKey).getRuntimeInfo(engineKey);
761     }
762
763     /*
764      * (non-Javadoc)
765      *
766      * @see org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface#sendEvent(org.onap.policy.
767      * apex.service.engine.event.ApexEvent)
768      */
769     @Override
770     public void sendEvent(final ApexEvent event) {
771         if (event == null) {
772             LOGGER.warn("Null events cannot be processed, in engine service " + engineServiceKey.getId());
773             return;
774         }
775
776         // Check if we have this key on our map
777         if (getState() == AxEngineState.STOPPED) {
778             LOGGER.warn("event " + event.getName() + " not processed, no engines on engine service "
779                             + engineServiceKey.getId() + " are running");
780             return;
781         }
782
783         if (DEBUG_ENABLED) {
784             LOGGER.debug("Forwarding Apex Event {} to the processing engine", event);
785         }
786
787         // Add the incoming event to the queue, the next available worker will process it
788         queue.add(event);
789     }
790 }