214ae28b704757a9e14e7f03f9b3983746b1912a
[so.git] / bpmn / MSOCommonBPMN / src / main / java / org / openecomp / mso / bpmn / common / workflow / service / AbstractCallbackService.java
1 package org.openecomp.mso.bpmn.common.workflow.service;\r
2 \r
3 import java.util.ArrayList;\r
4 import java.util.HashMap;\r
5 import java.util.List;\r
6 import java.util.Map;\r
7 \r
8 import org.camunda.bpm.BpmPlatform;\r
9 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;\r
10 import org.camunda.bpm.engine.ProcessEngineServices;\r
11 import org.camunda.bpm.engine.RuntimeService;\r
12 import org.camunda.bpm.engine.runtime.Execution;\r
13 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;\r
14 import org.openecomp.mso.bpmn.core.PropertyConfiguration;\r
15 import org.openecomp.mso.logger.MessageEnum;\r
16 import org.openecomp.mso.logger.MsoLogger;\r
17 \r
18 /**\r
19  * Abstract base class for callback services.\r
20  */\r
21 public abstract class AbstractCallbackService {\r
22         public static final long DEFAULT_TIMEOUT_SECONDS = 60;\r
23         public static final long FAST_POLL_DUR_SECONDS = 5;\r
24         public static final long FAST_POLL_INT_MS = 100;\r
25         public static final long SLOW_POLL_INT_MS = 1000;\r
26         \r
27         private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);\r
28 \r
29         protected volatile ProcessEngineServices pes4junit = null;\r
30         \r
31         /**\r
32          * Parameterized callback handler.\r
33          */\r
34         protected CallbackResult handleCallback(String method, Object message,\r
35                         String messageEventName, String messageVariable,\r
36                         String correlationVariable, String correlationValue,\r
37                         String logMarker) {\r
38 \r
39                 return handleCallback(method, message, messageEventName, messageVariable,\r
40                         correlationVariable, correlationValue, logMarker, null);\r
41         }\r
42 \r
43         /**\r
44          * Parameterized callback handler.\r
45          */\r
46         protected CallbackResult handleCallback(String method, Object message,\r
47                         String messageEventName, String messageVariable,\r
48                         String correlationVariable, String correlationValue,\r
49                         String logMarker, Map<String, Object> injectedVariables) {\r
50 \r
51                 long startTime = System.currentTimeMillis();\r
52 \r
53                 LOGGER.debug(logMarker + " " + method + " received message: "\r
54                         + (message == null ? "" : System.lineSeparator()) + message);\r
55 \r
56                 try {\r
57                         Map<String, Object> variables = new HashMap<String, Object>();\r
58 \r
59                         if (injectedVariables != null) {\r
60                                 variables.putAll(injectedVariables);\r
61                         }\r
62 \r
63                         variables.put(correlationVariable, correlationValue);\r
64                         variables.put(messageVariable, message == null ? null : message.toString());\r
65 \r
66                         boolean ok = correlate(messageEventName, correlationVariable,\r
67                                 correlationValue, variables, logMarker);\r
68 \r
69                         if (!ok) {\r
70                                 String msg = "No process is waiting for " + messageEventName\r
71                                         + " with " + correlationVariable + " = '" + correlationValue + "'";\r
72                                 logCallbackError(method, startTime, msg);\r
73                                 return new CallbackError(msg);\r
74                         }\r
75 \r
76                         logCallbackSuccess(method, startTime);\r
77                         return new CallbackSuccess();\r
78                 } catch (Exception e) {\r
79                         String msg = "Caught " + e.getClass().getSimpleName()\r
80                                 + " processing " + messageEventName + " with " + correlationVariable\r
81                                 + " = '" + correlationValue + "'";\r
82                         logCallbackError(method, startTime, msg);\r
83                         return new CallbackError(msg);\r
84                 }\r
85         }\r
86         \r
87         /**\r
88          * Performs message correlation.  Waits a limited amount of time for\r
89          * a process to become ready for correlation.  The return value indicates\r
90          * whether or not a process was found to receive the message.  Due to the\r
91          * synchronous nature of message injection in Camunda, by the time this\r
92          * method returns, one of 3 things will have happened: (1) the process\r
93          * received the message and ended, (2) the process received the message\r
94          * and reached an activity that suspended, or (3) an exception occurred\r
95          * during correlation or while the process was executing.  Correlation\r
96          * exceptions are handled differently from process execution exceptions.\r
97          * Correlation exceptions are thrown so the client knows something went\r
98          * wrong with the delivery of the message.  Process execution exceptions\r
99          * are logged but not thrown.\r
100          * @param messageEventName the message event name\r
101          * @param correlationVariable the process variable used as the correlator\r
102          * @param correlationValue the correlation value\r
103          * @param variables variables to inject into the process\r
104          * @param logMarker a marker for debug logging\r
105          * @return true if a process could be found, false if not\r
106          * @throws Exception for correlation errors\r
107          */\r
108         protected boolean correlate(String messageEventName, String correlationVariable,\r
109                         String correlationValue, Map<String, Object> variables, String logMarker)\r
110                         throws Exception {\r
111 \r
112                 LOGGER.debug(logMarker + " Attempting to find process waiting"\r
113                         + " for " + messageEventName + " with " + correlationVariable\r
114                         + " = '" + correlationValue + "'");\r
115 \r
116                 RuntimeService runtimeService =\r
117                         getProcessEngineServices().getRuntimeService();\r
118 \r
119                 Map<String, String> properties =\r
120                         PropertyConfiguration.getInstance().getProperties("mso.bpmn.urn.properties");\r
121 \r
122                 long timeout = DEFAULT_TIMEOUT_SECONDS;\r
123 \r
124                 // The code is here in case we ever need to change the default.\r
125                 String s = properties.get("mso.correlation.timeout");\r
126                 if (s != null) {\r
127                         try {\r
128                                 timeout = Long.parseLong(s);\r
129                         } catch (NumberFormatException e) {\r
130                                 // Ignore\r
131                         }\r
132                 }\r
133 \r
134                 long now = System.currentTimeMillis();\r
135                 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);\r
136                 long endTime = now + (timeout * 1000);\r
137                 long sleep = FAST_POLL_INT_MS;\r
138 \r
139                 List<Execution> waitingProcesses = null;\r
140                 Exception queryException = null;\r
141                 int queryCount = 0;\r
142                 int queryFailCount = 0;\r
143 \r
144                 while (true) {\r
145                         try {\r
146                                 ++queryCount;\r
147                                 waitingProcesses = runtimeService.createExecutionQuery()\r
148                                         .messageEventSubscriptionName(messageEventName)\r
149                                         .processVariableValueEquals(correlationVariable, correlationValue)\r
150                                         .list();\r
151                         } catch (Exception e) {\r
152                                 ++queryFailCount;\r
153                                 queryException = e;\r
154                         }\r
155 \r
156                         if (waitingProcesses != null && waitingProcesses.size() > 0) {\r
157                                 break;\r
158                         }\r
159 \r
160                         if (now > endTime - sleep) {\r
161                                 break;\r
162                         }\r
163 \r
164                         Thread.sleep(sleep);\r
165                         now = System.currentTimeMillis();\r
166 \r
167                         if (now > fastPollEndTime) {\r
168                                 sleep = SLOW_POLL_INT_MS;\r
169                         }\r
170                 }\r
171 \r
172                 if (waitingProcesses == null) {\r
173                         waitingProcesses = new ArrayList<Execution>(0);\r
174                 }\r
175 \r
176                 int count = waitingProcesses.size();\r
177 \r
178                 List<ExecInfo> execInfoList = new ArrayList<ExecInfo>(count);\r
179                 for (Execution execution : waitingProcesses) {\r
180                         execInfoList.add(new ExecInfo(execution));\r
181                 }\r
182 \r
183                 LOGGER.debug(logMarker + " Found " + count + " process(es) waiting"\r
184                         + " for " + messageEventName + " with " + correlationVariable\r
185                         + " = '" + correlationValue + "': " + execInfoList);\r
186 \r
187                 if (count == 0) {\r
188                         if (queryFailCount > 0) {\r
189                                 String msg = queryFailCount + "/" + queryCount\r
190                                         + " execution queries failed attempting to correlate "\r
191                                         + messageEventName + " with " + correlationVariable\r
192                                         + " = '" + correlationValue + "'; last exception was:"\r
193                                         + queryException;\r
194                                 LOGGER.debug(msg);\r
195                                 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),\r
196                                         MsoLogger.ErrorCode.UnknownError, msg, queryException);\r
197                         }\r
198 \r
199                         return false;\r
200                 }\r
201 \r
202                 if (count > 1) {\r
203                         // Only one process should be waiting. Throw an exception back to the client.\r
204                         throw new MismatchingMessageCorrelationException(messageEventName,\r
205                                 "more than 1 process is waiting with " + correlationVariable\r
206                                 + " = '" + correlationValue + "'");\r
207                 }\r
208                 \r
209                 // We prototyped an asynchronous solution, i.e. resuming the process\r
210                 // flow in a separate thread, but this affected too many existing tests,\r
211                 // and we went back to the synchronous solution. The synchronous solution\r
212                 // has some troublesome characteristics though.  For example, the\r
213                 // resumed flow may send request #2 to a remote system before MSO has\r
214                 // acknowledged the notification associated with request #1.  \r
215 \r
216                 try {\r
217                         LOGGER.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "\r
218                                 + messageEventName + " with " + correlationVariable + " = '"\r
219                                 + correlationValue + "'");\r
220 \r
221                         @SuppressWarnings("unused")\r
222                         MessageCorrelationResult result = runtimeService\r
223                                 .createMessageCorrelation(messageEventName)\r
224                                 .setVariables(variables)\r
225                                 .processInstanceVariableEquals(correlationVariable, correlationValue)\r
226                                 .correlateWithResult();\r
227 \r
228                 } catch (MismatchingMessageCorrelationException e) {\r
229                         // A correlation exception occurred even after we identified\r
230                         // one waiting process.  Throw it back to the client.\r
231                         throw e;\r
232                 } catch (Exception e) {\r
233                         // This must be an exception from the flow itself.  Log it, but don't\r
234                         // report it back to the client.\r
235                         String msg = "Caught " + e.getClass().getSimpleName() + " running "\r
236                                 + execInfoList.get(0) + " after receiving " + messageEventName\r
237                                 + " with " + correlationVariable + " = '" + correlationValue\r
238                                 + "': " + e;\r
239                         LOGGER.debug(msg);\r
240                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),\r
241                                 MsoLogger.ErrorCode.UnknownError, msg, e);\r
242                 }\r
243 \r
244                 return true;\r
245         }\r
246         \r
247         /**\r
248          * Records audit and metric events in the log for a callback success.\r
249          * @param method the method name\r
250          * @param startTime the request start time\r
251          */\r
252         protected void logCallbackSuccess(String method, long startTime) {\r
253                 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
254                         MsoLogger.ResponseCode.Suc, "Completed " + method);\r
255 \r
256                 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
257                         MsoLogger.ResponseCode.Suc, "Completed " + method,\r
258                         "BPMN", MsoLogger.getServiceName(), null);\r
259         }\r
260 \r
261         /**\r
262          * Records error, audit and metric events in the log for a callback\r
263          * internal error.\r
264          * @param method the method name\r
265          * @param startTime the request start time\r
266          * @param msg the error message\r
267          */\r
268         protected void logCallbackError(String method, long startTime, String msg) {\r
269                 logCallbackError(method, startTime, msg, null);\r
270         }\r
271 \r
272         /**\r
273          * Records error, audit and metric events in the log for a callback\r
274          * internal error.\r
275          * @param method the method name\r
276          * @param startTime the request start time\r
277          * @param msg the error message\r
278          * @param e the exception\r
279          */\r
280         protected void logCallbackError(String method, long startTime, String msg, Exception e) {\r
281                 if (e == null) {\r
282                         LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(), \r
283                                 MsoLogger.ErrorCode.UnknownError, msg);\r
284                 } else {\r
285                         LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(), \r
286                                 MsoLogger.ErrorCode.UnknownError, msg, e);\r
287                 }\r
288 \r
289                 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
290                         MsoLogger.ResponseCode.InternalError, "Completed " + method);\r
291 \r
292                 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
293                         MsoLogger.ResponseCode.InternalError, "Completed " + method,\r
294                         "BPMN", MsoLogger.getServiceName(), null);\r
295         }\r
296         \r
297         /**\r
298          * Abstract callback result object.\r
299          */\r
300         protected abstract class CallbackResult {\r
301         }\r
302 \r
303         /**\r
304          * Indicates that callback handling was successful.\r
305          */\r
306         protected class CallbackSuccess extends CallbackResult {\r
307         }\r
308 \r
309         /**\r
310          * Indicates that callback handling failed.\r
311          */\r
312         protected class CallbackError extends CallbackResult {\r
313                 private final String errorMessage;\r
314 \r
315                 public CallbackError(String errorMessage) {\r
316                         this.errorMessage = errorMessage;\r
317                 }\r
318 \r
319                 /**\r
320                  * Gets the error message.\r
321                  */\r
322                 public String getErrorMessage() {\r
323                         return errorMessage;\r
324                 }\r
325         }\r
326 \r
327         private static class ExecInfo {\r
328                 private final Execution execution;\r
329 \r
330                 public ExecInfo(Execution execution) {\r
331                         this.execution = execution;\r
332                 }\r
333         \r
334                 @Override\r
335                 public String toString() {\r
336                         return "Process[" + execution.getProcessInstanceId()\r
337                                 + ":" + execution.getId() + "]";\r
338                 }\r
339         }\r
340         \r
341         protected ProcessEngineServices getProcessEngineServices() {\r
342                 if (pes4junit == null) {\r
343                         return BpmPlatform.getDefaultProcessEngine();\r
344                 } else {\r
345                         return pes4junit;\r
346                 }\r
347         }\r
348 \r
349         public void setProcessEngineServices4junit(ProcessEngineServices pes) {\r
350                 pes4junit = pes;\r
351         }\r
352 }