# ============LICENSE_START=======================================================
 # feature-pooling-dmaap
 # ================================================================================
-# Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+# Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
 # ================================================================================
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at
-# 
+#
 #      http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # Topic used for inter-host communication for a particular controller
 # pooling.<controller-name>.topic=XXX
 
-# These specify how the request id is to be extracted from each type of
-# object that may be presented to a controller from shared topics
-# (i.e., topics where hosts do not all receive a copy of the event)
-extractor.requestId.org.onap.policy.controlloop.VirtualControlLoopEvent=${requestId}
-
-
 # Each controller that is enabled should have its own topic and the
-# corresponding ueb.xxx properties.  However, for now, just assume that
-# the amsterdam-cl and beijing-cl features will not both be enabled
-# at the same time.
-
-pooling.amsterdam.enabled=true
-pooling.amsterdam.topic=${env:POOLING_TOPIC}
-
-pooling.beijing.enabled=true
-pooling.beijing.topic=${env:POOLING_TOPIC}
+# corresponding dmaap.xxx properties.  However, for now, just assume that
+# the usecases features will not both be enabled at the same time.
 
+pooling.usecases.enabled=true
+pooling.usecases.topic=${env:POOLING_TOPIC}
 
 # the list of sources and sinks should be identical
-ueb.source.topics=POOLING_TOPIC
-ueb.sink.topics=POOLING_TOPIC
-
-ueb.source.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
-ueb.source.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
-ueb.source.topics.POOLING_TOPIC.apiKey=
-ueb.source.topics.POOLING_TOPIC.apiSecret=
-
-ueb.sink.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
-ueb.sink.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
-ueb.sink.topics.POOLING_TOPIC.apiKey=
-ueb.sink.topics.POOLING_TOPIC.apiSecret=
+dmaap.source.topics=POOLING_TOPIC
+dmaap.sink.topics=POOLING_TOPIC
+
+dmaap.source.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
+dmaap.source.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
+dmaap.source.topics.POOLING_TOPIC.apiKey=
+dmaap.source.topics.POOLING_TOPIC.apiSecret=
+
+dmaap.sink.topics.POOLING_TOPIC.servers=${env:DMAAP_SERVERS}
+dmaap.sink.topics.POOLING_TOPIC.effectiveTopic=${env:POOLING_TOPIC}
+dmaap.sink.topics.POOLING_TOPIC.apiKey=
+dmaap.sink.topics.POOLING_TOPIC.apiSecret=
 
  * ============LICENSE_START=======================================================
  * ONAP
  * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 package org.onap.policy.drools.pooling;
 
 import java.util.List;
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
 import org.onap.policy.common.endpoints.event.comm.TopicListener;
     /**
      * Topic source whose filter is to be manipulated.
      */
-    private final FilterableTopicSource topicSource;
+    private final TopicSource topicSource;
 
     /**
      * Where to publish messages.
             this.topicSource = findTopicSource();
             this.topicSink = findTopicSink();
 
-            // verify that we can set the filter
-            setFilter(null);
-
         } catch (IllegalArgumentException e) {
             logger.error("failed to attach to topic {}", topic);
             throw new PoolingFeatureException(e);
      * @return the topic source
      * @throws PoolingFeatureException if the source doesn't exist or is not filterable
      */
-    private FilterableTopicSource findTopicSource() throws PoolingFeatureException {
+    private TopicSource findTopicSource() throws PoolingFeatureException {
         for (TopicSource src : getTopicSources()) {
             if (topic.equals(src.getTopic())) {
-                if (src instanceof FilterableTopicSource) {
-                    return (FilterableTopicSource) src;
-
-                } else {
-                    throw new PoolingFeatureException("topic source " + topic + " is not filterable");
-                }
+                return src;
             }
         }
 
         topicSource.unregister(listener);
     }
 
-    /**
-     * Sets the server-side filter to be used by the consumer.
-     *
-     * @param filter the filter string, or {@code null} if no filter is to be used
-     * @throws PoolingFeatureException if the topic is not filterable
-     */
-    public void setFilter(String filter) throws PoolingFeatureException {
-        try {
-            logger.debug("change filter for topic {} to {}", topic, filter);
-            topicSource.setFilter(filter);
-
-        } catch (UnsupportedOperationException e) {
-            throw new PoolingFeatureException("cannot filter topic " + topic, e);
-        }
-    }
-
     /**
      * Publishes a message to the sink.
      *
 
     private final CountDownLatch activeLatch = new CountDownLatch(1);
 
     /**
-     * Arguments passed to beforeOffer(), which are saved for when the beforeInsert() is called
-     * later. As multiple threads can be active within the methods at the same time, we must keep
-     * this in thread local storage.
+     * Topic names passed to beforeOffer(), which are saved for when the beforeInsert() is
+     * called later. As multiple threads can be active within the methods at the same
+     * time, we must keep this in thread local storage.
      */
-    private ThreadLocal<OfferArgs> offerArgs = new ThreadLocal<>();
+    private ThreadLocal<String> offerTopics = new ThreadLocal<>();
 
     /**
      * Constructor.
             return false;
         }
 
-        if (mgr.beforeOffer(protocol, topic2, event)) {
+        if (mgr.beforeOffer(topic2, event)) {
             return true;
         }
 
-        offerArgs.set(new OfferArgs(protocol, topic2, event));
+        offerTopics.set(topic2);
         return false;
     }
 
     @Override
     public boolean beforeInsert(DroolsController droolsController, Object fact) {
 
-        OfferArgs args = offerArgs.get();
-        if (args == null) {
+        String topic = offerTopics.get();
+        if (topic == null) {
             logger.warn("missing arguments for feature-pooling-dmaap in beforeInsert");
             return false;
         }
             return false;
         }
 
-        return mgr.beforeInsert(args.protocol, args.topic, args.event, fact);
+        return mgr.beforeInsert(topic, fact);
     }
 
     @Override
             boolean success) {
 
         // clear any stored arguments
-        offerArgs.remove();
+        offerTopics.remove();
 
         return false;
     }
         boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException;
     }
 
-    /**
-     * Arguments captured from beforeOffer().
-     */
-    private static class OfferArgs {
-
-        /**
-         * Protocol of the receiving topic.
-         */
-        private CommInfrastructure protocol;
-
-        /**
-         * Topic on which the event was received.
-         */
-        private String topic;
-
-        /**
-         * The event text that was received on the topic.
-         */
-        private String event;
-
-        /**
-         * Constructor.
-         *
-         * @param protocol protocol
-         * @param topic topic
-         * @param event the actual event data received on the topic
-         */
-        public OfferArgs(CommInfrastructure protocol, String topic, String event) {
-            this.protocol = protocol;
-            this.topic = topic;
-            this.event = event;
-        }
-    }
-
     /*
      * The remaining methods may be overridden by junit tests.
      */
 
  * ============LICENSE_START=======================================================
  * ONAP
  * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 package org.onap.policy.drools.pooling;
 
 import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
 import org.onap.policy.drools.pooling.message.Message;
 import org.onap.policy.drools.pooling.state.State;
 import org.onap.policy.drools.pooling.state.StateTimerTask;
      */
     void publish(String channel, Message msg);
 
-    /**
-     * Handles a {@link Forward} event that was received from the internal topic.
-     *
-     * @param event event
-     */
-    void handle(Forward event);
-
     /**
      * Schedules a timer to fire after a delay.
      *
 
  * ============LICENSE_START=======================================================
  * ONAP
  * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 import com.google.gson.JsonParseException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 import org.onap.policy.common.endpoints.event.comm.TopicListener;
-import org.onap.policy.common.utils.properties.SpecProperties;
 import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.pooling.extractor.ClassExtractors;
 import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
 import org.onap.policy.drools.pooling.message.Leader;
 import org.onap.policy.drools.pooling.message.Message;
 import org.onap.policy.drools.pooling.message.Offline;
      */
     private final PolicyController controller;
 
-    /**
-     * Where to offer events that have been forwarded to this host (i.e, the controller).
-     */
-    private final TopicListener listener;
-
     /**
      * Decremented each time the manager enters the Active state. Used by junit tests.
      */
      */
     private final DmaapManager dmaapMgr;
 
-    /**
-     * Used to extract the request id from the decoded message.
-     */
-    private final ClassExtractors extractors;
-
     /**
      * Lock used while updating {@link #current}. In general, public methods must use
      * this, while private methods assume the lock is already held.
      */
     private ScheduledThreadPoolExecutor scheduler = null;
 
-    /**
-     * {@code True} if events offered by the controller should be intercepted,
-     * {@code false} otherwise.
-     */
-    private boolean intercept = true;
-
     /**
      * Constructs the manager, initializing all of the data structures.
      *
         this.activeLatch = activeLatch;
 
         try {
-            this.listener = (TopicListener) controller;
             this.serializer = new Serializer();
             this.topic = props.getPoolingTopic();
-            this.extractors = makeClassExtractors(makeExtractorProps(controller, props.getSource()));
             this.dmaapMgr = makeDmaapManager(props.getPoolingTopic());
             this.current = new IdleState(this);
 
         return props;
     }
 
-    /**
-     * Makes properties for configuring extractors.
-     *
-     * @param controller the controller for which the extractors will be configured
-     * @param source properties from which to get the extractor properties
-     * @return extractor properties
-     */
-    private Properties makeExtractorProps(PolicyController controller, Properties source) {
-        return new SpecProperties(PoolingProperties.PROP_EXTRACTOR_PREFIX, controller.getName(), source);
-    }
-
     /**
      * Indicates that the controller is about to start. Starts the publisher for the
      * internal topic, and creates a thread pool for the timers.
             current.cancelTimers();
             current = newState;
 
-            // set the filter before starting the state
-            setFilter(newState.getFilter());
             newState.start();
         }
     }
 
-    /**
-     * Sets the server-side filter for the internal topic.
-     *
-     * @param filter new filter to be used
-     */
-    private void setFilter(Map<String, Object> filter) {
-        try {
-            dmaapMgr.setFilter(serializer.encodeFilter(filter));
-
-        } catch (JsonParseException e) {
-            logger.error("failed to encode server-side filter for topic {}, {}", topic, filter, e);
-
-        } catch (PoolingFeatureException e) {
-            logger.error("failed to set server-side filter for topic {}, {}", topic, filter, e);
-        }
-    }
-
     @Override
     public CancellableScheduledTask schedule(long delayMs, StateTimerTask task) {
         // wrap the task in a TimerAction and schedule it
      * and let {@link #beforeInsert(Object, String, String, Object) beforeInsert()} handle
      * it instead, as it already has the decoded message.
      *
-     * @param protocol protocol
      * @param topic2 topic
      * @param event event
      * @return {@code true} if the event was handled by the manager, {@code false} if it
      *         must still be handled by the invoker
      */
-    public boolean beforeOffer(CommInfrastructure protocol, String topic2, String event) {
+    public boolean beforeOffer(String topic2, String event) {
 
-        if (!controller.isLocked() || !intercept) {
+        if (!controller.isLocked()) {
             // we should NOT intercept this message - let the invoker handle it
             return false;
         }
 
-        return handleExternal(protocol, topic2, event, extractRequestId(decodeEvent(topic2, event)));
+        return handleExternal(topic2, decodeEvent(topic2, event));
     }
 
     /**
      * Called by the DroolsController before it inserts the event into the rule engine.
      *
-     * @param protocol protocol
      * @param topic2 topic
-     * @param event original event text, as received from the Bus
-     * @param event2 event, as an object
+     * @param event event, as an object
      * @return {@code true} if the event was handled by the manager, {@code false} if it
      *         must still be handled by the invoker
      */
-    public boolean beforeInsert(CommInfrastructure protocol, String topic2, String event, Object event2) {
-
-        if (!intercept) {
-            // we should NOT intercept this message - let the invoker handle it
-            return false;
-        }
-
-        return handleExternal(protocol, topic2, event, extractRequestId(event2));
+    public boolean beforeInsert(String topic2, Object event) {
+        return handleExternal(topic2, event);
     }
 
     /**
      * Handles an event from an external topic.
      *
-     * @param protocol protocol
      * @param topic2 topic
-     * @param event event
-     * @param reqid request id extracted from the event, or {@code null} if it couldn't be
-     *        extracted
+     * @param event event, as an object, or {@code null} if it cannot be decoded
      * @return {@code true} if the event was handled by the manager, {@code false} if it
      *         must still be handled by the invoker
      */
-    private boolean handleExternal(CommInfrastructure protocol, String topic2, String event, String reqid) {
-        if (reqid == null) {
-            // no request id - let the invoker handle it
-            return false;
-        }
-
-        if (reqid.isEmpty()) {
-            logger.warn("handle locally due to empty request id for topic {}", topic2);
-            // no request id - let the invoker handle it
+    private boolean handleExternal(String topic2, Object event) {
+        if (event == null) {
+            // no event - let the invoker handle it
             return false;
         }
 
-        Forward ev = makeForward(protocol, topic2, event, reqid);
-        if (ev == null) {
-            // invalid args - consume the message
-            logger.warn("constructed an invalid Forward message on topic {}", getTopic());
-            return true;
-        }
-
         synchronized (curLocker) {
-            return handleExternal(ev);
+            return handleExternal(topic2, event, event.hashCode());
         }
     }
 
     /**
      * Handles an event from an external topic.
      *
-     * @param event event
+     * @param topic2 topic
+     * @param event event, as an object
+     * @param eventHashCode event's hash code
      * @return {@code true} if the event was handled, {@code false} if the invoker should
      *         handle it
      */
-    private boolean handleExternal(Forward event) {
+    private boolean handleExternal(String topic2, Object event, int eventHashCode) {
         if (assignments == null) {
             // no bucket assignments yet - handle locally
-            logger.info("handle event locally for request {}", event.getRequestId());
+            logger.info("handle event locally for request {}", event);
 
             // we did NOT consume the event
             return false;
 
         } else {
-            return handleEvent(event);
+            return handleEvent(topic2, event, eventHashCode);
         }
     }
 
     /**
      * Handles a {@link Forward} event, possibly forwarding it again.
      *
-     * @param event event
+     * @param topic2 topic
+     * @param event event, as an object
+     * @param eventHashCode event's hash code
      * @return {@code true} if the event was handled, {@code false} if the invoker should
      *         handle it
      */
-    private boolean handleEvent(Forward event) {
-        String target = assignments.getAssignedHost(event.getRequestId().hashCode());
+    private boolean handleEvent(String topic2, Object event, int eventHashCode) {
+        String target = assignments.getAssignedHost(eventHashCode);
 
         if (target == null) {
             /*
              * This bucket has no assignment - just discard the event
              */
-            logger.warn("discarded event for unassigned bucket from topic {}", event.getTopic());
+            logger.warn("discarded event for unassigned bucket from topic {}", topic2);
             return true;
         }
 
             /*
              * Message belongs to this host - allow the controller to handle it.
              */
-            logger.info("handle local event for request {} from topic {}", event.getRequestId(), event.getTopic());
+            logger.info("handle local event for request {} from topic {}", event, topic2);
             return false;
         }
 
-        // forward to a different host, if hop count has been exhausted
-        if (event.getNumHops() > MAX_HOPS) {
-            logger.warn("message discarded - hop count {} exceeded {} for topic {}", event.getNumHops(), MAX_HOPS,
-                            topic);
-
-        } else {
-            logger.info("forward event hop-count={} from topic {}", event.getNumHops(), event.getTopic());
-            event.bumpNumHops();
-            publish(target, event);
-        }
-
-        // either way, consume the event
+        // not our message, consume the event
+        logger.warn("discarded event for host {} from topic {}", target, topic2);
         return true;
     }
 
-    /**
-     * Extract the request id from an event object.
-     *
-     * @param event the event object, or {@code null}
-     * @return the event's request id, or {@code null} if it can't be extracted
-     */
-    private String extractRequestId(Object event) {
-        if (event == null) {
-            return null;
-        }
-
-        Object reqid = extractors.extract(event);
-        return (reqid != null ? reqid.toString() : null);
-    }
-
     /**
      * Decodes an event from a String into an event Object.
      *
         }
     }
 
-    /**
-     * Makes a {@link Forward}, and validates its contents.
-     *
-     * @param protocol protocol
-     * @param topic2 topic
-     * @param event event
-     * @param reqid request id
-     * @return a new message, or {@code null} if the message was invalid
-     */
-    private Forward makeForward(CommInfrastructure protocol, String topic2, String event, String reqid) {
-        try {
-            Forward ev = new Forward(host, protocol, topic2, event, reqid);
-
-            // required for the validity check
-            ev.setChannel(host);
-
-            ev.checkValidity();
-
-            return ev;
-
-        } catch (PoolingFeatureException e) {
-            logger.error("invalid message for topic {}", topic2, e);
-            return null;
-        }
-    }
-
-    @Override
-    public void handle(Forward event) {
-        synchronized (curLocker) {
-            if (!handleExternal(event)) {
-                // this host should handle it - inject it
-                inject(event);
-            }
-        }
-    }
-
-    /**
-     * Injects an event into the controller.
-     *
-     * @param event event
-     */
-    private void inject(Forward event) {
-        logger.info("inject event for request {} from topic {}", event.getRequestId(), event.getTopic());
-
-        try {
-            intercept = false;
-            listener.onTopicEvent(event.getProtocol(), event.getTopic(), event.getPayload());
-
-        } finally {
-            intercept = true;
-        }
-    }
-
     /**
      * Handles an event from the internal topic. This uses reflection to identify the
      * appropriate process() method to invoke, based on the type of Message that was
         }
     }
 
-    /*
-     * The remaining methods may be overridden by junit tests.
-     */
-
-    /**
-     * Creates object extractors.
-     *
-     * @param props properties used to configure the extractors
-     * @return a new set of extractors
-     */
-    protected ClassExtractors makeClassExtractors(Properties props) {
-        return new ClassExtractors(props, PoolingProperties.PROP_EXTRACTOR_PREFIX,
-                        PoolingProperties.EXTRACTOR_TYPE);
-    }
-
     /**
      * Creates a DMaaP manager.
      *
 
  * ============LICENSE_START=======================================================
  * ONAP
  * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 import com.google.gson.JsonParseException;
 import java.util.HashMap;
 import java.util.Map;
-import org.onap.policy.drools.pooling.message.Forward;
 import org.onap.policy.drools.pooling.message.Heartbeat;
 import org.onap.policy.drools.pooling.message.Identification;
 import org.onap.policy.drools.pooling.message.Leader;
     private static final Map<String, Class<? extends Message>> type2class = new HashMap<>();
 
     static {
-        class2type.put(Forward.class, "forward");
         class2type.put(Heartbeat.class, "heartbeat");
         class2type.put(Identification.class, "identification");
         class2type.put(Leader.class, "leader");
 
     /**
      * Encodes a filter.
-     * 
+     *
      * @param filter filter to be encoded
      * @return the filter, serialized as a JSON string
      */
 
     /**
      * Encodes a message.
-     * 
+     *
      * @param msg message to be encoded
      * @return the message, serialized as a JSON string
      */
 
     /**
      * Decodes a JSON string into a Message.
-     * 
+     *
      * @param msg JSON string representing the message
      * @return the message
      */
 
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2019-2020 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Extractors for each object class. Properties define how the data is to be
- * extracted for a given class, where the properties are similar to the
- * following:
- *
- * <pre>
- * <code><a.prefix>.<class.name> = ${event.reqid}</code>
- * </pre>
- *
- * <p>For any given field name (e.g., "reqid"), it first looks for a public "getXxx()"
- * method to extract the specified field. If that fails, then it looks for a public field
- * by the given name. If that also fails, and the object is a <i>Map</i> subclass, then it
- * simply uses the "get(field-name)" method to extract the data from the map.
- */
-public class ClassExtractors {
-
-    private static final Logger logger = LoggerFactory.getLogger(ClassExtractors.class);
-
-    /**
-     * Properties that specify how the data is to be extracted from a given
-     * class.
-     */
-    private final Properties properties;
-
-    /**
-     * Property prefix, including a trailing ".".
-     */
-    private final String prefix;
-
-    /**
-     * Type of item to be extracted.
-     */
-    private final String type;
-
-    /**
-     * Maps the class name to its extractor.
-     */
-    private final ConcurrentHashMap<String, Extractor> class2extractor = new ConcurrentHashMap<>();
-
-    /**
-     * Constructor.
-     *
-     * @param props properties that specify how the data is to be extracted from
-     *        a given class
-     * @param prefix property name prefix, prepended before the class name
-     * @param type type of item to be extracted
-     */
-    public ClassExtractors(Properties props, String prefix, String type) {
-        this.properties = props;
-        this.prefix = (prefix.endsWith(".") ? prefix : prefix + ".");
-        this.type = type;
-    }
-
-    /**
-     * Gets the number of extractors in the map.
-     *
-     * @return gets the number of extractors in the map
-     */
-    protected int size() {
-        return class2extractor.size();
-    }
-
-    /**
-     * Extracts the desired data item from an object.
-     *
-     * @param object object from which to extract the data item
-     * @return the extracted item, or {@code null} if it could not be extracted
-     */
-    public Object extract(Object object) {
-        if (object == null) {
-            return null;
-        }
-
-        Extractor ext = getExtractor(object);
-
-        return ext.extract(object);
-    }
-
-    /**
-     * Gets the extractor for the given type of object, creating one if it
-     * doesn't exist yet.
-     *
-     * @param object object whose extracted is desired
-     * @return an extractor for the object
-     */
-    private Extractor getExtractor(Object object) {
-        Class<?> clazz = object.getClass();
-        Extractor ext = class2extractor.get(clazz.getName());
-
-        if (ext == null) {
-            // allocate a new extractor, if another thread doesn't beat us to it
-            ext = class2extractor.computeIfAbsent(clazz.getName(), xxx -> buildExtractor(clazz));
-        }
-
-        return ext;
-    }
-
-    /**
-     * Builds an extractor for the class.
-     *
-     * @param clazz class for which the extractor should be built
-     *
-     * @return a new extractor
-     */
-    private Extractor buildExtractor(Class<?> clazz) {
-        String value = properties.getProperty(prefix + clazz.getName(), null);
-        if (value != null) {
-            // property has config info for this class - build the extractor
-            return buildExtractor(clazz, value);
-        }
-
-        /*
-         * Get the extractor, if any, for the super class or interfaces, but
-         * don't add one if it doesn't exist
-         */
-        Extractor ext = getClassExtractor(clazz, false);
-        if (ext != null) {
-            return ext;
-        }
-
-        /*
-         * No extractor defined for for this class or its super class - we
-         * cannot extract data items from objects of this type, so just
-         * allocated a null extractor.
-         */
-        logger.warn("missing property {}{}", prefix, clazz.getName());
-        return new NullExtractor();
-    }
-
-    /**
-     * Builds an extractor for the class, based on the config value extracted
-     * from the corresponding property.
-     *
-     * @param clazz class for which the extractor should be built
-     * @param value config value (e.g., "${event.request.id}"
-     * @return a new extractor
-     */
-    private Extractor buildExtractor(Class<?> clazz, String value) {
-        if (!value.startsWith("${")) {
-            logger.warn("property value for {}{} does not start with {}", prefix, clazz.getName(), "'${'");
-            return new NullExtractor();
-        }
-
-        if (!value.endsWith("}")) {
-            logger.warn("property value for {}{} does not end with '}'", prefix, clazz.getName());
-            return new NullExtractor();
-        }
-
-        // get the part in the middle
-        String val = value.substring(2, value.length() - 1);
-        if (val.startsWith(".")) {
-            logger.warn("property value for {}{} begins with '.'", prefix, clazz.getName());
-            return new NullExtractor();
-        }
-
-        if (val.endsWith(".")) {
-            logger.warn("property value for {}{} ends with '.'", prefix, clazz.getName());
-            return new NullExtractor();
-        }
-
-        // everything's valid - create the extractor
-        try {
-            ComponetizedExtractor ext = new ComponetizedExtractor(clazz, val.split("[.]"));
-
-            /*
-             * If there's only one extractor, then just return it, otherwise
-             * return the whole extractor.
-             */
-            return (ext.extractors.length == 1 ? ext.extractors[0] : ext);
-
-        } catch (ExtractorException e) {
-            logger.warn("cannot build extractor for {}", clazz.getName(), e);
-            return new NullExtractor();
-        }
-    }
-
-    /**
-     * Gets the extractor for a class, examining all super classes and
-     * interfaces.
-     *
-     * @param clazz class whose extractor is desired
-     * @param addOk {@code true} if the extractor may be added, provided the
-     *        property is defined, {@code false} otherwise
-     * @return the extractor to be used for the class, or {@code null} if no
-     *         extractor has been defined yet
-     */
-    private Extractor getClassExtractor(Class<?> clazz, boolean addOk) {
-        if (clazz == null) {
-            return null;
-        }
-
-        Extractor ext = null;
-
-        if (addOk) {
-            String val = properties.getProperty(prefix + clazz.getName(), null);
-
-            if (val != null) {
-                /*
-                 * A property is defined for this class, so create the extractor
-                 * for it.
-                 */
-                return class2extractor.computeIfAbsent(clazz.getName(), xxx -> buildExtractor(clazz));
-            }
-        }
-
-        // see if the superclass has an extractor
-        if ((ext = getClassExtractor(clazz.getSuperclass(), true)) != null) {
-            return ext;
-        }
-
-        // check the interfaces, too
-        for (Class<?> clz : clazz.getInterfaces()) {
-            if ((ext = getClassExtractor(clz, true)) != null) {
-                break;
-            }
-        }
-
-        return ext;
-    }
-
-    /**
-     * Extractor that always returns {@code null}. Used when no extractor could
-     * be built for a given object type.
-     */
-    private class NullExtractor implements Extractor {
-
-        @Override
-        public Object extract(Object object) {
-            logger.info("cannot extract {} from {}", type, object.getClass());
-            return null;
-        }
-    }
-
-    /**
-     * Component-ized extractor. Extracts an object that is referenced
-     * hierarchically, where each name identifies a particular component within
-     * the hierarchy. Supports retrieval from {@link Map} objects, as well as
-     * via getXxx() methods, or by direct field retrieval.
-     *
-     * <p>Note: this will <i>not</i> work if POJOs are contained within a Map.
-     */
-    private class ComponetizedExtractor implements Extractor {
-
-        /**
-         * Extractor for each component.
-         */
-        private final Extractor[] extractors;
-
-        /**
-         * Constructor.
-         *
-         * @param clazz the class associated with the object at the root of the
-         *        hierarchy
-         * @param names name associated with each component
-         * @throws ExtractorException extractor exception
-         */
-        public ComponetizedExtractor(Class<?> clazz, String[] names) throws ExtractorException {
-            this.extractors = new Extractor[names.length];
-
-            Class<?> clz = clazz;
-
-            for (int x = 0; x < names.length; ++x) {
-                String comp = names[x];
-
-                Pair<Extractor, Class<?>> pair = buildExtractor(clz, comp);
-
-                extractors[x] = pair.getLeft();
-                clz = pair.getRight();
-            }
-        }
-
-        /**
-         * Builds an extractor for the given component of an object.
-         *
-         * @param clazz type of object from which the component will be
-         *        extracted
-         * @param comp name of the component to extract
-         * @return a pair containing the extractor and the extracted object's
-         *         type
-         * @throws ExtractorException extrator exception
-         */
-        private Pair<Extractor, Class<?>> buildExtractor(Class<?> clazz, String comp) throws ExtractorException {
-
-            Pair<Extractor, Class<?>> pair = getMethodExtractor(clazz, comp);
-
-            if (pair == null) {
-                pair = getFieldExtractor(clazz, comp);
-            }
-
-            if (pair == null) {
-                pair = getMapExtractor(clazz, comp);
-            }
-
-
-            // didn't find an extractor
-            if (pair == null) {
-                throw new ExtractorException("class " + clazz + " contains no element " + comp);
-            }
-
-            return pair;
-        }
-
-        @Override
-        public Object extract(Object object) {
-            Object obj = object;
-
-            for (Extractor ext : extractors) {
-                if (obj == null) {
-                    break;
-                }
-
-                obj = ext.extract(obj);
-            }
-
-            return obj;
-        }
-
-        /**
-         * Gets an extractor that invokes a getXxx() method to retrieve the
-         * object.
-         *
-         * @param clazz container's class
-         * @param name name of the property to be retrieved
-         * @return a new extractor, or {@code null} if the class does not
-         *         contain the corresponding getXxx() method
-         * @throws ExtractorException if the getXxx() method is inaccessible
-         */
-        private Pair<Extractor, Class<?>> getMethodExtractor(Class<?> clazz, String name) throws ExtractorException {
-            Method meth;
-
-            String nm = "get" + StringUtils.capitalize(name);
-
-            try {
-                meth = clazz.getMethod(nm);
-
-                Class<?> retType = meth.getReturnType();
-                if (retType == void.class) {
-                    // it's a void method, thus it won't return an object
-                    return null;
-                }
-
-                return Pair.of(new MethodExtractor(meth), retType);
-
-            } catch (NoSuchMethodException expected) {
-                // no getXxx() method, maybe there's a field by this name
-                logger.debug("no method {} in {}", nm, clazz.getName(), expected);
-                return null;
-
-            } catch (SecurityException e) {
-                throw new ExtractorException("inaccessible method " + clazz + "." + nm, e);
-            }
-        }
-
-        /**
-         * Gets an extractor for a field within the object.
-         *
-         * @param clazz container's class
-         * @param name name of the field whose value is to be extracted
-         * @return a new extractor, or {@code null} if the class does not
-         *         contain the given field
-         * @throws ExtractorException if the field is inaccessible
-         */
-        private Pair<Extractor, Class<?>> getFieldExtractor(Class<?> clazz, String name) throws ExtractorException {
-
-            Field field = getClassField(clazz, name);
-            if (field == null) {
-                return null;
-            }
-
-            return Pair.of(new FieldExtractor(field), field.getType());
-        }
-
-        /**
-         * Gets an extractor for an item within a Map object.
-         *
-         * @param clazz container's class
-         * @param key item key within the map
-         * @return a new extractor, or {@code null} if the class is not a Map
-         *         subclass
-         */
-        private Pair<Extractor, Class<?>> getMapExtractor(Class<?> clazz, String key) {
-
-            if (!Map.class.isAssignableFrom(clazz)) {
-                return null;
-            }
-
-            /*
-             * Don't know the value's actual type, so we'll assume it's a Map
-             * for now. Things should still work OK, as this is only used to
-             * direct the constructor on what type of extractor to create next.
-             * If the object turns out not to be a map, then the MapExtractor
-             * for the next component will just return null.
-             */
-            return Pair.of(new MapExtractor(key), Map.class);
-        }
-
-        /**
-         * Gets field within a class, examining all super classes and
-         * interfaces.
-         *
-         * @param clazz class whose field is desired
-         * @param name name of the desired field
-         * @return the field within the class, or {@code null} if the field does
-         *         not exist
-         * @throws ExtractorException if the field is inaccessible
-         */
-        private Field getClassField(Class<?> clazz, String name) throws ExtractorException {
-            if (clazz == null) {
-                return null;
-            }
-
-            try {
-                return clazz.getField(name);
-
-            } catch (NoSuchFieldException expected) {
-                // no field by this name - try super class & interfaces
-                logger.debug("no field {} in {}", name, clazz.getName(), expected);
-                return null;
-
-            } catch (SecurityException e) {
-                throw new ExtractorException("inaccessible field " + clazz + "." + name, e);
-            }
-        }
-    }
-}
 
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Used to extract an object contained within another object.
- */
-@FunctionalInterface
-public interface Extractor {
-
-    /**
-     * Extracts an object contained within another object.
-     *
-     * @param object object from which to extract the contained object
-     * @return the extracted value, or {@code null} if it cannot be extracted
-     */
-    Object extract(Object object);
-}
 
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Exception generated by extractors.
- */
-public class ExtractorException extends Exception {
-    private static final long serialVersionUID = 1L;
-
-    public ExtractorException() {
-        super();
-    }
-
-    public ExtractorException(String message) {
-        super(message);
-    }
-
-    public ExtractorException(Throwable cause) {
-        super(cause);
-    }
-
-    public ExtractorException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public ExtractorException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
-        super(message, cause, enableSuppression, writableStackTrace);
-    }
-
-}
 
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.lang.reflect.Field;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to extract an object stored in one of the container's fields.
- */
-public class FieldExtractor implements Extractor {
-
-    private static final Logger logger = LoggerFactory.getLogger(FieldExtractor.class);
-
-    /**
-     * Field containing the object.
-     */
-    private final Field field;
-
-    /**
-     * Constructor.
-     * 
-     * @param field field containing the object
-     */
-    public FieldExtractor(Field field) {
-        this.field = field;
-    }
-
-    @Override
-    public Object extract(Object object) {
-        try {
-            return field.get(object);
-
-        } catch (IllegalAccessException | IllegalArgumentException e) {
-            logger.warn("cannot get {} from {}", field.getName(), object.getClass(), e);
-            return null;
-        }
-    }
-}
 
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to extract an object stored in a map.
- */
-public class MapExtractor implements Extractor {
-
-    private static final Logger logger = LoggerFactory.getLogger(MapExtractor.class);
-
-    /**
-     * Key to the item to extract from the map.
-     */
-    private final String key;
-
-    /**
-     * Constructor.
-     * 
-     * @param key key to the item to extract from the map
-     */
-    public MapExtractor(String key) {
-        this.key = key;
-    }
-
-    @Override
-    public Object extract(Object object) {
-        
-        if (object instanceof Map) {
-            Map<?, ?> map = (Map<?, ?>) object;
-
-            return map.get(key);
-
-        } else {
-            logger.warn("expecting a map, instead of {}", object.getClass());
-            return null;
-        }
-    }
-}
 
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to extract an object by invoking a method on the container.
- */
-public class MethodExtractor implements Extractor {
-
-    private static final Logger logger = LoggerFactory.getLogger(MethodExtractor.class);
-
-    /**
-     * Method to invoke to extract the contained object.
-     */
-    private final Method method;
-
-    /**
-     * Constructor.
-     * 
-     * @param method method to invoke to extract the contained object
-     */
-    public MethodExtractor(Method method) {
-        this.method = method;
-    }
-
-    @Override
-    public Object extract(Object object) {
-        try {
-            return method.invoke(object);
-
-        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
-            logger.warn("cannot invoke {} on {}", method.getName(), object.getClass(), e);
-            return null;
-        }
-    }
-}
 
  * ============LICENSE_START=======================================================
  * ONAP
  * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
     /**
      * Constructor.
-     * 
+     *
      * @param hostArray maps a bucket number (i.e., array index) to a host. All values
      *        must be non-null
      */
 
     /**
      * Gets the leader, which is the host with the minimum UUID.
-     * 
+     *
      * @return the assignment leader
      */
     public String getLeader() {
 
     /**
      * Determines if a host has an assignment.
-     * 
+     *
      * @param host host to be checked
      * @return {@code true} if the host has an assignment, {@code false} otherwise
      */
 
     /**
      * Gets all of the hosts that have an assignment.
-     * 
+     *
      * @return all of the hosts that have an assignment
      */
     public Set<String> getAllHosts() {
 
     /**
      * Gets the host assigned to a given bucket.
-     * 
+     *
      * @param hashCode hash code of the item whose assignment is desired
      * @return the assigned host, or {@code null} if the item has no assigned host
      */
             return null;
         }
 
-        return hostArray[(hashCode & MAX_BUCKETS_MASK) % hostArray.length];
+        return hostArray[(Math.abs(hashCode) & MAX_BUCKETS_MASK) % hostArray.length];
     }
 
     /**
      * Gets the number of buckets.
-     * 
+     *
      * @return the number of buckets
      */
     public int size() {
     /**
      * Checks the validity of the assignments, verifying that all buckets have been
      * assigned to a host.
-     * 
+     *
      * @throws PoolingFeatureException if the assignments are invalid
      */
     public void checkValidity() throws PoolingFeatureException {
 
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.message;
-
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.drools.pooling.PoolingFeatureException;
-
-/**
- * Message to forward an event to another host.
- */
-public class Forward extends Message {
-
-    /**
-     * Number of hops (i.e., number of times it's been forwarded) so far.
-     */
-    private int numHops;
-
-    /**
-     * Time, in milliseconds, at which the message was created.
-     */
-    private long createTimeMs;
-
-    /**
-     * Protocol of the receiving topic.
-     */
-    private CommInfrastructure protocol;
-
-    /**
-     * Topic on which the event was received.
-     */
-    private String topic;
-
-    /**
-     * The event pay load that was received on the topic.
-     */
-    private String payload;
-
-    /**
-     * The request id that was extracted from the event.
-     */
-    private String requestId;
-
-    /**
-     * Constructor.
-     */
-    public Forward() {
-        super();
-    }
-
-    /**
-     * Constructor.
-     * 
-     * @param source host on which the message originated
-     * @param protocol protocol
-     * @param topic topic
-     * @param payload the actual event data received on the topic
-     * @param requestId request id
-     */
-    public Forward(String source, CommInfrastructure protocol, String topic, String payload, String requestId) {
-        super(source);
-
-        this.numHops = 0;
-        this.createTimeMs = System.currentTimeMillis();
-        this.protocol = protocol;
-        this.topic = topic;
-        this.payload = payload;
-        this.requestId = requestId;
-    }
-
-    /**
-     * Increments {@link #numHops}.
-     */
-    public void bumpNumHops() {
-        ++numHops;
-    }
-
-    public int getNumHops() {
-        return numHops;
-    }
-
-    public void setNumHops(int numHops) {
-        this.numHops = numHops;
-    }
-
-    public long getCreateTimeMs() {
-        return createTimeMs;
-    }
-
-    public void setCreateTimeMs(long createTimeMs) {
-        this.createTimeMs = createTimeMs;
-    }
-
-    public CommInfrastructure getProtocol() {
-        return protocol;
-    }
-
-    public void setProtocol(CommInfrastructure protocol) {
-        this.protocol = protocol;
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    public String getPayload() {
-        return payload;
-    }
-
-    public void setPayload(String payload) {
-        this.payload = payload;
-    }
-
-    public String getRequestId() {
-        return requestId;
-    }
-
-    public void setRequestId(String requestId) {
-        this.requestId = requestId;
-    }
-
-    public boolean isExpired(long minCreateTimeMs) {
-        return (createTimeMs < minCreateTimeMs);
-
-    }
-
-    @Override
-    public void checkValidity() throws PoolingFeatureException {
-
-        super.checkValidity();
-
-        if (protocol == null) {
-            throw new PoolingFeatureException("missing message protocol");
-        }
-
-        if (topic == null || topic.isEmpty()) {
-            throw new PoolingFeatureException("missing message topic");
-        }
-
-        /*
-         * Note: an empty pay load is OK, as an empty message could have been
-         * received on the topic.
-         */
-
-        if (payload == null) {
-            throw new PoolingFeatureException("missing message payload");
-        }
-
-        if (requestId == null || requestId.isEmpty()) {
-            throw new PoolingFeatureException("missing message requestId");
-        }
-
-        if (numHops < 0) {
-            throw new PoolingFeatureException("invalid message hop count");
-        }
-    }
-
-}
 
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.state;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Filter Utilities. These methods create <i>TreeMap</i> objects, because they
- * should only contain a small number of items.
- */
-public class FilterUtils {
-    // message element names
-    public static final String MSG_CHANNEL = "channel";
-    public static final String MSG_TIMESTAMP = "timestampMs";
-
-    // json element names
-    protected static final String JSON_CLASS = "class";
-    protected static final String JSON_FILTERS = "filters";
-    protected static final String JSON_FIELD = "field";
-    protected static final String JSON_VALUE = "value";
-
-    // values to be stuck into the "class" element
-    protected static final String CLASS_OR = "Or";
-    protected static final String CLASS_AND = "And";
-    protected static final String CLASS_EQUALS = "Equals";
-
-    /**
-     * Constructor.
-     */
-    private FilterUtils() {
-        super();
-    }
-
-    /**
-     * Makes a filter that verifies that a field equals a value.
-     *
-     * @param field name of the field to check
-     * @param value desired value
-     * @return a map representing an "equals" filter
-     */
-    public static Map<String, Object> makeEquals(String field, String value) {
-        Map<String, Object> map = new TreeMap<>();
-        map.put(JSON_CLASS, CLASS_EQUALS);
-        map.put(JSON_FIELD, field);
-        map.put(JSON_VALUE, value);
-
-        return map;
-    }
-
-    /**
-     * Makes an "and" filter, where all of the items must be true.
-     *
-     * @param items items to be checked
-     * @return an "and" filter
-     */
-    public static Map<String, Object> makeAnd(@SuppressWarnings("unchecked") Map<String, Object>... items) {
-        Map<String, Object> map = new TreeMap<>();
-        map.put(JSON_CLASS, CLASS_AND);
-        map.put(JSON_FILTERS, items);
-
-        return map;
-    }
-
-    /**
-     * Makes an "or" filter, where at least one of the items must be true.
-     *
-     * @param items items to be checked
-     * @return an "or" filter
-     */
-    public static Map<String, Object> makeOr(@SuppressWarnings("unchecked") Map<String, Object>... items) {
-        Map<String, Object> map = new TreeMap<>();
-        map.put(JSON_CLASS, CLASS_OR);
-        map.put(JSON_FILTERS, items);
-
-        return map;
-    }
-}
 
  * ============LICENSE_START=======================================================
  * ONAP
  * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
 package org.onap.policy.drools.pooling.state;
 
-import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
-import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_TIMESTAMP;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
-
-import java.util.Map;
 import org.onap.policy.drools.pooling.PoolingManager;
 import org.onap.policy.drools.pooling.message.Heartbeat;
-import org.onap.policy.drools.pooling.message.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
     /**
      * Constructor.
-     * 
+     *
      * @param mgr pooling manager
      */
     public StartState(PoolingManager mgr) {
 
     /**
      * Get Heart beat time stamp in milliseconds.
-     * 
+     *
      * @return the time stamp inserted into the heart beat message
      */
     public long getHbTimestampMs() {
 
         return null;
     }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public Map<String, Object> getFilter() {
-        // ignore everything except our most recent heart beat message
-        return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeAnd(makeEquals(MSG_CHANNEL, getHost()),
-                        makeEquals(MSG_TIMESTAMP, String.valueOf(hbTimestampMs))));
-
-    }
-
 }
 
 
 package org.onap.policy.drools.pooling.state;
 
-import static org.onap.policy.drools.pooling.state.FilterUtils.MSG_CHANNEL;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
-
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import org.onap.policy.drools.pooling.CancellableScheduledTask;
 import org.onap.policy.drools.pooling.PoolingManager;
 import org.onap.policy.drools.pooling.PoolingProperties;
 import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
 import org.onap.policy.drools.pooling.message.Heartbeat;
 import org.onap.policy.drools.pooling.message.Identification;
 import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
 import org.onap.policy.drools.pooling.message.Offline;
 import org.onap.policy.drools.pooling.message.Query;
 import org.slf4j.Logger;
         this.mgr = mgr;
     }
 
-    /**
-     * Gets the server-side filter to use when polling the DMaaP internal topic. The
-     * default method returns a filter that accepts messages on the admin channel and on
-     * the host's own channel.
-     *
-     * @return the server-side filter to use.
-     */
-    @SuppressWarnings("unchecked")
-    public Map<String, Object> getFilter() {
-        return makeOr(makeEquals(MSG_CHANNEL, Message.ADMIN), makeEquals(MSG_CHANNEL, getHost()));
-    }
-
     /**
      * Cancels the timers added by this state.
      */
         return mgr.goInactive();
     }
 
-    /**
-     * Processes a message. The default method passes it to the manager to handle and
-     * returns {@code null}.
-     *
-     * @param msg message to be processed
-     * @return the new state, or {@code null} if the state is unchanged
-     */
-    public State process(Forward msg) {
-        if (getHost().equals(msg.getChannel())) {
-            logger.info("received Forward message from {} on topic {}", msg.getSource(), getTopic());
-            mgr.handle(msg);
-
-        } else {
-            logger.info("discard Forward message to {} from {} on topic {}", msg.getChannel(), msg.getSource(),
-                            getTopic());
-        }
-
-        return null;
-    }
-
     /**
      * Processes a message. The default method just returns {@code null}.
      *
         mgr.publishAdmin(msg);
     }
 
-    /**
-     * Publishes a message on the specified channel.
-     *
-     * @param channel channel
-     * @param msg message to be published
-     */
-    protected final void publish(String channel, Forward msg) {
-        mgr.publish(channel, msg);
-    }
-
     /**
      * Publishes a message on the specified channel.
      *
 
 import java.util.concurrent.CountDownLatch;
 import org.junit.Before;
 import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
 import org.onap.policy.common.endpoints.event.comm.TopicListener;
 import org.onap.policy.common.endpoints.event.comm.TopicSink;
 import org.onap.policy.common.endpoints.event.comm.TopicSource;
     private static final String EXPECTED = "expected";
     private static final String MY_TOPIC = "my.topic";
     private static final String MSG = "a message";
-    private static final String FILTER = "a filter";
 
     private TopicListener listener;
-    private FilterableTopicSource source;
+    private TopicSource source;
     private boolean gotSources;
     private TopicSink sink;
     private boolean gotSinks;
     @Before
     public void setUp() throws Exception {
         listener = mock(TopicListener.class);
-        source = mock(FilterableTopicSource.class);
+        source = mock(TopicSource.class);
         gotSources = false;
         sink = mock(TopicSink.class);
         gotSinks = false;
         };
     }
 
-    @Test(expected = PoolingFeatureException.class)
-    public void testDmaapManager_CannotFilter() throws PoolingFeatureException {
-        // force an error when setFilter() is called
-        doThrow(new UnsupportedOperationException(EXPECTED)).when(source).setFilter(any());
-
-        new DmaapManagerImpl(MY_TOPIC);
-    }
-
     @Test
     public void testGetTopic() {
         assertEquals(MY_TOPIC, mgr.getTopic());
     }
 
-    @Test(expected = PoolingFeatureException.class)
-    public void testFindTopicSource_NotFilterableTopicSource() throws PoolingFeatureException {
-
-        // matching topic, but doesn't have the correct interface
-        TopicSource source2 = mock(TopicSource.class);
-        when(source2.getTopic()).thenReturn(MY_TOPIC);
-
-        new DmaapManagerImpl(MY_TOPIC) {
-            @Override
-            protected List<TopicSource> getTopicSources() {
-                return Arrays.asList(source2);
-            }
-        };
-    }
-
     @Test(expected = PoolingFeatureException.class)
     public void testFindTopicSource_NotFound() throws PoolingFeatureException {
         // one item in list, and its topic doesn't match
         verify(source).unregister(listener);
     }
 
-    @Test
-    public void testSetFilter() throws PoolingFeatureException {
-        assertThatCode(() -> mgr.setFilter(FILTER)).doesNotThrowAnyException();
-    }
-
-    @Test(expected = PoolingFeatureException.class)
-    public void testSetFilter_Exception() throws PoolingFeatureException {
-        // force an error when setFilter() is called
-        doThrow(new UnsupportedOperationException(EXPECTED)).when(source).setFilter(any());
-
-        mgr.setFilter(FILTER);
-    }
-
     @Test
     public void testPublish() throws PoolingFeatureException {
         // cannot publish before starting
 
  * ============LICENSE_START=======================================================
  * ONAP
  * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
  * Modifications Copyright (C) 2020 Nordix Foundation
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import lombok.Getter;
 import org.apache.commons.lang3.tuple.Pair;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
 import org.onap.policy.common.endpoints.event.comm.Topic;
 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 import org.onap.policy.common.endpoints.event.comm.TopicListener;
 import org.onap.policy.common.endpoints.event.comm.TopicSink;
 import org.onap.policy.common.endpoints.event.comm.TopicSource;
 import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.pooling.message.Message;
 import org.onap.policy.drools.system.PolicyController;
 import org.onap.policy.drools.system.PolicyEngine;
 import org.slf4j.Logger;
          * Maps a channel to its queue. Does <i>not</i> include the "admin" channel.
          */
         private final ConcurrentMap<String, BlockingQueue<String>> channel2queue = new ConcurrentHashMap<>(7);
-        /**
-         * Queue for the external "DMaaP" topic.
-         */
-        private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
         /**
          * Counts the number of decode errors.
          */
         }
 
         /**
-         * Offers an event to the external topic.
+         * Offers an event to the external topic. As each host needs a copy, it is posted
+         * to each Host's queue.
          *
          * @param event event
          */
 
         public void offerExternal(String event) {
-            externalTopic.offer(event);
+            for (Host host : hosts) {
+                host.getExternalTopic().offer(event);
+            }
         }
 
         /**
             channel2queue.values().forEach(queue -> queue.offer(message));
         }
 
-        /**
-         * Offers amessage to an internal channel.
-         *
-         * @param channel channel
-         * @param message message
-         */
-
-        public void offerInternal(String channel, String message) {
-            BlockingQueue<String> queue = channel2queue.get(channel);
-            if (queue != null) {
-                queue.offer(message);
-            }
-        }
-
         /**
          * Associates a controller with its drools controller.
          *
             return drools2policy.get(droolsController);
         }
 
-        /**
-         * Constructor.
-         *
-         * @return queue for the external topic
-         */
-
-        public BlockingQueue<String> getExternalTopic() {
-            return externalTopic;
-        }
-
         /**
          * Get decode errors.
          *
 
         private final BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
 
+        /**
+         * Queue for the external "DMaaP" topic.
+         */
+        @Getter
+        private final BlockingQueue<String> externalTopic = new LinkedBlockingQueue<String>();
+
         /**
          * Source that reads from the external topic and posts to the listener.
          */
             doAnswer(new MyExternalTopicListener(context, this)).when(controller).onTopicEvent(any(), any(), any());
             context.addController(controller, drools);
             // arrange to read from the external topic
-            externalSource = new TopicSourceImpl(context, false);
+            externalSource = new TopicSourceImpl(EXTERNAL_TOPIC, externalTopic);
             feature = new PoolingFeatureImpl(context);
         }
 
 
     private static class TopicSinkImpl extends TopicImpl implements TopicSink {
         private final Context context;
-        /**
-         * Used to decode the messages so that the channel can be extracted.
-         */
-        private final Serializer serializer = new Serializer();
 
         /**
          * Constructor.
                 return false;
             }
             try {
-                Message msg = serializer.decodeMsg(message);
-                String channel = msg.getChannel();
-                if (Message.ADMIN.equals(channel)) {
-                    // add to every queue
-                    context.offerInternal(message);
-                } else {
-                    // add to a specific queue
-                    context.offerInternal(channel, message);
-                }
+                context.offerInternal(message);
                 return true;
             } catch (JsonParseException e) {
                 logger.warn("could not decode message: {}", message);
      * Source implementation that reads from a queue associated with a topic.
      */
 
-    private static class TopicSourceImpl extends TopicImpl implements FilterableTopicSource {
+    private static class TopicSourceImpl extends TopicImpl implements TopicSource {
 
         private final String topic;
         /**
         /**
          * Constructor.
          *
-         * @param context context
-         * @param internal {@code true} if to read from the internal topic, {@code false}
-         *        to read from the external topic
+         * @param type topic type
+         * @param queue topic from which to read
          */
 
-        public TopicSourceImpl(Context context, boolean internal) {
-            if (internal) {
-                this.topic = INTERNAL_TOPIC;
-                this.queue = context.getCurrentHost().getInternalQueue();
-            } else {
-                this.topic = EXTERNAL_TOPIC;
-                this.queue = context.getExternalTopic();
-            }
-        }
-
-        @Override
-        public void setFilter(String filter) {
-            logger.info("topic filter set to: {}", filter);
+        public TopicSourceImpl(String type, BlockingQueue<String> queue) {
+            this.topic = type;
+            this.queue = queue;
         }
 
         @Override
 
         @Override
         protected List<TopicSource> getTopicSources() {
-            return Arrays.asList(new TopicSourceImpl(currentContext.get(), true));
+            return Arrays.asList(new TopicSourceImpl(INTERNAL_TOPIC,
+                            currentContext.get().getCurrentHost().getInternalQueue()));
         }
 
         @Override
 
  * ============LICENSE_START=======================================================
  * ONAP
  * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
  * Modifications Copyright (C) 2020 Nordix Foundation
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
     @Test
     public void testBeforeOffer() {
         assertFalse(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1));
-        verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC1, EVENT1);
+        verify(mgr1).beforeOffer(TOPIC1, EVENT1);
 
         // ensure that the args were captured
         pool.beforeInsert(drools1, OBJECT1);
-        verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC1, EVENT1, OBJECT1);
+        verify(mgr1).beforeInsert(TOPIC1, OBJECT1);
 
 
         // ensure it's still in the map by re-invoking
         assertFalse(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2));
-        verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC2, EVENT2);
+        verify(mgr1).beforeOffer(TOPIC2, EVENT2);
 
         // ensure that the new args were captured
         pool.beforeInsert(drools1, OBJECT2);
-        verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC2, EVENT2, OBJECT2);
+        verify(mgr1).beforeInsert(TOPIC2, OBJECT2);
 
 
         assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1));
     public void testBeforeOffer_MgrTrue() {
 
         // manager will return true
-        when(mgr1.beforeOffer(any(), any(), any())).thenReturn(true);
+        when(mgr1.beforeOffer(any(), any())).thenReturn(true);
 
         assertTrue(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1));
-        verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC1, EVENT1);
+        verify(mgr1).beforeOffer(TOPIC1, EVENT1);
 
         // ensure it's still in the map by re-invoking
         assertTrue(pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2));
-        verify(mgr1).beforeOffer(CommInfrastructure.UEB, TOPIC2, EVENT2);
+        verify(mgr1).beforeOffer(TOPIC2, EVENT2);
 
         assertFalse(pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC1, EVENT1));
     }
     public void testBeforeInsert() {
         pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
         assertFalse(pool.beforeInsert(drools1, OBJECT1));
-        verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC1, EVENT1, OBJECT1);
+        verify(mgr1).beforeInsert(TOPIC1, OBJECT1);
 
         // ensure it's still in the map by re-invoking
         pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2);
         assertFalse(pool.beforeInsert(drools1, OBJECT2));
-        verify(mgr1).beforeInsert(CommInfrastructure.UEB, TOPIC2, EVENT2, OBJECT2);
+        verify(mgr1).beforeInsert(TOPIC2, OBJECT2);
 
         pool.beforeOffer(controllerDisabled, CommInfrastructure.UEB, TOPIC2, EVENT2);
         assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
 
         // call beforeInsert without beforeOffer
         assertFalse(pool.beforeInsert(drools1, OBJECT1));
-        verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+        verify(mgr1, never()).beforeInsert(any(), any());
 
         assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
-        verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+        verify(mgr1, never()).beforeInsert(any(), any());
     }
 
     @Test
 
         pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
         assertFalse(pool.beforeInsert(drools1, OBJECT1));
-        verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+        verify(mgr1, never()).beforeInsert(any(), any());
     }
 
     @Test
 
         pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
         assertFalse(pool.beforeInsert(drools1, OBJECT1));
-        verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+        verify(mgr1, never()).beforeInsert(any(), any());
     }
 
     @Test
 
         pool.beforeOffer(controller1, CommInfrastructure.UEB, TOPIC1, EVENT1);
         assertFalse(pool.beforeInsert(drools1, OBJECT1));
-        verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+        verify(mgr1, never()).beforeInsert(any(), any());
     }
 
     @Test
         assertFalse(pool.afterOffer(controller1, CommInfrastructure.UEB, TOPIC2, EVENT2, true));
 
         assertFalse(pool.beforeInsert(drools1, OBJECT1));
-        verify(mgr1, never()).beforeInsert(any(), any(), any(), any());
+        verify(mgr1, never()).beforeInsert(any(), any());
 
 
         assertFalse(pool.beforeInsert(droolsDisabled, OBJECT1));
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.contains;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 import org.onap.policy.common.endpoints.event.comm.TopicListener;
 import org.onap.policy.drools.controller.DroolsController;
-import org.onap.policy.drools.pooling.extractor.ClassExtractors;
 import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
 import org.onap.policy.drools.pooling.message.Heartbeat;
 import org.onap.policy.drools.pooling.message.Message;
 import org.onap.policy.drools.pooling.message.Offline;
     private static final String THE_EVENT = "the event";
 
     private static final Object DECODED_EVENT = new Object();
-    private static final String REQUEST_ID = "my.request.id";
 
     /**
      * Number of dmaap.publish() invocations that should be issued when the manager is
 
     private PoolingProperties poolProps;
     private ListeningController controller;
-    private ClassExtractors extractors;
     private DmaapManager dmaap;
     private boolean gotDmaap;
     private ScheduledThreadPoolExecutor sched;
         ser = new Serializer();
         active = new CountDownLatch(1);
 
-        extractors = mock(ClassExtractors.class);
         dmaap = mock(DmaapManager.class);
         gotDmaap = false;
         controller = mock(ListeningController.class);
         schedCount = 0;
         drools = mock(DroolsController.class);
 
-        when(extractors.extract(DECODED_EVENT)).thenReturn(REQUEST_ID);
-
         when(controller.getName()).thenReturn(MY_CONTROLLER);
         when(controller.getDrools()).thenReturn(drools);
         when(controller.isAlive()).thenReturn(true);
         assertEquals(mgr.getHost(), st.getHost());
     }
 
-    @Test
-    public void testPoolingManagerImpl_ClassEx() {
-        /*
-         * this controller does not implement TopicListener, which should cause a
-         * ClassCastException
-         */
-        PolicyController ctlr = mock(PolicyController.class);
-
-        assertThatThrownBy(() -> new PoolingManagerTest(MY_HOST, ctlr, poolProps, active))
-                        .isInstanceOf(PoolingFeatureRtException.class).hasCauseInstanceOf(ClassCastException.class);
-    }
-
     @Test
     public void testPoolingManagerImpl_PoolEx() throws PoolingFeatureException {
         // throw an exception when we try to create the dmaap manager
         startMgr();
         mgr.startDistributing(makeAssignments(false));
 
-        Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
-        mgr.handle(msg);
-        verify(dmaap, times(START_PUB + 1)).publish(any());
+        verify(dmaap, times(START_PUB)).publish(any());
 
         mgr.beforeStop();
 
         verify(dmaap).stopConsumer(mgr);
         verify(sched).shutdownNow();
-        verify(dmaap, times(START_PUB + 2)).publish(any());
+        verify(dmaap, times(START_PUB + 1)).publish(any());
         verify(dmaap).publish(contains("offline"));
 
         assertTrue(mgr.getCurrent() instanceof IdleState);
 
         // verify that next message is handled locally
-        mgr.handle(msg);
-        verify(dmaap, times(START_PUB + 2)).publish(any());
-        verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+        assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
+        verify(dmaap, times(START_PUB + 1)).publish(any());
     }
 
     @Test
         // start should invoke changeState()
         startMgr();
 
-        int ntimes = 0;
-
-        // should have set the filter for the StartState
-        verify(dmaap, times(++ntimes)).setFilter(any());
-
         /*
          * now go offline while it's locked
          */
         lockMgr();
 
-        // should have set the new filter
-        verify(dmaap, times(++ntimes)).setFilter(any());
-
         // should have cancelled the timers
         assertEquals(2, futures.size());
         verify(futures.poll()).cancel(false);
          */
         unlockMgr();
 
-        // should have set the new filter
-        verify(dmaap, times(++ntimes)).setFilter(any());
-
         // new timers should now be active
         assertEquals(2, futures.size());
         verify(futures.poll(), never()).cancel(false);
         verify(futures.poll(), never()).cancel(false);
     }
 
-    @Test
-    public void testSetFilter() throws Exception {
-        // start should cause a filter to be set
-        startMgr();
-
-        verify(dmaap).setFilter(any());
-    }
-
-    @Test
-    public void testSetFilter_DmaapEx() throws Exception {
-
-        // generate an exception
-        doThrow(new PoolingFeatureException()).when(dmaap).setFilter(any());
-
-        // start should invoke setFilter()
-        assertThatCode(() -> startMgr()).doesNotThrowAnyException();
-
-        // no exception, means success
-    }
-
     @Test
     public void testSchedule() throws Exception {
         // must start the scheduler
     }
 
     @Test
-    public void testBeforeOffer_Unlocked_NoIntercept() throws Exception {
+    public void testBeforeOffer_Unlocked() throws Exception {
         startMgr();
 
-        assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
-    }
-
-    @Test
-    public void testBeforeOffer_Locked_NoIntercept() throws Exception {
-        startMgr();
-
-        lockMgr();
+        // route the message to another host
+        mgr.startDistributing(makeAssignments(false));
 
-        assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+        assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
     }
 
     @Test
-    public void testBeforeOffer_Locked_Intercept() throws Exception {
+    public void testBeforeOffer_Locked() throws Exception {
         startMgr();
         lockMgr();
 
-        // route the message to this host
-        mgr.startDistributing(makeAssignments(true));
-
-        final CountDownLatch latch = catchRecursion(false);
-
-        Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
-        mgr.handle(msg);
-
-        verify(dmaap, times(START_PUB)).publish(any());
-        verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
+        // route the message to another host
+        mgr.startDistributing(makeAssignments(false));
 
-        // ensure we made it past both beforeXxx() methods
-        assertEquals(0, latch.getCount());
+        assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
     }
 
     @Test
-    public void testBeforeInsert_Intercept() throws Exception {
+    public void testBeforeInsert() throws Exception {
         startMgr();
         lockMgr();
 
         // route the message to this host
         mgr.startDistributing(makeAssignments(true));
 
-        final CountDownLatch latch = catchRecursion(true);
-
-        Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
-        mgr.handle(msg);
-
-        verify(dmaap, times(START_PUB)).publish(any());
-        verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-
-        // ensure we made it past both beforeXxx() methods
-        assertEquals(0, latch.getCount());
-    }
-
-    @Test
-    public void testBeforeInsert_NoIntercept() throws Exception {
-        validateUnhandled(CommInfrastructure.UEB);
+        assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
     }
 
     @Test
     public void testHandleExternalCommInfrastructureStringStringString_InvalidMsg() throws Exception {
         startMgr();
 
-        assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
+        assertFalse(mgr.beforeInsert(TOPIC2, "invalid message"));
     }
 
     @Test
     public void testHandleExternalCommInfrastructureStringStringString() throws Exception {
-        validateUnhandled(CommInfrastructure.UEB);
+        validateUnhandled();
     }
 
     @Test
     public void testHandleExternalForward_NoAssignments() throws Exception {
-        validateUnhandled(CommInfrastructure.UEB);
+        validateUnhandled();
     }
 
     @Test
     @Test
     public void testHandleEvent_NullTarget() throws Exception {
         // buckets have null targets
-        validateHandled(new BucketAssignments(new String[] {null, null}), START_PUB);
+        validateDiscarded(new BucketAssignments(new String[] {null, null}));
     }
 
     @Test
     }
 
     @Test
-    public void testHandleEvent_DiffHost_TooManyHops() throws Exception {
-        startMgr();
-
-        // route the message to this host
-        mgr.startDistributing(makeAssignments(false));
-
-        Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
-        msg.setNumHops(PoolingManagerImpl.MAX_HOPS + 1);
-        mgr.handle(msg);
-
-        // shouldn't publish
-        verify(dmaap, times(START_PUB)).publish(any());
-        verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-    }
-
-    @Test
-    public void testHandleEvent_DiffHost_Forward() throws Exception {
-        validateHandled(makeAssignments(false), START_PUB + 1);
-    }
-
-    @Test
-    public void testExtractRequestId_NullEvent() throws Exception {
-        startMgr();
-
-        assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, null));
-    }
-
-    @Test
-    public void testExtractRequestId_NullReqId() throws Exception {
-        validateHandleReqId(null);
-    }
-
-    @Test
-    public void testExtractRequestId() throws Exception {
-        startMgr();
-
+    public void testHandleEvent_DiffHost() throws Exception {
         // route the message to the *OTHER* host
-        mgr.startDistributing(makeAssignments(false));
-
-        assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+        validateDiscarded(makeAssignments(false));
     }
 
     @Test
         // create assignments, though they are irrelevant
         mgr.startDistributing(makeAssignments(false));
 
-        assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+        assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
     }
 
     @Test
         // create assignments, though they are irrelevant
         mgr.startDistributing(makeAssignments(false));
 
-        assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+        assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
     }
 
     @Test
         // create assignments, though they are irrelevant
         mgr.startDistributing(makeAssignments(false));
 
-        assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+        assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
     }
 
     @Test
         // create assignments, though they are irrelevant
         mgr.startDistributing(makeAssignments(false));
 
-        assertFalse(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
+        assertFalse(mgr.beforeOffer(TOPIC2, THE_EVENT));
     }
 
     @Test
         // route to another host
         mgr.startDistributing(makeAssignments(false));
 
-        assertTrue(mgr.beforeOffer(CommInfrastructure.UEB, TOPIC2, THE_EVENT));
-    }
-
-    @Test
-    public void testMakeForward() throws Exception {
-        startMgr();
-
-        // route the message to another host
-        mgr.startDistributing(makeAssignments(false));
-
-        assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
-        verify(dmaap, times(START_PUB + 1)).publish(any());
-    }
-
-    @Test
-    public void testMakeForward_InvalidMsg() throws Exception {
-        startMgr();
-
-        // route the message to another host
-        mgr.startDistributing(makeAssignments(false));
-
-        assertTrue(mgr.beforeInsert(null, TOPIC2, THE_EVENT, DECODED_EVENT));
-
-        // should not have tried to publish a message
-        verify(dmaap, times(START_PUB)).publish(any());
-    }
-
-    @Test
-    public void testHandle_SameHost() throws Exception {
-        startMgr();
-
-        // route the message to this host
-        mgr.startDistributing(makeAssignments(true));
-
-        Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
-        mgr.handle(msg);
-
-        verify(dmaap, times(START_PUB)).publish(any());
-        verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-    }
-
-    @Test
-    public void testHandle_DiffHost() throws Exception {
-        startMgr();
-
-        // route the message to this host
-        mgr.startDistributing(makeAssignments(false));
-
-        Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
-        mgr.handle(msg);
-
-        verify(dmaap, times(START_PUB + 1)).publish(any());
-        verify(controller, never()).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-    }
-
-    @Test
-    public void testInject() throws Exception {
-        startMgr();
-
-        // route the message to this host
-        mgr.startDistributing(makeAssignments(true));
-
-        final CountDownLatch latch = catchRecursion(true);
-
-        Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
-        mgr.handle(msg);
-
-        verify(dmaap, times(START_PUB)).publish(any());
-        verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-
-        // ensure we made it past both beforeXxx() methods
-        assertEquals(0, latch.getCount());
-    }
-
-    @Test
-    public void testInject_Ex() throws Exception {
-        startMgr();
-
-        // route the message to this host
-        mgr.startDistributing(makeAssignments(true));
-
-        // generate RuntimeException when onTopicEvent() is invoked
-        doThrow(new IllegalArgumentException("expected")).when(controller).onTopicEvent(any(), any(), any());
-
-        final CountDownLatch latch = catchRecursion(true);
-
-        Forward msg = new Forward(mgr.getHost(), CommInfrastructure.UEB, TOPIC2, THE_EVENT, REQUEST_ID);
-        mgr.handle(msg);
-
-        verify(dmaap, times(START_PUB)).publish(any());
-        verify(controller).onTopicEvent(CommInfrastructure.UEB, TOPIC2, THE_EVENT);
-
-        // ensure we made it past both beforeXxx() methods
-        assertEquals(0, latch.getCount());
+        assertTrue(mgr.beforeOffer(TOPIC2, THE_EVENT));
     }
 
     @Test
 
         // null assignments should cause message to be processed locally
         mgr.startDistributing(null);
-        assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+        assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
         verify(dmaap, times(START_PUB)).publish(any());
 
 
-        // route the message to this host
+        // message for this host
         mgr.startDistributing(makeAssignments(true));
-        assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-        verify(dmaap, times(START_PUB)).publish(any());
+        assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
 
 
-        // route the message to the other host
+        // message for another host
         mgr.startDistributing(makeAssignments(false));
-        assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-        verify(dmaap, times(START_PUB + 1)).publish(any());
+        assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
     }
 
     @Test
     private void validateHandleReqId(String requestId) throws PoolingFeatureException {
         startMgr();
 
-        when(extractors.extract(any())).thenReturn(requestId);
-
-        assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+        assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
     }
 
     private void validateNoForward() throws PoolingFeatureException {
         // route the message to this host
         mgr.startDistributing(makeAssignments(true));
 
-        assertFalse(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
+        assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
 
         verify(dmaap, times(START_PUB)).publish(any());
     }
 
-    private void validateHandled(BucketAssignments assignments, int publishCount) throws PoolingFeatureException {
+    private void validateUnhandled() throws PoolingFeatureException {
         startMgr();
-
-        // route the message to the *OTHER* host
-        mgr.startDistributing(assignments);
-
-        assertTrue(mgr.beforeInsert(CommInfrastructure.UEB, TOPIC2, THE_EVENT, DECODED_EVENT));
-
-        verify(dmaap, times(publishCount)).publish(any());
+        assertFalse(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
     }
 
-    private void validateUnhandled(CommInfrastructure infra) throws PoolingFeatureException {
+    private void validateDiscarded(BucketAssignments bucketAssignments) throws PoolingFeatureException {
         startMgr();
-        assertFalse(mgr.beforeInsert(infra, TOPIC2, THE_EVENT, DECODED_EVENT));
-    }
-
-    /**
-     * Configure the mock controller to act like a real controller, invoking beforeOffer
-     * and then beforeInsert, so we can make sure they pass through. We'll keep count to
-     * ensure we don't get into infinite recursion.
-     *
-     * @param invokeBeforeInsert {@code true} if beforeInsert() should be invoked,
-     *        {@code false} if it should be skipped
-     *
-     * @return a latch that will be counted down if both beforeXxx() methods return false
-     */
-    private CountDownLatch catchRecursion(boolean invokeBeforeInsert) {
-        CountDownLatch recursion = new CountDownLatch(3);
-        CountDownLatch latch = new CountDownLatch(1);
-
-        doAnswer(args -> {
-
-            recursion.countDown();
-            if (recursion.getCount() == 0) {
-                fail("recursive calls to onTopicEvent");
-            }
-
-            int iarg = 0;
-            CommInfrastructure proto = args.getArgument(iarg++);
-            String topic = args.getArgument(iarg++);
-            String event = args.getArgument(iarg++);
-
-            if (mgr.beforeOffer(proto, topic, event)) {
-                return null;
-            }
 
-            if (invokeBeforeInsert && mgr.beforeInsert(proto, topic, event, DECODED_EVENT)) {
-                return null;
-            }
-
-            latch.countDown();
-
-            return null;
-        }).when(controller).onTopicEvent(any(), any(), any());
+        // buckets have null targets
+        mgr.startDistributing(bucketAssignments);
 
-        return latch;
+        assertTrue(mgr.beforeInsert(TOPIC2, DECODED_EVENT));
     }
 
     /**
      * @return a new bucket assignment
      */
     private BucketAssignments makeAssignments(boolean sameHost) {
-        int slot = REQUEST_ID.hashCode() % 2;
+        int slot = DECODED_EVENT.hashCode() % 2;
 
         // slot numbers are 0 and 1 - reverse them if it's for a different host
         if (!sameHost) {
      */
     private void lockMgr() {
         mgr.beforeLock();
+        when(controller.isLocked()).thenReturn(true);
     }
 
     /**
      */
     private void unlockMgr() {
         mgr.afterUnlock();
+        when(controller.isLocked()).thenReturn(false);
     }
 
     /**
             super(host, controller, props, activeLatch);
         }
 
-        @Override
-        protected ClassExtractors makeClassExtractors(Properties props) {
-            return extractors;
-        }
-
         @Override
         protected DmaapManager makeDmaapManager(String topic) throws PoolingFeatureException {
             gotDmaap = true;
 
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
 
 import com.google.gson.JsonParseException;
-import java.util.Map;
-import java.util.TreeMap;
 import org.junit.Test;
 import org.onap.policy.drools.pooling.message.Message;
 import org.onap.policy.drools.pooling.message.Query;
         assertThatCode(() -> new Serializer()).doesNotThrowAnyException();
     }
 
-    @Test
-    @SuppressWarnings("unchecked")
-    public void testEncodeFilter() throws Exception {
-        final Serializer ser = new Serializer();
-
-        /*
-         * Ensure raw maps serialize as expected. Use a TreeMap so the field
-         * order is predictable.
-         */
-        Map<String, Object> top = new TreeMap<>();
-        Map<String, Object> inner = new TreeMap<>();
-        top.put("abc", 20);
-        top.put("def", inner);
-        top.put("ghi", true);
-        inner.put("xyz", 30);
-        assertEquals("{'abc':20,'def':{'xyz':30},'ghi':true}".replace('\'', '"'), ser.encodeFilter(top));
-
-        /*
-         * Ensure we can encode a complicated filter without throwing an
-         * exception
-         */
-        Map<String, Object> complexFilter = makeAnd(makeEquals("fieldC", "valueC"),
-                        makeOr(makeEquals("fieldA", "valueA"), makeEquals("fieldB", "valueB")));
-        String val = ser.encodeFilter(complexFilter);
-        assertFalse(val.isEmpty());
-    }
-
     @Test
     public void testEncodeMsg_testDecodeMsg() throws Exception {
         Serializer ser = new Serializer();
 
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-
-import java.util.Map;
-import java.util.Properties;
-import java.util.TreeMap;
-import java.util.function.Function;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ClassExtractorsTest {
-
-    private static final int NTIMES = 5;
-
-    private static final String MY_TYPE = "theType";
-    private static final String PROP_PREFIX = "extractor." + MY_TYPE + ".";
-
-    private static final String VALUE = "a value";
-    private static final Integer INT_VALUE = 10;
-    private static final Integer INT_VALUE2 = 20;
-
-    private Properties props;
-    private ClassExtractors map;
-
-    /**
-     * Setup.
-     * 
-     */
-    @Before
-    public void setUp() {
-        props = new Properties();
-
-        props.setProperty(PROP_PREFIX + Simple.class.getName(), "${intValue}");
-        props.setProperty(PROP_PREFIX + WithString.class.getName(), "${strValue}");
-
-        map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-    }
-
-    @Test
-    public void testExtract() {
-        Simple obj = new Simple();
-        assertEquals(INT_VALUE, map.extract(obj));
-
-        // string value
-        assertEquals(VALUE, tryIt(Simple.class, "${strValue}", xxx -> new Simple()));
-
-        // null object
-        assertNull(map.extract(null));
-
-        // values from two different kinds of objects
-        props = new Properties();
-        props.setProperty(PROP_PREFIX + Simple.class.getName(), "${intValue}");
-        props.setProperty(PROP_PREFIX + WithString.class.getName(), "${strValue}");
-        map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
-        assertEquals(INT_VALUE, map.extract(new Simple()));
-        assertEquals(VALUE, map.extract(new Sub()));
-
-        // values from a superclass method, but property defined for subclass
-        props = new Properties();
-        props.setProperty(PROP_PREFIX + Sub.class.getName(), "${strValue}");
-        map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
-        assertEquals(VALUE, map.extract(new Sub()));
-
-        // values from a superclass field, but property defined for subclass
-        props = new Properties();
-        props.setProperty(PROP_PREFIX + Sub.class.getName(), "${intValue}");
-        map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
-        assertEquals(INT_VALUE, map.extract(new Sub()));
-
-
-        // prefix includes trailing "."
-        props = new Properties();
-        props.setProperty(PROP_PREFIX + Simple.class.getName(), "${intValue}");
-        map = new ClassExtractors(props, PROP_PREFIX.substring(0, PROP_PREFIX.length() - 1), MY_TYPE);
-        assertEquals(INT_VALUE, map.extract(new Simple()));
-
-
-        // values from an class in a different file
-        props = new Properties();
-        props.setProperty(PROP_PREFIX + ClassExtractorsTestSupport.class.getName(), "${nested.theValue}");
-        map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
-        assertEquals(ClassExtractorsTestSupport2.NESTED_VALUE, map.extract(new ClassExtractorsTestSupport()));
-    }
-
-    @Test
-    public void testGetExtractor() {
-        Simple obj = new Simple();
-
-        // repeat - shouldn't re-create the extractor
-        for (int x = 0; x < NTIMES; ++x) {
-            assertEquals("x=" + x, INT_VALUE, map.extract(obj));
-            assertEquals("x=" + x, 1, map.size());
-        }
-    }
-
-    @Test
-    public void testBuildExtractorClass_TopLevel() {
-        // extractor defined for top-level class
-        props = new Properties();
-        props.setProperty(PROP_PREFIX + Sub.class.getName(), "${strValue}");
-
-        map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-        assertEquals(VALUE, map.extract(new Sub()));
-
-        // one extractor for top-level class
-        assertEquals(1, map.size());
-    }
-
-    @Test
-    public void testBuildExtractorClass_SuperClass() {
-        // extractor defined for superclass (interface)
-        assertEquals(VALUE, map.extract(new Sub()));
-
-        // one extractor for top-level class and one for interface
-        assertEquals(2, map.size());
-    }
-
-    @Test
-    public void testBuildExtractorClass_NotDefined() {
-        // no extractor defined for "this" class
-        assertNull(map.extract(this));
-
-        // one NULL extractor for top-level class
-        assertEquals(1, map.size());
-    }
-
-    @Test
-    public void testBuildExtractorClassString() {
-        // no leading "${"
-        assertNull(tryIt(Simple.class, "intValue}", xxx -> new Simple()));
-
-        // no trailing "}"
-        assertNull(tryIt(Simple.class, "${intValue", xxx -> new Simple()));
-
-        // leading "."
-        assertNull(tryIt(Sub.class, "${.simple.strValue}", xxx -> new Sub()));
-
-        // trailing "."
-        assertNull(tryIt(Sub.class, "${simple.strValue.}", xxx -> new Sub()));
-
-        // one component
-        assertEquals(VALUE, tryIt(Sub.class, "${strValue}", xxx -> new Sub()));
-
-        // two components
-        assertEquals(VALUE, tryIt(Sub.class, "${simple.strValue}", xxx -> new Sub()));
-
-        // invalid component
-        assertNull(tryIt(Sub.class, "${unknown}", xxx -> new Sub()));
-    }
-
-    @Test
-    public void testGetClassExtractor_InSuper() {
-        // field in the superclass
-        assertEquals(INT_VALUE, tryIt(Super.class, "${intValue}", xxx -> new Sub()));
-    }
-
-    @Test
-    public void testGetClassExtractor_InInterface() {
-        // defined in the interface
-        assertEquals(VALUE, map.extract(new Sub()));
-    }
-
-    @Test
-    public void testNullExtractorExtract() {
-        // empty properties - should only create NullExtractor
-        map = new ClassExtractors(new Properties(), PROP_PREFIX, MY_TYPE);
-
-        Simple obj = new Simple();
-
-        // repeat - shouldn't re-create the extractor
-        for (int x = 0; x < NTIMES; ++x) {
-            assertNull("x=" + x, map.extract(obj));
-            assertEquals("x=" + x, 1, map.size());
-        }
-    }
-
-    @Test
-    public void testComponetizedExtractor() {
-        // one component
-        assertEquals(VALUE, tryIt(Sub.class, "${strValue}", xxx -> new Sub()));
-
-        // three components
-        assertEquals(VALUE, tryIt(Sub.class, "${cont.data.strValue}", xxx -> new Sub()));
-    }
-
-    @Test
-    public void testComponetizedExtractorBuildExtractor_Method() {
-        assertEquals(INT_VALUE, tryIt(Simple.class, "${intValue}", xxx -> new Simple()));
-    }
-
-    @Test
-    public void testComponetizedExtractorBuildExtractor_Field() {
-        assertEquals(VALUE, tryIt(Simple.class, "${strValue}", xxx -> new Simple()));
-    }
-
-    @Test
-    public void testComponetizedExtractorBuildExtractor_Map() {
-        Map<String, Object> inner = new TreeMap<>();
-        inner.put("inner1", "abc1");
-        inner.put("inner2", "abc2");
-
-        Map<String, Object> outer = new TreeMap<>();
-        outer.put("outer1", "def1");
-        outer.put("outer2", inner);
-
-        Simple obj = new Simple();
-
-        props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue}");
-        map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-        assertEquals(null, map.extract(obj));
-
-        obj.mapValue = outer;
-        props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue.outer2.inner2}");
-        map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-        assertEquals("abc2", map.extract(obj));
-    }
-
-    @Test
-    public void testComponetizedExtractorBuildExtractor_Unknown() {
-        assertNull(tryIt(Simple.class, "${unknown2}", xxx -> new Simple()));
-    }
-
-    @Test
-    public void testComponetizedExtractorExtract_MiddleNull() {
-        // data component is null
-        assertEquals(null, tryIt(Sub.class, "${cont.data.strValue}", xxx -> {
-            Sub obj = new Sub();
-            obj.cont.simpleValue = null;
-            return obj;
-        }));
-    }
-
-    @Test
-    public void testComponetizedExtractorGetMethodExtractor_VoidMethod() {
-        // tell it to use getVoidValue()
-        props.setProperty(PROP_PREFIX + Simple.class.getName(), "${voidValue}");
-        map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
-        Simple obj = new Simple();
-        assertNull(map.extract(obj));
-
-        assertFalse(obj.voidInvoked);
-    }
-
-    @Test
-    public void testComponetizedExtractorGetMethodExtractor() {
-        assertEquals(INT_VALUE, map.extract(new Simple()));
-    }
-
-    @Test
-    public void testComponetizedExtractorGetFieldExtractor() {
-        // use a field
-        assertEquals(VALUE, tryIt(Simple.class, "${strValue}", xxx -> new Simple()));
-    }
-
-    @Test
-    public void testComponetizedExtractorGetMapExtractor() {
-        Map<String, Object> inner = new TreeMap<>();
-        inner.put("inner1", "abc1");
-        inner.put("inner2", "abc2");
-
-        Map<String, Object> outer = new TreeMap<>();
-        outer.put("outer1", "def1");
-        outer.put("outer2", inner);
-
-        Simple obj = new Simple();
-
-        obj.mapValue = outer;
-        props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue.outer2.inner2}");
-        map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-        assertEquals("abc2", map.extract(obj));
-    }
-
-    @Test
-    public void testComponetizedExtractorGetMapExtractor_MapSubclass() {
-        Map<String, Object> inner = new TreeMap<>();
-        inner.put("inner1", "abc1");
-        inner.put("inner2", "abc2");
-
-        MapSubclass outer = new MapSubclass();
-        outer.put("outer1", "def1");
-        outer.put("outer2", inner);
-
-        Simple obj = new Simple();
-
-        props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue}");
-        map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-        assertEquals(null, map.extract(obj));
-
-        obj.mapValue = outer;
-        props.setProperty(PROP_PREFIX + Simple.class.getName(), "${mapValue.outer2.inner2}");
-        map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-        assertEquals("abc2", map.extract(obj));
-    }
-
-    /**
-     * Sets a property for the given class, makes an object, and then returns
-     * the value extracted.
-     * 
-     * @param clazz class whose property is to be set
-     * @param propval value to which to set the property
-     * @param makeObj function to create the object whose data is to be
-     *        extracted
-     * @return the extracted data, or {@code null} if nothing was extracted
-     */
-    private Object tryIt(Class<?> clazz, String propval, Function<Void, Object> makeObj) {
-        Properties props = new Properties();
-        props.setProperty(PROP_PREFIX + clazz.getName(), propval);
-
-        map = new ClassExtractors(props, PROP_PREFIX, MY_TYPE);
-
-        return map.extract(makeObj.apply(null));
-    }
-
-    /**
-     * A Map subclass, used to verify that getMapExtractor() still handles it.
-     */
-    private static class MapSubclass extends TreeMap<String, Object> {
-        private static final long serialVersionUID = 1L;
-
-    }
-
-    /**
-     * A simple class.
-     */
-    private static class Simple {
-
-        /**
-         * This will not be used because getIntValue() will override it.
-         */
-        @SuppressWarnings("unused")
-        public final int intValue = INT_VALUE2;
-
-        /**
-         * Used to verify retrieval via a field name.
-         */
-        @SuppressWarnings("unused")
-        public final String strValue = VALUE;
-
-        /**
-         * Used to verify retrieval within maps.
-         */
-        @SuppressWarnings("unused")
-        public Map<String, Object> mapValue = null;
-
-        /**
-         * {@code True} if {@link #getVoidValue()} was invoked, {@code false}
-         * otherwise.
-         */
-        private boolean voidInvoked = false;
-
-        /**
-         * This function will supercede the value in the "intValue" field.
-         * 
-         * @return INT_VALUE
-         */
-        @SuppressWarnings("unused")
-        public Integer getIntValue() {
-            return INT_VALUE;
-        }
-
-        /**
-         * Used to verify that void functions are not invoked.
-         */
-        @SuppressWarnings("unused")
-        public void getVoidValue() {
-            voidInvoked = true;
-        }
-    }
-
-    /**
-     * Used to verify multi-component retrieval.
-     */
-    private static class Container {
-        public Simple simpleValue = new Simple();
-
-        @SuppressWarnings("unused")
-        public Simple getData() {
-            return simpleValue;
-        }
-    }
-
-    /**
-     * Used to verify extraction when the property refers to an interface.
-     */
-    private static interface WithString {
-
-        String getStrValue();
-    }
-
-    /**
-     * Used to verify retrieval within a superclass.
-     */
-    private static class Super implements WithString {
-
-        @SuppressWarnings("unused")
-        public final int intValue = INT_VALUE;
-
-        @Override
-        public String getStrValue() {
-            return VALUE;
-        }
-    }
-
-    /**
-     * Used to verify retrieval within a subclass.
-     */
-    private static class Sub extends Super {
-
-        @SuppressWarnings("unused")
-        public final Simple simple = new Simple();
-
-        /**
-         * Used to verify multi-component retrieval.
-         */
-        public final Container cont = new Container();
-    }
-}
 
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Used to test extractors.
- */
-public class ClassExtractorsTestSupport {
-
-    private ClassExtractorsTestSupport2 nested = new ClassExtractorsTestSupport2();
-
-    /**
-     * Constructor.
-     */
-    public ClassExtractorsTestSupport() {
-        super();
-    }
-
-    public ClassExtractorsTestSupport2 getNested() {
-        return nested;
-    }
-}
 
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-/**
- * Used to test extractors.
- */
-public class ClassExtractorsTestSupport2 {
-
-    public static final int NESTED_VALUE = 30;
-    
-    public final int theValue = NESTED_VALUE;
-}
 
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-import org.onap.policy.common.utils.test.ExceptionsTester;
-
-public class ExtractorExceptionTest extends ExceptionsTester {
-
-    @Test
-    public void test() {
-        assertEquals(5, test(ExtractorException.class));
-    }
-
-}
 
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.lang.reflect.Field;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FieldExtractorTest {
-
-    private static final String VALUE = "the value";
-    private static final Integer INT_VALUE = 10;
-
-    private Field field;
-    private FieldExtractor ext;
-
-    @Before
-    public void setUp() throws Exception {
-        field = MyClass.class.getDeclaredField("value");
-        ext = new FieldExtractor(field);
-    }
-
-    @Test
-    public void testExtract() throws Exception {
-        assertEquals(VALUE, ext.extract(new MyClass()));
-
-        // repeat
-        assertEquals(VALUE, ext.extract(new MyClass()));
-
-        // null value
-        MyClass obj = new MyClass();
-        obj.value = null;
-        assertEquals(null, ext.extract(obj));
-
-        obj.value = VALUE + "X";
-        assertEquals(VALUE + "X", ext.extract(obj));
-
-        // different value type
-        field = MyClass.class.getDeclaredField("value2");
-        ext = new FieldExtractor(field);
-        assertEquals(INT_VALUE, ext.extract(new MyClass()));
-    }
-
-    @Test
-    public void testExtract_ArgEx() {
-        // pass it the wrong class type
-        assertNull(ext.extract(this));
-    }
-
-    private static class MyClass {
-        @SuppressWarnings("unused")
-        public String value = VALUE;
-
-        @SuppressWarnings("unused")
-        public int value2 = INT_VALUE;
-    }
-}
 
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.junit.Before;
-import org.junit.Test;
-
-public class MapExtractorTest {
-    private static final String KEY = "a.key";
-    private static final String VALUE = "a.value";
-
-    private MapExtractor ext;
-
-    @Before
-    public void setUp() {
-        ext = new MapExtractor(KEY);
-    }
-
-    @Test
-    public void testExtract_NotAMap() {
-
-        // object is not a map (i.e., it's a String)
-        assertNull(ext.extract(KEY));
-    }
-
-    @Test
-    public void testExtract_MissingValue() {
-
-        Map<String, Object> map = new HashMap<>();
-        map.put(KEY + "x", VALUE + "x");
-
-        // object is a map, but doesn't have the key
-        assertNull(ext.extract(map));
-    }
-
-    @Test
-    public void testExtract() {
-
-        Map<String, Object> map = new HashMap<>();
-        map.put(KEY + "x", VALUE + "x");
-        map.put(KEY, VALUE);
-
-        // object is a map and contains the key
-        assertEquals(VALUE, ext.extract(map));
-
-        // change to value to a different type
-        map.put(KEY, 20);
-        assertEquals(20, ext.extract(map));
-    }
-
-}
 
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.extractor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import java.lang.reflect.Method;
-import org.junit.Before;
-import org.junit.Test;
-
-public class MethodExtractorTest {
-
-    private static final String VALUE = "the value";
-    private static final Integer INT_VALUE = 10;
-
-    private Method meth;
-    private MethodExtractor ext;
-
-    @Before
-    public void setUp() throws Exception {
-        meth = MyClass.class.getMethod("getValue");
-        ext = new MethodExtractor(meth);
-    }
-
-    @Test
-    public void testExtract() throws Exception {
-        assertEquals(VALUE, ext.extract(new MyClass()));
-
-        // repeat
-        assertEquals(VALUE, ext.extract(new MyClass()));
-
-        // null value
-        MyClass obj = new MyClass();
-        meth = MyClass.class.getMethod("getNullValue");
-        ext = new MethodExtractor(meth);
-        assertEquals(null, ext.extract(obj));
-
-        // different value type
-        meth = MyClass.class.getMethod("getIntValue");
-        ext = new MethodExtractor(meth);
-        assertEquals(INT_VALUE, ext.extract(new MyClass()));
-    }
-
-    @Test
-    public void testExtract_ArgEx() {
-        // pass it the wrong class type
-        assertNull(ext.extract(this));
-    }
-
-    @Test
-    public void testExtract_InvokeEx() throws Exception {
-        // invoke method that throws an exception
-        meth = MyClass.class.getMethod("throwException");
-        ext = new MethodExtractor(meth);
-        assertEquals(null, ext.extract(new MyClass()));
-    }
-
-    private static class MyClass {
-
-        @SuppressWarnings("unused")
-        public String getValue() {
-            return VALUE;
-        }
-
-        @SuppressWarnings("unused")
-        public int getIntValue() {
-            return INT_VALUE;
-        }
-
-        @SuppressWarnings("unused")
-        public String getNullValue() {
-            return null;
-        }
-
-        @SuppressWarnings("unused")
-        public String throwException() {
-            throw new IllegalStateException("expected");
-        }
-    }
-
-}
 
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.message;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-
-public class ForwardTest extends SupportBasicMessageTester<Forward> {
-    // values set by makeValidMessage()
-    public static final CommInfrastructure VALID_PROTOCOL = CommInfrastructure.UEB;
-    public static final int VALID_HOPS = 0;
-    public static final String VALID_TOPIC = "topicA";
-    public static final String VALID_PAYLOAD = "payloadA";
-    public static final String VALID_REQUEST_ID = "requestIdA";
-
-    /**
-     * Time, in milliseconds, after which the most recent message was created.
-     */
-    private static long tcreateMs;
-
-    public ForwardTest() {
-        super(Forward.class);
-    }
-
-    @Test
-    public void testBumpNumHops() {
-        Forward msg = makeValidMessage();
-
-        for (int x = 0; x < 3; ++x) {
-            assertEquals("x=" + x, x, msg.getNumHops());
-            msg.bumpNumHops();
-        }
-    }
-
-    @Test
-    public void testGetNumHops_testSetNumHops() {
-        Forward msg = makeValidMessage();
-
-        // from constructor
-        assertEquals(VALID_HOPS, msg.getNumHops());
-
-        msg.setNumHops(5);
-        assertEquals(5, msg.getNumHops());
-
-        msg.setNumHops(7);
-        assertEquals(7, msg.getNumHops());
-    }
-
-    @Test
-    public void testGetCreateTimeMs_testSetCreateTimeMs() {
-        Forward msg = makeValidMessage();
-
-        // from constructor
-        assertTrue(msg.getCreateTimeMs() >= tcreateMs);
-
-        msg.setCreateTimeMs(1000L);
-        assertEquals(1000L, msg.getCreateTimeMs());
-
-        msg.setCreateTimeMs(2000L);
-        assertEquals(2000L, msg.getCreateTimeMs());
-    }
-
-    @Test
-    public void testGetProtocol_testSetProtocol() {
-        Forward msg = makeValidMessage();
-
-        // from constructor
-        assertEquals(CommInfrastructure.UEB, msg.getProtocol());
-
-        msg.setProtocol(CommInfrastructure.DMAAP);
-        assertEquals(CommInfrastructure.DMAAP, msg.getProtocol());
-
-        msg.setProtocol(CommInfrastructure.UEB);
-        assertEquals(CommInfrastructure.UEB, msg.getProtocol());
-    }
-
-    @Test
-    public void testGetTopic_testSetTopic() {
-        Forward msg = makeValidMessage();
-
-        // from constructor
-        assertEquals(VALID_TOPIC, msg.getTopic());
-
-        msg.setTopic("topicX");
-        assertEquals("topicX", msg.getTopic());
-
-        msg.setTopic("topicY");
-        assertEquals("topicY", msg.getTopic());
-    }
-
-    @Test
-    public void testGetPayload_testSetPayload() {
-        Forward msg = makeValidMessage();
-
-        // from constructor
-        assertEquals(VALID_PAYLOAD, msg.getPayload());
-
-        msg.setPayload("payloadX");
-        assertEquals("payloadX", msg.getPayload());
-
-        msg.setPayload("payloadY");
-        assertEquals("payloadY", msg.getPayload());
-    }
-
-    @Test
-    public void testGetRequestId_testSetRequestId() {
-        Forward msg = makeValidMessage();
-
-        // from constructor
-        assertEquals(VALID_REQUEST_ID, msg.getRequestId());
-
-        msg.setRequestId("reqX");
-        assertEquals("reqX", msg.getRequestId());
-
-        msg.setRequestId("reqY");
-        assertEquals("reqY", msg.getRequestId());
-    }
-
-    @Test
-    public void testIsExpired() {
-        Forward msg = makeValidMessage();
-
-        long tcreate = msg.getCreateTimeMs();
-        assertTrue(msg.isExpired(tcreate + 1));
-        assertTrue(msg.isExpired(tcreate + 10));
-
-        assertFalse(msg.isExpired(tcreate));
-        assertFalse(msg.isExpired(tcreate - 1));
-        assertFalse(msg.isExpired(tcreate - 10));
-    }
-
-    @Test
-    public void testCheckValidity_InvalidFields() throws Exception {
-        // null source (i.e., superclass field)
-        expectCheckValidityFailure(msg -> msg.setSource(null));
-        
-        // null protocol
-        expectCheckValidityFailure(msg -> msg.setProtocol(null));
-        
-        // null or empty topic
-        expectCheckValidityFailure_NullOrEmpty((msg, value) -> msg.setTopic(value));
-        
-        // null payload
-        expectCheckValidityFailure(msg -> msg.setPayload(null));
-        
-        // empty payload should NOT throw an exception
-        Forward forward = makeValidMessage();
-        forward.setPayload("");
-        forward.checkValidity();
-        
-        // null or empty requestId
-        expectCheckValidityFailure_NullOrEmpty((msg, value) -> msg.setRequestId(value));
-        
-        // invalid hop count
-        expectCheckValidityFailure(msg -> msg.setNumHops(-1));
-    }
-
-    @Override
-    public Forward makeValidMessage() {
-        tcreateMs = System.currentTimeMillis();
-
-        Forward msg = new Forward(VALID_HOST, VALID_PROTOCOL, VALID_TOPIC, VALID_PAYLOAD, VALID_REQUEST_ID);
-        msg.setChannel(VALID_CHANNEL);
-
-        return msg;
-    }
-
-    @Override
-    public void testDefaultConstructorFields(Forward msg) {
-        super.testDefaultConstructorFields(msg);
-        
-        assertEquals(VALID_HOPS, msg.getNumHops());
-        assertEquals(0, msg.getCreateTimeMs());
-        assertNull(msg.getPayload());
-        assertNull(msg.getProtocol());
-        assertNull(msg.getRequestId());
-        assertNull(msg.getTopic());
-    }
-
-    @Override
-    public void testValidFields(Forward msg) {
-        super.testValidFields(msg);
-        
-        assertEquals(VALID_HOPS, msg.getNumHops());
-        assertTrue(msg.getCreateTimeMs() >= tcreateMs);
-        assertEquals(VALID_PAYLOAD, msg.getPayload());
-        assertEquals(VALID_PROTOCOL, msg.getProtocol());
-        assertEquals(VALID_REQUEST_ID, msg.getRequestId());
-        assertEquals(VALID_TOPIC, msg.getTopic());
-    }
-}
 
 import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
-import java.util.Map;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
 import org.junit.Before;
 import org.onap.policy.drools.pooling.message.BucketAssignments;
 import org.onap.policy.drools.pooling.message.Heartbeat;
 import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
 import org.onap.policy.drools.pooling.message.Offline;
 import org.onap.policy.drools.pooling.message.Query;
 
         assertEquals(MY_HOST, msg.getRight().getSource());
     }
 
-    @Test
-    public void testGetFilter() {
-        Map<String, Object> filter = state.getFilter();
-
-        FilterUtilsTest utils = new FilterUtilsTest();
-
-        utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
-        utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
-        utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
-    }
-
     @Test
     public void testProcessHeartbeat_NullHost() {
         assertNull(state.process(new Heartbeat()));
 
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * ONAP
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.pooling.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_AND;
-import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_EQUALS;
-import static org.onap.policy.drools.pooling.state.FilterUtils.CLASS_OR;
-import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_CLASS;
-import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_FIELD;
-import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_FILTERS;
-import static org.onap.policy.drools.pooling.state.FilterUtils.JSON_VALUE;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeAnd;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeEquals;
-import static org.onap.policy.drools.pooling.state.FilterUtils.makeOr;
-
-import java.util.Map;
-import org.junit.Test;
-
-public class FilterUtilsTest {
-
-    @Test
-    public void testMakeEquals() {
-        checkEquals("abc", "def", makeEquals("abc", "def"));
-    }
-
-    @Test
-    public void testMakeAnd() {
-        @SuppressWarnings("unchecked")
-        Map<String, Object> filter =
-                        makeAnd(makeEquals("an1", "av1"), makeEquals("an2", "av2"), makeEquals("an3", "av3"));
-
-        checkArray(CLASS_AND, 3, filter);
-        checkEquals("an1", "av1", getItem(filter, 0));
-        checkEquals("an2", "av2", getItem(filter, 1));
-        checkEquals("an3", "av3", getItem(filter, 2));
-    }
-
-    @Test
-    public void testMakeOr() {
-        @SuppressWarnings("unchecked")
-        Map<String, Object> filter =
-                        makeOr(makeEquals("on1", "ov1"), makeEquals("on2", "ov2"), makeEquals("on3", "ov3"));
-
-        checkArray(CLASS_OR, 3, filter);
-        checkEquals("on1", "ov1", getItem(filter, 0));
-        checkEquals("on2", "ov2", getItem(filter, 1));
-        checkEquals("on3", "ov3", getItem(filter, 2));
-    }
-
-    /**
-     * Checks that the filter contains an array.
-     * 
-     * @param expectedClassName type of filter this should represent
-     * @param expectedCount number of items expected in the array
-     * @param filter filter to be examined
-     */
-    protected void checkArray(String expectedClassName, int expectedCount, Map<String, Object> filter) {
-        assertEquals(expectedClassName, filter.get(JSON_CLASS));
-
-        Object[] val = (Object[]) filter.get(JSON_FILTERS);
-        assertEquals(expectedCount, val.length);
-    }
-
-    /**
-     * Checks that a map represents an "equals".
-     * 
-     * @param name name of the field on the left side of the equals
-     * @param value value on the right side of the equals
-     * @param map map whose content is to be examined
-     */
-    protected void checkEquals(String name, String value, Map<String, Object> map) {
-        assertEquals(CLASS_EQUALS, map.get(JSON_CLASS));
-        assertEquals(name, map.get(JSON_FIELD));
-        assertEquals(value, map.get(JSON_VALUE));
-    }
-
-    /**
-     * Gets a particular sub-filter from the array contained within a filter.
-     * 
-     * @param filter containing filter
-     * @param index index of the sub-filter of interest
-     * @return the sub-filter with the given index
-     */
-    @SuppressWarnings("unchecked")
-    protected Map<String, Object> getItem(Map<String, Object> filter, int index) {
-        Object[] val = (Object[]) filter.get(JSON_FILTERS);
-
-        return (Map<String, Object>) val[index];
-    }
-
-}
 
  * ============LICENSE_START=======================================================
  * ONAP
  * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 package org.onap.policy.drools.pooling.state;
 
 import static org.junit.Assert.assertNull;
-import static org.mockito.Matchers.any;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.Map;
 import org.junit.Before;
 import org.junit.Test;
 import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
 import org.onap.policy.drools.pooling.message.Heartbeat;
 import org.onap.policy.drools.pooling.message.Identification;
 import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
 import org.onap.policy.drools.pooling.message.Offline;
 import org.onap.policy.drools.pooling.message.Query;
 
         state = new IdleState(mgr);
     }
 
-    @Test
-    public void testGetFilter() {
-        Map<String, Object> filter = state.getFilter();
-
-        FilterUtilsTest utils = new FilterUtilsTest();
-
-        utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
-        utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
-        utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
-    }
-
-    @Test
-    public void testProcessForward() {
-        Forward msg = new Forward();
-        msg.setChannel(MY_HOST);
-        assertNull(state.process(msg));
-
-        verify(mgr).handle(msg);
-    }
-
     @Test
     public void testProcessHeartbeat() {
         assertNull(state.process(new Heartbeat(PREV_HOST, 0L)));
 
  * ============LICENSE_START=======================================================
  * ONAP
  * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
  * Modifications Copyright (C) 2020 Nordix Foundation
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.Map;
 import org.apache.commons.lang3.tuple.Pair;
 import org.junit.Before;
 import org.junit.Test;
 import org.onap.policy.drools.pooling.message.BucketAssignments;
 import org.onap.policy.drools.pooling.message.Identification;
 import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
 import org.onap.policy.drools.pooling.message.Query;
 
 public class InactiveStateTest extends SupportBasicStateTester {
         state = new InactiveState(mgr);
     }
 
-    @Test
-    public void testGetFilter() {
-        Map<String, Object> filter = state.getFilter();
-
-        FilterUtilsTest utils = new FilterUtilsTest();
-
-        utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
-        utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
-        utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
-    }
-
     @Test
     public void testProcessLeader() {
         State next = mock(State.class);
 
 import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
-import java.util.Map;
 import org.junit.Before;
 import org.junit.Test;
 import org.onap.policy.drools.pooling.message.BucketAssignments;
         hostBucket = new HostBucket(MY_HOST);
     }
 
-    @Test
-    public void testGetFilter() {
-        Map<String, Object> filter = state.getFilter();
-
-        FilterUtilsTest utils = new FilterUtilsTest();
-
-        utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
-        utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
-        utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
-    }
-
     @Test
     public void testProcessQuery() {
         State next = mock(State.class);
 
  * ============LICENSE_START=======================================================
  * ONAP
  * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018, 2020 AT&T Intellectual Property. All rights reserved.
  * Modifications Copyright (C) 2020 Nordix Foundation
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.Map;
 import org.apache.commons.lang3.tuple.Pair;
 import org.junit.Before;
 import org.junit.Test;
 import org.onap.policy.drools.pooling.message.BucketAssignments;
 import org.onap.policy.drools.pooling.message.Identification;
 import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
 import org.onap.policy.drools.pooling.message.Offline;
 
 public class QueryStateTest extends SupportBasicStateTester {
         state = new QueryState(mgr);
     }
 
-    @Test
-    public void testGetFilter() {
-        Map<String, Object> filter = state.getFilter();
-
-        FilterUtilsTest utils = new FilterUtilsTest();
-
-        utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
-        utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
-        utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
-    }
-
     @Test
     public void testGoQuery() {
         assertNull(state.goQuery());
 
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.Map;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
 import org.junit.Before;
 import org.junit.Test;
-import org.onap.policy.drools.pooling.message.Forward;
 import org.onap.policy.drools.pooling.message.Heartbeat;
 import org.onap.policy.drools.pooling.message.Identification;
 import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
 import org.onap.policy.drools.pooling.message.Offline;
 import org.onap.policy.drools.pooling.message.Query;
 
         state = new StartState(mgr);
     }
 
-    @Test
-    public void testGetFilter() {
-        Map<String, Object> filter = state.getFilter();
-
-        FilterUtilsTest utils = new FilterUtilsTest();
-
-        utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
-        utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
-
-        // get the sub-filter
-        filter = utils.getItem(filter, 1);
-
-        utils.checkArray(FilterUtils.CLASS_AND, 2, filter);
-        utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 0));
-        utils.checkEquals(FilterUtils.MSG_TIMESTAMP, String.valueOf(state.getHbTimestampMs()),
-                        utils.getItem(filter, 1));
-    }
-
     @Test
     public void testStart() {
         state.start();
         assertEquals(MY_HOST, state.getHost());
     }
 
-    @Test
-    public void testProcessForward() {
-        Forward msg = new Forward();
-        msg.setChannel(MY_HOST);
-        assertNull(state.process(msg));
-
-        verify(mgr).handle(msg);
-    }
-
     @Test
     public void testProcessHeartbeat() {
         Heartbeat msg = new Heartbeat();
 
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.Map;
 import org.junit.Before;
 import org.junit.Test;
 import org.onap.policy.drools.pooling.CancellableScheduledTask;
 import org.onap.policy.drools.pooling.PoolingManager;
 import org.onap.policy.drools.pooling.message.BucketAssignments;
-import org.onap.policy.drools.pooling.message.Forward;
 import org.onap.policy.drools.pooling.message.Heartbeat;
 import org.onap.policy.drools.pooling.message.Identification;
 import org.onap.policy.drools.pooling.message.Leader;
-import org.onap.policy.drools.pooling.message.Message;
 import org.onap.policy.drools.pooling.message.Offline;
 import org.onap.policy.drools.pooling.message.Query;
 
         verify(sched3).cancel();
     }
 
-    @Test
-    public void testGetFilter() {
-        Map<String, Object> filter = state.getFilter();
-
-        FilterUtilsTest utils = new FilterUtilsTest();
-
-        utils.checkArray(FilterUtils.CLASS_OR, 2, filter);
-        utils.checkEquals(FilterUtils.MSG_CHANNEL, Message.ADMIN, utils.getItem(filter, 0));
-        utils.checkEquals(FilterUtils.MSG_CHANNEL, MY_HOST, utils.getItem(filter, 1));
-    }
-
     @Test
     public void testStart() {
         assertThatCode(() -> state.start()).doesNotThrowAnyException();
         assertEquals(next, next2);
     }
 
-    @Test
-    public void testProcessForward() {
-        Forward msg = new Forward();
-        assertNull(state.process(msg));
-
-        verify(mgr, never()).handle(msg);
-
-        msg.setChannel(MY_HOST);
-        assertNull(state.process(msg));
-
-        verify(mgr).handle(msg);
-    }
-
     @Test
     public void testProcessHeartbeat() {
         assertNull(state.process(new Heartbeat()));
         verify(mgr).publishAdmin(msg);
     }
 
-    @Test
-    public void testPublishStringForward() {
-        String chnl = "channelF";
-        Forward msg = new Forward();
-
-        state.publish(chnl, msg);
-
-        verify(mgr).publish(chnl, msg);
-    }
-
     @Test
     public void testPublishStringHeartbeat() {
         String chnl = "channelH";
 
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicReference;
         when(props.getActiveHeartbeatMs()).thenReturn(STD_ACTIVE_HEARTBEAT_MS);
         when(props.getInterHeartbeatMs()).thenReturn(STD_INTER_HEARTBEAT_MS);
 
-        prevState = new State(mgr) {
-            @Override
-            public Map<String, Object> getFilter() {
-                throw new UnsupportedOperationException("cannot filter");
-            }
-        };
+        prevState = new State(mgr) {};
 
         // capture publish() arguments
         doAnswer(invocation -> {