Remove jackson from feature-pooling-dmaap
[policy/drools-pdp.git] / feature-pooling-dmaap / src / main / java / org / onap / policy / drools / pooling / PoolingManagerImpl.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import com.google.gson.JsonParseException;
24 import java.lang.reflect.InvocationTargetException;
25 import java.lang.reflect.Method;
26 import java.util.Map;
27 import java.util.Properties;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ScheduledFuture;
30 import java.util.concurrent.ScheduledThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
33 import org.onap.policy.common.endpoints.event.comm.TopicListener;
34 import org.onap.policy.common.utils.properties.SpecProperties;
35 import org.onap.policy.drools.controller.DroolsController;
36 import org.onap.policy.drools.pooling.extractor.ClassExtractors;
37 import org.onap.policy.drools.pooling.message.BucketAssignments;
38 import org.onap.policy.drools.pooling.message.Forward;
39 import org.onap.policy.drools.pooling.message.Leader;
40 import org.onap.policy.drools.pooling.message.Message;
41 import org.onap.policy.drools.pooling.message.Offline;
42 import org.onap.policy.drools.pooling.state.ActiveState;
43 import org.onap.policy.drools.pooling.state.IdleState;
44 import org.onap.policy.drools.pooling.state.InactiveState;
45 import org.onap.policy.drools.pooling.state.QueryState;
46 import org.onap.policy.drools.pooling.state.StartState;
47 import org.onap.policy.drools.pooling.state.State;
48 import org.onap.policy.drools.pooling.state.StateTimerTask;
49 import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
50 import org.onap.policy.drools.system.PolicyController;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53
54 /**
55  * Implementation of a {@link PoolingManager}. Until bucket assignments have been made,
56  * events coming from external topics are saved in a queue for later processing. Once
57  * assignments are made, the saved events are processed. In addition, while the controller
58  * is locked, events are still forwarded to other hosts and bucket assignments are still
59  * updated, based on any {@link Leader} messages that it receives.
60  */
61 public class PoolingManagerImpl implements PoolingManager, TopicListener {
62
63     private static final Logger logger = LoggerFactory.getLogger(PoolingManagerImpl.class);
64
65     /**
66      * Maximum number of times a message can be forwarded.
67      */
68     public static final int MAX_HOPS = 5;
69
70     /**
71      * ID of this host.
72      */
73     private final String host;
74
75     /**
76      * Properties with which this was configured.
77      */
78     private final PoolingProperties props;
79
80     /**
81      * Associated controller.
82      */
83     private final PolicyController controller;
84
85     /**
86      * Where to offer events that have been forwarded to this host (i.e, the controller).
87      */
88     private final TopicListener listener;
89
90     /**
91      * Decremented each time the manager enters the Active state. Used by junit tests.
92      */
93     private final CountDownLatch activeLatch;
94
95     /**
96      * Used to encode & decode request objects received from & sent to a rule engine.
97      */
98     private final Serializer serializer;
99
100     /**
101      * Internal DMaaP topic used by this controller.
102      */
103     private final String topic;
104
105     /**
106      * Manager for the internal DMaaP topic.
107      */
108     private final DmaapManager dmaapMgr;
109
110     /**
111      * Used to extract the request id from the decoded message.
112      */
113     private final ClassExtractors extractors;
114
115     /**
116      * Lock used while updating {@link #current}. In general, public methods must use
117      * this, while private methods assume the lock is already held.
118      */
119     private final Object curLocker = new Object();
120
121     /**
122      * Current state.
123      * 
124      * <p>This uses a finite state machine, wherein the state object contains all of the data
125      * relevant to that state. Each state object has a process() method, specific to each
126      * type of {@link Message} subclass. The method returns the next state object, or
127      * {@code null} if the state is to remain the same.
128      */
129     private State current;
130
131     /**
132      * Current bucket assignments or {@code null}.
133      */
134     private BucketAssignments assignments = null;
135
136     /**
137      * Pool used to execute timers.
138      */
139     private ScheduledThreadPoolExecutor scheduler = null;
140
141     /**
142      * {@code True} if events offered by the controller should be intercepted,
143      * {@code false} otherwise.
144      */
145     private boolean intercept = true;
146
147     /**
148      * Constructs the manager, initializing all of the data structures.
149      *
150      * @param host name/uuid of this host
151      * @param controller controller with which this is associated
152      * @param props feature properties specific to the controller
153      * @param activeLatch latch to be decremented each time the manager enters the Active
154      *        state
155      */
156     public PoolingManagerImpl(String host, PolicyController controller, PoolingProperties props,
157                     CountDownLatch activeLatch) {
158         this.host = host;
159         this.controller = controller;
160         this.props = props;
161         this.activeLatch = activeLatch;
162
163         try {
164             this.listener = (TopicListener) controller;
165             this.serializer = new Serializer();
166             this.topic = props.getPoolingTopic();
167             this.extractors = makeClassExtractors(makeExtractorProps(controller, props.getSource()));
168             this.dmaapMgr = makeDmaapManager(props.getPoolingTopic());
169             this.current = new IdleState(this);
170
171             logger.info("allocating host {} to controller {} for topic {}", host, controller.getName(), topic);
172
173         } catch (ClassCastException e) {
174             logger.error("not a topic listener, controller {}", controller.getName());
175             throw new PoolingFeatureRtException(e);
176
177         } catch (PoolingFeatureException e) {
178             logger.error("failed to attach internal DMaaP topic to controller {}", controller.getName());
179             throw new PoolingFeatureRtException(e);
180         }
181     }
182
183     /**
184      * Should only be used by junit tests.
185      * 
186      * @return the current state
187      */
188     protected State getCurrent() {
189         synchronized (curLocker) {
190             return current;
191         }
192     }
193
194     @Override
195     public String getHost() {
196         return host;
197     }
198
199     @Override
200     public String getTopic() {
201         return topic;
202     }
203
204     @Override
205     public PoolingProperties getProperties() {
206         return props;
207     }
208
209     /**
210      * Makes properties for configuring extractors.
211      * 
212      * @param controller the controller for which the extractors will be configured
213      * @param source properties from which to get the extractor properties
214      * @return extractor properties
215      */
216     private Properties makeExtractorProps(PolicyController controller, Properties source) {
217         return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source);
218     }
219
220     /**
221      * Indicates that the controller is about to start. Starts the publisher for the
222      * internal topic, and creates a thread pool for the timers.
223      */
224     public void beforeStart() {
225         synchronized (curLocker) {
226             if (scheduler == null) {
227                 dmaapMgr.startPublisher();
228
229                 logger.debug("make scheduler thread for topic {}", getTopic());
230                 scheduler = makeScheduler();
231
232                 /*
233                  * Only a handful of timers at any moment, thus we can afford to take the
234                  * time to remove them when they're cancelled.
235                  */
236                 scheduler.setRemoveOnCancelPolicy(true);
237                 scheduler.setMaximumPoolSize(1);
238                 scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
239                 scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
240             }
241         }
242     }
243
244     /**
245      * Indicates that the controller has successfully started. Starts the consumer for the
246      * internal topic, enters the {@link StartState}, and sets the filter for the initial
247      * state.
248      */
249     public void afterStart() {
250         synchronized (curLocker) {
251             if (current instanceof IdleState) {
252                 dmaapMgr.startConsumer(this);
253                 changeState(new StartState(this));
254             }
255         }
256     }
257
258     /**
259      * Indicates that the controller is about to stop. Stops the consumer, the scheduler,
260      * and the current state.
261      */
262     public void beforeStop() {
263         ScheduledThreadPoolExecutor sched;
264
265         synchronized (curLocker) {
266             sched = scheduler;
267             scheduler = null;
268
269             if (!(current instanceof IdleState)) {
270                 changeState(new IdleState(this));
271                 dmaapMgr.stopConsumer(this);
272                 publishAdmin(new Offline(getHost()));
273             }
274
275             assignments = null;
276         }
277
278         if (sched != null) {
279             logger.debug("stop scheduler for topic {}", getTopic());
280             sched.shutdownNow();
281         }
282     }
283
284     /**
285      * Indicates that the controller has stopped. Stops the publisher and logs a warning
286      * if any events are still in the queue.
287      */
288     public void afterStop() {
289         synchronized (curLocker) {
290             /*
291              * stop the publisher, but allow time for any Offline message to be
292              * transmitted
293              */
294             dmaapMgr.stopPublisher(props.getOfflinePubWaitMs());
295         }
296     }
297
298     /**
299      * Indicates that the controller is about to be locked. Enters the idle state, as all
300      * it will be doing is forwarding messages.
301      */
302     public void beforeLock() {
303         logger.info("locking manager for topic {}", getTopic());
304
305         synchronized (curLocker) {
306             changeState(new IdleState(this));
307         }
308     }
309
310     /**
311      * Indicates that the controller has been unlocked. Enters the start state, if the
312      * controller is running.
313      */
314     public void afterUnlock() {
315         logger.info("unlocking manager for topic {}", getTopic());
316
317         synchronized (curLocker) {
318             if (controller.isAlive() && current instanceof IdleState && scheduler != null) {
319                 changeState(new StartState(this));
320             }
321         }
322     }
323
324     /**
325      * Changes the finite state machine to a new state, provided the new state is not
326      * {@code null}.
327      * 
328      * @param newState new state, or {@code null} if to remain unchanged
329      */
330     private void changeState(State newState) {
331         if (newState != null) {
332             current.cancelTimers();
333             current = newState;
334
335             // set the filter before starting the state
336             setFilter(newState.getFilter());
337             newState.start();
338         }
339     }
340
341     /**
342      * Sets the server-side filter for the internal topic.
343      * 
344      * @param filter new filter to be used
345      */
346     private void setFilter(Map<String, Object> filter) {
347         try {
348             dmaapMgr.setFilter(serializer.encodeFilter(filter));
349
350         } catch (JsonParseException e) {
351             logger.error("failed to encode server-side filter for topic {}, {}", topic, filter, e);
352
353         } catch (PoolingFeatureException e) {
354             logger.error("failed to set server-side filter for topic {}, {}", topic, filter, e);
355         }
356     }
357
358     @Override
359     public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
360         // wrap the task in a TimerAction and schedule it
361         ScheduledFuture<?> fut = scheduler.schedule(new TimerAction(task), delayMs, TimeUnit.MILLISECONDS);
362
363         // wrap the future in a "CancellableScheduledTask"
364         return () -> fut.cancel(false);
365     }
366
367     @Override
368     public CancellableScheduledTask scheduleWithFixedDelay(long initialDelayMs, long delayMs, StateTimerTask task) {
369         // wrap the task in a TimerAction and schedule it
370         ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new TimerAction(task), initialDelayMs, delayMs,
371                         TimeUnit.MILLISECONDS);
372
373         // wrap the future in a "CancellableScheduledTask"
374         return () -> fut.cancel(false);
375     }
376
377     @Override
378     public void publishAdmin(Message msg) {
379         publish(Message.ADMIN, msg);
380     }
381
382     @Override
383     public void publish(String channel, Message msg) {
384         logger.info("publish {} to {} on topic {}", msg.getClass().getSimpleName(), channel, getTopic());
385
386         msg.setChannel(channel);
387
388         try {
389             // ensure it's valid before we send it
390             msg.checkValidity();
391
392             String txt = serializer.encodeMsg(msg);
393             dmaapMgr.publish(txt);
394
395         } catch (JsonParseException e) {
396             logger.error("failed to serialize message for topic {} channel {}", topic, channel, e);
397
398         } catch (PoolingFeatureException e) {
399             logger.error("failed to publish message for topic {} channel {}", topic, channel, e);
400         }
401     }
402
403     /**
404      * Handles an event from the internal topic.
405      * 
406      * @param commType comm infrastructure
407      * @param topic2 topic
408      * @param event event
409      */
410     @Override
411     public void onTopicEvent(CommInfrastructure commType, String topic2, String event) {
412
413         if (event == null) {
414             logger.error("null event on topic {}", topic);
415             return;
416         }
417
418         synchronized (curLocker) {
419             // it's on the internal topic
420             handleInternal(event);
421         }
422     }
423
424     /**
425      * Called by the PolicyController before it offers the event to the DroolsController.
426      * If the controller is locked, then it isn't processing events. However, they still
427      * need to be forwarded, thus in that case, they are decoded and forwarded.
428      * 
429      * <p>On the other hand, if the controller is not locked, then we just return immediately
430      * and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
431      * it instead, as it already has the decoded message.
432      * 
433      * @param protocol protocol
434      * @param topic2 topic
435      * @param event event
436      * @return {@code true} if the event was handled by the manager, {@code false} if it
437      *         must still be handled by the invoker
438      */
439     public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
440
441         if (!controller.isLocked() || !intercept) {
442             // we should NOT intercept this message - let the invoker handle it
443             return false;
444         }
445
446         return handleExternal(protocol, topic2, event, extractRequestId(decodeEvent(topic2, event)));
447     }
448
449     /**
450      * Called by the DroolsController before it inserts the event into the rule engine.
451      * 
452      * @param protocol protocol
453      * @param topic2 topic
454      * @param event original event text, as received from the Bus
455      * @param event2 event, as an object
456      * @return {@code true} if the event was handled by the manager, {@code false} if it
457      *         must still be handled by the invoker
458      */
459     public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) {
460
461         if (!intercept) {
462             // we should NOT intercept this message - let the invoker handle it
463             return false;
464         }
465
466         return handleExternal(protocol, topic2, event, extractRequestId(event2));
467     }
468
469     /**
470      * Handles an event from an external topic.
471      * 
472      * @param protocol protocol
473      * @param topic2 topic
474      * @param event event
475      * @param reqid request id extracted from the event, or {@code null} if it couldn't be
476      *        extracted
477      * @return {@code true} if the event was handled by the manager, {@code false} if it
478      *         must still be handled by the invoker
479      */
480     private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) {
481         if (reqid == null) {
482             // no request id - let the invoker handle it
483             return false;
484         }
485
486         if (reqid.isEmpty()) {
487             logger.warn("handle locally due to empty request id for topic {}", topic2);
488             // no request id - let the invoker handle it
489             return false;
490         }
491
492         Forward ev = makeForward(protocol, topic2, event, reqid);
493         if (ev == null) {
494             // invalid args - consume the message
495             logger.warn("constructed an invalid Forward message on topic {}", getTopic());
496             return true;
497         }
498
499         synchronized (curLocker) {
500             return handleExternal(ev);
501         }
502     }
503
504     /**
505      * Handles an event from an external topic.
506      * 
507      * @param event event
508      * @return {@code true} if the event was handled, {@code false} if the invoker should
509      *         handle it
510      */
511     private boolean handleExternal(Forward event) {
512         if (assignments == null) {
513             // no bucket assignments yet - handle locally
514             logger.info("handle event locally for request {}", event.getRequestId());
515
516             // we did NOT consume the event
517             return false;
518
519         } else {
520             return handleEvent(event);
521         }
522     }
523
524     /**
525      * Handles a {@link Forward} event, possibly forwarding it again.
526      * 
527      * @param event event
528      * @return {@code true} if the event was handled, {@code false} if the invoker should
529      *         handle it
530      */
531     private boolean handleEvent(Forward event) {
532         String target = assignments.getAssignedHost(event.getRequestId().hashCode());
533
534         if (target == null) {
535             /*
536              * This bucket has no assignment - just discard the event
537              */
538             logger.warn("discarded event for unassigned bucket from topic {}", event.getTopic());
539             return true;
540         }
541
542         if (target.equals(host)) {
543             /*
544              * Message belongs to this host - allow the controller to handle it.
545              */
546             logger.info("handle local event for request {} from topic {}", event.getRequestId(), event.getTopic());
547             return false;
548         }
549
550         // forward to a different host, if hop count has been exhausted
551         if (event.getNumHops() > MAX_HOPS) {
552             logger.warn("message discarded - hop count {} exceeded {} for topic {}", event.getNumHops(), MAX_HOPS,
553                             topic);
554
555         } else {
556             logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
557             event.bumpNumHops();
558             publish(target, event);
559         }
560
561         // either way, consume the event
562         return true;
563     }
564
565     /**
566      * Extract the request id from an event object.
567      * 
568      * @param event the event object, or {@code null}
569      * @return the event's request id, or {@code null} if it can't be extracted
570      */
571     private String extractRequestId(Object event) {
572         if (event == null) {
573             return null;
574         }
575
576         Object reqid = extractors.extract(event);
577         return (reqid != null ? reqid.toString() : null);
578     }
579
580     /**
581      * Decodes an event from a String into an event Object.
582      * 
583      * @param topic2 topic
584      * @param event event
585      * @return the decoded event object, or {@code null} if it can't be decoded
586      */
587     private Object decodeEvent(String topic2, String event) {
588         DroolsController drools = controller.getDrools();
589
590         // check if this topic has a decoder
591
592         if (!canDecodeEvent(drools, topic2)) {
593
594             logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", drools, topic2, drools.getGroupId(),
595                             drools.getArtifactId());
596             return null;
597         }
598
599         // decode
600
601         try {
602             return decodeEventWrapper(drools, topic2, event);
603
604         } catch (UnsupportedOperationException | IllegalStateException | IllegalArgumentException e) {
605             logger.debug("{}: DECODE FAILED: {} <- {} because of {}", drools, topic2, event, e.getMessage(), e);
606             return null;
607         }
608     }
609
610     /**
611      * Makes a {@link Forward}, and validates its contents.
612      * 
613      * @param protocol protocol
614      * @param topic2 topic
615      * @param event event
616      * @param reqid request id
617      * @return a new message, or {@code null} if the message was invalid
618      */
619     private Forward makeForward(CommInfrastructure protocol, String topic2, String event, String reqid) {
620         try {
621             Forward ev = new Forward(host, protocol, topic2, event, reqid);
622
623             // required for the validity check
624             ev.setChannel(host);
625
626             ev.checkValidity();
627
628             return ev;
629
630         } catch (PoolingFeatureException e) {
631             logger.error("invalid message for topic {}", topic2, e);
632             return null;
633         }
634     }
635
636     @Override
637     public void handle(Forward event) {
638         synchronized (curLocker) {
639             if (!handleExternal(event)) {
640                 // this host should handle it - inject it
641                 inject(event);
642             }
643         }
644     }
645
646     /**
647      * Injects an event into the controller.
648      * 
649      * @param event event
650      */
651     private void inject(Forward event) {
652         logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic());
653
654         try {
655             intercept = false;
656             listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload());
657
658         } finally {
659             intercept = true;
660         }
661     }
662
663     /**
664      * Handles an event from the internal topic. This uses reflection to identify the
665      * appropriate process() method to invoke, based on the type of Message that was
666      * decoded.
667      * 
668      * @param event the serialized {@link Message} read from the internal topic
669      */
670     private void handleInternal(String event) {
671         Class<?> clazz = null;
672
673         try {
674             Message msg = serializer.decodeMsg(event);
675
676             // get the class BEFORE checking the validity
677             clazz = msg.getClass();
678
679             msg.checkValidity();
680
681             Method meth = current.getClass().getMethod("process", msg.getClass());
682             changeState((State) meth.invoke(current, msg));
683
684         } catch (JsonParseException e) {
685             logger.warn("failed to decode message for topic {}", topic, e);
686
687         } catch (NoSuchMethodException | SecurityException e) {
688             logger.error("no processor for message {} for topic {}", clazz, topic, e);
689
690         } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
691                         | PoolingFeatureException e) {
692             logger.error("failed to process message {} for topic {}", clazz, topic, e);
693         }
694     }
695
696     @Override
697     public void startDistributing(BucketAssignments asgn) {
698         synchronized (curLocker) {
699             int sz = (asgn == null ? 0 : asgn.getAllHosts().size());
700             logger.info("new assignments for {} hosts on topic {}", sz, getTopic());
701             assignments = asgn;
702         }
703     }
704
705     @Override
706     public BucketAssignments getAssignments() {
707         return assignments;
708     }
709
710     @Override
711     public State goStart() {
712         return new StartState(this);
713     }
714
715     @Override
716     public State goQuery() {
717         return new QueryState(this);
718     }
719
720     @Override
721     public State goActive() {
722         activeLatch.countDown();
723         return new ActiveState(this);
724     }
725
726     @Override
727     public State goInactive() {
728         return new InactiveState(this);
729     }
730
731     /**
732      * Action to run a timer task. Only runs the task if the machine is still in the state
733      * that it was in when the timer was created.
734      */
735     private class TimerAction implements Runnable {
736
737         /**
738          * State of the machine when the timer was created.
739          */
740         private State origState;
741
742         /**
743          * Task to be executed.
744          */
745         private StateTimerTask task;
746
747         /**
748          * Constructor.
749          * 
750          * @param task task to execute when this timer runs
751          */
752         public TimerAction(StateTimerTask task) {
753             this.origState = current;
754             this.task = task;
755         }
756
757         @Override
758         public void run() {
759             synchronized (curLocker) {
760                 if (current == origState) {
761                     changeState(task.fire());
762                 }
763             }
764         }
765     }
766     
767     /*
768      * The remaining methods may be overridden by junit tests.
769      */
770
771     /**
772      * Creates object extractors.
773      * 
774      * @param props properties used to configure the extractors
775      * @return a new set of extractors
776      */
777     protected ClassExtractors makeClassExtractors(Properties props) {
778         return new ClassExtractors(props, PoolingProperties.PROP_EXTRACTOR_PREFIX,
779                         PoolingProperties.EXTRACTOR_TYPE);
780     }
781
782     /**
783      * Creates a DMaaP manager.
784      * 
785      * @param topic name of the internal DMaaP topic
786      * @return a new DMaaP manager
787      * @throws PoolingFeatureException if an error occurs
788      */
789     protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
790         return new DmaapManager(topic);
791     }
792
793     /**
794      * Creates a scheduled thread pool.
795      * 
796      * @return a new scheduled thread pool
797      */
798     protected ScheduledThreadPoolExecutor makeScheduler() {
799         return new ScheduledThreadPoolExecutor(1);
800     }
801
802     /**
803      * Determines if the event can be decoded.
804      * 
805      * @param drools drools controller
806      * @param topic topic on which the event was received
807      * @return {@code true} if the event can be decoded, {@code false} otherwise
808      */
809     protected boolean canDecodeEvent(DroolsController drools, String topic) {
810         return EventProtocolCoder.manager.isDecodingSupported(drools.getGroupId(), drools.getArtifactId(), topic);
811     }
812
813     /**
814      * Decodes the event.
815      * 
816      * @param drools drools controller
817      * @param topic topic on which the event was received
818      * @param event event text to be decoded
819      * @return the decoded event
820      * @throws IllegalArgumentException illegal argument
821      * @throw UnsupportedOperationException unsupported operation
822      * @throws IllegalStateException illegal state
823      */
824     protected Object decodeEventWrapper(DroolsController drools, String topic, String event) {
825         return EventProtocolCoder.manager.decode(drools.getGroupId(), drools.getArtifactId(), topic, event);
826     }
827 }