package org.onap.policy.controlloop.actorserviceprovider.impl;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.core.Response;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpConfig;
import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.HttpPollingConfig;
import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
-import org.onap.policy.controlloop.policy.PolicyResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger logger = LoggerFactory.getLogger(HttpOperation.class);
/**
- * Operator that created this operation.
+ * Response status.
*/
- protected final HttpOperator operator;
+ public enum Status {
+ SUCCESS, FAILURE, STILL_WAITING
+ }
+
+ /**
+ * Configuration for this operation.
+ */
+ private final HttpConfig config;
/**
* Response class.
*/
private final Class<T> responseClass;
+ /**
+ * {@code True} to use polling, {@code false} otherwise.
+ */
+ @Getter
+ private boolean usePolling;
+
+ /**
+ * Number of polls issued so far, on the current operation attempt.
+ */
+ @Getter
+ private int pollCount;
+
/**
* Constructs the object.
*
* @param params operation parameters
- * @param operator operator that created this operation
+ * @param config configuration for this operation
* @param clazz response class
+ * @param propertyNames names of properties required by this operation
*/
- public HttpOperation(ControlLoopOperationParams params, HttpOperator operator, Class<T> clazz) {
- super(params, operator);
- this.operator = operator;
+ public HttpOperation(ControlLoopOperationParams params, HttpConfig config, Class<T> clazz,
+ List<String> propertyNames) {
+ super(params, config, propertyNames);
+ this.config = config;
this.responseClass = clazz;
}
+ /**
+ * Indicates that polling should be used.
+ */
+ protected void setUsePolling() {
+ if (!(config instanceof HttpPollingConfig)) {
+ throw new IllegalStateException("cannot poll without polling parameters");
+ }
+
+ usePolling = true;
+ }
+
+ public HttpClient getClient() {
+ return config.getClient();
+ }
+
+ /**
+ * Gets the path to be used when performing the request; this is typically appended to
+ * the base URL. This method simply invokes {@link #getPath()}.
+ *
+ * @return the path URI suffix
+ */
+ public String getPath() {
+ return config.getPath();
+ }
+
+ public long getTimeoutMs() {
+ return config.getTimeoutMs();
+ }
+
/**
* If no timeout is specified, then it returns the operator's configured timeout.
*/
@Override
protected long getTimeoutMs(Integer timeoutSec) {
- return (timeoutSec == null || timeoutSec == 0 ? operator.getTimeoutMs() : super.getTimeoutMs(timeoutSec));
+ return (timeoutSec == null || timeoutSec == 0 ? getTimeoutMs() : super.getTimeoutMs(timeoutSec));
}
/**
}
/**
- * Gets the path to be used when performing the request; this is typically appended to
- * the base URL. This method simply invokes {@link #getPath()}.
+ * Makes the URL to which the HTTP request should be posted. This is primarily used
+ * for logging purposes. This particular method returns the base URL appended with the
+ * return value from {@link #getPath()}.
*
- * @return the path URI suffix
+ * @return the URL to which from which to get
*/
- public String makePath() {
- return operator.getPath();
+ public String getUrl() {
+ return (getClient().getBaseUrl() + getPath());
}
/**
- * Makes the URL to which the "get" request should be posted. This ir primarily used
- * for logging purposes. This particular method returns the base URL appended with the
- * return value from {@link #makePath()}.
+ * Resets the polling count
*
- * @return the URL to which from which to get
+ * <p/>
+ * Note: This should be invoked at the start of each operation (i.e., in
+ * {@link #startOperationAsync(int, OperationOutcome)}.
*/
- public String makeUrl() {
- return (operator.getClient().getBaseUrl() + makePath());
+ protected void resetPollCount() {
+ pollCount = 0;
}
/**
logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
- String strResponse = HttpClient.getBody(rawResponse, String.class);
+ String strResponse = rawResponse.readEntity(String.class);
logMessage(EventType.IN, CommInfrastructure.REST, url, strResponse);
response = responseClass.cast(strResponse);
} else {
try {
- response = makeCoder().decode(strResponse, responseClass);
+ response = getCoder().decode(strResponse, responseClass);
} catch (CoderException e) {
logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
params.getRequestId(), e);
if (!isSuccess(rawResponse, response)) {
logger.info("{}.{} request failed with http error code {} for {}", params.getActor(), params.getOperation(),
rawResponse.getStatus(), params.getRequestId());
- return CompletableFuture.completedFuture(setOutcome(outcome, PolicyResult.FAILURE, response));
+ return CompletableFuture.completedFuture(
+ setOutcome(outcome, OperationResult.FAILURE, rawResponse, response));
}
logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId());
- setOutcome(outcome, PolicyResult.SUCCESS, response);
+ setOutcome(outcome, OperationResult.SUCCESS, rawResponse, response);
return postProcessResponse(outcome, url, rawResponse, response);
}
*
* @param outcome operation to be updated
* @param result result of the operation
- * @param response response used to populate the outcome
+ * @param rawResponse raw response
+ * @param response decoded response
* @return the updated operation
*/
- public OperationOutcome setOutcome(OperationOutcome outcome, PolicyResult result, T response) {
+ public OperationOutcome setOutcome(OperationOutcome outcome, OperationResult result, Response rawResponse,
+ T response) {
+
+ outcome.setResponse(response);
return setOutcome(outcome, result);
}
protected CompletableFuture<OperationOutcome> postProcessResponse(OperationOutcome outcome, String url,
Response rawResponse, T response) {
- return CompletableFuture.completedFuture(outcome);
+ if (!usePolling) {
+ // doesn't use polling - just return the completed future
+ return CompletableFuture.completedFuture(outcome);
+ }
+
+ HttpPollingConfig cfg = (HttpPollingConfig) config;
+
+ switch (detmStatus(rawResponse, response)) {
+ case SUCCESS:
+ logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ return CompletableFuture
+ .completedFuture(setOutcome(outcome, OperationResult.SUCCESS, rawResponse, response));
+
+ case FAILURE:
+ logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ return CompletableFuture
+ .completedFuture(setOutcome(outcome, OperationResult.FAILURE, rawResponse, response));
+
+ case STILL_WAITING:
+ default:
+ logger.info("{}.{} request incomplete for {}", params.getActor(), params.getOperation(),
+ params.getRequestId());
+ break;
+ }
+
+ // still incomplete
+
+ // see if the limit for the number of polls has been reached
+ if (pollCount++ >= cfg.getMaxPolls()) {
+ logger.warn("{}: execeeded 'poll' limit {} for {}", getFullName(), cfg.getMaxPolls(),
+ params.getRequestId());
+ setOutcome(outcome, OperationResult.FAILURE_TIMEOUT);
+ return CompletableFuture.completedFuture(outcome);
+ }
+
+ // sleep and then poll
+ Function<Void, CompletableFuture<OperationOutcome>> doPoll = unused -> issuePoll(outcome);
+ return sleep(getPollWaitMs(), TimeUnit.MILLISECONDS).thenComposeAsync(doPoll);
+ }
+
+ /**
+ * Polls to see if the original request is complete. This method polls using an HTTP
+ * "get" request whose URL is constructed by appending the extracted "poll ID" to the
+ * poll path from the configuration data.
+ *
+ * @param outcome outcome to be populated with the response
+ * @return a future that can be used to cancel the poll or await its response
+ */
+ protected CompletableFuture<OperationOutcome> issuePoll(OperationOutcome outcome) {
+ String path = getPollingPath();
+ String url = getClient().getBaseUrl() + path;
+
+ logger.debug("{}: 'poll' count {} for {}", getFullName(), pollCount, params.getRequestId());
+
+ logMessage(EventType.OUT, CommInfrastructure.REST, url, null);
+
+ return handleResponse(outcome, url, callback -> getClient().get(callback, path, null));
+ }
+
+ /**
+ * Determines the status of the response. This particular method simply throws an
+ * exception.
+ *
+ * @param rawResponse raw response
+ * @param response decoded response
+ * @return the status of the response
+ */
+ protected Status detmStatus(Response rawResponse, T response) {
+ throw new UnsupportedOperationException("cannot determine response status");
+ }
+
+ /**
+ * Gets the URL to use when polling. Typically, this is some unique ID appended to the
+ * polling path found within the configuration data. This particular method simply
+ * returns the polling path from the configuration data.
+ *
+ * @return the URL to use when polling
+ */
+ protected String getPollingPath() {
+ return ((HttpPollingConfig) config).getPollPath();
}
/**
NetLoggerUtil.log(direction, infra, sink, json);
return json;
}
+
+ // these may be overridden by junit tests
+
+ protected long getPollWaitMs() {
+ HttpPollingConfig cfg = (HttpPollingConfig) config;
+
+ return TimeUnit.MILLISECONDS.convert(cfg.getPollWaitSec(), TimeUnit.SECONDS);
+ }
}