2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright (C) 2017 Huawei Technologies Co., Ltd. All rights reserved.
7 * ================================================================================
8 * Modifications Copyright (c) 2019 Samsung
9 * ================================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
21 * ============LICENSE_END=========================================================
24 package org.onap.so.bpmn.common.workflow.service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import org.onap.so.logger.LoggingAnchor;
import org.camunda.bpm.engine.HistoryService;
import org.camunda.bpm.engine.ProcessEngineException;
import org.camunda.bpm.engine.ProcessEngineServices;
import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.history.HistoricVariableInstance;
import org.camunda.bpm.engine.runtime.ProcessInstance;
import org.camunda.bpm.engine.variable.VariableMap;
import org.camunda.bpm.engine.variable.Variables;
import org.camunda.bpm.engine.variable.Variables.SerializationDataFormats;
import org.camunda.bpm.engine.variable.impl.VariableMapImpl;
import org.onap.so.bpmn.common.workflow.context.WorkflowResponse;
import org.onap.so.bpmn.core.WorkflowException;
import org.onap.logging.filter.base.ErrorCode;
import org.onap.so.logger.MessageEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.info.Info;
63 @OpenAPIDefinition(info = @Info(title = "/workflow", description = "Root of workflow services"))
65 public class WorkflowResource extends ProcessEngineAwareService {
67 private static final Logger logger = LoggerFactory.getLogger(WorkflowResource.class);
68 private static final String LOGMARKER = "[WRKFLOW-RESOURCE]";
70 private static final int DEFAULT_WAIT_TIME = 30000;
73 private UriInfo uriInfo = null;
76 * Starts the process instance and responds to client synchronously If the request does not contain
77 * mso-service-request-timeout then it waits for the value specified in DEFAULT_WAIT_TIME Note: value specified in
78 * mso-service-request-timeout is in seconds During polling time, if there is an exception encountered in the
79 * process execution then polling is stopped and the error response is returned to the client
86 @Path("/services/{processKey}")
87 @Operation(description = "Starts a new process with the appropriate process synchronously")
88 @Produces("application/json")
89 @Consumes("application/json")
90 public Response startProcessInstanceByKey(@PathParam("processKey") String processKey, VariableMapImpl variableMap) {
92 Map<String, Object> inputVariables = getInputVariables(variableMap);
93 setLogContext(processKey, inputVariables);
95 WorkflowResponse workflowResponse = new WorkflowResponse();
96 long startTime = System.currentTimeMillis();
97 ProcessInstance processInstance = null;
100 // Kickoff the process
101 ProcessThread thread = new ProcessThread(inputVariables, processKey);
104 Map<String, Object> responseMap = null;
106 // wait for process to be completed
107 long waitTime = getWaitTime(inputVariables);
108 long now = System.currentTimeMillis();
110 long endTime = start + waitTime;
111 long pollingInterval = 500;
113 // TEMPORARY LOGIC FOR UNIT TEST REFACTORING
114 // If this is a unit test (method is invoked directly), wait a max
115 // of 5 seconds after process ended for a result. In production,
116 // wait up to 60 seconds.
117 long timeToWaitAfterProcessEnded = uriInfo == null ? 5000 : 60000;
118 AtomicLong timeProcessEnded = new AtomicLong(0);
119 boolean endedWithNoResponse = false;
120 logger.debug(LOGMARKER + "WorkflowResource.startProcessInstanceByKey using timeout: " + waitTime);
121 while (now <= endTime) {
122 Thread.sleep(pollingInterval);
124 now = System.currentTimeMillis();
126 // Increase the polling interval over time
128 long elapsed = now - start;
130 if (elapsed > 60000) {
131 pollingInterval = 5000;
132 } else if (elapsed > 10000) {
133 pollingInterval = 1000;
135 Exception exception = thread.getException();
136 if (exception != null) {
137 throw new Exception(exception);
140 processInstance = thread.getProcessInstance();
142 if (processInstance == null) {
143 logger.debug("{} process has not been created yet", LOGMARKER + processKey);
147 String processInstanceId = processInstance.getId();
148 workflowResponse.setProcessInstanceID(processInstanceId);
150 responseMap = getResponseMap(processInstance, processKey, timeProcessEnded);
152 if (responseMap == null) {
153 logger.debug("{} has not produced a response yet", LOGMARKER + processKey);
155 if (timeProcessEnded.longValue() != 0) {
156 long elapsedSinceEnded = System.currentTimeMillis() - timeProcessEnded.longValue();
158 if (elapsedSinceEnded > timeToWaitAfterProcessEnded) {
159 endedWithNoResponse = true;
164 processResponseMap(workflowResponse, responseMap);
165 recordEvents(processKey, workflowResponse, startTime);
166 return Response.status(workflowResponse.getMessageCode()).entity(workflowResponse).build();
170 // if we dont get response after waiting then send timeout response
173 String processInstanceId;
175 if (processInstance == null) {
176 processInstanceId = "N/A";
177 state = "NOT STARTED";
179 processInstanceId = processInstance.getProcessInstanceId();
180 state = isProcessEnded(processInstanceId) ? "ENDED" : "NOT ENDED";
183 workflowResponse.setMessage("Fail");
184 if (endedWithNoResponse) {
185 workflowResponse.setResponse("Process ended without producing a response");
187 workflowResponse.setResponse("Request timed out, process state: " + state);
189 workflowResponse.setProcessInstanceID(processInstanceId);
190 recordEvents(processKey, workflowResponse, startTime);
191 workflowResponse.setMessageCode(500);
192 return Response.status(500).entity(workflowResponse).build();
193 } catch (Exception ex) {
194 logger.debug(LOGMARKER + "Exception in startProcessInstance by key", ex);
195 workflowResponse.setMessage("Fail");
196 workflowResponse.setResponse("Error occurred while executing the process: " + ex.getMessage());
197 if (processInstance != null)
198 workflowResponse.setProcessInstanceID(processInstance.getId());
200 logger.error(LoggingAnchor.FIVE, MessageEnum.BPMN_GENERAL_EXCEPTION_ARG.toString(), "BPMN",
201 MDC.get(processKey), ErrorCode.UnknownError.getValue(), LOGMARKER + workflowResponse.getMessage()
202 + " for processKey: " + processKey + " with response: " + workflowResponse.getResponse());
204 workflowResponse.setMessageCode(500);
205 recordEvents(processKey, workflowResponse, startTime);
206 return Response.status(500).entity(workflowResponse).build();
211 * Returns the wait time, this is used by the resource on how long it should wait to send a response If none
212 * specified DEFAULT_WAIT_TIME is used
214 * @param inputVariables
217 private int getWaitTime(Map<String, Object> inputVariables) {
218 String timeout = inputVariables.get("mso-service-request-timeout") == null ? null
219 : inputVariables.get("mso-service-request-timeout").toString();
221 if (timeout != null) {
223 return Integer.parseInt(timeout) * 1000;
224 } catch (NumberFormatException nex) {
225 logger.debug("Invalid input for mso-service-request-timeout");
228 return DEFAULT_WAIT_TIME;
/**
 * Hook for recording events about a completed request. Currently a no-op: the body is empty.
 *
 * @param processKey the process definition key of the request
 * @param response the response being returned to the client
 * @param startTime the value of {@code System.currentTimeMillis()} captured when the request started
 */
private void recordEvents(String processKey, WorkflowResponse response, long startTime) {}
/**
 * Hook for setting up the logging context of a request. Currently a no-op: the body is empty.
 *
 * @param processKey the process definition key of the request
 * @param inputVariables the process input variables
 */
private void setLogContext(String processKey, Map<String, Object> inputVariables) {}
235 private String getValueFromInputVariables(Map<String, Object> inputVariables, String key) {
236 Object value = inputVariables.get(key);
240 return value.toString();
245 * Checks to see if the specified process is ended.
247 * @param processInstanceId the process instance ID
248 * @return true if the process is ended
250 private boolean isProcessEnded(String processInstanceId) {
251 ProcessEngineServices pes = getProcessEngineServices();
253 return pes.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstanceId)
254 .singleResult() == null ? true : false;
255 } catch (Exception e) {
256 logger.debug("Exception :", e);
261 private void processResponseMap(WorkflowResponse workflowResponse, Map<String, Object> responseMap) {
262 Object object = responseMap.get("Response");
263 String response = object == null ? null : String.valueOf(object);
264 if (response == null) {
265 object = responseMap.get("WorkflowResponse");
266 response = object == null ? null : String.valueOf(object);
269 workflowResponse.setResponse(response);
271 object = responseMap.get("ResponseCode");
272 String responseCode = object == null ? null : String.valueOf(object);
275 workflowResponse.setMessageCode(Integer.parseInt(responseCode));
276 } catch (NumberFormatException nex) {
277 logger.debug(LOGMARKER + "Failed to parse ResponseCode: " + responseCode);
278 workflowResponse.setMessageCode(-1);
281 Object status = responseMap.get("Status");
283 if ("Success".equalsIgnoreCase(String.valueOf(status))) {
284 workflowResponse.setMessage("Success");
285 } else if ("Fail".equalsIgnoreCase(String.valueOf(status))) {
286 workflowResponse.setMessage("Fail");
288 logger.debug(LOGMARKER + "Unrecognized Status: " + responseCode);
289 workflowResponse.setMessage("Fail");
294 * @version 1.0 Triggers the workflow in a separate thread
296 private class ProcessThread extends Thread {
297 private final Map<String, Object> inputVariables;
298 private final String processKey;
299 private final String businessKey;
300 private ProcessInstance processInstance = null;
301 private Exception exception = null;
303 public ProcessThread(Map<String, Object> inputVariables, String processKey) {
304 this.inputVariables = inputVariables;
305 this.processKey = processKey;
306 this.businessKey = UUID.randomUUID().toString();
310 * If an exception occurs when starting the process instance, it may be obtained by calling this method. Note
311 * that exceptions are only recorded while the process is executing in its original thread. Once a process is
312 * suspended, exception recording stops.
314 * @return the exception, or null if none has occurred
316 public Exception getException() {
321 public ProcessInstance getProcessInstance() {
322 return this.processInstance;
326 * Sets the process instance exception.
328 * @param exception the exception
330 private void setException(Exception exception) {
331 this.exception = exception;
335 setLogContext(processKey, inputVariables);
339 RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
341 // Note that this method doesn't return until the process suspends
342 // itself or finishes. We provide a business key so we can identify
343 // the process instance immediately.
344 processInstance = runtimeService.startProcessInstanceByKey(processKey, inputVariables);
346 } catch (Exception e) {
347 logger.debug(LOGMARKER + "ProcessThread caught an exception executing " + processKey + ": " + e);
354 private Map<String, Object> getInputVariables(VariableMapImpl variableMap) {
355 VariableMap inputVariables = Variables.createVariables();
356 @SuppressWarnings("unchecked")
357 Map<String, Object> vMap = (Map<String, Object>) variableMap.get("variables");
358 for (String key : vMap.keySet()) { // variabe name vn
359 @SuppressWarnings("unchecked")
360 Map<String, Object> valueMap = (Map<String, Object>) vMap.get(key); // value, type
361 inputVariables.putValueTyped(key,
362 Variables.objectValue(valueMap.get("value")).serializationDataFormat(SerializationDataFormats.JAVA) // tells
375 return inputVariables;
379 * Attempts to get a response map from the specified process instance.
381 * @return the response map, or null if it is unavailable
383 private Map<String, Object> getResponseMap(ProcessInstance processInstance, String processKey,
384 AtomicLong timeProcessEnded) {
386 String responseMapVariable = processKey + "ResponseMap";
387 String processInstanceId = processInstance.getId();
389 // Query the runtime service to see if a response map is ready.
392 * RuntimeService runtimeService = getProcessEngineServices().getRuntimeService(); List<Execution> executions =
393 * runtimeService.createExecutionQuery() .processInstanceId(processInstanceId).list();
395 * for (Execution execution : executions) {
397 * @SuppressWarnings("unchecked") Map<String, Object> responseMap = (Map<String, Object>)
398 * getVariableFromExecution(runtimeService, execution.getId(), responseMapVariable);
400 * if (responseMap != null) { msoLogger.debug(LOGMARKER + "Obtained " + responseMapVariable + " from process " +
401 * processInstanceId + " execution " + execution.getId()); return responseMap; } }
403 // Querying history seem to return consistent results compared to querying the runtime service
405 boolean alreadyEnded = timeProcessEnded.longValue() != 0;
407 if (alreadyEnded || isProcessEnded(processInstance.getId())) {
409 timeProcessEnded.set(System.currentTimeMillis());
412 // Query the history service to see if a response map exists.
414 HistoryService historyService = getProcessEngineServices().getHistoryService();
415 @SuppressWarnings("unchecked")
416 Map<String, Object> responseMap = (Map<String, Object>) getVariableFromHistory(historyService,
417 processInstance.getId(), responseMapVariable);
419 if (responseMap != null) {
420 logger.debug(LOGMARKER + "Obtained " + responseMapVariable + " from process " + processInstanceId
425 // Query the history service for old-style response variables.
427 String prefix = (String) getVariableFromHistory(historyService, processInstanceId, "prefix");
429 if (prefix != null) {
431 // Check for 'WorkflowResponse' variable
432 Object workflowResponseObject =
433 getVariableFromHistory(historyService, processInstanceId, "WorkflowResponse");
434 String workflowResponse =
435 workflowResponseObject == null ? null : String.valueOf(workflowResponseObject);
436 logger.debug(LOGMARKER + "WorkflowResponse: " + workflowResponse);
438 if (workflowResponse != null) {
439 Object responseCodeObject =
440 getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
441 String responseCode = responseCodeObject == null ? null : String.valueOf(responseCodeObject);
442 logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
443 responseMap = new HashMap<>();
444 responseMap.put("WorkflowResponse", workflowResponse);
445 responseMap.put("ResponseCode", responseCode);
446 responseMap.put("Status", "Success");
451 // Check for 'WorkflowException' variable
452 WorkflowException workflowException = null;
453 String workflowExceptionText = null;
455 Object workflowExceptionObject =
456 getVariableFromHistory(historyService, processInstanceId, "WorkflowException");
457 if (workflowExceptionObject != null) {
458 if (workflowExceptionObject instanceof WorkflowException) {
459 workflowException = (WorkflowException) workflowExceptionObject;
460 workflowExceptionText = workflowException.toString();
461 responseMap = new HashMap<>();
462 responseMap.put("WorkflowException", workflowExceptionText);
463 responseMap.put("ResponseCode", workflowException.getErrorCode());
464 responseMap.put("Status", "Fail");
466 } else if (workflowExceptionObject instanceof String) {
468 getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
469 String responseCode = object == null ? null : String.valueOf(object);
470 workflowExceptionText = (String) workflowExceptionObject;
471 responseMap = new HashMap<>();
472 responseMap.put("WorkflowException", workflowExceptionText);
473 responseMap.put("ResponseCode", responseCode);
474 responseMap.put("Status", "Fail");
479 logger.debug(LOGMARKER + "WorkflowException: " + workflowExceptionText);
481 // BEGIN LEGACY SUPPORT. TODO: REMOVE THIS CODE
482 Object object = getVariableFromHistory(historyService, processInstanceId, processKey + "Response");
483 String response = object == null ? null : String.valueOf(object);
484 logger.debug(LOGMARKER + processKey + "Response: " + response);
486 if (response != null) {
487 object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
488 String responseCode = object == null ? null : String.valueOf(object);
489 logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
490 responseMap = new HashMap<>();
491 responseMap.put("Response", response);
492 responseMap.put("ResponseCode", responseCode);
493 responseMap.put("Status", "Success");
497 object = getVariableFromHistory(historyService, processInstanceId, prefix + "ErrorResponse");
498 String errorResponse = object == null ? null : String.valueOf(object);
499 logger.debug(LOGMARKER + prefix + "ErrorResponse: " + errorResponse);
501 if (errorResponse != null) {
502 object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
503 String responseCode = object == null ? null : String.valueOf(object);
504 logger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
505 responseMap = new HashMap<>();
506 responseMap.put("Response", errorResponse);
507 responseMap.put("ResponseCode", responseCode);
508 responseMap.put("Status", "Fail");
511 // END LEGACY SUPPORT. TODO: REMOVE THIS CODE
518 * Gets a variable value from the specified execution.
520 * @return the variable value, or null if the variable could not be obtained
522 private Object getVariableFromExecution(RuntimeService runtimeService, String executionId, String variableName) {
524 return runtimeService.getVariable(executionId, variableName);
525 } catch (ProcessEngineException e) {
526 // Most likely cause is that the execution no longer exists.
527 logger.debug("Error retrieving execution " + executionId + " variable " + variableName + ": " + e);
533 * Gets a variable value from specified historical process instance.
535 * @return the variable value, or null if the variable could not be obtained
537 private Object getVariableFromHistory(HistoryService historyService, String processInstanceId,
538 String variableName) {
540 HistoricVariableInstance v = historyService.createHistoricVariableInstanceQuery()
541 .processInstanceId(processInstanceId).variableName(variableName).singleResult();
542 return v == null ? null : v.getValue();
543 } catch (Exception e) {
544 logger.debug("Error retrieving process {} variable {} from history: ", processInstanceId, variableName, e);
550 @Path("/services/{processKey}/{processInstanceId}")
551 @Produces("application/json")
552 @Consumes("application/json")
553 @Operation(description = "Allows for retrieval of the variables for a given process")
554 public WorkflowResponse getProcessVariables(@PathParam("processKey") String processKey,
555 @PathParam("processInstanceId") String processInstanceId) {
556 // TODO filter only set of variables
557 WorkflowResponse response = new WorkflowResponse();
560 ProcessEngineServices engine = getProcessEngineServices();
561 List<HistoricVariableInstance> variables = engine.getHistoryService().createHistoricVariableInstanceQuery()
562 .processInstanceId(processInstanceId).list();
563 Map<String, String> variablesMap = new HashMap<>();
564 for (HistoricVariableInstance variableInstance : variables) {
565 variablesMap.put(variableInstance.getName(), variableInstance.getValue().toString());
568 logger.debug(LOGMARKER + "***Received MSO getProcessVariables with processKey:" + processKey
569 + " and variables: " + variablesMap.toString());
571 response.setVariables(variablesMap);
572 response.setMessage("Success");
573 response.setResponse("Successfully retrieved the variables");
574 response.setProcessInstanceID(processInstanceId);
576 logger.debug(LOGMARKER + response.getMessage() + " for processKey: " + processKey + " with response: "
577 + response.getResponse());
578 } catch (Exception ex) {
579 response.setMessage("Fail");
580 response.setResponse("Failed to retrieve the variables," + ex.getMessage());
581 response.setProcessInstanceID(processInstanceId);
583 logger.error(LoggingAnchor.FIVE, MessageEnum.BPMN_GENERAL_EXCEPTION_ARG.toString(), "BPMN",
584 MDC.get(processKey), ErrorCode.UnknownError.getValue(), LOGMARKER + response.getMessage()
585 + " for processKey: " + processKey + " with response: " + response.getResponse());
586 logger.debug("Exception :", ex);