Replaced all tabs with spaces in java and pom.xml
[so.git] / bpmn / mso-infrastructure-bpmn / src / main / java / org / onap / so / bpmn / common / workflow / service / WorkflowResource.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright (C) 2017 Huawei Technologies Co., Ltd. All rights reserved.
7  * ================================================================================
8  * Modifications Copyright (c) 2019 Samsung
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  * 
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  * 
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.so.bpmn.common.workflow.service;
25
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.UUID;
30 import java.util.concurrent.atomic.AtomicLong;
31 import javax.ws.rs.Consumes;
32 import javax.ws.rs.POST;
33 import javax.ws.rs.Path;
34 import javax.ws.rs.PathParam;
35 import javax.ws.rs.Produces;
36 import javax.ws.rs.core.Context;
37 import javax.ws.rs.core.Response;
38 import javax.ws.rs.core.UriInfo;
39 import org.camunda.bpm.engine.HistoryService;
40 import org.camunda.bpm.engine.ProcessEngineException;
41 import org.camunda.bpm.engine.ProcessEngineServices;
42 import org.camunda.bpm.engine.RuntimeService;
43 import org.camunda.bpm.engine.history.HistoricVariableInstance;
44 import org.camunda.bpm.engine.runtime.ProcessInstance;
45 import org.camunda.bpm.engine.variable.VariableMap;
46 import org.camunda.bpm.engine.variable.Variables;
47 import org.camunda.bpm.engine.variable.Variables.SerializationDataFormats;
48 import org.camunda.bpm.engine.variable.impl.VariableMapImpl;
49 import org.onap.so.bpmn.common.workflow.context.WorkflowResponse;
50 import org.onap.so.bpmn.core.WorkflowException;
51 import org.onap.so.logger.ErrorCode;
52 import org.onap.so.logger.MessageEnum;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55 import org.slf4j.MDC;
56 import org.springframework.stereotype.Component;
57 import io.swagger.annotations.Api;
58 import io.swagger.annotations.ApiOperation;
59
60 @Path("/workflow")
61 @Api(value = "/workflow", description = "Root of workflow services")
62 @Component
63 public class WorkflowResource extends ProcessEngineAwareService {
64
65     private static final Logger logger = LoggerFactory.getLogger(WorkflowResource.class);
66     private static final String LOGMARKER = "[WRKFLOW-RESOURCE]";
67
68     private static final int DEFAULT_WAIT_TIME = 30000;
69
70     @Context
71     private UriInfo uriInfo = null;
72
73     /**
74      * Starts the process instance and responds to client synchronously If the request does not contain
75      * mso-service-request-timeout then it waits for the value specified in DEFAULT_WAIT_TIME Note: value specified in
76      * mso-service-request-timeout is in seconds During polling time, if there is an exception encountered in the
77      * process execution then polling is stopped and the error response is returned to the client
78      * 
79      * @param processKey
80      * @param variableMap
81      * @return
82      */
83     @POST
84     @Path("/services/{processKey}")
85     @ApiOperation(value = "Starts a new process with the appropriate process synchronously", notes = "d")
86     @Produces("application/json")
87     @Consumes("application/json")
88     public Response startProcessInstanceByKey(@PathParam("processKey") String processKey, VariableMapImpl variableMap) {
89
90         Map<String, Object> inputVariables = getInputVariables(variableMap);
91         setLogContext(processKey, inputVariables);
92
93         WorkflowResponse workflowResponse = new WorkflowResponse();
94         long startTime = System.currentTimeMillis();
95         ProcessInstance processInstance = null;
96
97         try {
98             // Kickoff the process
99             ProcessThread thread = new ProcessThread(inputVariables, processKey);
100             thread.start();
101
102             Map<String, Object> responseMap = null;
103
104             // wait for process to be completed
105             long waitTime = getWaitTime(inputVariables);
106             long now = System.currentTimeMillis();
107             long start = now;
108             long endTime = start + waitTime;
109             long pollingInterval = 500;
110
111             // TEMPORARY LOGIC FOR UNIT TEST REFACTORING
112             // If this is a unit test (method is invoked directly), wait a max
113             // of 5 seconds after process ended for a result. In production,
114             // wait up to 60 seconds.
115             long timeToWaitAfterProcessEnded = uriInfo == null ? 5000 : 60000;
116             AtomicLong timeProcessEnded = new AtomicLong(0);
117             boolean endedWithNoResponse = false;
118
119             while (now <= endTime) {
120                 Thread.sleep(pollingInterval);
121
122                 now = System.currentTimeMillis();
123
124                 // Increase the polling interval over time
125
126                 long elapsed = now - start;
127
128                 if (elapsed > 60000) {
129                     pollingInterval = 5000;
130                 } else if (elapsed > 10000) {
131                     pollingInterval = 1000;
132                 }
133                 Exception exception = thread.getException();
134                 if (exception != null) {
135                     throw new Exception(exception);
136                 }
137
138                 processInstance = thread.getProcessInstance();
139
140                 if (processInstance == null) {
141                     logger.debug("{} process has not been created yet", LOGMARKER + processKey);
142                     continue;
143                 }
144
145                 String processInstanceId = processInstance.getId();
146                 workflowResponse.setProcessInstanceID(processInstanceId);
147
148                 responseMap = getResponseMap(processInstance, processKey, timeProcessEnded);
149
150                 if (responseMap == null) {
151                     logger.debug("{} has not produced a response yet", LOGMARKER + processKey);
152
153                     if (timeProcessEnded.longValue() != 0) {
154                         long elapsedSinceEnded = System.currentTimeMillis() - timeProcessEnded.longValue();
155
156                         if (elapsedSinceEnded > timeToWaitAfterProcessEnded) {
157                             endedWithNoResponse = true;
158                             break;
159                         }
160                     }
161                 } else {
162                     processResponseMap(workflowResponse, responseMap);
163                     recordEvents(processKey, workflowResponse, startTime);
164                     return Response.status(workflowResponse.getMessageCode()).entity(workflowResponse).build();
165                 }
166             }
167
168             // if we dont get response after waiting then send timeout response
169
170             String state;
171             String processInstanceId;
172
173             if (processInstance == null) {
174                 processInstanceId = "N/A";
175                 state = "NOT STARTED";
176             } else {
177                 processInstanceId = processInstance.getProcessInstanceId();
178                 state = isProcessEnded(processInstanceId) ? "ENDED" : "NOT ENDED";
179             }
180
181             workflowResponse.setMessage("Fail");
182             if (endedWithNoResponse) {
183                 workflowResponse.setResponse("Process ended without producing a response");
184             } else {
185                 workflowResponse.setResponse("Request timed out, process state: " + state);
186             }
187             workflowResponse.setProcessInstanceID(processInstanceId);
188             recordEvents(processKey, workflowResponse, startTime);
189             workflowResponse.setMessageCode(500);
190             return Response.status(500).entity(workflowResponse).build();
191         } catch (Exception ex) {
192             logger.debug(LOGMARKER + "Exception in startProcessInstance by key", ex);
193             workflowResponse.setMessage("Fail");
194             workflowResponse.setResponse("Error occurred while executing the process: " + ex.getMessage());
195             if (processInstance != null)
196                 workflowResponse.setProcessInstanceID(processInstance.getId());
197
198             logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION_ARG.toString(), "BPMN",
199                     MDC.get(processKey), ErrorCode.UnknownError.getValue(), LOGMARKER + workflowResponse.getMessage()
200                             + " for processKey: " + processKey + " with response: " + workflowResponse.getResponse());
201
202             workflowResponse.setMessageCode(500);
203             recordEvents(processKey, workflowResponse, startTime);
204             return Response.status(500).entity(workflowResponse).build();
205         }
206     }
207
208     /**
209      * Returns the wait time, this is used by the resource on how long it should wait to send a response If none
210      * specified DEFAULT_WAIT_TIME is used
211      * 
212      * @param inputVariables
213      * @return
214      */
215     private int getWaitTime(Map<String, Object> inputVariables) {
216         String timeout = inputVariables.get("mso-service-request-timeout") == null ? null
217                 : inputVariables.get("mso-service-request-timeout").toString();
218
219         if (timeout != null) {
220             try {
221                 return Integer.parseInt(timeout) * 1000;
222             } catch (NumberFormatException nex) {
223                 logger.debug("Invalid input for mso-service-request-timeout");
224             }
225         }
226         return DEFAULT_WAIT_TIME;
227     }
228
229     private void recordEvents(String processKey, WorkflowResponse response, long startTime) {}
230
231     private void setLogContext(String processKey, Map<String, Object> inputVariables) {}
232
233     private String getValueFromInputVariables(Map<String, Object> inputVariables, String key) {
234         Object value = inputVariables.get(key);
235         if (value == null) {
236             return "N/A";
237         } else {
238             return value.toString();
239         }
240     }
241
242     /**
243      * Checks to see if the specified process is ended.
244      * 
245      * @param processInstanceId the process instance ID
246      * @return true if the process is ended
247      */
248     private boolean isProcessEnded(String processInstanceId) {
249         ProcessEngineServices pes = getProcessEngineServices();
250         try {
251             return pes.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstanceId)
252                     .singleResult() == null ? true : false;
253         } catch (Exception e) {
254             logger.debug("Exception :", e);
255             return true;
256         }
257     }
258
259     private void processResponseMap(WorkflowResponse workflowResponse, Map<String, Object> responseMap) {
260         Object object = responseMap.get("Response");
261         String response = object == null ? null : String.valueOf(object);
262         if (response == null) {
263             object = responseMap.get("WorkflowResponse");
264             response = object == null ? null : String.valueOf(object);
265         }
266
267         workflowResponse.setResponse(response);
268
269         object = responseMap.get("ResponseCode");
270         String responseCode = object == null ? null : String.valueOf(object);
271
272         try {
273             workflowResponse.setMessageCode(Integer.parseInt(responseCode));
274         } catch (NumberFormatException nex) {
275             logger.debug(LOGMARKER + "Failed to parse ResponseCode: " + responseCode);
276             workflowResponse.setMessageCode(-1);
277         }
278
279         Object status = responseMap.get("Status");
280
281         if ("Success".equalsIgnoreCase(String.valueOf(status))) {
282             workflowResponse.setMessage("Success");
283         } else if ("Fail".equalsIgnoreCase(String.valueOf(status))) {
284             workflowResponse.setMessage("Fail");
285         } else {
286             logger.debug(LOGMARKER + "Unrecognized Status: " + responseCode);
287             workflowResponse.setMessage("Fail");
288         }
289     }
290
291     /**
292      * @version 1.0 Triggers the workflow in a separate thread
293      */
294     private class ProcessThread extends Thread {
295         private final Map<String, Object> inputVariables;
296         private final String processKey;
297         private final String businessKey;
298         private ProcessInstance processInstance = null;
299         private Exception exception = null;
300
301         public ProcessThread(Map<String, Object> inputVariables, String processKey) {
302             this.inputVariables = inputVariables;
303             this.processKey = processKey;
304             this.businessKey = UUID.randomUUID().toString();
305         }
306
307         /**
308          * If an exception occurs when starting the process instance, it may be obtained by calling this method. Note
309          * that exceptions are only recorded while the process is executing in its original thread. Once a process is
310          * suspended, exception recording stops.
311          * 
312          * @return the exception, or null if none has occurred
313          */
314         public Exception getException() {
315             return exception;
316         }
317
318
319         public ProcessInstance getProcessInstance() {
320             return this.processInstance;
321         }
322
323         /**
324          * Sets the process instance exception.
325          * 
326          * @param exception the exception
327          */
328         private void setException(Exception exception) {
329             this.exception = exception;
330         }
331
332         public void run() {
333             setLogContext(processKey, inputVariables);
334
335             long startTime = System.currentTimeMillis();
336
337             try {
338
339                 RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
340
341                 // Note that this method doesn't return until the process suspends
342                 // itself or finishes. We provide a business key so we can identify
343                 // the process instance immediately.
344                 processInstance = runtimeService.startProcessInstanceByKey(processKey, inputVariables);
345
346             } catch (Exception e) {
347                 logger.debug(LOGMARKER + "ProcessThread caught an exception executing " + processKey + ": " + e);
348                 setException(e);
349             }
350         }
351
352     }
353
354     private Map<String, Object> getInputVariables(VariableMapImpl variableMap) {
355         VariableMap inputVariables = Variables.createVariables();
356         @SuppressWarnings("unchecked")
357         Map<String, Object> vMap = (Map<String, Object>) variableMap.get("variables");
358         for (String key : vMap.keySet()) { // variabe name vn
359             @SuppressWarnings("unchecked")
360             Map<String, Object> valueMap = (Map<String, Object>) vMap.get(key); // value, type
361             inputVariables.putValueTyped(key,
362                     Variables.objectValue(valueMap.get("value")).serializationDataFormat(SerializationDataFormats.JAVA) // tells
363                                                                                                                         // the
364                                                                                                                         // engine
365                                                                                                                         // to
366                                                                                                                         // use
367                                                                                                                         // java
368                                                                                                                         // serialization
369                                                                                                                         // for
370                                                                                                                         // persisting
371                                                                                                                         // the
372                                                                                                                         // value
373                             .create());
374         }
375         return inputVariables;
376     }
377
378     /**
379      * Attempts to get a response map from the specified process instance.
380      * 
381      * @return the response map, or null if it is unavailable
382      */
383     private Map<String, Object> getResponseMap(ProcessInstance processInstance, String processKey,
384             AtomicLong timeProcessEnded) {
385
386         String responseMapVariable = processKey + "ResponseMap";
387         String processInstanceId = processInstance.getId();
388
389         // Query the runtime service to see if a response map is ready.
390
391         /*
392          * RuntimeService runtimeService = getProcessEngineServices().getRuntimeService(); List<Execution> executions =
393          * runtimeService.createExecutionQuery() .processInstanceId(processInstanceId).list();
394          * 
395          * for (Execution execution : executions) {
396          * 
397          * @SuppressWarnings("unchecked") Map<String, Object> responseMap = (Map<String, Object>)
398          * getVariableFromExecution(runtimeService, execution.getId(), responseMapVariable);
399          * 
400          * if (responseMap != null) { msoLogger.debug(LOGMARKER + "Obtained " + responseMapVariable + " from process " +
401          * processInstanceId + " execution " + execution.getId()); return responseMap; } }
402          */
403         // Querying history seem to return consistent results compared to querying the runtime service
404
405         boolean alreadyEnded = timeProcessEnded.longValue() != 0;
406
407         if (alreadyEnded || isProcessEnded(processInstance.getId())) {
408             if (!alreadyEnded) {
409                 timeProcessEnded.set(System.currentTimeMillis());
410             }
411
412             // Query the history service to see if a response map exists.
413
414             HistoryService historyService = getProcessEngineServices().getHistoryService();
415             @SuppressWarnings("unchecked")
416             Map<String, Object> responseMap = (Map<String, Object>) getVariableFromHistory(historyService,
417                     processInstance.getId(), responseMapVariable);
418
419             if (responseMap != null) {
420                 logger.debug(LOGMARKER + "Obtained " + responseMapVariable + " from process " + processInstanceId
421                         + " history");
422                 return responseMap;
423             }
424
425             // Query the history service for old-style response variables.
426
427             String prefix = (String) getVariableFromHistory(historyService, processInstanceId, "prefix");
428
429             if (prefix != null) {
430
431                 // Check for 'WorkflowResponse' variable
432                 Object workflowResponseObject =
433                         getVariableFromHistory(historyService, processInstanceId, "WorkflowResponse");
434                 String workflowResponse =
435                         workflowResponseObject == null ? null : String.valueOf(workflowResponseObject);
436                 logger.debug(LOGMARKER + "WorkflowResponse: " + workflowResponse);
437
438                 if (workflowResponse != null) {
439                     Object responseCodeObject =
440                             getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
441                     String responseCode = responseCodeObject == null ? null : String.valueOf(responseCodeObject);
442                     logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
443                     responseMap = new HashMap<>();
444                     responseMap.put("WorkflowResponse", workflowResponse);
445                     responseMap.put("ResponseCode", responseCode);
446                     responseMap.put("Status", "Success");
447                     return responseMap;
448                 }
449
450
451                 // Check for 'WorkflowException' variable
452                 WorkflowException workflowException = null;
453                 String workflowExceptionText = null;
454
455                 Object workflowExceptionObject =
456                         getVariableFromHistory(historyService, processInstanceId, "WorkflowException");
457                 if (workflowExceptionObject != null) {
458                     if (workflowExceptionObject instanceof WorkflowException) {
459                         workflowException = (WorkflowException) workflowExceptionObject;
460                         workflowExceptionText = workflowException.toString();
461                         responseMap = new HashMap<>();
462                         responseMap.put("WorkflowException", workflowExceptionText);
463                         responseMap.put("ResponseCode", workflowException.getErrorCode());
464                         responseMap.put("Status", "Fail");
465                         return responseMap;
466                     } else if (workflowExceptionObject instanceof String) {
467                         Object object =
468                                 getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
469                         String responseCode = object == null ? null : String.valueOf(object);
470                         workflowExceptionText = (String) workflowExceptionObject;
471                         responseMap = new HashMap<>();
472                         responseMap.put("WorkflowException", workflowExceptionText);
473                         responseMap.put("ResponseCode", responseCode);
474                         responseMap.put("Status", "Fail");
475                         return responseMap;
476                     }
477
478                 }
479                 logger.debug(LOGMARKER + "WorkflowException: " + workflowExceptionText);
480
481                 // BEGIN LEGACY SUPPORT. TODO: REMOVE THIS CODE
482                 Object object = getVariableFromHistory(historyService, processInstanceId, processKey + "Response");
483                 String response = object == null ? null : String.valueOf(object);
484                 logger.debug(LOGMARKER + processKey + "Response: " + response);
485
486                 if (response != null) {
487                     object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
488                     String responseCode = object == null ? null : String.valueOf(object);
489                     logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
490                     responseMap = new HashMap<>();
491                     responseMap.put("Response", response);
492                     responseMap.put("ResponseCode", responseCode);
493                     responseMap.put("Status", "Success");
494                     return responseMap;
495                 }
496
497                 object = getVariableFromHistory(historyService, processInstanceId, prefix + "ErrorResponse");
498                 String errorResponse = object == null ? null : String.valueOf(object);
499                 logger.debug(LOGMARKER + prefix + "ErrorResponse: " + errorResponse);
500
501                 if (errorResponse != null) {
502                     object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
503                     String responseCode = object == null ? null : String.valueOf(object);
504                     logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
505                     responseMap = new HashMap<>();
506                     responseMap.put("Response", errorResponse);
507                     responseMap.put("ResponseCode", responseCode);
508                     responseMap.put("Status", "Fail");
509                     return responseMap;
510                 }
511                 // END LEGACY SUPPORT. TODO: REMOVE THIS CODE
512             }
513         }
514         return null;
515     }
516
517     /**
518      * Gets a variable value from the specified execution.
519      * 
520      * @return the variable value, or null if the variable could not be obtained
521      */
522     private Object getVariableFromExecution(RuntimeService runtimeService, String executionId, String variableName) {
523         try {
524             return runtimeService.getVariable(executionId, variableName);
525         } catch (ProcessEngineException e) {
526             // Most likely cause is that the execution no longer exists.
527             logger.debug("Error retrieving execution " + executionId + " variable " + variableName + ": " + e);
528             return null;
529         }
530     }
531
532     /**
533      * Gets a variable value from specified historical process instance.
534      * 
535      * @return the variable value, or null if the variable could not be obtained
536      */
537     private Object getVariableFromHistory(HistoryService historyService, String processInstanceId,
538             String variableName) {
539         try {
540             HistoricVariableInstance v = historyService.createHistoricVariableInstanceQuery()
541                     .processInstanceId(processInstanceId).variableName(variableName).singleResult();
542             return v == null ? null : v.getValue();
543         } catch (Exception e) {
544             logger.debug("Error retrieving process {} variable {} from history: ", processInstanceId, variableName, e);
545             return null;
546         }
547     }
548
549     @POST
550     @Path("/services/{processKey}/{processInstanceId}")
551     @Produces("application/json")
552     @Consumes("application/json")
553     @ApiOperation(value = "Allows for retrieval of the variables for a given process", notes = "")
554     public WorkflowResponse getProcessVariables(@PathParam("processKey") String processKey,
555             @PathParam("processInstanceId") String processInstanceId) {
556         // TODO filter only set of variables
557         WorkflowResponse response = new WorkflowResponse();
558
559         long startTime = System.currentTimeMillis();
560         try {
561             ProcessEngineServices engine = getProcessEngineServices();
562             List<HistoricVariableInstance> variables = engine.getHistoryService().createHistoricVariableInstanceQuery()
563                     .processInstanceId(processInstanceId).list();
564             Map<String, String> variablesMap = new HashMap<>();
565             for (HistoricVariableInstance variableInstance : variables) {
566                 variablesMap.put(variableInstance.getName(), variableInstance.getValue().toString());
567             }
568
569             logger.debug(LOGMARKER + "***Received MSO getProcessVariables with processKey:" + processKey
570                     + " and variables: " + variablesMap.toString());
571
572             response.setVariables(variablesMap);
573             response.setMessage("Success");
574             response.setResponse("Successfully retrieved the variables");
575             response.setProcessInstanceID(processInstanceId);
576
577             logger.debug(LOGMARKER + response.getMessage() + " for processKey: " + processKey + " with response: "
578                     + response.getResponse());
579         } catch (Exception ex) {
580             response.setMessage("Fail");
581             response.setResponse("Failed to retrieve the variables," + ex.getMessage());
582             response.setProcessInstanceID(processInstanceId);
583
584             logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION_ARG.toString(), "BPMN",
585                     MDC.get(processKey), ErrorCode.UnknownError.getValue(), LOGMARKER + response.getMessage()
586                             + " for processKey: " + processKey + " with response: " + response.getResponse());
587             logger.debug("Exception :", ex);
588         }
589         return response;
590     }
591 }