2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Modifications Copyright (c) 2019 Samsung
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
23 package org.onap.so.bpmn.common.workflow.service;
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.List;
29 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
30 import org.camunda.bpm.engine.OptimisticLockingException;
31 import org.camunda.bpm.engine.RuntimeService;
32 import org.camunda.bpm.engine.runtime.Execution;
33 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;
34 import org.onap.so.bpmn.core.UrnPropertiesReader;
35 import org.onap.so.logger.ErrorCode;
36 import org.onap.so.logger.MessageEnum;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import org.springframework.beans.factory.annotation.Autowired;
40 import org.springframework.scheduling.annotation.Async;
41 import org.springframework.stereotype.Service;
44 * Base class for callback services.
47 public class CallbackHandlerService {
48 public static final long DEFAULT_TIMEOUT_SECONDS = 60;
49 public static final long FAST_POLL_DUR_SECONDS = 5;
50 public static final long FAST_POLL_INT_MS = 100;
51 public static final long SLOW_POLL_INT_MS = 1000;
53 private static final Logger logger = LoggerFactory.getLogger(CallbackHandlerService.class);
55 private RuntimeService runtimeService;
/**
 * Creates the handler around the Camunda runtime service it works with.
 *
 * @param runtimeService service used to look up waiting executions and to correlate messages
 */
public CallbackHandlerService(RuntimeService runtimeService) {
    this.runtimeService = runtimeService;
}
63 * Parameterized callback handler.
66 protected CallbackResult handleCallback(String method, Object message, String messageEventName,
67 String messageVariable, String correlationVariable, String correlationValue, String logMarker) {
69 return handleCallback(method, message, messageEventName, messageVariable, correlationVariable, correlationValue,
74 * Parameterized callback handler.
76 protected CallbackResult handleCallback(String method, Object message, String messageEventName,
77 String messageVariable, String correlationVariable, String correlationValue, String logMarker,
78 Map<String, Object> injectedVariables) {
80 long startTime = System.currentTimeMillis();
82 logger.debug(logMarker + " " + method + " received message: " + (message == null ? "" : System.lineSeparator())
86 Map<String, Object> variables = new HashMap<>();
88 if (injectedVariables != null) {
89 variables.putAll(injectedVariables);
92 variables.put(correlationVariable, correlationValue);
93 variables.put(messageVariable, message == null ? null : message.toString());
95 boolean ok = correlate(messageEventName, correlationVariable, correlationValue, variables, logMarker);
98 String msg = "No process is waiting for " + messageEventName + " with " + correlationVariable + " = '"
99 + correlationValue + "'";
100 logCallbackError(method, startTime, msg);
101 return new CallbackError(msg);
104 logCallbackSuccess(method, startTime);
105 return new CallbackSuccess();
106 } catch (Exception e) {
107 logger.debug("Exception :", e);
108 String msg = "Caught " + e.getClass().getSimpleName() + " processing " + messageEventName + " with "
109 + correlationVariable + " = '" + correlationValue + "'";
110 logCallbackError(method, startTime, msg);
111 return new CallbackError(msg);
116 * Performs message correlation. Waits a limited amount of time for a process to become ready for correlation. The
117 * return value indicates whether or not a process was found to receive the message. Due to the synchronous nature
118 * of message injection in Camunda, by the time this method returns, one of 3 things will have happened: (1) the
119 * process received the message and ended, (2) the process received the message and reached an activity that
120 * suspended, or (3) an exception occurred during correlation or while the process was executing. Correlation
121 * exceptions are handled differently from process execution exceptions. Correlation exceptions are thrown so the
122 * client knows something went wrong with the delivery of the message. Process execution exceptions are logged but not reported back to the client.
125 * @param messageEventName the message event name
126 * @param correlationVariable the process variable used as the correlator
127 * @param correlationValue the correlation value
128 * @param variables variables to inject into the process
129 * @param logMarker a marker for debug logging
130 * @return true if a process could be found, false if not
132 protected boolean correlate(String messageEventName, String correlationVariable, String correlationValue,
133 Map<String, Object> variables, String logMarker) {
// Overview: polls the Camunda RuntimeService for an execution that is subscribed to messageEventName and
// whose process variable correlationVariable equals correlationValue, then delivers the message together
// with the supplied variables.
// NOTE(review): several short statements of this method (block openers, loop control, returns) are not
// visible in this view; the notes below describe only the statements that are shown.
135 logger.debug(logMarker + " Attempting to find process waiting" + " for " + messageEventName + " with "
136 + correlationVariable + " = '" + correlationValue + "'");
// Wait limit in seconds; starts from the class default.
140 long timeout = DEFAULT_TIMEOUT_SECONDS;
142 // The code is here in case we ever need to change the default.
// Optional override read from the "mso.correlation.timeout" property and parsed as a number of seconds
// (it is multiplied by 1000 below).
143 String correlationTimemout = UrnPropertiesReader.getVariable("mso.correlation.timeout");
144 if (correlationTimemout != null) {
146 timeout = Long.parseLong(correlationTimemout);
147 } catch (NumberFormatException e) {
// NOTE(review): the handler body is not visible here; presumably an unparsable value leaves the default
// in place - confirm.
// Deadlines in epoch milliseconds: the fast polling phase ends at fastPollEndTime, the whole wait at endTime.
// The sleep interval starts at the fast polling value.
152 long now = System.currentTimeMillis();
153 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);
154 long endTime = now + (timeout * 1000);
155 long sleep = FAST_POLL_INT_MS;
// Holders for the query result, the most recent query failure and the number of failed queries.
157 List<Execution> waitingProcesses = null;
158 Exception queryException = null;
160 int queryFailCount = 0;
// Query for executions that have a message event subscription named messageEventName and a process variable
// correlationVariable equal to correlationValue.
// NOTE(review): the assignment target of this query is not visible here; presumably waitingProcesses - confirm.
166 runtimeService.createExecutionQuery().messageEventSubscriptionName(messageEventName)
167 .processVariableValueEquals(correlationVariable, correlationValue).list();
168 } catch (Exception e) {
// NOTE(review): handler body not visible; the later use of queryFailCount and queryException suggests the
// failure is counted and remembered here - confirm.
// True when the result holds at least one waiting execution.
173 if (waitingProcesses != null && waitingProcesses.size() > 0) {
// True once one more sleep of the current interval would run past the overall deadline.
177 if (now > endTime - sleep) {
// Refresh the clock; after the fast polling window has passed, switch to the slow polling interval.
182 now = System.currentTimeMillis();
184 if (now > fastPollEndTime) {
185 sleep = SLOW_POLL_INT_MS;
// Replace a null result with an empty list so that size() below is safe.
189 if (waitingProcesses == null) {
190 waitingProcesses = new ArrayList<Execution>(0);
193 int count = waitingProcesses.size();
// Wrap each execution in ExecInfo, which is only used to render the executions in log messages.
195 List<ExecInfo> execInfoList = new ArrayList<>(count);
196 for (Execution execution : waitingProcesses) {
197 execInfoList.add(new ExecInfo(execution));
200 logger.debug(logMarker + " Found " + count + " process(es) waiting" + " for " + messageEventName + " with "
201 + correlationVariable + " = '" + correlationValue + "': " + execInfoList);
// Some queries failed: build a summary (failed/total and the last exception) and log it at error level.
// NOTE(review): the declarations of msg and queryCount are not visible in this view.
204 if (queryFailCount > 0) {
206 queryFailCount + "/" + queryCount + " execution queries failed attempting to correlate "
207 + messageEventName + " with " + correlationVariable + " = '" + correlationValue
208 + "'; last exception was:" + queryException;
210 logger.error("{} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
211 ErrorCode.UnknownError.getValue(), msg, queryException);
218 // Only one process should be waiting. Throw an exception back to the client.
219 throw new MismatchingMessageCorrelationException(messageEventName,
220 "more than 1 process is waiting with " + correlationVariable + " = '" + correlationValue + "'");
223 // We prototyped an asynchronous solution, i.e. resuming the process
224 // flow in a separate thread, but this affected too many existing tests,
225 // and we went back to the synchronous solution. The synchronous solution
226 // has some troublesome characteristics though. For example, the
227 // resumed flow may send request #2 to a remote system before MSO has
228 // acknowledged the notification associated with request #1.
231 logger.debug(logMarker + " Running " + execInfoList.get(0) + " to receive " + messageEventName
232 + " with " + correlationVariable + " = '" + correlationValue + "'");
// Deliver the message: correlate on the process instance variable and set the supplied variables.
// The returned result object is not used.
234 @SuppressWarnings("unused")
235 MessageCorrelationResult result = runtimeService.createMessageCorrelation(messageEventName)
236 .setVariables(variables).processInstanceVariableEquals(correlationVariable, correlationValue)
237 .correlateWithResult();
239 } catch (MismatchingMessageCorrelationException e) {
240 // A correlation exception occurred even after we identified
241 // one waiting process. Throw it back to the client.
243 } catch (OptimisticLockingException ole) {
// Optimistic locking failure during correlation: log it, then retry a configurable number of times.
245 String msg = "Caught " + ole.getClass().getSimpleName() + " after receiving " + messageEventName
246 + " with " + correlationVariable + " = '" + correlationValue + "': " + ole;
248 logger.error("{} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN CORRELATION ERROR -",
249 ErrorCode.UnknownError.getValue(), msg, ole);
251 // Retry for OptimisticLocking Exceptions
// The number of retries is read from the "mso.bpmn.optimisticlockingexception.retrycount" property.
// NOTE(review): the declaration (and therefore the default value) of retryCount is not visible here.
253 String retryStr = UrnPropertiesReader.getVariable("mso.bpmn.optimisticlockingexception.retrycount");
254 if (retryStr != null) {
256 retryCount = Integer.parseInt(retryStr);
257 } catch (NumberFormatException e) {
262 logger.debug("Retry correlate for OptimisticLockingException, retryCount:{}", retryCount);
// Each attempt pauses for SLOW_POLL_INT_MS and then repeats the same correlation call as above.
264 for (; retryCount > 0; retryCount--) {
// NOTE(review): an InterruptedException from this sleep would be handled by the generic catch further down,
// and no visible statement restores the interrupt flag - consider re-interrupting the thread.
267 Thread.sleep(SLOW_POLL_INT_MS);
269 @SuppressWarnings("unused")
270 MessageCorrelationResult result =
271 runtimeService.createMessageCorrelation(messageEventName).setVariables(variables)
272 .processInstanceVariableEquals(correlationVariable, correlationValue)
273 .correlateWithResult();
// NOTE(review): "seting" in the log text below is a typo for "setting".
275 logger.debug("OptimisticLockingException retry was successful, seting retryCount: {}",
277 } catch (OptimisticLockingException olex) {
// Another optimistic locking failure: it is only logged here, so the decrement in the loop header moves on
// to the next attempt.
278 // oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
279 String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:"
280 + retryCount + " | exception returned: " + olex;
281 logger.debug(strMsg);
282 logger.error("{} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
283 ErrorCode.UnknownError.getValue(), strMsg, olex);
284 } catch (Exception excep) {
// Any other failure during a retry is logged at debug and error level.
286 // oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
287 String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:"
288 + retryCount + " | exception returned: " + excep;
289 logger.debug(strMsg);
290 logger.error("{} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
291 ErrorCode.UnknownError.getValue(), strMsg, excep);
296 } catch (Exception e) {
297 // This must be an exception from the flow itself. Log it, but don't
298 // report it back to the client.
299 String msg = "Caught " + e.getClass().getSimpleName() + " running " + execInfoList.get(0)
300 + " after receiving " + messageEventName + " with " + correlationVariable + " = '"
301 + correlationValue + "': " + e;
303 logger.error("{} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
304 ErrorCode.UnknownError.getValue(), msg, e);
306 } catch (Exception e) {
307 // This must be an exception from the flow itself. Log it, but don't
308 // report it back to the client.
// NOTE(review): this handler closes an enclosing try whose opening is not visible in this view. If that try
// also encloses the MismatchingMessageCorrelationException throw statements above, those exceptions are
// caught and logged here instead of reaching the caller, which would contradict the comments that say they
// are thrown back to the client - confirm.
309 String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName + " with "
310 + correlationVariable + " = '" + correlationValue + "': " + e;
312 logger.error("{} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN CORRELATION ERROR -",
313 ErrorCode.UnknownError.getValue(), msg, e);
320 * Records audit and metric events in the log for a callback success.
322 * @param method the method name
323 * @param startTime the request start time
325 protected void logCallbackSuccess(String method, long startTime) {}
328 * Records error, audit and metric events in the log for a callback internal error.
330 * @param method the method name
331 * @param startTime the request start time
332 * @param msg the error message
334 protected void logCallbackError(String method, long startTime, String msg) {
335 logCallbackError(method, startTime, msg, null);
339 * Records error, audit and metric events in the log for a callback internal error.
341 * @param method the method name
342 * @param startTime the request start time
343 * @param msg the error message
344 * @param e the exception
346 protected void logCallbackError(String method, long startTime, String msg, Exception e) {
348 logger.error("{} {} {} {}", MessageEnum.BPMN_CALLBACK_EXCEPTION.toString(), "BPMN",
349 ErrorCode.UnknownError.getValue(), msg);
351 logger.error("{} {} {} {}", MessageEnum.BPMN_CALLBACK_EXCEPTION.toString(), "BPMN",
352 ErrorCode.UnknownError.getValue(), msg, e);
357 * Abstract callback result object.
359 protected abstract class CallbackResult {
363 * Indicates that callback handling was successful.
365 protected class CallbackSuccess extends CallbackResult {
369 * Indicates that callback handling failed.
371 protected class CallbackError extends CallbackResult {
372 private final String errorMessage;
374 public CallbackError(String errorMessage) {
375 this.errorMessage = errorMessage;
379 * Gets the error message.
381 public String getErrorMessage() {
386 private static class ExecInfo {
387 private final Execution execution;
389 public ExecInfo(Execution execution) {
390 this.execution = execution;
394 public String toString() {
395 return "Process[" + execution.getProcessInstanceId() + ":" + execution.getId() + "]";