2 * ============LICENSE_START=======================================================
\r
4 * ================================================================================
\r
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * ================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
11 * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * Unless required by applicable law or agreed to in writing, software
\r
14 * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * See the License for the specific language governing permissions and
\r
17 * limitations under the License.
\r
18 * ============LICENSE_END=========================================================
\r
21 package org.openecomp.mso.bpmn.common.workflow.service;
\r
23 import java.util.ArrayList;
\r
24 import java.util.HashMap;
\r
25 import java.util.List;
\r
26 import java.util.Map;
\r
28 import org.camunda.bpm.BpmPlatform;
\r
29 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
\r
30 import org.camunda.bpm.engine.OptimisticLockingException;
\r
31 import org.camunda.bpm.engine.RuntimeService;
\r
32 import org.camunda.bpm.engine.runtime.Execution;
\r
33 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;
\r
34 import org.openecomp.mso.bpmn.core.PropertyConfiguration;
\r
35 import org.openecomp.mso.logger.MessageEnum;
\r
36 import org.openecomp.mso.logger.MsoLogger;
\r
39 * Abstract base class for callback services.
\r
41 public abstract class AbstractCallbackService extends ProcessEngineAwareService {
\r
42 public static final long DEFAULT_TIMEOUT_SECONDS = 60;
\r
43 public static final long FAST_POLL_DUR_SECONDS = 5;
\r
44 public static final long FAST_POLL_INT_MS = 100;
\r
45 public static final long SLOW_POLL_INT_MS = 1000;
\r
47 private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);
\r
50 * Parameterized callback handler.
\r
52 protected CallbackResult handleCallback(String method, Object message,
\r
53 String messageEventName, String messageVariable,
\r
54 String correlationVariable, String correlationValue,
\r
57 return handleCallback(method, message, messageEventName, messageVariable,
\r
58 correlationVariable, correlationValue, logMarker, null);
\r
62 * Parameterized callback handler.
\r
64 protected CallbackResult handleCallback(String method, Object message,
\r
65 String messageEventName, String messageVariable,
\r
66 String correlationVariable, String correlationValue,
\r
67 String logMarker, Map<String, Object> injectedVariables) {
\r
69 long startTime = System.currentTimeMillis();
\r
71 LOGGER.debug(logMarker + " " + method + " received message: "
\r
72 + (message == null ? "" : System.lineSeparator()) + message);
\r
75 Map<String, Object> variables = new HashMap<>();
\r
77 if (injectedVariables != null) {
\r
78 variables.putAll(injectedVariables);
\r
81 variables.put(correlationVariable, correlationValue);
\r
82 variables.put(messageVariable, message == null ? null : message.toString());
\r
84 boolean ok = correlate(messageEventName, correlationVariable,
\r
85 correlationValue, variables, logMarker);
\r
88 String msg = "No process is waiting for " + messageEventName
\r
89 + " with " + correlationVariable + " = '" + correlationValue + "'";
\r
90 logCallbackError(method, startTime, msg);
\r
91 return new CallbackError(msg);
\r
94 logCallbackSuccess(method, startTime);
\r
95 return new CallbackSuccess();
\r
96 } catch (Exception e) {
\r
97 LOGGER.debug("Exception :",e);
\r
98 String msg = "Caught " + e.getClass().getSimpleName()
\r
99 + " processing " + messageEventName + " with " + correlationVariable
\r
100 + " = '" + correlationValue + "'";
\r
101 logCallbackError(method, startTime, msg);
\r
102 return new CallbackError(msg);
\r
107 * Performs message correlation. Waits a limited amount of time for
\r
108 * a process to become ready for correlation. The return value indicates
\r
109 * whether or not a process was found to receive the message. Due to the
\r
110 * synchronous nature of message injection in Camunda, by the time this
\r
111 * method returns, one of 3 things will have happened: (1) the process
\r
112 * received the message and ended, (2) the process received the message
\r
113 * and reached an activity that suspended, or (3) an exception occurred
\r
114 * during correlation or while the process was executing. Correlation
\r
115 * exceptions are handled differently from process execution exceptions.
\r
116 * Correlation exceptions are thrown so the client knows something went
\r
117 * wrong with the delivery of the message. Process execution exceptions
\r
118 * are logged but not thrown.
\r
119 * @param messageEventName the message event name
\r
120 * @param correlationVariable the process variable used as the correlator
\r
121 * @param correlationValue the correlation value
\r
122 * @param variables variables to inject into the process
\r
123 * @param logMarker a marker for debug logging
\r
124 * @return true if a process could be found, false if not
\r
125 * @throws Exception for correlation errors
\r
127 protected boolean correlate(String messageEventName, String correlationVariable,
\r
128 String correlationValue, Map<String, Object> variables, String logMarker)
\r
131 LOGGER.debug(logMarker + " Attempting to find process waiting"
\r
132 + " for " + messageEventName + " with " + correlationVariable
\r
133 + " = '" + correlationValue + "'");
\r
135 RuntimeService runtimeService =
\r
136 getProcessEngineServices().getRuntimeService();
\r
138 Map<String, String> properties =
\r
139 PropertyConfiguration.getInstance().getProperties("mso.bpmn.urn.properties");
\r
141 long timeout = DEFAULT_TIMEOUT_SECONDS;
\r
143 // The code is here in case we ever need to change the default.
\r
144 String s = properties.get("mso.correlation.timeout");
\r
147 timeout = Long.parseLong(s);
\r
148 } catch (NumberFormatException e) {
\r
153 long now = System.currentTimeMillis();
\r
154 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);
\r
155 long endTime = now + (timeout * 1000);
\r
156 long sleep = FAST_POLL_INT_MS;
\r
158 List<Execution> waitingProcesses = null;
\r
159 Exception queryException = null;
\r
160 int queryCount = 0;
\r
161 int queryFailCount = 0;
\r
166 waitingProcesses = runtimeService.createExecutionQuery()
\r
167 .messageEventSubscriptionName(messageEventName)
\r
168 .processVariableValueEquals(correlationVariable, correlationValue)
\r
170 } catch (Exception e) {
\r
172 queryException = e;
\r
175 if (waitingProcesses != null && waitingProcesses.size() > 0) {
\r
179 if (now > endTime - sleep) {
\r
183 Thread.sleep(sleep);
\r
184 now = System.currentTimeMillis();
\r
186 if (now > fastPollEndTime) {
\r
187 sleep = SLOW_POLL_INT_MS;
\r
191 if (waitingProcesses == null) {
\r
192 waitingProcesses = new ArrayList<Execution>(0);
\r
195 int count = waitingProcesses.size();
\r
197 List<ExecInfo> execInfoList = new ArrayList<>(count);
\r
198 for (Execution execution : waitingProcesses) {
\r
199 execInfoList.add(new ExecInfo(execution));
\r
202 LOGGER.debug(logMarker + " Found " + count + " process(es) waiting"
\r
203 + " for " + messageEventName + " with " + correlationVariable
\r
204 + " = '" + correlationValue + "': " + execInfoList);
\r
207 if (queryFailCount > 0) {
\r
208 String msg = queryFailCount + "/" + queryCount
\r
209 + " execution queries failed attempting to correlate "
\r
210 + messageEventName + " with " + correlationVariable
\r
211 + " = '" + correlationValue + "'; last exception was:"
\r
214 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
215 MsoLogger.ErrorCode.UnknownError, msg, queryException);
\r
222 // Only one process should be waiting. Throw an exception back to the client.
\r
223 throw new MismatchingMessageCorrelationException(messageEventName,
\r
224 "more than 1 process is waiting with " + correlationVariable
\r
225 + " = '" + correlationValue + "'");
\r
228 // We prototyped an asynchronous solution, i.e. resuming the process
\r
229 // flow in a separate thread, but this affected too many existing tests,
\r
230 // and we went back to the synchronous solution. The synchronous solution
\r
231 // has some troublesome characteristics though. For example, the
\r
232 // resumed flow may send request #2 to a remote system before MSO has
\r
233 // acknowledged the notification associated with request #1.
\r
236 LOGGER.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "
\r
237 + messageEventName + " with " + correlationVariable + " = '"
\r
238 + correlationValue + "'");
\r
240 @SuppressWarnings("unused")
\r
241 MessageCorrelationResult result = runtimeService
\r
242 .createMessageCorrelation(messageEventName)
\r
243 .setVariables(variables)
\r
244 .processInstanceVariableEquals(correlationVariable, correlationValue)
\r
245 .correlateWithResult();
\r
247 } catch (MismatchingMessageCorrelationException e) {
\r
248 // A correlation exception occurred even after we identified
\r
249 // one waiting process. Throw it back to the client.
\r
251 } catch (OptimisticLockingException ole) {
\r
253 String msg = "Caught " + ole.getClass().getSimpleName() + " after receiving " + messageEventName
\r
254 + " with " + correlationVariable + " = '" + correlationValue
\r
257 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN CORRELATION ERROR -", MsoLogger.getServiceName(),
\r
258 MsoLogger.ErrorCode.UnknownError, msg, ole);
\r
260 //Retry for OptimisticLocking Exceptions
\r
261 int retryCount = 0;
\r
262 String retryStr = properties.get("mso.bpmn.optimisticlockingexception.retrycount");
\r
263 if (retryStr != null) {
\r
265 retryCount = Integer.parseInt(retryStr);
\r
266 } catch (NumberFormatException e) {
\r
271 LOGGER.debug("Retry correlate for OptimisticLockingException, retryCount:" + retryCount);
\r
273 for (; retryCount >0 ; retryCount--) {
\r
276 Thread.sleep(SLOW_POLL_INT_MS);
\r
278 @SuppressWarnings("unused")
\r
279 MessageCorrelationResult result = runtimeService
\r
280 .createMessageCorrelation(messageEventName)
\r
281 .setVariables(variables)
\r
282 .processInstanceVariableEquals(correlationVariable, correlationValue)
\r
283 .correlateWithResult();
\r
285 LOGGER.debug("OptimisticLockingException retry was successful, seting retryCount: " + retryCount);
\r
286 } catch (OptimisticLockingException olex) {
\r
287 //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
\r
288 String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + olex;
\r
289 LOGGER.debug(strMsg);
\r
290 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
291 MsoLogger.ErrorCode.UnknownError, strMsg, olex);
\r
292 } catch (Exception excep) {
\r
294 //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
\r
295 String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + excep;
\r
296 LOGGER.debug(strMsg);
\r
297 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
298 MsoLogger.ErrorCode.UnknownError, strMsg, excep);
\r
303 }catch (Exception e) {
\r
304 // This must be an exception from the flow itself. Log it, but don't
\r
305 // report it back to the client.
\r
306 String msg = "Caught " + e.getClass().getSimpleName() + " running "
\r
307 + execInfoList.get(0) + " after receiving " + messageEventName
\r
308 + " with " + correlationVariable + " = '" + correlationValue
\r
311 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
312 MsoLogger.ErrorCode.UnknownError, msg, e);
\r
314 } catch (Exception e) {
\r
315 // This must be an exception from the flow itself. Log it, but don't
\r
316 // report it back to the client.
\r
317 String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName
\r
318 + " with " + correlationVariable + " = '" + correlationValue
\r
321 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN CORRELATION ERROR -", MsoLogger.getServiceName(),
\r
322 MsoLogger.ErrorCode.UnknownError, msg, e);
\r
329 * Records audit and metric events in the log for a callback success.
\r
330 * @param method the method name
\r
331 * @param startTime the request start time
\r
333 protected void logCallbackSuccess(String method, long startTime) {
\r
334 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
335 MsoLogger.ResponseCode.Suc, "Completed " + method);
\r
337 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
338 MsoLogger.ResponseCode.Suc, "Completed " + method,
\r
339 "BPMN", MsoLogger.getServiceName(), null);
\r
343 * Records error, audit and metric events in the log for a callback
\r
345 * @param method the method name
\r
346 * @param startTime the request start time
\r
347 * @param msg the error message
\r
349 protected void logCallbackError(String method, long startTime, String msg) {
\r
350 logCallbackError(method, startTime, msg, null);
\r
354 * Records error, audit and metric events in the log for a callback
\r
356 * @param method the method name
\r
357 * @param startTime the request start time
\r
358 * @param msg the error message
\r
359 * @param e the exception
\r
361 protected void logCallbackError(String method, long startTime, String msg, Exception e) {
\r
363 LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
364 MsoLogger.ErrorCode.UnknownError, msg);
\r
366 LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
367 MsoLogger.ErrorCode.UnknownError, msg, e);
\r
370 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
371 MsoLogger.ResponseCode.InternalError, "Completed " + method);
\r
373 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
374 MsoLogger.ResponseCode.InternalError, "Completed " + method,
\r
375 "BPMN", MsoLogger.getServiceName(), null);
\r
379 * Abstract callback result object.
\r
381 protected abstract class CallbackResult {
\r
385 * Indicates that callback handling was successful.
\r
387 protected class CallbackSuccess extends CallbackResult {
\r
391 * Indicates that callback handling failed.
\r
393 protected class CallbackError extends CallbackResult {
\r
394 private final String errorMessage;
\r
396 public CallbackError(String errorMessage) {
\r
397 this.errorMessage = errorMessage;
\r
401 * Gets the error message.
\r
403 public String getErrorMessage() {
\r
404 return errorMessage;
\r
408 private static class ExecInfo {
\r
409 private final Execution execution;
\r
411 public ExecInfo(Execution execution) {
\r
412 this.execution = execution;
\r
416 public String toString() {
\r
417 return "Process[" + execution.getProcessInstanceId()
\r
418 + ":" + execution.getId() + "]";
\r