731e9d8703a9c444f813355aaa8b3019d9438169
[so.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright (C) 2017 Huawei Technologies Co., Ltd. All rights reserved.
7  * ================================================================================
8  * Modifications Copyright (c) 2019 Samsung
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  * 
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  * 
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.so.bpmn.common.workflow.service;
25
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.UUID;
30 import java.util.concurrent.atomic.AtomicLong;
31 import javax.ws.rs.Consumes;
32 import javax.ws.rs.POST;
33 import javax.ws.rs.Path;
34 import javax.ws.rs.PathParam;
35 import javax.ws.rs.Produces;
36 import javax.ws.rs.core.Context;
37 import javax.ws.rs.core.Response;
38 import javax.ws.rs.core.UriInfo;
39 import org.onap.so.logger.LoggingAnchor;
40 import org.camunda.bpm.engine.HistoryService;
41 import org.camunda.bpm.engine.ProcessEngineException;
42 import org.camunda.bpm.engine.ProcessEngineServices;
43 import org.camunda.bpm.engine.RuntimeService;
44 import org.camunda.bpm.engine.history.HistoricVariableInstance;
45 import org.camunda.bpm.engine.runtime.ProcessInstance;
46 import org.camunda.bpm.engine.variable.VariableMap;
47 import org.camunda.bpm.engine.variable.Variables;
48 import org.camunda.bpm.engine.variable.Variables.SerializationDataFormats;
49 import org.camunda.bpm.engine.variable.impl.VariableMapImpl;
50 import org.onap.so.bpmn.common.workflow.context.WorkflowResponse;
51 import org.onap.so.bpmn.core.WorkflowException;
52 import org.onap.logging.filter.base.ErrorCode;
53 import org.onap.so.logger.MessageEnum;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import org.slf4j.MDC;
57 import org.springframework.stereotype.Component;
58 import io.swagger.annotations.Api;
59 import io.swagger.annotations.ApiOperation;
60
61 @Path("/workflow")
62 @Api(value = "/workflow", description = "Root of workflow services")
63 @Component
64 public class WorkflowResource extends ProcessEngineAwareService {
65
66     private static final Logger logger = LoggerFactory.getLogger(WorkflowResource.class);
67     private static final String LOGMARKER = "[WRKFLOW-RESOURCE]";
68
69     private static final int DEFAULT_WAIT_TIME = 30000;
70
71     @Context
72     private UriInfo uriInfo = null;
73
74     /**
75      * Starts the process instance and responds to client synchronously If the request does not contain
76      * mso-service-request-timeout then it waits for the value specified in DEFAULT_WAIT_TIME Note: value specified in
77      * mso-service-request-timeout is in seconds During polling time, if there is an exception encountered in the
78      * process execution then polling is stopped and the error response is returned to the client
79      * 
80      * @param processKey
81      * @param variableMap
82      * @return
83      */
84     @POST
85     @Path("/services/{processKey}")
86     @ApiOperation(value = "Starts a new process with the appropriate process synchronously", notes = "d")
87     @Produces("application/json")
88     @Consumes("application/json")
89     public Response startProcessInstanceByKey(@PathParam("processKey") String processKey, VariableMapImpl variableMap) {
90
91         Map<String, Object> inputVariables = getInputVariables(variableMap);
92         setLogContext(processKey, inputVariables);
93
94         WorkflowResponse workflowResponse = new WorkflowResponse();
95         long startTime = System.currentTimeMillis();
96         ProcessInstance processInstance = null;
97
98         try {
99             // Kickoff the process
100             ProcessThread thread = new ProcessThread(inputVariables, processKey);
101             thread.start();
102
103             Map<String, Object> responseMap = null;
104
105             // wait for process to be completed
106             long waitTime = getWaitTime(inputVariables);
107             long now = System.currentTimeMillis();
108             long start = now;
109             long endTime = start + waitTime;
110             long pollingInterval = 500;
111
112             // TEMPORARY LOGIC FOR UNIT TEST REFACTORING
113             // If this is a unit test (method is invoked directly), wait a max
114             // of 5 seconds after process ended for a result. In production,
115             // wait up to 60 seconds.
116             long timeToWaitAfterProcessEnded = uriInfo == null ? 5000 : 60000;
117             AtomicLong timeProcessEnded = new AtomicLong(0);
118             boolean endedWithNoResponse = false;
119             logger.debug(LOGMARKER + "WorkflowResource.startProcessInstanceByKey using timeout: " + waitTime);
120             while (now <= endTime) {
121                 Thread.sleep(pollingInterval);
122
123                 now = System.currentTimeMillis();
124
125                 // Increase the polling interval over time
126
127                 long elapsed = now - start;
128
129                 if (elapsed > 60000) {
130                     pollingInterval = 5000;
131                 } else if (elapsed > 10000) {
132                     pollingInterval = 1000;
133                 }
134                 Exception exception = thread.getException();
135                 if (exception != null) {
136                     throw new Exception(exception);
137                 }
138
139                 processInstance = thread.getProcessInstance();
140
141                 if (processInstance == null) {
142                     logger.debug("{} process has not been created yet", LOGMARKER + processKey);
143                     continue;
144                 }
145
146                 String processInstanceId = processInstance.getId();
147                 workflowResponse.setProcessInstanceID(processInstanceId);
148
149                 responseMap = getResponseMap(processInstance, processKey, timeProcessEnded);
150
151                 if (responseMap == null) {
152                     logger.debug("{} has not produced a response yet", LOGMARKER + processKey);
153
154                     if (timeProcessEnded.longValue() != 0) {
155                         long elapsedSinceEnded = System.currentTimeMillis() - timeProcessEnded.longValue();
156
157                         if (elapsedSinceEnded > timeToWaitAfterProcessEnded) {
158                             endedWithNoResponse = true;
159                             break;
160                         }
161                     }
162                 } else {
163                     processResponseMap(workflowResponse, responseMap);
164                     recordEvents(processKey, workflowResponse, startTime);
165                     return Response.status(workflowResponse.getMessageCode()).entity(workflowResponse).build();
166                 }
167             }
168
169             // if we dont get response after waiting then send timeout response
170
171             String state;
172             String processInstanceId;
173
174             if (processInstance == null) {
175                 processInstanceId = "N/A";
176                 state = "NOT STARTED";
177             } else {
178                 processInstanceId = processInstance.getProcessInstanceId();
179                 state = isProcessEnded(processInstanceId) ? "ENDED" : "NOT ENDED";
180             }
181
182             workflowResponse.setMessage("Fail");
183             if (endedWithNoResponse) {
184                 workflowResponse.setResponse("Process ended without producing a response");
185             } else {
186                 workflowResponse.setResponse("Request timed out, process state: " + state);
187             }
188             workflowResponse.setProcessInstanceID(processInstanceId);
189             recordEvents(processKey, workflowResponse, startTime);
190             workflowResponse.setMessageCode(500);
191             return Response.status(500).entity(workflowResponse).build();
192         } catch (Exception ex) {
193             logger.debug(LOGMARKER + "Exception in startProcessInstance by key", ex);
194             workflowResponse.setMessage("Fail");
195             workflowResponse.setResponse("Error occurred while executing the process: " + ex.getMessage());
196             if (processInstance != null)
197                 workflowResponse.setProcessInstanceID(processInstance.getId());
198
199             logger.error(LoggingAnchor.FIVE, MessageEnum.BPMN_GENERAL_EXCEPTION_ARG.toString(), "BPMN",
200                     MDC.get(processKey), ErrorCode.UnknownError.getValue(), LOGMARKER + workflowResponse.getMessage()
201                             + " for processKey: " + processKey + " with response: " + workflowResponse.getResponse());
202
203             workflowResponse.setMessageCode(500);
204             recordEvents(processKey, workflowResponse, startTime);
205             return Response.status(500).entity(workflowResponse).build();
206         }
207     }
208
209     /**
210      * Returns the wait time, this is used by the resource on how long it should wait to send a response If none
211      * specified DEFAULT_WAIT_TIME is used
212      * 
213      * @param inputVariables
214      * @return
215      */
216     private int getWaitTime(Map<String, Object> inputVariables) {
217         String timeout = inputVariables.get("mso-service-request-timeout") == null ? null
218                 : inputVariables.get("mso-service-request-timeout").toString();
219
220         if (timeout != null) {
221             try {
222                 return Integer.parseInt(timeout) * 1000;
223             } catch (NumberFormatException nex) {
224                 logger.debug("Invalid input for mso-service-request-timeout");
225             }
226         }
227         return DEFAULT_WAIT_TIME;
228     }
229
230     private void recordEvents(String processKey, WorkflowResponse response, long startTime) {}
231
232     private void setLogContext(String processKey, Map<String, Object> inputVariables) {}
233
234     private String getValueFromInputVariables(Map<String, Object> inputVariables, String key) {
235         Object value = inputVariables.get(key);
236         if (value == null) {
237             return "N/A";
238         } else {
239             return value.toString();
240         }
241     }
242
243     /**
244      * Checks to see if the specified process is ended.
245      * 
246      * @param processInstanceId the process instance ID
247      * @return true if the process is ended
248      */
249     private boolean isProcessEnded(String processInstanceId) {
250         ProcessEngineServices pes = getProcessEngineServices();
251         try {
252             return pes.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstanceId)
253                     .singleResult() == null ? true : false;
254         } catch (Exception e) {
255             logger.debug("Exception :", e);
256             return true;
257         }
258     }
259
260     private void processResponseMap(WorkflowResponse workflowResponse, Map<String, Object> responseMap) {
261         Object object = responseMap.get("Response");
262         String response = object == null ? null : String.valueOf(object);
263         if (response == null) {
264             object = responseMap.get("WorkflowResponse");
265             response = object == null ? null : String.valueOf(object);
266         }
267
268         workflowResponse.setResponse(response);
269
270         object = responseMap.get("ResponseCode");
271         String responseCode = object == null ? null : String.valueOf(object);
272
273         try {
274             workflowResponse.setMessageCode(Integer.parseInt(responseCode));
275         } catch (NumberFormatException nex) {
276             logger.debug(LOGMARKER + "Failed to parse ResponseCode: " + responseCode);
277             workflowResponse.setMessageCode(-1);
278         }
279
280         Object status = responseMap.get("Status");
281
282         if ("Success".equalsIgnoreCase(String.valueOf(status))) {
283             workflowResponse.setMessage("Success");
284         } else if ("Fail".equalsIgnoreCase(String.valueOf(status))) {
285             workflowResponse.setMessage("Fail");
286         } else {
287             logger.debug(LOGMARKER + "Unrecognized Status: " + responseCode);
288             workflowResponse.setMessage("Fail");
289         }
290     }
291
292     /**
293      * @version 1.0 Triggers the workflow in a separate thread
294      */
295     private class ProcessThread extends Thread {
296         private final Map<String, Object> inputVariables;
297         private final String processKey;
298         private final String businessKey;
299         private ProcessInstance processInstance = null;
300         private Exception exception = null;
301
302         public ProcessThread(Map<String, Object> inputVariables, String processKey) {
303             this.inputVariables = inputVariables;
304             this.processKey = processKey;
305             this.businessKey = UUID.randomUUID().toString();
306         }
307
308         /**
309          * If an exception occurs when starting the process instance, it may be obtained by calling this method. Note
310          * that exceptions are only recorded while the process is executing in its original thread. Once a process is
311          * suspended, exception recording stops.
312          * 
313          * @return the exception, or null if none has occurred
314          */
315         public Exception getException() {
316             return exception;
317         }
318
319
320         public ProcessInstance getProcessInstance() {
321             return this.processInstance;
322         }
323
324         /**
325          * Sets the process instance exception.
326          * 
327          * @param exception the exception
328          */
329         private void setException(Exception exception) {
330             this.exception = exception;
331         }
332
333         public void run() {
334             setLogContext(processKey, inputVariables);
335
336             try {
337
338                 RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
339
340                 // Note that this method doesn't return until the process suspends
341                 // itself or finishes. We provide a business key so we can identify
342                 // the process instance immediately.
343                 processInstance = runtimeService.startProcessInstanceByKey(processKey, inputVariables);
344
345             } catch (Exception e) {
346                 logger.debug(LOGMARKER + "ProcessThread caught an exception executing " + processKey + ": " + e);
347                 setException(e);
348             }
349         }
350
351     }
352
353     private Map<String, Object> getInputVariables(VariableMapImpl variableMap) {
354         VariableMap inputVariables = Variables.createVariables();
355         @SuppressWarnings("unchecked")
356         Map<String, Object> vMap = (Map<String, Object>) variableMap.get("variables");
357         for (String key : vMap.keySet()) { // variabe name vn
358             @SuppressWarnings("unchecked")
359             Map<String, Object> valueMap = (Map<String, Object>) vMap.get(key); // value, type
360             inputVariables.putValueTyped(key,
361                     Variables.objectValue(valueMap.get("value")).serializationDataFormat(SerializationDataFormats.JAVA) // tells
362                                                                                                                         // the
363                                                                                                                         // engine
364                                                                                                                         // to
365                                                                                                                         // use
366                                                                                                                         // java
367                                                                                                                         // serialization
368                                                                                                                         // for
369                                                                                                                         // persisting
370                                                                                                                         // the
371                                                                                                                         // value
372                             .create());
373         }
374         return inputVariables;
375     }
376
377     /**
378      * Attempts to get a response map from the specified process instance.
379      * 
380      * @return the response map, or null if it is unavailable
381      */
382     private Map<String, Object> getResponseMap(ProcessInstance processInstance, String processKey,
383             AtomicLong timeProcessEnded) {
384
385         String responseMapVariable = processKey + "ResponseMap";
386         String processInstanceId = processInstance.getId();
387
388         // Query the runtime service to see if a response map is ready.
389
390         /*
391          * RuntimeService runtimeService = getProcessEngineServices().getRuntimeService(); List<Execution> executions =
392          * runtimeService.createExecutionQuery() .processInstanceId(processInstanceId).list();
393          * 
394          * for (Execution execution : executions) {
395          * 
396          * @SuppressWarnings("unchecked") Map<String, Object> responseMap = (Map<String, Object>)
397          * getVariableFromExecution(runtimeService, execution.getId(), responseMapVariable);
398          * 
399          * if (responseMap != null) { msoLogger.debug(LOGMARKER + "Obtained " + responseMapVariable + " from process " +
400          * processInstanceId + " execution " + execution.getId()); return responseMap; } }
401          */
402         // Querying history seem to return consistent results compared to querying the runtime service
403
404         boolean alreadyEnded = timeProcessEnded.longValue() != 0;
405
406         if (alreadyEnded || isProcessEnded(processInstance.getId())) {
407             if (!alreadyEnded) {
408                 timeProcessEnded.set(System.currentTimeMillis());
409             }
410
411             // Query the history service to see if a response map exists.
412
413             HistoryService historyService = getProcessEngineServices().getHistoryService();
414             @SuppressWarnings("unchecked")
415             Map<String, Object> responseMap = (Map<String, Object>) getVariableFromHistory(historyService,
416                     processInstance.getId(), responseMapVariable);
417
418             if (responseMap != null) {
419                 logger.debug(LOGMARKER + "Obtained " + responseMapVariable + " from process " + processInstanceId
420                         + " history");
421                 return responseMap;
422             }
423
424             // Query the history service for old-style response variables.
425
426             String prefix = (String) getVariableFromHistory(historyService, processInstanceId, "prefix");
427
428             if (prefix != null) {
429
430                 // Check for 'WorkflowResponse' variable
431                 Object workflowResponseObject =
432                         getVariableFromHistory(historyService, processInstanceId, "WorkflowResponse");
433                 String workflowResponse =
434                         workflowResponseObject == null ? null : String.valueOf(workflowResponseObject);
435                 logger.debug(LOGMARKER + "WorkflowResponse: " + workflowResponse);
436
437                 if (workflowResponse != null) {
438                     Object responseCodeObject =
439                             getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
440                     String responseCode = responseCodeObject == null ? null : String.valueOf(responseCodeObject);
441                     logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
442                     responseMap = new HashMap<>();
443                     responseMap.put("WorkflowResponse", workflowResponse);
444                     responseMap.put("ResponseCode", responseCode);
445                     responseMap.put("Status", "Success");
446                     return responseMap;
447                 }
448
449
450                 // Check for 'WorkflowException' variable
451                 WorkflowException workflowException = null;
452                 String workflowExceptionText = null;
453
454                 Object workflowExceptionObject =
455                         getVariableFromHistory(historyService, processInstanceId, "WorkflowException");
456                 if (workflowExceptionObject != null) {
457                     if (workflowExceptionObject instanceof WorkflowException) {
458                         workflowException = (WorkflowException) workflowExceptionObject;
459                         workflowExceptionText = workflowException.toString();
460                         responseMap = new HashMap<>();
461                         responseMap.put("WorkflowException", workflowExceptionText);
462                         responseMap.put("ResponseCode", workflowException.getErrorCode());
463                         responseMap.put("Status", "Fail");
464                         return responseMap;
465                     } else if (workflowExceptionObject instanceof String) {
466                         Object object =
467                                 getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
468                         String responseCode = object == null ? null : String.valueOf(object);
469                         workflowExceptionText = (String) workflowExceptionObject;
470                         responseMap = new HashMap<>();
471                         responseMap.put("WorkflowException", workflowExceptionText);
472                         responseMap.put("ResponseCode", responseCode);
473                         responseMap.put("Status", "Fail");
474                         return responseMap;
475                     }
476
477                 }
478                 logger.debug(LOGMARKER + "WorkflowException: " + workflowExceptionText);
479
480                 // BEGIN LEGACY SUPPORT. TODO: REMOVE THIS CODE
481                 Object object = getVariableFromHistory(historyService, processInstanceId, processKey + "Response");
482                 String response = object == null ? null : String.valueOf(object);
483                 logger.debug(LOGMARKER + processKey + "Response: " + response);
484
485                 if (response != null) {
486                     object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
487                     String responseCode = object == null ? null : String.valueOf(object);
488                     logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
489                     responseMap = new HashMap<>();
490                     responseMap.put("Response", response);
491                     responseMap.put("ResponseCode", responseCode);
492                     responseMap.put("Status", "Success");
493                     return responseMap;
494                 }
495
496                 object = getVariableFromHistory(historyService, processInstanceId, prefix + "ErrorResponse");
497                 String errorResponse = object == null ? null : String.valueOf(object);
498                 logger.debug(LOGMARKER + prefix + "ErrorResponse: " + errorResponse);
499
500                 if (errorResponse != null) {
501                     object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
502                     String responseCode = object == null ? null : String.valueOf(object);
503                     logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
504                     responseMap = new HashMap<>();
505                     responseMap.put("Response", errorResponse);
506                     responseMap.put("ResponseCode", responseCode);
507                     responseMap.put("Status", "Fail");
508                     return responseMap;
509                 }
510                 // END LEGACY SUPPORT. TODO: REMOVE THIS CODE
511             }
512         }
513         return null;
514     }
515
516     /**
517      * Gets a variable value from the specified execution.
518      * 
519      * @return the variable value, or null if the variable could not be obtained
520      */
521     private Object getVariableFromExecution(RuntimeService runtimeService, String executionId, String variableName) {
522         try {
523             return runtimeService.getVariable(executionId, variableName);
524         } catch (ProcessEngineException e) {
525             // Most likely cause is that the execution no longer exists.
526             logger.debug("Error retrieving execution " + executionId + " variable " + variableName + ": " + e);
527             return null;
528         }
529     }
530
531     /**
532      * Gets a variable value from specified historical process instance.
533      * 
534      * @return the variable value, or null if the variable could not be obtained
535      */
536     private Object getVariableFromHistory(HistoryService historyService, String processInstanceId,
537             String variableName) {
538         try {
539             HistoricVariableInstance v = historyService.createHistoricVariableInstanceQuery()
540                     .processInstanceId(processInstanceId).variableName(variableName).singleResult();
541             return v == null ? null : v.getValue();
542         } catch (Exception e) {
543             logger.debug("Error retrieving process {} variable {} from history: ", processInstanceId, variableName, e);
544             return null;
545         }
546     }
547
548     @POST
549     @Path("/services/{processKey}/{processInstanceId}")
550     @Produces("application/json")
551     @Consumes("application/json")
552     @ApiOperation(value = "Allows for retrieval of the variables for a given process", notes = "")
553     public WorkflowResponse getProcessVariables(@PathParam("processKey") String processKey,
554             @PathParam("processInstanceId") String processInstanceId) {
555         // TODO filter only set of variables
556         WorkflowResponse response = new WorkflowResponse();
557
558         try {
559             ProcessEngineServices engine = getProcessEngineServices();
560             List<HistoricVariableInstance> variables = engine.getHistoryService().createHistoricVariableInstanceQuery()
561                     .processInstanceId(processInstanceId).list();
562             Map<String, String> variablesMap = new HashMap<>();
563             for (HistoricVariableInstance variableInstance : variables) {
564                 variablesMap.put(variableInstance.getName(), variableInstance.getValue().toString());
565             }
566
567             logger.debug(LOGMARKER + "***Received MSO getProcessVariables with processKey:" + processKey
568                     + " and variables: " + variablesMap.toString());
569
570             response.setVariables(variablesMap);
571             response.setMessage("Success");
572             response.setResponse("Successfully retrieved the variables");
573             response.setProcessInstanceID(processInstanceId);
574
575             logger.debug(LOGMARKER + response.getMessage() + " for processKey: " + processKey + " with response: "
576                     + response.getResponse());
577         } catch (Exception ex) {
578             response.setMessage("Fail");
579             response.setResponse("Failed to retrieve the variables," + ex.getMessage());
580             response.setProcessInstanceID(processInstanceId);
581
582             logger.error(LoggingAnchor.FIVE, MessageEnum.BPMN_GENERAL_EXCEPTION_ARG.toString(), "BPMN",
583                     MDC.get(processKey), ErrorCode.UnknownError.getValue(), LOGMARKER + response.getMessage()
584                             + " for processKey: " + processKey + " with response: " + response.getResponse());
585             logger.debug("Exception :", ex);
586         }
587         return response;
588     }
589 }