Merge "Reorder modifiers"
[so.git] / bpmn / MSOCommonBPMN / src / main / java / org / openecomp / mso / bpmn / common / workflow / service / AbstractCallbackService.java
/*-
 * ============LICENSE_START=======================================================
 * ONAP - SO
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
20 \r
21 package org.openecomp.mso.bpmn.common.workflow.service;\r
22 \r
23 import java.util.ArrayList;\r
24 import java.util.HashMap;\r
25 import java.util.List;\r
26 import java.util.Map;\r
27 \r
28 import org.camunda.bpm.BpmPlatform;\r
29 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;\r
30 import org.camunda.bpm.engine.OptimisticLockingException;\r
31 import org.camunda.bpm.engine.RuntimeService;\r
32 import org.camunda.bpm.engine.runtime.Execution;\r
33 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;\r
34 import org.openecomp.mso.bpmn.core.PropertyConfiguration;\r
35 import org.openecomp.mso.logger.MessageEnum;\r
36 import org.openecomp.mso.logger.MsoLogger;\r
37 \r
38 /**\r
39  * Abstract base class for callback services.\r
40  */\r
41 public abstract class AbstractCallbackService extends ProcessEngineAwareService {\r
42         public static final long DEFAULT_TIMEOUT_SECONDS = 60;\r
43         public static final long FAST_POLL_DUR_SECONDS = 5;\r
44         public static final long FAST_POLL_INT_MS = 100;\r
45         public static final long SLOW_POLL_INT_MS = 1000;\r
46         \r
47         private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);\r
48 \r
49         /**\r
50          * Parameterized callback handler.\r
51          */\r
52         protected CallbackResult handleCallback(String method, Object message,\r
53                         String messageEventName, String messageVariable,\r
54                         String correlationVariable, String correlationValue,\r
55                         String logMarker) {\r
56 \r
57                 return handleCallback(method, message, messageEventName, messageVariable,\r
58                         correlationVariable, correlationValue, logMarker, null);\r
59         }\r
60 \r
61         /**\r
62          * Parameterized callback handler.\r
63          */\r
64         protected CallbackResult handleCallback(String method, Object message,\r
65                         String messageEventName, String messageVariable,\r
66                         String correlationVariable, String correlationValue,\r
67                         String logMarker, Map<String, Object> injectedVariables) {\r
68 \r
69                 long startTime = System.currentTimeMillis();\r
70 \r
71                 LOGGER.debug(logMarker + " " + method + " received message: "\r
72                         + (message == null ? "" : System.lineSeparator()) + message);\r
73 \r
74                 try {\r
75                         Map<String, Object> variables = new HashMap<>();\r
76 \r
77                         if (injectedVariables != null) {\r
78                                 variables.putAll(injectedVariables);\r
79                         }\r
80 \r
81                         variables.put(correlationVariable, correlationValue);\r
82                         variables.put(messageVariable, message == null ? null : message.toString());\r
83 \r
84                         boolean ok = correlate(messageEventName, correlationVariable,\r
85                                 correlationValue, variables, logMarker);\r
86 \r
87                         if (!ok) {\r
88                                 String msg = "No process is waiting for " + messageEventName\r
89                                         + " with " + correlationVariable + " = '" + correlationValue + "'";\r
90                                 logCallbackError(method, startTime, msg);\r
91                                 return new CallbackError(msg);\r
92                         }\r
93 \r
94                         logCallbackSuccess(method, startTime);\r
95                         return new CallbackSuccess();\r
96                 } catch (Exception e) {\r
97                         LOGGER.debug("Exception :",e);\r
98                         String msg = "Caught " + e.getClass().getSimpleName()\r
99                                 + " processing " + messageEventName + " with " + correlationVariable\r
100                                 + " = '" + correlationValue + "'";\r
101                         logCallbackError(method, startTime, msg);\r
102                         return new CallbackError(msg);\r
103                 }\r
104         }\r
105         \r
106         /**\r
107          * Performs message correlation.  Waits a limited amount of time for\r
108          * a process to become ready for correlation.  The return value indicates\r
109          * whether or not a process was found to receive the message.  Due to the\r
110          * synchronous nature of message injection in Camunda, by the time this\r
111          * method returns, one of 3 things will have happened: (1) the process\r
112          * received the message and ended, (2) the process received the message\r
113          * and reached an activity that suspended, or (3) an exception occurred\r
114          * during correlation or while the process was executing.  Correlation\r
115          * exceptions are handled differently from process execution exceptions.\r
116          * Correlation exceptions are thrown so the client knows something went\r
117          * wrong with the delivery of the message.  Process execution exceptions\r
118          * are logged but not thrown.\r
119          * @param messageEventName the message event name\r
120          * @param correlationVariable the process variable used as the correlator\r
121          * @param correlationValue the correlation value\r
122          * @param variables variables to inject into the process\r
123          * @param logMarker a marker for debug logging\r
124          * @return true if a process could be found, false if not\r
125          * @throws Exception for correlation errors\r
126          */\r
127         protected boolean correlate(String messageEventName, String correlationVariable,\r
128                         String correlationValue, Map<String, Object> variables, String logMarker)\r
129                         throws Exception {\r
130         try{\r
131                 LOGGER.debug(logMarker + " Attempting to find process waiting"\r
132                         + " for " + messageEventName + " with " + correlationVariable\r
133                         + " = '" + correlationValue + "'");\r
134 \r
135                 RuntimeService runtimeService =\r
136                         getProcessEngineServices().getRuntimeService();\r
137 \r
138                 Map<String, String> properties =\r
139                         PropertyConfiguration.getInstance().getProperties("mso.bpmn.urn.properties");\r
140 \r
141                 long timeout = DEFAULT_TIMEOUT_SECONDS;\r
142 \r
143                 // The code is here in case we ever need to change the default.\r
144                 String s = properties.get("mso.correlation.timeout");\r
145                 if (s != null) {\r
146                         try {\r
147                                 timeout = Long.parseLong(s);\r
148                         } catch (NumberFormatException e) {\r
149                                 // Ignore\r
150                         }\r
151                 }\r
152 \r
153                 long now = System.currentTimeMillis();\r
154                 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);\r
155                 long endTime = now + (timeout * 1000);\r
156                 long sleep = FAST_POLL_INT_MS;\r
157 \r
158                 List<Execution> waitingProcesses = null;\r
159                 Exception queryException = null;\r
160                 int queryCount = 0;\r
161                 int queryFailCount = 0;\r
162 \r
163                 while (true) {\r
164                         try {\r
165                                 ++queryCount;\r
166                                 waitingProcesses = runtimeService.createExecutionQuery()\r
167                                         .messageEventSubscriptionName(messageEventName)\r
168                                         .processVariableValueEquals(correlationVariable, correlationValue)\r
169                                         .list();\r
170                         } catch (Exception e) {\r
171                                 ++queryFailCount;\r
172                                 queryException = e;\r
173                         }\r
174 \r
175                         if (waitingProcesses != null && waitingProcesses.size() > 0) {\r
176                                 break;\r
177                         }\r
178 \r
179                         if (now > endTime - sleep) {\r
180                                 break;\r
181                         }\r
182 \r
183                         Thread.sleep(sleep);\r
184                         now = System.currentTimeMillis();\r
185 \r
186                         if (now > fastPollEndTime) {\r
187                                 sleep = SLOW_POLL_INT_MS;\r
188                         }\r
189                 }\r
190 \r
191                 if (waitingProcesses == null) {\r
192                         waitingProcesses = new ArrayList<Execution>(0);\r
193                 }\r
194 \r
195                 int count = waitingProcesses.size();\r
196 \r
197                 List<ExecInfo> execInfoList = new ArrayList<>(count);\r
198                 for (Execution execution : waitingProcesses) {\r
199                         execInfoList.add(new ExecInfo(execution));\r
200                 }\r
201 \r
202                 LOGGER.debug(logMarker + " Found " + count + " process(es) waiting"\r
203                         + " for " + messageEventName + " with " + correlationVariable\r
204                         + " = '" + correlationValue + "': " + execInfoList);\r
205 \r
206                 if (count == 0) {\r
207                         if (queryFailCount > 0) {\r
208                                 String msg = queryFailCount + "/" + queryCount\r
209                                         + " execution queries failed attempting to correlate "\r
210                                         + messageEventName + " with " + correlationVariable\r
211                                         + " = '" + correlationValue + "'; last exception was:"\r
212                                         + queryException;\r
213                                 LOGGER.debug(msg);\r
214                                 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),\r
215                                         MsoLogger.ErrorCode.UnknownError, msg, queryException);\r
216                         }\r
217 \r
218                         return false;\r
219                 }\r
220 \r
221                 if (count > 1) {\r
222                         // Only one process should be waiting. Throw an exception back to the client.\r
223                         throw new MismatchingMessageCorrelationException(messageEventName,\r
224                                 "more than 1 process is waiting with " + correlationVariable\r
225                                 + " = '" + correlationValue + "'");\r
226                 }\r
227                 \r
228                 // We prototyped an asynchronous solution, i.e. resuming the process\r
229                 // flow in a separate thread, but this affected too many existing tests,\r
230                 // and we went back to the synchronous solution. The synchronous solution\r
231                 // has some troublesome characteristics though.  For example, the\r
232                 // resumed flow may send request #2 to a remote system before MSO has\r
233                 // acknowledged the notification associated with request #1.  \r
234 \r
235                 try {\r
236                         LOGGER.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "\r
237                                 + messageEventName + " with " + correlationVariable + " = '"\r
238                                 + correlationValue + "'");\r
239 \r
240                         @SuppressWarnings("unused")\r
241                         MessageCorrelationResult result = runtimeService\r
242                                 .createMessageCorrelation(messageEventName)\r
243                                 .setVariables(variables)\r
244                                 .processInstanceVariableEquals(correlationVariable, correlationValue)\r
245                                 .correlateWithResult();\r
246                         \r
247                 } catch (MismatchingMessageCorrelationException e) {\r
248                         // A correlation exception occurred even after we identified\r
249                         // one waiting process.  Throw it back to the client.\r
250                         throw e;\r
251                 } catch (OptimisticLockingException ole) {\r
252                         \r
253                         String msg = "Caught " + ole.getClass().getSimpleName() + " after receiving " + messageEventName\r
254                                 + " with " + correlationVariable + " = '" + correlationValue\r
255                                 + "': " + ole;\r
256                         LOGGER.debug(msg);\r
257                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN CORRELATION ERROR -", MsoLogger.getServiceName(),\r
258                                 MsoLogger.ErrorCode.UnknownError, msg, ole);\r
259                         \r
260                         //Retry for OptimisticLocking Exceptions\r
261                         int retryCount = 0;\r
262                         String retryStr = properties.get("mso.bpmn.optimisticlockingexception.retrycount");\r
263                         if (retryStr != null) {\r
264                                 try {\r
265                                         retryCount = Integer.parseInt(retryStr);\r
266                                 } catch (NumberFormatException e) {\r
267                                         // Ignore\r
268                                 }\r
269                         }\r
270                         \r
271                         LOGGER.debug("Retry correlate for OptimisticLockingException, retryCount:" + retryCount);\r
272                         \r
273                         for (; retryCount >0 ; retryCount--) {\r
274                                 \r
275                                 try{\r
276                                         Thread.sleep(SLOW_POLL_INT_MS);\r
277                                         \r
278                                         @SuppressWarnings("unused")\r
279                                         MessageCorrelationResult result = runtimeService\r
280                                                 .createMessageCorrelation(messageEventName)\r
281                                                 .setVariables(variables)\r
282                                                 .processInstanceVariableEquals(correlationVariable, correlationValue)\r
283                                                 .correlateWithResult();\r
284                                         retryCount = 0;\r
285                                         LOGGER.debug("OptimisticLockingException retry was successful, seting retryCount: " + retryCount);\r
286                                 } catch (OptimisticLockingException olex) {\r
287                                         //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;\r
288                                         String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + olex;\r
289                                         LOGGER.debug(strMsg);\r
290                                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),\r
291                                                 MsoLogger.ErrorCode.UnknownError, strMsg, olex);\r
292                                 } catch (Exception excep) {\r
293                                         retryCount = 0;\r
294                                         //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;\r
295                                         String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + excep;\r
296                                         LOGGER.debug(strMsg);\r
297                                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),\r
298                                                 MsoLogger.ErrorCode.UnknownError, strMsg, excep);\r
299                                 }\r
300                 \r
301                         }\r
302                         \r
303                 }catch (Exception e) {\r
304                         // This must be an exception from the flow itself.  Log it, but don't\r
305                         // report it back to the client.\r
306                         String msg = "Caught " + e.getClass().getSimpleName() + " running "\r
307                                 + execInfoList.get(0) + " after receiving " + messageEventName\r
308                                 + " with " + correlationVariable + " = '" + correlationValue\r
309                                 + "': " + e;\r
310                         LOGGER.debug(msg);\r
311                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),\r
312                                 MsoLogger.ErrorCode.UnknownError, msg, e);\r
313                 }\r
314         }  catch (Exception e) {\r
315                 // This must be an exception from the flow itself.  Log it, but don't\r
316                 // report it back to the client.\r
317                 String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName\r
318                         + " with " + correlationVariable + " = '" + correlationValue\r
319                         + "': " + e;\r
320                 LOGGER.debug(msg);\r
321                 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN CORRELATION ERROR -", MsoLogger.getServiceName(),\r
322                         MsoLogger.ErrorCode.UnknownError, msg, e);\r
323         }       \r
324 \r
325                 return true;\r
326         }\r
327         \r
328         /**\r
329          * Records audit and metric events in the log for a callback success.\r
330          * @param method the method name\r
331          * @param startTime the request start time\r
332          */\r
333         protected void logCallbackSuccess(String method, long startTime) {\r
334                 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
335                         MsoLogger.ResponseCode.Suc, "Completed " + method);\r
336 \r
337                 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
338                         MsoLogger.ResponseCode.Suc, "Completed " + method,\r
339                         "BPMN", MsoLogger.getServiceName(), null);\r
340         }\r
341 \r
342         /**\r
343          * Records error, audit and metric events in the log for a callback\r
344          * internal error.\r
345          * @param method the method name\r
346          * @param startTime the request start time\r
347          * @param msg the error message\r
348          */\r
349         protected void logCallbackError(String method, long startTime, String msg) {\r
350                 logCallbackError(method, startTime, msg, null);\r
351         }\r
352 \r
353         /**\r
354          * Records error, audit and metric events in the log for a callback\r
355          * internal error.\r
356          * @param method the method name\r
357          * @param startTime the request start time\r
358          * @param msg the error message\r
359          * @param e the exception\r
360          */\r
361         protected void logCallbackError(String method, long startTime, String msg, Exception e) {\r
362                 if (e == null) {\r
363                         LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(), \r
364                                 MsoLogger.ErrorCode.UnknownError, msg);\r
365                 } else {\r
366                         LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(), \r
367                                 MsoLogger.ErrorCode.UnknownError, msg, e);\r
368                 }\r
369 \r
370                 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
371                         MsoLogger.ResponseCode.InternalError, "Completed " + method);\r
372 \r
373                 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
374                         MsoLogger.ResponseCode.InternalError, "Completed " + method,\r
375                         "BPMN", MsoLogger.getServiceName(), null);\r
376         }\r
377         \r
378         /**\r
379          * Abstract callback result object.\r
380          */\r
381         protected abstract class CallbackResult {\r
382         }\r
383 \r
384         /**\r
385          * Indicates that callback handling was successful.\r
386          */\r
387         protected class CallbackSuccess extends CallbackResult {\r
388         }\r
389 \r
390         /**\r
391          * Indicates that callback handling failed.\r
392          */\r
393         protected class CallbackError extends CallbackResult {\r
394                 private final String errorMessage;\r
395 \r
396                 public CallbackError(String errorMessage) {\r
397                         this.errorMessage = errorMessage;\r
398                 }\r
399 \r
400                 /**\r
401                  * Gets the error message.\r
402                  */\r
403                 public String getErrorMessage() {\r
404                         return errorMessage;\r
405                 }\r
406         }\r
407 \r
408         private static class ExecInfo {\r
409                 private final Execution execution;\r
410 \r
411                 public ExecInfo(Execution execution) {\r
412                         this.execution = execution;\r
413                 }\r
414         \r
415                 @Override\r
416                 public String toString() {\r
417                         return "Process[" + execution.getProcessInstanceId()\r
418                                 + ":" + execution.getId() + "]";\r
419                 }\r
420         }\r
421 }\r