a5de624d195ee9d300e2efd556d1e1299bca4ce3
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  * 
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  * 
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * 
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.service.engine.runtime.impl;
22
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonParser;
27
28 import java.io.ByteArrayInputStream;
29 import java.io.ByteArrayOutputStream;
30 import java.util.Arrays;
31 import java.util.Collection;
32 import java.util.Map;
33 import java.util.Map.Entry;
34 import java.util.concurrent.BlockingQueue;
35
36 import org.onap.policy.apex.context.ContextException;
37 import org.onap.policy.apex.context.ContextRuntimeException;
38 import org.onap.policy.apex.context.SchemaHelper;
39 import org.onap.policy.apex.context.impl.schema.SchemaHelperFactory;
40 import org.onap.policy.apex.core.engine.engine.ApexEngine;
41 import org.onap.policy.apex.core.engine.engine.impl.ApexEngineFactory;
42 import org.onap.policy.apex.core.engine.event.EnEvent;
43 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
44 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
45 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
46 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
47 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
48 import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter;
49 import org.onap.policy.apex.model.basicmodel.service.ModelService;
50 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
51 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
52 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
53 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
54 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
55 import org.onap.policy.apex.service.engine.event.ApexEvent;
56 import org.onap.policy.apex.service.engine.event.impl.enevent.ApexEvent2EnEventConverter;
57 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
58 import org.onap.policy.apex.service.engine.runtime.EngineService;
59 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
60 import org.slf4j.ext.XLogger;
61 import org.slf4j.ext.XLoggerFactory;
62
63 /**
64  * The Class EngineWorker encapsulates a core {@link ApexEngine} instance, which runs policies
65  * defined in the {@link org.onap.policy.apex.model.basicmodel.concepts.AxModelAxModel}. Each policy
66  * is triggered by an Apex event, and when the policy is triggered it runs through to completion in
67  * the ApexEngine.
68  *
69  * <p>This class acts as a container for an {@link ApexEngine}, running it in a thread, sending it
70  * events, and receiving events from it.
71  *
72  * @author Liam Fallon (liam.fallon@ericsson.com)
73  */
74 final class EngineWorker implements EngineService {
75     // Logger for this class
76     private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineService.class);
77
78     // Recurring string constants
79     private static final String ENGINE_FOR_KEY_PREFIX = "apex engine for engine key ";
80     private static final String ENGINE_SUFFIX = " of this engine";
81     private static final String BAD_KEY_MATCH_TAG = " does not match the key";
82     private static final String ENGINE_KEY_PREFIX = "engine key ";
83
84     // The ID of this engine
85     private final AxArtifactKey engineWorkerKey;
86
87     // The Apex engine which is running the policies in this worker
88     private final ApexEngine engine;
89
90     // The event processor is an inner class, an instance of which runs as a thread that reads
91     // incoming events from a queue and forwards them to the Apex engine
92     private EventProcessor processor = null;
93
94     // Thread handling for the worker
95     private final ApplicationThreadFactory threadFactory;
96     private Thread processorThread;
97
98     // Converts ApexEvent instances to and from EnEvent instances
99     private ApexEvent2EnEventConverter apexEnEventConverter = null;
100
101     /**
102      * Constructor that creates an Apex engine, an event processor for events to be sent to that
103      * engine, and an {@link ApexModelReader} instance to read Apex models using JAXB.
104      *
105      * @param engineWorkerKey the engine worker key
106      * @param queue the queue on which events for this Apex worker will come
107      * @param threadFactory the thread factory to use for creating the event processing thread
108      * @throws ApexException thrown on errors on worker instantiation
109      */
110     protected EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue,
111             final ApplicationThreadFactory threadFactory) {
112         LOGGER.entry(engineWorkerKey);
113
114         this.engineWorkerKey = engineWorkerKey;
115         this.threadFactory = threadFactory;
116
117         // Create the Apex engine
118         engine = new ApexEngineFactory().createApexEngine(engineWorkerKey);
119
120         // Create and run the event processor
121         processor = new EventProcessor(queue);
122
123         // Set the Event converter up
124         apexEnEventConverter = new ApexEvent2EnEventConverter(engine);
125
126         LOGGER.exit();
127     }
128
129     /*
130      * (non-Javadoc)
131      * 
132      * @see
133      * org.onap.policy.apex.service.engine.runtime.EngineService#registerActionListener(java.lang.
134      * String, org.onap.policy.apex.service.engine.runtime.ApexEventListener)
135      */
136     @Override
137     public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
138         engine.addEventListener(listenerName, new EnEventListenerImpl(apexEventListener, apexEnEventConverter));
139     }
140
141     /*
142      * (non-Javadoc)
143      * 
144      * @see
145      * org.onap.policy.apex.service.engine.runtime.EngineService#deregisterActionListener(java.lang.
146      * String)
147      */
148     @Override
149     public void deregisterActionListener(final String listenerName) {
150         engine.removeEventListener(listenerName);
151     }
152
153     /*
154      * (non-Javadoc)
155      *
156      * @see
157      * org.onap.policy.apex.service.engine.runtime.EngineService#getEngineServiceEventInterface()
158      */
159     @Override
160     public EngineServiceEventInterface getEngineServiceEventInterface() {
161         throw new UnsupportedOperationException(
162                 "getEngineServiceEventInterface() call is not allowed on an Apex Engine Worker");
163     }
164
165     /*
166      * (non-Javadoc)
167      *
168      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getKey()
169      */
170     @Override
171     public AxArtifactKey getKey() {
172         return engineWorkerKey;
173     }
174
175     /*
176      * (non-Javadoc)
177      *
178      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getInfo()
179      */
180     @Override
181     public Collection<AxArtifactKey> getEngineKeys() {
182         return Arrays.asList(engineWorkerKey);
183     }
184
185     /*
186      * (non-Javadoc)
187      *
188      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getApexModelKey()
189      */
190     @Override
191     public AxArtifactKey getApexModelKey() {
192         if (ModelService.existsModel(AxPolicyModel.class)) {
193             return ModelService.getModel(AxPolicyModel.class).getKey();
194         } else {
195             return null;
196         }
197     }
198
199     /*
200      * (non-Javadoc)
201      *
202      * @see
203      * org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex.
204      * model. basicmodel.concepts.AxArtifactKey, java.lang.String, boolean)
205      */
206     @Override
207     public void updateModel(final AxArtifactKey engineKey, final String engineModel, final boolean forceFlag)
208             throws ApexException {
209         LOGGER.entry(engineKey);
210
211         // Read the Apex model into memory using the Apex Model Reader
212         AxPolicyModel apexPolicyModel = null;
213         try {
214             final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
215             apexPolicyModel = modelReader.read(new ByteArrayInputStream(engineModel.getBytes()));
216         } catch (final ApexModelException e) {
217             LOGGER.error("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
218             throw new ApexException("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
219         }
220
221         // Update the Apex model in the Apex engine
222         updateModel(engineKey, apexPolicyModel, forceFlag);
223
224         LOGGER.exit();
225     }
226
227     /*
228      * (non-Javadoc)
229      *
230      * @see
231      * org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex.
232      * model. basicmodel.concepts.AxArtifactKey,
233      * org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel, boolean)
234      */
235     @Override
236     public void updateModel(final AxArtifactKey engineKey, final AxPolicyModel apexModel, final boolean forceFlag)
237             throws ApexException {
238         LOGGER.entry(engineKey);
239
240         // Check if the key on the update request is correct
241         if (!engineWorkerKey.equals(engineKey)) {
242             String message = ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
243                     + ENGINE_SUFFIX;
244             LOGGER.warn(message);
245             throw new ApexException(message);
246         }
247
248         // Check model compatibility
249         if (ModelService.existsModel(AxPolicyModel.class)) {
250             // The current policy model may or may not be defined
251             final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
252             if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
253                 if (forceFlag) {
254                     LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
255                             + "\" is not a compatible model update from the existing engine model with key \""
256                             + currentModel.getKey().getId() + "\"");
257                 } else {
258                     throw new ContextException(
259                             "apex model update failed, supplied model with key \"" + apexModel.getKey().getId()
260                                     + "\" is not a compatible model update from the existing engine model with key \""
261                                     + currentModel.getKey().getId() + "\"");
262                 }
263             }
264         }
265
266         // Update the Apex model in the Apex engine
267         engine.clear();
268         engine.updateModel(apexModel);
269
270         LOGGER.debug("engine model {} added to the engine-{}", apexModel.getKey().getId(), engineWorkerKey);
271         LOGGER.exit();
272     }
273
274     /*
275      * (non-Javadoc)
276      *
277      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getState()
278      */
279     @Override
280     public AxEngineState getState() {
281         return engine.getState();
282     }
283
284     /*
285      * (non-Javadoc)
286      *
287      * @see org.onap.policy.apex.service.engine.runtime.EngineService#startAll()
288      */
289     @Override
290     public void startAll() throws ApexException {
291         start(this.getKey());
292     }
293
294     /*
295      * (non-Javadoc)
296      *
297      * @see
298      * org.onap.policy.apex.service.engine.runtime.EngineService#start(org.onap.policy.apex.core.
299      * model. concepts.AxArtifactKey)
300      */
301     @Override
302     public void start(final AxArtifactKey engineKey) throws ApexException {
303         LOGGER.entry(engineKey);
304
305         // Check if the key on the start request is correct
306         if (!engineWorkerKey.equals(engineKey)) {
307             LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
308                     + ENGINE_SUFFIX);
309             throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG
310                     + engineWorkerKey.getId() + ENGINE_SUFFIX);
311         }
312
313         // Starts the event processing thread that handles incoming events
314         if (processorThread != null && processorThread.isAlive()) {
315             String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already running with state "
316                     + getState();
317             LOGGER.error(message);
318             throw new ApexException(message);
319         }
320
321         // Start the engine
322         engine.start();
323
324         // Start a thread to process events for the engine
325         processorThread = threadFactory.newThread(processor);
326         processorThread.start();
327
328         LOGGER.exit(engineKey);
329     }
330
331     /*
332      * (non-Javadoc)
333      *
334      * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop()
335      */
336     @Override
337     public void stop() throws ApexException {
338         stop(this.getKey());
339     }
340
341     /*
342      * (non-Javadoc)
343      *
344      * @see
345      * org.onap.policy.apex.service.engine.runtime.EngineService#stop(org.onap.policy.apex.core.
346      * model. concepts.AxArtifactKey)
347      */
348     @Override
349     public void stop(final AxArtifactKey engineKey) throws ApexException {
350         // Check if the key on the start request is correct
351         if (!engineWorkerKey.equals(engineKey)) {
352             LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
353                     + ENGINE_SUFFIX);
354             throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG
355                     + engineWorkerKey.getId() + ENGINE_SUFFIX);
356         }
357
358         // Interrupt the worker to stop its thread
359         if (processorThread == null || !processorThread.isAlive()) {
360             processorThread = null;
361
362             LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already stopped with state "
363                     + getState());
364             return;
365         }
366
367         // Interrupt the thread that is handling events toward the engine
368         processorThread.interrupt();
369         processorThread = null;
370
371         // Stop the engine
372         engine.stop();
373
374         LOGGER.exit(engineKey);
375     }
376
377     /*
378      * (non-Javadoc)
379      *
380      * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear()
381      */
382     @Override
383     public void clear() throws ApexException {
384         clear(this.getKey());
385     }
386
387     /*
388      * (non-Javadoc)
389      *
390      * @see
391      * org.onap.policy.apex.service.engine.runtime.EngineService#clear(org.onap.policy.apex.core.
392      * model. concepts.AxArtifactKey)
393      */
394     @Override
395     public void clear(final AxArtifactKey engineKey) throws ApexException {
396         // Check if the key on the start request is correct
397         if (!engineWorkerKey.equals(engineKey)) {
398             LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
399                     + ENGINE_SUFFIX);
400             throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG
401                     + engineWorkerKey.getId() + ENGINE_SUFFIX);
402         }
403
404         // Interrupt the worker to stop its thread
405         if (processorThread != null && !processorThread.isAlive()) {
406             LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is not stopped with state "
407                     + getState());
408             return;
409         }
410
411         // Clear the engine
412         engine.clear();
413
414         LOGGER.exit(engineKey);
415     }
416
417     /*
418      * (non-Javadoc)
419      *
420      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted()
421      */
422     @Override
423     public boolean isStarted() {
424         return isStarted(this.getKey());
425     }
426
427     /*
428      * (non-Javadoc)
429      *
430      * @see
431      * org.onap.policy.apex.service.engine.runtime.EngineService#isStarted(org.onap.policy.apex.
432      * model. basicmodel.concepts.AxArtifactKey)
433      */
434     @Override
435     public boolean isStarted(final AxArtifactKey engineKey) {
436         final AxEngineState engstate = getState();
437         switch (engstate) {
438             case STOPPED:
439             case STOPPING:
440             case UNDEFINED:
441                 return false;
442             case EXECUTING:
443             case READY:
444                 return processorThread != null && processorThread.isAlive() && !processorThread.isInterrupted();
445             default:
446                 break;
447         }
448         return false;
449     }
450
451     /*
452      * (non-Javadoc)
453      *
454      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped()
455      */
456     @Override
457     public boolean isStopped() {
458         return isStopped(this.getKey());
459     }
460
461     /*
462      * (non-Javadoc)
463      *
464      * @see
465      * org.onap.policy.apex.service.engine.runtime.EngineService#isStopped(org.onap.policy.apex.
466      * model. basicmodel.concepts.AxArtifactKey)
467      */
468     @Override
469     public boolean isStopped(final AxArtifactKey engineKey) {
470         final AxEngineState engstate = getState();
471         switch (engstate) {
472             case STOPPING:
473             case UNDEFINED:
474             case EXECUTING:
475             case READY:
476                 return false;
477             case STOPPED:
478                 return processorThread == null || !processorThread.isAlive();
479             default:
480                 break;
481         }
482         return false;
483     }
484
485     /*
486      * (non-Javadoc)
487      *
488      * @see org.onap.policy.apex.service.engine.runtime.EngineService#startPeriodicEvents(long)
489      */
490     @Override
491     public void startPeriodicEvents(final long period) {
492         throw new UnsupportedOperationException("startPeriodicEvents() call is not allowed on an Apex Engine Worker");
493     }
494
495     /*
496      * (non-Javadoc)
497      *
498      * @see org.onap.policy.apex.service.engine.runtime.EngineService#stopPeriodicEvents()
499      */
500     @Override
501     public void stopPeriodicEvents() {
502         throw new UnsupportedOperationException("stopPeriodicEvents() call is not allowed on an Apex Engine Worker");
503     }
504
505     /*
506      * (non-Javadoc)
507      *
508      * @see
509      * org.onap.policy.apex.service.engine.runtime.EngineService#getStatus(org.onap.policy.apex.core
510      * .model .concepts.AxArtifactKey)
511      */
512     @Override
513     public String getStatus(final AxArtifactKey engineKey) {
514         // Get the information from the engine that we want to return
515         final AxEngineModel apexEngineModel = engine.getEngineStatus();
516         apexEngineModel.getKeyInformation().generateKeyInfo(apexEngineModel);
517
518         // Convert that information into a string
519         try {
520             final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream();
521             final ApexModelWriter<AxEngineModel> modelWriter = new ApexModelWriter<>(AxEngineModel.class);
522             modelWriter.setJsonOutput(true);
523             modelWriter.write(apexEngineModel, baOutputStream);
524             return baOutputStream.toString();
525         } catch (final Exception e) {
526             LOGGER.warn("error outputting runtime information for engine {}", engineWorkerKey, e);
527             return null;
528         }
529     }
530
531     /*
532      * (non-Javadoc)
533      *
534      * @see
535      * org.onap.policy.apex.service.engine.runtime.EngineService#getRuntimeInfo(org.onap.policy.apex
536      * .core.model.concepts.AxArtifactKey)
537      */
538     @Override
539     public String getRuntimeInfo(final AxArtifactKey engineKey) {
540         // We'll build up the JSON string for runtime information bit by bit
541         final StringBuilder runtimeJsonStringBuilder = new StringBuilder();
542
543         // Get the engine information
544         final AxEngineModel engineModel = engine.getEngineStatus();
545         final Map<AxArtifactKey, Map<String, Object>> engineContextAlbums = engine.getEngineContext();
546
547         // Use GSON to convert our context information into JSON
548         final Gson gson = new GsonBuilder().setPrettyPrinting().create();
549
550         // Get context into a JSON string
551         runtimeJsonStringBuilder.append("{\"TimeStamp\":");
552         runtimeJsonStringBuilder.append(engineModel.getTimestamp());
553         runtimeJsonStringBuilder.append(",\"State\":");
554         runtimeJsonStringBuilder.append(engineModel.getState());
555         runtimeJsonStringBuilder.append(",\"Stats\":");
556         runtimeJsonStringBuilder.append(gson.toJson(engineModel.getStats()));
557
558         // Get context into a JSON string
559         runtimeJsonStringBuilder.append(",\"ContextAlbums\":[");
560
561         boolean firstAlbum = true;
562         for (final Entry<AxArtifactKey, Map<String, Object>> contextAlbumEntry : engineContextAlbums.entrySet()) {
563             if (firstAlbum) {
564                 firstAlbum = false;
565             } else {
566                 runtimeJsonStringBuilder.append(",");
567             }
568
569             runtimeJsonStringBuilder.append("{\"AlbumKey\":");
570             runtimeJsonStringBuilder.append(gson.toJson(contextAlbumEntry.getKey()));
571             runtimeJsonStringBuilder.append(",\"AlbumContent\":[");
572
573
574             // Get the schema helper to use to marshal context album objects to JSON
575             final AxContextAlbum axContextAlbum =
576                     ModelService.getModel(AxContextAlbums.class).get(contextAlbumEntry.getKey());
577             SchemaHelper schemaHelper = null;
578
579             try {
580                 // Get a schema helper to manage the translations between objects on the album map
581                 // for this album
582                 schemaHelper = new SchemaHelperFactory().createSchemaHelper(axContextAlbum.getKey(),
583                         axContextAlbum.getItemSchema());
584             } catch (final ContextRuntimeException e) {
585                 final String resultString =
586                         "could not find schema helper to marshal context album \"" + axContextAlbum + "\" to JSON";
587                 LOGGER.warn(resultString, e);
588
589                 // End of context album entry
590                 runtimeJsonStringBuilder.append(resultString);
591                 runtimeJsonStringBuilder.append("]}");
592
593                 continue;
594             }
595
596             boolean firstEntry = true;
597             for (final Entry<String, Object> contextEntry : contextAlbumEntry.getValue().entrySet()) {
598                 if (firstEntry) {
599                     firstEntry = false;
600                 } else {
601                     runtimeJsonStringBuilder.append(",");
602                 }
603                 runtimeJsonStringBuilder.append("{\"EntryName\":");
604                 runtimeJsonStringBuilder.append(gson.toJson(contextEntry.getKey()));
605                 runtimeJsonStringBuilder.append(",\"EntryContent\":");
606                 runtimeJsonStringBuilder.append(gson.toJson(schemaHelper.marshal2String(contextEntry.getValue())));
607
608                 // End of context entry
609                 runtimeJsonStringBuilder.append("}");
610             }
611
612             // End of context album entry
613             runtimeJsonStringBuilder.append("]}");
614         }
615
616         runtimeJsonStringBuilder.append("]}");
617
618         // Tidy up the JSON string
619         final JsonParser jsonParser = new JsonParser();
620         final JsonElement jsonElement = jsonParser.parse(runtimeJsonStringBuilder.toString());
621         final String tidiedRuntimeString = gson.toJson(jsonElement);
622
623         LOGGER.debug("runtime information={}", tidiedRuntimeString);
624
625         return tidiedRuntimeString;
626     }
627
628     /**
629      * This is an event processor thread, this class decouples the events handling logic from core
630      * business logic. This class runs its own thread and continuously querying the blocking queue
631      * for the events that have been sent to the worker for processing by the Apex engine.
632      *
633      * @author Liam Fallon (liam.fallon@ericsson.com)
634      */
635     private class EventProcessor implements Runnable {
636         private final boolean debugEnabled = LOGGER.isDebugEnabled();
637         // the events queue
638         private BlockingQueue<ApexEvent> eventProcessingQueue = null;
639
640         /**
641          * Constructor accepts {@link ApexEngine} and {@link BlockingQueue} type objects.
642          *
643          * @param eventProcessingQueue is reference of {@link BlockingQueue} which contains trigger
644          *        events.
645          */
646         EventProcessor(final BlockingQueue<ApexEvent> eventProcessingQueue) {
647             this.eventProcessingQueue = eventProcessingQueue;
648         }
649
650         /*
651          * (non-Javadoc)
652          *
653          * @see java.lang.Runnable#run()
654          */
655         @Override
656         public void run() {
657             LOGGER.debug("Engine {} processing ... ", engineWorkerKey);
658
659             // Take events from the event processing queue of the worker and pass them to the engine
660             // for processing
661             boolean stopFlag = false;
662             while (processorThread != null && !processorThread.isInterrupted() && ! stopFlag) {
663                 ApexEvent event = null;
664                 try {
665                     event = eventProcessingQueue.take();
666                 } catch (final InterruptedException e) {
667                     // restore the interrupt status
668                     Thread.currentThread().interrupt();
669                     LOGGER.debug("Engine {} processing interrupted ", engineWorkerKey);
670                     break;
671                 }
672
673                 try {
674                     if (event != null) {
675                         debugEventIfDebugEnabled(event);
676                         
677                         final EnEvent enevent = apexEnEventConverter.fromApexEvent(event);
678                         engine.handleEvent(enevent);
679                     }
680                 } catch (final ApexException e) {
681                     LOGGER.warn("Engine {} failed to process event {}", engineWorkerKey, event.toString(), e);
682                 } catch (final Exception e) {
683                     LOGGER.warn("Engine {} terminated processing event {}", engineWorkerKey, event.toString(), e);
684                     stopFlag = true;
685                 }
686             }
687             LOGGER.debug("Engine {} completed processing", engineWorkerKey);
688         }
689
690         /**
691          * Debug the event if debug is enabled.
692          * 
693          * @param event the event to debug
694          */
695         private void debugEventIfDebugEnabled(ApexEvent event) {
696             if (debugEnabled) {
697                 LOGGER.debug("Trigger Event {} forwarded to the Apex engine", event);
698             }
699         }
700     }
701 }