Use BidirectionalTopicClient from policy-common
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / topic / Forwarder.java
index 8e9109c..2d98b66 100644 (file)
@@ -24,8 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
+import java.util.function.BiConsumer;
 import org.onap.policy.common.utils.coder.StandardCoderObject;
 import org.onap.policy.controlloop.actorserviceprovider.Util;
 import org.slf4j.Logger;
@@ -43,7 +42,7 @@ public class Forwarder {
      * Maps a set of field values to one or more listeners.
      */
     // @formatter:off
-    private final Map<List<String>, Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String>>
+    private final Map<List<String>, Map<BiConsumer<String, StandardCoderObject>, String>>
                 values2listeners = new ConcurrentHashMap<>();
     // @formatter:on
 
@@ -68,13 +67,13 @@ public class Forwarder {
      * @param values field values of interest, in one-to-one correspondence with the keys
      * @param listener listener to register
      */
-    public void register(List<String> values, TriConsumer<CommInfrastructure, String, StandardCoderObject> listener) {
+    public void register(List<String> values, BiConsumer<String, StandardCoderObject> listener) {
         if (keys.size() != values.size()) {
             throw new IllegalArgumentException("key/value mismatch");
         }
 
         values2listeners.compute(values, (key, listeners) -> {
-            Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String> map = listeners;
+            Map<BiConsumer<String, StandardCoderObject>, String> map = listeners;
             if (map == null) {
                 map = new ConcurrentHashMap<>();
             }
@@ -90,7 +89,7 @@ public class Forwarder {
      * @param values field values of interest, in one-to-one correspondence with the keys
      * @param listener listener to unregister
      */
-    public void unregister(List<String> values, TriConsumer<CommInfrastructure, String, StandardCoderObject> listener) {
+    public void unregister(List<String> values, BiConsumer<String, StandardCoderObject> listener) {
         values2listeners.computeIfPresent(values, (key, listeners) -> {
             listeners.remove(listener);
             return (listeners.isEmpty() ? null : listeners);
@@ -100,11 +99,10 @@ public class Forwarder {
     /**
      * Processes a message, forwarding it to the appropriate listeners, if any.
      *
-     * @param infra communication infrastructure on which the response was received
      * @param textMessage original text message that was received
      * @param scoMessage decoded text message
      */
-    public void onMessage(CommInfrastructure infra, String textMessage, StandardCoderObject scoMessage) {
+    public void onMessage(String textMessage, StandardCoderObject scoMessage) {
         // extract the key values from the message
         List<String> values = new ArrayList<>(keys.size());
         for (SelectorKey key : keys) {
@@ -121,8 +119,7 @@ public class Forwarder {
         }
 
         // get the listeners for this set of values
-        Map<TriConsumer<CommInfrastructure, String, StandardCoderObject>, String> listeners =
-                        values2listeners.get(values);
+        Map<BiConsumer<String, StandardCoderObject>, String> listeners = values2listeners.get(values);
         if (listeners == null) {
             // no listeners for this particular list of values
             return;
@@ -130,9 +127,9 @@ public class Forwarder {
 
 
         // forward the message to each listener
-        for (TriConsumer<CommInfrastructure, String, StandardCoderObject> listener : listeners.keySet()) {
+        for (BiConsumer<String, StandardCoderObject> listener : listeners.keySet()) {
             try {
-                listener.accept(infra, textMessage, scoMessage);
+                listener.accept(textMessage, scoMessage);
             } catch (RuntimeException e) {
                 logger.warn("exception thrown by listener {}", Util.ident(listener), e);
             }