* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
+ * Modifications Copyright (C) 2023 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.policy.controlloop.actorserviceprovider.impl;
+import jakarta.ws.rs.client.InvocationCallback;
+import jakarta.ws.rs.core.Response;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-import javax.ws.rs.client.InvocationCallback;
-import javax.ws.rs.core.Response;
import lombok.Getter;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.http.client.HttpClient;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpConfig;
import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpPollingConfig;
import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
-import org.onap.policy.controlloop.policy.PolicyResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class HttpOperation<T> extends OperationPartial {
private static final Logger logger = LoggerFactory.getLogger(HttpOperation.class);
+ /**
+ * Response status.
+ */
+ public enum Status {
+ SUCCESS, FAILURE, STILL_WAITING
+ }
+
/**
* Configuration for this operation.
*/
*/
private final Class<T> responseClass;
+ /**
+ * {@code True} to use polling, {@code false} otherwise.
+ */
+ @Getter
+ private boolean usePolling;
+
+ /**
+ * Number of polls issued so far, on the current operation attempt.
+ */
+ @Getter
+ private int pollCount;
+
/**
* Constructs the object.
* @param params operation parameters
* @param config configuration for this operation
* @param clazz response class
+ * @param propertyNames names of properties required by this operation
*/
- public HttpOperation(ControlLoopOperationParams params, HttpConfig config, Class<T> clazz) {
- super(params, config);
+ protected HttpOperation(ControlLoopOperationParams params, HttpConfig config, Class<T> clazz,
+ List<String> propertyNames) {
+ super(params, config, propertyNames);
this.config = config;
this.responseClass = clazz;
}
+ /**
+ * Indicates that polling should be used.
+ */
+ protected void setUsePolling() {
+ if (!(config instanceof HttpPollingConfig)) {
+ throw new IllegalStateException("cannot poll without polling parameters");
+ }
+
+ usePolling = true;
+ }
+
public HttpClient getClient() {
return config.getClient();
}
+ /**
+ * Gets the path to be used when performing the request; this is typically appended to
+ * the base URL. This method simply invokes {@link #getPath()}.
+ *
+ * @return the path URI suffix
+ */
public String getPath() {
return config.getPath();
}
}
/**
- * Gets the path to be used when performing the request; this is typically appended to
- * the base URL. This method simply invokes {@link #getPath()}.
+ * Makes the URL to which the HTTP request should be posted. This is primarily used
+ * for logging purposes. This particular method returns the base URL appended with the
+ * return value from {@link #getPath()}.
*
- * @return the path URI suffix
+ * @return the URL to which from which to get
*/
- public String makePath() {
- return getPath();
+ public String getUrl() {
+ return (getClient().getBaseUrl() + getPath());
}
/**
- * Makes the URL to which the "get" request should be posted. This is primarily used
- * for logging purposes. This particular method returns the base URL appended with the
- * return value from {@link #makePath()}.
+ * Resets the polling count
*
- * @return the URL to which from which to get
+ * <p/>
+ * Note: This should be invoked at the start of each operation (i.e., in
+ * {@link #startOperationAsync(int, OperationOutcome)}.
*/
- public String makeUrl() {
- return (getClient().getBaseUrl() + makePath());
+ protected void resetPollCount() {
+ pollCount = 0;
}
/**
final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
final CompletableFuture<Response> future = new CompletableFuture<>();
- final Executor executor = params.getExecutor();
+ final var executor = params.getExecutor();
// arrange for the callback to complete "future"
InvocationCallback<Response> callback = new InvocationCallback<>() {
*
* @param outcome outcome to be populate
* @param url URL to which to request was sent
- * @param response raw response to process
+ * @param rawResponse raw response to process
* @return a future to cancel or await the outcome
*/
protected CompletableFuture<OperationOutcome> processResponse(OperationOutcome outcome, String url,
logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
- String strResponse = HttpClient.getBody(rawResponse, String.class);
+ String strResponse = rawResponse.readEntity(String.class);
logMessage(EventType.IN, CommInfrastructure.REST, url, strResponse);
response = responseClass.cast(strResponse);
} else {
try {
- response = makeCoder().decode(strResponse, responseClass);
+ response = getCoder().decode(strResponse, responseClass);
} catch (CoderException e) {
logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
params.getRequestId(), e);
if (!isSuccess(rawResponse, response)) {
logger.info("{}.{} request failed with http error code {} for {}", params.getActor(), params.getOperation(),
rawResponse.getStatus(), params.getRequestId());
- return CompletableFuture.completedFuture(setOutcome(outcome, PolicyResult.FAILURE, rawResponse, response));
+ return CompletableFuture.completedFuture(
+ setOutcome(outcome, OperationResult.FAILURE, rawResponse, response));
}
logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId());
- setOutcome(outcome, PolicyResult.SUCCESS, rawResponse, response);
+ setOutcome(outcome, OperationResult.SUCCESS, rawResponse, response);
return postProcessResponse(outcome, url, rawResponse, response);
}
* @param response decoded response
* @return the updated operation
*/
- public OperationOutcome setOutcome(OperationOutcome outcome, PolicyResult result, Response rawResponse,
+ public OperationOutcome setOutcome(OperationOutcome outcome, OperationResult result, Response rawResponse,
T response) {
+ outcome.setResponse(response);
return setOutcome(outcome, result);
}
protected CompletableFuture<OperationOutcome> postProcessResponse(OperationOutcome outcome, String url,
Response rawResponse, T response) {
- return CompletableFuture.completedFuture(outcome);
+ if (!usePolling) {
+ // doesn't use polling - just return the completed future
+ return CompletableFuture.completedFuture(outcome);
+ }
+
+ HttpPollingConfig cfg = (HttpPollingConfig) config;
+
+ switch (detmStatus(rawResponse, response)) {
+ case SUCCESS -> {
+ logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ return CompletableFuture
+ .completedFuture(setOutcome(outcome, OperationResult.SUCCESS, rawResponse, response));
+ }
+ case FAILURE -> {
+ logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ return CompletableFuture
+ .completedFuture(setOutcome(outcome, OperationResult.FAILURE, rawResponse, response));
+ }
+ default -> logger.info("{}.{} request incomplete for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ }
+
+ // still incomplete
+
+ // see if the limit for the number of polls has been reached
+ if (pollCount++ >= cfg.getMaxPolls()) {
+ logger.warn("{}: exceeded 'poll' limit {} for {}", getFullName(), cfg.getMaxPolls(),
+ params.getRequestId());
+ setOutcome(outcome, OperationResult.FAILURE_TIMEOUT);
+ return CompletableFuture.completedFuture(outcome);
+ }
+
+ // sleep and then poll
+ Function<Void, CompletableFuture<OperationOutcome>> doPoll = unused -> issuePoll(outcome);
+ return sleep(getPollWaitMs(), TimeUnit.MILLISECONDS).thenComposeAsync(doPoll);
+ }
+
+ /**
+ * Polls to see if the original request is complete. This method polls using an HTTP
+ * "get" request whose URL is constructed by appending the extracted "poll ID" to the
+ * poll path from the configuration data.
+ *
+ * @param outcome outcome to be populated with the response
+ * @return a future that can be used to cancel the poll or await its response
+ */
+ protected CompletableFuture<OperationOutcome> issuePoll(OperationOutcome outcome) {
+ String path = getPollingPath();
+ String url = getClient().getBaseUrl() + path;
+
+ logger.debug("{}: 'poll' count {} for {}", getFullName(), pollCount, params.getRequestId());
+
+ logMessage(EventType.OUT, CommInfrastructure.REST, url, null);
+
+ return handleResponse(outcome, url, callback -> getClient().get(callback, path, null));
+ }
+
+ /**
+ * Determines the status of the response. This particular method simply throws an
+ * exception.
+ *
+ * @param rawResponse raw response
+ * @param response decoded response
+ * @return the status of the response
+ */
+ protected Status detmStatus(Response rawResponse, T response) {
+ throw new UnsupportedOperationException("cannot determine response status");
+ }
+
+ /**
+ * Gets the URL to use when polling. Typically, this is some unique ID appended to the
+ * polling path found within the configuration data. This particular method simply
+ * returns the polling path from the configuration data.
+ *
+ * @return the URL to use when polling
+ */
+ protected String getPollingPath() {
+ return ((HttpPollingConfig) config).getPollPath();
}
/**
NetLoggerUtil.log(direction, infra, sink, json);
return json;
}
+
+ // these may be overridden by junit tests
+
+ protected long getPollWaitMs() {
+ HttpPollingConfig cfg = (HttpPollingConfig) config;
+
+ return TimeUnit.MILLISECONDS.convert(cfg.getPollWaitSec(), TimeUnit.SECONDS);
+ }
}