4db8fe38ee5bfff80f725724092bb4095a801bbb
[policy/drools-pdp.git] / feature-pooling-dmaap / src / main / java / org / onap / policy / drools / pooling / PoolingManagerImpl.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import com.fasterxml.jackson.core.JsonProcessingException;
24 import java.io.IOException;
25 import java.lang.reflect.InvocationTargetException;
26 import java.lang.reflect.Method;
27 import java.util.Map;
28 import java.util.Properties;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.ScheduledFuture;
31 import java.util.concurrent.ScheduledThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
34 import org.onap.policy.common.endpoints.event.comm.TopicListener;
35 import org.onap.policy.common.utils.properties.SpecProperties;
36 import org.onap.policy.drools.controller.DroolsController;
37 import org.onap.policy.drools.pooling.extractor.ClassExtractors;
38 import org.onap.policy.drools.pooling.message.BucketAssignments;
39 import org.onap.policy.drools.pooling.message.Forward;
40 import org.onap.policy.drools.pooling.message.Leader;
41 import org.onap.policy.drools.pooling.message.Message;
42 import org.onap.policy.drools.pooling.message.Offline;
43 import org.onap.policy.drools.pooling.state.ActiveState;
44 import org.onap.policy.drools.pooling.state.IdleState;
45 import org.onap.policy.drools.pooling.state.InactiveState;
46 import org.onap.policy.drools.pooling.state.QueryState;
47 import org.onap.policy.drools.pooling.state.StartState;
48 import org.onap.policy.drools.pooling.state.State;
49 import org.onap.policy.drools.pooling.state.StateTimerTask;
50 import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
51 import org.onap.policy.drools.system.PolicyController;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 /**
56  * Implementation of a {@link PoolingManager}. Until bucket assignments have been made,
57  * events coming from external topics are saved in a queue for later processing. Once
58  * assignments are made, the saved events are processed. In addition, while the controller
59  * is locked, events are still forwarded to other hosts and bucket assignments are still
60  * updated, based on any {@link Leader} messages that it receives.
61  */
62 public class PoolingManagerImpl implements PoolingManager, TopicListener {
63
64     private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class);
65
66     /**
67      * Maximum number of times a message can be forwarded.
68      */
69     public static final int MAX_HOPS = 5;
70
71     /**
72      * ID of this host.
73      */
74     private final String host;
75
76     /**
77      * Properties with which this was configured.
78      */
79     private final PoolingProperties props;
80
81     /**
82      * Associated controller.
83      */
84     private final PolicyController controller;
85
86     /**
87      * Where to offer events that have been forwarded to this host (i.e, the controller).
88      */
89     private final TopicListener listener;
90
91     /**
92      * Decremented each time the manager enters the Active state. Used by junit tests.
93      */
94     private final CountDownLatch activeLatch;
95
96     /**
97      * Used to encode & decode request objects received from & sent to a rule engine.
98      */
99     private final Serializer serializer;
100
101     /**
102      * Internal DMaaP topic used by this controller.
103      */
104     private final String topic;
105
106     /**
107      * Manager for the internal DMaaP topic.
108      */
109     private final DmaapManager dmaapMgr;
110
111     /**
112      * Used to extract the request id from the decoded message.
113      */
114     private final ClassExtractors extractors;
115
116     /**
117      * Lock used while updating {@link #current}. In general, public methods must use
118      * this, while private methods assume the lock is already held.
119      */
120     private final Object curLocker = new Object();
121
122     /**
123      * Current state.
124      * 
125      * <p>This uses a finite state machine, wherein the state object contains all of the data
126      * relevant to that state. Each state object has a process() method, specific to each
127      * type of {@link Message} subclass. The method returns the next state object, or
128      * {@code null} if the state is to remain the same.
129      */
130     private State current;
131
132     /**
133      * Current bucket assignments or {@code null}.
134      */
135     private BucketAssignments assignments = null;
136
137     /**
138      * Pool used to execute timers.
139      */
140     private ScheduledThreadPoolExecutor scheduler = null;
141
142     /**
143      * {@code True} if events offered by the controller should be intercepted,
144      * {@code false} otherwise.
145      */
146     private boolean intercept = true;
147
148     /**
149      * Constructs the manager, initializing all of the data structures.
150      *
151      * @param host name/uuid of this host
152      * @param controller controller with which this is associated
153      * @param props feature properties specific to the controller
154      * @param activeLatch latch to be decremented each time the manager enters the Active
155      *        state
156      */
157     public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props,
158                     CountDownLatch activeLatch) {
159         this.host = host;
160         this.controller = controller;
161         this.props = props;
162         this.activeLatch = activeLatch;
163
164         try {
165             this.listener = (TopicListener) controller;
166             this.serializer = new Serializer();
167             this.topic = props.getPoolingTopic();
168             this.extractors = makeClassExtractors(makeExtractorProps(controller, props.getSource()));
169             this.dmaapMgr = makeDmaapManager(props.getPoolingTopic());
170             this.current = new IdleState(this);
171
172             logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
173
174         } catch (ClassCastException e) {
175             logger.error("not a topic listener, controller {}", controller.getName());
176             throw new PoolingFeatureRtException(e);
177
178         } catch (PoolingFeatureException e) {
179             logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName());
180             throw new PoolingFeatureRtException(e);
181         }
182     }
183
184     /**
185      * Should only be used by junit tests.
186      * 
187      * @return the current state
188      */
189     protected State getCurrent() {
190         synchronized (curLocker) {
191             return current;
192         }
193     }
194
195     @Override
196     public String getHost() {
197         return host;
198     }
199
200     @Override
201     public String getTopic() {
202         return topic;
203     }
204
205     @Override
206     public PoolingProperties getProperties() {
207         return props;
208     }
209
210     /**
211      * Makes properties for configuring extractors.
212      * 
213      * @param controller the controller for which the extractors will be configured
214      * @param source properties from which to get the extractor properties
215      * @return extractor properties
216      */
217     private Properties makeExtractorProps(PolicyController controller, Properties source) {
218         return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source);
219     }
220
221     /**
222      * Indicates that the controller is about to start. Starts the publisher for the
223      * internal topic, and creates a thread pool for the timers.
224      */
225     public void beforeStart() {
226         synchronized (curLocker) {
227             if (scheduler == null) {
228                 dmaapMgr.startPublisher();
229
230                 logger.debug("make scheduler thread for topic {}", getTopic());
231                 scheduler = makeScheduler();
232
233                 /*
234                  * Only a handful of timers at any moment, thus we can afford to take the
235                  * time to remove them when they're cancelled.
236                  */
237                 scheduler.setRemoveOnCancelPolicy(true);
238                 scheduler.setMaximumPoolSize(1);
239                 scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
240                 scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
241             }
242         }
243     }
244
245     /**
246      * Indicates that the controller has successfully started. Starts the consumer for the
247      * internal topic, enters the {@link StartState}, and sets the filter for the initial
248      * state.
249      */
250     public void afterStart() {
251         synchronized (curLocker) {
252             if (current instanceof IdleState) {
253                 dmaapMgr.startConsumer(this);
254                 changeState(new StartState(this));
255             }
256         }
257     }
258
259     /**
260      * Indicates that the controller is about to stop. Stops the consumer, the scheduler,
261      * and the current state.
262      */
263     public void beforeStop() {
264         ScheduledThreadPoolExecutor sched;
265
266         synchronized (curLocker) {
267             sched = scheduler;
268             scheduler = null;
269
270             if (!(current instanceof IdleState)) {
271                 changeState(new IdleState(this));
272                 dmaapMgr.stopConsumer(this);
273                 publishAdmin(new Offline(getHost()));
274             }
275
276             assignments = null;
277         }
278
279         if (sched != null) {
280             logger.debug("stop scheduler for topic {}", getTopic());
281             sched.shutdownNow();
282         }
283     }
284
285     /**
286      * Indicates that the controller has stopped. Stops the publisher and logs a warning
287      * if any events are still in the queue.
288      */
289     public void afterStop() {
290         synchronized (curLocker) {
291             /*
292              * stop the publisher, but allow time for any Offline message to be
293              * transmitted
294              */
295             dmaapMgr.stopPublisher(props.getOfflinePubWaitMs());
296         }
297     }
298
299     /**
300      * Indicates that the controller is about to be locked. Enters the idle state, as all
301      * it will be doing is forwarding messages.
302      */
303     public void beforeLock() {
304         logger.info("locking manager for topic {}", getTopic());
305
306         synchronized (curLocker) {
307             changeState(new IdleState(this));
308         }
309     }
310
311     /**
312      * Indicates that the controller has been unlocked. Enters the start state, if the
313      * controller is running.
314      */
315     public void afterUnlock() {
316         logger.info("unlocking manager for topic {}", getTopic());
317
318         synchronized (curLocker) {
319             if (controller.isAlive() && current instanceof IdleState && scheduler != null) {
320                 changeState(new StartState(this));
321             }
322         }
323     }
324
325     /**
326      * Changes the finite state machine to a new state, provided the new state is not
327      * {@code null}.
328      * 
329      * @param newState new state, or {@code null} if to remain unchanged
330      */
331     private void changeState(State newState) {
332         if (newState != null) {
333             current.cancelTimers();
334             current = newState;
335
336             // set the filter before starting the state
337             setFilter(newState.getFilter());
338             newState.start();
339         }
340     }
341
342     /**
343      * Sets the server-side filter for the internal topic.
344      * 
345      * @param filter new filter to be used
346      */
347     private void setFilter(Map<String, Object> filter) {
348         try {
349             dmaapMgr.setFilter(serializer.encodeFilter(filter));
350
351         } catch (JsonProcessingException e) {
352             logger.error("failed to encode server-side filter for topic {}, {}", topic, filter, e);
353
354         } catch (PoolingFeatureException e) {
355             logger.error("failed to set server-side filter for topic {}, {}", topic, filter, e);
356         }
357     }
358
359     @Override
360     public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
361         // wrap the task in a TimerAction and schedule it
362         ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
363
364         // wrap the future in a "CancellableScheduledTask"
365         return () -> fut.cancel(false);
366     }
367
368     @Override
369     public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
370         // wrap the task in a TimerAction and schedule it
371         ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs,
372                         TimeUnit.MILLISECONDS);
373
374         // wrap the future in a "CancellableScheduledTask"
375         return () -> fut.cancel(false);
376     }
377
378     @Override
379     public void publishAdmin(Message msg) {
380         publish(Message.ADMIN, msg);
381     }
382
383     @Override
384     public void publish(String channel, Message msg) {
385         logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic());
386
387         msg.setChannel(channel);
388
389         try {
390             // ensure it's valid before we send it
391             msg.checkValidity();
392
393             String txt = serializer.encodeMsg(msg);
394             dmaapMgr.publish(txt);
395
396         } catch (JsonProcessingException e) {
397             logger.error("failed to serialize message for topic {} channel {}", topic, channel, e);
398
399         } catch (PoolingFeatureException e) {
400             logger.error("failed to publish message for topic {} channel {}", topic, channel, e);
401         }
402     }
403
404     /**
405      * Handles an event from the internal topic.
406      * 
407      * @param commType comm infrastructure
408      * @param topic2 topic
409      * @param event event
410      */
411     @Override
412     public void onTopicEvent(CommInfrastructure commType, String topic2, String event) {
413
414         if (event == null) {
415             logger.error("null event on topic {}", topic);
416             return;
417         }
418
419         synchronized (curLocker) {
420             // it's on the internal topic
421             handleInternal(event);
422         }
423     }
424
425     /**
426      * Called by the PolicyController before it offers the event to the DroolsController.
427      * If the controller is locked, then it isn't processing events. However, they still
428      * need to be forwarded, thus in that case, they are decoded and forwarded.
429      * 
430      * <p>On the other hand, if the controller is not locked, then we just return immediately
431      * and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
432      * it instead, as it already has the decoded message.
433      * 
434      * @param protocol protocol
435      * @param topic2 topic
436      * @param event event
437      * @return {@code true} if the event was handled by the manager, {@code false} if it
438      *         must still be handled by the invoker
439      */
440     public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
441
442         if (!controller.isLocked() || !intercept) {
443             // we should NOT intercept this message - let the invoker handle it
444             return false;
445         }
446
447         return handleExternal(protocol, topic2, event, extractRequestId(decodeEvent(topic2, event)));
448     }
449
450     /**
451      * Called by the DroolsController before it inserts the event into the rule engine.
452      * 
453      * @param protocol protocol
454      * @param topic2 topic
455      * @param event original event text, as received from the Bus
456      * @param event2 event, as an object
457      * @return {@code true} if the event was handled by the manager, {@code false} if it
458      *         must still be handled by the invoker
459      */
460     public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) {
461
462         if (!intercept) {
463             // we should NOT intercept this message - let the invoker handle it
464             return false;
465         }
466
467         return handleExternal(protocol, topic2, event, extractRequestId(event2));
468     }
469
470     /**
471      * Handles an event from an external topic.
472      * 
473      * @param protocol protocol
474      * @param topic2 topic
475      * @param event event
476      * @param reqid request id extracted from the event, or {@code null} if it couldn't be
477      *        extracted
478      * @return {@code true} if the event was handled by the manager, {@code false} if it
479      *         must still be handled by the invoker
480      */
481     private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) {
482         if (reqid == null) {
483             // no request id - let the invoker handle it
484             return false;
485         }
486
487         if (reqid.isEmpty()) {
488             logger.warn("handle locally due to empty request id for topic {}", topic2);
489             // no request id - let the invoker handle it
490             return false;
491         }
492
493         Forward ev = makeForward(protocol, topic2, event, reqid);
494         if (ev == null) {
495             // invalid args - consume the message
496             logger.warn("constructed an invalid Forward message on topic {}", getTopic());
497             return true;
498         }
499
500         synchronized (curLocker) {
501             return handleExternal(ev);
502         }
503     }
504
505     /**
506      * Handles an event from an external topic.
507      * 
508      * @param event event
509      * @return {@code true} if the event was handled, {@code false} if the invoker should
510      *         handle it
511      */
512     private boolean handleExternal(Forward event) {
513         if (assignments == null) {
514             // no bucket assignments yet - handle locally
515             logger.info("handle event locally for request {}", event.getRequestId());
516
517             // we did NOT consume the event
518             return false;
519
520         } else {
521             return handleEvent(event);
522         }
523     }
524
525     /**
526      * Handles a {@link Forward} event, possibly forwarding it again.
527      * 
528      * @param event event
529      * @return {@code true} if the event was handled, {@code false} if the invoker should
530      *         handle it
531      */
532     private boolean handleEvent(Forward event) {
533         String target = assignments.getAssignedHost(event.getRequestId().hashCode());
534
535         if (target == null) {
536             /*
537              * This bucket has no assignment - just discard the event
538              */
539             logger.warn("discarded event for unassigned bucket from topic {}", event.getTopic());
540             return true;
541         }
542
543         if (target.equals(host)) {
544             /*
545              * Message belongs to this host - allow the controller to handle it.
546              */
547             logger.info("handle local event for request {} from topic {}", event.getRequestId(), event.getTopic());
548             return false;
549         }
550
551         // forward to a different host, if hop count has been exhausted
552         if (event.getNumHops() > MAX_HOPS) {
553             logger.warn("message discarded - hop count {} exceeded {} for topic {}", event.getNumHops(), MAX_HOPS,
554                             topic);
555
556         } else {
557             logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
558             event.bumpNumHops();
559             publish(target, event);
560         }
561
562         // either way, consume the event
563         return true;
564     }
565
566     /**
567      * Extract the request id from an event object.
568      * 
569      * @param event the event object, or {@code null}
570      * @return the event's request id, or {@code null} if it can't be extracted
571      */
572     private String extractRequestId(Object event) {
573         if (event == null) {
574             return null;
575         }
576
577         Object reqid = extractors.extract(event);
578         return (reqid != null ? reqid.toString() : null);
579     }
580
581     /**
582      * Decodes an event from a String into an event Object.
583      * 
584      * @param topic2 topic
585      * @param event event
586      * @return the decoded event object, or {@code null} if it can't be decoded
587      */
588     private Object decodeEvent(String topic2, String event) {
589         DroolsController drools = controller.getDrools();
590
591         // check if this topic has a decoder
592
593         if (!canDecodeEvent(drools, topic2)) {
594
595             logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", drools, topic2, drools.getGroupId(),
596                             drools.getArtifactId());
597             return null;
598         }
599
600         // decode
601
602         try {
603             return decodeEventWrapper(drools, topic2, event);
604
605         } catch (UnsupportedOperationException | IllegalStateException | IllegalArgumentException e) {
606             logger.debug("{}: DECODE FAILED: {} <- {} because of {}", drools, topic2, event, e.getMessage(), e);
607             return null;
608         }
609     }
610
611     /**
612      * Makes a {@link Forward}, and validates its contents.
613      * 
614      * @param protocol protocol
615      * @param topic2 topic
616      * @param event event
617      * @param reqid request id
618      * @return a new message, or {@code null} if the message was invalid
619      */
620     private Forward makeForward(CommInfrastructure protocol, String topic2, String event, String reqid) {
621         try {
622             Forward ev = new Forward(host, protocol, topic2, event, reqid);
623
624             // required for the validity check
625             ev.setChannel(host);
626
627             ev.checkValidity();
628
629             return ev;
630
631         } catch (PoolingFeatureException e) {
632             logger.error("invalid message for topic {}", topic2, e);
633             return null;
634         }
635     }
636
637     @Override
638     public void handle(Forward event) {
639         synchronized (curLocker) {
640             if (!handleExternal(event)) {
641                 // this host should handle it - inject it
642                 inject(event);
643             }
644         }
645     }
646
647     /**
648      * Injects an event into the controller.
649      * 
650      * @param event event
651      */
652     private void inject(Forward event) {
653         logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic());
654
655         try {
656             intercept = false;
657             listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload());
658
659         } finally {
660             intercept = true;
661         }
662     }
663
664     /**
665      * Handles an event from the internal topic. This uses reflection to identify the
666      * appropriate process() method to invoke, based on the type of Message that was
667      * decoded.
668      * 
669      * @param event the serialized {@link Message} read from the internal topic
670      */
671     private void handleInternal(String event) {
672         Class<?> clazz = null;
673
674         try {
675             Message msg = serializer.decodeMsg(event);
676
677             // get the class BEFORE checking the validity
678             clazz = msg.getClass();
679
680             msg.checkValidity();
681
682             Method meth = current.getClass().getMethod("process", msg.getClass());
683             changeState((State) meth.invoke(current, msg));
684
685         } catch (IOException e) {
686             logger.warn("failed to decode message for topic {}", topic, e);
687
688         } catch (NoSuchMethodException | SecurityException e) {
689             logger.error("no processor for message {} for topic {}", clazz, topic, e);
690
691         } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
692                         | PoolingFeatureException e) {
693             logger.error("failed to process message {} for topic {}", clazz, topic, e);
694         }
695     }
696
697     @Override
698     public void startDistributing(BucketAssignments asgn) {
699         synchronized (curLocker) {
700             int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
701             logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
702             assignments = asgn;
703         }
704     }
705
706     @Override
707     public BucketAssignments getAssignments() {
708         return assignments;
709     }
710
711     @Override
712     public State goStart() {
713         return new StartState(this);
714     }
715
716     @Override
717     public State goQuery() {
718         return new QueryState(this);
719     }
720
721     @Override
722     public State goActive() {
723         activeLatch.countDown();
724         return new ActiveState(this);
725     }
726
727     @Override
728     public State goInactive() {
729         return new InactiveState(this);
730     }
731
732     /**
733      * Action to run a timer task. Only runs the task if the machine is still in the state
734      * that it was in when the timer was created.
735      */
736     private class TimerAction implements Runnable {
737
738         /**
739          * State of the machine when the timer was created.
740          */
741         private State origState;
742
743         /**
744          * Task to be executed.
745          */
746         private StateTimerTask task;
747
748         /**
749          * Constructor.
750          * 
751          * @param task task to execute when this timer runs
752          */
753         public TimerAction(StateTimerTask task) {
754             this.origState = current;
755             this.task = task;
756         }
757
758         @Override
759         public void run() {
760             synchronized (curLocker) {
761                 if (current == origState) {
762                     changeState(task.fire());
763                 }
764             }
765         }
766     }
767     
768     /*
769      * The remaining methods may be overridden by junit tests.
770      */
771
772     /**
773      * Creates object extractors.
774      * 
775      * @param props properties used to configure the extractors
776      * @return a new set of extractors
777      */
778     protected ClassExtractors makeClassExtractors(Properties props) {
779         return new ClassExtractors(props, PoolingProperties.PROP_EXTRACTOR_PREFIX,
780                         PoolingProperties.EXTRACTOR_TYPE);
781     }
782
783     /**
784      * Creates a DMaaP manager.
785      * 
786      * @param topic name of the internal DMaaP topic
787      * @return a new DMaaP manager
788      * @throws PoolingFeatureException if an error occurs
789      */
790     protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
791         return new DmaapManager(topic);
792     }
793
794     /**
795      * Creates a scheduled thread pool.
796      * 
797      * @return a new scheduled thread pool
798      */
799     protected ScheduledThreadPoolExecutor makeScheduler() {
800         return new ScheduledThreadPoolExecutor(1);
801     }
802
803     /**
804      * Determines if the event can be decoded.
805      * 
806      * @param drools drools controller
807      * @param topic topic on which the event was received
808      * @return {@code true} if the event can be decoded, {@code false} otherwise
809      */
810     protected boolean canDecodeEvent(DroolsController drools, String topic) {
811         return EventProtocolCoder.manager.isDecodingSupported(drools.getGroupId(), drools.getArtifactId(), topic);
812     }
813
814     /**
815      * Decodes the event.
816      * 
817      * @param drools drools controller
818      * @param topic topic on which the event was received
819      * @param event event text to be decoded
820      * @return the decoded event
821      * @throws IllegalArgumentException illegal argument
822      * @throw UnsupportedOperationException unsupported operation
823      * @throws IllegalStateException illegal state
824      */
825     protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
826         return EventProtocolCoder.manager.decode(drools.getGroupId(), drools.getArtifactId(), topic, event);
827     }
828 }