2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Modifications Copyright (c) 2019 Samsung
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
23 package org.onap.so.bpmn.common.workflow.service;
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.List;
30 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
31 import org.camunda.bpm.engine.OptimisticLockingException;
32 import org.camunda.bpm.engine.RuntimeService;
33 import org.camunda.bpm.engine.runtime.Execution;
34 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;
35 import org.onap.so.bpmn.core.UrnPropertiesReader;
36 import org.onap.so.logger.MessageEnum;
37 import org.onap.so.logger.MsoLogger;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.springframework.beans.factory.annotation.Autowired;
41 import org.springframework.scheduling.annotation.Async;
42 import org.springframework.stereotype.Service;
45 * Abstract base class for callback services.
48 public class CallbackHandlerService {
49 public static final long DEFAULT_TIMEOUT_SECONDS = 60;
50 public static final long FAST_POLL_DUR_SECONDS = 5;
51 public static final long FAST_POLL_INT_MS = 100;
52 public static final long SLOW_POLL_INT_MS = 1000;
54 private static final Logger logger = LoggerFactory.getLogger(CallbackHandlerService.class);
57 RuntimeService runtimeService;
60 * Parameterized callback handler.
63 protected CallbackResult handleCallback(String method, Object message,
64 String messageEventName, String messageVariable,
65 String correlationVariable, String correlationValue,
68 return handleCallback(method, message, messageEventName, messageVariable,
69 correlationVariable, correlationValue, logMarker, null);
73 * Parameterized callback handler.
75 protected CallbackResult handleCallback(String method, Object message,
76 String messageEventName, String messageVariable,
77 String correlationVariable, String correlationValue,
78 String logMarker, Map<String, Object> injectedVariables) {
80 long startTime = System.currentTimeMillis();
82 logger.debug(logMarker + " " + method + " received message: "
83 + (message == null ? "" : System.lineSeparator()) + message);
86 Map<String, Object> variables = new HashMap<>();
88 if (injectedVariables != null) {
89 variables.putAll(injectedVariables);
92 variables.put(correlationVariable, correlationValue);
93 variables.put(messageVariable, message == null ? null : message.toString());
95 boolean ok = correlate(messageEventName, correlationVariable,
96 correlationValue, variables, logMarker);
99 String msg = "No process is waiting for " + messageEventName
100 + " with " + correlationVariable + " = '" + correlationValue + "'";
101 logCallbackError(method, startTime, msg);
102 return new CallbackError(msg);
105 logCallbackSuccess(method, startTime);
106 return new CallbackSuccess();
107 } catch (Exception e) {
108 logger.debug("Exception :",e);
109 String msg = "Caught " + e.getClass().getSimpleName()
110 + " processing " + messageEventName + " with " + correlationVariable
111 + " = '" + correlationValue + "'";
112 logCallbackError(method, startTime, msg);
113 return new CallbackError(msg);
118 * Performs message correlation. Waits a limited amount of time for
119 * a process to become ready for correlation. The return value indicates
120 * whether or not a process was found to receive the message. Due to the
121 * synchronous nature of message injection in Camunda, by the time this
122 * method returns, one of 3 things will have happened: (1) the process
123 * received the message and ended, (2) the process received the message
124 * and reached an activity that suspended, or (3) an exception occurred
125 * during correlation or while the process was executing. Correlation
126 * exceptions are handled differently from process execution exceptions.
127 * Correlation exceptions are thrown so the client knows something went
128 * wrong with the delivery of the message. Process execution exceptions
129 * are logged but not thrown.
130 * @param messageEventName the message event name
131 * @param correlationVariable the process variable used as the correlator
132 * @param correlationValue the correlation value
133 * @param variables variables to inject into the process
134 * @param logMarker a marker for debug logging
135 * @return true if a process could be found, false if not
137 protected boolean correlate(String messageEventName, String correlationVariable,
138 String correlationValue, Map<String, Object> variables, String logMarker) {
140 logger.debug(logMarker + " Attempting to find process waiting"
141 + " for " + messageEventName + " with " + correlationVariable
142 + " = '" + correlationValue + "'");
146 long timeout = DEFAULT_TIMEOUT_SECONDS;
148 // The code is here in case we ever need to change the default.
149 String correlationTimemout = UrnPropertiesReader.getVariable("mso.correlation.timeout");
150 if (correlationTimemout != null) {
152 timeout = Long.parseLong(correlationTimemout);
153 } catch (NumberFormatException e) {
158 long now = System.currentTimeMillis();
159 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);
160 long endTime = now + (timeout * 1000);
161 long sleep = FAST_POLL_INT_MS;
163 List<Execution> waitingProcesses = null;
164 Exception queryException = null;
166 int queryFailCount = 0;
171 waitingProcesses = runtimeService.createExecutionQuery()
172 .messageEventSubscriptionName(messageEventName)
173 .processVariableValueEquals(correlationVariable, correlationValue)
175 } catch (Exception e) {
180 if (waitingProcesses != null && waitingProcesses.size() > 0) {
184 if (now > endTime - sleep) {
189 now = System.currentTimeMillis();
191 if (now > fastPollEndTime) {
192 sleep = SLOW_POLL_INT_MS;
196 if (waitingProcesses == null) {
197 waitingProcesses = new ArrayList<Execution>(0);
200 int count = waitingProcesses.size();
202 List<ExecInfo> execInfoList = new ArrayList<>(count);
203 for (Execution execution : waitingProcesses) {
204 execInfoList.add(new ExecInfo(execution));
207 logger.debug(logMarker + " Found " + count + " process(es) waiting"
208 + " for " + messageEventName + " with " + correlationVariable
209 + " = '" + correlationValue + "': " + execInfoList);
212 if (queryFailCount > 0) {
213 String msg = queryFailCount + "/" + queryCount
214 + " execution queries failed attempting to correlate "
215 + messageEventName + " with " + correlationVariable
216 + " = '" + correlationValue + "'; last exception was:"
219 logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN", MsoLogger.getServiceName(),
220 MsoLogger.ErrorCode.UnknownError.getValue(), msg, queryException);
227 // Only one process should be waiting. Throw an exception back to the client.
228 throw new MismatchingMessageCorrelationException(messageEventName,
229 "more than 1 process is waiting with " + correlationVariable
230 + " = '" + correlationValue + "'");
233 // We prototyped an asynchronous solution, i.e. resuming the process
234 // flow in a separate thread, but this affected too many existing tests,
235 // and we went back to the synchronous solution. The synchronous solution
236 // has some troublesome characteristics though. For example, the
237 // resumed flow may send request #2 to a remote system before MSO has
238 // acknowledged the notification associated with request #1.
241 logger.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "
242 + messageEventName + " with " + correlationVariable + " = '"
243 + correlationValue + "'");
245 @SuppressWarnings("unused")
246 MessageCorrelationResult result = runtimeService
247 .createMessageCorrelation(messageEventName)
248 .setVariables(variables)
249 .processInstanceVariableEquals(correlationVariable, correlationValue)
250 .correlateWithResult();
252 } catch (MismatchingMessageCorrelationException e) {
253 // A correlation exception occurred even after we identified
254 // one waiting process. Throw it back to the client.
256 } catch (OptimisticLockingException ole) {
258 String msg = "Caught " + ole.getClass().getSimpleName() + " after receiving " + messageEventName
259 + " with " + correlationVariable + " = '" + correlationValue
262 logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN CORRELATION ERROR -",
263 MsoLogger.getServiceName(), MsoLogger.ErrorCode.UnknownError.getValue(), msg, ole);
265 //Retry for OptimisticLocking Exceptions
267 String retryStr = UrnPropertiesReader.getVariable("mso.bpmn.optimisticlockingexception.retrycount");
268 if (retryStr != null) {
270 retryCount = Integer.parseInt(retryStr);
271 } catch (NumberFormatException e) {
276 logger.debug("Retry correlate for OptimisticLockingException, retryCount:{}", retryCount);
278 for (; retryCount >0 ; retryCount--) {
281 Thread.sleep(SLOW_POLL_INT_MS);
283 @SuppressWarnings("unused")
284 MessageCorrelationResult result = runtimeService
285 .createMessageCorrelation(messageEventName)
286 .setVariables(variables)
287 .processInstanceVariableEquals(correlationVariable, correlationValue)
288 .correlateWithResult();
290 logger.debug("OptimisticLockingException retry was successful, seting retryCount: {}", retryCount);
291 } catch (OptimisticLockingException olex) {
292 //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
293 String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + olex;
294 logger.debug(strMsg);
295 logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN", MsoLogger.getServiceName(),
296 MsoLogger.ErrorCode.UnknownError.getValue(), strMsg, olex);
297 } catch (Exception excep) {
299 //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
300 String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + excep;
301 logger.debug(strMsg);
302 logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN", MsoLogger.getServiceName(),
303 MsoLogger.ErrorCode.UnknownError.getValue(), strMsg, excep);
308 }catch (Exception e) {
309 // This must be an exception from the flow itself. Log it, but don't
310 // report it back to the client.
311 String msg = "Caught " + e.getClass().getSimpleName() + " running "
312 + execInfoList.get(0) + " after receiving " + messageEventName
313 + " with " + correlationVariable + " = '" + correlationValue
316 logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN", MsoLogger.getServiceName(),
317 MsoLogger.ErrorCode.UnknownError.getValue(), msg, e);
319 } catch (Exception e) {
320 // This must be an exception from the flow itself. Log it, but don't
321 // report it back to the client.
322 String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName
323 + " with " + correlationVariable + " = '" + correlationValue
326 logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN CORRELATION ERROR -",
327 MsoLogger.getServiceName(), MsoLogger.ErrorCode.UnknownError.getValue(), msg, e);
334 * Records audit and metric events in the log for a callback success.
335 * @param method the method name
336 * @param startTime the request start time
338 protected void logCallbackSuccess(String method, long startTime) {
342 * Records error, audit and metric events in the log for a callback
344 * @param method the method name
345 * @param startTime the request start time
346 * @param msg the error message
348 protected void logCallbackError(String method, long startTime, String msg) {
349 logCallbackError(method, startTime, msg, null);
353 * Records error, audit and metric events in the log for a callback
355 * @param method the method name
356 * @param startTime the request start time
357 * @param msg the error message
358 * @param e the exception
360 protected void logCallbackError(String method, long startTime, String msg, Exception e) {
362 logger.error("{} {} {} {} {}", MessageEnum.BPMN_CALLBACK_EXCEPTION.toString(), "BPMN", MsoLogger.getServiceName(),
363 MsoLogger.ErrorCode.UnknownError.getValue(), msg);
365 logger.error("{} {} {} {} {}", MessageEnum.BPMN_CALLBACK_EXCEPTION.toString(), "BPMN", MsoLogger.getServiceName(),
366 MsoLogger.ErrorCode.UnknownError.getValue(), msg, e);
371 * Abstract callback result object.
373 protected abstract class CallbackResult {
377 * Indicates that callback handling was successful.
379 protected class CallbackSuccess extends CallbackResult {
383 * Indicates that callback handling failed.
385 protected class CallbackError extends CallbackResult {
386 private final String errorMessage;
388 public CallbackError(String errorMessage) {
389 this.errorMessage = errorMessage;
393 * Gets the error message.
395 public String getErrorMessage() {
400 private static class ExecInfo {
401 private final Execution execution;
403 public ExecInfo(Execution execution) {
404 this.execution = execution;
408 public String toString() {
409 return "Process[" + execution.getProcessInstanceId()
410 + ":" + execution.getId() + "]";