daca9fcf3f8026dfdb357d13dcee5db018b48c8d
[so.git] / bpmn / MSOCommonBPMN / src / main / java / org / openecomp / mso / bpmn / common / workflow / service / AbstractCallbackService.java
1 /*-\r
2  * ============LICENSE_START=======================================================\r
3  * ONAP - SO\r
4  * ================================================================================\r
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  * Licensed under the Apache License, Version 2.0 (the "License");\r
8  * you may not use this file except in compliance with the License.\r
9  * You may obtain a copy of the License at\r
10  *\r
11  *      http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  * Unless required by applicable law or agreed to in writing, software\r
14  * distributed under the License is distributed on an "AS IS" BASIS,\r
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  * See the License for the specific language governing permissions and\r
17  * limitations under the License.\r
18  * ============LICENSE_END=========================================================\r
19  */\r
20 \r
21 package org.openecomp.mso.bpmn.common.workflow.service;\r
22 \r
23 import java.util.ArrayList;\r
24 import java.util.HashMap;\r
25 import java.util.List;\r
26 import java.util.Map;\r
27 \r
28 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;\r
29 import org.camunda.bpm.engine.RuntimeService;\r
30 import org.camunda.bpm.engine.runtime.Execution;\r
31 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;\r
32 import org.openecomp.mso.bpmn.core.PropertyConfiguration;\r
33 import org.openecomp.mso.logger.MessageEnum;\r
34 import org.openecomp.mso.logger.MsoLogger;\r
35 \r
36 /**\r
37  * Abstract base class for callback services.\r
38  */\r
39 public abstract class AbstractCallbackService extends ProcessEngineAwareService {\r
40         public static final long DEFAULT_TIMEOUT_SECONDS = 60;\r
41         public static final long FAST_POLL_DUR_SECONDS = 5;\r
42         public static final long FAST_POLL_INT_MS = 100;\r
43         public static final long SLOW_POLL_INT_MS = 1000;\r
44         \r
45         private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);\r
46 \r
47         /**\r
48          * Parameterized callback handler.\r
49          */\r
50         protected CallbackResult handleCallback(String method, Object message,\r
51                         String messageEventName, String messageVariable,\r
52                         String correlationVariable, String correlationValue,\r
53                         String logMarker) {\r
54 \r
55                 return handleCallback(method, message, messageEventName, messageVariable,\r
56                         correlationVariable, correlationValue, logMarker, null);\r
57         }\r
58 \r
59         /**\r
60          * Parameterized callback handler.\r
61          */\r
62         protected CallbackResult handleCallback(String method, Object message,\r
63                         String messageEventName, String messageVariable,\r
64                         String correlationVariable, String correlationValue,\r
65                         String logMarker, Map<String, Object> injectedVariables) {\r
66 \r
67                 long startTime = System.currentTimeMillis();\r
68 \r
69                 LOGGER.debug(logMarker + " " + method + " received message: "\r
70                         + (message == null ? "" : System.lineSeparator()) + message);\r
71 \r
72                 try {\r
73                         Map<String, Object> variables = new HashMap<>();\r
74 \r
75                         if (injectedVariables != null) {\r
76                                 variables.putAll(injectedVariables);\r
77                         }\r
78 \r
79                         variables.put(correlationVariable, correlationValue);\r
80                         variables.put(messageVariable, message == null ? null : message.toString());\r
81 \r
82                         boolean ok = correlate(messageEventName, correlationVariable,\r
83                                 correlationValue, variables, logMarker);\r
84 \r
85                         if (!ok) {\r
86                                 String msg = "No process is waiting for " + messageEventName\r
87                                         + " with " + correlationVariable + " = '" + correlationValue + "'";\r
88                                 logCallbackError(method, startTime, msg);\r
89                                 return new CallbackError(msg);\r
90                         }\r
91 \r
92                         logCallbackSuccess(method, startTime);\r
93                         return new CallbackSuccess();\r
94                 } catch (Exception e) {\r
95                         LOGGER.debug("Exception :",e);\r
96                         String msg = "Caught " + e.getClass().getSimpleName()\r
97                                 + " processing " + messageEventName + " with " + correlationVariable\r
98                                 + " = '" + correlationValue + "'";\r
99                         logCallbackError(method, startTime, msg);\r
100                         return new CallbackError(msg);\r
101                 }\r
102         }\r
103         \r
104         /**\r
105          * Performs message correlation.  Waits a limited amount of time for\r
106          * a process to become ready for correlation.  The return value indicates\r
107          * whether or not a process was found to receive the message.  Due to the\r
108          * synchronous nature of message injection in Camunda, by the time this\r
109          * method returns, one of 3 things will have happened: (1) the process\r
110          * received the message and ended, (2) the process received the message\r
111          * and reached an activity that suspended, or (3) an exception occurred\r
112          * during correlation or while the process was executing.  Correlation\r
113          * exceptions are handled differently from process execution exceptions.\r
114          * Correlation exceptions are thrown so the client knows something went\r
115          * wrong with the delivery of the message.  Process execution exceptions\r
116          * are logged but not thrown.\r
117          * @param messageEventName the message event name\r
118          * @param correlationVariable the process variable used as the correlator\r
119          * @param correlationValue the correlation value\r
120          * @param variables variables to inject into the process\r
121          * @param logMarker a marker for debug logging\r
122          * @return true if a process could be found, false if not\r
123          * @throws Exception for correlation errors\r
124          */\r
125         protected boolean correlate(String messageEventName, String correlationVariable,\r
126                         String correlationValue, Map<String, Object> variables, String logMarker)\r
127                         throws Exception {\r
128         try{\r
129                 LOGGER.debug(logMarker + " Attempting to find process waiting"\r
130                         + " for " + messageEventName + " with " + correlationVariable\r
131                         + " = '" + correlationValue + "'");\r
132 \r
133                 RuntimeService runtimeService =\r
134                         getProcessEngineServices().getRuntimeService();\r
135 \r
136                 Map<String, String> properties =\r
137                         PropertyConfiguration.getInstance().getProperties("mso.bpmn.urn.properties");\r
138 \r
139                 long timeout = DEFAULT_TIMEOUT_SECONDS;\r
140 \r
141                 // The code is here in case we ever need to change the default.\r
142                 String s = properties.get("mso.correlation.timeout");\r
143                 if (s != null) {\r
144                         try {\r
145                                 timeout = Long.parseLong(s);\r
146                         } catch (NumberFormatException e) {\r
147                                 // Ignore\r
148                         }\r
149                 }\r
150 \r
151                 long now = System.currentTimeMillis();\r
152                 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);\r
153                 long endTime = now + (timeout * 1000);\r
154                 long sleep = FAST_POLL_INT_MS;\r
155 \r
156                 List<Execution> waitingProcesses = null;\r
157                 Exception queryException = null;\r
158                 int queryCount = 0;\r
159                 int queryFailCount = 0;\r
160 \r
161                 while (true) {\r
162                         try {\r
163                                 ++queryCount;\r
164                                 waitingProcesses = runtimeService.createExecutionQuery()\r
165                                         .messageEventSubscriptionName(messageEventName)\r
166                                         .processVariableValueEquals(correlationVariable, correlationValue)\r
167                                         .list();\r
168                         } catch (Exception e) {\r
169                                 ++queryFailCount;\r
170                                 queryException = e;\r
171                         }\r
172 \r
173                         if (waitingProcesses != null && waitingProcesses.size() > 0) {\r
174                                 break;\r
175                         }\r
176 \r
177                         if (now > endTime - sleep) {\r
178                                 break;\r
179                         }\r
180 \r
181                         Thread.sleep(sleep);\r
182                         now = System.currentTimeMillis();\r
183 \r
184                         if (now > fastPollEndTime) {\r
185                                 sleep = SLOW_POLL_INT_MS;\r
186                         }\r
187                 }\r
188 \r
189                 if (waitingProcesses == null) {\r
190                         waitingProcesses = new ArrayList<Execution>(0);\r
191                 }\r
192 \r
193                 int count = waitingProcesses.size();\r
194 \r
195                 List<ExecInfo> execInfoList = new ArrayList<>(count);\r
196                 for (Execution execution : waitingProcesses) {\r
197                         execInfoList.add(new ExecInfo(execution));\r
198                 }\r
199 \r
200                 LOGGER.debug(logMarker + " Found " + count + " process(es) waiting"\r
201                         + " for " + messageEventName + " with " + correlationVariable\r
202                         + " = '" + correlationValue + "': " + execInfoList);\r
203 \r
204                 if (count == 0) {\r
205                         if (queryFailCount > 0) {\r
206                                 String msg = queryFailCount + "/" + queryCount\r
207                                         + " execution queries failed attempting to correlate "\r
208                                         + messageEventName + " with " + correlationVariable\r
209                                         + " = '" + correlationValue + "'; last exception was:"\r
210                                         + queryException;\r
211                                 LOGGER.debug(msg);\r
212                                 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),\r
213                                         MsoLogger.ErrorCode.UnknownError, msg, queryException);\r
214                         }\r
215 \r
216                         return false;\r
217                 }\r
218 \r
219                 if (count > 1) {\r
220                         // Only one process should be waiting. Throw an exception back to the client.\r
221                         throw new MismatchingMessageCorrelationException(messageEventName,\r
222                                 "more than 1 process is waiting with " + correlationVariable\r
223                                 + " = '" + correlationValue + "'");\r
224                 }\r
225                 \r
226                 // We prototyped an asynchronous solution, i.e. resuming the process\r
227                 // flow in a separate thread, but this affected too many existing tests,\r
228                 // and we went back to the synchronous solution. The synchronous solution\r
229                 // has some troublesome characteristics though.  For example, the\r
230                 // resumed flow may send request #2 to a remote system before MSO has\r
231                 // acknowledged the notification associated with request #1.  \r
232 \r
233                 try {\r
234                         LOGGER.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "\r
235                                 + messageEventName + " with " + correlationVariable + " = '"\r
236                                 + correlationValue + "'");\r
237 \r
238                         @SuppressWarnings("unused")\r
239                         MessageCorrelationResult result = runtimeService\r
240                                 .createMessageCorrelation(messageEventName)\r
241                                 .setVariables(variables)\r
242                                 .processInstanceVariableEquals(correlationVariable, correlationValue)\r
243                                 .correlateWithResult();\r
244 \r
245                 } catch (MismatchingMessageCorrelationException e) {\r
246                         // A correlation exception occurred even after we identified\r
247                         // one waiting process.  Throw it back to the client.\r
248                         throw e;\r
249                 } catch (Exception e) {\r
250                         // This must be an exception from the flow itself.  Log it, but don't\r
251                         // report it back to the client.\r
252                         String msg = "Caught " + e.getClass().getSimpleName() + " running "\r
253                                 + execInfoList.get(0) + " after receiving " + messageEventName\r
254                                 + " with " + correlationVariable + " = '" + correlationValue\r
255                                 + "': " + e;\r
256                         LOGGER.debug(msg);\r
257                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),\r
258                                 MsoLogger.ErrorCode.UnknownError, msg, e);\r
259                 }\r
260         }  catch (Exception e) {\r
261                 // This must be an exception from the flow itself.  Log it, but don't\r
262                 // report it back to the client.\r
263                 String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName\r
264                         + " with " + correlationVariable + " = '" + correlationValue\r
265                         + "': " + e;\r
266                 LOGGER.debug(msg);\r
267                 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN CORRELATION ERROR -", MsoLogger.getServiceName(),\r
268                         MsoLogger.ErrorCode.UnknownError, msg, e);\r
269         }       \r
270 \r
271                 return true;\r
272         }\r
273         \r
274         /**\r
275          * Records audit and metric events in the log for a callback success.\r
276          * @param method the method name\r
277          * @param startTime the request start time\r
278          */\r
279         protected void logCallbackSuccess(String method, long startTime) {\r
280                 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
281                         MsoLogger.ResponseCode.Suc, "Completed " + method);\r
282 \r
283                 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
284                         MsoLogger.ResponseCode.Suc, "Completed " + method,\r
285                         "BPMN", MsoLogger.getServiceName(), null);\r
286         }\r
287 \r
288         /**\r
289          * Records error, audit and metric events in the log for a callback\r
290          * internal error.\r
291          * @param method the method name\r
292          * @param startTime the request start time\r
293          * @param msg the error message\r
294          */\r
295         protected void logCallbackError(String method, long startTime, String msg) {\r
296                 logCallbackError(method, startTime, msg, null);\r
297         }\r
298 \r
299         /**\r
300          * Records error, audit and metric events in the log for a callback\r
301          * internal error.\r
302          * @param method the method name\r
303          * @param startTime the request start time\r
304          * @param msg the error message\r
305          * @param e the exception\r
306          */\r
307         protected void logCallbackError(String method, long startTime, String msg, Exception e) {\r
308                 if (e == null) {\r
309                         LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(), \r
310                                 MsoLogger.ErrorCode.UnknownError, msg);\r
311                 } else {\r
312                         LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(), \r
313                                 MsoLogger.ErrorCode.UnknownError, msg, e);\r
314                 }\r
315 \r
316                 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
317                         MsoLogger.ResponseCode.InternalError, "Completed " + method);\r
318 \r
319                 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
320                         MsoLogger.ResponseCode.InternalError, "Completed " + method,\r
321                         "BPMN", MsoLogger.getServiceName(), null);\r
322         }\r
323         \r
324         /**\r
325          * Abstract callback result object.\r
326          */\r
327         protected abstract class CallbackResult {\r
328         }\r
329 \r
330         /**\r
331          * Indicates that callback handling was successful.\r
332          */\r
333         protected class CallbackSuccess extends CallbackResult {\r
334         }\r
335 \r
336         /**\r
337          * Indicates that callback handling failed.\r
338          */\r
339         protected class CallbackError extends CallbackResult {\r
340                 private final String errorMessage;\r
341 \r
342                 public CallbackError(String errorMessage) {\r
343                         this.errorMessage = errorMessage;\r
344                 }\r
345 \r
346                 /**\r
347                  * Gets the error message.\r
348                  */\r
349                 public String getErrorMessage() {\r
350                         return errorMessage;\r
351                 }\r
352         }\r
353 \r
354         private static class ExecInfo {\r
355                 private final Execution execution;\r
356 \r
357                 public ExecInfo(Execution execution) {\r
358                         this.execution = execution;\r
359                 }\r
360         \r
361                 @Override\r
362                 public String toString() {\r
363                         return "Process[" + execution.getProcessInstanceId()\r
364                                 + ":" + execution.getId() + "]";\r
365                 }\r
366         }\r
367 }\r