Exception not propagated by processResponse
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / BidirectionalTopicOperation.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.List;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.Executor;
26 import java.util.function.BiConsumer;
27 import lombok.Getter;
28 import org.apache.commons.lang3.tuple.Pair;
29 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
30 import org.onap.policy.common.utils.coder.CoderException;
31 import org.onap.policy.common.utils.coder.StandardCoderObject;
32 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
33 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicConfig;
34 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
35 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
36 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
37 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
38 import org.onap.policy.controlloop.policy.PolicyResult;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Operation that uses a bidirectional topic.
44  *
45  * @param <S> response type
46  */
47 @Getter
48 public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial {
49     private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicOperation.class);
50
51     /**
52      * Response status.
53      */
54     public enum Status {
55         SUCCESS, FAILURE, STILL_WAITING
56     }
57
58     /**
59      * Configuration for this operation.
60      */
61     private final BidirectionalTopicConfig config;
62
63     /**
64      * Response class.
65      */
66     private final Class<S> responseClass;
67
68     // fields extracted from "config"
69
70     private final BidirectionalTopicHandler topicHandler;
71     private final Forwarder forwarder;
72
73
74     /**
75      * Constructs the object.
76      *
77      * @param params operation parameters
78      * @param config configuration for this operation
79      * @param clazz response class
80      */
81     public BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicConfig config,
82                     Class<S> clazz) {
83         super(params, config);
84         this.config = config;
85         this.responseClass = clazz;
86         this.forwarder = config.getForwarder();
87         this.topicHandler = config.getTopicHandler();
88     }
89
90     public long getTimeoutMs() {
91         return config.getTimeoutMs();
92     }
93
94     /**
95      * If no timeout is specified, then it returns the default timeout.
96      */
97     @Override
98     protected long getTimeoutMs(Integer timeoutSec) {
99         // TODO move this method to the superclass
100         return (timeoutSec == null || timeoutSec == 0 ? getTimeoutMs() : super.getTimeoutMs(timeoutSec));
101     }
102
103     /**
104      * Publishes the request and arranges to receive the response.
105      */
106     @Override
107     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
108
109         final Pair<String, Q> pair = makeRequest(attempt);
110         final Q request = pair.getRight();
111         outcome.setSubRequestId(pair.getLeft());
112
113         final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request);
114
115         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
116         final Executor executor = params.getExecutor();
117
118         // register a listener BEFORE publishing
119
120         BiConsumer<String, StandardCoderObject> listener = (rawResponse, scoResponse) -> {
121             try {
122                 OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
123                 if (latestOutcome != null) {
124                     // final response - complete the controller
125                     controller.completeAsync(() -> latestOutcome, executor);
126                 }
127             } catch (RuntimeException e) {
128                 logger.warn("{}: failed to process response for {}", getFullName(), params.getRequestId());
129                 controller.completeExceptionally(e);
130             }
131         };
132
133         forwarder.register(expectedKeyValues, listener);
134
135         // ensure listener is unregistered if the controller is canceled
136         controller.add(() -> forwarder.unregister(expectedKeyValues, listener));
137
138         // publish the request
139         try {
140             publishRequest(request);
141         } catch (RuntimeException e) {
142             logger.warn("{}: failed to publish request for {}", getFullName(), params.getRequestId());
143             forwarder.unregister(expectedKeyValues, listener);
144             throw e;
145         }
146
147         return controller;
148     }
149
150     /**
151      * Makes the request.
152      *
153      * @param attempt operation attempt
154      * @return a pair containing sub request ID, which may be {@code null} and the new
155      *         request
156      */
157     protected abstract Pair<String, Q> makeRequest(int attempt);
158
159     /**
160      * Gets values, expected in the response, that should match the selector keys.
161      *
162      * @param attempt operation attempt
163      * @param request request to be published
164      * @return a list of the values to be matched by the selector keys
165      */
166     protected abstract List<String> getExpectedKeyValues(int attempt, Q request);
167
168     /**
169      * Publishes the request. Encodes the request, if it is not already a String.
170      *
171      * @param request request to be published
172      */
173     protected void publishRequest(Q request) {
174         String json;
175         try {
176             if (request instanceof String) {
177                 json = request.toString();
178             } else {
179                 json = makeCoder().encode(request);
180             }
181         } catch (CoderException e) {
182             throw new IllegalArgumentException("cannot encode request", e);
183         }
184
185         logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), request);
186
187         if (!topicHandler.send(json)) {
188             throw new IllegalStateException("nothing published");
189         }
190     }
191
192     /**
193      * Processes a response.
194      *
195      * @param infra communication infrastructure on which the response was received
196      * @param outcome outcome to be populated
197      * @param response raw response to process
198      * @param scoResponse response, as a {@link StandardCoderObject}
199      * @return the outcome, or {@code null} if still waiting for completion
200      */
201     protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse,
202                     StandardCoderObject scoResponse) {
203
204         logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
205
206         logMessage(EventType.IN, topicHandler.getSourceTopicCommInfrastructure(), topicHandler.getSourceTopic(),
207                         rawResponse);
208
209         // decode the response
210         S response;
211         if (responseClass == String.class) {
212             response = responseClass.cast(rawResponse);
213
214         } else if (responseClass == StandardCoderObject.class) {
215             response = responseClass.cast(scoResponse);
216
217         } else {
218             try {
219                 response = makeCoder().decode(rawResponse, responseClass);
220             } catch (CoderException e) {
221                 logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
222                                 params.getRequestId());
223                 throw new IllegalArgumentException("cannot decode response", e);
224             }
225         }
226
227         // check its status
228         switch (detmStatus(rawResponse, response)) {
229             case SUCCESS:
230                 logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(),
231                                 params.getRequestId());
232                 setOutcome(outcome, PolicyResult.SUCCESS, response);
233                 postProcessResponse(outcome, rawResponse, response);
234                 return outcome;
235
236             case FAILURE:
237                 logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
238                                 params.getRequestId());
239                 return setOutcome(outcome, PolicyResult.FAILURE, response);
240
241             case STILL_WAITING:
242             default:
243                 logger.info("{}.{} request incomplete for {}", params.getActor(), params.getOperation(),
244                                 params.getRequestId());
245                 return null;
246         }
247     }
248
249     /**
250      * Sets an operation's outcome and default message based on the result.
251      *
252      * @param outcome operation to be updated
253      * @param result result of the operation
254      * @param response response used to populate the outcome
255      * @return the updated operation
256      */
257     public OperationOutcome setOutcome(OperationOutcome outcome, PolicyResult result, S response) {
258         return setOutcome(outcome, result);
259     }
260
261     /**
262      * Processes a successful response.
263      *
264      * @param outcome outcome to be populated
265      * @param rawResponse raw response
266      * @param response decoded response
267      */
268     protected void postProcessResponse(OperationOutcome outcome, String rawResponse, S response) {
269         // do nothing
270     }
271
272     /**
273      * Determines the status of the response.
274      *
275      * @param rawResponse raw response
276      * @param response decoded response
277      * @return the status of the response
278      */
279     protected abstract Status detmStatus(String rawResponse, S response);
280 }