Merge "remove unused code"
[so.git] / bpmn / mso-infrastructure-bpmn / src / main / java / org / onap / so / bpmn / common / workflow / service / CallbackHandlerService.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.so.bpmn.common.workflow.service;
22
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27
28 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
29 import org.camunda.bpm.engine.OptimisticLockingException;
30 import org.camunda.bpm.engine.RuntimeService;
31 import org.camunda.bpm.engine.runtime.Execution;
32 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;
33 import org.onap.so.bpmn.core.UrnPropertiesReader;
34 import org.onap.so.logger.MessageEnum;
35 import org.onap.so.logger.MsoLogger;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.scheduling.annotation.Async;
38 import org.springframework.stereotype.Service;
39
40 /**
41  * Abstract base class for callback services.
42  */
43 @Service
44 public class CallbackHandlerService {
45         public static final long DEFAULT_TIMEOUT_SECONDS = 60;
46         public static final long FAST_POLL_DUR_SECONDS = 5;
47         public static final long FAST_POLL_INT_MS = 100;
48         public static final long SLOW_POLL_INT_MS = 1000;
49         
50         private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL, CallbackHandlerService.class);
51
52         @Autowired
53         RuntimeService runtimeService;
54         
55         /**
56          * Parameterized callback handler.
57          */
58         @Async
59         protected CallbackResult handleCallback(String method, Object message,
60                         String messageEventName, String messageVariable,
61                         String correlationVariable, String correlationValue,
62                         String logMarker) {
63
64                 return handleCallback(method, message, messageEventName, messageVariable,
65                         correlationVariable, correlationValue, logMarker, null);
66         }
67
68         /**
69          * Parameterized callback handler.
70          */
71         protected CallbackResult handleCallback(String method, Object message,
72                         String messageEventName, String messageVariable,
73                         String correlationVariable, String correlationValue,
74                         String logMarker, Map<String, Object> injectedVariables) {
75
76                 long startTime = System.currentTimeMillis();
77
78                 LOGGER.debug(logMarker + " " + method + " received message: "
79                         + (message == null ? "" : System.lineSeparator()) + message);
80
81                 try {
82                         Map<String, Object> variables = new HashMap<>();
83
84                         if (injectedVariables != null) {
85                                 variables.putAll(injectedVariables);
86                         }
87
88                         variables.put(correlationVariable, correlationValue);
89                         variables.put(messageVariable, message == null ? null : message.toString());
90
91                         boolean ok = correlate(messageEventName, correlationVariable,
92                                 correlationValue, variables, logMarker);
93
94                         if (!ok) {
95                                 String msg = "No process is waiting for " + messageEventName
96                                         + " with " + correlationVariable + " = '" + correlationValue + "'";
97                                 logCallbackError(method, startTime, msg);
98                                 return new CallbackError(msg);
99                         }
100
101                         logCallbackSuccess(method, startTime);
102                         return new CallbackSuccess();
103                 } catch (Exception e) {
104                         LOGGER.debug("Exception :",e);
105                         String msg = "Caught " + e.getClass().getSimpleName()
106                                 + " processing " + messageEventName + " with " + correlationVariable
107                                 + " = '" + correlationValue + "'";
108                         logCallbackError(method, startTime, msg);
109                         return new CallbackError(msg);
110                 }
111         }
112         
113         /**
114          * Performs message correlation.  Waits a limited amount of time for
115          * a process to become ready for correlation.  The return value indicates
116          * whether or not a process was found to receive the message.  Due to the
117          * synchronous nature of message injection in Camunda, by the time this
118          * method returns, one of 3 things will have happened: (1) the process
119          * received the message and ended, (2) the process received the message
120          * and reached an activity that suspended, or (3) an exception occurred
121          * during correlation or while the process was executing.  Correlation
122          * exceptions are handled differently from process execution exceptions.
123          * Correlation exceptions are thrown so the client knows something went
124          * wrong with the delivery of the message.  Process execution exceptions
125          * are logged but not thrown.
126          * @param messageEventName the message event name
127          * @param correlationVariable the process variable used as the correlator
128          * @param correlationValue the correlation value
129          * @param variables variables to inject into the process
130          * @param logMarker a marker for debug logging
131          * @return true if a process could be found, false if not
132          */
133         protected boolean correlate(String messageEventName, String correlationVariable,
134                         String correlationValue, Map<String, Object> variables, String logMarker) {
135         try{
136                 LOGGER.debug(logMarker + " Attempting to find process waiting"
137                         + " for " + messageEventName + " with " + correlationVariable
138                         + " = '" + correlationValue + "'");
139
140                 
141
142                 long timeout = DEFAULT_TIMEOUT_SECONDS;
143
144                 // The code is here in case we ever need to change the default.
145                 String correlationTimemout = UrnPropertiesReader.getVariable("mso.correlation.timeout");
146                 if (correlationTimemout != null) {
147                         try {
148                                 timeout = Long.parseLong(correlationTimemout);
149                         } catch (NumberFormatException e) {
150                                 // Ignore
151                         }
152                 }
153
154                 long now = System.currentTimeMillis();
155                 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);
156                 long endTime = now + (timeout * 1000);
157                 long sleep = FAST_POLL_INT_MS;
158
159                 List<Execution> waitingProcesses = null;
160                 Exception queryException = null;
161                 int queryCount = 0;
162                 int queryFailCount = 0;
163
164                 while (true) {
165                         try {
166                                 ++queryCount;
167                                 waitingProcesses = runtimeService.createExecutionQuery()
168                                         .messageEventSubscriptionName(messageEventName)
169                                         .processVariableValueEquals(correlationVariable, correlationValue)
170                                         .list();
171                         } catch (Exception e) {
172                                 ++queryFailCount;
173                                 queryException = e;
174                         }
175
176                         if (waitingProcesses != null && waitingProcesses.size() > 0) {
177                                 break;
178                         }
179
180                         if (now > endTime - sleep) {
181                                 break;
182                         }
183
184                         Thread.sleep(sleep);
185                         now = System.currentTimeMillis();
186
187                         if (now > fastPollEndTime) {
188                                 sleep = SLOW_POLL_INT_MS;
189                         }
190                 }
191
192                 if (waitingProcesses == null) {
193                         waitingProcesses = new ArrayList<Execution>(0);
194                 }
195
196                 int count = waitingProcesses.size();
197
198                 List<ExecInfo> execInfoList = new ArrayList<>(count);
199                 for (Execution execution : waitingProcesses) {
200                         execInfoList.add(new ExecInfo(execution));
201                 }
202
203                 LOGGER.debug(logMarker + " Found " + count + " process(es) waiting"
204                         + " for " + messageEventName + " with " + correlationVariable
205                         + " = '" + correlationValue + "': " + execInfoList);
206
207                 if (count == 0) {
208                         if (queryFailCount > 0) {
209                                 String msg = queryFailCount + "/" + queryCount
210                                         + " execution queries failed attempting to correlate "
211                                         + messageEventName + " with " + correlationVariable
212                                         + " = '" + correlationValue + "'; last exception was:"
213                                         + queryException;
214                                 LOGGER.debug(msg);
215                                 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
216                                         MsoLogger.ErrorCode.UnknownError, msg, queryException);
217                         }
218
219                         return false;
220                 }
221
222                 if (count > 1) {
223                         // Only one process should be waiting. Throw an exception back to the client.
224                         throw new MismatchingMessageCorrelationException(messageEventName,
225                                 "more than 1 process is waiting with " + correlationVariable
226                                 + " = '" + correlationValue + "'");
227                 }
228                 
229                 // We prototyped an asynchronous solution, i.e. resuming the process
230                 // flow in a separate thread, but this affected too many existing tests,
231                 // and we went back to the synchronous solution. The synchronous solution
232                 // has some troublesome characteristics though.  For example, the
233                 // resumed flow may send request #2 to a remote system before MSO has
234                 // acknowledged the notification associated with request #1.  
235
236                 try {
237                         LOGGER.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "
238                                 + messageEventName + " with " + correlationVariable + " = '"
239                                 + correlationValue + "'");
240
241                         @SuppressWarnings("unused")
242                         MessageCorrelationResult result = runtimeService
243                                 .createMessageCorrelation(messageEventName)
244                                 .setVariables(variables)
245                                 .processInstanceVariableEquals(correlationVariable, correlationValue)
246                                 .correlateWithResult();
247                         
248                 } catch (MismatchingMessageCorrelationException e) {
249                         // A correlation exception occurred even after we identified
250                         // one waiting process.  Throw it back to the client.
251                         throw e;
252                 } catch (OptimisticLockingException ole) {
253                         
254                         String msg = "Caught " + ole.getClass().getSimpleName() + " after receiving " + messageEventName
255                                 + " with " + correlationVariable + " = '" + correlationValue
256                                 + "': " + ole;
257                         LOGGER.debug(msg);
258                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN CORRELATION ERROR -", MsoLogger.getServiceName(),
259                                 MsoLogger.ErrorCode.UnknownError, msg, ole);
260                         
261                         //Retry for OptimisticLocking Exceptions
262                         int retryCount = 0;
263                         String retryStr = UrnPropertiesReader.getVariable("mso.bpmn.optimisticlockingexception.retrycount");
264                         if (retryStr != null) {
265                                 try {
266                                         retryCount = Integer.parseInt(retryStr);
267                                 } catch (NumberFormatException e) {
268                                         // Ignore
269                                 }
270                         }
271                         
272                         LOGGER.debug("Retry correlate for OptimisticLockingException, retryCount:" + retryCount);
273                         
274                         for (; retryCount >0 ; retryCount--) {
275                                 
276                                 try{
277                                         Thread.sleep(SLOW_POLL_INT_MS);
278                                         
279                                         @SuppressWarnings("unused")
280                                         MessageCorrelationResult result = runtimeService
281                                                 .createMessageCorrelation(messageEventName)
282                                                 .setVariables(variables)
283                                                 .processInstanceVariableEquals(correlationVariable, correlationValue)
284                                                 .correlateWithResult();
285                                         retryCount = 0;
286                                         LOGGER.debug("OptimisticLockingException retry was successful, seting retryCount: " + retryCount);
287                                 } catch (OptimisticLockingException olex) {
288                                         //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
289                                         String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + olex;
290                                         LOGGER.debug(strMsg);
291                                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
292                                                 MsoLogger.ErrorCode.UnknownError, strMsg, olex);
293                                 } catch (Exception excep) {
294                                         retryCount = 0;
295                                         //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
296                                         String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + excep;
297                                         LOGGER.debug(strMsg);
298                                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
299                                                 MsoLogger.ErrorCode.UnknownError, strMsg, excep);
300                                 }
301                 
302                         }
303                         
304                 }catch (Exception e) {
305                         // This must be an exception from the flow itself.  Log it, but don't
306                         // report it back to the client.
307                         String msg = "Caught " + e.getClass().getSimpleName() + " running "
308                                 + execInfoList.get(0) + " after receiving " + messageEventName
309                                 + " with " + correlationVariable + " = '" + correlationValue
310                                 + "': " + e;
311                         LOGGER.debug(msg);
312                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
313                                 MsoLogger.ErrorCode.UnknownError, msg, e);
314                 }
315         }  catch (Exception e) {
316                 // This must be an exception from the flow itself.  Log it, but don't
317                 // report it back to the client.
318                 String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName
319                         + " with " + correlationVariable + " = '" + correlationValue
320                         + "': " + e;
321                 LOGGER.debug(msg);
322                 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN CORRELATION ERROR -", MsoLogger.getServiceName(),
323                         MsoLogger.ErrorCode.UnknownError, msg, e);
324         }       
325
326                 return true;
327         }
328         
329         /**
330          * Records audit and metric events in the log for a callback success.
331          * @param method the method name
332          * @param startTime the request start time
333          */
334         protected void logCallbackSuccess(String method, long startTime) {
335                 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
336                         MsoLogger.ResponseCode.Suc, "Completed " + method);
337
338                 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
339                         MsoLogger.ResponseCode.Suc, "Completed " + method,
340                         "BPMN", MsoLogger.getServiceName(), null);
341         }
342
343         /**
344          * Records error, audit and metric events in the log for a callback
345          * internal error.
346          * @param method the method name
347          * @param startTime the request start time
348          * @param msg the error message
349          */
350         protected void logCallbackError(String method, long startTime, String msg) {
351                 logCallbackError(method, startTime, msg, null);
352         }
353
354         /**
355          * Records error, audit and metric events in the log for a callback
356          * internal error.
357          * @param method the method name
358          * @param startTime the request start time
359          * @param msg the error message
360          * @param e the exception
361          */
362         protected void logCallbackError(String method, long startTime, String msg, Exception e) {
363                 if (e == null) {
364                         LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(), 
365                                 MsoLogger.ErrorCode.UnknownError, msg);
366                 } else {
367                         LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(), 
368                                 MsoLogger.ErrorCode.UnknownError, msg, e);
369                 }
370
371                 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
372                         MsoLogger.ResponseCode.InternalError, "Completed " + method);
373
374                 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
375                         MsoLogger.ResponseCode.InternalError, "Completed " + method,
376                         "BPMN", MsoLogger.getServiceName(), null);
377         }
378         
379         /**
380          * Abstract callback result object.
381          */
382         protected abstract class CallbackResult {
383         }
384
385         /**
386          * Indicates that callback handling was successful.
387          */
388         protected class CallbackSuccess extends CallbackResult {
389         }
390
391         /**
392          * Indicates that callback handling failed.
393          */
394         protected class CallbackError extends CallbackResult {
395                 private final String errorMessage;
396
397                 public CallbackError(String errorMessage) {
398                         this.errorMessage = errorMessage;
399                 }
400
401                 /**
402                  * Gets the error message.
403                  */
404                 public String getErrorMessage() {
405                         return errorMessage;
406                 }
407         }
408
409         private static class ExecInfo {
410                 private final Execution execution;
411
412                 public ExecInfo(Execution execution) {
413                         this.execution = execution;
414                 }
415         
416                 @Override
417                 public String toString() {
418                         return "Process[" + execution.getProcessInstanceId()
419                                 + ":" + execution.getId() + "]";
420                 }
421         }
422 }