2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
  23 import java.util.List;
 
  24 import java.util.concurrent.CompletableFuture;
 
  25 import java.util.concurrent.Executor;
 
  26 import java.util.concurrent.TimeUnit;
 
  28 import org.apache.commons.lang3.tuple.Triple;
 
  29 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 
  30 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
 
  31 import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
 
  32 import org.onap.policy.common.utils.coder.Coder;
 
  33 import org.onap.policy.common.utils.coder.CoderException;
 
  34 import org.onap.policy.common.utils.coder.StandardCoder;
 
  35 import org.onap.policy.common.utils.coder.StandardCoderObject;
 
  36 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 
  37 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 
  38 import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
 
  39 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
 
  40 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
 
  41 import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
 
  42 import org.onap.policy.controlloop.policy.PolicyResult;
 
  43 import org.slf4j.Logger;
 
  44 import org.slf4j.LoggerFactory;
 
  47  * Operation that uses a Topic pair.
 
  49  * @param <S> response type
 
  52 public abstract class TopicPairOperation<Q, S> extends OperationPartial {
 
  53     private static final Logger logger = LoggerFactory.getLogger(TopicPairOperation.class);
 
  54     private static final Coder coder = new StandardCoder();
 
  56     // fields extracted from the operator
 
  58     private final TopicPair topicPair;
 
  59     private final Forwarder forwarder;
 
  60     private final TopicPairParams pairParams;
 
  61     private final long timeoutMs;
 
  66     private final Class<S> responseClass;
 
  70      * Constructs the object.
 
  72      * @param params operation parameters
 
  73      * @param operator operator that created this operation
 
  74      * @param clazz response class
 
  76     public TopicPairOperation(ControlLoopOperationParams params, TopicPairOperator operator, Class<S> clazz) {
 
  77         super(params, operator);
 
  78         this.topicPair = operator.getTopicPair();
 
  79         this.forwarder = operator.getForwarder();
 
  80         this.pairParams = operator.getParams();
 
  81         this.responseClass = clazz;
 
  82         this.timeoutMs = TimeUnit.MILLISECONDS.convert(pairParams.getTimeoutSec(), TimeUnit.SECONDS);
 
  86      * If no timeout is specified, then it returns the default timeout.
 
  89     protected long getTimeoutMs(Integer timeoutSec) {
 
  90         // TODO move this method to the superclass
 
  91         return (timeoutSec == null || timeoutSec == 0 ? this.timeoutMs : super.getTimeoutMs(timeoutSec));
 
  95      * Publishes the request and arranges to receive the response.
 
  98     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
 
 100         final Q request = makeRequest(attempt);
 
 101         final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request);
 
 103         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 104         final CompletableFuture<Triple<CommInfrastructure, String, StandardCoderObject>> future =
 
 105                         new CompletableFuture<>();
 
 106         final Executor executor = params.getExecutor();
 
 108         // register a listener BEFORE publishing
 
 111         TriConsumer<CommInfrastructure, String, StandardCoderObject> listener =
 
 112             (infra, rawResponse, scoResponse) -> future.complete(Triple.of(infra, rawResponse, scoResponse));
 
 115         // TODO this currently only allows a single matching response
 
 117         forwarder.register(expectedKeyValues, listener);
 
 119         // ensure listener is unregistered if the controller is canceled
 
 120         controller.add(() -> forwarder.unregister(expectedKeyValues, listener));
 
 122         // publish the request
 
 124             publishRequest(request);
 
 125         } catch (RuntimeException e) {
 
 126             logger.warn("{}: failed to publish request for {}", getFullName(), params.getRequestId());
 
 127             forwarder.unregister(expectedKeyValues, listener);
 
 132         // once "future" completes, process the response, and then complete the controller
 
 135         future.thenApplyAsync(
 
 136             triple -> processResponse(triple.getLeft(), outcome, triple.getMiddle(), triple.getRight()),
 
 138                         .whenCompleteAsync(controller.delayedComplete(), executor);
 
 147      * @param attempt operation attempt
 
 148      * @return a new request
 
 150     protected abstract Q makeRequest(int attempt);
 
 153      * Gets values, expected in the response, that should match the selector keys.
 
 155      * @param attempt operation attempt
 
 156      * @param request request to be published
 
 157      * @return a list of the values to be matched by the selector keys
 
 159     protected abstract List<String> getExpectedKeyValues(int attempt, Q request);
 
 162      * Publishes the request. Encodes the request, if it is not already a String.
 
 164      * @param request request to be published
 
 166     protected void publishRequest(Q request) {
 
 169             if (request instanceof String) {
 
 170                 json = request.toString();
 
 172                 json = makeCoder().encode(request);
 
 174         } catch (CoderException e) {
 
 175             throw new IllegalArgumentException("cannot encode request", e);
 
 178         List<CommInfrastructure> list = topicPair.publish(json);
 
 179         if (list.isEmpty()) {
 
 180             throw new IllegalStateException("nothing published");
 
 183         logTopicRequest(list, request);
 
 187      * Processes a response.
 
 189      * @param infra communication infrastructure on which the response was received
 
 190      * @param outcome outcome to be populated
 
 191      * @param response raw response to process
 
 192      * @param scoResponse response, as a {@link StandardCoderObject}
 
 193      * @return the outcome
 
 195     protected OperationOutcome processResponse(CommInfrastructure infra, OperationOutcome outcome, String rawResponse,
 
 196                     StandardCoderObject scoResponse) {
 
 198         logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
 
 200         logTopicResponse(infra, rawResponse);
 
 203         if (responseClass == String.class) {
 
 204             response = responseClass.cast(rawResponse);
 
 206         } else if (responseClass == StandardCoderObject.class) {
 
 207             response = responseClass.cast(scoResponse);
 
 211                 response = makeCoder().decode(rawResponse, responseClass);
 
 212             } catch (CoderException e) {
 
 213                 logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
 
 214                                 params.getRequestId());
 
 215                 throw new IllegalArgumentException("cannot decode response", e);
 
 219         if (!isSuccess(rawResponse, response)) {
 
 220             logger.info("{}.{} request failed  for {}", params.getActor(), params.getOperation(),
 
 221                             params.getRequestId());
 
 222             return setOutcome(outcome, PolicyResult.FAILURE);
 
 225         logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId());
 
 226         setOutcome(outcome, PolicyResult.SUCCESS);
 
 227         postProcessResponse(outcome, rawResponse, response);
 
 233      * Processes a successful response.
 
 235      * @param outcome outcome to be populated
 
 236      * @param rawResponse raw response
 
 237      * @param response decoded response
 
 239     protected void postProcessResponse(OperationOutcome outcome, String rawResponse, S response) {
 
 244      * Determines if the response indicates success.
 
 246      * @param rawResponse raw response
 
 247      * @param response decoded response
 
 248      * @return {@code true} if the response indicates success, {@code false} otherwise
 
 250     protected abstract boolean isSuccess(String rawResponse, S response);
 
 253      * Logs a TOPIC request. If the request is not of type, String, then it attempts to
 
 254      * pretty-print it into JSON before logging.
 
 256      * @param infrastructures list of communication infrastructures on which it was
 
 258      * @param request request to be logged
 
 260     protected void logTopicRequest(List<CommInfrastructure> infrastructures, Q request) {
 
 261         if (infrastructures.isEmpty()) {
 
 267             if (request == null) {
 
 269             } else if (request instanceof String) {
 
 270                 json = request.toString();
 
 272                 json = makeCoder().encode(request, true);
 
 275         } catch (CoderException e) {
 
 276             logger.warn("cannot pretty-print request", e);
 
 277             json = request.toString();
 
 280         for (CommInfrastructure infra : infrastructures) {
 
 281             logger.info("[OUT|{}|{}|]{}{}", infra, pairParams.getTarget(), NetLoggerUtil.SYSTEM_LS, json);
 
 286      * Logs a TOPIC response. If the response is not of type, String, then it attempts to
 
 287      * pretty-print it into JSON before logging.
 
 289      * @param infra communication infrastructure on which the response was received
 
 290      * @param response response to be logged
 
 292     protected <T> void logTopicResponse(CommInfrastructure infra, T response) {
 
 295             if (response == null) {
 
 297             } else if (response instanceof String) {
 
 298                 json = response.toString();
 
 300                 json = makeCoder().encode(response, true);
 
 303         } catch (CoderException e) {
 
 304             logger.warn("cannot pretty-print response", e);
 
 305             json = response.toString();
 
 308         logger.info("[IN|{}|{}|]{}{}", infra, pairParams.getSource(), NetLoggerUtil.SYSTEM_LS, json);
 
 311     // these may be overridden by junit tests
 
 313     protected Coder makeCoder() {