Remove Target and TargetType
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / BidirectionalTopicOperation.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.List;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.Executor;
26 import java.util.function.BiConsumer;
27 import lombok.Getter;
28 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
29 import org.onap.policy.common.utils.coder.CoderException;
30 import org.onap.policy.common.utils.coder.StandardCoderObject;
31 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
32 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
33 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicConfig;
34 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
35 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
36 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
37 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * Operation that uses a bidirectional topic.
43  *
44  * @param <S> response type
45  */
46 @Getter
47 public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial {
48     private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicOperation.class);
49
50     /**
51      * Response status.
52      */
53     public enum Status {
54         SUCCESS, FAILURE, STILL_WAITING
55     }
56
57     /**
58      * Configuration for this operation.
59      */
60     private final BidirectionalTopicConfig config;
61
62     /**
63      * Response class.
64      */
65     private final Class<S> responseClass;
66
67     // fields extracted from "config"
68
69     private final BidirectionalTopicHandler topicHandler;
70     private final Forwarder forwarder;
71
72
73     /**
74      * Constructs the object.
75      *
76      * @param params operation parameters
77      * @param config configuration for this operation
78      * @param clazz response class
79      * @param propertyNames names of properties required by this operation
80      */
81     public BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicConfig config,
82                     Class<S> clazz, List<String> propertyNames) {
83         super(params, config, propertyNames);
84         this.config = config;
85         this.responseClass = clazz;
86         this.forwarder = config.getForwarder();
87         this.topicHandler = config.getTopicHandler();
88     }
89
90     public long getTimeoutMs() {
91         return config.getTimeoutMs();
92     }
93
94     /**
95      * If no timeout is specified, then it returns the default timeout.
96      */
97     @Override
98     protected long getTimeoutMs(Integer timeoutSec) {
99         // TODO move this method to the superclass
100         return (timeoutSec == null || timeoutSec == 0 ? getTimeoutMs() : super.getTimeoutMs(timeoutSec));
101     }
102
103     /**
104      * Publishes the request and arranges to receive the response.
105      */
106     @Override
107     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
108
109         final Q request = makeRequest(attempt);
110         final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request);
111
112         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
113         final Executor executor = params.getExecutor();
114
115         // register a listener BEFORE publishing
116
117         BiConsumer<String, StandardCoderObject> listener = (rawResponse, scoResponse) -> {
118             try {
119                 OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
120                 if (latestOutcome != null) {
121                     // final response - complete the controller
122                     controller.completeAsync(() -> latestOutcome, executor);
123                 }
124             } catch (RuntimeException e) {
125                 logger.warn("{}: failed to process response for {}", getFullName(), params.getRequestId());
126                 controller.completeExceptionally(e);
127             }
128         };
129
130         forwarder.register(expectedKeyValues, listener);
131
132         // ensure listener is unregistered if the controller is canceled
133         controller.add(() -> forwarder.unregister(expectedKeyValues, listener));
134
135         // publish the request
136         try {
137             publishRequest(request);
138         } catch (RuntimeException e) {
139             logger.warn("{}: failed to publish request for {}", getFullName(), params.getRequestId());
140             forwarder.unregister(expectedKeyValues, listener);
141             throw e;
142         }
143
144         return controller;
145     }
146
147     /**
148      * Makes the request.
149      *
150      * @param attempt operation attempt
151      * @return a new request
152      */
153     protected abstract Q makeRequest(int attempt);
154
155     /**
156      * Gets values, expected in the response, that should match the selector keys.
157      *
158      * @param attempt operation attempt
159      * @param request request to be published
160      * @return a list of the values to be matched by the selector keys
161      */
162     protected abstract List<String> getExpectedKeyValues(int attempt, Q request);
163
164     /**
165      * Publishes the request. Encodes the request, if it is not already a String.
166      *
167      * @param request request to be published
168      */
169     protected void publishRequest(Q request) {
170         String json = prettyPrint(request);
171         logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), json);
172
173         if (!topicHandler.send(json)) {
174             throw new IllegalStateException("nothing published");
175         }
176     }
177
178     /**
179      * Processes a response.
180      *
181      * @param infra communication infrastructure on which the response was received
182      * @param outcome outcome to be populated
183      * @param response raw response to process
184      * @param scoResponse response, as a {@link StandardCoderObject}
185      * @return the outcome, or {@code null} if still waiting for completion
186      */
187     protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse,
188                     StandardCoderObject scoResponse) {
189
190         logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
191
192         logMessage(EventType.IN, topicHandler.getSourceTopicCommInfrastructure(), topicHandler.getSourceTopic(),
193                         rawResponse);
194
195         // decode the response
196         S response;
197         if (responseClass == String.class) {
198             response = responseClass.cast(rawResponse);
199
200         } else if (responseClass == StandardCoderObject.class) {
201             response = responseClass.cast(scoResponse);
202
203         } else {
204             try {
205                 response = getCoder().decode(rawResponse, responseClass);
206             } catch (CoderException e) {
207                 logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
208                                 params.getRequestId());
209                 throw new IllegalArgumentException("cannot decode response", e);
210             }
211         }
212
213         // check its status
214         switch (detmStatus(rawResponse, response)) {
215             case SUCCESS:
216                 logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(),
217                                 params.getRequestId());
218                 setOutcome(outcome, OperationResult.SUCCESS, response);
219                 postProcessResponse(outcome, rawResponse, response);
220                 return outcome;
221
222             case FAILURE:
223                 logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
224                                 params.getRequestId());
225                 return setOutcome(outcome, OperationResult.FAILURE, response);
226
227             case STILL_WAITING:
228             default:
229                 logger.info("{}.{} request incomplete for {}", params.getActor(), params.getOperation(),
230                                 params.getRequestId());
231                 return null;
232         }
233     }
234
235     /**
236      * Sets an operation's outcome and default message based on the result.
237      *
238      * @param outcome operation to be updated
239      * @param result result of the operation
240      * @param response response used to populate the outcome
241      * @return the updated operation
242      */
243     public OperationOutcome setOutcome(OperationOutcome outcome, OperationResult result, S response) {
244         outcome.setResponse(response);
245         return setOutcome(outcome, result);
246     }
247
248     /**
249      * Processes a successful response.
250      *
251      * @param outcome outcome to be populated
252      * @param rawResponse raw response
253      * @param response decoded response
254      */
255     protected void postProcessResponse(OperationOutcome outcome, String rawResponse, S response) {
256         // do nothing
257     }
258
259     /**
260      * Determines the status of the response.
261      *
262      * @param rawResponse raw response
263      * @param response decoded response
264      * @return the status of the response
265      */
266     protected abstract Status detmStatus(String rawResponse, S response);
267 }