2 * ============LICENSE_START=======================================================
\r
4 * ================================================================================
\r
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * ================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
11 * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * Unless required by applicable law or agreed to in writing, software
\r
14 * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * See the License for the specific language governing permissions and
\r
17 * limitations under the License.
\r
18 * ============LICENSE_END=========================================================
\r
21 package org.openecomp.mso.bpmn.common.workflow.service;
\r
23 import java.util.ArrayList;
\r
24 import java.util.HashMap;
\r
25 import java.util.List;
\r
26 import java.util.Map;
\r
28 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
\r
29 import org.camunda.bpm.engine.ProcessEngineServices;
\r
30 import org.camunda.bpm.engine.ProcessEngines;
\r
31 import org.camunda.bpm.engine.RuntimeService;
\r
32 import org.camunda.bpm.engine.runtime.Execution;
\r
33 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;
\r
34 import org.openecomp.mso.bpmn.core.PropertyConfiguration;
\r
35 import org.openecomp.mso.logger.MessageEnum;
\r
36 import org.openecomp.mso.logger.MsoLogger;
\r
39 * Abstract base class for callback services.
\r
41 public abstract class AbstractCallbackService {
\r
42 public static final long DEFAULT_TIMEOUT_SECONDS = 60;
\r
43 public static final long FAST_POLL_DUR_SECONDS = 5;
\r
44 public static final long FAST_POLL_INT_MS = 100;
\r
45 public static final long SLOW_POLL_INT_MS = 1000;
\r
47 private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);
\r
49 protected volatile ProcessEngineServices pes4junit = null;
\r
52 * Parameterized callback handler.
\r
54 protected CallbackResult handleCallback(String method, Object message,
\r
55 String messageEventName, String messageVariable,
\r
56 String correlationVariable, String correlationValue,
\r
59 return handleCallback(method, message, messageEventName, messageVariable,
\r
60 correlationVariable, correlationValue, logMarker, null);
\r
64 * Parameterized callback handler.
\r
66 protected CallbackResult handleCallback(String method, Object message,
\r
67 String messageEventName, String messageVariable,
\r
68 String correlationVariable, String correlationValue,
\r
69 String logMarker, Map<String, Object> injectedVariables) {
\r
71 long startTime = System.currentTimeMillis();
\r
73 LOGGER.debug(logMarker + " " + method + " received message: "
\r
74 + (message == null ? "" : System.lineSeparator()) + message);
\r
77 Map<String, Object> variables = new HashMap<String, Object>();
\r
79 if (injectedVariables != null) {
\r
80 variables.putAll(injectedVariables);
\r
83 variables.put(correlationVariable, correlationValue);
\r
84 variables.put(messageVariable, message == null ? null : message.toString());
\r
86 boolean ok = correlate(messageEventName, correlationVariable,
\r
87 correlationValue, variables, logMarker);
\r
90 String msg = "No process is waiting for " + messageEventName
\r
91 + " with " + correlationVariable + " = '" + correlationValue + "'";
\r
92 logCallbackError(method, startTime, msg);
\r
93 return new CallbackError(msg);
\r
96 logCallbackSuccess(method, startTime);
\r
97 return new CallbackSuccess();
\r
98 } catch (Exception e) {
\r
99 LOGGER.debug("Exception :",e);
\r
100 String msg = "Caught " + e.getClass().getSimpleName()
\r
101 + " processing " + messageEventName + " with " + correlationVariable
\r
102 + " = '" + correlationValue + "'";
\r
103 logCallbackError(method, startTime, msg);
\r
104 return new CallbackError(msg);
\r
109 * Performs message correlation. Waits a limited amount of time for
\r
110 * a process to become ready for correlation. The return value indicates
\r
111 * whether or not a process was found to receive the message. Due to the
\r
112 * synchronous nature of message injection in Camunda, by the time this
\r
113 * method returns, one of 3 things will have happened: (1) the process
\r
114 * received the message and ended, (2) the process received the message
\r
115 * and reached an activity that suspended, or (3) an exception occurred
\r
116 * during correlation or while the process was executing. Correlation
\r
117 * exceptions are handled differently from process execution exceptions.
\r
118 * Correlation exceptions are thrown so the client knows something went
\r
119 * wrong with the delivery of the message. Process execution exceptions
\r
120 * are logged but not thrown.
\r
121 * @param messageEventName the message event name
\r
122 * @param correlationVariable the process variable used as the correlator
\r
123 * @param correlationValue the correlation value
\r
124 * @param variables variables to inject into the process
\r
125 * @param logMarker a marker for debug logging
\r
126 * @return true if a process could be found, false if not
\r
127 * @throws Exception for correlation errors
\r
129 protected boolean correlate(String messageEventName, String correlationVariable,
\r
130 String correlationValue, Map<String, Object> variables, String logMarker)
\r
133 LOGGER.debug(logMarker + " Attempting to find process waiting"
\r
134 + " for " + messageEventName + " with " + correlationVariable
\r
135 + " = '" + correlationValue + "'");
\r
137 RuntimeService runtimeService =
\r
138 getProcessEngineServices().getRuntimeService();
\r
140 Map<String, String> properties =
\r
141 PropertyConfiguration.getInstance().getProperties("mso.bpmn.urn.properties");
\r
143 long timeout = DEFAULT_TIMEOUT_SECONDS;
\r
145 // The code is here in case we ever need to change the default.
\r
146 String s = properties.get("mso.correlation.timeout");
\r
149 timeout = Long.parseLong(s);
\r
150 } catch (NumberFormatException e) {
\r
155 long now = System.currentTimeMillis();
\r
156 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);
\r
157 long endTime = now + (timeout * 1000);
\r
158 long sleep = FAST_POLL_INT_MS;
\r
160 List<Execution> waitingProcesses = null;
\r
161 Exception queryException = null;
\r
162 int queryCount = 0;
\r
163 int queryFailCount = 0;
\r
168 waitingProcesses = runtimeService.createExecutionQuery()
\r
169 .messageEventSubscriptionName(messageEventName)
\r
170 .processVariableValueEquals(correlationVariable, correlationValue)
\r
172 } catch (Exception e) {
\r
174 queryException = e;
\r
177 if (waitingProcesses != null && waitingProcesses.size() > 0) {
\r
181 if (now > endTime - sleep) {
\r
185 Thread.sleep(sleep);
\r
186 now = System.currentTimeMillis();
\r
188 if (now > fastPollEndTime) {
\r
189 sleep = SLOW_POLL_INT_MS;
\r
193 if (waitingProcesses == null) {
\r
194 waitingProcesses = new ArrayList<Execution>(0);
\r
197 int count = waitingProcesses.size();
\r
199 List<ExecInfo> execInfoList = new ArrayList<ExecInfo>(count);
\r
200 for (Execution execution : waitingProcesses) {
\r
201 execInfoList.add(new ExecInfo(execution));
\r
204 LOGGER.debug(logMarker + " Found " + count + " process(es) waiting"
\r
205 + " for " + messageEventName + " with " + correlationVariable
\r
206 + " = '" + correlationValue + "': " + execInfoList);
\r
209 if (queryFailCount > 0) {
\r
210 String msg = queryFailCount + "/" + queryCount
\r
211 + " execution queries failed attempting to correlate "
\r
212 + messageEventName + " with " + correlationVariable
\r
213 + " = '" + correlationValue + "'; last exception was:"
\r
216 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
217 MsoLogger.ErrorCode.UnknownError, msg, queryException);
\r
224 // Only one process should be waiting. Throw an exception back to the client.
\r
225 throw new MismatchingMessageCorrelationException(messageEventName,
\r
226 "more than 1 process is waiting with " + correlationVariable
\r
227 + " = '" + correlationValue + "'");
\r
230 // We prototyped an asynchronous solution, i.e. resuming the process
\r
231 // flow in a separate thread, but this affected too many existing tests,
\r
232 // and we went back to the synchronous solution. The synchronous solution
\r
233 // has some troublesome characteristics though. For example, the
\r
234 // resumed flow may send request #2 to a remote system before MSO has
\r
235 // acknowledged the notification associated with request #1.
\r
238 LOGGER.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "
\r
239 + messageEventName + " with " + correlationVariable + " = '"
\r
240 + correlationValue + "'");
\r
242 @SuppressWarnings("unused")
\r
243 MessageCorrelationResult result = runtimeService
\r
244 .createMessageCorrelation(messageEventName)
\r
245 .setVariables(variables)
\r
246 .processInstanceVariableEquals(correlationVariable, correlationValue)
\r
247 .correlateWithResult();
\r
249 } catch (MismatchingMessageCorrelationException e) {
\r
250 // A correlation exception occurred even after we identified
\r
251 // one waiting process. Throw it back to the client.
\r
253 } catch (Exception e) {
\r
254 // This must be an exception from the flow itself. Log it, but don't
\r
255 // report it back to the client.
\r
256 String msg = "Caught " + e.getClass().getSimpleName() + " running "
\r
257 + execInfoList.get(0) + " after receiving " + messageEventName
\r
258 + " with " + correlationVariable + " = '" + correlationValue
\r
261 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
262 MsoLogger.ErrorCode.UnknownError, msg, e);
\r
264 } catch (Exception e) {
\r
265 // This must be an exception from the flow itself. Log it, but don't
\r
266 // report it back to the client.
\r
267 String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName
\r
268 + " with " + correlationVariable + " = '" + correlationValue
\r
271 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN CORRELATION ERROR -", MsoLogger.getServiceName(),
\r
272 MsoLogger.ErrorCode.UnknownError, msg, e);
\r
279 * Records audit and metric events in the log for a callback success.
\r
280 * @param method the method name
\r
281 * @param startTime the request start time
\r
283 protected void logCallbackSuccess(String method, long startTime) {
\r
284 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
285 MsoLogger.ResponseCode.Suc, "Completed " + method);
\r
287 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
288 MsoLogger.ResponseCode.Suc, "Completed " + method,
\r
289 "BPMN", MsoLogger.getServiceName(), null);
\r
293 * Records error, audit and metric events in the log for a callback
\r
295 * @param method the method name
\r
296 * @param startTime the request start time
\r
297 * @param msg the error message
\r
299 protected void logCallbackError(String method, long startTime, String msg) {
\r
300 logCallbackError(method, startTime, msg, null);
\r
304 * Records error, audit and metric events in the log for a callback
\r
306 * @param method the method name
\r
307 * @param startTime the request start time
\r
308 * @param msg the error message
\r
309 * @param e the exception
\r
311 protected void logCallbackError(String method, long startTime, String msg, Exception e) {
\r
313 LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
314 MsoLogger.ErrorCode.UnknownError, msg);
\r
316 LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
\r
317 MsoLogger.ErrorCode.UnknownError, msg, e);
\r
320 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
321 MsoLogger.ResponseCode.InternalError, "Completed " + method);
\r
323 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
\r
324 MsoLogger.ResponseCode.InternalError, "Completed " + method,
\r
325 "BPMN", MsoLogger.getServiceName(), null);
\r
329 * Abstract callback result object.
\r
331 protected abstract class CallbackResult {
\r
335 * Indicates that callback handling was successful.
\r
337 protected class CallbackSuccess extends CallbackResult {
\r
341 * Indicates that callback handling failed.
\r
343 protected class CallbackError extends CallbackResult {
\r
344 private final String errorMessage;
\r
346 public CallbackError(String errorMessage) {
\r
347 this.errorMessage = errorMessage;
\r
351 * Gets the error message.
\r
353 public String getErrorMessage() {
\r
354 return errorMessage;
\r
358 private static class ExecInfo {
\r
359 private final Execution execution;
\r
361 public ExecInfo(Execution execution) {
\r
362 this.execution = execution;
\r
366 public String toString() {
\r
367 return "Process[" + execution.getProcessInstanceId()
\r
368 + ":" + execution.getId() + "]";
\r
372 protected ProcessEngineServices getProcessEngineServices() {
\r
373 if (pes4junit == null) {
\r
374 return ProcessEngines.getProcessEngine("infrastructure");
\r
380 public void setProcessEngineServices4junit(ProcessEngineServices pes) {
\r