2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import java.util.List;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.Executor;
26 import java.util.concurrent.TimeUnit;
27 import java.util.function.BiConsumer;
29 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
30 import org.onap.policy.common.utils.coder.CoderException;
31 import org.onap.policy.common.utils.coder.StandardCoderObject;
32 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
33 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
34 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
35 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
36 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
37 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
38 import org.onap.policy.controlloop.policy.PolicyResult;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
/**
 * Operation that uses a bidirectional topic.
 *
 * @param <Q> request type
 * @param <S> response type
 */
48 public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial {
49 private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicOperation.class);
/**
 * Response status, as reported by {@code detmStatus()} and acted upon by
 * {@code processResponse()}.
 */
public enum Status {
    /** The request succeeded: the outcome is marked successful and then post-processed. */
    SUCCESS,

    /** The request failed: the outcome is marked as a failure. */
    FAILURE,

    /** The response is not final: the operation keeps waiting for another response. */
    STILL_WAITING
}
58 // fields extracted from the operator
60 private final BidirectionalTopicHandler topicHandler;
61 private final Forwarder forwarder;
62 private final BidirectionalTopicParams topicParams;
63 private final long timeoutMs;
68 private final Class<S> responseClass;
/**
 * Constructs the object.
 *
 * @param params operation parameters
 * @param operator operator that created this operation; supplies the topic handler,
 *        the forwarder, and the topic parameters
 * @param clazz response class
 */
public BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicOperator operator,
                Class<S> clazz) {
    super(params, operator);
    this.topicHandler = operator.getTopicHandler();
    this.forwarder = operator.getForwarder();
    this.topicParams = operator.getParams();
    this.responseClass = clazz;

    // the topic-level timeout is configured in seconds; cache it in milliseconds
    this.timeoutMs = TimeUnit.MILLISECONDS.convert(topicParams.getTimeoutSec(), TimeUnit.SECONDS);
}
89 * If no timeout is specified, then it returns the default timeout.
92 protected long getTimeoutMs(Integer timeoutSec) {
93 // TODO move this method to the superclass
94 return (timeoutSec == null || timeoutSec == 0 ? this.timeoutMs : super.getTimeoutMs(timeoutSec));
98 * Publishes the request and arranges to receive the response.
101 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
103 final Q request = makeRequest(attempt);
104 final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request);
106 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
107 final Executor executor = params.getExecutor();
109 // register a listener BEFORE publishing
111 BiConsumer<String, StandardCoderObject> listener = (rawResponse, scoResponse) -> {
112 OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
113 if (latestOutcome != null) {
114 // final response - complete the controller
115 controller.completeAsync(() -> latestOutcome, executor);
119 forwarder.register(expectedKeyValues, listener);
121 // ensure listener is unregistered if the controller is canceled
122 controller.add(() -> forwarder.unregister(expectedKeyValues, listener));
124 // publish the request
126 publishRequest(request);
127 } catch (RuntimeException e) {
128 logger.warn("{}: failed to publish request for {}", getFullName(), params.getRequestId());
129 forwarder.unregister(expectedKeyValues, listener);
139 * @param attempt operation attempt
140 * @return a new request
142 protected abstract Q makeRequest(int attempt);
145 * Gets values, expected in the response, that should match the selector keys.
147 * @param attempt operation attempt
148 * @param request request to be published
149 * @return a list of the values to be matched by the selector keys
151 protected abstract List<String> getExpectedKeyValues(int attempt, Q request);
154 * Publishes the request. Encodes the request, if it is not already a String.
156 * @param request request to be published
158 protected void publishRequest(Q request) {
161 if (request instanceof String) {
162 json = request.toString();
164 json = makeCoder().encode(request);
166 } catch (CoderException e) {
167 throw new IllegalArgumentException("cannot encode request", e);
170 if (!topicHandler.send(json)) {
171 throw new IllegalStateException("nothing published");
174 logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), request);
178 * Processes a response.
180 * @param infra communication infrastructure on which the response was received
181 * @param outcome outcome to be populated
182 * @param response raw response to process
183 * @param scoResponse response, as a {@link StandardCoderObject}
184 * @return the outcome, or {@code null} if still waiting for completion
186 protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse,
187 StandardCoderObject scoResponse) {
189 logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
191 logMessage(EventType.IN, topicHandler.getSourceTopicCommInfrastructure(), topicHandler.getSourceTopic(),
194 // decode the response
196 if (responseClass == String.class) {
197 response = responseClass.cast(rawResponse);
199 } else if (responseClass == StandardCoderObject.class) {
200 response = responseClass.cast(scoResponse);
204 response = makeCoder().decode(rawResponse, responseClass);
205 } catch (CoderException e) {
206 logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
207 params.getRequestId());
208 throw new IllegalArgumentException("cannot decode response", e);
213 switch (detmStatus(rawResponse, response)) {
215 logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(),
216 params.getRequestId());
217 setOutcome(outcome, PolicyResult.SUCCESS);
218 postProcessResponse(outcome, rawResponse, response);
222 logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
223 params.getRequestId());
224 return setOutcome(outcome, PolicyResult.FAILURE);
228 logger.info("{}.{} request incomplete for {}", params.getActor(), params.getOperation(),
229 params.getRequestId());
235 * Processes a successful response.
237 * @param outcome outcome to be populated
238 * @param rawResponse raw response
239 * @param response decoded response
241 protected void postProcessResponse(OperationOutcome outcome, String rawResponse, S response) {
246 * Determines the status of the response.
248 * @param rawResponse raw response
249 * @param response decoded response
250 * @return the status of the response
252 protected abstract Status detmStatus(String rawResponse, S response);