1 package org.openecomp.mso.bpmn.common.workflow.service;
\r
3 import java.util.ArrayList;
\r
4 import java.util.HashMap;
\r
5 import java.util.List;
\r
6 import java.util.Map;
\r
8 import org.camunda.bpm.BpmPlatform;
\r
9 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
\r
10 import org.camunda.bpm.engine.ProcessEngineServices;
\r
11 import org.camunda.bpm.engine.RuntimeService;
\r
12 import org.camunda.bpm.engine.runtime.Execution;
\r
13 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;
\r
14 import org.openecomp.mso.bpmn.core.PropertyConfiguration;
\r
15 import org.openecomp.mso.logger.MessageEnum;
\r
16 import org.openecomp.mso.logger.MsoLogger;
\r
19 * Abstract base class for callback services.
\r
21 public abstract class AbstractCallbackService {
\r
22 public static final long DEFAULT_TIMEOUT_SECONDS = 60;
\r
23 public static final long FAST_POLL_DUR_SECONDS = 5;
\r
24 public static final long FAST_POLL_INT_MS = 100;
\r
25 public static final long SLOW_POLL_INT_MS = 1000;
\r
27 private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);
\r
29 protected volatile ProcessEngineServices pes4junit = null;
\r
32 * Parameterized callback handler.
\r
34 protected CallbackResult handleCallback(String method, Object message,
\r
35 String messageEventName, String messageVariable,
\r
36 String correlationVariable, String correlationValue,
\r
39 return handleCallback(method, message, messageEventName, messageVariable,
\r
40 correlationVariable, correlationValue, logMarker, null);
\r
44 * Parameterized callback handler.
\r
46 protected CallbackResult handleCallback(String method, Object message,
\r
47 String messageEventName, String messageVariable,
\r
48 String correlationVariable, String correlationValue,
\r
49 String logMarker, Map<String, Object> injectedVariables) {
\r
51 long startTime = System.currentTimeMillis();
\r
53 LOGGER.debug(logMarker + " " + method + " received message: "
\r
54 + (message == null ? "" : System.lineSeparator()) + message);
\r
57 Map<String, Object> variables = new HashMap<String, Object>();
\r
59 if (injectedVariables != null) {
\r
60 variables.putAll(injectedVariables);
\r
63 variables.put(correlationVariable, correlationValue);
\r
64 variables.put(messageVariable, message == null ? null : message.toString());
\r
66 boolean ok = correlate(messageEventName, correlationVariable,
\r
67 correlationValue, variables, logMarker);
\r
70 String msg = "No process is waiting for " + messageEventName
\r
71 + " with " + correlationVariable + " = '" + correlationValue + "'";
\r
72 logCallbackError(method, startTime, msg);
\r
73 return new CallbackError(msg);
\r
76 logCallbackSuccess(method, startTime);
\r
77 return new CallbackSuccess();
\r
78 } catch (Exception e) {
\r
79 String msg = "Caught " + e.getClass().getSimpleName()
\r
80 + " processing " + messageEventName + " with " + correlationVariable
\r
81 + " = '" + correlationValue + "'";
\r
82 logCallbackError(method, startTime, msg);
\r
83 return new CallbackError(msg);
\r
88 * Performs message correlation. Waits a limited amount of time for
\r
89 * a process to become ready for correlation. The return value indicates
\r
90 * whether or not a process was found to receive the message. Due to the
\r
91 * synchronous nature of message injection in Camunda, by the time this
\r
92 * method returns, one of 3 things will have happened: (1) the process
\r
93 * received the message and ended, (2) the process received the message
\r
94 * and reached an activity that suspended, or (3) an exception occurred
\r
95 * during correlation or while the process was executing. Correlation
\r
96 * exceptions are handled differently from process execution exceptions.
\r
97 * Correlation exceptions are thrown so the client knows something went
\r
98 * wrong with the delivery of the message. Process execution exceptions
\r
99 * are logged but not thrown.
\r
100 * @param messageEventName the message event name
\r
101 * @param correlationVariable the process variable used as the correlator
\r
102 * @param correlationValue the correlation value
\r
103 * @param variables variables to inject into the process
\r
104 * @param logMarker a marker for debug logging
\r
105 * @return true if a process could be found, false if not
\r
106 * @throws Exception for correlation errors
\r
108 protected boolean correlate(String messageEventName, String correlationVariable,
\r
109 String correlationValue, Map<String, Object> variables, String logMarker)
\r
112 LOGGER.debug(logMarker + " Attempting to find process waiting"
\r
113 + " for " + messageEventName + " with " + correlationVariable
\r
114 + " = '" + correlationValue + "'");
\r
116 RuntimeService runtimeService =
\r
117 getProcessEngineServices().getRuntimeService();
\r
119 Map<String, String> properties =
\r
120 PropertyConfiguration.getInstance().getProperties("mso.bpmn.urn.properties");
\r
122 long timeout = DEFAULT_TIMEOUT_SECONDS;
\r
124 // The code is here in case we ever need to change the default.
\r
125 String s = properties.get("mso.correlation.timeout");
\r
128 timeout = Long.parseLong(s);
\r
129 } catch (NumberFormatException e) {
\r
134 long now = System.currentTimeMillis();
\r
135 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);
\r
136 long endTime = now + (timeout * 1000);
\r
137 long sleep = FAST_POLL_INT_MS;
\r
139 List<Execution> waitingProcesses = null;
\r
140 Exception queryException = null;
\r
141 int queryCount = 0;
\r
142 int queryFailCount = 0;
\r
147 waitingProcesses = runtimeService.createExecutionQuery()
\r
148 .messageEventSubscriptionName(messageEventName)
\r
149 .processVariableValueEquals(correlationVariable, correlationValue)
\r
151 } catch (Exception e) {
\r
153 queryException = e;
\r
156 if (waitingProcesses != null && waitingProcesses.size() > 0) {
\r
160 if (now > endTime - sleep) {
\r
164 Thread.sleep(sleep);
\r
165 now = System.currentTimeMillis();
\r
167 if (now > fastPollEndTime) {
\r
168 sleep = SLOW_POLL_INT_MS;
\r
172 if (waitingProcesses == null) {
\r
173 waitingProcesses = new ArrayList<Execution>(0);
\r
176 int count = waitingProcesses.size();
\r
178 List<ExecInfo> execInfoList = new ArrayList<ExecInfo>(count);
\r
179 for (Execution execution : waitingProcesses) {
\r
180 execInfoList.add(new ExecInfo(execution));
\r
183 LOGGER.debug(logMarker + " Found " + count + " process(es) waiting"
\r
184 + " for " + messageEventName + " with " + correlationVariable
\r
185 + " = '" + correlationValue + "': " + execInfoList);
\r
188 if (queryFailCount > 0) {
\r
189 String msg = queryFailCount + "/" + queryCount
\r
190 + " execution queries failed attempting to correlate "
\r
191 + messageEventName + " with " + correlationVariable
\r
192 + " = '" + correlationValue + "'; last exception was:"
\r
195 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
196 MsoLogger.ErrorCode.UnknownError, msg, queryException);
\r
203 // Only one process should be waiting. Throw an exception back to the client.
\r
204 throw new MismatchingMessageCorrelationException(messageEventName,
\r
205 "more than 1 process is waiting with " + correlationVariable
\r
206 + " = '" + correlationValue + "'");
\r
209 // We prototyped an asynchronous solution, i.e. resuming the process
\r
210 // flow in a separate thread, but this affected too many existing tests,
\r
211 // and we went back to the synchronous solution. The synchronous solution
\r
212 // has some troublesome characteristics though. For example, the
\r
213 // resumed flow may send request #2 to a remote system before MSO has
\r
214 // acknowledged the notification associated with request #1.
\r
217 LOGGER.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "
\r
218 + messageEventName + " with " + correlationVariable + " = '"
\r
219 + correlationValue + "'");
\r
221 @SuppressWarnings("unused")
\r
222 MessageCorrelationResult result = runtimeService
\r
223 .createMessageCorrelation(messageEventName)
\r
224 .setVariables(variables)
\r
225 .processInstanceVariableEquals(correlationVariable, correlationValue)
\r
226 .correlateWithResult();
\r
228 } catch (MismatchingMessageCorrelationException e) {
\r
229 // A correlation exception occurred even after we identified
\r
230 // one waiting process. Throw it back to the client.
\r
232 } catch (Exception e) {
\r
233 // This must be an exception from the flow itself. Log it, but don't
\r
234 // report it back to the client.
\r
235 String msg = "Caught " + e.getClass().getSimpleName() + " running "
\r
236 + execInfoList.get(0) + " after receiving " + messageEventName
\r
237 + " with " + correlationVariable + " = '" + correlationValue
\r
240 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
241 MsoLogger.ErrorCode.UnknownError, msg, e);
\r
248 * Records audit and metric events in the log for a callback success.
\r
249 * @param method the method name
\r
250 * @param startTime the request start time
\r
252 protected void logCallbackSuccess(String method, long startTime) {
\r
253 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
254 MsoLogger.ResponseCode.Suc, "Completed " + method);
\r
256 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
257 MsoLogger.ResponseCode.Suc, "Completed " + method,
\r
258 "BPMN", MsoLogger.getServiceName(), null);
\r
262 * Records error, audit and metric events in the log for a callback
\r
264 * @param method the method name
\r
265 * @param startTime the request start time
\r
266 * @param msg the error message
\r
268 protected void logCallbackError(String method, long startTime, String msg) {
\r
269 logCallbackError(method, startTime, msg, null);
\r
273 * Records error, audit and metric events in the log for a callback
\r
275 * @param method the method name
\r
276 * @param startTime the request start time
\r
277 * @param msg the error message
\r
278 * @param e the exception
\r
280 protected void logCallbackError(String method, long startTime, String msg, Exception e) {
\r
282 LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
283 MsoLogger.ErrorCode.UnknownError, msg);
\r
285 LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
286 MsoLogger.ErrorCode.UnknownError, msg, e);
\r
289 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
290 MsoLogger.ResponseCode.InternalError, "Completed " + method);
\r
292 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
293 MsoLogger.ResponseCode.InternalError, "Completed " + method,
\r
294 "BPMN", MsoLogger.getServiceName(), null);
\r
298 * Abstract callback result object.
\r
300 protected abstract class CallbackResult {
\r
304 * Indicates that callback handling was successful.
\r
306 protected class CallbackSuccess extends CallbackResult {
\r
310 * Indicates that callback handling failed.
\r
312 protected class CallbackError extends CallbackResult {
\r
313 private final String errorMessage;
\r
315 public CallbackError(String errorMessage) {
\r
316 this.errorMessage = errorMessage;
\r
320 * Gets the error message.
\r
322 public String getErrorMessage() {
\r
323 return errorMessage;
\r
327 private static class ExecInfo {
\r
328 private final Execution execution;
\r
330 public ExecInfo(Execution execution) {
\r
331 this.execution = execution;
\r
335 public String toString() {
\r
336 return "Process[" + execution.getProcessInstanceId()
\r
337 + ":" + execution.getId() + "]";
\r
341 protected ProcessEngineServices getProcessEngineServices() {
\r
342 if (pes4junit == null) {
\r
343 return BpmPlatform.getDefaultProcessEngine();
\r
349 public void setProcessEngineServices4junit(ProcessEngineServices pes) {
\r