2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.so.bpmn.common.workflow.service;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
28 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
29 import org.camunda.bpm.engine.OptimisticLockingException;
30 import org.camunda.bpm.engine.RuntimeService;
31 import org.camunda.bpm.engine.runtime.Execution;
32 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;
33 import org.onap.so.bpmn.core.UrnPropertiesReader;
34 import org.onap.so.logger.MessageEnum;
35 import org.onap.so.logger.MsoLogger;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.scheduling.annotation.Async;
38 import org.springframework.stereotype.Service;
41 * Abstract base class for callback services.
44 public class CallbackHandlerService {
45 public static final long DEFAULT_TIMEOUT_SECONDS = 60;
46 public static final long FAST_POLL_DUR_SECONDS = 5;
47 public static final long FAST_POLL_INT_MS = 100;
48 public static final long SLOW_POLL_INT_MS = 1000;
50 private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL, CallbackHandlerService.class);
53 RuntimeService runtimeService;
56 * Parameterized callback handler.
59 protected CallbackResult handleCallback(String method, Object message,
60 String messageEventName, String messageVariable,
61 String correlationVariable, String correlationValue,
64 return handleCallback(method, message, messageEventName, messageVariable,
65 correlationVariable, correlationValue, logMarker, null);
69 * Parameterized callback handler.
71 protected CallbackResult handleCallback(String method, Object message,
72 String messageEventName, String messageVariable,
73 String correlationVariable, String correlationValue,
74 String logMarker, Map<String, Object> injectedVariables) {
76 long startTime = System.currentTimeMillis();
78 LOGGER.debug(logMarker + " " + method + " received message: "
79 + (message == null ? "" : System.lineSeparator()) + message);
82 Map<String, Object> variables = new HashMap<>();
84 if (injectedVariables != null) {
85 variables.putAll(injectedVariables);
88 variables.put(correlationVariable, correlationValue);
89 variables.put(messageVariable, message == null ? null : message.toString());
91 boolean ok = correlate(messageEventName, correlationVariable,
92 correlationValue, variables, logMarker);
95 String msg = "No process is waiting for " + messageEventName
96 + " with " + correlationVariable + " = '" + correlationValue + "'";
97 logCallbackError(method, startTime, msg);
98 return new CallbackError(msg);
101 logCallbackSuccess(method, startTime);
102 return new CallbackSuccess();
103 } catch (Exception e) {
104 LOGGER.debug("Exception :",e);
105 String msg = "Caught " + e.getClass().getSimpleName()
106 + " processing " + messageEventName + " with " + correlationVariable
107 + " = '" + correlationValue + "'";
108 logCallbackError(method, startTime, msg);
109 return new CallbackError(msg);
114 * Performs message correlation. Waits a limited amount of time for
115 * a process to become ready for correlation. The return value indicates
116 * whether or not a process was found to receive the message. Due to the
117 * synchronous nature of message injection in Camunda, by the time this
118 * method returns, one of 3 things will have happened: (1) the process
119 * received the message and ended, (2) the process received the message
120 * and reached an activity that suspended, or (3) an exception occurred
121 * during correlation or while the process was executing. Correlation
122 * exceptions are handled differently from process execution exceptions.
123 * Correlation exceptions are thrown so the client knows something went
124 * wrong with the delivery of the message. Process execution exceptions
125 * are logged but not thrown.
126 * @param messageEventName the message event name
127 * @param correlationVariable the process variable used as the correlator
128 * @param correlationValue the correlation value
129 * @param variables variables to inject into the process
130 * @param logMarker a marker for debug logging
131 * @return true if a process could be found, false if not
133 protected boolean correlate(String messageEventName, String correlationVariable,
134 String correlationValue, Map<String, Object> variables, String logMarker) {
136 LOGGER.debug(logMarker + " Attempting to find process waiting"
137 + " for " + messageEventName + " with " + correlationVariable
138 + " = '" + correlationValue + "'");
142 long timeout = DEFAULT_TIMEOUT_SECONDS;
144 // The code is here in case we ever need to change the default.
145 String correlationTimemout = UrnPropertiesReader.getVariable("mso.correlation.timeout");
146 if (correlationTimemout != null) {
148 timeout = Long.parseLong(correlationTimemout);
149 } catch (NumberFormatException e) {
154 long now = System.currentTimeMillis();
155 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);
156 long endTime = now + (timeout * 1000);
157 long sleep = FAST_POLL_INT_MS;
159 List<Execution> waitingProcesses = null;
160 Exception queryException = null;
162 int queryFailCount = 0;
167 waitingProcesses = runtimeService.createExecutionQuery()
168 .messageEventSubscriptionName(messageEventName)
169 .processVariableValueEquals(correlationVariable, correlationValue)
171 } catch (Exception e) {
176 if (waitingProcesses != null && waitingProcesses.size() > 0) {
180 if (now > endTime - sleep) {
185 now = System.currentTimeMillis();
187 if (now > fastPollEndTime) {
188 sleep = SLOW_POLL_INT_MS;
192 if (waitingProcesses == null) {
193 waitingProcesses = new ArrayList<Execution>(0);
196 int count = waitingProcesses.size();
198 List<ExecInfo> execInfoList = new ArrayList<>(count);
199 for (Execution execution : waitingProcesses) {
200 execInfoList.add(new ExecInfo(execution));
203 LOGGER.debug(logMarker + " Found " + count + " process(es) waiting"
204 + " for " + messageEventName + " with " + correlationVariable
205 + " = '" + correlationValue + "': " + execInfoList);
208 if (queryFailCount > 0) {
209 String msg = queryFailCount + "/" + queryCount
210 + " execution queries failed attempting to correlate "
211 + messageEventName + " with " + correlationVariable
212 + " = '" + correlationValue + "'; last exception was:"
215 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
216 MsoLogger.ErrorCode.UnknownError, msg, queryException);
223 // Only one process should be waiting. Throw an exception back to the client.
224 throw new MismatchingMessageCorrelationException(messageEventName,
225 "more than 1 process is waiting with " + correlationVariable
226 + " = '" + correlationValue + "'");
229 // We prototyped an asynchronous solution, i.e. resuming the process
230 // flow in a separate thread, but this affected too many existing tests,
231 // and we went back to the synchronous solution. The synchronous solution
232 // has some troublesome characteristics though. For example, the
233 // resumed flow may send request #2 to a remote system before MSO has
234 // acknowledged the notification associated with request #1.
237 LOGGER.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "
238 + messageEventName + " with " + correlationVariable + " = '"
239 + correlationValue + "'");
241 @SuppressWarnings("unused")
242 MessageCorrelationResult result = runtimeService
243 .createMessageCorrelation(messageEventName)
244 .setVariables(variables)
245 .processInstanceVariableEquals(correlationVariable, correlationValue)
246 .correlateWithResult();
248 } catch (MismatchingMessageCorrelationException e) {
249 // A correlation exception occurred even after we identified
250 // one waiting process. Throw it back to the client.
252 } catch (OptimisticLockingException ole) {
254 String msg = "Caught " + ole.getClass().getSimpleName() + " after receiving " + messageEventName
255 + " with " + correlationVariable + " = '" + correlationValue
258 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN CORRELATION ERROR -", MsoLogger.getServiceName(),
259 MsoLogger.ErrorCode.UnknownError, msg, ole);
261 //Retry for OptimisticLocking Exceptions
263 String retryStr = UrnPropertiesReader.getVariable("mso.bpmn.optimisticlockingexception.retrycount");
264 if (retryStr != null) {
266 retryCount = Integer.parseInt(retryStr);
267 } catch (NumberFormatException e) {
272 LOGGER.debug("Retry correlate for OptimisticLockingException, retryCount:" + retryCount);
274 for (; retryCount >0 ; retryCount--) {
277 Thread.sleep(SLOW_POLL_INT_MS);
279 @SuppressWarnings("unused")
280 MessageCorrelationResult result = runtimeService
281 .createMessageCorrelation(messageEventName)
282 .setVariables(variables)
283 .processInstanceVariableEquals(correlationVariable, correlationValue)
284 .correlateWithResult();
286 LOGGER.debug("OptimisticLockingException retry was successful, seting retryCount: " + retryCount);
287 } catch (OptimisticLockingException olex) {
288 //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
289 String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + olex;
290 LOGGER.debug(strMsg);
291 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
292 MsoLogger.ErrorCode.UnknownError, strMsg, olex);
293 } catch (Exception excep) {
295 //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
296 String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + excep;
297 LOGGER.debug(strMsg);
298 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
299 MsoLogger.ErrorCode.UnknownError, strMsg, excep);
304 }catch (Exception e) {
305 // This must be an exception from the flow itself. Log it, but don't
306 // report it back to the client.
307 String msg = "Caught " + e.getClass().getSimpleName() + " running "
308 + execInfoList.get(0) + " after receiving " + messageEventName
309 + " with " + correlationVariable + " = '" + correlationValue
312 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
313 MsoLogger.ErrorCode.UnknownError, msg, e);
315 } catch (Exception e) {
316 // This must be an exception from the flow itself. Log it, but don't
317 // report it back to the client.
318 String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName
319 + " with " + correlationVariable + " = '" + correlationValue
322 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN CORRELATION ERROR -", MsoLogger.getServiceName(),
323 MsoLogger.ErrorCode.UnknownError, msg, e);
330 * Records audit and metric events in the log for a callback success.
331 * @param method the method name
332 * @param startTime the request start time
334 protected void logCallbackSuccess(String method, long startTime) {
335 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
336 MsoLogger.ResponseCode.Suc, "Completed " + method);
338 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
339 MsoLogger.ResponseCode.Suc, "Completed " + method,
340 "BPMN", MsoLogger.getServiceName(), null);
344 * Records error, audit and metric events in the log for a callback
346 * @param method the method name
347 * @param startTime the request start time
348 * @param msg the error message
350 protected void logCallbackError(String method, long startTime, String msg) {
351 logCallbackError(method, startTime, msg, null);
355 * Records error, audit and metric events in the log for a callback
357 * @param method the method name
358 * @param startTime the request start time
359 * @param msg the error message
360 * @param e the exception
362 protected void logCallbackError(String method, long startTime, String msg, Exception e) {
364 LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
365 MsoLogger.ErrorCode.UnknownError, msg);
367 LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
368 MsoLogger.ErrorCode.UnknownError, msg, e);
371 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
372 MsoLogger.ResponseCode.InternalError, "Completed " + method);
374 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
375 MsoLogger.ResponseCode.InternalError, "Completed " + method,
376 "BPMN", MsoLogger.getServiceName(), null);
380 * Abstract callback result object.
382 protected abstract class CallbackResult {
386 * Indicates that callback handling was successful.
388 protected class CallbackSuccess extends CallbackResult {
392 * Indicates that callback handling failed.
394 protected class CallbackError extends CallbackResult {
395 private final String errorMessage;
397 public CallbackError(String errorMessage) {
398 this.errorMessage = errorMessage;
402 * Gets the error message.
404 public String getErrorMessage() {
409 private static class ExecInfo {
410 private final Execution execution;
412 public ExecInfo(Execution execution) {
413 this.execution = execution;
417 public String toString() {
418 return "Process[" + execution.getProcessInstanceId()
419 + ":" + execution.getId() + "]";