a4a88597bdfb955406e8d4bdb43957ee87a0b55b
[so.git] /
1 package org.openecomp.mso.bpmn.common.workflow.service;\r
2 \r
3 import java.util.ArrayList;\r
4 import java.util.HashMap;\r
5 import java.util.List;\r
6 import java.util.Map;\r
7 \r
8 import org.camunda.bpm.BpmPlatform;\r
9 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;\r
10 import org.camunda.bpm.engine.ProcessEngineServices;\r
11 import org.camunda.bpm.engine.RuntimeService;\r
12 import org.camunda.bpm.engine.runtime.Execution;\r
13 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;\r
14 import org.openecomp.mso.bpmn.core.PropertyConfiguration;\r
15 import org.openecomp.mso.logger.MessageEnum;\r
16 import org.openecomp.mso.logger.MsoLogger;\r
17 \r
18 /**\r
19  * Abstract base class for callback services.\r
20  */\r
21 public abstract class AbstractCallbackService {\r
22         public static final long DEFAULT_TIMEOUT_SECONDS = 60;\r
23         public static final long FAST_POLL_DUR_SECONDS = 5;\r
24         public static final long FAST_POLL_INT_MS = 100;\r
25         public static final long SLOW_POLL_INT_MS = 1000;\r
26         \r
27         private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);\r
28 \r
29         protected volatile ProcessEngineServices pes4junit = null;\r
30         \r
31         /**\r
32          * Parameterized callback handler.\r
33          */\r
34         protected CallbackResult handleCallback(String method, Object message,\r
35                         String messageEventName, String messageVariable,\r
36                         String correlationVariable, String correlationValue,\r
37                         String logMarker) {\r
38 \r
39                 return handleCallback(method, message, messageEventName, messageVariable,\r
40                         correlationVariable, correlationValue, logMarker, null);\r
41         }\r
42 \r
43         /**\r
44          * Parameterized callback handler.\r
45          */\r
46         protected CallbackResult handleCallback(String method, Object message,\r
47                         String messageEventName, String messageVariable,\r
48                         String correlationVariable, String correlationValue,\r
49                         String logMarker, Map<String, Object> injectedVariables) {\r
50 \r
51                 long startTime = System.currentTimeMillis();\r
52 \r
53                 LOGGER.debug(logMarker + " " + method + " received message: "\r
54                         + (message == null ? "" : System.lineSeparator()) + message);\r
55 \r
56                 try {\r
57                         Map<String, Object> variables = new HashMap<String, Object>();\r
58 \r
59                         if (injectedVariables != null) {\r
60                                 variables.putAll(injectedVariables);\r
61                         }\r
62 \r
63                         variables.put(correlationVariable, correlationValue);\r
64                         variables.put(messageVariable, message == null ? null : message.toString());\r
65 \r
66                         boolean ok = correlate(messageEventName, correlationVariable,\r
67                                 correlationValue, variables, logMarker);\r
68 \r
69                         if (!ok) {\r
70                                 String msg = "No process is waiting for " + messageEventName\r
71                                         + " with " + correlationVariable + " = '" + correlationValue + "'";\r
72                                 logCallbackError(method, startTime, msg);\r
73                                 return new CallbackError(msg);\r
74                         }\r
75 \r
76                         logCallbackSuccess(method, startTime);\r
77                         return new CallbackSuccess();\r
78                 } catch (Exception e) {\r
79                         LOGGER.debug("Exception :",e);\r
80                         String msg = "Caught " + e.getClass().getSimpleName()\r
81                                 + " processing " + messageEventName + " with " + correlationVariable\r
82                                 + " = '" + correlationValue + "'";\r
83                         logCallbackError(method, startTime, msg);\r
84                         return new CallbackError(msg);\r
85                 }\r
86         }\r
87         \r
88         /**\r
89          * Performs message correlation.  Waits a limited amount of time for\r
90          * a process to become ready for correlation.  The return value indicates\r
91          * whether or not a process was found to receive the message.  Due to the\r
92          * synchronous nature of message injection in Camunda, by the time this\r
93          * method returns, one of 3 things will have happened: (1) the process\r
94          * received the message and ended, (2) the process received the message\r
95          * and reached an activity that suspended, or (3) an exception occurred\r
96          * during correlation or while the process was executing.  Correlation\r
97          * exceptions are handled differently from process execution exceptions.\r
98          * Correlation exceptions are thrown so the client knows something went\r
99          * wrong with the delivery of the message.  Process execution exceptions\r
100          * are logged but not thrown.\r
101          * @param messageEventName the message event name\r
102          * @param correlationVariable the process variable used as the correlator\r
103          * @param correlationValue the correlation value\r
104          * @param variables variables to inject into the process\r
105          * @param logMarker a marker for debug logging\r
106          * @return true if a process could be found, false if not\r
107          * @throws Exception for correlation errors\r
108          */\r
109         protected boolean correlate(String messageEventName, String correlationVariable,\r
110                         String correlationValue, Map<String, Object> variables, String logMarker)\r
111                         throws Exception {\r
112         try{\r
113                 LOGGER.debug(logMarker + " Attempting to find process waiting"\r
114                         + " for " + messageEventName + " with " + correlationVariable\r
115                         + " = '" + correlationValue + "'");\r
116 \r
117                 RuntimeService runtimeService =\r
118                         getProcessEngineServices().getRuntimeService();\r
119 \r
120                 Map<String, String> properties =\r
121                         PropertyConfiguration.getInstance().getProperties("mso.bpmn.urn.properties");\r
122 \r
123                 long timeout = DEFAULT_TIMEOUT_SECONDS;\r
124 \r
125                 // The code is here in case we ever need to change the default.\r
126                 String s = properties.get("mso.correlation.timeout");\r
127                 if (s != null) {\r
128                         try {\r
129                                 timeout = Long.parseLong(s);\r
130                         } catch (NumberFormatException e) {\r
131                                 // Ignore\r
132                         }\r
133                 }\r
134 \r
135                 long now = System.currentTimeMillis();\r
136                 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);\r
137                 long endTime = now + (timeout * 1000);\r
138                 long sleep = FAST_POLL_INT_MS;\r
139 \r
140                 List<Execution> waitingProcesses = null;\r
141                 Exception queryException = null;\r
142                 int queryCount = 0;\r
143                 int queryFailCount = 0;\r
144 \r
145                 while (true) {\r
146                         try {\r
147                                 ++queryCount;\r
148                                 waitingProcesses = runtimeService.createExecutionQuery()\r
149                                         .messageEventSubscriptionName(messageEventName)\r
150                                         .processVariableValueEquals(correlationVariable, correlationValue)\r
151                                         .list();\r
152                         } catch (Exception e) {\r
153                                 ++queryFailCount;\r
154                                 queryException = e;\r
155                         }\r
156 \r
157                         if (waitingProcesses != null && waitingProcesses.size() > 0) {\r
158                                 break;\r
159                         }\r
160 \r
161                         if (now > endTime - sleep) {\r
162                                 break;\r
163                         }\r
164 \r
165                         Thread.sleep(sleep);\r
166                         now = System.currentTimeMillis();\r
167 \r
168                         if (now > fastPollEndTime) {\r
169                                 sleep = SLOW_POLL_INT_MS;\r
170                         }\r
171                 }\r
172 \r
173                 if (waitingProcesses == null) {\r
174                         waitingProcesses = new ArrayList<Execution>(0);\r
175                 }\r
176 \r
177                 int count = waitingProcesses.size();\r
178 \r
179                 List<ExecInfo> execInfoList = new ArrayList<ExecInfo>(count);\r
180                 for (Execution execution : waitingProcesses) {\r
181                         execInfoList.add(new ExecInfo(execution));\r
182                 }\r
183 \r
184                 LOGGER.debug(logMarker + " Found " + count + " process(es) waiting"\r
185                         + " for " + messageEventName + " with " + correlationVariable\r
186                         + " = '" + correlationValue + "': " + execInfoList);\r
187 \r
188                 if (count == 0) {\r
189                         if (queryFailCount > 0) {\r
190                                 String msg = queryFailCount + "/" + queryCount\r
191                                         + " execution queries failed attempting to correlate "\r
192                                         + messageEventName + " with " + correlationVariable\r
193                                         + " = '" + correlationValue + "'; last exception was:"\r
194                                         + queryException;\r
195                                 LOGGER.debug(msg);\r
196                                 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),\r
197                                         MsoLogger.ErrorCode.UnknownError, msg, queryException);\r
198                         }\r
199 \r
200                         return false;\r
201                 }\r
202 \r
203                 if (count > 1) {\r
204                         // Only one process should be waiting. Throw an exception back to the client.\r
205                         throw new MismatchingMessageCorrelationException(messageEventName,\r
206                                 "more than 1 process is waiting with " + correlationVariable\r
207                                 + " = '" + correlationValue + "'");\r
208                 }\r
209                 \r
210                 // We prototyped an asynchronous solution, i.e. resuming the process\r
211                 // flow in a separate thread, but this affected too many existing tests,\r
212                 // and we went back to the synchronous solution. The synchronous solution\r
213                 // has some troublesome characteristics though.  For example, the\r
214                 // resumed flow may send request #2 to a remote system before MSO has\r
215                 // acknowledged the notification associated with request #1.  \r
216 \r
217                 try {\r
218                         LOGGER.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "\r
219                                 + messageEventName + " with " + correlationVariable + " = '"\r
220                                 + correlationValue + "'");\r
221 \r
222                         @SuppressWarnings("unused")\r
223                         MessageCorrelationResult result = runtimeService\r
224                                 .createMessageCorrelation(messageEventName)\r
225                                 .setVariables(variables)\r
226                                 .processInstanceVariableEquals(correlationVariable, correlationValue)\r
227                                 .correlateWithResult();\r
228 \r
229                 } catch (MismatchingMessageCorrelationException e) {\r
230                         // A correlation exception occurred even after we identified\r
231                         // one waiting process.  Throw it back to the client.\r
232                         throw e;\r
233                 } catch (Exception e) {\r
234                         // This must be an exception from the flow itself.  Log it, but don't\r
235                         // report it back to the client.\r
236                         String msg = "Caught " + e.getClass().getSimpleName() + " running "\r
237                                 + execInfoList.get(0) + " after receiving " + messageEventName\r
238                                 + " with " + correlationVariable + " = '" + correlationValue\r
239                                 + "': " + e;\r
240                         LOGGER.debug(msg);\r
241                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),\r
242                                 MsoLogger.ErrorCode.UnknownError, msg, e);\r
243                 }\r
244         }  catch (Exception e) {\r
245                 // This must be an exception from the flow itself.  Log it, but don't\r
246                 // report it back to the client.\r
247                 String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName\r
248                         + " with " + correlationVariable + " = '" + correlationValue\r
249                         + "': " + e;\r
250                 LOGGER.debug(msg);\r
251                 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN CORRELATION ERROR -", MsoLogger.getServiceName(),\r
252                         MsoLogger.ErrorCode.UnknownError, msg, e);\r
253         }       \r
254 \r
255                 return true;\r
256         }\r
257         \r
258         /**\r
259          * Records audit and metric events in the log for a callback success.\r
260          * @param method the method name\r
261          * @param startTime the request start time\r
262          */\r
263         protected void logCallbackSuccess(String method, long startTime) {\r
264                 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
265                         MsoLogger.ResponseCode.Suc, "Completed " + method);\r
266 \r
267                 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
268                         MsoLogger.ResponseCode.Suc, "Completed " + method,\r
269                         "BPMN", MsoLogger.getServiceName(), null);\r
270         }\r
271 \r
272         /**\r
273          * Records error, audit and metric events in the log for a callback\r
274          * internal error.\r
275          * @param method the method name\r
276          * @param startTime the request start time\r
277          * @param msg the error message\r
278          */\r
279         protected void logCallbackError(String method, long startTime, String msg) {\r
280                 logCallbackError(method, startTime, msg, null);\r
281         }\r
282 \r
283         /**\r
284          * Records error, audit and metric events in the log for a callback\r
285          * internal error.\r
286          * @param method the method name\r
287          * @param startTime the request start time\r
288          * @param msg the error message\r
289          * @param e the exception\r
290          */\r
291         protected void logCallbackError(String method, long startTime, String msg, Exception e) {\r
292                 if (e == null) {\r
293                         LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(), \r
294                                 MsoLogger.ErrorCode.UnknownError, msg);\r
295                 } else {\r
296                         LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(), \r
297                                 MsoLogger.ErrorCode.UnknownError, msg, e);\r
298                 }\r
299 \r
300                 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
301                         MsoLogger.ResponseCode.InternalError, "Completed " + method);\r
302 \r
303                 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
304                         MsoLogger.ResponseCode.InternalError, "Completed " + method,\r
305                         "BPMN", MsoLogger.getServiceName(), null);\r
306         }\r
307         \r
308         /**\r
309          * Abstract callback result object.\r
310          */\r
311         protected abstract class CallbackResult {\r
312         }\r
313 \r
314         /**\r
315          * Indicates that callback handling was successful.\r
316          */\r
317         protected class CallbackSuccess extends CallbackResult {\r
318         }\r
319 \r
320         /**\r
321          * Indicates that callback handling failed.\r
322          */\r
323         protected class CallbackError extends CallbackResult {\r
324                 private final String errorMessage;\r
325 \r
326                 public CallbackError(String errorMessage) {\r
327                         this.errorMessage = errorMessage;\r
328                 }\r
329 \r
330                 /**\r
331                  * Gets the error message.\r
332                  */\r
333                 public String getErrorMessage() {\r
334                         return errorMessage;\r
335                 }\r
336         }\r
337 \r
338         private static class ExecInfo {\r
339                 private final Execution execution;\r
340 \r
341                 public ExecInfo(Execution execution) {\r
342                         this.execution = execution;\r
343                 }\r
344         \r
345                 @Override\r
346                 public String toString() {\r
347                         return "Process[" + execution.getProcessInstanceId()\r
348                                 + ":" + execution.getId() + "]";\r
349                 }\r
350         }\r
351         \r
352         protected ProcessEngineServices getProcessEngineServices() {\r
353                 if (pes4junit == null) {\r
354                         return BpmPlatform.getDefaultProcessEngine();\r
355                 } else {\r
356                         return pes4junit;\r
357                 }\r
358         }\r
359 \r
360         public void setProcessEngineServices4junit(ProcessEngineServices pes) {\r
361                 pes4junit = pes;\r
362         }\r
363 }