b9a405b445ecbe18fc918bf75dfb8f77f7e3570f
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.runtime.impl;
22
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonParser;
27
28 import java.io.ByteArrayInputStream;
29 import java.io.ByteArrayOutputStream;
30 import java.util.Arrays;
31 import java.util.Collection;
32 import java.util.Map;
33 import java.util.Map.Entry;
34 import java.util.concurrent.BlockingQueue;
35
36 import org.onap.policy.apex.context.ContextException;
37 import org.onap.policy.apex.context.ContextRuntimeException;
38 import org.onap.policy.apex.context.SchemaHelper;
39 import org.onap.policy.apex.context.impl.schema.SchemaHelperFactory;
40 import org.onap.policy.apex.core.engine.engine.ApexEngine;
41 import org.onap.policy.apex.core.engine.engine.impl.ApexEngineFactory;
42 import org.onap.policy.apex.core.engine.event.EnEvent;
43 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
44 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
45 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
46 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
47 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
48 import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter;
49 import org.onap.policy.apex.model.basicmodel.service.ModelService;
50 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
51 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
52 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
53 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
54 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
55 import org.onap.policy.apex.service.engine.event.ApexEvent;
56 import org.onap.policy.apex.service.engine.event.impl.enevent.ApexEvent2EnEventConverter;
57 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
58 import org.onap.policy.apex.service.engine.runtime.EngineService;
59 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
60 import org.slf4j.ext.XLogger;
61 import org.slf4j.ext.XLoggerFactory;
62
63 /**
64  * The Class EngineWorker encapsulates a core {@link ApexEngine} instance, which runs policies
65  * defined in the {@link org.onap.policy.apex.model.basicmodel.concepts.AxModelAxModel}. Each policy
66  * is triggered by an Apex event, and when the policy is triggered it runs through to completion in
67  * the ApexEngine.
68  *
69  * <p>This class acts as a container for an {@link ApexEngine}, running it in a thread, sending it
70  * events, and receiving events from it.
71  *
72  * @author Liam Fallon (liam.fallon@ericsson.com)
73  */
74 final class EngineWorker implements EngineService {
75     // Logger for this class
76     private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineService.class);
77
78     // The ID of this engine
79     private final AxArtifactKey engineWorkerKey;
80
81     // The Apex engine which is running the policies in this worker
82     private final ApexEngine engine;
83
84     // The event processor is an inner class, an instance of which runs as a thread that reads
85     // incoming events from a queue and forwards them to the Apex engine
86     private EventProcessor processor = null;
87
88     // Thread handling for the worker
89     private final ApplicationThreadFactory threadFactory;
90     private Thread processorThread;
91
92     // Converts ApexEvent instances to and from EnEvent instances
93     private ApexEvent2EnEventConverter apexEnEventConverter = null;
94
95     /**
96      * Constructor that creates an Apex engine, an event processor for events to be sent to that
97      * engine, and an {@link ApexModelReader} instance to read Apex models using JAXB.
98      *
99      * @param engineWorkerKey the engine worker key
100      * @param queue the queue on which events for this Apex worker will come
101      * @param threadFactory the thread factory to use for creating the event processing thread
102      * @throws ApexException thrown on errors on worker instantiation
103      */
104     EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue,
105             final ApplicationThreadFactory threadFactory) throws ApexException {
106         LOGGER.entry(engineWorkerKey);
107
108         this.engineWorkerKey = engineWorkerKey;
109         this.threadFactory = threadFactory;
110
111         // Create the Apex engine
112         engine = new ApexEngineFactory().createApexEngine(engineWorkerKey);
113
114         // Create and run the event processor
115         processor = new EventProcessor(queue);
116
117         // Set the Event converter up
118         apexEnEventConverter = new ApexEvent2EnEventConverter(engine);
119
120         LOGGER.exit();
121     }
122
123     /*
124      * (non-Javadoc)
125      * 
126      * @see
127      * org.onap.policy.apex.service.engine.runtime.EngineService#registerActionListener(java.lang.
128      * String, org.onap.policy.apex.service.engine.runtime.ApexEventListener)
129      */
130     @Override
131     public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
132         // Sanity checks on the Apex model
133         if (engine == null) {
134             LOGGER.warn("listener registration on engine with key " + engineWorkerKey.getId()
135                     + ", failed, listener is null");
136             return;
137         }
138
139         engine.addEventListener(listenerName, new EnEventListenerImpl(apexEventListener, apexEnEventConverter));
140     }
141
142     /*
143      * (non-Javadoc)
144      * 
145      * @see
146      * org.onap.policy.apex.service.engine.runtime.EngineService#deregisterActionListener(java.lang.
147      * String)
148      */
149     @Override
150     public void deregisterActionListener(final String listenerName) {
151         // Sanity checks on the Apex model
152         if (engine == null) {
153             LOGGER.warn("listener deregistration on engine with key " + engineWorkerKey.getId()
154                     + ", failed, listener is null");
155             return;
156         }
157
158         engine.removeEventListener(listenerName);
159     }
160
161     /*
162      * (non-Javadoc)
163      *
164      * @see
165      * org.onap.policy.apex.service.engine.runtime.EngineService#getEngineServiceEventInterface()
166      */
167     @Override
168     public EngineServiceEventInterface getEngineServiceEventInterface() {
169         throw new UnsupportedOperationException(
170                 "getEngineServiceEventInterface() call is not allowed on an Apex Engine Worker");
171     }
172
173     /*
174      * (non-Javadoc)
175      *
176      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getKey()
177      */
178     @Override
179     public AxArtifactKey getKey() {
180         return engineWorkerKey;
181     }
182
183     /*
184      * (non-Javadoc)
185      *
186      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getInfo()
187      */
188     @Override
189     public Collection<AxArtifactKey> getEngineKeys() {
190         return Arrays.asList(engineWorkerKey);
191     }
192
193     /*
194      * (non-Javadoc)
195      *
196      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getApexModelKey()
197      */
198     @Override
199     public AxArtifactKey getApexModelKey() {
200         if (ModelService.existsModel(AxPolicyModel.class)) {
201             return ModelService.getModel(AxPolicyModel.class).getKey();
202         } else {
203             return null;
204         }
205     }
206
207     /*
208      * (non-Javadoc)
209      *
210      * @see
211      * org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex.
212      * model. basicmodel.concepts.AxArtifactKey, java.lang.String, boolean)
213      */
214     @Override
215     public void updateModel(final AxArtifactKey engineKey, final String engineModel, final boolean forceFlag)
216             throws ApexException {
217         LOGGER.entry(engineKey);
218
219         // Read the Apex model into memory using the Apex Model Reader
220         AxPolicyModel apexPolicyModel = null;
221         try {
222             final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
223             apexPolicyModel = modelReader.read(new ByteArrayInputStream(engineModel.getBytes()));
224         } catch (final ApexModelException e) {
225             LOGGER.error("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
226             throw new ApexException("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
227         }
228
229         if (apexPolicyModel == null) {
230             LOGGER.error("apex model null on engine " + engineKey.getId());
231             throw new ApexException("apex model null on engine " + engineKey.getId());
232         }
233
234         // Update the Apex model in the Apex engine
235         updateModel(engineKey, apexPolicyModel, forceFlag);
236
237         LOGGER.exit();
238     }
239
240     /*
241      * (non-Javadoc)
242      *
243      * @see
244      * org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex.
245      * model. basicmodel.concepts.AxArtifactKey,
246      * org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel, boolean)
247      */
248     @Override
249     public void updateModel(final AxArtifactKey engineKey, final AxPolicyModel apexModel, final boolean forceFlag)
250             throws ApexException {
251         LOGGER.entry(engineKey);
252
253         // Check if the key on the update request is correct
254         if (!engineWorkerKey.equals(engineKey)) {
255             LOGGER.warn("engine key " + engineKey.getId() + " does not match the key" + engineWorkerKey.getId()
256                     + " of this engine");
257             throw new ApexException("engine key " + engineKey.getId() + " does not match the key"
258                     + engineWorkerKey.getId() + " of this engine");
259         }
260
261         // Sanity checks on the Apex model
262         if (engine == null) {
263             LOGGER.warn("engine with key " + engineKey.getId() + " not initialized");
264             throw new ApexException("engine with key " + engineKey.getId() + " not initialized");
265         }
266
267         // Check model compatibility
268         if (ModelService.existsModel(AxPolicyModel.class)) {
269             // The current policy model may or may not be defined
270             final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
271             if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
272                 if (forceFlag) {
273                     LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
274                             + "\" is not a compatible model update from the existing engine model with key \""
275                             + currentModel.getKey().getId() + "\"");
276                 } else {
277                     throw new ContextException(
278                             "apex model update failed, supplied model with key \"" + apexModel.getKey().getId()
279                                     + "\" is not a compatible model update from the existing engine model with key \""
280                                     + currentModel.getKey().getId() + "\"");
281                 }
282             }
283         }
284
285         // Update the Apex model in the Apex engine
286         engine.clear();
287         engine.updateModel(apexModel);
288
289         LOGGER.debug("engine model {} added to the engine-{}", apexModel.getKey().getId(), engineWorkerKey);
290         LOGGER.exit();
291     }
292
293     /*
294      * (non-Javadoc)
295      *
296      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getState()
297      */
298     @Override
299     public AxEngineState getState() {
300         return engine.getState();
301     }
302
303     /*
304      * (non-Javadoc)
305      *
306      * @see org.onap.policy.apex.service.engine.runtime.EngineService#startAll()
307      */
308     @Override
309     public void startAll() throws ApexException {
310         start(this.getKey());
311     }
312
313     /*
314      * (non-Javadoc)
315      *
316      * @see
317      * org.onap.policy.apex.service.engine.runtime.EngineService#start(org.onap.policy.apex.core.
318      * model. concepts.AxArtifactKey)
319      */
320     @Override
321     public void start(final AxArtifactKey engineKey) throws ApexException {
322         LOGGER.entry(engineKey);
323
324         // Check if the key on the start request is correct
325         if (!engineWorkerKey.equals(engineKey)) {
326             LOGGER.warn("engine key " + engineKey.getId() + " does not match the key" + engineWorkerKey.getId()
327                     + " of this engine");
328             throw new ApexException("engine key " + engineKey.getId() + " does not match the key"
329                     + engineWorkerKey.getId() + " of this engine");
330         }
331
332         if (engine == null) {
333             LOGGER.error("apex engine for engine key" + engineWorkerKey.getId() + " null");
334             throw new ApexException("apex engine for engine key" + engineWorkerKey.getId() + " null");
335         }
336
337         // Starts the event processing thread that handles incoming events
338         if (processorThread != null && processorThread.isAlive()) {
339             LOGGER.error("apex engine for engine key" + engineWorkerKey.getId() + " is already running with state "
340                     + getState());
341             throw new ApexException("apex engine for engine key" + engineWorkerKey.getId()
342                     + " is already running with state " + getState());
343         }
344
345         // Start the engine
346         engine.start();
347
348         // Start a thread to process events for the engine
349         processorThread = threadFactory.newThread(processor);
350         processorThread.start();
351
352         LOGGER.exit(engineKey);
353     }
354
355     /*
356      * (non-Javadoc)
357      *
358      * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop()
359      */
360     @Override
361     public void stop() throws ApexException {
362         stop(this.getKey());
363     }
364
365     /*
366      * (non-Javadoc)
367      *
368      * @see
369      * org.onap.policy.apex.service.engine.runtime.EngineService#stop(org.onap.policy.apex.core.
370      * model. concepts.AxArtifactKey)
371      */
372     @Override
373     public void stop(final AxArtifactKey engineKey) throws ApexException {
374         // Check if the key on the start request is correct
375         if (!engineWorkerKey.equals(engineKey)) {
376             LOGGER.warn("engine key " + engineKey.getId() + " does not match the key" + engineWorkerKey.getId()
377                     + " of this engine");
378             throw new ApexException("engine key " + engineKey.getId() + " does not match the key"
379                     + engineWorkerKey.getId() + " of this engine");
380         }
381
382         if (engine == null) {
383             LOGGER.error("apex engine for engine key" + engineWorkerKey.getId() + " null");
384             throw new ApexException("apex engine for engine key" + engineWorkerKey.getId() + " null");
385         }
386
387         // Interrupt the worker to stop its thread
388         if (processorThread == null || !processorThread.isAlive()) {
389             processorThread = null;
390
391             LOGGER.warn("apex engine for engine key" + engineWorkerKey.getId() + " is already stopped with state "
392                     + getState());
393             return;
394         }
395
396         // Interrupt the thread that is handling events toward the engine
397         processorThread.interrupt();
398         processorThread = null;
399
400         // Stop the engine
401         engine.stop();
402
403         LOGGER.exit(engineKey);
404     }
405
406     /*
407      * (non-Javadoc)
408      *
409      * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear()
410      */
411     @Override
412     public void clear() throws ApexException {
413         clear(this.getKey());
414     }
415
416     /*
417      * (non-Javadoc)
418      *
419      * @see
420      * org.onap.policy.apex.service.engine.runtime.EngineService#clear(org.onap.policy.apex.core.
421      * model. concepts.AxArtifactKey)
422      */
423     @Override
424     public void clear(final AxArtifactKey engineKey) throws ApexException {
425         // Check if the key on the start request is correct
426         if (!engineWorkerKey.equals(engineKey)) {
427             LOGGER.warn("engine key " + engineKey.getId() + " does not match the key" + engineWorkerKey.getId()
428                     + " of this engine");
429             throw new ApexException("engine key " + engineKey.getId() + " does not match the key"
430                     + engineWorkerKey.getId() + " of this engine");
431         }
432
433         if (engine == null) {
434             LOGGER.error("apex engine for engine key" + engineWorkerKey.getId() + " null");
435             throw new ApexException("apex engine for engine key" + engineWorkerKey.getId() + " null");
436         }
437
438         // Interrupt the worker to stop its thread
439         if (processorThread != null && !processorThread.isAlive()) {
440             LOGGER.warn("apex engine for engine key" + engineWorkerKey.getId() + " is not stopped with state "
441                     + getState());
442             return;
443         }
444
445         // Clear the engine
446         engine.clear();
447
448         LOGGER.exit(engineKey);
449     }
450
451     /*
452      * (non-Javadoc)
453      *
454      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted()
455      */
456     @Override
457     public boolean isStarted() {
458         return isStarted(this.getKey());
459     }
460
461     /*
462      * (non-Javadoc)
463      *
464      * @see
465      * org.onap.policy.apex.service.engine.runtime.EngineService#isStarted(org.onap.policy.apex.
466      * model. basicmodel.concepts.AxArtifactKey)
467      */
468     @Override
469     public boolean isStarted(final AxArtifactKey engineKey) {
470         final AxEngineState engstate = getState();
471         switch (engstate) {
472             case STOPPED:
473             case STOPPING:
474             case UNDEFINED:
475                 return false;
476             case EXECUTING:
477             case READY:
478                 return processorThread != null && processorThread.isAlive() && !processorThread.isInterrupted();
479             default:
480                 break;
481         }
482         return false;
483     }
484
485     /*
486      * (non-Javadoc)
487      *
488      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped()
489      */
490     @Override
491     public boolean isStopped() {
492         return isStopped(this.getKey());
493     }
494
495     /*
496      * (non-Javadoc)
497      *
498      * @see
499      * org.onap.policy.apex.service.engine.runtime.EngineService#isStopped(org.onap.policy.apex.
500      * model. basicmodel.concepts.AxArtifactKey)
501      */
502     @Override
503     public boolean isStopped(final AxArtifactKey engineKey) {
504         final AxEngineState engstate = getState();
505         switch (engstate) {
506             case STOPPING:
507             case UNDEFINED:
508             case EXECUTING:
509             case READY:
510                 return false;
511             case STOPPED:
512                 return processorThread == null || !processorThread.isAlive();
513             default:
514                 break;
515         }
516         return false;
517     }
518
519     /*
520      * (non-Javadoc)
521      *
522      * @see org.onap.policy.apex.service.engine.runtime.EngineService#startPeriodicEvents(long)
523      */
524     @Override
525     public void startPeriodicEvents(final long period) {
526         throw new UnsupportedOperationException("startPeriodicEvents() call is not allowed on an Apex Engine Worker");
527     }
528
529     /*
530      * (non-Javadoc)
531      *
532      * @see org.onap.policy.apex.service.engine.runtime.EngineService#stopPeriodicEvents()
533      */
534     @Override
535     public void stopPeriodicEvents() {
536         throw new UnsupportedOperationException("stopPeriodicEvents() call is not allowed on an Apex Engine Worker");
537     }
538
539     /*
540      * (non-Javadoc)
541      *
542      * @see
543      * org.onap.policy.apex.service.engine.runtime.EngineService#getStatus(org.onap.policy.apex.core
544      * .model .concepts.AxArtifactKey)
545      */
546     @Override
547     public String getStatus(final AxArtifactKey engineKey) {
548         // Get the information from the engine that we want to return
549         final AxEngineModel apexEngineModel = engine.getEngineStatus();
550         apexEngineModel.getKeyInformation().generateKeyInfo(apexEngineModel);
551
552         // Convert that information into a string
553         try {
554             final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream();
555             final ApexModelWriter<AxEngineModel> modelWriter = new ApexModelWriter<>(AxEngineModel.class);
556             modelWriter.write(apexEngineModel, baOutputStream);
557             return baOutputStream.toString();
558         } catch (final Exception e) {
559             LOGGER.warn("error outputting runtime information for engine {}", engineWorkerKey, e);
560             return null;
561         }
562     }
563
564     /*
565      * (non-Javadoc)
566      *
567      * @see
568      * org.onap.policy.apex.service.engine.runtime.EngineService#getRuntimeInfo(org.onap.policy.apex
569      * .core.model.concepts.AxArtifactKey)
570      */
571     @Override
572     public String getRuntimeInfo(final AxArtifactKey engineKey) {
573         // We'll build up the JSON string for runtime information bit by bit
574         final StringBuilder runtimeJsonStringBuilder = new StringBuilder();
575
576         // Get the engine information
577         final AxEngineModel engineModel = engine.getEngineStatus();
578         final Map<AxArtifactKey, Map<String, Object>> engineContextAlbums = engine.getEngineContext();
579
580         // Use GSON to convert our context information into JSON
581         final Gson gson = new GsonBuilder().setPrettyPrinting().create();
582
583         // Get context into a JSON string
584         runtimeJsonStringBuilder.append("{\"TimeStamp\":");
585         runtimeJsonStringBuilder.append(engineModel.getTimestamp());
586         runtimeJsonStringBuilder.append(",\"State\":");
587         runtimeJsonStringBuilder.append(engineModel.getState());
588         runtimeJsonStringBuilder.append(",\"Stats\":");
589         runtimeJsonStringBuilder.append(gson.toJson(engineModel.getStats()));
590
591         // Get context into a JSON string
592         runtimeJsonStringBuilder.append(",\"ContextAlbums\":[");
593
594         boolean firstAlbum = true;
595         for (final Entry<AxArtifactKey, Map<String, Object>> contextAlbumEntry : engineContextAlbums.entrySet()) {
596             if (firstAlbum) {
597                 firstAlbum = false;
598             } else {
599                 runtimeJsonStringBuilder.append(",");
600             }
601
602             runtimeJsonStringBuilder.append("{\"AlbumKey\":");
603             runtimeJsonStringBuilder.append(gson.toJson(contextAlbumEntry.getKey()));
604             runtimeJsonStringBuilder.append(",\"AlbumContent\":[");
605
606
607             // Get the schema helper to use to marshal context album objects to JSON
608             final AxContextAlbum axContextAlbum =
609                     ModelService.getModel(AxContextAlbums.class).get(contextAlbumEntry.getKey());
610             SchemaHelper schemaHelper = null;
611
612             try {
613                 // Get a schema helper to manage the translations between objects on the album map
614                 // for this album
615                 schemaHelper = new SchemaHelperFactory().createSchemaHelper(axContextAlbum.getKey(),
616                         axContextAlbum.getItemSchema());
617             } catch (final ContextRuntimeException e) {
618                 final String resultString =
619                         "could not find schema helper to marshal context album \"" + axContextAlbum + "\" to JSON";
620                 LOGGER.warn(resultString, e);
621
622                 // End of context album entry
623                 runtimeJsonStringBuilder.append(resultString);
624                 runtimeJsonStringBuilder.append("]}");
625
626                 continue;
627             }
628
629             boolean firstEntry = true;
630             for (final Entry<String, Object> contextEntry : contextAlbumEntry.getValue().entrySet()) {
631                 if (firstEntry) {
632                     firstEntry = false;
633                 } else {
634                     runtimeJsonStringBuilder.append(",");
635                 }
636                 runtimeJsonStringBuilder.append("{\"EntryName\":");
637                 runtimeJsonStringBuilder.append(gson.toJson(contextEntry.getKey()));
638                 runtimeJsonStringBuilder.append(",\"EntryContent\":");
639                 runtimeJsonStringBuilder.append(gson.toJson(schemaHelper.marshal2String(contextEntry.getValue())));
640
641                 // End of context entry
642                 runtimeJsonStringBuilder.append("}");
643             }
644
645             // End of context album entry
646             runtimeJsonStringBuilder.append("]}");
647         }
648
649         runtimeJsonStringBuilder.append("]}");
650
651         // Tidy up the JSON string
652         final JsonParser jsonParser = new JsonParser();
653         final JsonElement jsonElement = jsonParser.parse(runtimeJsonStringBuilder.toString());
654         final String tidiedRuntimeString = gson.toJson(jsonElement);
655
656         LOGGER.debug("runtime information=" + tidiedRuntimeString);
657
658         return tidiedRuntimeString;
659     }
660
661     /**
662      * This is an event processor thread, this class decouples the events handling logic from core
663      * business logic. This class runs its own thread and continuously querying the blocking queue
664      * for the events that have been sent to the worker for processing by the Apex engine.
665      *
666      * @author Liam Fallon (liam.fallon@ericsson.com)
667      */
668     private class EventProcessor implements Runnable {
669         private final boolean debugEnabled = LOGGER.isDebugEnabled();
670         // the events queue
671         private BlockingQueue<ApexEvent> eventProcessingQueue = null;
672
673         /**
674          * Constructor accepts {@link ApexEngine} and {@link BlockingQueue} type objects.
675          *
676          * @param eventProcessingQueue is reference of {@link BlockingQueue} which contains trigger
677          *        events.
678          */
679         EventProcessor(final BlockingQueue<ApexEvent> eventProcessingQueue) {
680             this.eventProcessingQueue = eventProcessingQueue;
681         }
682
683         /*
684          * (non-Javadoc)
685          *
686          * @see java.lang.Runnable#run()
687          */
688         @Override
689         public void run() {
690             LOGGER.debug("Engine {} processing ... ", engineWorkerKey);
691
692             // Take events from the event processing queue of the worker and pass them to the engine
693             // for processing
694             while (!processorThread.isInterrupted()) {
695                 ApexEvent event = null;
696                 try {
697                     event = eventProcessingQueue.take();
698                 } catch (final InterruptedException e) {
699                     // restore the interrupt status
700                     Thread.currentThread().interrupt();
701                     LOGGER.debug("Engine {} processing interrupted ", engineWorkerKey);
702                     break;
703                 }
704
705                 try {
706                     if (event != null) {
707                         if (debugEnabled) {
708                             LOGGER.debug("Trigger Event {} forwarded to the Apex engine", event);
709                         }
710                         final EnEvent enevent = apexEnEventConverter.fromApexEvent(event);
711                         engine.handleEvent(enevent);
712                     }
713                 } catch (final ApexException e) {
714                     LOGGER.warn("Engine {} failed to process event {}", engineWorkerKey, event.toString(), e);
715                 } catch (final Exception e) {
716                     LOGGER.warn("Engine {} terminated processing event {}", engineWorkerKey, event.toString(), e);
717                     break;
718                 }
719             }
720             LOGGER.debug("Engine {} completed processing", engineWorkerKey);
721         }
722     }
723 }