[SO] Release so 1.13.0 image
[so.git] / bpmn / mso-infrastructure-bpmn / src / main / java / org / onap / so / bpmn / common / workflow / service / WorkflowResource.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright (C) 2017 Huawei Technologies Co., Ltd. All rights reserved.
7  * ================================================================================
8  * Modifications Copyright (c) 2019 Samsung
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  * 
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  * 
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.so.bpmn.common.workflow.service;
25
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.UUID;
30 import java.util.concurrent.atomic.AtomicLong;
31 import javax.ws.rs.Consumes;
32 import javax.ws.rs.POST;
33 import javax.ws.rs.Path;
34 import javax.ws.rs.PathParam;
35 import javax.ws.rs.Produces;
36 import javax.ws.rs.core.Context;
37 import javax.ws.rs.core.Response;
38 import javax.ws.rs.core.UriInfo;
39 import org.onap.so.logger.LoggingAnchor;
40 import org.camunda.bpm.engine.HistoryService;
41 import org.camunda.bpm.engine.ProcessEngineException;
42 import org.camunda.bpm.engine.ProcessEngineServices;
43 import org.camunda.bpm.engine.RuntimeService;
44 import org.camunda.bpm.engine.history.HistoricVariableInstance;
45 import org.camunda.bpm.engine.runtime.ProcessInstance;
46 import org.camunda.bpm.engine.variable.VariableMap;
47 import org.camunda.bpm.engine.variable.Variables;
48 import org.camunda.bpm.engine.variable.Variables.SerializationDataFormats;
49 import org.camunda.bpm.engine.variable.impl.VariableMapImpl;
50 import org.onap.so.bpmn.common.workflow.context.WorkflowResponse;
51 import org.onap.so.bpmn.core.WorkflowException;
52 import org.onap.logging.filter.base.ErrorCode;
53 import org.onap.so.logger.MessageEnum;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import org.slf4j.MDC;
57 import org.springframework.stereotype.Component;
58 import io.swagger.v3.oas.annotations.OpenAPIDefinition;
59 import io.swagger.v3.oas.annotations.Operation;
60 import io.swagger.v3.oas.annotations.info.Info;
61
62 @Path("/workflow")
63 @OpenAPIDefinition(info = @Info(title = "/workflow", description = "Root of workflow services"))
64 @Component
65 public class WorkflowResource extends ProcessEngineAwareService {
66
67     private static final Logger logger = LoggerFactory.getLogger(WorkflowResource.class);
68     private static final String LOGMARKER = "[WRKFLOW-RESOURCE]";
69
70     private static final int DEFAULT_WAIT_TIME = 30000;
71
72     @Context
73     private UriInfo uriInfo = null;
74
75     /**
76      * Starts the process instance and responds to client synchronously If the request does not contain
77      * mso-service-request-timeout then it waits for the value specified in DEFAULT_WAIT_TIME Note: value specified in
78      * mso-service-request-timeout is in seconds During polling time, if there is an exception encountered in the
79      * process execution then polling is stopped and the error response is returned to the client
80      * 
81      * @param processKey
82      * @param variableMap
83      * @return
84      */
85     @POST
86     @Path("/services/{processKey}")
87     @Operation(description = "Starts a new process with the appropriate process synchronously")
88     @Produces("application/json")
89     @Consumes("application/json")
90     public Response startProcessInstanceByKey(@PathParam("processKey") String processKey, VariableMapImpl variableMap) {
91
92         Map<String, Object> inputVariables = getInputVariables(variableMap);
93         setLogContext(processKey, inputVariables);
94
95         WorkflowResponse workflowResponse = new WorkflowResponse();
96         long startTime = System.currentTimeMillis();
97         ProcessInstance processInstance = null;
98
99         try {
100             // Kickoff the process
101             ProcessThread thread = new ProcessThread(inputVariables, processKey);
102             thread.start();
103
104             Map<String, Object> responseMap = null;
105
106             // wait for process to be completed
107             long waitTime = getWaitTime(inputVariables);
108             long now = System.currentTimeMillis();
109             long start = now;
110             long endTime = start + waitTime;
111             long pollingInterval = 500;
112
113             // TEMPORARY LOGIC FOR UNIT TEST REFACTORING
114             // If this is a unit test (method is invoked directly), wait a max
115             // of 5 seconds after process ended for a result. In production,
116             // wait up to 60 seconds.
117             long timeToWaitAfterProcessEnded = uriInfo == null ? 5000 : 60000;
118             AtomicLong timeProcessEnded = new AtomicLong(0);
119             boolean endedWithNoResponse = false;
120             logger.debug(LOGMARKER + "WorkflowResource.startProcessInstanceByKey using timeout: " + waitTime);
121             while (now <= endTime) {
122                 Thread.sleep(pollingInterval);
123
124                 now = System.currentTimeMillis();
125
126                 // Increase the polling interval over time
127
128                 long elapsed = now - start;
129
130                 if (elapsed > 60000) {
131                     pollingInterval = 5000;
132                 } else if (elapsed > 10000) {
133                     pollingInterval = 1000;
134                 }
135                 Exception exception = thread.getException();
136                 if (exception != null) {
137                     throw new Exception(exception);
138                 }
139
140                 processInstance = thread.getProcessInstance();
141
142                 if (processInstance == null) {
143                     logger.debug("{} process has not been created yet", LOGMARKER + processKey);
144                     continue;
145                 }
146
147                 String processInstanceId = processInstance.getId();
148                 workflowResponse.setProcessInstanceID(processInstanceId);
149
150                 responseMap = getResponseMap(processInstance, processKey, timeProcessEnded);
151
152                 if (responseMap == null) {
153                     logger.debug("{} has not produced a response yet", LOGMARKER + processKey);
154
155                     if (timeProcessEnded.longValue() != 0) {
156                         long elapsedSinceEnded = System.currentTimeMillis() - timeProcessEnded.longValue();
157
158                         if (elapsedSinceEnded > timeToWaitAfterProcessEnded) {
159                             endedWithNoResponse = true;
160                             break;
161                         }
162                     }
163                 } else {
164                     processResponseMap(workflowResponse, responseMap);
165                     recordEvents(processKey, workflowResponse, startTime);
166                     return Response.status(workflowResponse.getMessageCode()).entity(workflowResponse).build();
167                 }
168             }
169
170             // if we dont get response after waiting then send timeout response
171
172             String state;
173             String processInstanceId;
174
175             if (processInstance == null) {
176                 processInstanceId = "N/A";
177                 state = "NOT STARTED";
178             } else {
179                 processInstanceId = processInstance.getProcessInstanceId();
180                 state = isProcessEnded(processInstanceId) ? "ENDED" : "NOT ENDED";
181             }
182
183             workflowResponse.setMessage("Fail");
184             if (endedWithNoResponse) {
185                 workflowResponse.setResponse("Process ended without producing a response");
186             } else {
187                 workflowResponse.setResponse("Request timed out, process state: " + state);
188             }
189             workflowResponse.setProcessInstanceID(processInstanceId);
190             recordEvents(processKey, workflowResponse, startTime);
191             workflowResponse.setMessageCode(500);
192             return Response.status(500).entity(workflowResponse).build();
193         } catch (Exception ex) {
194             logger.debug(LOGMARKER + "Exception in startProcessInstance by key", ex);
195             workflowResponse.setMessage("Fail");
196             workflowResponse.setResponse("Error occurred while executing the process: " + ex.getMessage());
197             if (processInstance != null)
198                 workflowResponse.setProcessInstanceID(processInstance.getId());
199
200             logger.error(LoggingAnchor.FIVE, MessageEnum.BPMN_GENERAL_EXCEPTION_ARG.toString(), "BPMN",
201                     MDC.get(processKey), ErrorCode.UnknownError.getValue(), LOGMARKER + workflowResponse.getMessage()
202                             + " for processKey: " + processKey + " with response: " + workflowResponse.getResponse());
203
204             workflowResponse.setMessageCode(500);
205             recordEvents(processKey, workflowResponse, startTime);
206             return Response.status(500).entity(workflowResponse).build();
207         }
208     }
209
210     /**
211      * Returns the wait time, this is used by the resource on how long it should wait to send a response If none
212      * specified DEFAULT_WAIT_TIME is used
213      * 
214      * @param inputVariables
215      * @return
216      */
217     private int getWaitTime(Map<String, Object> inputVariables) {
218         String timeout = inputVariables.get("mso-service-request-timeout") == null ? null
219                 : inputVariables.get("mso-service-request-timeout").toString();
220
221         if (timeout != null) {
222             try {
223                 return Integer.parseInt(timeout) * 1000;
224             } catch (NumberFormatException nex) {
225                 logger.debug("Invalid input for mso-service-request-timeout");
226             }
227         }
228         return DEFAULT_WAIT_TIME;
229     }
230
231     private void recordEvents(String processKey, WorkflowResponse response, long startTime) {}
232
233     private void setLogContext(String processKey, Map<String, Object> inputVariables) {}
234
235     private String getValueFromInputVariables(Map<String, Object> inputVariables, String key) {
236         Object value = inputVariables.get(key);
237         if (value == null) {
238             return "N/A";
239         } else {
240             return value.toString();
241         }
242     }
243
244     /**
245      * Checks to see if the specified process is ended.
246      * 
247      * @param processInstanceId the process instance ID
248      * @return true if the process is ended
249      */
250     private boolean isProcessEnded(String processInstanceId) {
251         ProcessEngineServices pes = getProcessEngineServices();
252         try {
253             return pes.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstanceId)
254                     .singleResult() == null ? true : false;
255         } catch (Exception e) {
256             logger.debug("Exception :", e);
257             return true;
258         }
259     }
260
261     private void processResponseMap(WorkflowResponse workflowResponse, Map<String, Object> responseMap) {
262         Object object = responseMap.get("Response");
263         String response = object == null ? null : String.valueOf(object);
264         if (response == null) {
265             object = responseMap.get("WorkflowResponse");
266             response = object == null ? null : String.valueOf(object);
267         }
268
269         workflowResponse.setResponse(response);
270
271         object = responseMap.get("ResponseCode");
272         String responseCode = object == null ? null : String.valueOf(object);
273
274         try {
275             workflowResponse.setMessageCode(Integer.parseInt(responseCode));
276         } catch (NumberFormatException nex) {
277             logger.debug(LOGMARKER + "Failed to parse ResponseCode: " + responseCode);
278             workflowResponse.setMessageCode(-1);
279         }
280
281         Object status = responseMap.get("Status");
282
283         if ("Success".equalsIgnoreCase(String.valueOf(status))) {
284             workflowResponse.setMessage("Success");
285         } else if ("Fail".equalsIgnoreCase(String.valueOf(status))) {
286             workflowResponse.setMessage("Fail");
287         } else {
288             logger.debug(LOGMARKER + "Unrecognized Status: " + responseCode);
289             workflowResponse.setMessage("Fail");
290         }
291     }
292
293     /**
294      * @version 1.0 Triggers the workflow in a separate thread
295      */
296     private class ProcessThread extends Thread {
297         private final Map<String, Object> inputVariables;
298         private final String processKey;
299         private final String businessKey;
300         private ProcessInstance processInstance = null;
301         private Exception exception = null;
302
303         public ProcessThread(Map<String, Object> inputVariables, String processKey) {
304             this.inputVariables = inputVariables;
305             this.processKey = processKey;
306             this.businessKey = UUID.randomUUID().toString();
307         }
308
309         /**
310          * If an exception occurs when starting the process instance, it may be obtained by calling this method. Note
311          * that exceptions are only recorded while the process is executing in its original thread. Once a process is
312          * suspended, exception recording stops.
313          * 
314          * @return the exception, or null if none has occurred
315          */
316         public Exception getException() {
317             return exception;
318         }
319
320
321         public ProcessInstance getProcessInstance() {
322             return this.processInstance;
323         }
324
325         /**
326          * Sets the process instance exception.
327          * 
328          * @param exception the exception
329          */
330         private void setException(Exception exception) {
331             this.exception = exception;
332         }
333
334         public void run() {
335             setLogContext(processKey, inputVariables);
336
337             try {
338
339                 RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
340
341                 // Note that this method doesn't return until the process suspends
342                 // itself or finishes. We provide a business key so we can identify
343                 // the process instance immediately.
344                 processInstance = runtimeService.startProcessInstanceByKey(processKey, inputVariables);
345
346             } catch (Exception e) {
347                 logger.debug(LOGMARKER + "ProcessThread caught an exception executing " + processKey + ": " + e);
348                 setException(e);
349             }
350         }
351
352     }
353
354     private Map<String, Object> getInputVariables(VariableMapImpl variableMap) {
355         VariableMap inputVariables = Variables.createVariables();
356         @SuppressWarnings("unchecked")
357         Map<String, Object> vMap = (Map<String, Object>) variableMap.get("variables");
358         for (String key : vMap.keySet()) { // variabe name vn
359             @SuppressWarnings("unchecked")
360             Map<String, Object> valueMap = (Map<String, Object>) vMap.get(key); // value, type
361             inputVariables.putValueTyped(key,
362                     Variables.objectValue(valueMap.get("value")).serializationDataFormat(SerializationDataFormats.JAVA) // tells
363                                                                                                                         // the
364                                                                                                                         // engine
365                                                                                                                         // to
366                                                                                                                         // use
367                                                                                                                         // java
368                                                                                                                         // serialization
369                                                                                                                         // for
370                                                                                                                         // persisting
371                                                                                                                         // the
372                                                                                                                         // value
373                             .create());
374         }
375         return inputVariables;
376     }
377
378     /**
379      * Attempts to get a response map from the specified process instance.
380      * 
381      * @return the response map, or null if it is unavailable
382      */
383     private Map<String, Object> getResponseMap(ProcessInstance processInstance, String processKey,
384             AtomicLong timeProcessEnded) {
385
386         String responseMapVariable = processKey + "ResponseMap";
387         String processInstanceId = processInstance.getId();
388
389         // Query the runtime service to see if a response map is ready.
390
391         /*
392          * RuntimeService runtimeService = getProcessEngineServices().getRuntimeService(); List<Execution> executions =
393          * runtimeService.createExecutionQuery() .processInstanceId(processInstanceId).list();
394          * 
395          * for (Execution execution : executions) {
396          * 
397          * @SuppressWarnings("unchecked") Map<String, Object> responseMap = (Map<String, Object>)
398          * getVariableFromExecution(runtimeService, execution.getId(), responseMapVariable);
399          * 
400          * if (responseMap != null) { msoLogger.debug(LOGMARKER + "Obtained " + responseMapVariable + " from process " +
401          * processInstanceId + " execution " + execution.getId()); return responseMap; } }
402          */
403         // Querying history seem to return consistent results compared to querying the runtime service
404
405         boolean alreadyEnded = timeProcessEnded.longValue() != 0;
406
407         if (alreadyEnded || isProcessEnded(processInstance.getId())) {
408             if (!alreadyEnded) {
409                 timeProcessEnded.set(System.currentTimeMillis());
410             }
411
412             // Query the history service to see if a response map exists.
413
414             HistoryService historyService = getProcessEngineServices().getHistoryService();
415             @SuppressWarnings("unchecked")
416             Map<String, Object> responseMap = (Map<String, Object>) getVariableFromHistory(historyService,
417                     processInstance.getId(), responseMapVariable);
418
419             if (responseMap != null) {
420                 logger.debug(LOGMARKER + "Obtained " + responseMapVariable + " from process " + processInstanceId
421                         + " history");
422                 return responseMap;
423             }
424
425             // Query the history service for old-style response variables.
426
427             String prefix = (String) getVariableFromHistory(historyService, processInstanceId, "prefix");
428
429             if (prefix != null) {
430
431                 // Check for 'WorkflowResponse' variable
432                 Object workflowResponseObject =
433                         getVariableFromHistory(historyService, processInstanceId, "WorkflowResponse");
434                 String workflowResponse =
435                         workflowResponseObject == null ? null : String.valueOf(workflowResponseObject);
436                 logger.debug(LOGMARKER + "WorkflowResponse: " + workflowResponse);
437
438                 if (workflowResponse != null) {
439                     Object responseCodeObject =
440                             getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
441                     String responseCode = responseCodeObject == null ? null : String.valueOf(responseCodeObject);
442                     logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
443                     responseMap = new HashMap<>();
444                     responseMap.put("WorkflowResponse", workflowResponse);
445                     responseMap.put("ResponseCode", responseCode);
446                     responseMap.put("Status", "Success");
447                     return responseMap;
448                 }
449
450
451                 // Check for 'WorkflowException' variable
452                 WorkflowException workflowException = null;
453                 String workflowExceptionText = null;
454
455                 Object workflowExceptionObject =
456                         getVariableFromHistory(historyService, processInstanceId, "WorkflowException");
457                 if (workflowExceptionObject != null) {
458                     if (workflowExceptionObject instanceof WorkflowException) {
459                         workflowException = (WorkflowException) workflowExceptionObject;
460                         workflowExceptionText = workflowException.toString();
461                         responseMap = new HashMap<>();
462                         responseMap.put("WorkflowException", workflowExceptionText);
463                         responseMap.put("ResponseCode", workflowException.getErrorCode());
464                         responseMap.put("Status", "Fail");
465                         return responseMap;
466                     } else if (workflowExceptionObject instanceof String) {
467                         Object object =
468                                 getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
469                         String responseCode = object == null ? null : String.valueOf(object);
470                         workflowExceptionText = (String) workflowExceptionObject;
471                         responseMap = new HashMap<>();
472                         responseMap.put("WorkflowException", workflowExceptionText);
473                         responseMap.put("ResponseCode", responseCode);
474                         responseMap.put("Status", "Fail");
475                         return responseMap;
476                     }
477
478                 }
479                 logger.debug(LOGMARKER + "WorkflowException: " + workflowExceptionText);
480
481                 // BEGIN LEGACY SUPPORT. TODO: REMOVE THIS CODE
482                 Object object = getVariableFromHistory(historyService, processInstanceId, processKey + "Response");
483                 String response = object == null ? null : String.valueOf(object);
484                 logger.debug(LOGMARKER + processKey + "Response: " + response);
485
486                 if (response != null) {
487                     object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
488                     String responseCode = object == null ? null : String.valueOf(object);
489                     logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
490                     responseMap = new HashMap<>();
491                     responseMap.put("Response", response);
492                     responseMap.put("ResponseCode", responseCode);
493                     responseMap.put("Status", "Success");
494                     return responseMap;
495                 }
496
497                 object = getVariableFromHistory(historyService, processInstanceId, prefix + "ErrorResponse");
498                 String errorResponse = object == null ? null : String.valueOf(object);
499                 logger.debug(LOGMARKER + prefix + "ErrorResponse: " + errorResponse);
500
501                 if (errorResponse != null) {
502                     object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
503                     String responseCode = object == null ? null : String.valueOf(object);
504                     logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
505                     responseMap = new HashMap<>();
506                     responseMap.put("Response", errorResponse);
507                     responseMap.put("ResponseCode", responseCode);
508                     responseMap.put("Status", "Fail");
509                     return responseMap;
510                 }
511                 // END LEGACY SUPPORT. TODO: REMOVE THIS CODE
512             }
513         }
514         return null;
515     }
516
517     /**
518      * Gets a variable value from the specified execution.
519      * 
520      * @return the variable value, or null if the variable could not be obtained
521      */
522     private Object getVariableFromExecution(RuntimeService runtimeService, String executionId, String variableName) {
523         try {
524             return runtimeService.getVariable(executionId, variableName);
525         } catch (ProcessEngineException e) {
526             // Most likely cause is that the execution no longer exists.
527             logger.debug("Error retrieving execution " + executionId + " variable " + variableName + ": " + e);
528             return null;
529         }
530     }
531
532     /**
533      * Gets a variable value from specified historical process instance.
534      * 
535      * @return the variable value, or null if the variable could not be obtained
536      */
537     private Object getVariableFromHistory(HistoryService historyService, String processInstanceId,
538             String variableName) {
539         try {
540             HistoricVariableInstance v = historyService.createHistoricVariableInstanceQuery()
541                     .processInstanceId(processInstanceId).variableName(variableName).singleResult();
542             return v == null ? null : v.getValue();
543         } catch (Exception e) {
544             logger.debug("Error retrieving process {} variable {} from history: ", processInstanceId, variableName, e);
545             return null;
546         }
547     }
548
549     @POST
550     @Path("/services/{processKey}/{processInstanceId}")
551     @Produces("application/json")
552     @Consumes("application/json")
553     @Operation(description = "Allows for retrieval of the variables for a given process")
554     public WorkflowResponse getProcessVariables(@PathParam("processKey") String processKey,
555             @PathParam("processInstanceId") String processInstanceId) {
556         // TODO filter only set of variables
557         WorkflowResponse response = new WorkflowResponse();
558
559         try {
560             ProcessEngineServices engine = getProcessEngineServices();
561             List<HistoricVariableInstance> variables = engine.getHistoryService().createHistoricVariableInstanceQuery()
562                     .processInstanceId(processInstanceId).list();
563             Map<String, String> variablesMap = new HashMap<>();
564             for (HistoricVariableInstance variableInstance : variables) {
565                 variablesMap.put(variableInstance.getName(), variableInstance.getValue().toString());
566             }
567
568             logger.debug(LOGMARKER + "***Received MSO getProcessVariables with processKey:" + processKey
569                     + " and variables: " + variablesMap.toString());
570
571             response.setVariables(variablesMap);
572             response.setMessage("Success");
573             response.setResponse("Successfully retrieved the variables");
574             response.setProcessInstanceID(processInstanceId);
575
576             logger.debug(LOGMARKER + response.getMessage() + " for processKey: " + processKey + " with response: "
577                     + response.getResponse());
578         } catch (Exception ex) {
579             response.setMessage("Fail");
580             response.setResponse("Failed to retrieve the variables," + ex.getMessage());
581             response.setProcessInstanceID(processInstanceId);
582
583             logger.error(LoggingAnchor.FIVE, MessageEnum.BPMN_GENERAL_EXCEPTION_ARG.toString(), "BPMN",
584                     MDC.get(processKey), ErrorCode.UnknownError.getValue(), LOGMARKER + response.getMessage()
585                             + " for processKey: " + processKey + " with response: " + response.getResponse());
586             logger.debug("Exception :", ex);
587         }
588         return response;
589     }
590 }