2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
  23 import java.util.Arrays;
 
  24 import java.util.List;
 
  26 import java.util.function.BiFunction;
 
  28 import org.onap.policy.common.parameters.ValidationResult;
 
  29 import org.onap.policy.controlloop.actorserviceprovider.Operation;
 
  30 import org.onap.policy.controlloop.actorserviceprovider.Util;
 
  31 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
 
  32 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 
  33 import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
 
  34 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
 
  35 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicManager;
 
  36 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
 
  37 import org.onap.policy.controlloop.actorserviceprovider.topic.SelectorKey;
 
  40  * Operator that uses a bidirectional topic. Topic operators may share a
 
  41  * {@link BidirectionalTopicHandler}.
 
  43 public abstract class BidirectionalTopicOperator extends OperatorPartial {
 
  46      * Manager from which to get the topic handlers.
 
  48     private final BidirectionalTopicManager topicManager;
 
  51      * Keys used to extract the fields used to select responses for this operator.
 
  53     private final List<SelectorKey> selectorKeys;
 
  56      * The remaining fields are initialized when configure() is invoked, thus they may
 
  61      * Current parameters. While {@link params} may change, the values contained within it
 
  62      * will not, thus operations may copy it.
 
  65     private BidirectionalTopicParams params;
 
  68      * Topic handler associated with the parameters.
 
  71     private BidirectionalTopicHandler topicHandler;
 
  74      * Forwarder associated with the parameters.
 
  77     private Forwarder forwarder;
 
  81      * Constructs the object.
 
  83      * @param actorName name of the actor with which this operator is associated
 
  84      * @param name operation name
 
  85      * @param topicManager manager from which to get the topic handler
 
  86      * @param selectorKeys keys used to extract the fields used to select responses for
 
  89     public BidirectionalTopicOperator(String actorName, String name, BidirectionalTopicManager topicManager,
 
  90                     List<SelectorKey> selectorKeys) {
 
  91         super(actorName, name);
 
  92         this.topicManager = topicManager;
 
  93         this.selectorKeys = selectorKeys;
 
  97     protected void doConfigure(Map<String, Object> parameters) {
 
  98         params = Util.translate(getFullName(), parameters, BidirectionalTopicParams.class);
 
  99         ValidationResult result = params.validate(getFullName());
 
 100         if (!result.isValid()) {
 
 101             throw new ParameterValidationRuntimeException("invalid parameters", result);
 
 104         topicHandler = topicManager.getTopicHandler(params.getSinkTopic(), params.getSourceTopic());
 
 105         forwarder = topicHandler.addForwarder(selectorKeys);
 
 109      * Makes an operator that will construct operations.
 
 111      * @param <Q> request type
 
 112      * @param <S> response type
 
 113      * @param actorName actor name
 
 114      * @param operation operation name
 
 115      * @param topicManager manager from which to get the topic handler
 
 116      * @param operationMaker function to make an operation
 
 117      * @param keys keys used to extract the fields used to select responses for this
 
 119      * @return a new operator
 
 122     public static <Q, S> BidirectionalTopicOperator makeOperator(String actorName, String operation,
 
 123                     BidirectionalTopicManager topicManager,
 
 124                     BiFunction<ControlLoopOperationParams, BidirectionalTopicOperator,
 
 125                         BidirectionalTopicOperation<Q, S>> operationMaker,
 
 126                     SelectorKey... keys) {
 
 129         return makeOperator(actorName, operation, topicManager, Arrays.asList(keys), operationMaker);
 
 133      * Makes an operator that will construct operations.
 
 135      * @param <Q> request type
 
 136      * @param <S> response type
 
 137      * @param actorName actor name
 
 138      * @param operation operation name
 
 139      * @param topicManager manager from which to get the topic handler
 
 140      * @param keys keys used to extract the fields used to select responses for
 
 142      * @param operationMaker function to make an operation
 
 143      * @return a new operator
 
 146     public static <Q,S> BidirectionalTopicOperator makeOperator(String actorName, String operation,
 
 147                     BidirectionalTopicManager topicManager,
 
 148                     List<SelectorKey> keys,
 
 149                     BiFunction<ControlLoopOperationParams, BidirectionalTopicOperator,
 
 150                         BidirectionalTopicOperation<Q,S>> operationMaker) {
 
 153         return new BidirectionalTopicOperator(actorName, operation, topicManager, keys) {
 
 155             public synchronized Operation buildOperation(ControlLoopOperationParams params) {
 
 156                 return operationMaker.apply(params, this);