920cb11e4c790f9d4c0c819c53cbda33a546a1ee
[so.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Modifications Copyright (c) 2019 Samsung
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.so.bpmn.common.workflow.service;
24
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29
30 import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
31 import org.camunda.bpm.engine.OptimisticLockingException;
32 import org.camunda.bpm.engine.RuntimeService;
33 import org.camunda.bpm.engine.runtime.Execution;
34 import org.camunda.bpm.engine.runtime.MessageCorrelationResult;
35 import org.onap.so.bpmn.core.UrnPropertiesReader;
36 import org.onap.so.logger.MessageEnum;
37 import org.onap.so.logger.MsoLogger;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.springframework.beans.factory.annotation.Autowired;
41 import org.springframework.scheduling.annotation.Async;
42 import org.springframework.stereotype.Service;
43
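/**
 * Base class for callback services.  Despite being a base class it is not
 * abstract: it is a concrete Spring {@code @Service} that correlates callback
 * messages with the BPMN process waiting for them.
 */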
47 @Service
48 public class CallbackHandlerService {
49         public static final long DEFAULT_TIMEOUT_SECONDS = 60;
50         public static final long FAST_POLL_DUR_SECONDS = 5;
51         public static final long FAST_POLL_INT_MS = 100;
52         public static final long SLOW_POLL_INT_MS = 1000;
53         
54         private static final Logger logger = LoggerFactory.getLogger(CallbackHandlerService.class);
55
56         @Autowired
57         RuntimeService runtimeService;
58         
59         /**
60          * Parameterized callback handler.
61          */
62         @Async
63         protected CallbackResult handleCallback(String method, Object message,
64                         String messageEventName, String messageVariable,
65                         String correlationVariable, String correlationValue,
66                         String logMarker) {
67
68                 return handleCallback(method, message, messageEventName, messageVariable,
69                         correlationVariable, correlationValue, logMarker, null);
70         }
71
72         /**
73          * Parameterized callback handler.
74          */
75         protected CallbackResult handleCallback(String method, Object message,
76                         String messageEventName, String messageVariable,
77                         String correlationVariable, String correlationValue,
78                         String logMarker, Map<String, Object> injectedVariables) {
79
80                 long startTime = System.currentTimeMillis();
81
82                 logger.debug(logMarker + " " + method + " received message: "
83                         + (message == null ? "" : System.lineSeparator()) + message);
84
85                 try {
86                         Map<String, Object> variables = new HashMap<>();
87
88                         if (injectedVariables != null) {
89                                 variables.putAll(injectedVariables);
90                         }
91
92                         variables.put(correlationVariable, correlationValue);
93                         variables.put(messageVariable, message == null ? null : message.toString());
94
95                         boolean ok = correlate(messageEventName, correlationVariable,
96                                 correlationValue, variables, logMarker);
97
98                         if (!ok) {
99                                 String msg = "No process is waiting for " + messageEventName
100                                         + " with " + correlationVariable + " = '" + correlationValue + "'";
101                                 logCallbackError(method, startTime, msg);
102                                 return new CallbackError(msg);
103                         }
104
105                         logCallbackSuccess(method, startTime);
106                         return new CallbackSuccess();
107                 } catch (Exception e) {
108                         logger.debug("Exception :",e);
109                         String msg = "Caught " + e.getClass().getSimpleName()
110                                 + " processing " + messageEventName + " with " + correlationVariable
111                                 + " = '" + correlationValue + "'";
112                         logCallbackError(method, startTime, msg);
113                         return new CallbackError(msg);
114                 }
115         }
116         
117         /**
118          * Performs message correlation.  Waits a limited amount of time for
119          * a process to become ready for correlation.  The return value indicates
120          * whether or not a process was found to receive the message.  Due to the
121          * synchronous nature of message injection in Camunda, by the time this
122          * method returns, one of 3 things will have happened: (1) the process
123          * received the message and ended, (2) the process received the message
124          * and reached an activity that suspended, or (3) an exception occurred
125          * during correlation or while the process was executing.  Correlation
126          * exceptions are handled differently from process execution exceptions.
127          * Correlation exceptions are thrown so the client knows something went
128          * wrong with the delivery of the message.  Process execution exceptions
129          * are logged but not thrown.
130          * @param messageEventName the message event name
131          * @param correlationVariable the process variable used as the correlator
132          * @param correlationValue the correlation value
133          * @param variables variables to inject into the process
134          * @param logMarker a marker for debug logging
135          * @return true if a process could be found, false if not
136          */
137         protected boolean correlate(String messageEventName, String correlationVariable,
138                         String correlationValue, Map<String, Object> variables, String logMarker) {
139         try{
140                 logger.debug(logMarker + " Attempting to find process waiting"
141                         + " for " + messageEventName + " with " + correlationVariable
142                         + " = '" + correlationValue + "'");
143
144                 
145
146                 long timeout = DEFAULT_TIMEOUT_SECONDS;
147
148                 // The code is here in case we ever need to change the default.
149                 String correlationTimemout = UrnPropertiesReader.getVariable("mso.correlation.timeout");
150                 if (correlationTimemout != null) {
151                         try {
152                                 timeout = Long.parseLong(correlationTimemout);
153                         } catch (NumberFormatException e) {
154                                 // Ignore
155                         }
156                 }
157
158                 long now = System.currentTimeMillis();
159                 long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);
160                 long endTime = now + (timeout * 1000);
161                 long sleep = FAST_POLL_INT_MS;
162
163                 List<Execution> waitingProcesses = null;
164                 Exception queryException = null;
165                 int queryCount = 0;
166                 int queryFailCount = 0;
167
168                 while (true) {
169                         try {
170                                 ++queryCount;
171                                 waitingProcesses = runtimeService.createExecutionQuery()
172                                         .messageEventSubscriptionName(messageEventName)
173                                         .processVariableValueEquals(correlationVariable, correlationValue)
174                                         .list();
175                         } catch (Exception e) {
176                                 ++queryFailCount;
177                                 queryException = e;
178                         }
179
180                         if (waitingProcesses != null && waitingProcesses.size() > 0) {
181                                 break;
182                         }
183
184                         if (now > endTime - sleep) {
185                                 break;
186                         }
187
188                         Thread.sleep(sleep);
189                         now = System.currentTimeMillis();
190
191                         if (now > fastPollEndTime) {
192                                 sleep = SLOW_POLL_INT_MS;
193                         }
194                 }
195
196                 if (waitingProcesses == null) {
197                         waitingProcesses = new ArrayList<Execution>(0);
198                 }
199
200                 int count = waitingProcesses.size();
201
202                 List<ExecInfo> execInfoList = new ArrayList<>(count);
203                 for (Execution execution : waitingProcesses) {
204                         execInfoList.add(new ExecInfo(execution));
205                 }
206
207                 logger.debug(logMarker + " Found " + count + " process(es) waiting"
208                         + " for " + messageEventName + " with " + correlationVariable
209                         + " = '" + correlationValue + "': " + execInfoList);
210
211                 if (count == 0) {
212                         if (queryFailCount > 0) {
213                                 String msg = queryFailCount + "/" + queryCount
214                                         + " execution queries failed attempting to correlate "
215                                         + messageEventName + " with " + correlationVariable
216                                         + " = '" + correlationValue + "'; last exception was:"
217                                         + queryException;
218                                 logger.debug(msg);
219                                 logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN", MsoLogger.getServiceName(),
220                                                 MsoLogger.ErrorCode.UnknownError.getValue(), msg, queryException);
221                         }
222
223                         return false;
224                 }
225
226                 if (count > 1) {
227                         // Only one process should be waiting. Throw an exception back to the client.
228                         throw new MismatchingMessageCorrelationException(messageEventName,
229                                 "more than 1 process is waiting with " + correlationVariable
230                                 + " = '" + correlationValue + "'");
231                 }
232                 
233                 // We prototyped an asynchronous solution, i.e. resuming the process
234                 // flow in a separate thread, but this affected too many existing tests,
235                 // and we went back to the synchronous solution. The synchronous solution
236                 // has some troublesome characteristics though.  For example, the
237                 // resumed flow may send request #2 to a remote system before MSO has
238                 // acknowledged the notification associated with request #1.  
239
240                 try {
241                         logger.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "
242                                 + messageEventName + " with " + correlationVariable + " = '"
243                                 + correlationValue + "'");
244
245                         @SuppressWarnings("unused")
246                         MessageCorrelationResult result = runtimeService
247                                 .createMessageCorrelation(messageEventName)
248                                 .setVariables(variables)
249                                 .processInstanceVariableEquals(correlationVariable, correlationValue)
250                                 .correlateWithResult();
251                         
252                 } catch (MismatchingMessageCorrelationException e) {
253                         // A correlation exception occurred even after we identified
254                         // one waiting process.  Throw it back to the client.
255                         throw e;
256                 } catch (OptimisticLockingException ole) {
257                         
258                         String msg = "Caught " + ole.getClass().getSimpleName() + " after receiving " + messageEventName
259                                 + " with " + correlationVariable + " = '" + correlationValue
260                                 + "': " + ole;
261                         logger.debug(msg);
262                         logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN CORRELATION ERROR -",
263                                 MsoLogger.getServiceName(), MsoLogger.ErrorCode.UnknownError.getValue(), msg, ole);
264                         
265                         //Retry for OptimisticLocking Exceptions
266                         int retryCount = 0;
267                         String retryStr = UrnPropertiesReader.getVariable("mso.bpmn.optimisticlockingexception.retrycount");
268                         if (retryStr != null) {
269                                 try {
270                                         retryCount = Integer.parseInt(retryStr);
271                                 } catch (NumberFormatException e) {
272                                         // Ignore
273                                 }
274                         }
275                         
276                         logger.debug("Retry correlate for OptimisticLockingException, retryCount:{}", retryCount);
277                         
278                         for (; retryCount >0 ; retryCount--) {
279                                 
280                                 try{
281                                         Thread.sleep(SLOW_POLL_INT_MS);
282                                         
283                                         @SuppressWarnings("unused")
284                                         MessageCorrelationResult result = runtimeService
285                                                 .createMessageCorrelation(messageEventName)
286                                                 .setVariables(variables)
287                                                 .processInstanceVariableEquals(correlationVariable, correlationValue)
288                                                 .correlateWithResult();
289                                         retryCount = 0;
290                                         logger.debug("OptimisticLockingException retry was successful, seting retryCount: {}", retryCount);
291                                 } catch (OptimisticLockingException olex) {
292                                         //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
293                                         String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + olex;
294                                         logger.debug(strMsg);
295                                         logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN", MsoLogger.getServiceName(),
296                                                         MsoLogger.ErrorCode.UnknownError.getValue(), strMsg, olex);
297                                 } catch (Exception excep) {
298                                         retryCount = 0;
299                                         //oleFlag = ex instanceof org.camunda.bpm.engine.OptimisticLockingException;
300                                         String strMsg = "Received exception, OptimisticLockingException retry failed, retryCount:" + retryCount + " | exception returned: " + excep;
301                                         logger.debug(strMsg);
302                                         logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN", MsoLogger.getServiceName(),
303                                                 MsoLogger.ErrorCode.UnknownError.getValue(), strMsg, excep);
304                                 }
305                 
306                         }
307                         
308                 }catch (Exception e) {
309                         // This must be an exception from the flow itself.  Log it, but don't
310                         // report it back to the client.
311                         String msg = "Caught " + e.getClass().getSimpleName() + " running "
312                                 + execInfoList.get(0) + " after receiving " + messageEventName
313                                 + " with " + correlationVariable + " = '" + correlationValue
314                                 + "': " + e;
315                         logger.debug(msg);
316                         logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN", MsoLogger.getServiceName(),
317                                 MsoLogger.ErrorCode.UnknownError.getValue(), msg, e);
318                 }
319         }  catch (Exception e) {
320                 // This must be an exception from the flow itself.  Log it, but don't
321                 // report it back to the client.
322                 String msg = "Caught " + e.getClass().getSimpleName() + " after receiving " + messageEventName
323                         + " with " + correlationVariable + " = '" + correlationValue
324                         + "': " + e;
325                 logger.debug(msg);
326                 logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION.toString(), "BPMN CORRELATION ERROR -",
327                         MsoLogger.getServiceName(), MsoLogger.ErrorCode.UnknownError.getValue(), msg, e);
328         }       
329
330                 return true;
331         }
332         
        /**
         * Hook called by {@code handleCallback} after a callback was correlated
         * successfully.  This implementation is empty: it records nothing and
         * ignores both arguments.
         * NOTE(review): earlier documentation said this records audit and metric
         * events; presumably that logging was removed or is expected from an
         * overriding subclass -- confirm.
         * @param method the method name
         * @param startTime the request start time
         */
        protected void logCallbackSuccess(String method, long startTime) {
        }
340
341         /**
342          * Records error, audit and metric events in the log for a callback
343          * internal error.
344          * @param method the method name
345          * @param startTime the request start time
346          * @param msg the error message
347          */
348         protected void logCallbackError(String method, long startTime, String msg) {
349                 logCallbackError(method, startTime, msg, null);
350         }
351
352         /**
353          * Records error, audit and metric events in the log for a callback
354          * internal error.
355          * @param method the method name
356          * @param startTime the request start time
357          * @param msg the error message
358          * @param e the exception
359          */
360         protected void logCallbackError(String method, long startTime, String msg, Exception e) {
361                 if (e == null) {
362                         logger.error("{} {} {} {} {}", MessageEnum.BPMN_CALLBACK_EXCEPTION.toString(), "BPMN", MsoLogger.getServiceName(),
363                                 MsoLogger.ErrorCode.UnknownError.getValue(), msg);
364                 } else {
365                         logger.error("{} {} {} {} {}", MessageEnum.BPMN_CALLBACK_EXCEPTION.toString(), "BPMN", MsoLogger.getServiceName(),
366                                 MsoLogger.ErrorCode.UnknownError.getValue(), msg, e);
367                 }
368         }
369         
        /**
         * Abstract callback result object.  It declares no members; the concrete
         * outcome is expressed by the subclass that is returned
         * ({@code CallbackSuccess} or {@code CallbackError}).
         */
        protected abstract class CallbackResult {
        }
375
        /**
         * Indicates that callback handling was successful.  Carries no data.
         */
        protected class CallbackSuccess extends CallbackResult {
        }
381
382         /**
383          * Indicates that callback handling failed.
384          */
385         protected class CallbackError extends CallbackResult {
386                 private final String errorMessage;
387
388                 public CallbackError(String errorMessage) {
389                         this.errorMessage = errorMessage;
390                 }
391
392                 /**
393                  * Gets the error message.
394                  */
395                 public String getErrorMessage() {
396                         return errorMessage;
397                 }
398         }
399
400         private static class ExecInfo {
401                 private final Execution execution;
402
403                 public ExecInfo(Execution execution) {
404                         this.execution = execution;
405                 }
406         
407                 @Override
408                 public String toString() {
409                         return "Process[" + execution.getProcessInstanceId()
410                                 + ":" + execution.getId() + "]";
411                 }
412         }
413 }