Update policy committers in policy/models
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / impl / BidirectionalTopicOperation.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
22
23 import java.util.List;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.function.BiConsumer;
26 import lombok.Getter;
27 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
28 import org.onap.policy.common.utils.coder.CoderException;
29 import org.onap.policy.common.utils.coder.StandardCoderObject;
30 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
31 import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
32 import org.onap.policy.controlloop.actorserviceprovider.parameters.BidirectionalTopicConfig;
33 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
34 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
35 import org.onap.policy.controlloop.actorserviceprovider.topic.BidirectionalTopicHandler;
36 import org.onap.policy.controlloop.actorserviceprovider.topic.Forwarder;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * Operation that uses a bidirectional topic.
42  *
43  * @param <S> response type
44  */
45 @Getter
46 public abstract class BidirectionalTopicOperation<Q, S> extends OperationPartial {
47     private static final Logger logger = LoggerFactory.getLogger(BidirectionalTopicOperation.class);
48
49     /**
50      * Response status.
51      */
52     public enum Status {
53         SUCCESS, FAILURE, STILL_WAITING
54     }
55
56     /**
57      * Configuration for this operation.
58      */
59     private final BidirectionalTopicConfig config;
60
61     /**
62      * Response class.
63      */
64     private final Class<S> responseClass;
65
66     // fields extracted from "config"
67
68     private final BidirectionalTopicHandler topicHandler;
69     private final Forwarder forwarder;
70
71
72     /**
73      * Constructs the object.
74      *
75      * @param params operation parameters
76      * @param config configuration for this operation
77      * @param clazz response class
78      * @param propertyNames names of properties required by this operation
79      */
80     protected BidirectionalTopicOperation(ControlLoopOperationParams params, BidirectionalTopicConfig config,
81                     Class<S> clazz, List<String> propertyNames) {
82         super(params, config, propertyNames);
83         this.config = config;
84         this.responseClass = clazz;
85         this.forwarder = config.getForwarder();
86         this.topicHandler = config.getTopicHandler();
87     }
88
89     public long getTimeoutMs() {
90         return config.getTimeoutMs();
91     }
92
93     /**
94      * If no timeout is specified, then it returns the default timeout.
95      */
96     @Override
97     protected long getTimeoutMs(Integer timeoutSec) {
98         return (timeoutSec == null || timeoutSec == 0 ? getTimeoutMs() : super.getTimeoutMs(timeoutSec));
99     }
100
101     /**
102      * Publishes the request and arranges to receive the response.
103      */
104     @Override
105     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
106
107         final var request = makeRequest(attempt);
108         final List<String> expectedKeyValues = getExpectedKeyValues(attempt, request);
109
110         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
111         final var executor = params.getExecutor();
112
113         // register a listener BEFORE publishing
114
115         BiConsumer<String, StandardCoderObject> listener = (rawResponse, scoResponse) -> {
116             try {
117                 OperationOutcome latestOutcome = processResponse(outcome, rawResponse, scoResponse);
118                 if (latestOutcome != null) {
119                     // final response - complete the controller
120                     controller.completeAsync(() -> latestOutcome, executor);
121                 }
122             } catch (RuntimeException e) {
123                 logger.warn("{}: failed to process response for {}", getFullName(), params.getRequestId());
124                 controller.completeExceptionally(e);
125             }
126         };
127
128         forwarder.register(expectedKeyValues, listener);
129
130         // ensure listener is unregistered if the controller is canceled
131         controller.add(() -> forwarder.unregister(expectedKeyValues, listener));
132
133         // publish the request
134         try {
135             publishRequest(request);
136         } catch (RuntimeException e) {
137             logger.warn("{}: failed to publish request for {}", getFullName(), params.getRequestId());
138             forwarder.unregister(expectedKeyValues, listener);
139             throw e;
140         }
141
142         return controller;
143     }
144
145     /**
146      * Makes the request.
147      *
148      * @param attempt operation attempt
149      * @return a new request
150      */
151     protected abstract Q makeRequest(int attempt);
152
153     /**
154      * Gets values, expected in the response, that should match the selector keys.
155      *
156      * @param attempt operation attempt
157      * @param request request to be published
158      * @return a list of the values to be matched by the selector keys
159      */
160     protected abstract List<String> getExpectedKeyValues(int attempt, Q request);
161
162     /**
163      * Publishes the request. Encodes the request, if it is not already a String.
164      *
165      * @param request request to be published
166      */
167     protected void publishRequest(Q request) {
168         String json = prettyPrint(request);
169         logMessage(EventType.OUT, topicHandler.getSinkTopicCommInfrastructure(), topicHandler.getSinkTopic(), json);
170
171         if (!topicHandler.send(json)) {
172             throw new IllegalStateException("nothing published");
173         }
174     }
175
176     /**
177      * Processes a response.
178      *
179      * @param outcome outcome to be populated
180      * @param rawResponse raw response to process
181      * @param scoResponse response, as a {@link StandardCoderObject}
182      * @return the outcome, or {@code null} if still waiting for completion
183      */
184     protected OperationOutcome processResponse(OperationOutcome outcome, String rawResponse,
185                     StandardCoderObject scoResponse) {
186
187         logger.info("{}.{}: response received for {}", params.getActor(), params.getOperation(), params.getRequestId());
188
189         logMessage(EventType.IN, topicHandler.getSourceTopicCommInfrastructure(), topicHandler.getSourceTopic(),
190                         rawResponse);
191
192         // decode the response
193         S response;
194         if (responseClass == String.class) {
195             response = responseClass.cast(rawResponse);
196
197         } else if (responseClass == StandardCoderObject.class) {
198             response = responseClass.cast(scoResponse);
199
200         } else {
201             try {
202                 response = getCoder().decode(rawResponse, responseClass);
203             } catch (CoderException e) {
204                 logger.warn("{}.{} cannot decode response for {}", params.getActor(), params.getOperation(),
205                                 params.getRequestId());
206                 throw new IllegalArgumentException("cannot decode response", e);
207             }
208         }
209
210         // check its status
211         switch (detmStatus(rawResponse, response)) {
212             case SUCCESS:
213                 logger.info("{}.{} request succeeded for {}", params.getActor(), params.getOperation(),
214                                 params.getRequestId());
215                 setOutcome(outcome, OperationResult.SUCCESS, response);
216                 postProcessResponse(outcome, rawResponse, response);
217                 return outcome;
218
219             case FAILURE:
220                 logger.info("{}.{} request failed for {}", params.getActor(), params.getOperation(),
221                                 params.getRequestId());
222                 return setOutcome(outcome, OperationResult.FAILURE, response);
223
224             case STILL_WAITING:
225             default:
226                 logger.info("{}.{} request incomplete for {}", params.getActor(), params.getOperation(),
227                                 params.getRequestId());
228                 return null;
229         }
230     }
231
232     /**
233      * Sets an operation's outcome and default message based on the result.
234      *
235      * @param outcome operation to be updated
236      * @param result result of the operation
237      * @param response response used to populate the outcome
238      * @return the updated operation
239      */
240     public OperationOutcome setOutcome(OperationOutcome outcome, OperationResult result, S response) {
241         outcome.setResponse(response);
242         return setOutcome(outcome, result);
243     }
244
245     /**
246      * Processes a successful response.
247      *
248      * @param outcome outcome to be populated
249      * @param rawResponse raw response
250      * @param response decoded response
251      */
252     protected void postProcessResponse(OperationOutcome outcome, String rawResponse, S response) {
253         // do nothing
254     }
255
256     /**
257      * Determines the status of the response.
258      *
259      * @param rawResponse raw response
260      * @param response decoded response
261      * @return the status of the response
262      */
263     protected abstract Status detmStatus(String rawResponse, S response);
264 }