bcc3739c3208b9eecc11e1897a6a460a7881bd1e
[so.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright (C) 2017 Huawei Technologies Co., Ltd. All rights reserved.
7  * ================================================================================
8  * Modifications Copyright (c) 2019 Samsung
9  * ================================================================================
10  * Licensed under the Apache License, Version 2.0 (the "License");
11  * you may not use this file except in compliance with the License.
12  * You may obtain a copy of the License at
13  * 
14  *      http://www.apache.org/licenses/LICENSE-2.0
15  * 
16  * Unless required by applicable law or agreed to in writing, software
17  * distributed under the License is distributed on an "AS IS" BASIS,
18  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  * See the License for the specific language governing permissions and
20  * limitations under the License.
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.so.bpmn.common.workflow.service;
25
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.UUID;
30 import java.util.concurrent.atomic.AtomicLong;
31 import javax.ws.rs.Consumes;
32 import javax.ws.rs.POST;
33 import javax.ws.rs.Path;
34 import javax.ws.rs.PathParam;
35 import javax.ws.rs.Produces;
36 import javax.ws.rs.core.Context;
37 import javax.ws.rs.core.Response;
38 import javax.ws.rs.core.UriInfo;
39 import org.onap.so.logger.LoggingAnchor;
40 import org.camunda.bpm.engine.HistoryService;
41 import org.camunda.bpm.engine.ProcessEngineException;
42 import org.camunda.bpm.engine.ProcessEngineServices;
43 import org.camunda.bpm.engine.RuntimeService;
44 import org.camunda.bpm.engine.history.HistoricVariableInstance;
45 import org.camunda.bpm.engine.runtime.ProcessInstance;
46 import org.camunda.bpm.engine.variable.VariableMap;
47 import org.camunda.bpm.engine.variable.Variables;
48 import org.camunda.bpm.engine.variable.Variables.SerializationDataFormats;
49 import org.camunda.bpm.engine.variable.impl.VariableMapImpl;
50 import org.onap.so.bpmn.common.workflow.context.WorkflowResponse;
51 import org.onap.so.bpmn.core.WorkflowException;
52 import org.onap.so.logger.ErrorCode;
53 import org.onap.so.logger.MessageEnum;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import org.slf4j.MDC;
57 import org.springframework.stereotype.Component;
58 import io.swagger.annotations.Api;
59 import io.swagger.annotations.ApiOperation;
60
61 @Path("/workflow")
62 @Api(value = "/workflow", description = "Root of workflow services")
63 @Component
64 public class WorkflowResource extends ProcessEngineAwareService {
65
66     private static final Logger logger = LoggerFactory.getLogger(WorkflowResource.class);
67     private static final String LOGMARKER = "[WRKFLOW-RESOURCE]";
68
69     private static final int DEFAULT_WAIT_TIME = 30000;
70
71     @Context
72     private UriInfo uriInfo = null;
73
74     /**
75      * Starts the process instance and responds to client synchronously If the request does not contain
76      * mso-service-request-timeout then it waits for the value specified in DEFAULT_WAIT_TIME Note: value specified in
77      * mso-service-request-timeout is in seconds During polling time, if there is an exception encountered in the
78      * process execution then polling is stopped and the error response is returned to the client
79      * 
80      * @param processKey
81      * @param variableMap
82      * @return
83      */
84     @POST
85     @Path("/services/{processKey}")
86     @ApiOperation(value = "Starts a new process with the appropriate process synchronously", notes = "d")
87     @Produces("application/json")
88     @Consumes("application/json")
89     public Response startProcessInstanceByKey(@PathParam("processKey") String processKey, VariableMapImpl variableMap) {
90
91         Map<String, Object> inputVariables = getInputVariables(variableMap);
92         setLogContext(processKey, inputVariables);
93
94         WorkflowResponse workflowResponse = new WorkflowResponse();
95         long startTime = System.currentTimeMillis();
96         ProcessInstance processInstance = null;
97
98         try {
99             // Kickoff the process
100             ProcessThread thread = new ProcessThread(inputVariables, processKey);
101             thread.start();
102
103             Map<String, Object> responseMap = null;
104
105             // wait for process to be completed
106             long waitTime = getWaitTime(inputVariables);
107             long now = System.currentTimeMillis();
108             long start = now;
109             long endTime = start + waitTime;
110             long pollingInterval = 500;
111
112             // TEMPORARY LOGIC FOR UNIT TEST REFACTORING
113             // If this is a unit test (method is invoked directly), wait a max
114             // of 5 seconds after process ended for a result. In production,
115             // wait up to 60 seconds.
116             long timeToWaitAfterProcessEnded = uriInfo == null ? 5000 : 60000;
117             AtomicLong timeProcessEnded = new AtomicLong(0);
118             boolean endedWithNoResponse = false;
119             logger.debug(LOGMARKER + "WorkflowResource.startProcessInstanceByKey using timeout: " + waitTime);
120             while (now <= endTime) {
121                 Thread.sleep(pollingInterval);
122
123                 now = System.currentTimeMillis();
124
125                 // Increase the polling interval over time
126
127                 long elapsed = now - start;
128
129                 if (elapsed > 60000) {
130                     pollingInterval = 5000;
131                 } else if (elapsed > 10000) {
132                     pollingInterval = 1000;
133                 }
134                 Exception exception = thread.getException();
135                 if (exception != null) {
136                     throw new Exception(exception);
137                 }
138
139                 processInstance = thread.getProcessInstance();
140
141                 if (processInstance == null) {
142                     logger.debug("{} process has not been created yet", LOGMARKER + processKey);
143                     continue;
144                 }
145
146                 String processInstanceId = processInstance.getId();
147                 workflowResponse.setProcessInstanceID(processInstanceId);
148
149                 responseMap = getResponseMap(processInstance, processKey, timeProcessEnded);
150
151                 if (responseMap == null) {
152                     logger.debug("{} has not produced a response yet", LOGMARKER + processKey);
153
154                     if (timeProcessEnded.longValue() != 0) {
155                         long elapsedSinceEnded = System.currentTimeMillis() - timeProcessEnded.longValue();
156
157                         if (elapsedSinceEnded > timeToWaitAfterProcessEnded) {
158                             endedWithNoResponse = true;
159                             break;
160                         }
161                     }
162                 } else {
163                     processResponseMap(workflowResponse, responseMap);
164                     recordEvents(processKey, workflowResponse, startTime);
165                     return Response.status(workflowResponse.getMessageCode()).entity(workflowResponse).build();
166                 }
167             }
168
169             // if we dont get response after waiting then send timeout response
170
171             String state;
172             String processInstanceId;
173
174             if (processInstance == null) {
175                 processInstanceId = "N/A";
176                 state = "NOT STARTED";
177             } else {
178                 processInstanceId = processInstance.getProcessInstanceId();
179                 state = isProcessEnded(processInstanceId) ? "ENDED" : "NOT ENDED";
180             }
181
182             workflowResponse.setMessage("Fail");
183             if (endedWithNoResponse) {
184                 workflowResponse.setResponse("Process ended without producing a response");
185             } else {
186                 workflowResponse.setResponse("Request timed out, process state: " + state);
187             }
188             workflowResponse.setProcessInstanceID(processInstanceId);
189             recordEvents(processKey, workflowResponse, startTime);
190             workflowResponse.setMessageCode(500);
191             return Response.status(500).entity(workflowResponse).build();
192         } catch (Exception ex) {
193             logger.debug(LOGMARKER + "Exception in startProcessInstance by key", ex);
194             workflowResponse.setMessage("Fail");
195             workflowResponse.setResponse("Error occurred while executing the process: " + ex.getMessage());
196             if (processInstance != null)
197                 workflowResponse.setProcessInstanceID(processInstance.getId());
198
199             logger.error(LoggingAnchor.FIVE, MessageEnum.BPMN_GENERAL_EXCEPTION_ARG.toString(), "BPMN",
200                     MDC.get(processKey), ErrorCode.UnknownError.getValue(), LOGMARKER + workflowResponse.getMessage()
201                             + " for processKey: " + processKey + " with response: " + workflowResponse.getResponse());
202
203             workflowResponse.setMessageCode(500);
204             recordEvents(processKey, workflowResponse, startTime);
205             return Response.status(500).entity(workflowResponse).build();
206         }
207     }
208
209     /**
210      * Returns the wait time, this is used by the resource on how long it should wait to send a response If none
211      * specified DEFAULT_WAIT_TIME is used
212      * 
213      * @param inputVariables
214      * @return
215      */
216     private int getWaitTime(Map<String, Object> inputVariables) {
217         String timeout = inputVariables.get("mso-service-request-timeout") == null ? null
218                 : inputVariables.get("mso-service-request-timeout").toString();
219
220         if (timeout != null) {
221             try {
222                 return Integer.parseInt(timeout) * 1000;
223             } catch (NumberFormatException nex) {
224                 logger.debug("Invalid input for mso-service-request-timeout");
225             }
226         }
227         return DEFAULT_WAIT_TIME;
228     }
229
230     private void recordEvents(String processKey, WorkflowResponse response, long startTime) {}
231
232     private void setLogContext(String processKey, Map<String, Object> inputVariables) {}
233
234     private String getValueFromInputVariables(Map<String, Object> inputVariables, String key) {
235         Object value = inputVariables.get(key);
236         if (value == null) {
237             return "N/A";
238         } else {
239             return value.toString();
240         }
241     }
242
243     /**
244      * Checks to see if the specified process is ended.
245      * 
246      * @param processInstanceId the process instance ID
247      * @return true if the process is ended
248      */
249     private boolean isProcessEnded(String processInstanceId) {
250         ProcessEngineServices pes = getProcessEngineServices();
251         try {
252             return pes.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstanceId)
253                     .singleResult() == null ? true : false;
254         } catch (Exception e) {
255             logger.debug("Exception :", e);
256             return true;
257         }
258     }
259
260     private void processResponseMap(WorkflowResponse workflowResponse, Map<String, Object> responseMap) {
261         Object object = responseMap.get("Response");
262         String response = object == null ? null : String.valueOf(object);
263         if (response == null) {
264             object = responseMap.get("WorkflowResponse");
265             response = object == null ? null : String.valueOf(object);
266         }
267
268         workflowResponse.setResponse(response);
269
270         object = responseMap.get("ResponseCode");
271         String responseCode = object == null ? null : String.valueOf(object);
272
273         try {
274             workflowResponse.setMessageCode(Integer.parseInt(responseCode));
275         } catch (NumberFormatException nex) {
276             logger.debug(LOGMARKER + "Failed to parse ResponseCode: " + responseCode);
277             workflowResponse.setMessageCode(-1);
278         }
279
280         Object status = responseMap.get("Status");
281
282         if ("Success".equalsIgnoreCase(String.valueOf(status))) {
283             workflowResponse.setMessage("Success");
284         } else if ("Fail".equalsIgnoreCase(String.valueOf(status))) {
285             workflowResponse.setMessage("Fail");
286         } else {
287             logger.debug(LOGMARKER + "Unrecognized Status: " + responseCode);
288             workflowResponse.setMessage("Fail");
289         }
290     }
291
292     /**
293      * @version 1.0 Triggers the workflow in a separate thread
294      */
295     private class ProcessThread extends Thread {
296         private final Map<String, Object> inputVariables;
297         private final String processKey;
298         private final String businessKey;
299         private ProcessInstance processInstance = null;
300         private Exception exception = null;
301
302         public ProcessThread(Map<String, Object> inputVariables, String processKey) {
303             this.inputVariables = inputVariables;
304             this.processKey = processKey;
305             this.businessKey = UUID.randomUUID().toString();
306         }
307
308         /**
309          * If an exception occurs when starting the process instance, it may be obtained by calling this method. Note
310          * that exceptions are only recorded while the process is executing in its original thread. Once a process is
311          * suspended, exception recording stops.
312          * 
313          * @return the exception, or null if none has occurred
314          */
315         public Exception getException() {
316             return exception;
317         }
318
319
320         public ProcessInstance getProcessInstance() {
321             return this.processInstance;
322         }
323
324         /**
325          * Sets the process instance exception.
326          * 
327          * @param exception the exception
328          */
329         private void setException(Exception exception) {
330             this.exception = exception;
331         }
332
333         public void run() {
334             setLogContext(processKey, inputVariables);
335
336             long startTime = System.currentTimeMillis();
337
338             try {
339
340                 RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
341
342                 // Note that this method doesn't return until the process suspends
343                 // itself or finishes. We provide a business key so we can identify
344                 // the process instance immediately.
345                 processInstance = runtimeService.startProcessInstanceByKey(processKey, inputVariables);
346
347             } catch (Exception e) {
348                 logger.debug(LOGMARKER + "ProcessThread caught an exception executing " + processKey + ": " + e);
349                 setException(e);
350             }
351         }
352
353     }
354
355     private Map<String, Object> getInputVariables(VariableMapImpl variableMap) {
356         VariableMap inputVariables = Variables.createVariables();
357         @SuppressWarnings("unchecked")
358         Map<String, Object> vMap = (Map<String, Object>) variableMap.get("variables");
359         for (String key : vMap.keySet()) { // variabe name vn
360             @SuppressWarnings("unchecked")
361             Map<String, Object> valueMap = (Map<String, Object>) vMap.get(key); // value, type
362             inputVariables.putValueTyped(key,
363                     Variables.objectValue(valueMap.get("value")).serializationDataFormat(SerializationDataFormats.JAVA) // tells
364                                                                                                                         // the
365                                                                                                                         // engine
366                                                                                                                         // to
367                                                                                                                         // use
368                                                                                                                         // java
369                                                                                                                         // serialization
370                                                                                                                         // for
371                                                                                                                         // persisting
372                                                                                                                         // the
373                                                                                                                         // value
374                             .create());
375         }
376         return inputVariables;
377     }
378
379     /**
380      * Attempts to get a response map from the specified process instance.
381      * 
382      * @return the response map, or null if it is unavailable
383      */
384     private Map<String, Object> getResponseMap(ProcessInstance processInstance, String processKey,
385             AtomicLong timeProcessEnded) {
386
387         String responseMapVariable = processKey + "ResponseMap";
388         String processInstanceId = processInstance.getId();
389
390         // Query the runtime service to see if a response map is ready.
391
392         /*
393          * RuntimeService runtimeService = getProcessEngineServices().getRuntimeService(); List<Execution> executions =
394          * runtimeService.createExecutionQuery() .processInstanceId(processInstanceId).list();
395          * 
396          * for (Execution execution : executions) {
397          * 
398          * @SuppressWarnings("unchecked") Map<String, Object> responseMap = (Map<String, Object>)
399          * getVariableFromExecution(runtimeService, execution.getId(), responseMapVariable);
400          * 
401          * if (responseMap != null) { msoLogger.debug(LOGMARKER + "Obtained " + responseMapVariable + " from process " +
402          * processInstanceId + " execution " + execution.getId()); return responseMap; } }
403          */
404         // Querying history seem to return consistent results compared to querying the runtime service
405
406         boolean alreadyEnded = timeProcessEnded.longValue() != 0;
407
408         if (alreadyEnded || isProcessEnded(processInstance.getId())) {
409             if (!alreadyEnded) {
410                 timeProcessEnded.set(System.currentTimeMillis());
411             }
412
413             // Query the history service to see if a response map exists.
414
415             HistoryService historyService = getProcessEngineServices().getHistoryService();
416             @SuppressWarnings("unchecked")
417             Map<String, Object> responseMap = (Map<String, Object>) getVariableFromHistory(historyService,
418                     processInstance.getId(), responseMapVariable);
419
420             if (responseMap != null) {
421                 logger.debug(LOGMARKER + "Obtained " + responseMapVariable + " from process " + processInstanceId
422                         + " history");
423                 return responseMap;
424             }
425
426             // Query the history service for old-style response variables.
427
428             String prefix = (String) getVariableFromHistory(historyService, processInstanceId, "prefix");
429
430             if (prefix != null) {
431
432                 // Check for 'WorkflowResponse' variable
433                 Object workflowResponseObject =
434                         getVariableFromHistory(historyService, processInstanceId, "WorkflowResponse");
435                 String workflowResponse =
436                         workflowResponseObject == null ? null : String.valueOf(workflowResponseObject);
437                 logger.debug(LOGMARKER + "WorkflowResponse: " + workflowResponse);
438
439                 if (workflowResponse != null) {
440                     Object responseCodeObject =
441                             getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
442                     String responseCode = responseCodeObject == null ? null : String.valueOf(responseCodeObject);
443                     logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
444                     responseMap = new HashMap<>();
445                     responseMap.put("WorkflowResponse", workflowResponse);
446                     responseMap.put("ResponseCode", responseCode);
447                     responseMap.put("Status", "Success");
448                     return responseMap;
449                 }
450
451
452                 // Check for 'WorkflowException' variable
453                 WorkflowException workflowException = null;
454                 String workflowExceptionText = null;
455
456                 Object workflowExceptionObject =
457                         getVariableFromHistory(historyService, processInstanceId, "WorkflowException");
458                 if (workflowExceptionObject != null) {
459                     if (workflowExceptionObject instanceof WorkflowException) {
460                         workflowException = (WorkflowException) workflowExceptionObject;
461                         workflowExceptionText = workflowException.toString();
462                         responseMap = new HashMap<>();
463                         responseMap.put("WorkflowException", workflowExceptionText);
464                         responseMap.put("ResponseCode", workflowException.getErrorCode());
465                         responseMap.put("Status", "Fail");
466                         return responseMap;
467                     } else if (workflowExceptionObject instanceof String) {
468                         Object object =
469                                 getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
470                         String responseCode = object == null ? null : String.valueOf(object);
471                         workflowExceptionText = (String) workflowExceptionObject;
472                         responseMap = new HashMap<>();
473                         responseMap.put("WorkflowException", workflowExceptionText);
474                         responseMap.put("ResponseCode", responseCode);
475                         responseMap.put("Status", "Fail");
476                         return responseMap;
477                     }
478
479                 }
480                 logger.debug(LOGMARKER + "WorkflowException: " + workflowExceptionText);
481
482                 // BEGIN LEGACY SUPPORT. TODO: REMOVE THIS CODE
483                 Object object = getVariableFromHistory(historyService, processInstanceId, processKey + "Response");
484                 String response = object == null ? null : String.valueOf(object);
485                 logger.debug(LOGMARKER + processKey + "Response: " + response);
486
487                 if (response != null) {
488                     object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
489                     String responseCode = object == null ? null : String.valueOf(object);
490                     logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
491                     responseMap = new HashMap<>();
492                     responseMap.put("Response", response);
493                     responseMap.put("ResponseCode", responseCode);
494                     responseMap.put("Status", "Success");
495                     return responseMap;
496                 }
497
498                 object = getVariableFromHistory(historyService, processInstanceId, prefix + "ErrorResponse");
499                 String errorResponse = object == null ? null : String.valueOf(object);
500                 logger.debug(LOGMARKER + prefix + "ErrorResponse: " + errorResponse);
501
502                 if (errorResponse != null) {
503                     object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
504                     String responseCode = object == null ? null : String.valueOf(object);
505                     logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
506                     responseMap = new HashMap<>();
507                     responseMap.put("Response", errorResponse);
508                     responseMap.put("ResponseCode", responseCode);
509                     responseMap.put("Status", "Fail");
510                     return responseMap;
511                 }
512                 // END LEGACY SUPPORT. TODO: REMOVE THIS CODE
513             }
514         }
515         return null;
516     }
517
518     /**
519      * Gets a variable value from the specified execution.
520      * 
521      * @return the variable value, or null if the variable could not be obtained
522      */
523     private Object getVariableFromExecution(RuntimeService runtimeService, String executionId, String variableName) {
524         try {
525             return runtimeService.getVariable(executionId, variableName);
526         } catch (ProcessEngineException e) {
527             // Most likely cause is that the execution no longer exists.
528             logger.debug("Error retrieving execution " + executionId + " variable " + variableName + ": " + e);
529             return null;
530         }
531     }
532
533     /**
534      * Gets a variable value from specified historical process instance.
535      * 
536      * @return the variable value, or null if the variable could not be obtained
537      */
538     private Object getVariableFromHistory(HistoryService historyService, String processInstanceId,
539             String variableName) {
540         try {
541             HistoricVariableInstance v = historyService.createHistoricVariableInstanceQuery()
542                     .processInstanceId(processInstanceId).variableName(variableName).singleResult();
543             return v == null ? null : v.getValue();
544         } catch (Exception e) {
545             logger.debug("Error retrieving process {} variable {} from history: ", processInstanceId, variableName, e);
546             return null;
547         }
548     }
549
550     @POST
551     @Path("/services/{processKey}/{processInstanceId}")
552     @Produces("application/json")
553     @Consumes("application/json")
554     @ApiOperation(value = "Allows for retrieval of the variables for a given process", notes = "")
555     public WorkflowResponse getProcessVariables(@PathParam("processKey") String processKey,
556             @PathParam("processInstanceId") String processInstanceId) {
557         // TODO filter only set of variables
558         WorkflowResponse response = new WorkflowResponse();
559
560         long startTime = System.currentTimeMillis();
561         try {
562             ProcessEngineServices engine = getProcessEngineServices();
563             List<HistoricVariableInstance> variables = engine.getHistoryService().createHistoricVariableInstanceQuery()
564                     .processInstanceId(processInstanceId).list();
565             Map<String, String> variablesMap = new HashMap<>();
566             for (HistoricVariableInstance variableInstance : variables) {
567                 variablesMap.put(variableInstance.getName(), variableInstance.getValue().toString());
568             }
569
570             logger.debug(LOGMARKER + "***Received MSO getProcessVariables with processKey:" + processKey
571                     + " and variables: " + variablesMap.toString());
572
573             response.setVariables(variablesMap);
574             response.setMessage("Success");
575             response.setResponse("Successfully retrieved the variables");
576             response.setProcessInstanceID(processInstanceId);
577
578             logger.debug(LOGMARKER + response.getMessage() + " for processKey: " + processKey + " with response: "
579                     + response.getResponse());
580         } catch (Exception ex) {
581             response.setMessage("Fail");
582             response.setResponse("Failed to retrieve the variables," + ex.getMessage());
583             response.setProcessInstanceID(processInstanceId);
584
585             logger.error(LoggingAnchor.FIVE, MessageEnum.BPMN_GENERAL_EXCEPTION_ARG.toString(), "BPMN",
586                     MDC.get(processKey), ErrorCode.UnknownError.getValue(), LOGMARKER + response.getMessage()
587                             + " for processKey: " + processKey + " with response: " + response.getResponse());
588             logger.debug("Exception :", ex);
589         }
590         return response;
591     }
592 }