0ca7fe837721b0766f29df9c11d2c2517200f11d
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Copyright (C) 2019 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  * 
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  * 
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  * 
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.service.engine.runtime.impl;
23
24 import com.google.gson.Gson;
25 import com.google.gson.GsonBuilder;
26 import com.google.gson.JsonElement;
27 import com.google.gson.JsonParser;
28
29 import java.io.ByteArrayInputStream;
30 import java.io.ByteArrayOutputStream;
31 import java.util.Arrays;
32 import java.util.Collection;
33 import java.util.Map;
34 import java.util.Map.Entry;
35 import java.util.concurrent.BlockingQueue;
36
37 import org.onap.policy.apex.context.ContextException;
38 import org.onap.policy.apex.context.ContextRuntimeException;
39 import org.onap.policy.apex.context.SchemaHelper;
40 import org.onap.policy.apex.context.impl.schema.SchemaHelperFactory;
41 import org.onap.policy.apex.core.engine.engine.ApexEngine;
42 import org.onap.policy.apex.core.engine.engine.impl.ApexEngineFactory;
43 import org.onap.policy.apex.core.engine.event.EnEvent;
44 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
45 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
46 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
47 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
48 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
49 import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter;
50 import org.onap.policy.apex.model.basicmodel.service.ModelService;
51 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
52 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
53 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
54 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
55 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
56 import org.onap.policy.apex.service.engine.event.ApexEvent;
57 import org.onap.policy.apex.service.engine.event.impl.enevent.ApexEvent2EnEventConverter;
58 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
59 import org.onap.policy.apex.service.engine.runtime.EngineService;
60 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
61 import org.slf4j.ext.XLogger;
62 import org.slf4j.ext.XLoggerFactory;
63
64 /**
65  * The Class EngineWorker encapsulates a core {@link ApexEngine} instance, which runs policies defined in the
66  * {@link org.onap.policy.apex.model.basicmodel.concepts.AxModelAxModel}. Each policy is triggered by an Apex event, and
67  * when the policy is triggered it runs through to completion in the ApexEngine.
68  *
69  * <p>This class acts as a container for an {@link ApexEngine}, running it in a thread, sending it events, and receiving
70  * events from it.
71  *
72  * @author Liam Fallon (liam.fallon@ericsson.com)
73  */
74 final class EngineWorker implements EngineService {
75     // Logger for this class
76     private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineService.class);
77
78     // Recurring string constants
79     private static final String ENGINE_FOR_KEY_PREFIX = "apex engine for engine key ";
80     private static final String ENGINE_SUFFIX = " of this engine";
81     private static final String BAD_KEY_MATCH_TAG = " does not match the key";
82     private static final String ENGINE_KEY_PREFIX = "engine key ";
83
84     // The ID of this engine
85     private final AxArtifactKey engineWorkerKey;
86
87     // The Apex engine which is running the policies in this worker
88     private final ApexEngine engine;
89
90     // The event processor is an inner class, an instance of which runs as a thread that reads
91     // incoming events from a queue and forwards them to the Apex engine
92     private EventProcessor processor = null;
93
94     // Thread handling for the worker
95     private final ApplicationThreadFactory threadFactory;
96     private Thread processorThread;
97
98     // Converts ApexEvent instances to and from EnEvent instances
99     private ApexEvent2EnEventConverter apexEnEventConverter = null;
100
101     /**
102      * Constructor that creates an Apex engine, an event processor for events to be sent to that engine, and an
103      * {@link ApexModelReader} instance to read Apex models using JAXB.
104      *
105      * @param engineWorkerKey the engine worker key
106      * @param queue the queue on which events for this Apex worker will come
107      * @param threadFactory the thread factory to use for creating the event processing thread
108      * @throws ApexException thrown on errors on worker instantiation
109      */
110     protected EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue,
111         final ApplicationThreadFactory threadFactory) {
112         LOGGER.entry(engineWorkerKey);
113
114         this.engineWorkerKey = engineWorkerKey;
115         this.threadFactory = threadFactory;
116
117         // Create the Apex engine
118         engine = new ApexEngineFactory().createApexEngine(engineWorkerKey);
119
120         // Create and run the event processor
121         processor = new EventProcessor(queue);
122
123         // Set the Event converter up
124         apexEnEventConverter = new ApexEvent2EnEventConverter(engine);
125
126         LOGGER.exit();
127     }
128
129     /*
130      * (non-Javadoc)
131      * 
132      * @see org.onap.policy.apex.service.engine.runtime.EngineService#registerActionListener(java.lang. String,
133      * org.onap.policy.apex.service.engine.runtime.ApexEventListener)
134      */
135     @Override
136     public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
137         engine.addEventListener(listenerName, new EnEventListenerImpl(apexEventListener, apexEnEventConverter));
138     }
139
140     /*
141      * (non-Javadoc)
142      * 
143      * @see org.onap.policy.apex.service.engine.runtime.EngineService#deregisterActionListener(java.lang. String)
144      */
145     @Override
146     public void deregisterActionListener(final String listenerName) {
147         engine.removeEventListener(listenerName);
148     }
149
150     /*
151      * (non-Javadoc)
152      *
153      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getEngineServiceEventInterface()
154      */
155     @Override
156     public EngineServiceEventInterface getEngineServiceEventInterface() {
157         throw new UnsupportedOperationException(
158             "getEngineServiceEventInterface() call is not allowed on an Apex Engine Worker");
159     }
160
161     /*
162      * (non-Javadoc)
163      *
164      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getKey()
165      */
166     @Override
167     public AxArtifactKey getKey() {
168         return engineWorkerKey;
169     }
170
171     /*
172      * (non-Javadoc)
173      *
174      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getInfo()
175      */
176     @Override
177     public Collection<AxArtifactKey> getEngineKeys() {
178         return Arrays.asList(engineWorkerKey);
179     }
180
181     /*
182      * (non-Javadoc)
183      *
184      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getApexModelKey()
185      */
186     @Override
187     public AxArtifactKey getApexModelKey() {
188         if (ModelService.existsModel(AxPolicyModel.class)) {
189             return ModelService.getModel(AxPolicyModel.class).getKey();
190         } else {
191             return null;
192         }
193     }
194
195     /*
196      * (non-Javadoc)
197      *
198      * @see org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex. model.
199      * basicmodel.concepts.AxArtifactKey, java.lang.String, boolean)
200      */
201     @Override
202     public void updateModel(final AxArtifactKey engineKey, final String engineModel, final boolean forceFlag)
203         throws ApexException {
204         LOGGER.entry(engineKey);
205
206         // Read the Apex model into memory using the Apex Model Reader
207         AxPolicyModel apexPolicyModel = null;
208         try {
209             final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
210             apexPolicyModel = modelReader.read(new ByteArrayInputStream(engineModel.getBytes()));
211         } catch (final ApexModelException e) {
212             LOGGER.error("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
213             throw new ApexException("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
214         }
215
216         // Update the Apex model in the Apex engine
217         updateModel(engineKey, apexPolicyModel, forceFlag);
218
219         LOGGER.exit();
220     }
221
222     /*
223      * (non-Javadoc)
224      *
225      * @see org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex. model.
226      * basicmodel.concepts.AxArtifactKey, org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel, boolean)
227      */
228     @Override
229     public void updateModel(final AxArtifactKey engineKey, final AxPolicyModel apexModel, final boolean forceFlag)
230         throws ApexException {
231         LOGGER.entry(engineKey);
232
233         // Check if the key on the update request is correct
234         if (!engineWorkerKey.equals(engineKey)) {
235             String message = ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
236                 + ENGINE_SUFFIX;
237             LOGGER.warn(message);
238             throw new ApexException(message);
239         }
240
241         // Check model compatibility
242         if (ModelService.existsModel(AxPolicyModel.class)) {
243             // The current policy model may or may not be defined
244             final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
245             if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
246                 if (forceFlag) {
247                     LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
248                         + "\" is not a compatible model update from the existing engine model with key \""
249                         + currentModel.getKey().getId() + "\"");
250                 } else {
251                     throw new ContextException(
252                         "apex model update failed, supplied model with key \"" + apexModel.getKey().getId()
253                             + "\" is not a compatible model update from the existing engine model with key \""
254                             + currentModel.getKey().getId() + "\"");
255                 }
256             }
257         }
258
259         // Update the Apex model in the Apex engine
260         engine.updateModel(apexModel);
261
262         LOGGER.debug("engine model {} added to the engine-{}", apexModel.getKey().getId(), engineWorkerKey);
263         LOGGER.exit();
264     }
265
266     /*
267      * (non-Javadoc)
268      *
269      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getState()
270      */
271     @Override
272     public AxEngineState getState() {
273         return engine.getState();
274     }
275
276     /*
277      * (non-Javadoc)
278      *
279      * @see org.onap.policy.apex.service.engine.runtime.EngineService#startAll()
280      */
281     @Override
282     public void startAll() throws ApexException {
283         start(this.getKey());
284     }
285
286     /*
287      * (non-Javadoc)
288      *
289      * @see org.onap.policy.apex.service.engine.runtime.EngineService#start(org.onap.policy.apex.core. model.
290      * concepts.AxArtifactKey)
291      */
292     @Override
293     public void start(final AxArtifactKey engineKey) throws ApexException {
294         LOGGER.entry(engineKey);
295
296         // Check if the key on the start request is correct
297         if (!engineWorkerKey.equals(engineKey)) {
298             LOGGER.warn(
299                 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
300             throw new ApexException(
301                 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
302         }
303
304         // Starts the event processing thread that handles incoming events
305         if (processorThread != null && processorThread.isAlive()) {
306             String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already running with state "
307                 + getState();
308             LOGGER.error(message);
309             throw new ApexException(message);
310         }
311
312         // Start the engine
313         engine.start();
314
315         // Start a thread to process events for the engine
316         processorThread = threadFactory.newThread(processor);
317         processorThread.start();
318
319         LOGGER.exit(engineKey);
320     }
321
322     /*
323      * (non-Javadoc)
324      *
325      * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop()
326      */
327     @Override
328     public void stop() throws ApexException {
329         stop(this.getKey());
330     }
331
332     /*
333      * (non-Javadoc)
334      *
335      * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop(org.onap.policy.apex.core. model.
336      * concepts.AxArtifactKey)
337      */
338     @Override
339     public void stop(final AxArtifactKey engineKey) throws ApexException {
340         // Check if the key on the start request is correct
341         if (!engineWorkerKey.equals(engineKey)) {
342             LOGGER.warn(
343                 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
344             throw new ApexException(
345                 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
346         }
347
348         // Interrupt the worker to stop its thread
349         if (processorThread == null || !processorThread.isAlive()) {
350             processorThread = null;
351
352             LOGGER
353                 .warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already stopped with state " + getState());
354             return;
355         }
356
357         // Interrupt the thread that is handling events toward the engine
358         processorThread.interrupt();
359         processorThread = null;
360
361         // Stop the engine
362         engine.stop();
363
364         LOGGER.exit(engineKey);
365     }
366
367     /*
368      * (non-Javadoc)
369      *
370      * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear()
371      */
372     @Override
373     public void clear() throws ApexException {
374         clear(this.getKey());
375     }
376
377     /*
378      * (non-Javadoc)
379      *
380      * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear(org.onap.policy.apex.core. model.
381      * concepts.AxArtifactKey)
382      */
383     @Override
384     public void clear(final AxArtifactKey engineKey) throws ApexException {
385         // Check if the key on the start request is correct
386         if (!engineWorkerKey.equals(engineKey)) {
387             LOGGER.warn(
388                 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
389             throw new ApexException(
390                 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
391         }
392
393         // Interrupt the worker to stop its thread
394         if (processorThread != null && !processorThread.isAlive()) {
395             LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is not stopped with state " + getState());
396             return;
397         }
398
399         // Clear the engine
400         engine.clear();
401
402         LOGGER.exit(engineKey);
403     }
404
405     /*
406      * (non-Javadoc)
407      *
408      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted()
409      */
410     @Override
411     public boolean isStarted() {
412         return isStarted(this.getKey());
413     }
414
415     /*
416      * (non-Javadoc)
417      *
418      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted(org.onap.policy.apex. model.
419      * basicmodel.concepts.AxArtifactKey)
420      */
421     @Override
422     public boolean isStarted(final AxArtifactKey engineKey) {
423         final AxEngineState engstate = getState();
424         switch (engstate) {
425             case STOPPED:
426             case STOPPING:
427             case UNDEFINED:
428                 return false;
429             case EXECUTING:
430             case READY:
431                 return processorThread != null && processorThread.isAlive() && !processorThread.isInterrupted();
432             default:
433                 break;
434         }
435         return false;
436     }
437
438     /*
439      * (non-Javadoc)
440      *
441      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped()
442      */
443     @Override
444     public boolean isStopped() {
445         return isStopped(this.getKey());
446     }
447
448     /*
449      * (non-Javadoc)
450      *
451      * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped(org.onap.policy.apex. model.
452      * basicmodel.concepts.AxArtifactKey)
453      */
454     @Override
455     public boolean isStopped(final AxArtifactKey engineKey) {
456         final AxEngineState engstate = getState();
457         switch (engstate) {
458             case STOPPING:
459             case UNDEFINED:
460             case EXECUTING:
461             case READY:
462                 return false;
463             case STOPPED:
464                 return processorThread == null || !processorThread.isAlive();
465             default:
466                 break;
467         }
468         return false;
469     }
470
471     /*
472      * (non-Javadoc)
473      *
474      * @see org.onap.policy.apex.service.engine.runtime.EngineService#startPeriodicEvents(long)
475      */
476     @Override
477     public void startPeriodicEvents(final long period) {
478         throw new UnsupportedOperationException("startPeriodicEvents() call is not allowed on an Apex Engine Worker");
479     }
480
481     /*
482      * (non-Javadoc)
483      *
484      * @see org.onap.policy.apex.service.engine.runtime.EngineService#stopPeriodicEvents()
485      */
486     @Override
487     public void stopPeriodicEvents() {
488         throw new UnsupportedOperationException("stopPeriodicEvents() call is not allowed on an Apex Engine Worker");
489     }
490
491     /*
492      * (non-Javadoc)
493      *
494      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getStatus(org.onap.policy.apex.core .model
495      * .concepts.AxArtifactKey)
496      */
497     @Override
498     public String getStatus(final AxArtifactKey engineKey) {
499         // Get the information from the engine that we want to return
500         final AxEngineModel apexEngineModel = engine.getEngineStatus();
501         apexEngineModel.getKeyInformation().generateKeyInfo(apexEngineModel);
502
503         // Convert that information into a string
504         try {
505             final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream();
506             final ApexModelWriter<AxEngineModel> modelWriter = new ApexModelWriter<>(AxEngineModel.class);
507             modelWriter.setJsonOutput(true);
508             modelWriter.write(apexEngineModel, baOutputStream);
509             return baOutputStream.toString();
510         } catch (final Exception e) {
511             LOGGER.warn("error outputting runtime information for engine {}", engineWorkerKey, e);
512             return null;
513         }
514     }
515
516     /*
517      * (non-Javadoc)
518      *
519      * @see org.onap.policy.apex.service.engine.runtime.EngineService#getRuntimeInfo(org.onap.policy.apex
520      * .core.model.concepts.AxArtifactKey)
521      */
522     @Override
523     public String getRuntimeInfo(final AxArtifactKey engineKey) {
524         // We'll build up the JSON string for runtime information bit by bit
525         final StringBuilder runtimeJsonStringBuilder = new StringBuilder();
526
527         // Get the engine information
528         final AxEngineModel engineModel = engine.getEngineStatus();
529         final Map<AxArtifactKey, Map<String, Object>> engineContextAlbums = engine.getEngineContext();
530
531         // Use GSON to convert our context information into JSON
532         final Gson gson = new GsonBuilder().setPrettyPrinting().create();
533
534         // Get context into a JSON string
535         runtimeJsonStringBuilder.append("{\"TimeStamp\":");
536         runtimeJsonStringBuilder.append(engineModel.getTimestamp());
537         runtimeJsonStringBuilder.append(",\"State\":");
538         runtimeJsonStringBuilder.append(engineModel.getState());
539         runtimeJsonStringBuilder.append(",\"Stats\":");
540         runtimeJsonStringBuilder.append(gson.toJson(engineModel.getStats()));
541
542         // Get context into a JSON string
543         runtimeJsonStringBuilder.append(",\"ContextAlbums\":[");
544
545         boolean firstAlbum = true;
546         for (final Entry<AxArtifactKey, Map<String, Object>> contextAlbumEntry : engineContextAlbums.entrySet()) {
547             if (firstAlbum) {
548                 firstAlbum = false;
549             } else {
550                 runtimeJsonStringBuilder.append(",");
551             }
552
553             runtimeJsonStringBuilder.append("{\"AlbumKey\":");
554             runtimeJsonStringBuilder.append(gson.toJson(contextAlbumEntry.getKey()));
555             runtimeJsonStringBuilder.append(",\"AlbumContent\":[");
556
557             // Get the schema helper to use to marshal context album objects to JSON
558             final AxContextAlbum axContextAlbum = ModelService.getModel(AxContextAlbums.class)
559                 .get(contextAlbumEntry.getKey());
560             SchemaHelper schemaHelper = null;
561
562             try {
563                 // Get a schema helper to manage the translations between objects on the album map
564                 // for this album
565                 schemaHelper = new SchemaHelperFactory().createSchemaHelper(axContextAlbum.getKey(),
566                     axContextAlbum.getItemSchema());
567             } catch (final ContextRuntimeException e) {
568                 final String resultString = "could not find schema helper to marshal context album \"" + axContextAlbum
569                     + "\" to JSON";
570                 LOGGER.warn(resultString, e);
571
572                 // End of context album entry
573                 runtimeJsonStringBuilder.append(resultString);
574                 runtimeJsonStringBuilder.append("]}");
575
576                 continue;
577             }
578
579             boolean firstEntry = true;
580             for (final Entry<String, Object> contextEntry : contextAlbumEntry.getValue().entrySet()) {
581                 if (firstEntry) {
582                     firstEntry = false;
583                 } else {
584                     runtimeJsonStringBuilder.append(",");
585                 }
586                 runtimeJsonStringBuilder.append("{\"EntryName\":");
587                 runtimeJsonStringBuilder.append(gson.toJson(contextEntry.getKey()));
588                 runtimeJsonStringBuilder.append(",\"EntryContent\":");
589                 runtimeJsonStringBuilder.append(gson.toJson(schemaHelper.marshal2String(contextEntry.getValue())));
590
591                 // End of context entry
592                 runtimeJsonStringBuilder.append("}");
593             }
594
595             // End of context album entry
596             runtimeJsonStringBuilder.append("]}");
597         }
598
599         runtimeJsonStringBuilder.append("]}");
600
601         // Tidy up the JSON string
602         final JsonParser jsonParser = new JsonParser();
603         final JsonElement jsonElement = jsonParser.parse(runtimeJsonStringBuilder.toString());
604         final String tidiedRuntimeString = gson.toJson(jsonElement);
605
606         LOGGER.debug("runtime information={}", tidiedRuntimeString);
607
608         return tidiedRuntimeString;
609     }
610
611     /**
612      * This is an event processor thread, this class decouples the events handling logic from core business logic. This
613      * class runs its own thread and continuously querying the blocking queue for the events that have been sent to the
614      * worker for processing by the Apex engine.
615      *
616      * @author Liam Fallon (liam.fallon@ericsson.com)
617      */
618     private class EventProcessor implements Runnable {
619         private final boolean debugEnabled = LOGGER.isDebugEnabled();
620         // the events queue
621         private BlockingQueue<ApexEvent> eventProcessingQueue = null;
622
623         /**
624          * Constructor accepts {@link ApexEngine} and {@link BlockingQueue} type objects.
625          *
626          * @param eventProcessingQueue is reference of {@link BlockingQueue} which contains trigger events.
627          */
628         EventProcessor(final BlockingQueue<ApexEvent> eventProcessingQueue) {
629             this.eventProcessingQueue = eventProcessingQueue;
630         }
631
632         /*
633          * (non-Javadoc)
634          *
635          * @see java.lang.Runnable#run()
636          */
637         @Override
638         public void run() {
639             LOGGER.debug("Engine {} processing ... ", engineWorkerKey);
640
641             // Take events from the event processing queue of the worker and pass them to the engine
642             // for processing
643             boolean stopFlag = false;
644             while (processorThread != null && !processorThread.isInterrupted() && !stopFlag) {
645                 ApexEvent event = null;
646                 try {
647                     event = eventProcessingQueue.take();
648                 } catch (final InterruptedException e) {
649                     // restore the interrupt status
650                     Thread.currentThread().interrupt();
651                     LOGGER.debug("Engine {} processing interrupted ", engineWorkerKey);
652                     break;
653                 }
654
655                 try {
656                     if (event != null) {
657                         debugEventIfDebugEnabled(event);
658
659                         final EnEvent enevent = apexEnEventConverter.fromApexEvent(event);
660                         engine.handleEvent(enevent);
661                     }
662                 } catch (final ApexException e) {
663                     LOGGER.warn("Engine {} failed to process event {}", engineWorkerKey, event.toString(), e);
664                 } catch (final Exception e) {
665                     LOGGER.warn("Engine {} terminated processing event {}", engineWorkerKey, event.toString(), e);
666                     stopFlag = true;
667                 }
668             }
669             LOGGER.debug("Engine {} completed processing", engineWorkerKey);
670         }
671
672         /**
673          * Debug the event if debug is enabled.
674          * 
675          * @param event the event to debug
676          */
677         private void debugEventIfDebugEnabled(ApexEvent event) {
678             if (debugEnabled) {
679                 LOGGER.debug("Trigger Event {} forwarded to the Apex engine", event);
680             }
681         }
682     }
683 }