2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright (C) 2017 Huawei Technologies Co., Ltd. All rights reserved.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.so.bpmn.common.workflow.service;
24 import java.util.HashMap;
25 import java.util.List;
27 import java.util.UUID;
28 import java.util.concurrent.atomic.AtomicLong;
30 import javax.ws.rs.Consumes;
31 import javax.ws.rs.POST;
32 import javax.ws.rs.Path;
33 import javax.ws.rs.PathParam;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.core.Context;
36 import javax.ws.rs.core.Response;
37 import javax.ws.rs.core.UriInfo;
39 import org.camunda.bpm.engine.HistoryService;
40 import org.camunda.bpm.engine.ProcessEngineException;
41 import org.camunda.bpm.engine.ProcessEngineServices;
42 import org.camunda.bpm.engine.RuntimeService;
43 import org.camunda.bpm.engine.history.HistoricVariableInstance;
44 import org.camunda.bpm.engine.runtime.ProcessInstance;
45 import org.camunda.bpm.engine.variable.VariableMap;
46 import org.camunda.bpm.engine.variable.Variables;
47 import org.camunda.bpm.engine.variable.Variables.SerializationDataFormats;
48 import org.camunda.bpm.engine.variable.impl.VariableMapImpl;
49 import org.onap.so.bpmn.common.workflow.context.WorkflowResponse;
50 import org.onap.so.bpmn.core.WorkflowException;
51 import org.onap.so.logger.MessageEnum;
52 import org.onap.so.logger.MsoLogger;
54 import org.springframework.stereotype.Component;
56 import io.swagger.annotations.Api;
57 import io.swagger.annotations.ApiOperation;
60 @Api(value = "/workflow", description = "Root of workflow services")
62 public class WorkflowResource extends ProcessEngineAwareService {
64 private static final MsoLogger msoLogger = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL, WorkflowResource.class);
65 private static final String LOGMARKER = "[WRKFLOW-RESOURCE]";
67 private static final int DEFAULT_WAIT_TIME = 30000;
70 private UriInfo uriInfo = null;
73 * Starts the process instance and responds to client synchronously
74 * If the request does not contain mso-service-request-timeout then it waits for the value specified in DEFAULT_WAIT_TIME
75 * Note: value specified in mso-service-request-timeout is in seconds
76 * During polling time, if there is an exception encountered in the process execution then polling is stopped and the error response is
77 * returned to the client
83 @Path("/services/{processKey}")
85 value = "Starts a new process with the appropriate process synchronously",
88 @Produces("application/json")
89 @Consumes("application/json")
90 public Response startProcessInstanceByKey(@PathParam("processKey") String processKey,
91 VariableMapImpl variableMap) {
93 Map<String, Object> inputVariables = getInputVariables(variableMap);
94 setLogContext(processKey, inputVariables);
96 WorkflowResponse workflowResponse = new WorkflowResponse();
97 long startTime = System.currentTimeMillis();
98 ProcessInstance processInstance = null;
101 //Kickoff the process
102 ProcessThread thread = new ProcessThread(inputVariables,processKey,msoLogger);
105 Map<String, Object> responseMap = null;
107 //wait for process to be completed
108 long waitTime = getWaitTime(inputVariables);
109 long now = System.currentTimeMillis();
111 long endTime = start + waitTime;
112 long pollingInterval = 500;
114 // TEMPORARY LOGIC FOR UNIT TEST REFACTORING
115 // If this is a unit test (method is invoked directly), wait a max
116 // of 5 seconds after process ended for a result. In production,
117 // wait up to 60 seconds.
118 long timeToWaitAfterProcessEnded = uriInfo == null ? 5000 : 60000;
119 AtomicLong timeProcessEnded = new AtomicLong(0);
120 boolean endedWithNoResponse = false;
122 while (now <= endTime) {
123 Thread.sleep(pollingInterval);
125 now = System.currentTimeMillis();
127 // Increase the polling interval over time
129 long elapsed = now - start;
131 if (elapsed > 60000) {
132 pollingInterval = 5000;
133 } else if (elapsed > 10000) {
134 pollingInterval = 1000;
136 Exception exception = thread.getException();
137 if (exception != null) {
138 throw new Exception(exception);
141 processInstance = thread.getProcessInstance();
143 if (processInstance == null) {
144 msoLogger.debug(LOGMARKER + processKey + " process has not been created yet");
148 String processInstanceId = processInstance.getId();
149 workflowResponse.setProcessInstanceID(processInstanceId);
151 responseMap = getResponseMap(processInstance, processKey, timeProcessEnded);
153 if (responseMap == null) {
154 msoLogger.debug(LOGMARKER + processKey + " has not produced a response yet");
156 if (timeProcessEnded.longValue() != 0) {
157 long elapsedSinceEnded = System.currentTimeMillis() - timeProcessEnded.longValue();
159 if (elapsedSinceEnded > timeToWaitAfterProcessEnded) {
160 endedWithNoResponse = true;
165 processResponseMap(workflowResponse, responseMap);
166 recordEvents(processKey, workflowResponse, startTime);
167 return Response.status(workflowResponse.getMessageCode()).entity(workflowResponse).build();
171 //if we dont get response after waiting then send timeout response
174 String processInstanceId;
176 if (processInstance == null) {
177 processInstanceId = "N/A";
178 state = "NOT STARTED";
180 processInstanceId = processInstance.getProcessInstanceId();
181 state = isProcessEnded(processInstanceId) ? "ENDED" : "NOT ENDED";
184 workflowResponse.setMessage("Fail");
185 if (endedWithNoResponse) {
186 workflowResponse.setResponse("Process ended without producing a response");
188 workflowResponse.setResponse("Request timed out, process state: " + state);
190 workflowResponse.setProcessInstanceID(processInstanceId);
191 recordEvents(processKey, workflowResponse, startTime);
192 workflowResponse.setMessageCode(500);
193 return Response.status(500).entity(workflowResponse).build();
194 } catch (Exception ex) {
195 msoLogger.debug(LOGMARKER + "Exception in startProcessInstance by key",ex);
196 workflowResponse.setMessage("Fail" );
197 workflowResponse.setResponse("Error occurred while executing the process: " + ex.getMessage());
198 if (processInstance != null) workflowResponse.setProcessInstanceID(processInstance.getId());
200 msoLogger.error(MessageEnum.BPMN_GENERAL_EXCEPTION_ARG, "BPMN", MDC.get(processKey),
201 MsoLogger.ErrorCode.UnknownError, LOGMARKER + workflowResponse.getMessage()
202 + " for processKey: " + processKey + " with response: " + workflowResponse.getResponse());
204 workflowResponse.setMessageCode(500);
205 recordEvents(processKey, workflowResponse, startTime);
206 return Response.status(500).entity(workflowResponse).build();
211 * Returns the wait time, this is used by the resource on how long it should wait to send a response
212 * If none specified DEFAULT_WAIT_TIME is used
213 * @param inputVariables
216 private int getWaitTime(Map<String, Object> inputVariables)
218 String timeout = inputVariables.get("mso-service-request-timeout") == null
219 ? null : inputVariables.get("mso-service-request-timeout").toString();
221 if (timeout != null) {
223 return Integer.parseInt(timeout)*1000;
224 } catch (NumberFormatException nex) {
225 msoLogger.debug("Invalid input for mso-service-request-timeout");
228 return DEFAULT_WAIT_TIME;
231 private void recordEvents(String processKey, WorkflowResponse response, long startTime) {
233 msoLogger.recordMetricEvent ( startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc,
234 LOGMARKER + response.getMessage() + " for processKey: "
235 + processKey + " with response: " + response.getResponse(), "BPMN", MDC.get(processKey), null);
237 msoLogger.recordAuditEvent (startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc,
238 LOGMARKER + response.getMessage() + " for processKey: "
239 + processKey + " with response: " + response.getResponse());
242 private void setLogContext(String processKey, Map<String, Object> inputVariables) {
243 MsoLogger.setServiceName("MSO." + processKey);
244 if (inputVariables != null) {
245 MsoLogger.setLogContext(getValueFromInputVariables(inputVariables, "mso-request-id"),
246 getValueFromInputVariables(inputVariables, "mso-service-instance-id"));
250 private String getValueFromInputVariables(Map<String,Object> inputVariables, String key) {
251 Object value = inputVariables.get(key);
255 return value.toString();
260 * Checks to see if the specified process is ended.
261 * @param processInstanceId the process instance ID
262 * @return true if the process is ended
264 private boolean isProcessEnded(String processInstanceId) {
265 ProcessEngineServices pes = getProcessEngineServices();
267 return pes.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstanceId).singleResult() == null ? true : false ;
268 } catch (Exception e) {
269 msoLogger.debug("Exception :",e);
274 private void processResponseMap(WorkflowResponse workflowResponse, Map<String, Object> responseMap) {
275 Object object = responseMap.get("Response");
276 String response = object == null ? null : String.valueOf(object);
277 if(response == null){
278 object = responseMap.get("WorkflowResponse");
279 response = object == null ? null : String.valueOf(object);
282 workflowResponse.setResponse(response);
284 object = responseMap.get("ResponseCode");
285 String responseCode = object == null ? null : String.valueOf(object);
288 workflowResponse.setMessageCode(Integer.parseInt(responseCode));
289 } catch(NumberFormatException nex) {
290 msoLogger.debug(LOGMARKER + "Failed to parse ResponseCode: " + responseCode);
291 workflowResponse.setMessageCode(-1);
294 Object status = responseMap.get("Status");
296 if ("Success".equalsIgnoreCase(String.valueOf(status))) {
297 workflowResponse.setMessage("Success");
298 } else if ("Fail".equalsIgnoreCase(String.valueOf(status))) {
299 workflowResponse.setMessage("Fail");
301 msoLogger.debug(LOGMARKER + "Unrecognized Status: " + responseCode);
302 workflowResponse.setMessage("Fail");
308 * Triggers the workflow in a separate thread
310 private class ProcessThread extends Thread {
311 private final Map<String,Object> inputVariables;
312 private final String processKey;
313 private final MsoLogger msoLogger;
314 private final String businessKey;
315 private ProcessInstance processInstance = null;
316 private Exception exception = null;
318 public ProcessThread(Map<String, Object> inputVariables, String processKey, MsoLogger msoLogger) {
319 this.inputVariables = inputVariables;
320 this.processKey = processKey;
321 this.msoLogger = msoLogger;
322 this.businessKey = UUID.randomUUID().toString();
326 * If an exception occurs when starting the process instance, it may
327 * be obtained by calling this method. Note that exceptions are only
328 * recorded while the process is executing in its original thread.
329 * Once a process is suspended, exception recording stops.
330 * @return the exception, or null if none has occurred
332 public Exception getException() {
337 public ProcessInstance getProcessInstance() {
338 return this.processInstance;
342 * Sets the process instance exception.
343 * @param exception the exception
345 private void setException(Exception exception) {
346 this.exception = exception;
350 setLogContext(processKey, inputVariables);
352 long startTime = System.currentTimeMillis();
355 msoLogger.debug(LOGMARKER + "***Received MSO startProcessInstanceByKey with processKey:"
356 + processKey + " and variables: " + inputVariables);
358 msoLogger.recordAuditEvent (startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, LOGMARKER
359 + "Call to MSO workflow/services in Camunda. Received MSO startProcessInstanceByKey with"
360 + " processKey:" + processKey
361 + " businessKey:" + businessKey
362 + " variables: " + inputVariables);
364 RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
366 // Note that this method doesn't return until the process suspends
367 // itself or finishes. We provide a business key so we can identify
368 // the process instance immediately.
369 processInstance = runtimeService.startProcessInstanceByKey(
370 processKey, inputVariables);
372 } catch (Exception e) {
373 msoLogger.debug(LOGMARKER + "ProcessThread caught an exception executing "
374 + processKey + ": " + e);
381 private Map<String, Object> getInputVariables(VariableMapImpl variableMap) {
382 VariableMap inputVariables = Variables.createVariables();
383 @SuppressWarnings("unchecked")
384 Map<String, Object> vMap = (Map<String, Object>) variableMap.get("variables");
385 for (String key : vMap.keySet()) { //variabe name vn
386 @SuppressWarnings("unchecked")
387 Map<String, Object> valueMap = (Map<String,Object>)vMap.get(key); //value, type
388 inputVariables.putValueTyped(key, Variables
389 .objectValue(valueMap.get("value"))
390 .serializationDataFormat(SerializationDataFormats.JAVA) // tells the engine to use java serialization for persisting the value
393 return inputVariables;
397 * Attempts to get a response map from the specified process instance.
398 * @return the response map, or null if it is unavailable
400 private Map<String, Object> getResponseMap(ProcessInstance processInstance,
401 String processKey, AtomicLong timeProcessEnded) {
403 String responseMapVariable = processKey + "ResponseMap";
404 String processInstanceId = processInstance.getId();
406 // Query the runtime service to see if a response map is ready.
408 /* RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
409 List<Execution> executions = runtimeService.createExecutionQuery()
410 .processInstanceId(processInstanceId).list();
412 for (Execution execution : executions) {
413 @SuppressWarnings("unchecked")
414 Map<String, Object> responseMap = (Map<String, Object>)
415 getVariableFromExecution(runtimeService, execution.getId(),
416 responseMapVariable);
418 if (responseMap != null) {
419 msoLogger.debug(LOGMARKER + "Obtained " + responseMapVariable
420 + " from process " + processInstanceId + " execution "
421 + execution.getId());
426 //Querying history seem to return consistent results compared to querying the runtime service
428 boolean alreadyEnded = timeProcessEnded.longValue() != 0;
430 if (alreadyEnded || isProcessEnded(processInstance.getId())) {
432 timeProcessEnded.set(System.currentTimeMillis());
435 // Query the history service to see if a response map exists.
437 HistoryService historyService = getProcessEngineServices().getHistoryService();
438 @SuppressWarnings("unchecked")
439 Map<String, Object> responseMap = (Map<String, Object>)
440 getVariableFromHistory(historyService, processInstance.getId(),
441 responseMapVariable);
443 if (responseMap != null) {
444 msoLogger.debug(LOGMARKER + "Obtained " + responseMapVariable
445 + " from process " + processInstanceId + " history");
449 // Query the history service for old-style response variables.
451 String prefix = (String) getVariableFromHistory(historyService, processInstanceId, "prefix");
453 if (prefix != null) {
455 // Check for 'WorkflowResponse' variable
456 Object workflowResponseObject = getVariableFromHistory(historyService, processInstanceId, "WorkflowResponse");
457 String workflowResponse = workflowResponseObject == null ? null : String.valueOf(workflowResponseObject);
458 msoLogger.debug(LOGMARKER + "WorkflowResponse: " + workflowResponse);
460 if (workflowResponse != null) {
461 Object responseCodeObject = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
462 String responseCode = responseCodeObject == null ? null : String.valueOf(responseCodeObject);
463 msoLogger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
464 responseMap = new HashMap<>();
465 responseMap.put("WorkflowResponse", workflowResponse);
466 responseMap.put("ResponseCode", responseCode);
467 responseMap.put("Status", "Success");
472 // Check for 'WorkflowException' variable
473 WorkflowException workflowException = null;
474 String workflowExceptionText = null;
476 Object workflowExceptionObject = getVariableFromHistory(historyService, processInstanceId, "WorkflowException");
477 if(workflowExceptionObject != null) {
478 if(workflowExceptionObject instanceof WorkflowException) {
479 workflowException = (WorkflowException) workflowExceptionObject;
480 workflowExceptionText = workflowException.toString();
481 responseMap = new HashMap<>();
482 responseMap.put("WorkflowException", workflowExceptionText);
483 responseMap.put("ResponseCode", workflowException.getErrorCode());
484 responseMap.put("Status", "Fail");
487 else if (workflowExceptionObject instanceof String) {
488 Object object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
489 String responseCode = object == null ? null : String.valueOf(object);
490 workflowExceptionText = (String) workflowExceptionObject;
491 responseMap = new HashMap<>();
492 responseMap.put("WorkflowException", workflowExceptionText);
493 responseMap.put("ResponseCode", responseCode);
494 responseMap.put("Status", "Fail");
499 msoLogger.debug(LOGMARKER + "WorkflowException: " + workflowExceptionText);
501 // BEGIN LEGACY SUPPORT. TODO: REMOVE THIS CODE
502 Object object = getVariableFromHistory(historyService, processInstanceId, processKey + "Response");
503 String response = object == null ? null : String.valueOf(object);
504 msoLogger.debug(LOGMARKER + processKey + "Response: " + response);
506 if (response != null) {
507 object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
508 String responseCode = object == null ? null : String.valueOf(object);
509 msoLogger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
510 responseMap = new HashMap<>();
511 responseMap.put("Response", response);
512 responseMap.put("ResponseCode", responseCode);
513 responseMap.put("Status", "Success");
517 object = getVariableFromHistory(historyService, processInstanceId, prefix + "ErrorResponse");
518 String errorResponse = object == null ? null : String.valueOf(object);
519 msoLogger.debug(LOGMARKER + prefix + "ErrorResponse: " + errorResponse);
521 if (errorResponse != null) {
522 object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
523 String responseCode = object == null ? null : String.valueOf(object);
524 msoLogger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
525 responseMap = new HashMap<>();
526 responseMap.put("Response", errorResponse);
527 responseMap.put("ResponseCode", responseCode);
528 responseMap.put("Status", "Fail");
531 // END LEGACY SUPPORT. TODO: REMOVE THIS CODE
538 * Gets a variable value from the specified execution.
539 * @return the variable value, or null if the variable could not be
542 private Object getVariableFromExecution(RuntimeService runtimeService,
543 String executionId, String variableName) {
545 return runtimeService.getVariable(executionId, variableName);
546 } catch (ProcessEngineException e) {
547 // Most likely cause is that the execution no longer exists.
548 msoLogger.debug("Error retrieving execution " + executionId
549 + " variable " + variableName + ": " + e);
554 * Gets a variable value from specified historical process instance.
555 * @return the variable value, or null if the variable could not be
558 private Object getVariableFromHistory(HistoryService historyService,
559 String processInstanceId, String variableName) {
561 HistoricVariableInstance v = historyService.createHistoricVariableInstanceQuery()
562 .processInstanceId(processInstanceId).variableName(variableName).singleResult();
563 return v == null ? null : v.getValue();
564 } catch (Exception e) {
565 msoLogger.debug("Error retrieving process " + processInstanceId
566 + " variable " + variableName + " from history: " + e);
572 @Path("/services/{processKey}/{processInstanceId}")
573 @Produces("application/json")
574 @Consumes("application/json")
576 value = "Allows for retrieval of the variables for a given process",
579 public WorkflowResponse getProcessVariables(@PathParam("processKey") String processKey, @PathParam("processInstanceId") String processInstanceId) {
580 //TODO filter only set of variables
581 WorkflowResponse response = new WorkflowResponse();
583 long startTime = System.currentTimeMillis();
585 ProcessEngineServices engine = getProcessEngineServices();
586 List<HistoricVariableInstance> variables = engine.getHistoryService().createHistoricVariableInstanceQuery().processInstanceId(processInstanceId).list();
587 Map<String,String> variablesMap = new HashMap<>();
588 for (HistoricVariableInstance variableInstance: variables) {
589 variablesMap.put(variableInstance.getName(), variableInstance.getValue().toString());
592 msoLogger.debug(LOGMARKER + "***Received MSO getProcessVariables with processKey:" + processKey + " and variables: " + variablesMap.toString());
594 msoLogger.recordAuditEvent (startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, LOGMARKER
595 + "Call to MSO workflow/services in Camunda. Received MSO getProcessVariables with processKey:"
596 + processKey + " and variables: "
597 + variablesMap.toString());
600 response.setVariables(variablesMap);
601 response.setMessage("Success");
602 response.setResponse("Successfully retrieved the variables");
603 response.setProcessInstanceID(processInstanceId);
605 msoLogger.debug(LOGMARKER + response.getMessage() + " for processKey: " + processKey + " with response: " + response.getResponse());
606 } catch (Exception ex) {
607 response.setMessage("Fail");
608 response.setResponse("Failed to retrieve the variables," + ex.getMessage());
609 response.setProcessInstanceID(processInstanceId);
611 msoLogger.error (MessageEnum.BPMN_GENERAL_EXCEPTION_ARG, "BPMN", MDC.get(processKey), MsoLogger.ErrorCode.UnknownError, LOGMARKER
612 + response.getMessage()
613 + " for processKey: "
616 + response.getResponse());
617 msoLogger.debug("Exception :",ex);
620 msoLogger.recordMetricEvent ( startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc,
621 LOGMARKER + response.getMessage() + " for processKey: "
622 + processKey + " with response: " + response.getResponse(), "BPMN", MDC.get(processKey), null);
624 msoLogger.recordAuditEvent (startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc,
625 LOGMARKER + response.getMessage() + " for processKey: "
626 + processKey + " with response: " + response.getResponse());