/*-
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Modifications Copyright (c) 2019 Samsung
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.so.bpmn.common.workflow.service;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.onap.so.logger.LoggingAnchor;
import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
import org.camunda.bpm.engine.OptimisticLockingException;
import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.runtime.Execution;
import org.camunda.bpm.engine.runtime.MessageCorrelationResult;
import org.onap.so.bpmn.core.UrnPropertiesReader;
import org.onap.so.logger.ErrorCode;
import org.onap.so.logger.MessageEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
45 * Abstract base class for callback services.
48 public class CallbackHandlerService {
49 public static final long DEFAULT_TIMEOUT_SECONDS = 60;
50 public static final long FAST_POLL_DUR_SECONDS = 5;
51 public static final long FAST_POLL_INT_MS = 100;
52 public static final long SLOW_POLL_INT_MS = 1000;
54 private static final Logger logger = LoggerFactory.getLogger(CallbackHandlerService.class);
56 private RuntimeService runtimeService;
/**
 * Creates the callback handler.
 *
 * @param runtimeService the runtime service used to query waiting executions and to correlate messages
 */
@Autowired
public CallbackHandlerService(RuntimeService runtimeService) {
    this.runtimeService = runtimeService;
}
64 * Parameterized callback handler.
67 protected CallbackResult handleCallback(String method, Object message, String messageEventName,
68 String messageVariable, String correlationVariable, String correlationValue, String logMarker) {
70 return handleCallback(method, message, messageEventName, messageVariable, correlationVariable, correlationValue,
75 * Parameterized callback handler.
77 protected CallbackResult handleCallback(String method, Object message, String messageEventName,
78 String messageVariable, String correlationVariable, String correlationValue, String logMarker,
79 Map<String, Object> injectedVariables) {
81 long startTime = System.currentTimeMillis();
83 logger.debug(logMarker + " " + method + " received message: " + (message == null ? "" : System.lineSeparator())
87 Map<String, Object> variables = new HashMap<>();
89 if (injectedVariables != null) {
90 variables.putAll(injectedVariables);
93 variables.put(correlationVariable, correlationValue);
94 variables.put(messageVariable, message == null ? null : message.toString());
96 boolean ok = correlate(messageEventName, correlationVariable, correlationValue, variables, logMarker);
99 String msg = "No process is waiting for " + messageEventName + " with " + correlationVariable + " = '"
100 + correlationValue + "'";
101 logCallbackError(method, startTime, msg);
102 return new CallbackError(msg);
105 logCallbackSuccess(method, startTime);
106 return new CallbackSuccess();
107 } catch (Exception e) {
108 logger.debug("Exception :", e);
109 String msg = "Caught " + e.getClass().getSimpleName() + " processing " + messageEventName + " with "
110 + correlationVariable + " = '" + correlationValue + "'";
111 logCallbackError(method, startTime, msg);
112 return new CallbackError(msg);
117 * Performs message correlation. Waits a limited amount of time for a process to become ready for correlation. The
118 * return value indicates whether or not a process was found to receive the message. Due to the synchronous nature
119 * of message injection in Camunda, by the time this method returns, one of 3 things will have happened: (1) the
120 * process received the message and ended, (2) the process received the message and reached an activity that
121 * suspended, or (3) an exception occurred during correlation or while the process was executing. Correlation
122 * exceptions are handled differently from process execution exceptions. Correlation exceptions are thrown so the
123 * client knows something went wrong with the delivery of the message. Process execution exceptions are logged but
126 * @param messageEventName the message event name
127 * @param correlationVariable the process variable used as the correlator
128 * @param correlationValue the correlation value
129 * @param variables variables to inject into the process
130 * @param logMarker a marker for debug logging
131 * @return true if a process could be found, false if not
133 protected boolean correlate(String messageEventName, String correlationVariable, String correlationValue,
134 Map<String, Object> variables, String logMarker) {
136 logger.debug(logMarker + " Attempting to find process waiting" + " for " + messageEventName + " with "
137 + correlationVariable + " = '" + correlationValue + "'");
141 long timeout = DEFAULT_TIMEOUT_SECONDS;
143 // The code is here in case we ever need to change the default.
144 String correlationTimemout = UrnPropertiesReader.getVariable("mso.correlation.timeout");
145 if (correlationTimemout != null) {
147 timeout = Long.parseLong(correlationTimemout);
148 } catch (NumberFormatException e) {
153 long now = System.currentTimeMillis();
154 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);
155 long endTime = now + (timeout * 1000);
156 long sleep = FAST_POLL_INT_MS;
158 List<Execution> waitingProcesses = null;
159 Exception queryException = null;
161 int queryFailCount = 0;
167 runtimeService.createExecutionQuery().messageEventSubscriptionName(messageEventName)
168 .processVariableValueEquals(correlationVariable, correlationValue).list();
169 } catch (Exception e) {
174 if (waitingProcesses != null && waitingProcesses.size() > 0) {
178 if (now > endTime - sleep) {
183 now = System.currentTimeMillis();
185 if (now > fastPollEndTime) {
186 sleep = SLOW_POLL_INT_MS;
190 if (waitingProcesses == null) {
191 waitingProcesses = new ArrayList<Execution>(0);
194 int count = waitingProcesses.size();
196 List<ExecInfo> execInfoList = new ArrayList<>(count);
197 for (Execution execution : waitingProcesses) {
198 execInfoList.add(new ExecInfo(execution));
201 logger.debug(logMarker + " Found " + count + " process(es) waiting" + " for " + messageEventName + " with "
202 + correlationVariable + " = '" + correlationValue + "': " + execInfoList);
205 if (queryFailCount > 0) {
207 queryFailCount + "/" + queryCount + " execution queries failed attempting to correlate "
208 + messageEventName + " with " + correlationVariable + " = '" + correlationValue
209 + "'; last exception was:" + queryException;
211 logger.error(LoggingAnchor.FOUR, MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
212 ErrorCode.UnknownError.getValue(), msg, queryException);
219 // Only one process should be waiting. Throw an exception back to the client.
220 throw new MismatchingMessageCorrelationException(messageEventName,
221 "more than 1 process is waiting with " + correlationVariable + " = '" + correlationValue + "'");
224 // We prototyped an asynchronous solution, i.e. resuming the process
225 // flow in a separate thread, but this affected too many existing tests,
226 // and we went back to the synchronous solution. The synchronous solution
227 // has some troublesome characteristics though. For example, the
228 // resumed flow may send request #2 to a remote system before MSO has
229 // acknowledged the notification associated with request #1.
232 logger.debug(logMarker + " Running " + execInfoList.get(0) + " to receive " + messageEventName
233 + " with " + correlationVariable + " = '" + correlationValue + "'");
235 @SuppressWarnings("unused")
236 MessageCorrelationResult result = runtimeService.createMessageCorrelation(messageEventName)
237 .setVariables(variables).processInstanceVariableEquals(correlationVariable, correlationValue)
238 .correlateWithResult();
240 } catch (MismatchingMessageCorrelationException e) {
241 // A correlation exception occurred even after we identified
242 // one waiting process. Throw it back to the client.
244 } catch (OptimisticLockingException ole) {
246 String msg = "Caught " + ole.getClass().getSimpleName() + " after receiving " + messageEventName
247 + " with " + correlationVariable + " = '" + correlationValue + "': " + ole;
249 logger.error(LoggingAnchor.FOUR, MessageEnum.BPMN_GENERAL_EXCEPTION.toString(),
250 "BPMN CORRELATION ERROR -", ErrorCode.UnknownError.getValue(), msg, ole);
252 // Retry for OptimisticLocking Exceptions
254 String retryStr = UrnPropertiesReader.getVariable("mso.bpmn.optimisticlockingexception.retrycount");
255 if (retryStr != null) {
257 retryCount = Integer.parseInt(retryStr);
258 } catch (NumberFormatException e) {
263 logger.debug("Retry correlate for OptimisticLockingException, retryCount:{}", retryCount);
265 for (; retryCount > 0; retryCount--) {
268 Thread.sleep(SLOW_POLL_INT_MS);
270 @SuppressWarnings("unused")
271 MessageCorrelationResult result =
272 runtimeService.createMessageCorrelation(messageEventName).setVariables(variables)
273 .processInstanceVariableEquals(correlationVariable, correlationValue)
274 .correlateWithResult();
276 logger.debug("OptimisticLockingException retry was successful, seting retryCount: {}",
278 } catch (OptimisticLockingException olex) {
279 // oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
280 String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:"
281 + retryCount + " | exception returned: " + olex;
282 logger.debug(strMsg);
283 logger.error(LoggingAnchor.FOUR, MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
284 ErrorCode.UnknownError.getValue(), strMsg, olex);
285 } catch (Exception excep) {
287 // oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
288 String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:"
289 + retryCount + " | exception returned: " + excep;
290 logger.debug(strMsg);
291 logger.error(LoggingAnchor.FOUR, MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
292 ErrorCode.UnknownError.getValue(), strMsg, excep);
297 } catch (Exception e) {
298 // This must be an exception from the flow itself. Log it, but don't
299 // report it back to the client.
300 String msg = "Caught " + e.getClass().getSimpleName() + " running " + execInfoList.get(0)
301 + " after receiving " + messageEventName + " with " + correlationVariable + " = '"
302 + correlationValue + "': " + e;
304 logger.error(LoggingAnchor.FOUR, MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
305 ErrorCode.UnknownError.getValue(), msg, e);
307 } catch (Exception e) {
308 // This must be an exception from the flow itself. Log it, but don't
309 // report it back to the client.
310 String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName + " with "
311 + correlationVariable + " = '" + correlationValue + "': " + e;
313 logger.error(LoggingAnchor.FOUR, MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN CORRELATION ERROR -",
314 ErrorCode.UnknownError.getValue(), msg, e);
/**
 * Hook invoked when a callback has been handled successfully. This implementation is intentionally empty.
 * NOTE(review): presumably subclasses override it to record audit and metric events - confirm.
 *
 * @param method the method name
 * @param startTime the request start time
 */
protected void logCallbackSuccess(String method, long startTime) {}
329 * Records error, audit and metric events in the log for a callback internal error.
331 * @param method the method name
332 * @param startTime the request start time
333 * @param msg the error message
335 protected void logCallbackError(String method, long startTime, String msg) {
336 logCallbackError(method, startTime, msg, null);
340 * Records error, audit and metric events in the log for a callback internal error.
342 * @param method the method name
343 * @param startTime the request start time
344 * @param msg the error message
345 * @param e the exception
347 protected void logCallbackError(String method, long startTime, String msg, Exception e) {
349 logger.error(LoggingAnchor.FOUR, MessageEnum.BPMN_CALLBACK_EXCEPTION.toString(), "BPMN",
350 ErrorCode.UnknownError.getValue(), msg);
352 logger.error(LoggingAnchor.FOUR, MessageEnum.BPMN_CALLBACK_EXCEPTION.toString(), "BPMN",
353 ErrorCode.UnknownError.getValue(), msg, e);
/**
 * Abstract callback result object. Common supertype of the success and error results returned by the callback
 * handler.
 */
protected abstract class CallbackResult {
}
/**
 * Indicates that callback handling was successful. Carries no additional data.
 */
protected class CallbackSuccess extends CallbackResult {
}
/**
 * Indicates that callback handling failed. Carries a description of the failure.
 */
protected class CallbackError extends CallbackResult {
    private final String errorMessage;

    /**
     * Creates an error result.
     *
     * @param errorMessage the description of the failure
     */
    public CallbackError(String errorMessage) {
        this.errorMessage = errorMessage;
    }

    /**
     * Gets the error message.
     *
     * @return the description of the failure given at construction
     */
    public String getErrorMessage() {
        return errorMessage;
    }
}
387 private static class ExecInfo {
388 private final Execution execution;
390 public ExecInfo(Execution execution) {
391 this.execution = execution;
395 public String toString() {
396 return "Process[" + execution.getProcessInstanceId() + ":" + execution.getId() + "]";