2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import java.util.List;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.Executor;
26 import java.util.concurrent.TimeUnit;
28 import org.apache.commons.lang3.tuple.Triple;
29 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
30 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
31 import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
32 import org.onap.policy.common.utils.coder.Coder;
33 import org.onap.policy.common.utils.coder.CoderException;
34 import org.onap.policy.common.utils.coder.StandardCoder;
35 import org.onap.policy.common.utils.coder.StandardCoderObject;
36 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
37 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
38 import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
39 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
40 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
41 import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
42 import org.onap.policy.controlloop.policy.PolicyResult;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
/**
 * Operation that uses a Topic pair.
 *
 * @param <Q> request type
 * @param <S> response type
 */
52 public abstract class TopicPairOperation<Q, S> extends OperationPartial {
53 private static final Logger logger = LoggerFactory.getLogger(TopicPairOperation.class);
54 private static final Coder coder = new StandardCoder();
56 // fields extracted from the operator
58 private final TopicPair topicPair;
59 private final Forwarder forwarder;
60 private final TopicPairParams pairParams;
61 private final long timeoutMs;
66 private final Class<S> responseClass;
/**
 * Creates an operation that works with the topic pair of the given operator.
 *
 * @param params operation parameters
 * @param operator operator that created this operation; it supplies the topic pair,
 *        the forwarder and the topic-pair parameters
 * @param clazz response class
 */
public TopicPairOperation(ControlLoopOperationParams params, TopicPairOperator operator, Class<S> clazz) {
    super(params, operator);

    this.responseClass = clazz;

    // snapshot everything that is needed from the operator
    this.topicPair = operator.getTopicPair();
    this.forwarder = operator.getForwarder();
    this.pairParams = operator.getParams();

    // the parameters express the timeout in seconds; keep it in milliseconds
    this.timeoutMs = TimeUnit.SECONDS.toMillis(this.pairParams.getTimeoutSec());
}
86 * If no timeout is specified, then it returns the default timeout.
89 protected long getTimeoutMs(Integer timeoutSec) {
90 // TODO move this method to the superclass
91 return (timeoutSec == null || timeoutSec == 0 ? this.timeoutMs : super.getTimeoutMs(timeoutSec));
95 * Publishes the request and arranges to receive the response.
98 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
100 final Q request = makeRequest(attempt);
101 final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request);
103 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
104 final CompletableFuture<Triple<CommInfrastructure, String, StandardCoderObject>> future =
105 new CompletableFuture<>();
106 final Executor executor = params.getExecutor();
108 // register a listener BEFORE publishing
111 TriConsumer<CommInfrastructure, String, StandardCoderObject> listener =
112 (infra, rawResponse, scoResponse) -> future.complete(Triple.of(infra, rawResponse, scoResponse));
115 // TODO this currently only allows a single matching response
117 forwarder.register(expectedKeyValues, listener);
119 // ensure listener is unregistered if the controller is canceled
120 controller.add(() -> forwarder.unregister(expectedKeyValues, listener));
122 // publish the request
124 publishRequest(request);
125 } catch (RuntimeException e) {
126 logger.warn("{}: failed to publish request for {}", getFullName(), params.getRequestId());
127 forwarder.unregister(expectedKeyValues, listener);
132 // once "future" completes, process the response, and then complete the controller
135 future.thenApplyAsync(
136 triple -> processResponse(triple.getLeft(), outcome, triple.getMiddle(), triple.getRight()),
138 .whenCompleteAsync(controller.delayedComplete(), executor);
147 * @param attempt operation attempt
148 * @return a new request
150 protected abstract Q makeRequest(int attempt);
153 * Gets values, expected in the response, that should match the selector keys.
155 * @param attempt operation attempt
156 * @param request request to be published
157 * @return a list of the values to be matched by the selector keys
159 protected abstract List<String> getExpectedKeyValues(int attempt, Q request);
162 * Publishes the request. Encodes the request, if it is not already a String.
164 * @param request request to be published
166 protected void publishRequest(Q request) {
169 if (request instanceof String) {
170 json = request.toString();
172 json = makeCoder().encode(request);
174 } catch (CoderException e) {
175 throw new IllegalArgumentException("cannot encode request", e);
178 List<CommInfrastructure> list = topicPair.publish(json);
179 if (list.isEmpty()) {
180 throw new IllegalStateException("nothing published");
183 logTopicRequest(list, request);
187 * Processes a response.
189 * @param infra communication infrastructure on which the response was received
190 * @param outcome outcome to be populated
191 * @param response raw response to process
192 * @param scoResponse response, as a {@link StandardCoderObject}
193 * @return the outcome
195 protected OperationOutcome processResponse(CommInfrastructure infra, OperationOutcome outcome, String rawResponse,
196 StandardCoderObject scoResponse) {
198 logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
200 logTopicResponse(infra, rawResponse);
203 if (responseClass == String.class) {
204 response = responseClass.cast(rawResponse);
206 } else if (responseClass == StandardCoderObject.class) {
207 response = responseClass.cast(scoResponse);
211 response = makeCoder().decode(rawResponse, responseClass);
212 } catch (CoderException e) {
213 logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
214 params.getRequestId());
215 throw new IllegalArgumentException("cannot decode response", e);
219 if (!isSuccess(rawResponse, response)) {
220 logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
221 params.getRequestId());
222 return setOutcome(outcome, PolicyResult.FAILURE);
225 logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId());
226 setOutcome(outcome, PolicyResult.SUCCESS);
227 postProcessResponse(outcome, rawResponse, response);
233 * Processes a successful response.
235 * @param outcome outcome to be populated
236 * @param rawResponse raw response
237 * @param response decoded response
239 protected void postProcessResponse(OperationOutcome outcome, String rawResponse, S response) {
244 * Determines if the response indicates success.
246 * @param rawResponse raw response
247 * @param response decoded response
248 * @return {@code true} if the response indicates success, {@code false} otherwise
250 protected abstract boolean isSuccess(String rawResponse, S response);
253 * Logs a TOPIC request. If the request is not of type, String, then it attempts to
254 * pretty-print it into JSON before logging.
256 * @param infrastructures list of communication infrastructures on which it was
258 * @param request request to be logged
260 protected void logTopicRequest(List<CommInfrastructure> infrastructures, Q request) {
261 if (infrastructures.isEmpty()) {
267 if (request == null) {
269 } else if (request instanceof String) {
270 json = request.toString();
272 json = makeCoder().encode(request, true);
275 } catch (CoderException e) {
276 logger.warn("cannot pretty-print request", e);
277 json = request.toString();
280 for (CommInfrastructure infra : infrastructures) {
281 logger.info("[OUT|{}|{}|]{}{}", infra, pairParams.getTarget(), NetLoggerUtil.SYSTEM_LS, json);
286 * Logs a TOPIC response. If the response is not of type, String, then it attempts to
287 * pretty-print it into JSON before logging.
289 * @param infra communication infrastructure on which the response was received
290 * @param response response to be logged
292 protected <T> void logTopicResponse(CommInfrastructure infra, T response) {
295 if (response == null) {
297 } else if (response instanceof String) {
298 json = response.toString();
300 json = makeCoder().encode(response, true);
303 } catch (CoderException e) {
304 logger.warn("cannot pretty-print response", e);
305 json = response.toString();
308 logger.info("[IN|{}|{}|]{}{}", infra, pairParams.getSource(), NetLoggerUtil.SYSTEM_LS, json);
311 // these may be overridden by junit tests
313 protected Coder makeCoder() {