2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Modifications Copyright (c) 2019 Samsung
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
23 package org.onap.so.bpmn.common.workflow.service;
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.List;
30 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
31 import org.camunda.bpm.engine.OptimisticLockingException;
32 import org.camunda.bpm.engine.RuntimeService;
33 import org.camunda.bpm.engine.runtime.Execution;
34 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;
35 import org.onap.so.bpmn.core.UrnPropertiesReader;
36 import org.onap.so.logger.MessageEnum;
37 import org.onap.so.logger.MsoLogger;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.springframework.beans.factory.annotation.Autowired;
41 import org.springframework.scheduling.annotation.Async;
42 import org.springframework.stereotype.Service;
45 * Abstract base class for callback services.
48 public class CallbackHandlerService {
49 public static final long DEFAULT_TIMEOUT_SECONDS = 60;
50 public static final long FAST_POLL_DUR_SECONDS = 5;
51 public static final long FAST_POLL_INT_MS = 100;
52 public static final long SLOW_POLL_INT_MS = 1000;
54 private static final Logger logger = LoggerFactory.getLogger(CallbackHandlerService.class);
56 private RuntimeService runtimeService;
59 public CallbackHandlerService(RuntimeService runtimeService) {
60 this.runtimeService = runtimeService;
64 * Parameterized callback handler.
67 protected CallbackResult handleCallback(String method, Object message,
68 String messageEventName, String messageVariable,
69 String correlationVariable, String correlationValue,
72 return handleCallback(method, message, messageEventName, messageVariable,
73 correlationVariable, correlationValue, logMarker, null);
77 * Parameterized callback handler.
79 protected CallbackResult handleCallback(String method, Object message,
80 String messageEventName, String messageVariable,
81 String correlationVariable, String correlationValue,
82 String logMarker, Map<String, Object> injectedVariables) {
84 long startTime = System.currentTimeMillis();
86 logger.debug(logMarker + " " + method + " received message: "
87 + (message == null ? "" : System.lineSeparator()) + message);
90 Map<String, Object> variables = new HashMap<>();
92 if (injectedVariables != null) {
93 variables.putAll(injectedVariables);
96 variables.put(correlationVariable, correlationValue);
97 variables.put(messageVariable, message == null ? null : message.toString());
99 boolean ok = correlate(messageEventName, correlationVariable,
100 correlationValue, variables, logMarker);
103 String msg = "No process is waiting for " + messageEventName
104 + " with " + correlationVariable + " = '" + correlationValue + "'";
105 logCallbackError(method, startTime, msg);
106 return new CallbackError(msg);
109 logCallbackSuccess(method, startTime);
110 return new CallbackSuccess();
111 } catch (Exception e) {
112 logger.debug("Exception :",e);
113 String msg = "Caught " + e.getClass().getSimpleName()
114 + " processing " + messageEventName + " with " + correlationVariable
115 + " = '" + correlationValue + "'";
116 logCallbackError(method, startTime, msg);
117 return new CallbackError(msg);
122 * Performs message correlation. Waits a limited amount of time for
123 * a process to become ready for correlation. The return value indicates
124 * whether or not a process was found to receive the message. Due to the
125 * synchronous nature of message injection in Camunda, by the time this
126 * method returns, one of 3 things will have happened: (1) the process
127 * received the message and ended, (2) the process received the message
128 * and reached an activity that suspended, or (3) an exception occurred
129 * during correlation or while the process was executing. Correlation
130 * exceptions are handled differently from process execution exceptions.
131 * Correlation exceptions are thrown so the client knows something went
132 * wrong with the delivery of the message. Process execution exceptions
133 * are logged but not thrown.
134 * @param messageEventName the message event name
135 * @param correlationVariable the process variable used as the correlator
136 * @param correlationValue the correlation value
137 * @param variables variables to inject into the process
138 * @param logMarker a marker for debug logging
139 * @return true if a process could be found, false if not
141 protected boolean correlate(String messageEventName, String correlationVariable,
142 String correlationValue, Map<String, Object> variables, String logMarker) {
144 logger.debug(logMarker + " Attempting to find process waiting"
145 + " for " + messageEventName + " with " + correlationVariable
146 + " = '" + correlationValue + "'");
150 long timeout = DEFAULT_TIMEOUT_SECONDS;
152 // The code is here in case we ever need to change the default.
153 String correlationTimemout = UrnPropertiesReader.getVariable("mso.correlation.timeout");
154 if (correlationTimemout != null) {
156 timeout = Long.parseLong(correlationTimemout);
157 } catch (NumberFormatException e) {
162 long now = System.currentTimeMillis();
163 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);
164 long endTime = now + (timeout * 1000);
165 long sleep = FAST_POLL_INT_MS;
167 List<Execution> waitingProcesses = null;
168 Exception queryException = null;
170 int queryFailCount = 0;
175 waitingProcesses = runtimeService.createExecutionQuery()
176 .messageEventSubscriptionName(messageEventName)
177 .processVariableValueEquals(correlationVariable, correlationValue)
179 } catch (Exception e) {
184 if (waitingProcesses != null && waitingProcesses.size() > 0) {
188 if (now > endTime - sleep) {
193 now = System.currentTimeMillis();
195 if (now > fastPollEndTime) {
196 sleep = SLOW_POLL_INT_MS;
200 if (waitingProcesses == null) {
201 waitingProcesses = new ArrayList<Execution>(0);
204 int count = waitingProcesses.size();
206 List<ExecInfo> execInfoList = new ArrayList<>(count);
207 for (Execution execution : waitingProcesses) {
208 execInfoList.add(new ExecInfo(execution));
211 logger.debug(logMarker + " Found " + count + " process(es) waiting"
212 + " for " + messageEventName + " with " + correlationVariable
213 + " = '" + correlationValue + "': " + execInfoList);
216 if (queryFailCount > 0) {
217 String msg = queryFailCount + "/" + queryCount
218 + " execution queries failed attempting to correlate "
219 + messageEventName + " with " + correlationVariable
220 + " = '" + correlationValue + "'; last exception was:"
223 logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN", MsoLogger.getServiceName(),
224 MsoLogger.ErrorCode.UnknownError.getValue(), msg, queryException);
231 // Only one process should be waiting. Throw an exception back to the client.
232 throw new MismatchingMessageCorrelationException(messageEventName,
233 "more than 1 process is waiting with " + correlationVariable
234 + " = '" + correlationValue + "'");
237 // We prototyped an asynchronous solution, i.e. resuming the process
238 // flow in a separate thread, but this affected too many existing tests,
239 // and we went back to the synchronous solution. The synchronous solution
240 // has some troublesome characteristics though. For example, the
241 // resumed flow may send request #2 to a remote system before MSO has
242 // acknowledged the notification associated with request #1.
245 logger.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "
246 + messageEventName + " with " + correlationVariable + " = '"
247 + correlationValue + "'");
249 @SuppressWarnings("unused")
250 MessageCorrelationResult result = runtimeService
251 .createMessageCorrelation(messageEventName)
252 .setVariables(variables)
253 .processInstanceVariableEquals(correlationVariable, correlationValue)
254 .correlateWithResult();
256 } catch (MismatchingMessageCorrelationException e) {
257 // A correlation exception occurred even after we identified
258 // one waiting process. Throw it back to the client.
260 } catch (OptimisticLockingException ole) {
262 String msg = "Caught " + ole.getClass().getSimpleName() + " after receiving " + messageEventName
263 + " with " + correlationVariable + " = '" + correlationValue
266 logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN CORRELATION ERROR -",
267 MsoLogger.getServiceName(), MsoLogger.ErrorCode.UnknownError.getValue(), msg, ole);
269 //Retry for OptimisticLocking Exceptions
271 String retryStr = UrnPropertiesReader.getVariable("mso.bpmn.optimisticlockingexception.retrycount");
272 if (retryStr != null) {
274 retryCount = Integer.parseInt(retryStr);
275 } catch (NumberFormatException e) {
280 logger.debug("Retry correlate for OptimisticLockingException, retryCount:{}", retryCount);
282 for (; retryCount >0 ; retryCount--) {
285 Thread.sleep(SLOW_POLL_INT_MS);
287 @SuppressWarnings("unused")
288 MessageCorrelationResult result = runtimeService
289 .createMessageCorrelation(messageEventName)
290 .setVariables(variables)
291 .processInstanceVariableEquals(correlationVariable, correlationValue)
292 .correlateWithResult();
294 logger.debug("OptimisticLockingException retry was successful, seting retryCount: {}", retryCount);
295 } catch (OptimisticLockingException olex) {
296 //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
297 String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + olex;
298 logger.debug(strMsg);
299 logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN", MsoLogger.getServiceName(),
300 MsoLogger.ErrorCode.UnknownError.getValue(), strMsg, olex);
301 } catch (Exception excep) {
303 //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
304 String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + excep;
305 logger.debug(strMsg);
306 logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN", MsoLogger.getServiceName(),
307 MsoLogger.ErrorCode.UnknownError.getValue(), strMsg, excep);
312 }catch (Exception e) {
313 // This must be an exception from the flow itself. Log it, but don't
314 // report it back to the client.
315 String msg = "Caught " + e.getClass().getSimpleName() + " running "
316 + execInfoList.get(0) + " after receiving " + messageEventName
317 + " with " + correlationVariable + " = '" + correlationValue
320 logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN", MsoLogger.getServiceName(),
321 MsoLogger.ErrorCode.UnknownError.getValue(), msg, e);
323 } catch (Exception e) {
324 // This must be an exception from the flow itself. Log it, but don't
325 // report it back to the client.
326 String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName
327 + " with " + correlationVariable + " = '" + correlationValue
330 logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN CORRELATION ERROR -",
331 MsoLogger.getServiceName(), MsoLogger.ErrorCode.UnknownError.getValue(), msg, e);
338 * Records audit and metric events in the log for a callback success.
339 * @param method the method name
340 * @param startTime the request start time
342 protected void logCallbackSuccess(String method, long startTime) {
346 * Records error, audit and metric events in the log for a callback
348 * @param method the method name
349 * @param startTime the request start time
350 * @param msg the error message
352 protected void logCallbackError(String method, long startTime, String msg) {
353 logCallbackError(method, startTime, msg, null);
357 * Records error, audit and metric events in the log for a callback
359 * @param method the method name
360 * @param startTime the request start time
361 * @param msg the error message
362 * @param e the exception
364 protected void logCallbackError(String method, long startTime, String msg, Exception e) {
366 logger.error("{} {} {} {} {}", MessageEnum.BPMN_CALLBACK_EXCEPTION.toString(), "BPMN", MsoLogger.getServiceName(),
367 MsoLogger.ErrorCode.UnknownError.getValue(), msg);
369 logger.error("{} {} {} {} {}", MessageEnum.BPMN_CALLBACK_EXCEPTION.toString(), "BPMN", MsoLogger.getServiceName(),
370 MsoLogger.ErrorCode.UnknownError.getValue(), msg, e);
375 * Abstract callback result object.
377 protected abstract class CallbackResult {
381 * Indicates that callback handling was successful.
383 protected class CallbackSuccess extends CallbackResult {
387 * Indicates that callback handling failed.
389 protected class CallbackError extends CallbackResult {
390 private final String errorMessage;
392 public CallbackError(String errorMessage) {
393 this.errorMessage = errorMessage;
397 * Gets the error message.
399 public String getErrorMessage() {
404 private static class ExecInfo {
405 private final Execution execution;
407 public ExecInfo(Execution execution) {
408 this.execution = execution;
412 public String toString() {
413 return "Process[" + execution.getProcessInstanceId()
414 + ":" + execution.getId() + "]";