1 package org.openecomp.mso.bpmn.common.workflow.service;
\r
3 import java.util.ArrayList;
\r
4 import java.util.HashMap;
\r
5 import java.util.List;
\r
6 import java.util.Map;
\r
8 import org.camunda.bpm.BpmPlatform;
\r
9 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
\r
10 import org.camunda.bpm.engine.ProcessEngineServices;
\r
11 import org.camunda.bpm.engine.RuntimeService;
\r
12 import org.camunda.bpm.engine.runtime.Execution;
\r
13 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;
\r
14 import org.openecomp.mso.bpmn.core.PropertyConfiguration;
\r
15 import org.openecomp.mso.logger.MessageEnum;
\r
16 import org.openecomp.mso.logger.MsoLogger;
\r
19 * Abstract base class for callback services.
\r
21 public abstract class AbstractCallbackService {
\r
22 public static final long DEFAULT_TIMEOUT_SECONDS = 60;
\r
23 public static final long FAST_POLL_DUR_SECONDS = 5;
\r
24 public static final long FAST_POLL_INT_MS = 100;
\r
25 public static final long SLOW_POLL_INT_MS = 1000;
\r
27 private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);
\r
29 protected volatile ProcessEngineServices pes4junit = null;
\r
32 * Parameterized callback handler.
\r
34 protected CallbackResult handleCallback(String method, Object message,
\r
35 String messageEventName, String messageVariable,
\r
36 String correlationVariable, String correlationValue,
\r
39 return handleCallback(method, message, messageEventName, messageVariable,
\r
40 correlationVariable, correlationValue, logMarker, null);
\r
44 * Parameterized callback handler.
\r
46 protected CallbackResult handleCallback(String method, Object message,
\r
47 String messageEventName, String messageVariable,
\r
48 String correlationVariable, String correlationValue,
\r
49 String logMarker, Map<String, Object> injectedVariables) {
\r
51 long startTime = System.currentTimeMillis();
\r
53 LOGGER.debug(logMarker + " " + method + " received message: "
\r
54 + (message == null ? "" : System.lineSeparator()) + message);
\r
57 Map<String, Object> variables = new HashMap<String, Object>();
\r
59 if (injectedVariables != null) {
\r
60 variables.putAll(injectedVariables);
\r
63 variables.put(correlationVariable, correlationValue);
\r
64 variables.put(messageVariable, message == null ? null : message.toString());
\r
66 boolean ok = correlate(messageEventName, correlationVariable,
\r
67 correlationValue, variables, logMarker);
\r
70 String msg = "No process is waiting for " + messageEventName
\r
71 + " with " + correlationVariable + " = '" + correlationValue + "'";
\r
72 logCallbackError(method, startTime, msg);
\r
73 return new CallbackError(msg);
\r
76 logCallbackSuccess(method, startTime);
\r
77 return new CallbackSuccess();
\r
78 } catch (Exception e) {
\r
79 LOGGER.debug("Exception :",e);
\r
80 String msg = "Caught " + e.getClass().getSimpleName()
\r
81 + " processing " + messageEventName + " with " + correlationVariable
\r
82 + " = '" + correlationValue + "'";
\r
83 logCallbackError(method, startTime, msg);
\r
84 return new CallbackError(msg);
\r
89 * Performs message correlation. Waits a limited amount of time for
\r
90 * a process to become ready for correlation. The return value indicates
\r
91 * whether or not a process was found to receive the message. Due to the
\r
92 * synchronous nature of message injection in Camunda, by the time this
\r
93 * method returns, one of 3 things will have happened: (1) the process
\r
94 * received the message and ended, (2) the process received the message
\r
95 * and reached an activity that suspended, or (3) an exception occurred
\r
96 * during correlation or while the process was executing. Correlation
\r
97 * exceptions are handled differently from process execution exceptions.
\r
98 * Correlation exceptions are thrown so the client knows something went
\r
99 * wrong with the delivery of the message. Process execution exceptions
\r
100 * are logged but not thrown.
\r
101 * @param messageEventName the message event name
\r
102 * @param correlationVariable the process variable used as the correlator
\r
103 * @param correlationValue the correlation value
\r
104 * @param variables variables to inject into the process
\r
105 * @param logMarker a marker for debug logging
\r
106 * @return true if a process could be found, false if not
\r
107 * @throws Exception for correlation errors
\r
109 protected boolean correlate(String messageEventName, String correlationVariable,
\r
110 String correlationValue, Map<String, Object> variables, String logMarker)
\r
113 LOGGER.debug(logMarker + " Attempting to find process waiting"
\r
114 + " for " + messageEventName + " with " + correlationVariable
\r
115 + " = '" + correlationValue + "'");
\r
117 RuntimeService runtimeService =
\r
118 getProcessEngineServices().getRuntimeService();
\r
120 Map<String, String> properties =
\r
121 PropertyConfiguration.getInstance().getProperties("mso.bpmn.urn.properties");
\r
123 long timeout = DEFAULT_TIMEOUT_SECONDS;
\r
125 // The code is here in case we ever need to change the default.
\r
126 String s = properties.get("mso.correlation.timeout");
\r
129 timeout = Long.parseLong(s);
\r
130 } catch (NumberFormatException e) {
\r
135 long now = System.currentTimeMillis();
\r
136 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);
\r
137 long endTime = now + (timeout * 1000);
\r
138 long sleep = FAST_POLL_INT_MS;
\r
140 List<Execution> waitingProcesses = null;
\r
141 Exception queryException = null;
\r
142 int queryCount = 0;
\r
143 int queryFailCount = 0;
\r
148 waitingProcesses = runtimeService.createExecutionQuery()
\r
149 .messageEventSubscriptionName(messageEventName)
\r
150 .processVariableValueEquals(correlationVariable, correlationValue)
\r
152 } catch (Exception e) {
\r
154 queryException = e;
\r
157 if (waitingProcesses != null && waitingProcesses.size() > 0) {
\r
161 if (now > endTime - sleep) {
\r
165 Thread.sleep(sleep);
\r
166 now = System.currentTimeMillis();
\r
168 if (now > fastPollEndTime) {
\r
169 sleep = SLOW_POLL_INT_MS;
\r
173 if (waitingProcesses == null) {
\r
174 waitingProcesses = new ArrayList<Execution>(0);
\r
177 int count = waitingProcesses.size();
\r
179 List<ExecInfo> execInfoList = new ArrayList<ExecInfo>(count);
\r
180 for (Execution execution : waitingProcesses) {
\r
181 execInfoList.add(new ExecInfo(execution));
\r
184 LOGGER.debug(logMarker + " Found " + count + " process(es) waiting"
\r
185 + " for " + messageEventName + " with " + correlationVariable
\r
186 + " = '" + correlationValue + "': " + execInfoList);
\r
189 if (queryFailCount > 0) {
\r
190 String msg = queryFailCount + "/" + queryCount
\r
191 + " execution queries failed attempting to correlate "
\r
192 + messageEventName + " with " + correlationVariable
\r
193 + " = '" + correlationValue + "'; last exception was:"
\r
196 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
197 MsoLogger.ErrorCode.UnknownError, msg, queryException);
\r
204 // Only one process should be waiting. Throw an exception back to the client.
\r
205 throw new MismatchingMessageCorrelationException(messageEventName,
\r
206 "more than 1 process is waiting with " + correlationVariable
\r
207 + " = '" + correlationValue + "'");
\r
210 // We prototyped an asynchronous solution, i.e. resuming the process
\r
211 // flow in a separate thread, but this affected too many existing tests,
\r
212 // and we went back to the synchronous solution. The synchronous solution
\r
213 // has some troublesome characteristics though. For example, the
\r
214 // resumed flow may send request #2 to a remote system before MSO has
\r
215 // acknowledged the notification associated with request #1.
\r
218 LOGGER.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "
\r
219 + messageEventName + " with " + correlationVariable + " = '"
\r
220 + correlationValue + "'");
\r
222 @SuppressWarnings("unused")
\r
223 MessageCorrelationResult result = runtimeService
\r
224 .createMessageCorrelation(messageEventName)
\r
225 .setVariables(variables)
\r
226 .processInstanceVariableEquals(correlationVariable, correlationValue)
\r
227 .correlateWithResult();
\r
229 } catch (MismatchingMessageCorrelationException e) {
\r
230 // A correlation exception occurred even after we identified
\r
231 // one waiting process. Throw it back to the client.
\r
233 } catch (Exception e) {
\r
234 // This must be an exception from the flow itself. Log it, but don't
\r
235 // report it back to the client.
\r
236 String msg = "Caught " + e.getClass().getSimpleName() + " running "
\r
237 + execInfoList.get(0) + " after receiving " + messageEventName
\r
238 + " with " + correlationVariable + " = '" + correlationValue
\r
241 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
242 MsoLogger.ErrorCode.UnknownError, msg, e);
\r
244 } catch (Exception e) {
\r
245 // This must be an exception from the flow itself. Log it, but don't
\r
246 // report it back to the client.
\r
247 String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName
\r
248 + " with " + correlationVariable + " = '" + correlationValue
\r
251 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN CORRELATION ERROR -", MsoLogger.getServiceName(),
\r
252 MsoLogger.ErrorCode.UnknownError, msg, e);
\r
259 * Records audit and metric events in the log for a callback success.
\r
260 * @param method the method name
\r
261 * @param startTime the request start time
\r
263 protected void logCallbackSuccess(String method, long startTime) {
\r
264 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
265 MsoLogger.ResponseCode.Suc, "Completed " + method);
\r
267 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
268 MsoLogger.ResponseCode.Suc, "Completed " + method,
\r
269 "BPMN", MsoLogger.getServiceName(), null);
\r
273 * Records error, audit and metric events in the log for a callback
\r
275 * @param method the method name
\r
276 * @param startTime the request start time
\r
277 * @param msg the error message
\r
279 protected void logCallbackError(String method, long startTime, String msg) {
\r
280 logCallbackError(method, startTime, msg, null);
\r
284 * Records error, audit and metric events in the log for a callback
\r
286 * @param method the method name
\r
287 * @param startTime the request start time
\r
288 * @param msg the error message
\r
289 * @param e the exception
\r
291 protected void logCallbackError(String method, long startTime, String msg, Exception e) {
\r
293 LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
294 MsoLogger.ErrorCode.UnknownError, msg);
\r
296 LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
297 MsoLogger.ErrorCode.UnknownError, msg, e);
\r
300 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
301 MsoLogger.ResponseCode.InternalError, "Completed " + method);
\r
303 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
304 MsoLogger.ResponseCode.InternalError, "Completed " + method,
\r
305 "BPMN", MsoLogger.getServiceName(), null);
\r
309 * Abstract callback result object.
\r
311 protected abstract class CallbackResult {
\r
315 * Indicates that callback handling was successful.
\r
317 protected class CallbackSuccess extends CallbackResult {
\r
321 * Indicates that callback handling failed.
\r
323 protected class CallbackError extends CallbackResult {
\r
324 private final String errorMessage;
\r
326 public CallbackError(String errorMessage) {
\r
327 this.errorMessage = errorMessage;
\r
331 * Gets the error message.
\r
333 public String getErrorMessage() {
\r
334 return errorMessage;
\r
338 private static class ExecInfo {
\r
339 private final Execution execution;
\r
341 public ExecInfo(Execution execution) {
\r
342 this.execution = execution;
\r
346 public String toString() {
\r
347 return "Process[" + execution.getProcessInstanceId()
\r
348 + ":" + execution.getId() + "]";
\r
352 protected ProcessEngineServices getProcessEngineServices() {
\r
353 if (pes4junit == null) {
\r
354 return BpmPlatform.getDefaultProcessEngine();
\r
360 public void setProcessEngineServices4junit(ProcessEngineServices pes) {
\r