Replaced all tabs with spaces in java and pom.xml
[so.git] / bpmn / mso-infrastructure-bpmn / src / main / java / org / onap / so / bpmn / common / workflow / service / CallbackHandlerService.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Modifications Copyright (c) 2019 Samsung
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.so.bpmn.common.workflow.service;
24
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
30 import org.camunda.bpm.engine.OptimisticLockingException;
31 import org.camunda.bpm.engine.RuntimeService;
32 import org.camunda.bpm.engine.runtime.Execution;
33 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;
34 import org.onap.so.bpmn.core.UrnPropertiesReader;
35 import org.onap.so.logger.ErrorCode;
36 import org.onap.so.logger.MessageEnum;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import org.springframework.beans.factory.annotation.Autowired;
40 import org.springframework.scheduling.annotation.Async;
41 import org.springframework.stereotype.Service;
42
43 /**
44  * Abstract base class for callback services.
45  */
46 @Service
47 public class CallbackHandlerService {
48     public static final long DEFAULT_TIMEOUT_SECONDS = 60;
49     public static final long FAST_POLL_DUR_SECONDS = 5;
50     public static final long FAST_POLL_INT_MS = 100;
51     public static final long SLOW_POLL_INT_MS = 1000;
52
53     private static final Logger logger = LoggerFactory.getLogger(CallbackHandlerService.class);
54
55     private RuntimeService runtimeService;
56
57     @Autowired
58     public CallbackHandlerService(RuntimeService runtimeService) {
59         this.runtimeService = runtimeService;
60     }
61
62     /**
63      * Parameterized callback handler.
64      */
65     @Async
66     protected CallbackResult handleCallback(String method, Object message, String messageEventName,
67             String messageVariable, String correlationVariable, String correlationValue, String logMarker) {
68
69         return handleCallback(method, message, messageEventName, messageVariable, correlationVariable, correlationValue,
70                 logMarker, null);
71     }
72
73     /**
74      * Parameterized callback handler.
75      */
76     protected CallbackResult handleCallback(String method, Object message, String messageEventName,
77             String messageVariable, String correlationVariable, String correlationValue, String logMarker,
78             Map<String, Object> injectedVariables) {
79
80         long startTime = System.currentTimeMillis();
81
82         logger.debug(logMarker + " " + method + " received message: " + (message == null ? "" : System.lineSeparator())
83                 + message);
84
85         try {
86             Map<String, Object> variables = new HashMap<>();
87
88             if (injectedVariables != null) {
89                 variables.putAll(injectedVariables);
90             }
91
92             variables.put(correlationVariable, correlationValue);
93             variables.put(messageVariable, message == null ? null : message.toString());
94
95             boolean ok = correlate(messageEventName, correlationVariable, correlationValue, variables, logMarker);
96
97             if (!ok) {
98                 String msg = "No process is waiting for " + messageEventName + " with " + correlationVariable + " = '"
99                         + correlationValue + "'";
100                 logCallbackError(method, startTime, msg);
101                 return new CallbackError(msg);
102             }
103
104             logCallbackSuccess(method, startTime);
105             return new CallbackSuccess();
106         } catch (Exception e) {
107             logger.debug("Exception :", e);
108             String msg = "Caught " + e.getClass().getSimpleName() + " processing " + messageEventName + " with "
109                     + correlationVariable + " = '" + correlationValue + "'";
110             logCallbackError(method, startTime, msg);
111             return new CallbackError(msg);
112         }
113     }
114
115     /**
116      * Performs message correlation. Waits a limited amount of time for a process to become ready for correlation. The
117      * return value indicates whether or not a process was found to receive the message. Due to the synchronous nature
118      * of message injection in Camunda, by the time this method returns, one of 3 things will have happened: (1) the
119      * process received the message and ended, (2) the process received the message and reached an activity that
120      * suspended, or (3) an exception occurred during correlation or while the process was executing. Correlation
121      * exceptions are handled differently from process execution exceptions. Correlation exceptions are thrown so the
122      * client knows something went wrong with the delivery of the message. Process execution exceptions are logged but
123      * not thrown.
124      * 
125      * @param messageEventName the message event name
126      * @param correlationVariable the process variable used as the correlator
127      * @param correlationValue the correlation value
128      * @param variables variables to inject into the process
129      * @param logMarker a marker for debug logging
130      * @return true if a process could be found, false if not
131      */
132     protected boolean correlate(String messageEventName, String correlationVariable, String correlationValue,
133             Map<String, Object> variables, String logMarker) {
134         try {
135             logger.debug(logMarker + " Attempting to find process waiting" + " for " + messageEventName + " with "
136                     + correlationVariable + " = '" + correlationValue + "'");
137
138
139
140             long timeout = DEFAULT_TIMEOUT_SECONDS;
141
142             // The code is here in case we ever need to change the default.
143             String correlationTimemout = UrnPropertiesReader.getVariable("mso.correlation.timeout");
144             if (correlationTimemout != null) {
145                 try {
146                     timeout = Long.parseLong(correlationTimemout);
147                 } catch (NumberFormatException e) {
148                     // Ignore
149                 }
150             }
151
152             long now = System.currentTimeMillis();
153             long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);
154             long endTime = now + (timeout * 1000);
155             long sleep = FAST_POLL_INT_MS;
156
157             List<Execution> waitingProcesses = null;
158             Exception queryException = null;
159             int queryCount = 0;
160             int queryFailCount = 0;
161
162             while (true) {
163                 try {
164                     ++queryCount;
165                     waitingProcesses =
166                             runtimeService.createExecutionQuery().messageEventSubscriptionName(messageEventName)
167                                     .processVariableValueEquals(correlationVariable, correlationValue).list();
168                 } catch (Exception e) {
169                     ++queryFailCount;
170                     queryException = e;
171                 }
172
173                 if (waitingProcesses != null && waitingProcesses.size() > 0) {
174                     break;
175                 }
176
177                 if (now > endTime - sleep) {
178                     break;
179                 }
180
181                 Thread.sleep(sleep);
182                 now = System.currentTimeMillis();
183
184                 if (now > fastPollEndTime) {
185                     sleep = SLOW_POLL_INT_MS;
186                 }
187             }
188
189             if (waitingProcesses == null) {
190                 waitingProcesses = new ArrayList<Execution>(0);
191             }
192
193             int count = waitingProcesses.size();
194
195             List<ExecInfo> execInfoList = new ArrayList<>(count);
196             for (Execution execution : waitingProcesses) {
197                 execInfoList.add(new ExecInfo(execution));
198             }
199
200             logger.debug(logMarker + " Found " + count + " process(es) waiting" + " for " + messageEventName + " with "
201                     + correlationVariable + " = '" + correlationValue + "': " + execInfoList);
202
203             if (count == 0) {
204                 if (queryFailCount > 0) {
205                     String msg =
206                             queryFailCount + "/" + queryCount + " execution queries failed attempting to correlate "
207                                     + messageEventName + " with " + correlationVariable + " = '" + correlationValue
208                                     + "'; last exception was:" + queryException;
209                     logger.debug(msg);
210                     logger.error("{} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
211                             ErrorCode.UnknownError.getValue(), msg, queryException);
212                 }
213
214                 return false;
215             }
216
217             if (count > 1) {
218                 // Only one process should be waiting. Throw an exception back to the client.
219                 throw new MismatchingMessageCorrelationException(messageEventName,
220                         "more than 1 process is waiting with " + correlationVariable + " = '" + correlationValue + "'");
221             }
222
223             // We prototyped an asynchronous solution, i.e. resuming the process
224             // flow in a separate thread, but this affected too many existing tests,
225             // and we went back to the synchronous solution. The synchronous solution
226             // has some troublesome characteristics though. For example, the
227             // resumed flow may send request #2 to a remote system before MSO has
228             // acknowledged the notification associated with request #1.
229
230             try {
231                 logger.debug(logMarker + " Running " + execInfoList.get(0) + " to receive " + messageEventName
232                         + " with " + correlationVariable + " = '" + correlationValue + "'");
233
234                 @SuppressWarnings("unused")
235                 MessageCorrelationResult result = runtimeService.createMessageCorrelation(messageEventName)
236                         .setVariables(variables).processInstanceVariableEquals(correlationVariable, correlationValue)
237                         .correlateWithResult();
238
239             } catch (MismatchingMessageCorrelationException e) {
240                 // A correlation exception occurred even after we identified
241                 // one waiting process. Throw it back to the client.
242                 throw e;
243             } catch (OptimisticLockingException ole) {
244
245                 String msg = "Caught " + ole.getClass().getSimpleName() + " after receiving " + messageEventName
246                         + " with " + correlationVariable + " = '" + correlationValue + "': " + ole;
247                 logger.debug(msg);
248                 logger.error("{} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN CORRELATION ERROR -",
249                         ErrorCode.UnknownError.getValue(), msg, ole);
250
251                 // Retry for OptimisticLocking Exceptions
252                 int retryCount = 0;
253                 String retryStr = UrnPropertiesReader.getVariable("mso.bpmn.optimisticlockingexception.retrycount");
254                 if (retryStr != null) {
255                     try {
256                         retryCount = Integer.parseInt(retryStr);
257                     } catch (NumberFormatException e) {
258                         // Ignore
259                     }
260                 }
261
262                 logger.debug("Retry correlate for OptimisticLockingException, retryCount:{}", retryCount);
263
264                 for (; retryCount > 0; retryCount--) {
265
266                     try {
267                         Thread.sleep(SLOW_POLL_INT_MS);
268
269                         @SuppressWarnings("unused")
270                         MessageCorrelationResult result =
271                                 runtimeService.createMessageCorrelation(messageEventName).setVariables(variables)
272                                         .processInstanceVariableEquals(correlationVariable, correlationValue)
273                                         .correlateWithResult();
274                         retryCount = 0;
275                         logger.debug("OptimisticLockingException retry was successful, seting retryCount: {}",
276                                 retryCount);
277                     } catch (OptimisticLockingException olex) {
278                         // oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
279                         String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:"
280                                 + retryCount + " | exception returned: " + olex;
281                         logger.debug(strMsg);
282                         logger.error("{} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
283                                 ErrorCode.UnknownError.getValue(), strMsg, olex);
284                     } catch (Exception excep) {
285                         retryCount = 0;
286                         // oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
287                         String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:"
288                                 + retryCount + " | exception returned: " + excep;
289                         logger.debug(strMsg);
290                         logger.error("{} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
291                                 ErrorCode.UnknownError.getValue(), strMsg, excep);
292                     }
293
294                 }
295
296             } catch (Exception e) {
297                 // This must be an exception from the flow itself. Log it, but don't
298                 // report it back to the client.
299                 String msg = "Caught " + e.getClass().getSimpleName() + " running " + execInfoList.get(0)
300                         + " after receiving " + messageEventName + " with " + correlationVariable + " = '"
301                         + correlationValue + "': " + e;
302                 logger.debug(msg);
303                 logger.error("{} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
304                         ErrorCode.UnknownError.getValue(), msg, e);
305             }
306         } catch (Exception e) {
307             // This must be an exception from the flow itself. Log it, but don't
308             // report it back to the client.
309             String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName + " with "
310                     + correlationVariable + " = '" + correlationValue + "': " + e;
311             logger.debug(msg);
312             logger.error("{} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN CORRELATION ERROR -",
313                     ErrorCode.UnknownError.getValue(), msg, e);
314         }
315
316         return true;
317     }
318
319     /**
320      * Records audit and metric events in the log for a callback success.
321      * 
322      * @param method the method name
323      * @param startTime the request start time
324      */
325     protected void logCallbackSuccess(String method, long startTime) {}
326
327     /**
328      * Records error, audit and metric events in the log for a callback internal error.
329      * 
330      * @param method the method name
331      * @param startTime the request start time
332      * @param msg the error message
333      */
334     protected void logCallbackError(String method, long startTime, String msg) {
335         logCallbackError(method, startTime, msg, null);
336     }
337
338     /**
339      * Records error, audit and metric events in the log for a callback internal error.
340      * 
341      * @param method the method name
342      * @param startTime the request start time
343      * @param msg the error message
344      * @param e the exception
345      */
346     protected void logCallbackError(String method, long startTime, String msg, Exception e) {
347         if (e == null) {
348             logger.error("{} {} {} {}", MessageEnum.BPMN_CALLBACK_EXCEPTION.toString(), "BPMN",
349                     ErrorCode.UnknownError.getValue(), msg);
350         } else {
351             logger.error("{} {} {} {}", MessageEnum.BPMN_CALLBACK_EXCEPTION.toString(), "BPMN",
352                     ErrorCode.UnknownError.getValue(), msg, e);
353         }
354     }
355
356     /**
357      * Abstract callback result object.
358      */
359     protected abstract class CallbackResult {
360     }
361
362     /**
363      * Indicates that callback handling was successful.
364      */
365     protected class CallbackSuccess extends CallbackResult {
366     }
367
368     /**
369      * Indicates that callback handling failed.
370      */
371     protected class CallbackError extends CallbackResult {
372         private final String errorMessage;
373
374         public CallbackError(String errorMessage) {
375             this.errorMessage = errorMessage;
376         }
377
378         /**
379          * Gets the error message.
380          */
381         public String getErrorMessage() {
382             return errorMessage;
383         }
384     }
385
386     private static class ExecInfo {
387         private final Execution execution;
388
389         public ExecInfo(Execution execution) {
390             this.execution = execution;
391         }
392
393         @Override
394         public String toString() {
395             return "Process[" + execution.getProcessInstanceId() + ":" + execution.getId() + "]";
396         }
397     }
398 }