2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
  23 import java.util.HashMap;
 
  24 import java.util.List;
 
  26 import java.util.concurrent.CompletableFuture;
 
  27 import java.util.concurrent.Future;
 
  28 import java.util.concurrent.TimeUnit;
 
  29 import java.util.function.Function;
 
  30 import javax.ws.rs.client.InvocationCallback;
 
  31 import javax.ws.rs.core.Response;
 
  33 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 
  34 import org.onap.policy.common.endpoints.http.client.HttpClient;
 
  35 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
 
  36 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
 
  37 import org.onap.policy.common.utils.coder.CoderException;
 
  38 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 
  39 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
 
  40 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 
  41 import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpConfig;
 
  42 import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams;
 
  43 import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpPollingConfig;
 
  44 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
 
  45 import org.slf4j.Logger;
 
  46 import org.slf4j.LoggerFactory;
 
  49  * Operator that uses HTTP. The operator's parameters must be an {@link HttpParams}.
 
  51  * @param <T> response type
 
  54 public abstract class HttpOperation<T> extends OperationPartial {
 
  55     private static final Logger logger = LoggerFactory.getLogger(HttpOperation.class);
 
  61         SUCCESS, FAILURE, STILL_WAITING
 
  65      * Configuration for this operation.
 
  67     private final HttpConfig config;
 
  72     private final Class<T> responseClass;
 
  75      * {@code True} to use polling, {@code false} otherwise.
 
  78     private boolean usePolling;
 
  81      * Number of polls issued so far, on the current operation attempt.
 
  84     private int pollCount;
 
  88      * Constructs the object.
 
  90      * @param params operation parameters
 
  91      * @param config configuration for this operation
 
  92      * @param clazz response class
 
  93      * @param propertyNames names of properties required by this operation
 
  95     protected HttpOperation(ControlLoopOperationParams params, HttpConfig config, Class<T> clazz,
 
  96                     List<String> propertyNames) {
 
  97         super(params, config, propertyNames);
 
  99         this.responseClass = clazz;
 
 103      * Indicates that polling should be used.
 
 105     protected void setUsePolling() {
 
 106         if (!(config instanceof HttpPollingConfig)) {
 
 107             throw new IllegalStateException("cannot poll without polling parameters");
 
 113     public HttpClient getClient() {
 
 114         return config.getClient();
 
 118      * Gets the path to be used when performing the request; this is typically appended to
 
 119      * the base URL. This method simply invokes {@link #getPath()}.
 
 121      * @return the path URI suffix
 
 123     public String getPath() {
 
 124         return config.getPath();
 
 127     public long getTimeoutMs() {
 
 128         return config.getTimeoutMs();
 
 132      * If no timeout is specified, then it returns the operator's configured timeout.
 
 135     protected long getTimeoutMs(Integer timeoutSec) {
 
 136         return (timeoutSec == null || timeoutSec == 0 ? getTimeoutMs() : super.getTimeoutMs(timeoutSec));
 
 140      * Makes the request headers. This simply returns an empty map.
 
 142      * @return request headers, a non-null, modifiable map
 
 144     protected Map<String, Object> makeHeaders() {
 
 145         return new HashMap<>();
 
 149      * Makes the URL to which the HTTP request should be posted. This is primarily used
 
 150      * for logging purposes. This particular method returns the base URL appended with the
 
 151      * return value from {@link #getPath()}.
 
 153      * @return the URL to which from which to get
 
 155     public String getUrl() {
 
 156         return (getClient().getBaseUrl() + getPath());
 
 160      * Resets the polling count
 
 163      * Note: This should be invoked at the start of each operation (i.e., in
 
 164      * {@link #startOperationAsync(int, OperationOutcome)}.
 
 166     protected void resetPollCount() {
 
 171      * Arranges to handle a response.
 
 173      * @param outcome outcome to be populate
 
 174      * @param url URL to which to request was sent
 
 175      * @param requester function to initiate the request and invoke the given callback
 
 177      * @return a future for the response
 
 179     protected CompletableFuture<OperationOutcome> handleResponse(OperationOutcome outcome, String url,
 
 180                     Function<InvocationCallback<Response>, Future<Response>> requester) {
 
 182         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 183         final CompletableFuture<Response> future = new CompletableFuture<>();
 
 184         final var executor = params.getExecutor();
 
 186         // arrange for the callback to complete "future"
 
 187         InvocationCallback<Response> callback = new InvocationCallback<>() {
 
 189             public void completed(Response response) {
 
 190                 future.complete(response);
 
 194             public void failed(Throwable throwable) {
 
 195                 logger.warn("{}.{}: response failure for {}", params.getActor(), params.getOperation(),
 
 196                                 params.getRequestId());
 
 197                 future.completeExceptionally(throwable);
 
 201         // start the request and arrange to cancel it if the controller is canceled
 
 202         controller.add(requester.apply(callback));
 
 204         // once "future" completes, process the response, and then complete the controller
 
 205         future.thenComposeAsync(response -> processResponse(outcome, url, response), executor)
 
 206                         .whenCompleteAsync(controller.delayedComplete(), executor);
 
 212      * Processes a response. This method decodes the response, sets the outcome based on
 
 213      * the response, and then returns a completed future.
 
 215      * @param outcome outcome to be populate
 
 216      * @param url URL to which to request was sent
 
 217      * @param response raw response to process
 
 218      * @return a future to cancel or await the outcome
 
 220     protected CompletableFuture<OperationOutcome> processResponse(OperationOutcome outcome, String url,
 
 221                     Response rawResponse) {
 
 223         logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
 
 225         String strResponse = rawResponse.readEntity(String.class);
 
 227         logMessage(EventType.IN, CommInfrastructure.REST, url, strResponse);
 
 230         if (responseClass == String.class) {
 
 231             response = responseClass.cast(strResponse);
 
 234                 response = getCoder().decode(strResponse, responseClass);
 
 235             } catch (CoderException e) {
 
 236                 logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
 
 237                                 params.getRequestId(), e);
 
 238                 throw new IllegalArgumentException("cannot decode response");
 
 242         if (!isSuccess(rawResponse, response)) {
 
 243             logger.info("{}.{} request failed with http error code {} for {}", params.getActor(), params.getOperation(),
 
 244                             rawResponse.getStatus(), params.getRequestId());
 
 245             return CompletableFuture.completedFuture(
 
 246                     setOutcome(outcome, OperationResult.FAILURE, rawResponse, response));
 
 249         logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId());
 
 250         setOutcome(outcome, OperationResult.SUCCESS, rawResponse, response);
 
 251         return postProcessResponse(outcome, url, rawResponse, response);
 
 255      * Sets an operation's outcome and default message based on the result.
 
 257      * @param outcome operation to be updated
 
 258      * @param result result of the operation
 
 259      * @param rawResponse raw response
 
 260      * @param response decoded response
 
 261      * @return the updated operation
 
 263     public OperationOutcome setOutcome(OperationOutcome outcome, OperationResult result, Response rawResponse,
 
 266         outcome.setResponse(response);
 
 267         return setOutcome(outcome, result);
 
 271      * Processes a successful response. This method simply returns the outcome wrapped in
 
 272      * a completed future.
 
 274      * @param outcome outcome to be populate
 
 275      * @param url URL to which to request was sent
 
 276      * @param rawResponse raw response
 
 277      * @param response decoded response
 
 278      * @return a future to cancel or await the outcome
 
 280     protected CompletableFuture<OperationOutcome> postProcessResponse(OperationOutcome outcome, String url,
 
 281                     Response rawResponse, T response) {
 
 284             // doesn't use polling - just return the completed future
 
 285             return CompletableFuture.completedFuture(outcome);
 
 288         HttpPollingConfig cfg = (HttpPollingConfig) config;
 
 290         switch (detmStatus(rawResponse, response)) {
 
 292                 logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(),
 
 293                                 params.getRequestId());
 
 294                 return CompletableFuture
 
 295                                 .completedFuture(setOutcome(outcome, OperationResult.SUCCESS, rawResponse, response));
 
 298                 logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
 
 299                                 params.getRequestId());
 
 300                 return CompletableFuture
 
 301                                 .completedFuture(setOutcome(outcome, OperationResult.FAILURE, rawResponse, response));
 
 305                 logger.info("{}.{} request incomplete for {}", params.getActor(), params.getOperation(),
 
 306                                 params.getRequestId());
 
 312         // see if the limit for the number of polls has been reached
 
 313         if (pollCount++ >= cfg.getMaxPolls()) {
 
 314             logger.warn("{}: execeeded 'poll' limit {} for {}", getFullName(), cfg.getMaxPolls(),
 
 315                             params.getRequestId());
 
 316             setOutcome(outcome, OperationResult.FAILURE_TIMEOUT);
 
 317             return CompletableFuture.completedFuture(outcome);
 
 320         // sleep and then poll
 
 321         Function<Void, CompletableFuture<OperationOutcome>> doPoll = unused -> issuePoll(outcome);
 
 322         return sleep(getPollWaitMs(), TimeUnit.MILLISECONDS).thenComposeAsync(doPoll);
 
 326      * Polls to see if the original request is complete. This method polls using an HTTP
 
 327      * "get" request whose URL is constructed by appending the extracted "poll ID" to the
 
 328      * poll path from the configuration data.
 
 330      * @param outcome outcome to be populated with the response
 
 331      * @return a future that can be used to cancel the poll or await its response
 
 333     protected CompletableFuture<OperationOutcome> issuePoll(OperationOutcome outcome) {
 
 334         String path = getPollingPath();
 
 335         String url = getClient().getBaseUrl() + path;
 
 337         logger.debug("{}: 'poll' count {} for {}", getFullName(), pollCount, params.getRequestId());
 
 339         logMessage(EventType.OUT, CommInfrastructure.REST, url, null);
 
 341         return handleResponse(outcome, url, callback -> getClient().get(callback, path, null));
 
 345      * Determines the status of the response. This particular method simply throws an
 
 348      * @param rawResponse raw response
 
 349      * @param response decoded response
 
 350      * @return the status of the response
 
 352     protected Status detmStatus(Response rawResponse, T response) {
 
 353         throw new UnsupportedOperationException("cannot determine response status");
 
 357      * Gets the URL to use when polling. Typically, this is some unique ID appended to the
 
 358      * polling path found within the configuration data. This particular method simply
 
 359      * returns the polling path from the configuration data.
 
 361      * @return the URL to use when polling
 
 363     protected String getPollingPath() {
 
 364         return ((HttpPollingConfig) config).getPollPath();
 
 368      * Determines if the response indicates success. This method simply checks the HTTP
 
 371      * @param rawResponse raw response
 
 372      * @param response decoded response
 
 373      * @return {@code true} if the response indicates success, {@code false} otherwise
 
 375     protected boolean isSuccess(Response rawResponse, T response) {
 
 376         return (rawResponse.getStatus() == 200);
 
 380     public <Q> String logMessage(EventType direction, CommInfrastructure infra, String sink, Q request) {
 
 381         String json = super.logMessage(direction, infra, sink, request);
 
 382         NetLoggerUtil.log(direction, infra, sink, json);
 
 386     // these may be overridden by junit tests
 
 388     protected long getPollWaitMs() {
 
 389         HttpPollingConfig cfg = (HttpPollingConfig) config;
 
 391         return TimeUnit.MILLISECONDS.convert(cfg.getPollWaitSec(), TimeUnit.SECONDS);