2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import java.util.List;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.function.BiConsumer;
27 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
28 import org.onap.policy.common.utils.coder.CoderException;
29 import org.onap.policy.common.utils.coder.StandardCoderObject;
30 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
31 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
32 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicConfig;
33 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
34 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
35 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
36 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
/**
 * Operation that uses a bidirectional topic.
 *
 * @param <Q> request type
 * @param <S> response type
 */
46 public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial {
// Logger used by this class for request/response diagnostics.
47 private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicOperation.class);
// NOTE(review): presumably the constants of the Status type returned by detmStatus();
// the enclosing declaration is not visible in this excerpt - confirm.
53 SUCCESS, FAILURE, STILL_WAITING
57 * Configuration for this operation.
// Read by getTimeoutMs() to obtain the default timeout.
59 private final BidirectionalTopicConfig config;
// Type into which processResponse() converts each response.
64 private final Class<S> responseClass;
66 // fields extracted from "config"
// Used to send requests and to name the sink/source topics when messages are logged.
68 private final BidirectionalTopicHandler topicHandler;
// Response listeners are registered with, and unregistered from, this forwarder.
69 private final Forwarder forwarder;
/**
 * Constructs the object.
 *
 * @param params operation parameters
 * @param config configuration for this operation; also supplies the forwarder and the
 *        topic handler
 * @param clazz response class
 * @param propertyNames names of properties required by this operation
 */
protected BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicConfig config,
                Class<S> clazz, List<String> propertyNames) {
    super(params, config, propertyNames);

    // "config" is a final field that getTimeoutMs() reads, so it must be assigned here
    this.config = config;
    this.responseClass = clazz;

    // keep direct references to the collaborators extracted from "config"
    this.forwarder = config.getForwarder();
    this.topicHandler = config.getTopicHandler();
}
89 public long getTimeoutMs() {
90 return config.getTimeoutMs();
94 * If no timeout is specified, then it returns the default timeout.
97 protected long getTimeoutMs(Integer timeoutSec) {
98 return (timeoutSec == null || timeoutSec == 0 ? getTimeoutMs() : super.getTimeoutMs(timeoutSec));
102 * Publishes the request and arranges to receive the response.
// Starts one attempt of the operation: builds the request, registers a response listener
// with the forwarder, and then publishes the request. Each response delivered to the
// listener is passed to processResponse(); a non-null result completes the controller.
// NOTE(review): the "try {" openers, the closing braces and the method's final return
// statement are not visible in this excerpt.
105 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
// build the request first; the values expected in the response are derived from it
107 final var request = makeRequest(attempt);
108 final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request);
// future that is completed by the listener below, and the executor used to complete it
110 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
111 final var executor = params.getExecutor();
113 // register a listener BEFORE publishing
115 BiConsumer<String, StandardCoderObject> listener = (rawResponse, scoResponse) -> {
// processResponse() is documented to return null while still waiting for completion,
// in which case the controller is left pending
117 OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
118 if (latestOutcome != null) {
119 // final response - complete the controller
120 controller.completeAsync(() -> latestOutcome, executor);
122 } catch (RuntimeException e) {
// a failure while processing the response is logged and handed to the controller
123 logger.warn("{}: failed to process response for {}", getFullName(), params.getRequestId());
124 controller.completeExceptionally(e);
128 forwarder.register(expectedKeyValues, listener);
130 // ensure listener is unregistered if the controller is canceled
131 controller.add(() -> forwarder.unregister(expectedKeyValues, listener));
133 // publish the request
135 publishRequest(request);
136 } catch (RuntimeException e) {
// publishing failed: log it and remove the listener that was registered above
137 logger.warn("{}: failed to publish request for {}", getFullName(), params.getRequestId());
138 forwarder.unregister(expectedKeyValues, listener);
148 * @param attempt operation attempt
149 * @return a new request
151 protected abstract Q makeRequest(int attempt);
154 * Gets values, expected in the response, that should match the selector keys.
156 * @param attempt operation attempt
157 * @param request request to be published
158 * @return a list of the values to be matched by the selector keys
160 protected abstract List<String> getExpectedKeyValues(int attempt, Q request);
163 * Publishes the request. Encodes the request, if it is not already a String.
165 * @param request request to be published
167 protected void publishRequest(Q request) {
168 String json = prettyPrint(request);
169 logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), json);
171 if (!topicHandler.send(json)) {
172 throw new IllegalStateException("nothing published");
177 * Processes a response.
179 * @param outcome outcome to be populated
180 * @param rawResponse raw response to process
181 * @param scoResponse response, as a {@link StandardCoderObject}
182 * @return the outcome, or {@code null} if still waiting for completion
// Handles one response: logs it, converts it to the response class, and then uses
// detmStatus() to decide how the outcome is updated.
// NOTE(review): several lines of this method are not visible in this excerpt (the
// declaration of "response", the end of the logMessage() call, a "try {" opener, the
// case labels and some return statements).
184 protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse,
185 StandardCoderObject scoResponse) {
187 logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
// record the inbound message against the source topic
189 logMessage(EventType.IN, topicHandler.getSourceTopicCommInfrastructure(), topicHandler.getSourceTopic(),
192 // decode the response
// String and StandardCoderObject response classes reuse the objects already supplied;
// any other class is decoded from the raw text by the coder
194 if (responseClass == String.class) {
195 response = responseClass.cast(rawResponse);
197 } else if (responseClass == StandardCoderObject.class) {
198 response = responseClass.cast(scoResponse);
202 response = getCoder().decode(rawResponse, responseClass);
203 } catch (CoderException e) {
// a decode failure is logged and rethrown as an unchecked exception that keeps the cause
204 logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
205 params.getRequestId());
206 throw new IllegalArgumentException("cannot decode response", e);
// the status of the decoded response decides which branch below runs
211 switch (detmStatus(rawResponse, response)) {
// success: record SUCCESS with the response, then let postProcessResponse() refine the outcome
213 logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(),
214 params.getRequestId());
215 setOutcome(outcome, OperationResult.SUCCESS, response);
216 postProcessResponse(outcome, rawResponse, response);
// failure: record FAILURE with the response and return the updated outcome
220 logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
221 params.getRequestId());
222 return setOutcome(outcome, OperationResult.FAILURE, response);
// incomplete: only logged here; NOTE(review): the value returned on this path is not visible
226 logger.info("{}.{} request incomplete for {}", params.getActor(), params.getOperation(),
227 params.getRequestId());
233 * Sets an operation's outcome and default message based on the result.
235 * @param outcome operation to be updated
236 * @param result result of the operation
237 * @param response response used to populate the outcome
238 * @return the updated operation
240 public OperationOutcome setOutcome(OperationOutcome outcome, OperationResult result, S response) {
241 outcome.setResponse(response);
242 return setOutcome(outcome, result);
246 * Processes a successful response.
248 * @param outcome outcome to be populated
249 * @param rawResponse raw response
250 * @param response decoded response
// Hook called by processResponse() right after a SUCCESS result has been stored in the outcome.
// NOTE(review): the body of this method is not visible in this excerpt.
252 protected void postProcessResponse(OperationOutcome outcome, String rawResponse, S response) {
257 * Determines the status of the response.
259 * @param rawResponse raw response
260 * @param response decoded response
261 * @return the status of the response
263 protected abstract Status detmStatus(String rawResponse, S response);