2 * ============LICENSE_START=======================================================
\r
4 * ================================================================================
\r
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * ================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
11 * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * Unless required by applicable law or agreed to in writing, software
\r
14 * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * See the License for the specific language governing permissions and
\r
17 * limitations under the License.
\r
18 * ============LICENSE_END=========================================================
\r
21 package org.openecomp.mso.bpmn.common.workflow.service;
\r
23 import java.util.ArrayList;
\r
24 import java.util.HashMap;
\r
25 import java.util.List;
\r
26 import java.util.Map;
\r
28 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
\r
29 import org.camunda.bpm.engine.RuntimeService;
\r
30 import org.camunda.bpm.engine.runtime.Execution;
\r
31 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;
\r
32 import org.openecomp.mso.bpmn.core.PropertyConfiguration;
\r
33 import org.openecomp.mso.logger.MessageEnum;
\r
34 import org.openecomp.mso.logger.MsoLogger;
\r
37 * Abstract base class for callback services.
\r
39 public abstract class AbstractCallbackService extends ProcessEngineAwareService {
\r
40 public static final long DEFAULT_TIMEOUT_SECONDS = 60;
\r
41 public static final long FAST_POLL_DUR_SECONDS = 5;
\r
42 public static final long FAST_POLL_INT_MS = 100;
\r
43 public static final long SLOW_POLL_INT_MS = 1000;
\r
45 private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);
\r
48 * Parameterized callback handler.
\r
50 protected CallbackResult handleCallback(String method, Object message,
\r
51 String messageEventName, String messageVariable,
\r
52 String correlationVariable, String correlationValue,
\r
55 return handleCallback(method, message, messageEventName, messageVariable,
\r
56 correlationVariable, correlationValue, logMarker, null);
\r
60 * Parameterized callback handler.
\r
62 protected CallbackResult handleCallback(String method, Object message,
\r
63 String messageEventName, String messageVariable,
\r
64 String correlationVariable, String correlationValue,
\r
65 String logMarker, Map<String, Object> injectedVariables) {
\r
67 long startTime = System.currentTimeMillis();
\r
69 LOGGER.debug(logMarker + " " + method + " received message: "
\r
70 + (message == null ? "" : System.lineSeparator()) + message);
\r
73 Map<String, Object> variables = new HashMap<>();
\r
75 if (injectedVariables != null) {
\r
76 variables.putAll(injectedVariables);
\r
79 variables.put(correlationVariable, correlationValue);
\r
80 variables.put(messageVariable, message == null ? null : message.toString());
\r
82 boolean ok = correlate(messageEventName, correlationVariable,
\r
83 correlationValue, variables, logMarker);
\r
86 String msg = "No process is waiting for " + messageEventName
\r
87 + " with " + correlationVariable + " = '" + correlationValue + "'";
\r
88 logCallbackError(method, startTime, msg);
\r
89 return new CallbackError(msg);
\r
92 logCallbackSuccess(method, startTime);
\r
93 return new CallbackSuccess();
\r
94 } catch (Exception e) {
\r
95 LOGGER.debug("Exception :",e);
\r
96 String msg = "Caught " + e.getClass().getSimpleName()
\r
97 + " processing " + messageEventName + " with " + correlationVariable
\r
98 + " = '" + correlationValue + "'";
\r
99 logCallbackError(method, startTime, msg);
\r
100 return new CallbackError(msg);
\r
105 * Performs message correlation. Waits a limited amount of time for
\r
106 * a process to become ready for correlation. The return value indicates
\r
107 * whether or not a process was found to receive the message. Due to the
\r
108 * synchronous nature of message injection in Camunda, by the time this
\r
109 * method returns, one of 3 things will have happened: (1) the process
\r
110 * received the message and ended, (2) the process received the message
\r
111 * and reached an activity that suspended, or (3) an exception occurred
\r
112 * during correlation or while the process was executing. Correlation
\r
113 * exceptions are handled differently from process execution exceptions.
\r
114 * Correlation exceptions are thrown so the client knows something went
\r
115 * wrong with the delivery of the message. Process execution exceptions
\r
116 * are logged but not thrown.
\r
117 * @param messageEventName the message event name
\r
118 * @param correlationVariable the process variable used as the correlator
\r
119 * @param correlationValue the correlation value
\r
120 * @param variables variables to inject into the process
\r
121 * @param logMarker a marker for debug logging
\r
122 * @return true if a process could be found, false if not
\r
123 * @throws Exception for correlation errors
\r
125 protected boolean correlate(String messageEventName, String correlationVariable,
\r
126 String correlationValue, Map<String, Object> variables, String logMarker)
\r
129 LOGGER.debug(logMarker + " Attempting to find process waiting"
\r
130 + " for " + messageEventName + " with " + correlationVariable
\r
131 + " = '" + correlationValue + "'");
\r
133 RuntimeService runtimeService =
\r
134 getProcessEngineServices().getRuntimeService();
\r
136 Map<String, String> properties =
\r
137 PropertyConfiguration.getInstance().getProperties("mso.bpmn.urn.properties");
\r
139 long timeout = DEFAULT_TIMEOUT_SECONDS;
\r
141 // The code is here in case we ever need to change the default.
\r
142 String s = properties.get("mso.correlation.timeout");
\r
145 timeout = Long.parseLong(s);
\r
146 } catch (NumberFormatException e) {
\r
151 long now = System.currentTimeMillis();
\r
152 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);
\r
153 long endTime = now + (timeout * 1000);
\r
154 long sleep = FAST_POLL_INT_MS;
\r
156 List<Execution> waitingProcesses = null;
\r
157 Exception queryException = null;
\r
158 int queryCount = 0;
\r
159 int queryFailCount = 0;
\r
164 waitingProcesses = runtimeService.createExecutionQuery()
\r
165 .messageEventSubscriptionName(messageEventName)
\r
166 .processVariableValueEquals(correlationVariable, correlationValue)
\r
168 } catch (Exception e) {
\r
170 queryException = e;
\r
173 if (waitingProcesses != null && waitingProcesses.size() > 0) {
\r
177 if (now > endTime - sleep) {
\r
181 Thread.sleep(sleep);
\r
182 now = System.currentTimeMillis();
\r
184 if (now > fastPollEndTime) {
\r
185 sleep = SLOW_POLL_INT_MS;
\r
189 if (waitingProcesses == null) {
\r
190 waitingProcesses = new ArrayList<Execution>(0);
\r
193 int count = waitingProcesses.size();
\r
195 List<ExecInfo> execInfoList = new ArrayList<>(count);
\r
196 for (Execution execution : waitingProcesses) {
\r
197 execInfoList.add(new ExecInfo(execution));
\r
200 LOGGER.debug(logMarker + " Found " + count + " process(es) waiting"
\r
201 + " for " + messageEventName + " with " + correlationVariable
\r
202 + " = '" + correlationValue + "': " + execInfoList);
\r
205 if (queryFailCount > 0) {
\r
206 String msg = queryFailCount + "/" + queryCount
\r
207 + " execution queries failed attempting to correlate "
\r
208 + messageEventName + " with " + correlationVariable
\r
209 + " = '" + correlationValue + "'; last exception was:"
\r
212 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
213 MsoLogger.ErrorCode.UnknownError, msg, queryException);
\r
220 // Only one process should be waiting. Throw an exception back to the client.
\r
221 throw new MismatchingMessageCorrelationException(messageEventName,
\r
222 "more than 1 process is waiting with " + correlationVariable
\r
223 + " = '" + correlationValue + "'");
\r
226 // We prototyped an asynchronous solution, i.e. resuming the process
\r
227 // flow in a separate thread, but this affected too many existing tests,
\r
228 // and we went back to the synchronous solution. The synchronous solution
\r
229 // has some troublesome characteristics though. For example, the
\r
230 // resumed flow may send request #2 to a remote system before MSO has
\r
231 // acknowledged the notification associated with request #1.
\r
234 LOGGER.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "
\r
235 + messageEventName + " with " + correlationVariable + " = '"
\r
236 + correlationValue + "'");
\r
238 @SuppressWarnings("unused")
\r
239 MessageCorrelationResult result = runtimeService
\r
240 .createMessageCorrelation(messageEventName)
\r
241 .setVariables(variables)
\r
242 .processInstanceVariableEquals(correlationVariable, correlationValue)
\r
243 .correlateWithResult();
\r
245 } catch (MismatchingMessageCorrelationException e) {
\r
246 // A correlation exception occurred even after we identified
\r
247 // one waiting process. Throw it back to the client.
\r
249 } catch (Exception e) {
\r
250 // This must be an exception from the flow itself. Log it, but don't
\r
251 // report it back to the client.
\r
252 String msg = "Caught " + e.getClass().getSimpleName() + " running "
\r
253 + execInfoList.get(0) + " after receiving " + messageEventName
\r
254 + " with " + correlationVariable + " = '" + correlationValue
\r
257 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
258 MsoLogger.ErrorCode.UnknownError, msg, e);
\r
260 } catch (Exception e) {
\r
261 // This must be an exception from the flow itself. Log it, but don't
\r
262 // report it back to the client.
\r
263 String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName
\r
264 + " with " + correlationVariable + " = '" + correlationValue
\r
267 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN CORRELATION ERROR -", MsoLogger.getServiceName(),
\r
268 MsoLogger.ErrorCode.UnknownError, msg, e);
\r
275 * Records audit and metric events in the log for a callback success.
\r
276 * @param method the method name
\r
277 * @param startTime the request start time
\r
279 protected void logCallbackSuccess(String method, long startTime) {
\r
280 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
281 MsoLogger.ResponseCode.Suc, "Completed " + method);
\r
283 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
284 MsoLogger.ResponseCode.Suc, "Completed " + method,
\r
285 "BPMN", MsoLogger.getServiceName(), null);
\r
289 * Records error, audit and metric events in the log for a callback
\r
291 * @param method the method name
\r
292 * @param startTime the request start time
\r
293 * @param msg the error message
\r
295 protected void logCallbackError(String method, long startTime, String msg) {
\r
296 logCallbackError(method, startTime, msg, null);
\r
300 * Records error, audit and metric events in the log for a callback
\r
302 * @param method the method name
\r
303 * @param startTime the request start time
\r
304 * @param msg the error message
\r
305 * @param e the exception
\r
307 protected void logCallbackError(String method, long startTime, String msg, Exception e) {
\r
309 LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
310 MsoLogger.ErrorCode.UnknownError, msg);
\r
312 LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
313 MsoLogger.ErrorCode.UnknownError, msg, e);
\r
316 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
317 MsoLogger.ResponseCode.InternalError, "Completed " + method);
\r
319 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
320 MsoLogger.ResponseCode.InternalError, "Completed " + method,
\r
321 "BPMN", MsoLogger.getServiceName(), null);
\r
325 * Abstract callback result object.
\r
327 protected abstract class CallbackResult {
\r
331 * Indicates that callback handling was successful.
\r
333 protected class CallbackSuccess extends CallbackResult {
\r
337 * Indicates that callback handling failed.
\r
339 protected class CallbackError extends CallbackResult {
\r
340 private final String errorMessage;
\r
342 public CallbackError(String errorMessage) {
\r
343 this.errorMessage = errorMessage;
\r
347 * Gets the error message.
\r
349 public String getErrorMessage() {
\r
350 return errorMessage;
\r
354 private static class ExecInfo {
\r
355 private final Execution execution;
\r
357 public ExecInfo(Execution execution) {
\r
358 this.execution = execution;
\r
362 public String toString() {
\r
363 return "Process[" + execution.getProcessInstanceId()
\r
364 + ":" + execution.getId() + "]";
\r