Fix return building on policy get
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / BidirectionalTopicOperation.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.List;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.Executor;
26 import java.util.concurrent.TimeUnit;
27 import java.util.function.BiConsumer;
28 import lombok.Getter;
29 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
30 import org.onap.policy.common.utils.coder.CoderException;
31 import org.onap.policy.common.utils.coder.StandardCoderObject;
32 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
33 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicParams;
34 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
35 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
36 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
37 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
38 import org.onap.policy.controlloop.policy.PolicyResult;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43  * Operation that uses a bidirectional topic.
44  *
45  * @param <S> response type
46  */
47 @Getter
48 public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial {
49     private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicOperation.class);
50
51     /**
52      * Response status.
53      */
54     public enum Status {
55         SUCCESS, FAILURE, STILL_WAITING
56     }
57
58     // fields extracted from the operator
59
60     private final BidirectionalTopicHandler topicHandler;
61     private final Forwarder forwarder;
62     private final BidirectionalTopicParams topicParams;
63     private final long timeoutMs;
64
65     /**
66      * Response class.
67      */
68     private final Class<S> responseClass;
69
70
71     /**
72      * Constructs the object.
73      *
74      * @param params operation parameters
75      * @param operator operator that created this operation
76      * @param clazz response class
77      */
78     public BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicOperator operator,
79                     Class<S> clazz) {
80         super(params, operator);
81         this.topicHandler = operator.getTopicHandler();
82         this.forwarder = operator.getForwarder();
83         this.topicParams = operator.getParams();
84         this.responseClass = clazz;
85         this.timeoutMs = TimeUnit.MILLISECONDS.convert(topicParams.getTimeoutSec(), TimeUnit.SECONDS);
86     }
87
88     /**
89      * If no timeout is specified, then it returns the default timeout.
90      */
91     @Override
92     protected long getTimeoutMs(Integer timeoutSec) {
93         // TODO move this method to the superclass
94         return (timeoutSec == null || timeoutSec == 0 ? this.timeoutMs : super.getTimeoutMs(timeoutSec));
95     }
96
97     /**
98      * Publishes the request and arranges to receive the response.
99      */
100     @Override
101     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
102
103         final Q request = makeRequest(attempt);
104         final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request);
105
106         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
107         final Executor executor = params.getExecutor();
108
109         // register a listener BEFORE publishing
110
111         BiConsumer<String, StandardCoderObject> listener = (rawResponse, scoResponse) -> {
112             OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
113             if (latestOutcome != null) {
114                 // final response - complete the controller
115                 controller.completeAsync(() -> latestOutcome, executor);
116             }
117         };
118
119         forwarder.register(expectedKeyValues, listener);
120
121         // ensure listener is unregistered if the controller is canceled
122         controller.add(() -> forwarder.unregister(expectedKeyValues, listener));
123
124         // publish the request
125         try {
126             publishRequest(request);
127         } catch (RuntimeException e) {
128             logger.warn("{}: failed to publish request for {}", getFullName(), params.getRequestId());
129             forwarder.unregister(expectedKeyValues, listener);
130             throw e;
131         }
132
133         return controller;
134     }
135
136     /**
137      * Makes the request.
138      *
139      * @param attempt operation attempt
140      * @return a new request
141      */
142     protected abstract Q makeRequest(int attempt);
143
144     /**
145      * Gets values, expected in the response, that should match the selector keys.
146      *
147      * @param attempt operation attempt
148      * @param request request to be published
149      * @return a list of the values to be matched by the selector keys
150      */
151     protected abstract List<String> getExpectedKeyValues(int attempt, Q request);
152
153     /**
154      * Publishes the request. Encodes the request, if it is not already a String.
155      *
156      * @param request request to be published
157      */
158     protected void publishRequest(Q request) {
159         String json;
160         try {
161             if (request instanceof String) {
162                 json = request.toString();
163             } else {
164                 json = makeCoder().encode(request);
165             }
166         } catch (CoderException e) {
167             throw new IllegalArgumentException("cannot encode request", e);
168         }
169
170         if (!topicHandler.send(json)) {
171             throw new IllegalStateException("nothing published");
172         }
173
174         logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), request);
175     }
176
177     /**
178      * Processes a response.
179      *
180      * @param infra communication infrastructure on which the response was received
181      * @param outcome outcome to be populated
182      * @param response raw response to process
183      * @param scoResponse response, as a {@link StandardCoderObject}
184      * @return the outcome, or {@code null} if still waiting for completion
185      */
186     protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse,
187                     StandardCoderObject scoResponse) {
188
189         logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
190
191         logMessage(EventType.IN, topicHandler.getSourceTopicCommInfrastructure(), topicHandler.getSourceTopic(),
192                         rawResponse);
193
194         // decode the response
195         S response;
196         if (responseClass == String.class) {
197             response = responseClass.cast(rawResponse);
198
199         } else if (responseClass == StandardCoderObject.class) {
200             response = responseClass.cast(scoResponse);
201
202         } else {
203             try {
204                 response = makeCoder().decode(rawResponse, responseClass);
205             } catch (CoderException e) {
206                 logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
207                                 params.getRequestId());
208                 throw new IllegalArgumentException("cannot decode response", e);
209             }
210         }
211
212         // check its status
213         switch (detmStatus(rawResponse, response)) {
214             case SUCCESS:
215                 logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(),
216                                 params.getRequestId());
217                 setOutcome(outcome, PolicyResult.SUCCESS);
218                 postProcessResponse(outcome, rawResponse, response);
219                 return outcome;
220
221             case FAILURE:
222                 logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
223                                 params.getRequestId());
224                 return setOutcome(outcome, PolicyResult.FAILURE);
225
226             case STILL_WAITING:
227             default:
228                 logger.info("{}.{} request incomplete for {}", params.getActor(), params.getOperation(),
229                                 params.getRequestId());
230                 return null;
231         }
232     }
233
234     /**
235      * Processes a successful response.
236      *
237      * @param outcome outcome to be populated
238      * @param rawResponse raw response
239      * @param response decoded response
240      */
241     protected void postProcessResponse(OperationOutcome outcome, String rawResponse, S response) {
242         // do nothing
243     }
244
245     /**
246      * Determines the status of the response.
247      *
248      * @param rawResponse raw response
249      * @param response decoded response
250      * @return the status of the response
251      */
252     protected abstract Status detmStatus(String rawResponse, S response);
253 }