2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.so.bpmn.common.workflow.service;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
28 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
29 import org.camunda.bpm.engine.OptimisticLockingException;
30 import org.camunda.bpm.engine.RuntimeService;
31 import org.camunda.bpm.engine.runtime.Execution;
32 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;
33 import org.onap.so.bpmn.core.UrnPropertiesReader;
34 import org.onap.so.logger.MessageEnum;
35 import org.onap.so.logger.MsoLogger;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.scheduling.annotation.Async;
38 import org.springframework.stereotype.Component;
39 import org.springframework.stereotype.Service;
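/**
 * Base class for callback services. The class is concrete (it is not
 * declared {@code abstract}); its callback-handling methods are protected.
 */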
45 public class CallbackHandlerService {
46 public static final long DEFAULT_TIMEOUT_SECONDS = 60;
47 public static final long FAST_POLL_DUR_SECONDS = 5;
48 public static final long FAST_POLL_INT_MS = 100;
49 public static final long SLOW_POLL_INT_MS = 1000;
51 private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL, CallbackHandlerService.class);
54 RuntimeService runtimeService;
57 * Parameterized callback handler.
60 protected CallbackResult handleCallback(String method, Object message,
61 String messageEventName, String messageVariable,
62 String correlationVariable, String correlationValue,
65 return handleCallback(method, message, messageEventName, messageVariable,
66 correlationVariable, correlationValue, logMarker, null);
70 * Parameterized callback handler.
72 protected CallbackResult handleCallback(String method, Object message,
73 String messageEventName, String messageVariable,
74 String correlationVariable, String correlationValue,
75 String logMarker, Map<String, Object> injectedVariables) {
77 long startTime = System.currentTimeMillis();
79 LOGGER.debug(logMarker + " " + method + " received message: "
80 + (message == null ? "" : System.lineSeparator()) + message);
83 Map<String, Object> variables = new HashMap<>();
85 if (injectedVariables != null) {
86 variables.putAll(injectedVariables);
89 variables.put(correlationVariable, correlationValue);
90 variables.put(messageVariable, message == null ? null : message.toString());
92 boolean ok = correlate(messageEventName, correlationVariable,
93 correlationValue, variables, logMarker);
96 String msg = "No process is waiting for " + messageEventName
97 + " with " + correlationVariable + " = '" + correlationValue + "'";
98 logCallbackError(method, startTime, msg);
99 return new CallbackError(msg);
102 logCallbackSuccess(method, startTime);
103 return new CallbackSuccess();
104 } catch (Exception e) {
105 LOGGER.debug("Exception :",e);
106 String msg = "Caught " + e.getClass().getSimpleName()
107 + " processing " + messageEventName + " with " + correlationVariable
108 + " = '" + correlationValue + "'";
109 logCallbackError(method, startTime, msg);
110 return new CallbackError(msg);
115 * Performs message correlation. Waits a limited amount of time for
116 * a process to become ready for correlation. The return value indicates
117 * whether or not a process was found to receive the message. Due to the
118 * synchronous nature of message injection in Camunda, by the time this
119 * method returns, one of 3 things will have happened: (1) the process
120 * received the message and ended, (2) the process received the message
121 * and reached an activity that suspended, or (3) an exception occurred
122 * during correlation or while the process was executing. Correlation
123 * exceptions are handled differently from process execution exceptions.
124 * Correlation exceptions are thrown so the client knows something went
125 * wrong with the delivery of the message. Process execution exceptions
126 * are logged but not thrown.
127 * @param messageEventName the message event name
128 * @param correlationVariable the process variable used as the correlator
129 * @param correlationValue the correlation value
130 * @param variables variables to inject into the process
131 * @param logMarker a marker for debug logging
132 * @return true if a process could be found, false if not
133 * @throws Exception for correlation errors
135 protected boolean correlate(String messageEventName, String correlationVariable,
136 String correlationValue, Map<String, Object> variables, String logMarker)
139 LOGGER.debug(logMarker + " Attempting to find process waiting"
140 + " for " + messageEventName + " with " + correlationVariable
141 + " = '" + correlationValue + "'");
145 long timeout = DEFAULT_TIMEOUT_SECONDS;
147 // The code is here in case we ever need to change the default.
148 String correlationTimemout = UrnPropertiesReader.getVariable("mso.correlation.timeout");
149 if (correlationTimemout != null) {
151 timeout = Long.parseLong(correlationTimemout);
152 } catch (NumberFormatException e) {
157 long now = System.currentTimeMillis();
158 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);
159 long endTime = now + (timeout * 1000);
160 long sleep = FAST_POLL_INT_MS;
162 List<Execution> waitingProcesses = null;
163 Exception queryException = null;
165 int queryFailCount = 0;
170 waitingProcesses = runtimeService.createExecutionQuery()
171 .messageEventSubscriptionName(messageEventName)
172 .processVariableValueEquals(correlationVariable, correlationValue)
174 } catch (Exception e) {
179 if (waitingProcesses != null && waitingProcesses.size() > 0) {
183 if (now > endTime - sleep) {
188 now = System.currentTimeMillis();
190 if (now > fastPollEndTime) {
191 sleep = SLOW_POLL_INT_MS;
195 if (waitingProcesses == null) {
196 waitingProcesses = new ArrayList<Execution>(0);
199 int count = waitingProcesses.size();
201 List<ExecInfo> execInfoList = new ArrayList<>(count);
202 for (Execution execution : waitingProcesses) {
203 execInfoList.add(new ExecInfo(execution));
206 LOGGER.debug(logMarker + " Found " + count + " process(es) waiting"
207 + " for " + messageEventName + " with " + correlationVariable
208 + " = '" + correlationValue + "': " + execInfoList);
211 if (queryFailCount > 0) {
212 String msg = queryFailCount + "/" + queryCount
213 + " execution queries failed attempting to correlate "
214 + messageEventName + " with " + correlationVariable
215 + " = '" + correlationValue + "'; last exception was:"
218 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
219 MsoLogger.ErrorCode.UnknownError, msg, queryException);
226 // Only one process should be waiting. Throw an exception back to the client.
227 throw new MismatchingMessageCorrelationException(messageEventName,
228 "more than 1 process is waiting with " + correlationVariable
229 + " = '" + correlationValue + "'");
232 // We prototyped an asynchronous solution, i.e. resuming the process
233 // flow in a separate thread, but this affected too many existing tests,
234 // and we went back to the synchronous solution. The synchronous solution
235 // has some troublesome characteristics though. For example, the
236 // resumed flow may send request #2 to a remote system before MSO has
237 // acknowledged the notification associated with request #1.
240 LOGGER.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "
241 + messageEventName + " with " + correlationVariable + " = '"
242 + correlationValue + "'");
244 @SuppressWarnings("unused")
245 MessageCorrelationResult result = runtimeService
246 .createMessageCorrelation(messageEventName)
247 .setVariables(variables)
248 .processInstanceVariableEquals(correlationVariable, correlationValue)
249 .correlateWithResult();
251 } catch (MismatchingMessageCorrelationException e) {
252 // A correlation exception occurred even after we identified
253 // one waiting process. Throw it back to the client.
255 } catch (OptimisticLockingException ole) {
257 String msg = "Caught " + ole.getClass().getSimpleName() + " after receiving " + messageEventName
258 + " with " + correlationVariable + " = '" + correlationValue
261 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN CORRELATION ERROR -", MsoLogger.getServiceName(),
262 MsoLogger.ErrorCode.UnknownError, msg, ole);
264 //Retry for OptimisticLocking Exceptions
266 String retryStr = UrnPropertiesReader.getVariable("mso.bpmn.optimisticlockingexception.retrycount");
267 if (retryStr != null) {
269 retryCount = Integer.parseInt(retryStr);
270 } catch (NumberFormatException e) {
275 LOGGER.debug("Retry correlate for OptimisticLockingException, retryCount:" + retryCount);
277 for (; retryCount >0 ; retryCount--) {
280 Thread.sleep(SLOW_POLL_INT_MS);
282 @SuppressWarnings("unused")
283 MessageCorrelationResult result = runtimeService
284 .createMessageCorrelation(messageEventName)
285 .setVariables(variables)
286 .processInstanceVariableEquals(correlationVariable, correlationValue)
287 .correlateWithResult();
289 LOGGER.debug("OptimisticLockingException retry was successful, seting retryCount: " + retryCount);
290 } catch (OptimisticLockingException olex) {
291 //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
292 String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + olex;
293 LOGGER.debug(strMsg);
294 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
295 MsoLogger.ErrorCode.UnknownError, strMsg, olex);
296 } catch (Exception excep) {
298 //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
299 String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + excep;
300 LOGGER.debug(strMsg);
301 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
302 MsoLogger.ErrorCode.UnknownError, strMsg, excep);
307 }catch (Exception e) {
308 // This must be an exception from the flow itself. Log it, but don't
309 // report it back to the client.
310 String msg = "Caught " + e.getClass().getSimpleName() + " running "
311 + execInfoList.get(0) + " after receiving " + messageEventName
312 + " with " + correlationVariable + " = '" + correlationValue
315 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
316 MsoLogger.ErrorCode.UnknownError, msg, e);
318 } catch (Exception e) {
319 // This must be an exception from the flow itself. Log it, but don't
320 // report it back to the client.
321 String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName
322 + " with " + correlationVariable + " = '" + correlationValue
325 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN CORRELATION ERROR -", MsoLogger.getServiceName(),
326 MsoLogger.ErrorCode.UnknownError, msg, e);
333 * Records audit and metric events in the log for a callback success.
334 * @param method the method name
335 * @param startTime the request start time
337 protected void logCallbackSuccess(String method, long startTime) {
338 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
339 MsoLogger.ResponseCode.Suc, "Completed " + method);
341 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
342 MsoLogger.ResponseCode.Suc, "Completed " + method,
343 "BPMN", MsoLogger.getServiceName(), null);
347 * Records error, audit and metric events in the log for a callback
349 * @param method the method name
350 * @param startTime the request start time
351 * @param msg the error message
353 protected void logCallbackError(String method, long startTime, String msg) {
354 logCallbackError(method, startTime, msg, null);
358 * Records error, audit and metric events in the log for a callback
360 * @param method the method name
361 * @param startTime the request start time
362 * @param msg the error message
363 * @param e the exception
365 protected void logCallbackError(String method, long startTime, String msg, Exception e) {
367 LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
368 MsoLogger.ErrorCode.UnknownError, msg);
370 LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
371 MsoLogger.ErrorCode.UnknownError, msg, e);
374 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
375 MsoLogger.ResponseCode.InternalError, "Completed " + method);
377 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
378 MsoLogger.ResponseCode.InternalError, "Completed " + method,
379 "BPMN", MsoLogger.getServiceName(), null);
383 * Abstract callback result object.
385 protected abstract class CallbackResult {
389 * Indicates that callback handling was successful.
391 protected class CallbackSuccess extends CallbackResult {
395 * Indicates that callback handling failed.
397 protected class CallbackError extends CallbackResult {
398 private final String errorMessage;
400 public CallbackError(String errorMessage) {
401 this.errorMessage = errorMessage;
405 * Gets the error message.
407 public String getErrorMessage() {
412 private static class ExecInfo {
413 private final Execution execution;
415 public ExecInfo(Execution execution) {
416 this.execution = execution;
420 public String toString() {
421 return "Process[" + execution.getProcessInstanceId()
422 + ":" + execution.getId() + "]";