c0be8992ef4b1ddf13f78840630d4705b7de1750
[so.git] / bpmn / MSOCommonBPMN / src / main / java / org / openecomp / mso / bpmn / common / workflow / service / AbstractCallbackService.java
1 /*-\r
2  * ============LICENSE_START=======================================================\r
3  * ONAP - SO\r
4  * ================================================================================\r
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  * Licensed under the Apache License, Version 2.0 (the "License");\r
8  * you may not use this file except in compliance with the License.\r
9  * You may obtain a copy of the License at\r
10  *\r
11  *      http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  * Unless required by applicable law or agreed to in writing, software\r
14  * distributed under the License is distributed on an "AS IS" BASIS,\r
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  * See the License for the specific language governing permissions and\r
17  * limitations under the License.\r
18  * ============LICENSE_END=========================================================\r
19  */\r
20 \r
21 package org.openecomp.mso.bpmn.common.workflow.service;\r
22 \r
23 import java.util.ArrayList;\r
24 import java.util.HashMap;\r
25 import java.util.List;\r
26 import java.util.Map;\r
27 \r
28 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;\r
29 import org.camunda.bpm.engine.ProcessEngineServices;\r
30 import org.camunda.bpm.engine.ProcessEngines;\r
31 import org.camunda.bpm.engine.RuntimeService;\r
32 import org.camunda.bpm.engine.runtime.Execution;\r
33 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;\r
34 import org.openecomp.mso.bpmn.core.PropertyConfiguration;\r
35 import org.openecomp.mso.logger.MessageEnum;\r
36 import org.openecomp.mso.logger.MsoLogger;\r
37 \r
38 /**\r
39  * Abstract base class for callback services.\r
40  */\r
41 public abstract class AbstractCallbackService {\r
42         public static final long DEFAULT_TIMEOUT_SECONDS = 60;\r
43         public static final long FAST_POLL_DUR_SECONDS = 5;\r
44         public static final long FAST_POLL_INT_MS = 100;\r
45         public static final long SLOW_POLL_INT_MS = 1000;\r
46         \r
47         private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);\r
48 \r
49         protected volatile ProcessEngineServices pes4junit = null;\r
50         \r
51         /**\r
52          * Parameterized callback handler.\r
53          */\r
54         protected CallbackResult handleCallback(String method, Object message,\r
55                         String messageEventName, String messageVariable,\r
56                         String correlationVariable, String correlationValue,\r
57                         String logMarker) {\r
58 \r
59                 return handleCallback(method, message, messageEventName, messageVariable,\r
60                         correlationVariable, correlationValue, logMarker, null);\r
61         }\r
62 \r
63         /**\r
64          * Parameterized callback handler.\r
65          */\r
66         protected CallbackResult handleCallback(String method, Object message,\r
67                         String messageEventName, String messageVariable,\r
68                         String correlationVariable, String correlationValue,\r
69                         String logMarker, Map<String, Object> injectedVariables) {\r
70 \r
71                 long startTime = System.currentTimeMillis();\r
72 \r
73                 LOGGER.debug(logMarker + " " + method + " received message: "\r
74                         + (message == null ? "" : System.lineSeparator()) + message);\r
75 \r
76                 try {\r
77                         Map<String, Object> variables = new HashMap<String, Object>();\r
78 \r
79                         if (injectedVariables != null) {\r
80                                 variables.putAll(injectedVariables);\r
81                         }\r
82 \r
83                         variables.put(correlationVariable, correlationValue);\r
84                         variables.put(messageVariable, message == null ? null : message.toString());\r
85 \r
86                         boolean ok = correlate(messageEventName, correlationVariable,\r
87                                 correlationValue, variables, logMarker);\r
88 \r
89                         if (!ok) {\r
90                                 String msg = "No process is waiting for " + messageEventName\r
91                                         + " with " + correlationVariable + " = '" + correlationValue + "'";\r
92                                 logCallbackError(method, startTime, msg);\r
93                                 return new CallbackError(msg);\r
94                         }\r
95 \r
96                         logCallbackSuccess(method, startTime);\r
97                         return new CallbackSuccess();\r
98                 } catch (Exception e) {\r
99                         LOGGER.debug("Exception :",e);\r
100                         String msg = "Caught " + e.getClass().getSimpleName()\r
101                                 + " processing " + messageEventName + " with " + correlationVariable\r
102                                 + " = '" + correlationValue + "'";\r
103                         logCallbackError(method, startTime, msg);\r
104                         return new CallbackError(msg);\r
105                 }\r
106         }\r
107         \r
108         /**\r
109          * Performs message correlation.  Waits a limited amount of time for\r
110          * a process to become ready for correlation.  The return value indicates\r
111          * whether or not a process was found to receive the message.  Due to the\r
112          * synchronous nature of message injection in Camunda, by the time this\r
113          * method returns, one of 3 things will have happened: (1) the process\r
114          * received the message and ended, (2) the process received the message\r
115          * and reached an activity that suspended, or (3) an exception occurred\r
116          * during correlation or while the process was executing.  Correlation\r
117          * exceptions are handled differently from process execution exceptions.\r
118          * Correlation exceptions are thrown so the client knows something went\r
119          * wrong with the delivery of the message.  Process execution exceptions\r
120          * are logged but not thrown.\r
121          * @param messageEventName the message event name\r
122          * @param correlationVariable the process variable used as the correlator\r
123          * @param correlationValue the correlation value\r
124          * @param variables variables to inject into the process\r
125          * @param logMarker a marker for debug logging\r
126          * @return true if a process could be found, false if not\r
127          * @throws Exception for correlation errors\r
128          */\r
129         protected boolean correlate(String messageEventName, String correlationVariable,\r
130                         String correlationValue, Map<String, Object> variables, String logMarker)\r
131                         throws Exception {\r
132         try{\r
133                 LOGGER.debug(logMarker + " Attempting to find process waiting"\r
134                         + " for " + messageEventName + " with " + correlationVariable\r
135                         + " = '" + correlationValue + "'");\r
136 \r
137                 RuntimeService runtimeService =\r
138                         getProcessEngineServices().getRuntimeService();\r
139 \r
140                 Map<String, String> properties =\r
141                         PropertyConfiguration.getInstance().getProperties("mso.bpmn.urn.properties");\r
142 \r
143                 long timeout = DEFAULT_TIMEOUT_SECONDS;\r
144 \r
145                 // The code is here in case we ever need to change the default.\r
146                 String s = properties.get("mso.correlation.timeout");\r
147                 if (s != null) {\r
148                         try {\r
149                                 timeout = Long.parseLong(s);\r
150                         } catch (NumberFormatException e) {\r
151                                 // Ignore\r
152                         }\r
153                 }\r
154 \r
155                 long now = System.currentTimeMillis();\r
156                 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);\r
157                 long endTime = now + (timeout * 1000);\r
158                 long sleep = FAST_POLL_INT_MS;\r
159 \r
160                 List<Execution> waitingProcesses = null;\r
161                 Exception queryException = null;\r
162                 int queryCount = 0;\r
163                 int queryFailCount = 0;\r
164 \r
165                 while (true) {\r
166                         try {\r
167                                 ++queryCount;\r
168                                 waitingProcesses = runtimeService.createExecutionQuery()\r
169                                         .messageEventSubscriptionName(messageEventName)\r
170                                         .processVariableValueEquals(correlationVariable, correlationValue)\r
171                                         .list();\r
172                         } catch (Exception e) {\r
173                                 ++queryFailCount;\r
174                                 queryException = e;\r
175                         }\r
176 \r
177                         if (waitingProcesses != null && waitingProcesses.size() > 0) {\r
178                                 break;\r
179                         }\r
180 \r
181                         if (now > endTime - sleep) {\r
182                                 break;\r
183                         }\r
184 \r
185                         Thread.sleep(sleep);\r
186                         now = System.currentTimeMillis();\r
187 \r
188                         if (now > fastPollEndTime) {\r
189                                 sleep = SLOW_POLL_INT_MS;\r
190                         }\r
191                 }\r
192 \r
193                 if (waitingProcesses == null) {\r
194                         waitingProcesses = new ArrayList<Execution>(0);\r
195                 }\r
196 \r
197                 int count = waitingProcesses.size();\r
198 \r
199                 List<ExecInfo> execInfoList = new ArrayList<ExecInfo>(count);\r
200                 for (Execution execution : waitingProcesses) {\r
201                         execInfoList.add(new ExecInfo(execution));\r
202                 }\r
203 \r
204                 LOGGER.debug(logMarker + " Found " + count + " process(es) waiting"\r
205                         + " for " + messageEventName + " with " + correlationVariable\r
206                         + " = '" + correlationValue + "': " + execInfoList);\r
207 \r
208                 if (count == 0) {\r
209                         if (queryFailCount > 0) {\r
210                                 String msg = queryFailCount + "/" + queryCount\r
211                                         + " execution queries failed attempting to correlate "\r
212                                         + messageEventName + " with " + correlationVariable\r
213                                         + " = '" + correlationValue + "'; last exception was:"\r
214                                         + queryException;\r
215                                 LOGGER.debug(msg);\r
216                                 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),\r
217                                         MsoLogger.ErrorCode.UnknownError, msg, queryException);\r
218                         }\r
219 \r
220                         return false;\r
221                 }\r
222 \r
223                 if (count > 1) {\r
224                         // Only one process should be waiting. Throw an exception back to the client.\r
225                         throw new MismatchingMessageCorrelationException(messageEventName,\r
226                                 "more than 1 process is waiting with " + correlationVariable\r
227                                 + " = '" + correlationValue + "'");\r
228                 }\r
229                 \r
230                 // We prototyped an asynchronous solution, i.e. resuming the process\r
231                 // flow in a separate thread, but this affected too many existing tests,\r
232                 // and we went back to the synchronous solution. The synchronous solution\r
233                 // has some troublesome characteristics though.  For example, the\r
234                 // resumed flow may send request #2 to a remote system before MSO has\r
235                 // acknowledged the notification associated with request #1.  \r
236 \r
237                 try {\r
238                         LOGGER.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "\r
239                                 + messageEventName + " with " + correlationVariable + " = '"\r
240                                 + correlationValue + "'");\r
241 \r
242                         @SuppressWarnings("unused")\r
243                         MessageCorrelationResult result = runtimeService\r
244                                 .createMessageCorrelation(messageEventName)\r
245                                 .setVariables(variables)\r
246                                 .processInstanceVariableEquals(correlationVariable, correlationValue)\r
247                                 .correlateWithResult();\r
248 \r
249                 } catch (MismatchingMessageCorrelationException e) {\r
250                         // A correlation exception occurred even after we identified\r
251                         // one waiting process.  Throw it back to the client.\r
252                         throw e;\r
253                 } catch (Exception e) {\r
254                         // This must be an exception from the flow itself.  Log it, but don't\r
255                         // report it back to the client.\r
256                         String msg = "Caught " + e.getClass().getSimpleName() + " running "\r
257                                 + execInfoList.get(0) + " after receiving " + messageEventName\r
258                                 + " with " + correlationVariable + " = '" + correlationValue\r
259                                 + "': " + e;\r
260                         LOGGER.debug(msg);\r
261                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),\r
262                                 MsoLogger.ErrorCode.UnknownError, msg, e);\r
263                 }\r
264         }  catch (Exception e) {\r
265                 // This must be an exception from the flow itself.  Log it, but don't\r
266                 // report it back to the client.\r
267                 String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName\r
268                         + " with " + correlationVariable + " = '" + correlationValue\r
269                         + "': " + e;\r
270                 LOGGER.debug(msg);\r
271                 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN CORRELATION ERROR -", MsoLogger.getServiceName(),\r
272                         MsoLogger.ErrorCode.UnknownError, msg, e);\r
273         }       \r
274 \r
275                 return true;\r
276         }\r
277         \r
278         /**\r
279          * Records audit and metric events in the log for a callback success.\r
280          * @param method the method name\r
281          * @param startTime the request start time\r
282          */\r
283         protected void logCallbackSuccess(String method, long startTime) {\r
284                 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
285                         MsoLogger.ResponseCode.Suc, "Completed " + method);\r
286 \r
287                 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
288                         MsoLogger.ResponseCode.Suc, "Completed " + method,\r
289                         "BPMN", MsoLogger.getServiceName(), null);\r
290         }\r
291 \r
292         /**\r
293          * Records error, audit and metric events in the log for a callback\r
294          * internal error.\r
295          * @param method the method name\r
296          * @param startTime the request start time\r
297          * @param msg the error message\r
298          */\r
299         protected void logCallbackError(String method, long startTime, String msg) {\r
300                 logCallbackError(method, startTime, msg, null);\r
301         }\r
302 \r
303         /**\r
304          * Records error, audit and metric events in the log for a callback\r
305          * internal error.\r
306          * @param method the method name\r
307          * @param startTime the request start time\r
308          * @param msg the error message\r
309          * @param e the exception\r
310          */\r
311         protected void logCallbackError(String method, long startTime, String msg, Exception e) {\r
312                 if (e == null) {\r
313                         LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(), \r
314                                 MsoLogger.ErrorCode.UnknownError, msg);\r
315                 } else {\r
316                         LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(), \r
317                                 MsoLogger.ErrorCode.UnknownError, msg, e);\r
318                 }\r
319 \r
320                 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
321                         MsoLogger.ResponseCode.InternalError, "Completed " + method);\r
322 \r
323                 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,\r
324                         MsoLogger.ResponseCode.InternalError, "Completed " + method,\r
325                         "BPMN", MsoLogger.getServiceName(), null);\r
326         }\r
327         \r
328         /**\r
329          * Abstract callback result object.\r
330          */\r
331         protected abstract class CallbackResult {\r
332         }\r
333 \r
334         /**\r
335          * Indicates that callback handling was successful.\r
336          */\r
337         protected class CallbackSuccess extends CallbackResult {\r
338         }\r
339 \r
340         /**\r
341          * Indicates that callback handling failed.\r
342          */\r
343         protected class CallbackError extends CallbackResult {\r
344                 private final String errorMessage;\r
345 \r
346                 public CallbackError(String errorMessage) {\r
347                         this.errorMessage = errorMessage;\r
348                 }\r
349 \r
350                 /**\r
351                  * Gets the error message.\r
352                  */\r
353                 public String getErrorMessage() {\r
354                         return errorMessage;\r
355                 }\r
356         }\r
357 \r
358         private static class ExecInfo {\r
359                 private final Execution execution;\r
360 \r
361                 public ExecInfo(Execution execution) {\r
362                         this.execution = execution;\r
363                 }\r
364         \r
365                 @Override\r
366                 public String toString() {\r
367                         return "Process[" + execution.getProcessInstanceId()\r
368                                 + ":" + execution.getId() + "]";\r
369                 }\r
370         }\r
371         \r
372         protected ProcessEngineServices getProcessEngineServices() {\r
373                 if (pes4junit == null) {\r
374                         return ProcessEngines.getProcessEngine("infrastructure");\r
375                 } else {\r
376                         return pes4junit;\r
377                 }\r
378         }\r
379 \r
380         public void setProcessEngineServices4junit(ProcessEngineServices pes) {\r
381                 pes4junit = pes;\r
382         }\r
383 }