import org.onap.policy.common.utils.coder.CoderException;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicConfig;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
-import org.onap.policy.controlloop.policy.PolicyResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* @param params operation parameters
* @param config configuration for this operation
* @param clazz response class
+ * @param propertyNames names of properties required by this operation
*/
public BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicConfig config,
- Class<S> clazz) {
- super(params, config);
+ Class<S> clazz, List<String> propertyNames) {
+ super(params, config, propertyNames);
this.config = config;
this.responseClass = clazz;
this.forwarder = config.getForwarder();
// register a listener BEFORE publishing
BiConsumer<String, StandardCoderObject> listener = (rawResponse, scoResponse) -> {
- OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
- if (latestOutcome != null) {
- // final response - complete the controller
- controller.completeAsync(() -> latestOutcome, executor);
+ try {
+ OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
+ if (latestOutcome != null) {
+ // final response - complete the controller
+ controller.completeAsync(() -> latestOutcome, executor);
+ }
+ } catch (RuntimeException e) {
+ logger.warn("{}: failed to process response for {}", getFullName(), params.getRequestId());
+ controller.completeExceptionally(e);
}
};
* @param request request to be published
*/
protected void publishRequest(Q request) {
- String json;
- try {
- if (request instanceof String) {
- json = request.toString();
- } else {
- json = makeCoder().encode(request);
- }
- } catch (CoderException e) {
- throw new IllegalArgumentException("cannot encode request", e);
- }
+ String json = prettyPrint(request);
+ logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), json);
if (!topicHandler.send(json)) {
throw new IllegalStateException("nothing published");
}
-
- logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), request);
}
/**
} else {
try {
- response = makeCoder().decode(rawResponse, responseClass);
+ response = getCoder().decode(rawResponse, responseClass);
} catch (CoderException e) {
logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
params.getRequestId());
case SUCCESS:
logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(),
params.getRequestId());
- setOutcome(outcome, PolicyResult.SUCCESS, response);
+ setOutcome(outcome, OperationResult.SUCCESS, response);
postProcessResponse(outcome, rawResponse, response);
return outcome;
case FAILURE:
logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
params.getRequestId());
- return setOutcome(outcome, PolicyResult.FAILURE, response);
+ return setOutcome(outcome, OperationResult.FAILURE, response);
case STILL_WAITING:
default:
* @param response response used to populate the outcome
* @return the updated operation
*/
- public OperationOutcome setOutcome(OperationOutcome outcome, PolicyResult result, S response) {
+ public OperationOutcome setOutcome(OperationOutcome outcome, OperationResult result, S response) {
+ outcome.setResponse(response);
return setOutcome(outcome, result);
}