Cleaned up MsoLogger class
[so.git] / bpmn / mso-infrastructure-bpmn / src / main / java / org / onap / so / bpmn / common / workflow / service / WorkflowResource.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright (C) 2017 Huawei Technologies Co., Ltd. All rights reserved.
7  * ================================================================================
8  * Modifications Copyright (c) 2019 Samsung
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  * 
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  * 
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.so.bpmn.common.workflow.service;
25
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.UUID;
30 import java.util.concurrent.atomic.AtomicLong;
31
32 import javax.ws.rs.Consumes;
33 import javax.ws.rs.POST;
34 import javax.ws.rs.Path;
35 import javax.ws.rs.PathParam;
36 import javax.ws.rs.Produces;
37 import javax.ws.rs.core.Context;
38 import javax.ws.rs.core.Response;
39 import javax.ws.rs.core.UriInfo;
40
41 import org.camunda.bpm.engine.HistoryService;
42 import org.camunda.bpm.engine.ProcessEngineException;
43 import org.camunda.bpm.engine.ProcessEngineServices;
44 import org.camunda.bpm.engine.RuntimeService;
45 import org.camunda.bpm.engine.history.HistoricVariableInstance;
46 import org.camunda.bpm.engine.runtime.ProcessInstance;
47 import org.camunda.bpm.engine.variable.VariableMap;
48 import org.camunda.bpm.engine.variable.Variables;
49 import org.camunda.bpm.engine.variable.Variables.SerializationDataFormats;
50 import org.camunda.bpm.engine.variable.impl.VariableMapImpl;
51 import org.onap.so.bpmn.common.workflow.context.WorkflowResponse;
52 import org.onap.so.bpmn.core.WorkflowException;
53 import org.onap.so.logger.MessageEnum;
54 import org.onap.so.logger.MsoLogger;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import org.slf4j.MDC;
58 import org.springframework.stereotype.Component;
59
60 import io.swagger.annotations.Api;
61 import io.swagger.annotations.ApiOperation;
62
63 @Path("/workflow")
64 @Api(value = "/workflow", description = "Root of workflow services")
65 @Component
66 public class WorkflowResource extends ProcessEngineAwareService {
67         
68         private static final Logger logger = LoggerFactory.getLogger(WorkflowResource.class);
69         private static final String LOGMARKER = "[WRKFLOW-RESOURCE]";
70
71         private static final int DEFAULT_WAIT_TIME = 30000;
72
73         @Context
74         private UriInfo uriInfo = null;
75
76         /**
77          * Starts the process instance and responds to client synchronously
78          * If the request does not contain mso-service-request-timeout then it waits for the value specified in DEFAULT_WAIT_TIME
79          * Note: value specified in mso-service-request-timeout is in seconds
80          * During polling time, if there is an exception encountered in the process execution then polling is stopped and the error response is 
81          * returned to the client
82          * @param processKey
83          * @param variableMap
84          * @return
85          */
86         @POST
87         @Path("/services/{processKey}")
88         @ApiOperation(
89                 value = "Starts a new process with the appropriate process synchronously",
90                 notes = "d"
91             )
92         @Produces("application/json")
93         @Consumes("application/json")
94         public Response startProcessInstanceByKey(@PathParam("processKey") String processKey,
95                         VariableMapImpl variableMap) {
96
97                 Map<String, Object> inputVariables = getInputVariables(variableMap);    
98                 setLogContext(processKey, inputVariables);
99
100                 WorkflowResponse workflowResponse = new WorkflowResponse();
101                 long startTime = System.currentTimeMillis();
102                 ProcessInstance processInstance = null;
103
104                 try {
105                         //Kickoff the process
106                         ProcessThread thread = new ProcessThread(inputVariables,processKey);
107                         thread.start();
108
109                         Map<String, Object> responseMap = null;
110
111                         //wait for process to be completed
112                         long waitTime = getWaitTime(inputVariables);
113                         long now = System.currentTimeMillis();
114                         long start = now;
115                         long endTime = start + waitTime;
116                         long pollingInterval = 500;
117
118                         // TEMPORARY LOGIC FOR UNIT TEST REFACTORING
119                         // If this is a unit test (method is invoked directly), wait a max
120                         // of 5 seconds after process ended for a result.  In production,
121                         // wait up to 60 seconds.
122                         long timeToWaitAfterProcessEnded = uriInfo == null ? 5000 : 60000;
123                         AtomicLong timeProcessEnded = new AtomicLong(0);
124                         boolean endedWithNoResponse = false;
125
126                         while (now <= endTime) {
127                                 Thread.sleep(pollingInterval);
128
129                                 now = System.currentTimeMillis();
130
131                                 // Increase the polling interval over time
132
133                                 long elapsed = now - start;
134
135                                 if (elapsed > 60000) {
136                                         pollingInterval = 5000;
137                                 } else if (elapsed > 10000) {
138                                         pollingInterval = 1000;
139                                 }
140                                 Exception exception = thread.getException();
141                                 if (exception != null) {
142                                         throw new Exception(exception);
143                                 }
144
145                                 processInstance = thread.getProcessInstance();
146
147                                 if (processInstance == null) {
148                                         logger.debug("{} process has not been created yet", LOGMARKER + processKey );
149                                         continue;
150                                 }
151
152                                 String processInstanceId = processInstance.getId();
153                                 workflowResponse.setProcessInstanceID(processInstanceId);
154
155                                 responseMap = getResponseMap(processInstance, processKey, timeProcessEnded);
156
157                                 if (responseMap == null) {
158                                         logger.debug("{} has not produced a response yet", LOGMARKER + processKey);
159
160                                         if (timeProcessEnded.longValue() != 0) {
161                                                 long elapsedSinceEnded = System.currentTimeMillis() - timeProcessEnded.longValue();
162
163                                                 if (elapsedSinceEnded > timeToWaitAfterProcessEnded) {
164                                                         endedWithNoResponse = true;
165                                                         break;
166                                                 }
167                                         }
168                                 } else {
169                                         processResponseMap(workflowResponse, responseMap);
170                                         recordEvents(processKey, workflowResponse, startTime);
171                                         return Response.status(workflowResponse.getMessageCode()).entity(workflowResponse).build();
172                                 }
173                         }
174
175                         //if we dont get response after waiting then send timeout response
176
177                         String state;
178                         String processInstanceId;
179
180                         if (processInstance == null) {
181                                 processInstanceId = "N/A";
182                                 state = "NOT STARTED";
183                         } else {
184                                 processInstanceId = processInstance.getProcessInstanceId();
185                                 state = isProcessEnded(processInstanceId) ? "ENDED" : "NOT ENDED";
186                         }
187
188                         workflowResponse.setMessage("Fail");
189                         if (endedWithNoResponse) {
190                                 workflowResponse.setResponse("Process ended without producing a response");
191                         } else {
192                                 workflowResponse.setResponse("Request timed out, process state: " + state);
193                         }
194                         workflowResponse.setProcessInstanceID(processInstanceId);
195                         recordEvents(processKey, workflowResponse, startTime);
196                         workflowResponse.setMessageCode(500);
197                         return Response.status(500).entity(workflowResponse).build();
198                 } catch (Exception ex) {
199                         logger.debug(LOGMARKER + "Exception in startProcessInstance by key",ex);
200                         workflowResponse.setMessage("Fail" );
201                         workflowResponse.setResponse("Error occurred while executing the process: " + ex.getMessage());
202                         if (processInstance != null) workflowResponse.setProcessInstanceID(processInstance.getId());
203
204                         logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION_ARG.toString(), "BPMN", MDC.get(processKey),
205                                 MsoLogger.ErrorCode.UnknownError.getValue(),
206                                 LOGMARKER + workflowResponse.getMessage() + " for processKey: " + processKey + " with response: "
207                                         + workflowResponse.getResponse());
208                         
209                         workflowResponse.setMessageCode(500);
210                         recordEvents(processKey, workflowResponse, startTime);
211                         return Response.status(500).entity(workflowResponse).build();
212                 }
213         }
214
215         /**
216          * Returns the wait time, this is used by the resource on how long it should wait to send a response
217          * If none specified DEFAULT_WAIT_TIME is used
218          * @param inputVariables
219          * @return
220          */
221         private int getWaitTime(Map<String, Object> inputVariables)
222         {
223                 String timeout = inputVariables.get("mso-service-request-timeout") == null
224                         ? null : inputVariables.get("mso-service-request-timeout").toString();
225
226                 if (timeout != null) {
227                         try {
228                                 return Integer.parseInt(timeout)*1000;
229                         } catch (NumberFormatException nex) {
230                                 logger.debug("Invalid input for mso-service-request-timeout");
231                         }
232                 }
233                 return DEFAULT_WAIT_TIME;
234         }
235         
236         private void recordEvents(String processKey, WorkflowResponse response, long startTime) {
237         }
238
239         private void setLogContext(String processKey, Map<String, Object> inputVariables) {
240                 if (inputVariables != null) {
241                         MsoLogger.setLogContext(getValueFromInputVariables(inputVariables, "mso-request-id"),
242                                 getValueFromInputVariables(inputVariables, "mso-service-instance-id"));
243                 }
244         }
245
246         private String getValueFromInputVariables(Map<String,Object> inputVariables, String key) {
247                 Object value = inputVariables.get(key);
248                 if (value == null) {
249                         return "N/A";
250                 } else {
251                         return value.toString();
252                 }
253         }
254         
255         /**
256          * Checks to see if the specified process is ended.
257          * @param processInstanceId the process instance ID
258          * @return true if the process is ended
259          */
260         private boolean isProcessEnded(String processInstanceId) {
261                 ProcessEngineServices pes = getProcessEngineServices();
262                 try {
263                         return pes.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstanceId).singleResult() == null ? true : false ;
264                 } catch (Exception e) {
265                         logger.debug("Exception :",e);
266                         return true;
267                 }        
268         }
269
270         private void processResponseMap(WorkflowResponse workflowResponse, Map<String, Object> responseMap) {
271                 Object object = responseMap.get("Response");
272                 String response = object == null ? null : String.valueOf(object);
273                 if(response == null){
274                         object = responseMap.get("WorkflowResponse");
275                         response = object == null ? null : String.valueOf(object);
276                 }
277
278                 workflowResponse.setResponse(response);
279
280                 object = responseMap.get("ResponseCode");
281                 String responseCode = object == null ? null : String.valueOf(object);
282
283                 try {
284                         workflowResponse.setMessageCode(Integer.parseInt(responseCode));
285                 } catch(NumberFormatException nex) {
286                         logger.debug(LOGMARKER + "Failed to parse ResponseCode: " + responseCode);
287                         workflowResponse.setMessageCode(-1);
288                 }
289
290                 Object status = responseMap.get("Status");
291
292                 if ("Success".equalsIgnoreCase(String.valueOf(status))) {
293                         workflowResponse.setMessage("Success");
294                 } else if ("Fail".equalsIgnoreCase(String.valueOf(status))) {
295                         workflowResponse.setMessage("Fail");
296                 } else {
297                         logger.debug(LOGMARKER + "Unrecognized Status: " + responseCode);
298                         workflowResponse.setMessage("Fail");
299                 }
300         }
301
302         /**
303          * @version 1.0
304          * Triggers the workflow in a separate thread
305          */
306         private class ProcessThread extends Thread {
307                 private final Map<String,Object> inputVariables;
308                 private final String processKey;
309                 private final String businessKey;
310                 private ProcessInstance processInstance = null;
311                 private Exception exception = null;
312
313                 public ProcessThread(Map<String, Object> inputVariables, String processKey) {
314                         this.inputVariables = inputVariables;
315                         this.processKey = processKey;
316                         this.businessKey = UUID.randomUUID().toString();
317                 }
318
319                 /**
320                  * If an exception occurs when starting the process instance, it may
321                  * be obtained by calling this method.  Note that exceptions are only
322                  * recorded while the process is executing in its original thread.
323                  * Once a process is suspended, exception recording stops.
324                  * @return the exception, or null if none has occurred
325                  */
326                 public Exception getException() {
327                         return exception;
328                 }
329
330                 
331                 public ProcessInstance getProcessInstance() {
332                         return this.processInstance;
333                 }
334                 
335                 /**
336                  * Sets the process instance exception.
337                  * @param exception the exception
338                  */
339                 private void setException(Exception exception) {
340                         this.exception = exception;
341                 }
342
343                 public void run() {
344                         setLogContext(processKey, inputVariables);
345
346                         long startTime = System.currentTimeMillis();
347                         
348                         try {
349                                                 
350                                 RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
351
352                                 // Note that this method doesn't return until the process suspends
353                                 // itself or finishes.  We provide a business key so we can identify
354                                 // the process instance immediately.
355                                 processInstance = runtimeService.startProcessInstanceByKey(
356                                         processKey, inputVariables);
357
358                         } catch (Exception e) {
359                                 logger.debug(LOGMARKER + "ProcessThread caught an exception executing "
360                                         + processKey + ": " + e);
361                                 setException(e);
362                         }
363                 }
364
365         }
366
367         private Map<String, Object> getInputVariables(VariableMapImpl variableMap) {
368                 VariableMap inputVariables = Variables.createVariables();
369                 @SuppressWarnings("unchecked")
370                 Map<String, Object> vMap = (Map<String, Object>) variableMap.get("variables");
371                 for (String key : vMap.keySet()) { //variabe name vn
372                         @SuppressWarnings("unchecked")
373                         Map<String, Object> valueMap = (Map<String,Object>)vMap.get(key); //value, type
374                         inputVariables.putValueTyped(key, Variables
375                         .objectValue(valueMap.get("value"))
376                         .serializationDataFormat(SerializationDataFormats.JAVA) // tells the engine to use java serialization for persisting the value
377                         .create());
378                 }
379                 return inputVariables;
380         }
381
382         /**
383          * Attempts to get a response map from the specified process instance.
384          * @return the response map, or null if it is unavailable
385          */
386         private Map<String, Object> getResponseMap(ProcessInstance processInstance,
387                         String processKey, AtomicLong timeProcessEnded) {
388
389                 String responseMapVariable = processKey + "ResponseMap";
390                 String processInstanceId = processInstance.getId();
391
392                 // Query the runtime service to see if a response map is ready.
393
394 /*              RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
395                 List<Execution> executions = runtimeService.createExecutionQuery()
396                         .processInstanceId(processInstanceId).list();
397
398                 for (Execution execution : executions) {
399                         @SuppressWarnings("unchecked")
400                         Map<String, Object> responseMap = (Map<String, Object>)
401                                 getVariableFromExecution(runtimeService, execution.getId(),
402                                         responseMapVariable);
403
404                         if (responseMap != null) {
405                                 msoLogger.debug(LOGMARKER + "Obtained " + responseMapVariable
406                                         + " from process " + processInstanceId + " execution "
407                                         + execution.getId());
408                                 return responseMap;
409                         }
410                 }
411 */
412                 //Querying history seem to return consistent results compared to querying the runtime service
413
414                 boolean alreadyEnded = timeProcessEnded.longValue() != 0;
415
416                 if (alreadyEnded || isProcessEnded(processInstance.getId())) {
417                         if (!alreadyEnded) {
418                                 timeProcessEnded.set(System.currentTimeMillis());
419                         }
420
421                         // Query the history service to see if a response map exists.
422
423                         HistoryService historyService = getProcessEngineServices().getHistoryService();
424                         @SuppressWarnings("unchecked")
425                         Map<String, Object> responseMap = (Map<String, Object>)
426                                 getVariableFromHistory(historyService, processInstance.getId(),
427                                         responseMapVariable);
428
429                         if (responseMap != null) {
430                                 logger.debug(LOGMARKER + "Obtained " + responseMapVariable
431                                         + " from process " + processInstanceId + " history");
432                                 return responseMap;
433                         }
434
435                         // Query the history service for old-style response variables.
436
437                         String prefix = (String) getVariableFromHistory(historyService, processInstanceId, "prefix");
438
439                         if (prefix != null) {
440                                 
441                                 // Check for 'WorkflowResponse' variable
442                                 Object workflowResponseObject = getVariableFromHistory(historyService, processInstanceId, "WorkflowResponse");
443                                 String workflowResponse = workflowResponseObject == null ? null : String.valueOf(workflowResponseObject);
444                                 logger.debug(LOGMARKER + "WorkflowResponse: " + workflowResponse);
445                                 
446                                 if (workflowResponse != null) {
447                                         Object responseCodeObject = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
448                                         String responseCode = responseCodeObject == null ? null : String.valueOf(responseCodeObject);
449                                         logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
450                                         responseMap = new HashMap<>();
451                                         responseMap.put("WorkflowResponse", workflowResponse);
452                                         responseMap.put("ResponseCode", responseCode);
453                                         responseMap.put("Status", "Success");
454                                         return responseMap;
455                                 }
456                                 
457                                 
458                                 // Check for 'WorkflowException' variable
459                                 WorkflowException workflowException = null;
460                                 String workflowExceptionText = null;
461
462                                 Object workflowExceptionObject = getVariableFromHistory(historyService, processInstanceId, "WorkflowException");
463                                 if(workflowExceptionObject != null) {
464                                         if(workflowExceptionObject instanceof WorkflowException) {
465                                                 workflowException = (WorkflowException) workflowExceptionObject;
466                                                 workflowExceptionText = workflowException.toString();
467                                                 responseMap = new HashMap<>();
468                                                 responseMap.put("WorkflowException", workflowExceptionText);
469                                                 responseMap.put("ResponseCode", workflowException.getErrorCode());
470                                                 responseMap.put("Status", "Fail");
471                                                 return responseMap;
472                                         }
473                                         else if (workflowExceptionObject instanceof String) {
474                                                 Object object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
475                                                 String responseCode = object == null ? null : String.valueOf(object);
476                                                 workflowExceptionText = (String) workflowExceptionObject;
477                                                 responseMap = new HashMap<>();
478                                                 responseMap.put("WorkflowException", workflowExceptionText);
479                                                 responseMap.put("ResponseCode", responseCode);
480                                                 responseMap.put("Status", "Fail");
481                                                 return responseMap;
482                                         }
483                                         
484                                 }
485                                 logger.debug(LOGMARKER + "WorkflowException: " + workflowExceptionText);
486                                 
487                                 // BEGIN LEGACY SUPPORT.  TODO: REMOVE THIS CODE
488                                 Object object = getVariableFromHistory(historyService, processInstanceId, processKey + "Response");
489                                 String response = object == null ? null : String.valueOf(object);
490                                 logger.debug(LOGMARKER + processKey + "Response: " + response);
491
492                                 if (response != null) {
493                                         object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
494                                         String responseCode = object == null ? null : String.valueOf(object);
495                                         logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
496                                         responseMap = new HashMap<>();
497                                         responseMap.put("Response", response);
498                                         responseMap.put("ResponseCode", responseCode);
499                                         responseMap.put("Status", "Success");
500                                         return responseMap;
501                                 }
502         
503                                 object = getVariableFromHistory(historyService, processInstanceId, prefix + "ErrorResponse");
504                                 String errorResponse = object == null ? null : String.valueOf(object);
505                                 logger.debug(LOGMARKER + prefix + "ErrorResponse: " + errorResponse);
506
507                                 if (errorResponse != null) {
508                                         object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
509                                         String responseCode = object == null ? null : String.valueOf(object);
510                                         logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
511                                         responseMap = new HashMap<>();
512                                         responseMap.put("Response", errorResponse);
513                                         responseMap.put("ResponseCode", responseCode);
514                                         responseMap.put("Status", "Fail");
515                                         return responseMap;
516                                 }
517                                 // END LEGACY SUPPORT.  TODO: REMOVE THIS CODE
518                         }
519                 }
520                 return null;
521         }
522         
523         /**
524          * Gets a variable value from the specified execution.
525          * @return the variable value, or null if the variable could not be
526          * obtained
527          */
528         private Object getVariableFromExecution(RuntimeService runtimeService,
529                         String executionId, String variableName) {
530                 try {
531                         return runtimeService.getVariable(executionId, variableName);
532                 } catch (ProcessEngineException e) {
533                         // Most likely cause is that the execution no longer exists.
534                         logger.debug("Error retrieving execution " + executionId
535                                 + " variable " + variableName + ": " + e);
536                         return null;
537                 }
538         }
539         /**
540          * Gets a variable value from specified historical process instance.
541          * @return the variable value, or null if the variable could not be
542          * obtained
543          */
544         private Object getVariableFromHistory(HistoryService historyService,
545                         String processInstanceId, String variableName) {
546                 try {
547                         HistoricVariableInstance v = historyService.createHistoricVariableInstanceQuery()
548                                 .processInstanceId(processInstanceId).variableName(variableName).singleResult();
549                         return v == null ? null : v.getValue();
550                 } catch (Exception e) {
551                         logger.debug("Error retrieving process {} variable {} from history: ", processInstanceId,
552                                 variableName, e);
553                         return null;
554                 }
555         }
556         
557         @POST
558         @Path("/services/{processKey}/{processInstanceId}")
559         @Produces("application/json")
560         @Consumes("application/json")
561         @ApiOperation(
562                 value = "Allows for retrieval of the variables for a given process",
563                 notes = ""
564             )
565         public WorkflowResponse getProcessVariables(@PathParam("processKey") String processKey, @PathParam("processInstanceId") String processInstanceId) {
566                 //TODO filter only set of variables
567                 WorkflowResponse response = new WorkflowResponse();
568
569                 long startTime = System.currentTimeMillis();
570                 try {
571                         ProcessEngineServices engine = getProcessEngineServices();
572                         List<HistoricVariableInstance> variables = engine.getHistoryService().createHistoricVariableInstanceQuery().processInstanceId(processInstanceId).list();
573                         Map<String,String> variablesMap = new HashMap<>();
574                         for (HistoricVariableInstance variableInstance: variables) {
575                                 variablesMap.put(variableInstance.getName(), variableInstance.getValue().toString());
576                         }
577
578                         logger.debug(LOGMARKER + "***Received MSO getProcessVariables with processKey:" + processKey + " and variables: " +
579                                 variablesMap.toString());
580                         
581                         response.setVariables(variablesMap);
582                         response.setMessage("Success");
583                         response.setResponse("Successfully retrieved the variables");
584                         response.setProcessInstanceID(processInstanceId);
585
586                         logger.debug(LOGMARKER + response.getMessage() + " for processKey: " + processKey + " with response: " + response
587                                 .getResponse());
588                 } catch (Exception ex) {
589                         response.setMessage("Fail");
590                         response.setResponse("Failed to retrieve the variables," + ex.getMessage());
591                         response.setProcessInstanceID(processInstanceId);
592
593                         logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION_ARG.toString(), "BPMN", MDC.get(processKey),
594                                 MsoLogger.ErrorCode.UnknownError.getValue(),
595                                 LOGMARKER + response.getMessage() + " for processKey: " + processKey + " with response: " + response
596                                         .getResponse());
597                         logger.debug("Exception :",ex);
598                 }
599                 return response;
600         }
601 }