* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import lombok.Getter;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
-import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
+import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicConfig;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
-import org.onap.policy.controlloop.policy.PolicyResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
SUCCESS, FAILURE, STILL_WAITING
}
- // fields extracted from the operator
-
- private final BidirectionalTopicHandler topicHandler;
- private final Forwarder forwarder;
- private final BidirectionalTopicParams topicParams;
- private final long timeoutMs;
+ /**
+ * Configuration for this operation.
+ */
+ private final BidirectionalTopicConfig config;
/**
* Response class.
*/
private final Class<S> responseClass;
+ // fields extracted from "config"
+
+ private final BidirectionalTopicHandler topicHandler;
+ private final Forwarder forwarder;
+
/**
* Constructs the object.
*
* @param params operation parameters
- * @param operator operator that created this operation
+ * @param config configuration for this operation
* @param clazz response class
+ * @param propertyNames names of properties required by this operation
*/
- public BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicOperator operator,
- Class<S> clazz) {
- super(params, operator);
- this.topicHandler = operator.getTopicHandler();
- this.forwarder = operator.getForwarder();
- this.topicParams = operator.getParams();
+ protected BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicConfig config,
+ Class<S> clazz, List<String> propertyNames) {
+ super(params, config, propertyNames);
+ this.config = config;
this.responseClass = clazz;
- this.timeoutMs = TimeUnit.MILLISECONDS.convert(topicParams.getTimeoutSec(), TimeUnit.SECONDS);
+ this.forwarder = config.getForwarder();
+ this.topicHandler = config.getTopicHandler();
+ }
+
+ public long getTimeoutMs() {
+ return config.getTimeoutMs();
}
/**
@Override
protected long getTimeoutMs(Integer timeoutSec) {
// TODO move this method to the superclass
- return (timeoutSec == null || timeoutSec == 0 ? this.timeoutMs : super.getTimeoutMs(timeoutSec));
+ return (timeoutSec == null || timeoutSec == 0 ? getTimeoutMs() : super.getTimeoutMs(timeoutSec));
}
/**
// register a listener BEFORE publishing
BiConsumer<String, StandardCoderObject> listener = (rawResponse, scoResponse) -> {
- OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
- if (latestOutcome != null) {
- // final response - complete the controller
- controller.completeAsync(() -> latestOutcome, executor);
+ try {
+ OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
+ if (latestOutcome != null) {
+ // final response - complete the controller
+ controller.completeAsync(() -> latestOutcome, executor);
+ }
+ } catch (RuntimeException e) {
+ logger.warn("{}: failed to process response for {}", getFullName(), params.getRequestId());
+ controller.completeExceptionally(e);
}
};
* @param request request to be published
*/
protected void publishRequest(Q request) {
- String json;
- try {
- if (request instanceof String) {
- json = request.toString();
- } else {
- json = makeCoder().encode(request);
- }
- } catch (CoderException e) {
- throw new IllegalArgumentException("cannot encode request", e);
- }
+ String json = prettyPrint(request);
+ logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), json);
if (!topicHandler.send(json)) {
throw new IllegalStateException("nothing published");
}
-
- logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), request);
}
/**
} else {
try {
- response = makeCoder().decode(rawResponse, responseClass);
+ response = getCoder().decode(rawResponse, responseClass);
} catch (CoderException e) {
logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
params.getRequestId());
case SUCCESS:
logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(),
params.getRequestId());
- setOutcome(outcome, PolicyResult.SUCCESS, response);
+ setOutcome(outcome, OperationResult.SUCCESS, response);
postProcessResponse(outcome, rawResponse, response);
return outcome;
case FAILURE:
logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
params.getRequestId());
- return setOutcome(outcome, PolicyResult.FAILURE, response);
+ return setOutcome(outcome, OperationResult.FAILURE, response);
case STILL_WAITING:
default:
* @param response response used to populate the outcome
* @return the updated operation
*/
- public OperationOutcome setOutcome(OperationOutcome outcome, PolicyResult result, S response) {
+ public OperationOutcome setOutcome(OperationOutcome outcome, OperationResult result, S response) {
+ outcome.setResponse(response);
return setOutcome(outcome, result);
}