6b584d7c632f99f433c457f3239193dff6e27fdc
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / TopicPairOperation.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.List;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.Executor;
26 import java.util.concurrent.TimeUnit;
27 import lombok.Getter;
28 import org.apache.commons.lang3.tuple.Triple;
29 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
30 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
31 import org.onap.policy.common.endpoints.utils.PropertyUtils.TriConsumer;
32 import org.onap.policy.common.utils.coder.Coder;
33 import org.onap.policy.common.utils.coder.CoderException;
34 import org.onap.policy.common.utils.coder.StandardCoder;
35 import org.onap.policy.common.utils.coder.StandardCoderObject;
36 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
37 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
38 import org.onap.policy.controlloop.actorserviceprovider.parameters.TopicPairParams;
39 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
40 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
41 import org.onap.policy.controlloop.actorserviceprovider.topic.TopicPair;
42 import org.onap.policy.controlloop.policy.PolicyResult;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * Operation that uses a Topic pair.
48  *
49  * @param <S> response type
50  */
51 @Getter
52 public abstract class TopicPairOperation<Q, S> extends OperationPartial {
53     private static final Logger logger = LoggerFactory.getLogger(TopicPairOperation.class);
54     private static final Coder coder = new StandardCoder();
55
56     // fields extracted from the operator
57
58     private final TopicPair topicPair;
59     private final Forwarder forwarder;
60     private final TopicPairParams pairParams;
61     private final long timeoutMs;
62
63     /**
64      * Response class.
65      */
66     private final Class<S> responseClass;
67
68
69     /**
70      * Constructs the object.
71      *
72      * @param params operation parameters
73      * @param operator operator that created this operation
74      * @param clazz response class
75      */
76     public TopicPairOperation(ControlLoopOperationParams params, TopicPairOperator operator, Class<S> clazz) {
77         super(params, operator);
78         this.topicPair = operator.getTopicPair();
79         this.forwarder = operator.getForwarder();
80         this.pairParams = operator.getParams();
81         this.responseClass = clazz;
82         this.timeoutMs = TimeUnit.MILLISECONDS.convert(pairParams.getTimeoutSec(), TimeUnit.SECONDS);
83     }
84
85     /**
86      * If no timeout is specified, then it returns the default timeout.
87      */
88     @Override
89     protected long getTimeoutMs(Integer timeoutSec) {
90         // TODO move this method to the superclass
91         return (timeoutSec == null || timeoutSec == 0 ? this.timeoutMs : super.getTimeoutMs(timeoutSec));
92     }
93
94     /**
95      * Publishes the request and arranges to receive the response.
96      */
97     @Override
98     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
99
100         final Q request = makeRequest(attempt);
101         final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request);
102
103         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
104         final CompletableFuture<Triple<CommInfrastructure, String, StandardCoderObject>> future =
105                         new CompletableFuture<>();
106         final Executor executor = params.getExecutor();
107
108         // register a listener BEFORE publishing
109
110         // @formatter:off
111         TriConsumer<CommInfrastructure, String, StandardCoderObject> listener =
112             (infra, rawResponse, scoResponse) -> future.complete(Triple.of(infra, rawResponse, scoResponse));
113         // @formatter:on
114
115         // TODO this currently only allows a single matching response
116
117         forwarder.register(expectedKeyValues, listener);
118
119         // ensure listener is unregistered if the controller is canceled
120         controller.add(() -> forwarder.unregister(expectedKeyValues, listener));
121
122         // publish the request
123         try {
124             publishRequest(request);
125         } catch (RuntimeException e) {
126             logger.warn("{}: failed to publish request for {}", getFullName(), params.getRequestId());
127             forwarder.unregister(expectedKeyValues, listener);
128             throw e;
129         }
130
131
132         // once "future" completes, process the response, and then complete the controller
133
134         // @formatter:off
135         future.thenApplyAsync(
136             triple -> processResponse(triple.getLeft(), outcome, triple.getMiddle(), triple.getRight()),
137                             executor)
138                         .whenCompleteAsync(controller.delayedComplete(), executor);
139         // @formatter:on
140
141         return controller;
142     }
143
144     /**
145      * Makes the request.
146      *
147      * @param attempt operation attempt
148      * @return a new request
149      */
150     protected abstract Q makeRequest(int attempt);
151
152     /**
153      * Gets values, expected in the response, that should match the selector keys.
154      *
155      * @param attempt operation attempt
156      * @param request request to be published
157      * @return a list of the values to be matched by the selector keys
158      */
159     protected abstract List<String> getExpectedKeyValues(int attempt, Q request);
160
161     /**
162      * Publishes the request. Encodes the request, if it is not already a String.
163      *
164      * @param request request to be published
165      */
166     protected void publishRequest(Q request) {
167         String json;
168         try {
169             if (request instanceof String) {
170                 json = request.toString();
171             } else {
172                 json = makeCoder().encode(request);
173             }
174         } catch (CoderException e) {
175             throw new IllegalArgumentException("cannot encode request", e);
176         }
177
178         List<CommInfrastructure> list = topicPair.publish(json);
179         if (list.isEmpty()) {
180             throw new IllegalStateException("nothing published");
181         }
182
183         logTopicRequest(list, request);
184     }
185
186     /**
187      * Processes a response.
188      *
189      * @param infra communication infrastructure on which the response was received
190      * @param outcome outcome to be populated
191      * @param response raw response to process
192      * @param scoResponse response, as a {@link StandardCoderObject}
193      * @return the outcome
194      */
195     protected OperationOutcome processResponse(CommInfrastructure infra, OperationOutcome outcome, String rawResponse,
196                     StandardCoderObject scoResponse) {
197
198         logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
199
200         logTopicResponse(infra, rawResponse);
201
202         S response;
203         if (responseClass == String.class) {
204             response = responseClass.cast(rawResponse);
205
206         } else if (responseClass == StandardCoderObject.class) {
207             response = responseClass.cast(scoResponse);
208
209         } else {
210             try {
211                 response = makeCoder().decode(rawResponse, responseClass);
212             } catch (CoderException e) {
213                 logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
214                                 params.getRequestId());
215                 throw new IllegalArgumentException("cannot decode response", e);
216             }
217         }
218
219         if (!isSuccess(rawResponse, response)) {
220             logger.info("{}.{} request failed  for {}", params.getActor(), params.getOperation(),
221                             params.getRequestId());
222             return setOutcome(outcome, PolicyResult.FAILURE);
223         }
224
225         logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(), params.getRequestId());
226         setOutcome(outcome, PolicyResult.SUCCESS);
227         postProcessResponse(outcome, rawResponse, response);
228
229         return outcome;
230     }
231
232     /**
233      * Processes a successful response.
234      *
235      * @param outcome outcome to be populated
236      * @param rawResponse raw response
237      * @param response decoded response
238      */
239     protected void postProcessResponse(OperationOutcome outcome, String rawResponse, S response) {
240         // do nothing
241     }
242
243     /**
244      * Determines if the response indicates success.
245      *
246      * @param rawResponse raw response
247      * @param response decoded response
248      * @return {@code true} if the response indicates success, {@code false} otherwise
249      */
250     protected abstract boolean isSuccess(String rawResponse, S response);
251
252     /**
253      * Logs a TOPIC request. If the request is not of type, String, then it attempts to
254      * pretty-print it into JSON before logging.
255      *
256      * @param infrastructures list of communication infrastructures on which it was
257      *        published
258      * @param request request to be logged
259      */
260     protected void logTopicRequest(List<CommInfrastructure> infrastructures, Q request) {
261         if (infrastructures.isEmpty()) {
262             return;
263         }
264
265         String json;
266         try {
267             if (request == null) {
268                 json = null;
269             } else if (request instanceof String) {
270                 json = request.toString();
271             } else {
272                 json = makeCoder().encode(request, true);
273             }
274
275         } catch (CoderException e) {
276             logger.warn("cannot pretty-print request", e);
277             json = request.toString();
278         }
279
280         for (CommInfrastructure infra : infrastructures) {
281             logger.info("[OUT|{}|{}|]{}{}", infra, pairParams.getTarget(), NetLoggerUtil.SYSTEM_LS, json);
282         }
283     }
284
285     /**
286      * Logs a TOPIC response. If the response is not of type, String, then it attempts to
287      * pretty-print it into JSON before logging.
288      *
289      * @param infra communication infrastructure on which the response was received
290      * @param response response to be logged
291      */
292     protected <T> void logTopicResponse(CommInfrastructure infra, T response) {
293         String json;
294         try {
295             if (response == null) {
296                 json = null;
297             } else if (response instanceof String) {
298                 json = response.toString();
299             } else {
300                 json = makeCoder().encode(response, true);
301             }
302
303         } catch (CoderException e) {
304             logger.warn("cannot pretty-print response", e);
305             json = response.toString();
306         }
307
308         logger.info("[IN|{}|{}|]{}{}", infra, pairParams.getSource(), NetLoggerUtil.SYSTEM_LS, json);
309     }
310
311     // these may be overridden by junit tests
312
313     protected Coder makeCoder() {
314         return coder;
315     }
316 }