46a6fa3ef3e386ddf9fb82334a614d6ef38218d4
[so.git] / bpmn / mso-infrastructure-bpmn / src / main / java / org / onap / so / bpmn / common / workflow / service / CallbackHandlerService.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Modifications Copyright (c) 2019 Samsung
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.so.bpmn.common.workflow.service;
24
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29
30 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
31 import org.camunda.bpm.engine.OptimisticLockingException;
32 import org.camunda.bpm.engine.RuntimeService;
33 import org.camunda.bpm.engine.runtime.Execution;
34 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;
35 import org.onap.so.bpmn.core.UrnPropertiesReader;
36 import org.onap.so.logger.ErrorCode;
37 import org.onap.so.logger.MessageEnum;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.springframework.beans.factory.annotation.Autowired;
41 import org.springframework.scheduling.annotation.Async;
42 import org.springframework.stereotype.Service;
43
/**
 * Spring service that delivers callback messages to waiting BPMN processes
 * by correlating them through the Camunda {@link RuntimeService}.
 */
47 @Service
48 public class CallbackHandlerService {
49         public static final long DEFAULT_TIMEOUT_SECONDS = 60;
50         public static final long FAST_POLL_DUR_SECONDS = 5;
51         public static final long FAST_POLL_INT_MS = 100;
52         public static final long SLOW_POLL_INT_MS = 1000;
53
54         private static final Logger logger = LoggerFactory.getLogger(CallbackHandlerService.class);
55
56         private RuntimeService runtimeService;
57
58         @Autowired
59         public CallbackHandlerService(RuntimeService runtimeService) {
60                 this.runtimeService = runtimeService;
61         }
62
63         /**
64          * Parameterized callback handler.
65          */
66         @Async
67         protected CallbackResult handleCallback(String method, Object message,
68                         String messageEventName, String messageVariable,
69                         String correlationVariable, String correlationValue,
70                         String logMarker) {
71
72                 return handleCallback(method, message, messageEventName, messageVariable,
73                         correlationVariable, correlationValue, logMarker, null);
74         }
75
76         /**
77          * Parameterized callback handler.
78          */
79         protected CallbackResult handleCallback(String method, Object message,
80                         String messageEventName, String messageVariable,
81                         String correlationVariable, String correlationValue,
82                         String logMarker, Map<String, Object> injectedVariables) {
83
84                 long startTime = System.currentTimeMillis();
85
86                 logger.debug(logMarker + " " + method + " received message: "
87                         + (message == null ? "" : System.lineSeparator()) + message);
88
89                 try {
90                         Map<String, Object> variables = new HashMap<>();
91
92                         if (injectedVariables != null) {
93                                 variables.putAll(injectedVariables);
94                         }
95
96                         variables.put(correlationVariable, correlationValue);
97                         variables.put(messageVariable, message == null ? null : message.toString());
98
99                         boolean ok = correlate(messageEventName, correlationVariable,
100                                 correlationValue, variables, logMarker);
101
102                         if (!ok) {
103                                 String msg = "No process is waiting for " + messageEventName
104                                         + " with " + correlationVariable + " = '" + correlationValue + "'";
105                                 logCallbackError(method, startTime, msg);
106                                 return new CallbackError(msg);
107                         }
108
109                         logCallbackSuccess(method, startTime);
110                         return new CallbackSuccess();
111                 } catch (Exception e) {
112                         logger.debug("Exception :",e);
113                         String msg = "Caught " + e.getClass().getSimpleName()
114                                 + " processing " + messageEventName + " with " + correlationVariable
115                                 + " = '" + correlationValue + "'";
116                         logCallbackError(method, startTime, msg);
117                         return new CallbackError(msg);
118                 }
119         }
120         
121         /**
122          * Performs message correlation.  Waits a limited amount of time for
123          * a process to become ready for correlation.  The return value indicates
124          * whether or not a process was found to receive the message.  Due to the
125          * synchronous nature of message injection in Camunda, by the time this
126          * method returns, one of 3 things will have happened: (1) the process
127          * received the message and ended, (2) the process received the message
128          * and reached an activity that suspended, or (3) an exception occurred
129          * during correlation or while the process was executing.  Correlation
130          * exceptions are handled differently from process execution exceptions.
131          * Correlation exceptions are thrown so the client knows something went
132          * wrong with the delivery of the message.  Process execution exceptions
133          * are logged but not thrown.
134          * @param messageEventName the message event name
135          * @param correlationVariable the process variable used as the correlator
136          * @param correlationValue the correlation value
137          * @param variables variables to inject into the process
138          * @param logMarker a marker for debug logging
139          * @return true if a process could be found, false if not
140          */
141         protected boolean correlate(String messageEventName, String correlationVariable,
142                         String correlationValue, Map<String, Object> variables, String logMarker) {
143         try{
144                 logger.debug(logMarker + " Attempting to find process waiting"
145                         + " for " + messageEventName + " with " + correlationVariable
146                         + " = '" + correlationValue + "'");
147
148                 
149
150                 long timeout = DEFAULT_TIMEOUT_SECONDS;
151
152                 // The code is here in case we ever need to change the default.
153                 String correlationTimemout = UrnPropertiesReader.getVariable("mso.correlation.timeout");
154                 if (correlationTimemout != null) {
155                         try {
156                                 timeout = Long.parseLong(correlationTimemout);
157                         } catch (NumberFormatException e) {
158                                 // Ignore
159                         }
160                 }
161
162                 long now = System.currentTimeMillis();
163                 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);
164                 long endTime = now + (timeout * 1000);
165                 long sleep = FAST_POLL_INT_MS;
166
167                 List<Execution> waitingProcesses = null;
168                 Exception queryException = null;
169                 int queryCount = 0;
170                 int queryFailCount = 0;
171
172                 while (true) {
173                         try {
174                                 ++queryCount;
175                                 waitingProcesses = runtimeService.createExecutionQuery()
176                                         .messageEventSubscriptionName(messageEventName)
177                                         .processVariableValueEquals(correlationVariable, correlationValue)
178                                         .list();
179                         } catch (Exception e) {
180                                 ++queryFailCount;
181                                 queryException = e;
182                         }
183
184                         if (waitingProcesses != null && waitingProcesses.size() > 0) {
185                                 break;
186                         }
187
188                         if (now > endTime - sleep) {
189                                 break;
190                         }
191
192                         Thread.sleep(sleep);
193                         now = System.currentTimeMillis();
194
195                         if (now > fastPollEndTime) {
196                                 sleep = SLOW_POLL_INT_MS;
197                         }
198                 }
199
200                 if (waitingProcesses == null) {
201                         waitingProcesses = new ArrayList<Execution>(0);
202                 }
203
204                 int count = waitingProcesses.size();
205
206                 List<ExecInfo> execInfoList = new ArrayList<>(count);
207                 for (Execution execution : waitingProcesses) {
208                         execInfoList.add(new ExecInfo(execution));
209                 }
210
211                 logger.debug(logMarker + " Found " + count + " process(es) waiting"
212                         + " for " + messageEventName + " with " + correlationVariable
213                         + " = '" + correlationValue + "': " + execInfoList);
214
215                 if (count == 0) {
216                         if (queryFailCount > 0) {
217                                 String msg = queryFailCount + "/" + queryCount
218                                         + " execution queries failed attempting to correlate "
219                                         + messageEventName + " with " + correlationVariable
220                                         + " = '" + correlationValue + "'; last exception was:"
221                                         + queryException;
222                                 logger.debug(msg);
223                                 logger.error("{} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
224                                                 ErrorCode.UnknownError.getValue(), msg, queryException);
225                         }
226
227                         return false;
228                 }
229
230                 if (count > 1) {
231                         // Only one process should be waiting. Throw an exception back to the client.
232                         throw new MismatchingMessageCorrelationException(messageEventName,
233                                 "more than 1 process is waiting with " + correlationVariable
234                                 + " = '" + correlationValue + "'");
235                 }
236                 
237                 // We prototyped an asynchronous solution, i.e. resuming the process
238                 // flow in a separate thread, but this affected too many existing tests,
239                 // and we went back to the synchronous solution. The synchronous solution
240                 // has some troublesome characteristics though.  For example, the
241                 // resumed flow may send request #2 to a remote system before MSO has
242                 // acknowledged the notification associated with request #1.  
243
244                 try {
245                         logger.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "
246                                 + messageEventName + " with " + correlationVariable + " = '"
247                                 + correlationValue + "'");
248
249                         @SuppressWarnings("unused")
250                         MessageCorrelationResult result = runtimeService
251                                 .createMessageCorrelation(messageEventName)
252                                 .setVariables(variables)
253                                 .processInstanceVariableEquals(correlationVariable, correlationValue)
254                                 .correlateWithResult();
255                         
256                 } catch (MismatchingMessageCorrelationException e) {
257                         // A correlation exception occurred even after we identified
258                         // one waiting process.  Throw it back to the client.
259                         throw e;
260                 } catch (OptimisticLockingException ole) {
261                         
262                         String msg = "Caught " + ole.getClass().getSimpleName() + " after receiving " + messageEventName
263                                 + " with " + correlationVariable + " = '" + correlationValue
264                                 + "': " + ole;
265                         logger.debug(msg);
266                         logger.error("{} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN CORRELATION ERROR -",
267                                 ErrorCode.UnknownError.getValue(), msg, ole);
268                         
269                         //Retry for OptimisticLocking Exceptions
270                         int retryCount = 0;
271                         String retryStr = UrnPropertiesReader.getVariable("mso.bpmn.optimisticlockingexception.retrycount");
272                         if (retryStr != null) {
273                                 try {
274                                         retryCount = Integer.parseInt(retryStr);
275                                 } catch (NumberFormatException e) {
276                                         // Ignore
277                                 }
278                         }
279                         
280                         logger.debug("Retry correlate for OptimisticLockingException, retryCount:{}", retryCount);
281                         
282                         for (; retryCount >0 ; retryCount--) {
283                                 
284                                 try{
285                                         Thread.sleep(SLOW_POLL_INT_MS);
286                                         
287                                         @SuppressWarnings("unused")
288                                         MessageCorrelationResult result = runtimeService
289                                                 .createMessageCorrelation(messageEventName)
290                                                 .setVariables(variables)
291                                                 .processInstanceVariableEquals(correlationVariable, correlationValue)
292                                                 .correlateWithResult();
293                                         retryCount = 0;
294                                         logger.debug("OptimisticLockingException retry was successful, seting retryCount: {}", retryCount);
295                                 } catch (OptimisticLockingException olex) {
296                                         //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
297                                         String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + olex;
298                                         logger.debug(strMsg);
299                                         logger.error("{} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
300                                                         ErrorCode.UnknownError.getValue(), strMsg, olex);
301                                 } catch (Exception excep) {
302                                         retryCount = 0;
303                                         //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
304                                         String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + excep;
305                                         logger.debug(strMsg);
306                                         logger.error("{} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
307                                                 ErrorCode.UnknownError.getValue(), strMsg, excep);
308                                 }
309                 
310                         }
311                         
312                 }catch (Exception e) {
313                         // This must be an exception from the flow itself.  Log it, but don't
314                         // report it back to the client.
315                         String msg = "Caught " + e.getClass().getSimpleName() + " running "
316                                 + execInfoList.get(0) + " after receiving " + messageEventName
317                                 + " with " + correlationVariable + " = '" + correlationValue
318                                 + "': " + e;
319                         logger.debug(msg);
320                         logger.error("{} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN",
321                                 ErrorCode.UnknownError.getValue(), msg, e);
322                 }
323         }  catch (Exception e) {
324                 // This must be an exception from the flow itself.  Log it, but don't
325                 // report it back to the client.
326                 String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName
327                         + " with " + correlationVariable + " = '" + correlationValue
328                         + "': " + e;
329                 logger.debug(msg);
330                 logger.error("{} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN CORRELATION ERROR -",
331                         ErrorCode.UnknownError.getValue(), msg, e);
332         }       
333
334                 return true;
335         }
336         
        /**
         * Hook called by the callback handler after a message was successfully
         * correlated.  This implementation is empty and records nothing.
         * @param method the method name
         * @param startTime the request start time
         */
        protected void logCallbackSuccess(String method, long startTime) {
                // Intentionally empty.
        }
344
345         /**
346          * Records error, audit and metric events in the log for a callback
347          * internal error.
348          * @param method the method name
349          * @param startTime the request start time
350          * @param msg the error message
351          */
352         protected void logCallbackError(String method, long startTime, String msg) {
353                 logCallbackError(method, startTime, msg, null);
354         }
355
356         /**
357          * Records error, audit and metric events in the log for a callback
358          * internal error.
359          * @param method the method name
360          * @param startTime the request start time
361          * @param msg the error message
362          * @param e the exception
363          */
364         protected void logCallbackError(String method, long startTime, String msg, Exception e) {
365                 if (e == null) {
366                         logger.error("{} {} {} {}", MessageEnum.BPMN_CALLBACK_EXCEPTION.toString(), "BPMN",
367                                 ErrorCode.UnknownError.getValue(), msg);
368                 } else {
369                         logger.error("{} {} {} {}", MessageEnum.BPMN_CALLBACK_EXCEPTION.toString(), "BPMN",
370                                 ErrorCode.UnknownError.getValue(), msg, e);
371                 }
372         }
373         
        /**
         * Abstract callback result object.  Common supertype of the values
         * returned by the callback handler methods of this service.
         */
        protected abstract class CallbackResult {
        }
379
        /**
         * Indicates that callback handling was successful.  Carries no data.
         */
        protected class CallbackSuccess extends CallbackResult {
        }
385
        /**
         * Indicates that callback handling failed and carries a message
         * describing the failure.
         */
        protected class CallbackError extends CallbackResult {
                // Description of the failure, fixed at construction time.
                private final String errorMessage;

                /**
                 * Creates an error result.
                 * @param errorMessage the message describing the failure
                 */
                public CallbackError(String errorMessage) {
                        this.errorMessage = errorMessage;
                }

                /**
                 * Gets the error message.
                 * @return the message supplied at construction
                 */
                public String getErrorMessage() {
                        return errorMessage;
                }
        }
403
404         private static class ExecInfo {
405                 private final Execution execution;
406
407                 public ExecInfo(Execution execution) {
408                         this.execution = execution;
409                 }
410         
411                 @Override
412                 public String toString() {
413                         return "Process[" + execution.getProcessInstanceId()
414                                 + ":" + execution.getId() + "]";
415                 }
416         }
417 }