f274850b8d91be0edc2c10c828982d7fa53607f2
[so.git] / bpmn / mso-infrastructure-bpmn / src / main / java / org / onap / so / bpmn / common / workflow / service / CallbackHandlerService.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.so.bpmn.common.workflow.service;
22
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27
28 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
29 import org.camunda.bpm.engine.OptimisticLockingException;
30 import org.camunda.bpm.engine.RuntimeService;
31 import org.camunda.bpm.engine.runtime.Execution;
32 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;
33 import org.onap.so.bpmn.core.UrnPropertiesReader;
34 import org.onap.so.logger.MessageEnum;
35 import org.onap.so.logger.MsoLogger;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.scheduling.annotation.Async;
38 import org.springframework.stereotype.Component;
39 import org.springframework.stereotype.Service;
40
41 /**
42  * Base class for callback services: correlates incoming callback messages with waiting Camunda processes.
43  */
44 @Service
45 public class CallbackHandlerService {
46         public static final long DEFAULT_TIMEOUT_SECONDS = 60;
47         public static final long FAST_POLL_DUR_SECONDS = 5;
48         public static final long FAST_POLL_INT_MS = 100;
49         public static final long SLOW_POLL_INT_MS = 1000;
50         
51         private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL, CallbackHandlerService.class);
52
53         @Autowired
54         RuntimeService runtimeService;
55         
56         /**
57          * Parameterized callback handler.
58          */
59         @Async
60         protected CallbackResult handleCallback(String method, Object message,
61                         String messageEventName, String messageVariable,
62                         String correlationVariable, String correlationValue,
63                         String logMarker) {
64
65                 return handleCallback(method, message, messageEventName, messageVariable,
66                         correlationVariable, correlationValue, logMarker, null);
67         }
68
69         /**
70          * Parameterized callback handler.
71          */
72         protected CallbackResult handleCallback(String method, Object message,
73                         String messageEventName, String messageVariable,
74                         String correlationVariable, String correlationValue,
75                         String logMarker, Map<String, Object> injectedVariables) {
76
77                 long startTime = System.currentTimeMillis();
78
79                 LOGGER.debug(logMarker + " " + method + " received message: "
80                         + (message == null ? "" : System.lineSeparator()) + message);
81
82                 try {
83                         Map<String, Object> variables = new HashMap<>();
84
85                         if (injectedVariables != null) {
86                                 variables.putAll(injectedVariables);
87                         }
88
89                         variables.put(correlationVariable, correlationValue);
90                         variables.put(messageVariable, message == null ? null : message.toString());
91
92                         boolean ok = correlate(messageEventName, correlationVariable,
93                                 correlationValue, variables, logMarker);
94
95                         if (!ok) {
96                                 String msg = "No process is waiting for " + messageEventName
97                                         + " with " + correlationVariable + " = '" + correlationValue + "'";
98                                 logCallbackError(method, startTime, msg);
99                                 return new CallbackError(msg);
100                         }
101
102                         logCallbackSuccess(method, startTime);
103                         return new CallbackSuccess();
104                 } catch (Exception e) {
105                         LOGGER.debug("Exception :",e);
106                         String msg = "Caught " + e.getClass().getSimpleName()
107                                 + " processing " + messageEventName + " with " + correlationVariable
108                                 + " = '" + correlationValue + "'";
109                         logCallbackError(method, startTime, msg);
110                         return new CallbackError(msg);
111                 }
112         }
113         
114         /**
115          * Performs message correlation.  Waits a limited amount of time for
116          * a process to become ready for correlation.  The return value indicates
117          * whether or not a process was found to receive the message.  Due to the
118          * synchronous nature of message injection in Camunda, by the time this
119          * method returns, one of 3 things will have happened: (1) the process
120          * received the message and ended, (2) the process received the message
121          * and reached an activity that suspended, or (3) an exception occurred
122          * during correlation or while the process was executing.  Correlation
123          * exceptions are handled differently from process execution exceptions.
124          * Correlation exceptions are thrown so the client knows something went
125          * wrong with the delivery of the message.  Process execution exceptions
126          * are logged but not thrown.
127          * @param messageEventName the message event name
128          * @param correlationVariable the process variable used as the correlator
129          * @param correlationValue the correlation value
130          * @param variables variables to inject into the process
131          * @param logMarker a marker for debug logging
132          * @return true if a process could be found, false if not
133          * @throws Exception for correlation errors
134          */
135         protected boolean correlate(String messageEventName, String correlationVariable,
136                         String correlationValue, Map<String, Object> variables, String logMarker)
137                         throws Exception {
138         try{
139                 LOGGER.debug(logMarker + " Attempting to find process waiting"
140                         + " for " + messageEventName + " with " + correlationVariable
141                         + " = '" + correlationValue + "'");
142
143                 
144
145                 long timeout = DEFAULT_TIMEOUT_SECONDS;
146
147                 // The code is here in case we ever need to change the default.
148                 String correlationTimemout = UrnPropertiesReader.getVariable("mso.correlation.timeout");
149                 if (correlationTimemout != null) {
150                         try {
151                                 timeout = Long.parseLong(correlationTimemout);
152                         } catch (NumberFormatException e) {
153                                 // Ignore
154                         }
155                 }
156
157                 long now = System.currentTimeMillis();
158                 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);
159                 long endTime = now + (timeout * 1000);
160                 long sleep = FAST_POLL_INT_MS;
161
162                 List<Execution> waitingProcesses = null;
163                 Exception queryException = null;
164                 int queryCount = 0;
165                 int queryFailCount = 0;
166
167                 while (true) {
168                         try {
169                                 ++queryCount;
170                                 waitingProcesses = runtimeService.createExecutionQuery()
171                                         .messageEventSubscriptionName(messageEventName)
172                                         .processVariableValueEquals(correlationVariable, correlationValue)
173                                         .list();
174                         } catch (Exception e) {
175                                 ++queryFailCount;
176                                 queryException = e;
177                         }
178
179                         if (waitingProcesses != null && waitingProcesses.size() > 0) {
180                                 break;
181                         }
182
183                         if (now > endTime - sleep) {
184                                 break;
185                         }
186
187                         Thread.sleep(sleep);
188                         now = System.currentTimeMillis();
189
190                         if (now > fastPollEndTime) {
191                                 sleep = SLOW_POLL_INT_MS;
192                         }
193                 }
194
195                 if (waitingProcesses == null) {
196                         waitingProcesses = new ArrayList<Execution>(0);
197                 }
198
199                 int count = waitingProcesses.size();
200
201                 List<ExecInfo> execInfoList = new ArrayList<>(count);
202                 for (Execution execution : waitingProcesses) {
203                         execInfoList.add(new ExecInfo(execution));
204                 }
205
206                 LOGGER.debug(logMarker + " Found " + count + " process(es) waiting"
207                         + " for " + messageEventName + " with " + correlationVariable
208                         + " = '" + correlationValue + "': " + execInfoList);
209
210                 if (count == 0) {
211                         if (queryFailCount > 0) {
212                                 String msg = queryFailCount + "/" + queryCount
213                                         + " execution queries failed attempting to correlate "
214                                         + messageEventName + " with " + correlationVariable
215                                         + " = '" + correlationValue + "'; last exception was:"
216                                         + queryException;
217                                 LOGGER.debug(msg);
218                                 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
219                                         MsoLogger.ErrorCode.UnknownError, msg, queryException);
220                         }
221
222                         return false;
223                 }
224
225                 if (count > 1) {
226                         // Only one process should be waiting. Throw an exception back to the client.
227                         throw new MismatchingMessageCorrelationException(messageEventName,
228                                 "more than 1 process is waiting with " + correlationVariable
229                                 + " = '" + correlationValue + "'");
230                 }
231                 
232                 // We prototyped an asynchronous solution, i.e. resuming the process
233                 // flow in a separate thread, but this affected too many existing tests,
234                 // and we went back to the synchronous solution. The synchronous solution
235                 // has some troublesome characteristics though.  For example, the
236                 // resumed flow may send request #2 to a remote system before MSO has
237                 // acknowledged the notification associated with request #1.  
238
239                 try {
240                         LOGGER.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "
241                                 + messageEventName + " with " + correlationVariable + " = '"
242                                 + correlationValue + "'");
243
244                         @SuppressWarnings("unused")
245                         MessageCorrelationResult result = runtimeService
246                                 .createMessageCorrelation(messageEventName)
247                                 .setVariables(variables)
248                                 .processInstanceVariableEquals(correlationVariable, correlationValue)
249                                 .correlateWithResult();
250                         
251                 } catch (MismatchingMessageCorrelationException e) {
252                         // A correlation exception occurred even after we identified
253                         // one waiting process.  Throw it back to the client.
254                         throw e;
255                 } catch (OptimisticLockingException ole) {
256                         
257                         String msg = "Caught " + ole.getClass().getSimpleName() + " after receiving " + messageEventName
258                                 + " with " + correlationVariable + " = '" + correlationValue
259                                 + "': " + ole;
260                         LOGGER.debug(msg);
261                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN CORRELATION ERROR -", MsoLogger.getServiceName(),
262                                 MsoLogger.ErrorCode.UnknownError, msg, ole);
263                         
264                         //Retry for OptimisticLocking Exceptions
265                         int retryCount = 0;
266                         String retryStr = UrnPropertiesReader.getVariable("mso.bpmn.optimisticlockingexception.retrycount");
267                         if (retryStr != null) {
268                                 try {
269                                         retryCount = Integer.parseInt(retryStr);
270                                 } catch (NumberFormatException e) {
271                                         // Ignore
272                                 }
273                         }
274                         
275                         LOGGER.debug("Retry correlate for OptimisticLockingException, retryCount:" + retryCount);
276                         
277                         for (; retryCount >0 ; retryCount--) {
278                                 
279                                 try{
280                                         Thread.sleep(SLOW_POLL_INT_MS);
281                                         
282                                         @SuppressWarnings("unused")
283                                         MessageCorrelationResult result = runtimeService
284                                                 .createMessageCorrelation(messageEventName)
285                                                 .setVariables(variables)
286                                                 .processInstanceVariableEquals(correlationVariable, correlationValue)
287                                                 .correlateWithResult();
288                                         retryCount = 0;
289                                         LOGGER.debug("OptimisticLockingException retry was successful, seting retryCount: " + retryCount);
290                                 } catch (OptimisticLockingException olex) {
291                                         //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
292                                         String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + olex;
293                                         LOGGER.debug(strMsg);
294                                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
295                                                 MsoLogger.ErrorCode.UnknownError, strMsg, olex);
296                                 } catch (Exception excep) {
297                                         retryCount = 0;
298                                         //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
299                                         String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + excep;
300                                         LOGGER.debug(strMsg);
301                                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
302                                                 MsoLogger.ErrorCode.UnknownError, strMsg, excep);
303                                 }
304                 
305                         }
306                         
307                 }catch (Exception e) {
308                         // This must be an exception from the flow itself.  Log it, but don't
309                         // report it back to the client.
310                         String msg = "Caught " + e.getClass().getSimpleName() + " running "
311                                 + execInfoList.get(0) + " after receiving " + messageEventName
312                                 + " with " + correlationVariable + " = '" + correlationValue
313                                 + "': " + e;
314                         LOGGER.debug(msg);
315                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
316                                 MsoLogger.ErrorCode.UnknownError, msg, e);
317                 }
318         }  catch (Exception e) {
319                 // This must be an exception from the flow itself.  Log it, but don't
320                 // report it back to the client.
321                 String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName
322                         + " with " + correlationVariable + " = '" + correlationValue
323                         + "': " + e;
324                 LOGGER.debug(msg);
325                 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN CORRELATION ERROR -", MsoLogger.getServiceName(),
326                         MsoLogger.ErrorCode.UnknownError, msg, e);
327         }       
328
329                 return true;
330         }
331         
332         /**
333          * Records audit and metric events in the log for a callback success.
334          * @param method the method name
335          * @param startTime the request start time
336          */
337         protected void logCallbackSuccess(String method, long startTime) {
338                 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
339                         MsoLogger.ResponseCode.Suc, "Completed " + method);
340
341                 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
342                         MsoLogger.ResponseCode.Suc, "Completed " + method,
343                         "BPMN", MsoLogger.getServiceName(), null);
344         }
345
346         /**
347          * Records error, audit and metric events in the log for a callback
348          * internal error.
349          * @param method the method name
350          * @param startTime the request start time
351          * @param msg the error message
352          */
353         protected void logCallbackError(String method, long startTime, String msg) {
354                 logCallbackError(method, startTime, msg, null);
355         }
356
357         /**
358          * Records error, audit and metric events in the log for a callback
359          * internal error.
360          * @param method the method name
361          * @param startTime the request start time
362          * @param msg the error message
363          * @param e the exception
364          */
365         protected void logCallbackError(String method, long startTime, String msg, Exception e) {
366                 if (e == null) {
367                         LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(), 
368                                 MsoLogger.ErrorCode.UnknownError, msg);
369                 } else {
370                         LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(), 
371                                 MsoLogger.ErrorCode.UnknownError, msg, e);
372                 }
373
374                 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
375                         MsoLogger.ResponseCode.InternalError, "Completed " + method);
376
377                 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
378                         MsoLogger.ResponseCode.InternalError, "Completed " + method,
379                         "BPMN", MsoLogger.getServiceName(), null);
380         }
381         
382         /**
383          * Abstract callback result object.
384          */
385         protected abstract class CallbackResult {
386         }
387
388         /**
389          * Indicates that callback handling was successful.
390          */
391         protected class CallbackSuccess extends CallbackResult {
392         }
393
394         /**
395          * Indicates that callback handling failed.
396          */
397         protected class CallbackError extends CallbackResult {
398                 private final String errorMessage;
399
400                 public CallbackError(String errorMessage) {
401                         this.errorMessage = errorMessage;
402                 }
403
404                 /**
405                  * Gets the error message.
406                  */
407                 public String getErrorMessage() {
408                         return errorMessage;
409                 }
410         }
411
412         private static class ExecInfo {
413                 private final Execution execution;
414
415                 public ExecInfo(Execution execution) {
416                         this.execution = execution;
417                 }
418         
419                 @Override
420                 public String toString() {
421                         return "Process[" + execution.getProcessInstanceId()
422                                 + ":" + execution.getId() + "]";
423                 }
424         }
425 }