13ba46a8520de9a689da498526f98bb5440270c8
[so.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Modifications Copyright (c) 2019 Samsung
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.so.bpmn.common.workflow.service;
24
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import org.onap.so.logger.LoggingAnchor;
30 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
31 import org.camunda.bpm.engine.OptimisticLockingException;
32 import org.camunda.bpm.engine.RuntimeService;
33 import org.camunda.bpm.engine.runtime.Execution;
34 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;
35 import org.onap.so.bpmn.core.UrnPropertiesReader;
36 import org.onap.so.logger.ErrorCode;
37 import org.onap.so.logger.MessageEnum;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.springframework.beans.factory.annotation.Autowired;
41 import org.springframework.scheduling.annotation.Async;
42 import org.springframework.stereotype.Service;
43
44 /**
45  * Abstract base class for callback services.
46  */
47 @Service
48 public class CallbackHandlerService {
49     public static final long DEFAULT_TIMEOUT_SECONDS = 60;
50     public static final long FAST_POLL_DUR_SECONDS = 5;
51     public static final long FAST_POLL_INT_MS = 100;
52     public static final long SLOW_POLL_INT_MS = 1000;
53
54     private static final Logger logger = LoggerFactory.getLogger(CallbackHandlerService.class);
55
56     private RuntimeService runtimeService;
57
58     @Autowired
59     public CallbackHandlerService(RuntimeService runtimeService) {
60         this.runtimeService = runtimeService;
61     }
62
63     /**
64      * Parameterized callback handler.
65      */
66     @Async
67     protected CallbackResult handleCallback(String method, Object message, String messageEventName,
68             String messageVariable, String correlationVariable, String correlationValue, String logMarker) {
69
70         return handleCallback(method, message, messageEventName, messageVariable, correlationVariable, correlationValue,
71                 logMarker, null);
72     }
73
74     /**
75      * Parameterized callback handler.
76      */
77     protected CallbackResult handleCallback(String method, Object message, String messageEventName,
78             String messageVariable, String correlationVariable, String correlationValue, String logMarker,
79             Map<String, Object> injectedVariables) {
80
81         long startTime = System.currentTimeMillis();
82
83         logger.debug(logMarker + " " + method + " received message: " + (message == null ? "" : System.lineSeparator())
84                 + message);
85
86         try {
87             Map<String, Object> variables = new HashMap<>();
88
89             if (injectedVariables != null) {
90                 variables.putAll(injectedVariables);
91             }
92
93             variables.put(correlationVariable, correlationValue);
94             variables.put(messageVariable, message == null ? null : message.toString());
95
96             boolean ok = correlate(messageEventName, correlationVariable, correlationValue, variables, logMarker);
97
98             if (!ok) {
99                 String msg = "No process is waiting for " + messageEventName + " with " + correlationVariable + " = '"
100                         + correlationValue + "'";
101                 logCallbackError(method, startTime, msg);
102                 return new CallbackError(msg);
103             }
104
105             logCallbackSuccess(method, startTime);
106             return new CallbackSuccess();
107         } catch (Exception e) {
108             logger.debug("Exception :", e);
109             String msg = "Caught " + e.getClass().getSimpleName() + " processing " + messageEventName + " with "
110                     + correlationVariable + " = '" + correlationValue + "'";
111             logCallbackError(method, startTime, msg);
112             return new CallbackError(msg);
113         }
114     }
115
116     /**
117      * Performs message correlation. Waits a limited amount of time for a process to become ready for correlation. The
118      * return value indicates whether or not a process was found to receive the message. Due to the synchronous nature
119      * of message injection in Camunda, by the time this method returns, one of 3 things will have happened: (1) the
120      * process received the message and ended, (2) the process received the message and reached an activity that
121      * suspended, or (3) an exception occurred during correlation or while the process was executing. Correlation
122      * exceptions are handled differently from process execution exceptions. Correlation exceptions are thrown so the
123      * client knows something went wrong with the delivery of the message. Process execution exceptions are logged but
124      * not thrown.
125      * 
126      * @param messageEventName the message event name
127      * @param correlationVariable the process variable used as the correlator
128      * @param correlationValue the correlation value
129      * @param variables variables to inject into the process
130      * @param logMarker a marker for debug logging
131      * @return true if a process could be found, false if not
132      */
133     protected boolean correlate(String messageEventName, String correlationVariable, String correlationValue,
134             Map<String, Object> variables, String logMarker) {
135         try {
136             logger.debug(logMarker + " Attempting to find process waiting" + " for " + messageEventName + " with "
137                     + correlationVariable + " = '" + correlationValue + "'");
138
139
140
141             long timeout = DEFAULT_TIMEOUT_SECONDS;
142
143             // The code is here in case we ever need to change the default.
144             String correlationTimemout = UrnPropertiesReader.getVariable("mso.correlation.timeout");
145             if (correlationTimemout != null) {
146                 try {
147                     timeout = Long.parseLong(correlationTimemout);
148                 } catch (NumberFormatException e) {
149                     // Ignore
150                 }
151             }
152
153             long now = System.currentTimeMillis();
154             long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);
155             long endTime = now + (timeout * 1000);
156             long sleep = FAST_POLL_INT_MS;
157
158             List<Execution> waitingProcesses = null;
159             Exception queryException = null;
160             int queryCount = 0;
161             int queryFailCount = 0;
162
163             while (true) {
164                 try {
165                     ++queryCount;
166                     waitingProcesses =
167                             runtimeService.createExecutionQuery().messageEventSubscriptionName(messageEventName)
168                                     .processVariableValueEquals(correlationVariable, correlationValue).list();
169                 } catch (Exception e) {
170                     ++queryFailCount;
171                     queryException = e;
172                 }
173
174                 if (waitingProcesses != null && waitingProcesses.size() > 0) {
175                     break;
176                 }
177
178                 if (now > endTime - sleep) {
179                     break;
180                 }
181
182                 Thread.sleep(sleep);
183                 now = System.currentTimeMillis();
184
185                 if (now > fastPollEndTime) {
186                     sleep = SLOW_POLL_INT_MS;
187                 }
188             }
189
190             if (waitingProcesses == null) {
191                 waitingProcesses = new ArrayList<Execution>(0);
192             }
193
194             int count = waitingProcesses.size();
195
196             List<ExecInfo> execInfoList = new ArrayList<>(count);
197             for (Execution execution : waitingProcesses) {
198                 execInfoList.add(new ExecInfo(execution));
199             }
200
201             logger.debug(logMarker + " Found " + count + " process(es) waiting" + " for " + messageEventName + " with "
202                     + correlationVariable + " = '" + correlationValue + "': " + execInfoList);
203
204             if (count == 0) {
205                 if (queryFailCount > 0) {
206                     String msg =
207                             queryFailCount + "/" + queryCount + " execution queries failed attempting to correlate "
208                                     + messageEventName + " with " + correlationVariable + " = '" + correlationValue
209                                     + "'; last exception was:" + queryException;
210                     logger.debug(msg);
211                     logger.error(LoggingAnchor.FOUR, MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
212                             ErrorCode.UnknownError.getValue(), msg, queryException);
213                 }
214
215                 return false;
216             }
217
218             if (count > 1) {
219                 // Only one process should be waiting. Throw an exception back to the client.
220                 throw new MismatchingMessageCorrelationException(messageEventName,
221                         "more than 1 process is waiting with " + correlationVariable + " = '" + correlationValue + "'");
222             }
223
224             // We prototyped an asynchronous solution, i.e. resuming the process
225             // flow in a separate thread, but this affected too many existing tests,
226             // and we went back to the synchronous solution. The synchronous solution
227             // has some troublesome characteristics though. For example, the
228             // resumed flow may send request #2 to a remote system before MSO has
229             // acknowledged the notification associated with request #1.
230
231             try {
232                 logger.debug(logMarker + " Running " + execInfoList.get(0) + " to receive " + messageEventName
233                         + " with " + correlationVariable + " = '" + correlationValue + "'");
234
235                 @SuppressWarnings("unused")
236                 MessageCorrelationResult result = runtimeService.createMessageCorrelation(messageEventName)
237                         .setVariables(variables).processInstanceVariableEquals(correlationVariable, correlationValue)
238                         .correlateWithResult();
239
240             } catch (MismatchingMessageCorrelationException e) {
241                 // A correlation exception occurred even after we identified
242                 // one waiting process. Throw it back to the client.
243                 throw e;
244             } catch (OptimisticLockingException ole) {
245
246                 String msg = "Caught " + ole.getClass().getSimpleName() + " after receiving " + messageEventName
247                         + " with " + correlationVariable + " = '" + correlationValue + "': " + ole;
248                 logger.debug(msg);
249                 logger.error(LoggingAnchor.FOUR, MessageEnum.BPMN_GENERAL_EXCEPTION.toString(),
250                         "BPMN CORRELATION ERROR -", ErrorCode.UnknownError.getValue(), msg, ole);
251
252                 // Retry for OptimisticLocking Exceptions
253                 int retryCount = 0;
254                 String retryStr = UrnPropertiesReader.getVariable("mso.bpmn.optimisticlockingexception.retrycount");
255                 if (retryStr != null) {
256                     try {
257                         retryCount = Integer.parseInt(retryStr);
258                     } catch (NumberFormatException e) {
259                         // Ignore
260                     }
261                 }
262
263                 logger.debug("Retry correlate for OptimisticLockingException, retryCount:{}", retryCount);
264
265                 for (; retryCount > 0; retryCount--) {
266
267                     try {
268                         Thread.sleep(SLOW_POLL_INT_MS);
269
270                         @SuppressWarnings("unused")
271                         MessageCorrelationResult result =
272                                 runtimeService.createMessageCorrelation(messageEventName).setVariables(variables)
273                                         .processInstanceVariableEquals(correlationVariable, correlationValue)
274                                         .correlateWithResult();
275                         retryCount = 0;
276                         logger.debug("OptimisticLockingException retry was successful, seting retryCount: {}",
277                                 retryCount);
278                     } catch (OptimisticLockingException olex) {
279                         // oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
280                         String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:"
281                                 + retryCount + " | exception returned: " + olex;
282                         logger.debug(strMsg);
283                         logger.error(LoggingAnchor.FOUR, MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
284                                 ErrorCode.UnknownError.getValue(), strMsg, olex);
285                     } catch (Exception excep) {
286                         retryCount = 0;
287                         // oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
288                         String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:"
289                                 + retryCount + " | exception returned: " + excep;
290                         logger.debug(strMsg);
291                         logger.error(LoggingAnchor.FOUR, MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
292                                 ErrorCode.UnknownError.getValue(), strMsg, excep);
293                     }
294
295                 }
296
297             } catch (Exception e) {
298                 // This must be an exception from the flow itself. Log it, but don't
299                 // report it back to the client.
300                 String msg = "Caught " + e.getClass().getSimpleName() + " running " + execInfoList.get(0)
301                         + " after receiving " + messageEventName + " with " + correlationVariable + " = '"
302                         + correlationValue + "': " + e;
303                 logger.debug(msg);
304                 logger.error(LoggingAnchor.FOUR, MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
305                         ErrorCode.UnknownError.getValue(), msg, e);
306             }
307         } catch (Exception e) {
308             // This must be an exception from the flow itself. Log it, but don't
309             // report it back to the client.
310             String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName + " with "
311                     + correlationVariable + " = '" + correlationValue + "': " + e;
312             logger.debug(msg);
313             logger.error(LoggingAnchor.FOUR, MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN CORRELATION ERROR -",
314                     ErrorCode.UnknownError.getValue(), msg, e);
315         }
316
317         return true;
318     }
319
320     /**
321      * Records audit and metric events in the log for a callback success.
322      * 
323      * @param method the method name
324      * @param startTime the request start time
325      */
326     protected void logCallbackSuccess(String method, long startTime) {}
327
328     /**
329      * Records error, audit and metric events in the log for a callback internal error.
330      * 
331      * @param method the method name
332      * @param startTime the request start time
333      * @param msg the error message
334      */
335     protected void logCallbackError(String method, long startTime, String msg) {
336         logCallbackError(method, startTime, msg, null);
337     }
338
339     /**
340      * Records error, audit and metric events in the log for a callback internal error.
341      * 
342      * @param method the method name
343      * @param startTime the request start time
344      * @param msg the error message
345      * @param e the exception
346      */
347     protected void logCallbackError(String method, long startTime, String msg, Exception e) {
348         if (e == null) {
349             logger.error(LoggingAnchor.FOUR, MessageEnum.BPMN_CALLBACK_EXCEPTION.toString(), "BPMN",
350                     ErrorCode.UnknownError.getValue(), msg);
351         } else {
352             logger.error(LoggingAnchor.FOUR, MessageEnum.BPMN_CALLBACK_EXCEPTION.toString(), "BPMN",
353                     ErrorCode.UnknownError.getValue(), msg, e);
354         }
355     }
356
357     /**
358      * Abstract callback result object.
359      */
360     protected abstract class CallbackResult {
361     }
362
363     /**
364      * Indicates that callback handling was successful.
365      */
366     protected class CallbackSuccess extends CallbackResult {
367     }
368
369     /**
370      * Indicates that callback handling failed.
371      */
372     protected class CallbackError extends CallbackResult {
373         private final String errorMessage;
374
375         public CallbackError(String errorMessage) {
376             this.errorMessage = errorMessage;
377         }
378
379         /**
380          * Gets the error message.
381          */
382         public String getErrorMessage() {
383             return errorMessage;
384         }
385     }
386
387     private static class ExecInfo {
388         private final Execution execution;
389
390         public ExecInfo(Execution execution) {
391             this.execution = execution;
392         }
393
394         @Override
395         public String toString() {
396             return "Process[" + execution.getProcessInstanceId() + ":" + execution.getId() + "]";
397         }
398     }
399 }