Use BidirectionalTopicClient from policy-common
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / BidirectionalTopicOperator.java
@@ -28,24 +28,24 @@ import lombok.Getter;
 import org.onap.policy.common.parameters.ValidationResult;
 import org.onap.policy.controlloop.actorserviceprovider.Operation;
 import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
+import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicManager;
 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
 import org.onap.policy.controlloop.actorserviceprovider.topic.SelectorKey;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
-import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPairManager;
 
 /**
- * Operator that uses a pair of topics, one for publishing the request, and another for
- * receiving the response. Topic operators may share a {@link TopicPair}.
+ * Operator that uses a bidirectional topic. Topic operators may share a
+ * {@link BidirectionalTopicHandler}.
  */
-public abstract class TopicPairOperator extends OperatorPartial {
+public abstract class BidirectionalTopicOperator extends OperatorPartial {
 
     /**
-     * Manager from which to get the topic pair.
+     * Manager from which to get the topic handlers.
      */
-    private final TopicPairManager pairManager;
+    private final BidirectionalTopicManager topicManager;
 
     /**
      * Keys used to extract the fields used to select responses for this operator.
@@ -62,13 +62,13 @@ public abstract class TopicPairOperator extends OperatorPartial {
      * will not, thus operations may copy it.
      */
     @Getter
-    private TopicPairParams params;
+    private BidirectionalTopicParams params;
 
     /**
-     * Topic pair associated with the parameters.
+     * Topic handler associated with the parameters.
      */
     @Getter
-    private TopicPair topicPair;
+    private BidirectionalTopicHandler topicHandler;
 
     /**
      * Forwarder associated with the parameters.
@@ -82,27 +82,27 @@ public abstract class TopicPairOperator extends OperatorPartial {
      *
      * @param actorName name of the actor with which this operator is associated
      * @param name operation name
-     * @param pairManager manager from which to get the topic pair
+     * @param topicManager manager from which to get the topic handler
      * @param selectorKeys keys used to extract the fields used to select responses for
      *        this operator
      */
-    public TopicPairOperator(String actorName, String name, TopicPairManager pairManager,
+    public BidirectionalTopicOperator(String actorName, String name, BidirectionalTopicManager topicManager,
                     List<SelectorKey> selectorKeys) {
         super(actorName, name);
-        this.pairManager = pairManager;
+        this.topicManager = topicManager;
         this.selectorKeys = selectorKeys;
     }
 
     @Override
     protected void doConfigure(Map<String, Object> parameters) {
-        params = Util.translate(getFullName(), parameters, TopicPairParams.class);
+        params = Util.translate(getFullName(), parameters, BidirectionalTopicParams.class);
         ValidationResult result = params.validate(getFullName());
         if (!result.isValid()) {
             throw new ParameterValidationRuntimeException("invalid parameters", result);
         }
 
-        topicPair = pairManager.getTopicPair(params.getSource(), params.getTarget());
-        forwarder = topicPair.addForwarder(selectorKeys);
+        topicHandler = topicManager.getTopicHandler(params.getSinkTopic(), params.getSourceTopic());
+        forwarder = topicHandler.addForwarder(selectorKeys);
     }
 
     /**
@@ -112,19 +112,21 @@ public abstract class TopicPairOperator extends OperatorPartial {
      * @param <S> response type
      * @param actorName actor name
      * @param operation operation name
-     * @param pairManager manager from which to get the topic pair
+     * @param topicManager manager from which to get the topic handler
      * @param operationMaker function to make an operation
      * @param keys keys used to extract the fields used to select responses for this
      *        operator
      * @return a new operator
      */
     // @formatter:off
-    public static <Q,S> TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager,
-                    BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<Q,S>> operationMaker,
+    public static <Q, S> BidirectionalTopicOperator makeOperator(String actorName, String operation,
+                    BidirectionalTopicManager topicManager,
+                    BiFunction<ControlLoopOperationParams, BidirectionalTopicOperator,
+                        BidirectionalTopicOperation<Q, S>> operationMaker,
                     SelectorKey... keys) {
         // @formatter:off
 
-        return makeOperator(actorName, operation, pairManager, Arrays.asList(keys), operationMaker);
+        return makeOperator(actorName, operation, topicManager, Arrays.asList(keys), operationMaker);
     }
 
     /**
@@ -134,19 +136,21 @@ public abstract class TopicPairOperator extends OperatorPartial {
      * @param <S> response type
      * @param actorName actor name
      * @param operation operation name
-     * @param pairManager manager from which to get the topic pair
+     * @param topicManager manager from which to get the topic handler
      * @param keys keys used to extract the fields used to select responses for
      *        this operator
      * @param operationMaker function to make an operation
      * @return a new operator
      */
     // @formatter:off
-    public static <Q,S> TopicPairOperator makeOperator(String actorName, String operation, TopicPairManager pairManager,
+    public static <Q,S> BidirectionalTopicOperator makeOperator(String actorName, String operation,
+                    BidirectionalTopicManager topicManager,
                     List<SelectorKey> keys,
-                    BiFunction<ControlLoopOperationParams, TopicPairOperator, TopicPairOperation<Q,S>> operationMaker) {
+                    BiFunction<ControlLoopOperationParams, BidirectionalTopicOperator,
+                        BidirectionalTopicOperation<Q,S>> operationMaker) {
         // @formatter:on
 
-        return new TopicPairOperator(actorName, operation, pairManager, keys) {
+        return new BidirectionalTopicOperator(actorName, operation, topicManager, keys) {
             @Override
             public synchronized Operation buildOperation(ControlLoopOperationParams params) {
                 return operationMaker.apply(params, this);