2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright (C) 2017 Huawei Technologies Co., Ltd. All rights reserved.
7 * ================================================================================
8 * Modifications Copyright (c) 2019 Samsung
9 * ================================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
21 * ============LICENSE_END=========================================================
24 package org.onap.so.bpmn.common.workflow.service;
26 import java.util.HashMap;
27 import java.util.List;
29 import java.util.UUID;
30 import java.util.concurrent.atomic.AtomicLong;
32 import javax.ws.rs.Consumes;
33 import javax.ws.rs.POST;
34 import javax.ws.rs.Path;
35 import javax.ws.rs.PathParam;
36 import javax.ws.rs.Produces;
37 import javax.ws.rs.core.Context;
38 import javax.ws.rs.core.Response;
39 import javax.ws.rs.core.UriInfo;
41 import org.camunda.bpm.engine.HistoryService;
42 import org.camunda.bpm.engine.ProcessEngineException;
43 import org.camunda.bpm.engine.ProcessEngineServices;
44 import org.camunda.bpm.engine.RuntimeService;
45 import org.camunda.bpm.engine.history.HistoricVariableInstance;
46 import org.camunda.bpm.engine.runtime.ProcessInstance;
47 import org.camunda.bpm.engine.variable.VariableMap;
48 import org.camunda.bpm.engine.variable.Variables;
49 import org.camunda.bpm.engine.variable.Variables.SerializationDataFormats;
50 import org.camunda.bpm.engine.variable.impl.VariableMapImpl;
51 import org.onap.so.bpmn.common.workflow.context.WorkflowResponse;
52 import org.onap.so.bpmn.core.WorkflowException;
53 import org.onap.so.logger.ErrorCode;
54 import org.onap.so.logger.MessageEnum;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
58 import org.springframework.stereotype.Component;
60 import io.swagger.annotations.Api;
61 import io.swagger.annotations.ApiOperation;
64 @Api(value = "/workflow", description = "Root of workflow services")
66 public class WorkflowResource extends ProcessEngineAwareService {
68 private static final Logger logger = LoggerFactory.getLogger(WorkflowResource.class);
69 private static final String LOGMARKER = "[WRKFLOW-RESOURCE]";
71 private static final int DEFAULT_WAIT_TIME = 30000;
74 private UriInfo uriInfo = null;
77 * Starts the process instance and responds to client synchronously
78 * If the request does not contain mso-service-request-timeout then it waits for the value specified in DEFAULT_WAIT_TIME
79 * Note: value specified in mso-service-request-timeout is in seconds
80 * During polling time, if there is an exception encountered in the process execution then polling is stopped and the error response is
81 * returned to the client
87 @Path("/services/{processKey}")
89 value = "Starts a new process with the appropriate process synchronously",
92 @Produces("application/json")
93 @Consumes("application/json")
94 public Response startProcessInstanceByKey(@PathParam("processKey") String processKey,
95 VariableMapImpl variableMap) {
97 Map<String, Object> inputVariables = getInputVariables(variableMap);
98 setLogContext(processKey, inputVariables);
100 WorkflowResponse workflowResponse = new WorkflowResponse();
101 long startTime = System.currentTimeMillis();
102 ProcessInstance processInstance = null;
105 //Kickoff the process
106 ProcessThread thread = new ProcessThread(inputVariables,processKey);
109 Map<String, Object> responseMap = null;
111 //wait for process to be completed
112 long waitTime = getWaitTime(inputVariables);
113 long now = System.currentTimeMillis();
115 long endTime = start + waitTime;
116 long pollingInterval = 500;
118 // TEMPORARY LOGIC FOR UNIT TEST REFACTORING
119 // If this is a unit test (method is invoked directly), wait a max
120 // of 5 seconds after process ended for a result. In production,
121 // wait up to 60 seconds.
122 long timeToWaitAfterProcessEnded = uriInfo == null ? 5000 : 60000;
123 AtomicLong timeProcessEnded = new AtomicLong(0);
124 boolean endedWithNoResponse = false;
126 while (now <= endTime) {
127 Thread.sleep(pollingInterval);
129 now = System.currentTimeMillis();
131 // Increase the polling interval over time
133 long elapsed = now - start;
135 if (elapsed > 60000) {
136 pollingInterval = 5000;
137 } else if (elapsed > 10000) {
138 pollingInterval = 1000;
140 Exception exception = thread.getException();
141 if (exception != null) {
142 throw new Exception(exception);
145 processInstance = thread.getProcessInstance();
147 if (processInstance == null) {
148 logger.debug("{} process has not been created yet", LOGMARKER + processKey );
152 String processInstanceId = processInstance.getId();
153 workflowResponse.setProcessInstanceID(processInstanceId);
155 responseMap = getResponseMap(processInstance, processKey, timeProcessEnded);
157 if (responseMap == null) {
158 logger.debug("{} has not produced a response yet", LOGMARKER + processKey);
160 if (timeProcessEnded.longValue() != 0) {
161 long elapsedSinceEnded = System.currentTimeMillis() - timeProcessEnded.longValue();
163 if (elapsedSinceEnded > timeToWaitAfterProcessEnded) {
164 endedWithNoResponse = true;
169 processResponseMap(workflowResponse, responseMap);
170 recordEvents(processKey, workflowResponse, startTime);
171 return Response.status(workflowResponse.getMessageCode()).entity(workflowResponse).build();
175 //if we dont get response after waiting then send timeout response
178 String processInstanceId;
180 if (processInstance == null) {
181 processInstanceId = "N/A";
182 state = "NOT STARTED";
184 processInstanceId = processInstance.getProcessInstanceId();
185 state = isProcessEnded(processInstanceId) ? "ENDED" : "NOT ENDED";
188 workflowResponse.setMessage("Fail");
189 if (endedWithNoResponse) {
190 workflowResponse.setResponse("Process ended without producing a response");
192 workflowResponse.setResponse("Request timed out, process state: " + state);
194 workflowResponse.setProcessInstanceID(processInstanceId);
195 recordEvents(processKey, workflowResponse, startTime);
196 workflowResponse.setMessageCode(500);
197 return Response.status(500).entity(workflowResponse).build();
198 } catch (Exception ex) {
199 logger.debug(LOGMARKER + "Exception in startProcessInstance by key",ex);
200 workflowResponse.setMessage("Fail" );
201 workflowResponse.setResponse("Error occurred while executing the process: " + ex.getMessage());
202 if (processInstance != null) workflowResponse.setProcessInstanceID(processInstance.getId());
204 logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION_ARG.toString(), "BPMN", MDC.get(processKey),
205 ErrorCode.UnknownError.getValue(),
206 LOGMARKER + workflowResponse.getMessage() + " for processKey: " + processKey + " with response: "
207 + workflowResponse.getResponse());
209 workflowResponse.setMessageCode(500);
210 recordEvents(processKey, workflowResponse, startTime);
211 return Response.status(500).entity(workflowResponse).build();
216 * Returns the wait time, this is used by the resource on how long it should wait to send a response
217 * If none specified DEFAULT_WAIT_TIME is used
218 * @param inputVariables
221 private int getWaitTime(Map<String, Object> inputVariables)
223 String timeout = inputVariables.get("mso-service-request-timeout") == null
224 ? null : inputVariables.get("mso-service-request-timeout").toString();
226 if (timeout != null) {
228 return Integer.parseInt(timeout)*1000;
229 } catch (NumberFormatException nex) {
230 logger.debug("Invalid input for mso-service-request-timeout");
233 return DEFAULT_WAIT_TIME;
236 private void recordEvents(String processKey, WorkflowResponse response, long startTime) {
239 private void setLogContext(String processKey, Map<String, Object> inputVariables) {
242 private String getValueFromInputVariables(Map<String,Object> inputVariables, String key) {
243 Object value = inputVariables.get(key);
247 return value.toString();
252 * Checks to see if the specified process is ended.
253 * @param processInstanceId the process instance ID
254 * @return true if the process is ended
256 private boolean isProcessEnded(String processInstanceId) {
257 ProcessEngineServices pes = getProcessEngineServices();
259 return pes.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstanceId).singleResult() == null ? true : false ;
260 } catch (Exception e) {
261 logger.debug("Exception :",e);
266 private void processResponseMap(WorkflowResponse workflowResponse, Map<String, Object> responseMap) {
267 Object object = responseMap.get("Response");
268 String response = object == null ? null : String.valueOf(object);
269 if(response == null){
270 object = responseMap.get("WorkflowResponse");
271 response = object == null ? null : String.valueOf(object);
274 workflowResponse.setResponse(response);
276 object = responseMap.get("ResponseCode");
277 String responseCode = object == null ? null : String.valueOf(object);
280 workflowResponse.setMessageCode(Integer.parseInt(responseCode));
281 } catch(NumberFormatException nex) {
282 logger.debug(LOGMARKER + "Failed to parse ResponseCode: " + responseCode);
283 workflowResponse.setMessageCode(-1);
286 Object status = responseMap.get("Status");
288 if ("Success".equalsIgnoreCase(String.valueOf(status))) {
289 workflowResponse.setMessage("Success");
290 } else if ("Fail".equalsIgnoreCase(String.valueOf(status))) {
291 workflowResponse.setMessage("Fail");
293 logger.debug(LOGMARKER + "Unrecognized Status: " + responseCode);
294 workflowResponse.setMessage("Fail");
300 * Triggers the workflow in a separate thread
302 private class ProcessThread extends Thread {
303 private final Map<String,Object> inputVariables;
304 private final String processKey;
305 private final String businessKey;
306 private ProcessInstance processInstance = null;
307 private Exception exception = null;
309 public ProcessThread(Map<String, Object> inputVariables, String processKey) {
310 this.inputVariables = inputVariables;
311 this.processKey = processKey;
312 this.businessKey = UUID.randomUUID().toString();
316 * If an exception occurs when starting the process instance, it may
317 * be obtained by calling this method. Note that exceptions are only
318 * recorded while the process is executing in its original thread.
319 * Once a process is suspended, exception recording stops.
320 * @return the exception, or null if none has occurred
322 public Exception getException() {
327 public ProcessInstance getProcessInstance() {
328 return this.processInstance;
332 * Sets the process instance exception.
333 * @param exception the exception
335 private void setException(Exception exception) {
336 this.exception = exception;
340 setLogContext(processKey, inputVariables);
342 long startTime = System.currentTimeMillis();
346 RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
348 // Note that this method doesn't return until the process suspends
349 // itself or finishes. We provide a business key so we can identify
350 // the process instance immediately.
351 processInstance = runtimeService.startProcessInstanceByKey(
352 processKey, inputVariables);
354 } catch (Exception e) {
355 logger.debug(LOGMARKER + "ProcessThread caught an exception executing "
356 + processKey + ": " + e);
363 private Map<String, Object> getInputVariables(VariableMapImpl variableMap) {
364 VariableMap inputVariables = Variables.createVariables();
365 @SuppressWarnings("unchecked")
366 Map<String, Object> vMap = (Map<String, Object>) variableMap.get("variables");
367 for (String key : vMap.keySet()) { //variabe name vn
368 @SuppressWarnings("unchecked")
369 Map<String, Object> valueMap = (Map<String,Object>)vMap.get(key); //value, type
370 inputVariables.putValueTyped(key, Variables
371 .objectValue(valueMap.get("value"))
372 .serializationDataFormat(SerializationDataFormats.JAVA) // tells the engine to use java serialization for persisting the value
375 return inputVariables;
379 * Attempts to get a response map from the specified process instance.
380 * @return the response map, or null if it is unavailable
382 private Map<String, Object> getResponseMap(ProcessInstance processInstance,
383 String processKey, AtomicLong timeProcessEnded) {
385 String responseMapVariable = processKey + "ResponseMap";
386 String processInstanceId = processInstance.getId();
388 // Query the runtime service to see if a response map is ready.
390 /* RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
391 List<Execution> executions = runtimeService.createExecutionQuery()
392 .processInstanceId(processInstanceId).list();
394 for (Execution execution : executions) {
395 @SuppressWarnings("unchecked")
396 Map<String, Object> responseMap = (Map<String, Object>)
397 getVariableFromExecution(runtimeService, execution.getId(),
398 responseMapVariable);
400 if (responseMap != null) {
401 msoLogger.debug(LOGMARKER + "Obtained " + responseMapVariable
402 + " from process " + processInstanceId + " execution "
403 + execution.getId());
408 //Querying history seem to return consistent results compared to querying the runtime service
410 boolean alreadyEnded = timeProcessEnded.longValue() != 0;
412 if (alreadyEnded || isProcessEnded(processInstance.getId())) {
414 timeProcessEnded.set(System.currentTimeMillis());
417 // Query the history service to see if a response map exists.
419 HistoryService historyService = getProcessEngineServices().getHistoryService();
420 @SuppressWarnings("unchecked")
421 Map<String, Object> responseMap = (Map<String, Object>)
422 getVariableFromHistory(historyService, processInstance.getId(),
423 responseMapVariable);
425 if (responseMap != null) {
426 logger.debug(LOGMARKER + "Obtained " + responseMapVariable
427 + " from process " + processInstanceId + " history");
431 // Query the history service for old-style response variables.
433 String prefix = (String) getVariableFromHistory(historyService, processInstanceId, "prefix");
435 if (prefix != null) {
437 // Check for 'WorkflowResponse' variable
438 Object workflowResponseObject = getVariableFromHistory(historyService, processInstanceId, "WorkflowResponse");
439 String workflowResponse = workflowResponseObject == null ? null : String.valueOf(workflowResponseObject);
440 logger.debug(LOGMARKER + "WorkflowResponse: " + workflowResponse);
442 if (workflowResponse != null) {
443 Object responseCodeObject = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
444 String responseCode = responseCodeObject == null ? null : String.valueOf(responseCodeObject);
445 logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
446 responseMap = new HashMap<>();
447 responseMap.put("WorkflowResponse", workflowResponse);
448 responseMap.put("ResponseCode", responseCode);
449 responseMap.put("Status", "Success");
454 // Check for 'WorkflowException' variable
455 WorkflowException workflowException = null;
456 String workflowExceptionText = null;
458 Object workflowExceptionObject = getVariableFromHistory(historyService, processInstanceId, "WorkflowException");
459 if(workflowExceptionObject != null) {
460 if(workflowExceptionObject instanceof WorkflowException) {
461 workflowException = (WorkflowException) workflowExceptionObject;
462 workflowExceptionText = workflowException.toString();
463 responseMap = new HashMap<>();
464 responseMap.put("WorkflowException", workflowExceptionText);
465 responseMap.put("ResponseCode", workflowException.getErrorCode());
466 responseMap.put("Status", "Fail");
469 else if (workflowExceptionObject instanceof String) {
470 Object object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
471 String responseCode = object == null ? null : String.valueOf(object);
472 workflowExceptionText = (String) workflowExceptionObject;
473 responseMap = new HashMap<>();
474 responseMap.put("WorkflowException", workflowExceptionText);
475 responseMap.put("ResponseCode", responseCode);
476 responseMap.put("Status", "Fail");
481 logger.debug(LOGMARKER + "WorkflowException: " + workflowExceptionText);
483 // BEGIN LEGACY SUPPORT. TODO: REMOVE THIS CODE
484 Object object = getVariableFromHistory(historyService, processInstanceId, processKey + "Response");
485 String response = object == null ? null : String.valueOf(object);
486 logger.debug(LOGMARKER + processKey + "Response: " + response);
488 if (response != null) {
489 object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
490 String responseCode = object == null ? null : String.valueOf(object);
491 logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
492 responseMap = new HashMap<>();
493 responseMap.put("Response", response);
494 responseMap.put("ResponseCode", responseCode);
495 responseMap.put("Status", "Success");
499 object = getVariableFromHistory(historyService, processInstanceId, prefix + "ErrorResponse");
500 String errorResponse = object == null ? null : String.valueOf(object);
501 logger.debug(LOGMARKER + prefix + "ErrorResponse: " + errorResponse);
503 if (errorResponse != null) {
504 object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
505 String responseCode = object == null ? null : String.valueOf(object);
506 logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
507 responseMap = new HashMap<>();
508 responseMap.put("Response", errorResponse);
509 responseMap.put("ResponseCode", responseCode);
510 responseMap.put("Status", "Fail");
513 // END LEGACY SUPPORT. TODO: REMOVE THIS CODE
520 * Gets a variable value from the specified execution.
521 * @return the variable value, or null if the variable could not be
524 private Object getVariableFromExecution(RuntimeService runtimeService,
525 String executionId, String variableName) {
527 return runtimeService.getVariable(executionId, variableName);
528 } catch (ProcessEngineException e) {
529 // Most likely cause is that the execution no longer exists.
530 logger.debug("Error retrieving execution " + executionId
531 + " variable " + variableName + ": " + e);
536 * Gets a variable value from specified historical process instance.
537 * @return the variable value, or null if the variable could not be
540 private Object getVariableFromHistory(HistoryService historyService,
541 String processInstanceId, String variableName) {
543 HistoricVariableInstance v = historyService.createHistoricVariableInstanceQuery()
544 .processInstanceId(processInstanceId).variableName(variableName).singleResult();
545 return v == null ? null : v.getValue();
546 } catch (Exception e) {
547 logger.debug("Error retrieving process {} variable {} from history: ", processInstanceId,
554 @Path("/services/{processKey}/{processInstanceId}")
555 @Produces("application/json")
556 @Consumes("application/json")
558 value = "Allows for retrieval of the variables for a given process",
561 public WorkflowResponse getProcessVariables(@PathParam("processKey") String processKey, @PathParam("processInstanceId") String processInstanceId) {
562 //TODO filter only set of variables
563 WorkflowResponse response = new WorkflowResponse();
565 long startTime = System.currentTimeMillis();
567 ProcessEngineServices engine = getProcessEngineServices();
568 List<HistoricVariableInstance> variables = engine.getHistoryService().createHistoricVariableInstanceQuery().processInstanceId(processInstanceId).list();
569 Map<String,String> variablesMap = new HashMap<>();
570 for (HistoricVariableInstance variableInstance: variables) {
571 variablesMap.put(variableInstance.getName(), variableInstance.getValue().toString());
574 logger.debug(LOGMARKER + "***Received MSO getProcessVariables with processKey:" + processKey + " and variables: " +
575 variablesMap.toString());
577 response.setVariables(variablesMap);
578 response.setMessage("Success");
579 response.setResponse("Successfully retrieved the variables");
580 response.setProcessInstanceID(processInstanceId);
582 logger.debug(LOGMARKER + response.getMessage() + " for processKey: " + processKey + " with response: " + response
584 } catch (Exception ex) {
585 response.setMessage("Fail");
586 response.setResponse("Failed to retrieve the variables," + ex.getMessage());
587 response.setProcessInstanceID(processInstanceId);
589 logger.error("{} {} {} {} {}", MessageEnum.BPMN_GENERAL_EXCEPTION_ARG.toString(), "BPMN", MDC.get(processKey),
590 ErrorCode.UnknownError.getValue(),
591 LOGMARKER + response.getMessage() + " for processKey: " + processKey + " with response: " + response
593 logger.debug("Exception :",ex);