2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.mso.bpmn.gamma.workflow.service;
23 import java.util.HashMap;
24 import java.util.List;
26 import java.util.UUID;
27 import java.util.concurrent.atomic.AtomicLong;
29 import javax.ws.rs.Consumes;
30 import javax.ws.rs.POST;
31 import javax.ws.rs.Path;
32 import javax.ws.rs.PathParam;
33 import javax.ws.rs.Produces;
34 import javax.ws.rs.core.Context;
35 import javax.ws.rs.core.Response;
36 import javax.ws.rs.core.UriInfo;
38 import org.camunda.bpm.engine.HistoryService;
39 import org.camunda.bpm.engine.ProcessEngineException;
40 import org.camunda.bpm.engine.ProcessEngineServices;
41 import org.camunda.bpm.engine.ProcessEngines;
42 import org.camunda.bpm.engine.RuntimeService;
43 import org.camunda.bpm.engine.history.HistoricVariableInstance;
44 import org.camunda.bpm.engine.impl.core.variable.VariableMapImpl;
45 import org.camunda.bpm.engine.runtime.ProcessInstance;
46 import org.camunda.bpm.engine.variable.VariableMap;
47 import org.camunda.bpm.engine.variable.Variables;
48 import org.camunda.bpm.engine.variable.Variables.SerializationDataFormats;
50 import org.openecomp.mso.bpmn.core.WorkflowException;
51 import org.openecomp.mso.logger.MessageEnum;
52 import org.openecomp.mso.logger.MsoLogger;
56 public class WorkflowResource {
58 private ProcessEngineServices pes4junit = null;
60 private static final MsoLogger msoLogger = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);
61 private static final String LOGMARKER = "[WRKFLOW-RESOURCE]";
63 private static final int DEFAULT_WAIT_TIME = 30000;
66 private UriInfo uriInfo = null;
69 * Starts the process instance and responds to the client synchronously.
70 * If the request does not contain att-mso-service-request-timeout, it waits for the value specified in DEFAULT_WAIT_TIME (milliseconds).
71 * Note: the value specified in att-mso-service-request-timeout is in seconds.
72 * If an exception is encountered in the process execution during polling, polling stops and an error response is
73 * returned to the client.
// REST entry point: creates a ProcessThread for the process identified by processKey, then
// polls until a response map is available, the process has ended without producing one, or
// the wait time has elapsed. Every exit path answers with a WorkflowResponse entity; the
// timeout and exception paths use HTTP status 500.
// NOTE(review): several lines of this method are not visible in this excerpt (for example the
// opening of the try block and the call that starts the thread) - confirm against the full file.
79 @Path("/services/{processKey}")
80 @Produces("application/json")
81 @Consumes("application/json")
82 public Response startProcessInstanceByKey(@PathParam("processKey") String processKey,
83 VariableMapImpl variableMap) {
// Convert the posted payload into engine variables and tag the logger with the request context.
85 Map<String, Object> inputVariables = getInputVariables(variableMap);
86 setLogContext(processKey, inputVariables);
// startTime is only used for the metric/audit events recorded through recordEvents.
88 WorkflowResponse workflowResponse = new WorkflowResponse();
89 long startTime = System.currentTimeMillis();
90 ProcessInstance processInstance = null;
// The engine call is made from ProcessThread so that this request thread is free to poll.
94 ProcessThread thread = new ProcessThread(inputVariables,processKey,msoLogger);
97 Map<String, Object> responseMap = null;
99 //wait for process to be completed
// waitTime is in milliseconds (getWaitTime converts the request value from seconds).
100 long waitTime = getWaitTime(inputVariables);
101 long now = System.currentTimeMillis();
// NOTE(review): `start` is not declared on any visible line - presumably initialised from `now`; confirm.
103 long endTime = start + waitTime;
104 long pollingInterval = 500;
106 // TEMPORARY LOGIC FOR UNIT TEST REFACTORING
107 // If this is a unit test (method is invoked directly), wait a max
108 // of 5 seconds after process ended for a result. In production,
109 // wait up to 60 seconds.
110 long timeToWaitAfterProcessEnded = uriInfo == null ? 5000 : 60000;
// timeProcessEnded stays 0 until getResponseMap first observes that the process has ended.
111 AtomicLong timeProcessEnded = new AtomicLong(0);
112 boolean endedWithNoResponse = false;
// Polling loop: sleep, check the worker thread for an exception, then look for a response.
114 while (now <= endTime) {
115 Thread.sleep(pollingInterval);
117 now = System.currentTimeMillis();
119 // Increase the polling interval over time
121 long elapsed = now - start;
// Back-off: poll every 5 s after 60 s have elapsed, every 1 s after 10 s, otherwise every 500 ms.
123 if (elapsed > 60000) {
124 pollingInterval = 5000;
125 } else if (elapsed > 10000) {
126 pollingInterval = 1000;
// An exception recorded by the worker thread is wrapped and rethrown; the catch block at the
// end of this method turns it into a 500 response.
128 Exception exception = thread.getException();
129 if (exception != null) {
130 throw new Exception(exception);
133 processInstance = thread.getProcessInstance();
// The worker thread has not yet obtained a process instance from the engine.
135 if (processInstance == null) {
136 msoLogger.debug(LOGMARKER + processKey + " process has not been created yet");
140 String processInstanceId = processInstance.getId();
141 workflowResponse.setProcessInstanceID(processInstanceId);
143 responseMap = getResponseMap(processInstance, processKey, timeProcessEnded);
145 if (responseMap == null) {
146 msoLogger.debug(LOGMARKER + processKey + " has not produced a response yet");
// Once the process is known to have ended, only wait timeToWaitAfterProcessEnded longer for a response.
148 if (timeProcessEnded.longValue() != 0) {
149 long elapsedSinceEnded = System.currentTimeMillis() - timeProcessEnded.longValue();
151 if (elapsedSinceEnded > timeToWaitAfterProcessEnded) {
152 endedWithNoResponse = true;
// A response map was found: copy it into the response and reply with the code it carries.
157 processResponseMap(workflowResponse, responseMap);
158 recordEvents(processKey, workflowResponse, startTime);
159 return Response.status(workflowResponse.getMessageCode()).entity(workflowResponse).build();
163 //if we dont get response after waiting then send timeout response
// NOTE(review): `state` is assigned below but its declaration is not on a visible line.
166 String processInstanceId;
168 if (processInstance == null) {
169 processInstanceId = "N/A";
170 state = "NOT STARTED";
172 processInstanceId = processInstance.getProcessInstanceId();
173 state = isProcessEnded(processInstanceId) ? "ENDED" : "NOT ENDED";
176 workflowResponse.setMessage("Fail");
177 if (endedWithNoResponse) {
178 workflowResponse.setResponse("Process ended without producing a response");
180 workflowResponse.setResponse("Request timed out, process state: " + state);
182 workflowResponse.setProcessInstanceID(processInstanceId);
// NOTE(review): the events are recorded before the message code is set to 500 on the next line.
183 recordEvents(processKey, workflowResponse, startTime);
184 workflowResponse.setMessageCode(500);
185 return Response.status(500).entity(workflowResponse).build();
// Catch-all: any exception (including one rethrown from the worker thread, or an
// InterruptedException from Thread.sleep) becomes a "Fail" response with status 500.
186 } catch (Exception ex) {
187 msoLogger.debug(LOGMARKER + "Exception in startProcessInstance by key");
// NOTE(review): printStackTrace bypasses msoLogger.
188 ex.printStackTrace();
189 workflowResponse.setMessage("Fail" );
190 workflowResponse.setResponse("Error occurred while executing the process: " + ex.getMessage());
191 if (processInstance != null) workflowResponse.setProcessInstanceID(processInstance.getId());
// NOTE(review): MDC.get is called with the process key as the MDC key - confirm this is intended.
193 msoLogger.error(MessageEnum.BPMN_GENERAL_EXCEPTION_ARG, "BPMN", MDC.get(processKey),
194 MsoLogger.ErrorCode.UnknownError, LOGMARKER + workflowResponse.getMessage()
195 + " for processKey: " + processKey + " with response: " + workflowResponse.getResponse());
197 workflowResponse.setMessageCode(500);
198 recordEvents(processKey, workflowResponse, startTime);
199 return Response.status(500).entity(workflowResponse).build();
204 * Returns the wait time, this is used by the resource on how long it should wait to send a response
205 * If none specified DEFAULT_WAIT_TIME is used
206 * @param inputVariables
209 private int getWaitTime(Map<String, Object> inputVariables)
211 String timeout = inputVariables.get("att-mso-service-request-timeout") == null
212 ? null : inputVariables.get("att-mso-service-request-timeout").toString();
214 if (timeout != null) {
216 return Integer.parseInt(timeout)*1000;
217 } catch (NumberFormatException nex) {
218 msoLogger.debug("Invalid input for att-mso-service-request-timeout");
221 return DEFAULT_WAIT_TIME;
224 private void recordEvents(String processKey, WorkflowResponse response, long startTime) {
226 msoLogger.recordMetricEvent ( startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc,
227 LOGMARKER + response.getMessage() + " for processKey: "
228 + processKey + " with response: " + response.getResponse(), "BPMN", MDC.get(processKey), null);
230 msoLogger.recordAuditEvent (startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc,
231 LOGMARKER + response.getMessage() + " for processKey: "
232 + processKey + " with response: " + response.getResponse());
235 private void setLogContext(String processKey, Map<String, Object> inputVariables) {
236 MsoLogger.setServiceName("MSO." + processKey);
237 if (inputVariables != null) {
238 MsoLogger.setLogContext(getValueFromInputVariables(inputVariables, "att-mso-request-id"),
239 getValueFromInputVariables(inputVariables, "att-mso-service-instance-id"));
// Looks up `key` in inputVariables and returns the string form of the value.
// NOTE(review): the lines between the lookup and the return are not visible in this excerpt;
// presumably they handle a missing (null) value - confirm what is returned in that case.
243 private String getValueFromInputVariables(Map<String,Object> inputVariables, String key) {
244 Object value = inputVariables.get(key);
248 return value.toString();
253 * Checks to see if the specified process is ended.
254 * @param processInstanceId the process instance ID
255 * @return true if the process is ended
257 private boolean isProcessEnded(String processInstanceId) {
258 ProcessEngineServices pes = getProcessEngineServices();
259 return pes.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstanceId).singleResult() == null ? true : false ;
262 private void processResponseMap(WorkflowResponse workflowResponse, Map<String, Object> responseMap) {
263 Object object = responseMap.get("Response");
264 String response = object == null ? null : String.valueOf(object);
265 if(response == null){
266 object = responseMap.get("WorkflowResponse");
267 response = object == null ? null : String.valueOf(object);
270 workflowResponse.setResponse(response);
272 object = responseMap.get("ResponseCode");
273 String responseCode = object == null ? null : String.valueOf(object);
276 workflowResponse.setMessageCode(Integer.parseInt(responseCode));
277 } catch(NumberFormatException nex) {
278 msoLogger.debug(LOGMARKER + "Failed to parse ResponseCode: " + responseCode);
279 workflowResponse.setMessageCode(-1);
282 Object status = responseMap.get("Status");
284 if ("Success".equalsIgnoreCase(String.valueOf(status))) {
285 workflowResponse.setMessage("Success");
286 } else if ("Fail".equalsIgnoreCase(String.valueOf(status))) {
287 workflowResponse.setMessage("Fail");
289 msoLogger.debug(LOGMARKER + "Unrecognized Status: " + responseCode);
290 workflowResponse.setMessage("Fail");
296 * Triggers the workflow in a separate thread
298 private class ProcessThread extends Thread {
299 private final Map<String,Object> inputVariables;
300 private final String processKey;
301 private final MsoLogger msoLogger;
302 private final String businessKey;
303 private ProcessInstance processInstance = null;
304 private Exception exception = null;
306 public ProcessThread(Map<String, Object> inputVariables, String processKey, MsoLogger msoLogger) {
307 this.inputVariables = inputVariables;
308 this.processKey = processKey;
309 this.msoLogger = msoLogger;
310 this.businessKey = UUID.randomUUID().toString();
314 * If an exception occurs when starting the process instance, it may
315 * be obtained by calling this method. Note that exceptions are only
316 * recorded while the process is executing in its original thread.
317 * Once a process is suspended, exception recording stops.
318 * @return the exception, or null if none has occurred
320 public Exception getException() {
325 public ProcessInstance getProcessInstance() {
326 return this.processInstance;
330 * Sets the process instance exception.
331 * @param exception the exception
333 private void setException(Exception exception) {
334 this.exception = exception;
338 setLogContext(processKey, inputVariables);
340 long startTime = System.currentTimeMillis();
343 msoLogger.debug(LOGMARKER + "***Received MSO startProcessInstanceByKey with processKey:"
344 + processKey + " and variables: " + inputVariables);
346 msoLogger.recordAuditEvent (startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, LOGMARKER
347 + "Call to MSO workflow/services in Camunda. Received MSO startProcessInstanceByKey with"
348 + " processKey:" + processKey
349 + " businessKey:" + businessKey
350 + " variables: " + inputVariables);
352 RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
354 // Note that this method doesn't return until the process suspends
355 // itself or finishes. We provide a business key so we can identify
356 // the process instance immediately.
357 processInstance = runtimeService.startProcessInstanceByKey(
358 processKey, inputVariables);
360 } catch (Exception e) {
361 msoLogger.debug(LOGMARKER + "ProcessThread caught an exception executing "
362 + processKey + ": " + e);
369 private Map<String, Object> getInputVariables(VariableMapImpl variableMap) {
370 VariableMap inputVariables = Variables.createVariables();
371 @SuppressWarnings("unchecked")
372 Map<String, Object> vMap = (Map<String, Object>) variableMap.get("variables");
373 for (String key : vMap.keySet()) { //variabe name vn
374 @SuppressWarnings("unchecked")
375 Map<String, Object> valueMap = (Map<String,Object>)vMap.get(key); //value, type
376 inputVariables.putValueTyped(key, Variables
377 .objectValue(valueMap.get("value"))
378 .serializationDataFormat(SerializationDataFormats.JAVA) // tells the engine to use java serialization for persisting the value
381 return inputVariables;
// Looks for a response produced by the given process instance. History is only consulted
// once the process is known to have ended (timeProcessEnded is non-zero, or isProcessEnded
// reports true). Sources checked, in order: the "<processKey>ResponseMap" history variable,
// then - only when a "prefix" history variable exists - "WorkflowResponse",
// "WorkflowException", and the legacy "<processKey>Response" and "<prefix>ErrorResponse"
// variables. For the old-style sources a new map with a "ResponseCode" and a "Status"
// entry is built.
// NOTE(review): several lines of this method are not visible in this excerpt (return
// statements, closing braces, the end of the commented-out block) - confirm against the full file.
385 * Attempts to get a response map from the specified process instance.
386 * @return the response map, or null if it is unavailable
388 private Map<String, Object> getResponseMap(ProcessInstance processInstance,
389 String processKey, AtomicLong timeProcessEnded) {
// Name of the history variable a process uses to publish its response map.
391 String responseMapVariable = processKey + "ResponseMap";
392 String processInstanceId = processInstance.getId();
394 // Query the runtime service to see if a response map is ready.
// The runtime-service lookup below is commented out; only the history lookup further down is active.
396 /* RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
397 List<Execution> executions = runtimeService.createExecutionQuery()
398 .processInstanceId(processInstanceId).list();
400 for (Execution execution : executions) {
401 @SuppressWarnings("unchecked")
402 Map<String, Object> responseMap = (Map<String, Object>)
403 getVariableFromExecution(runtimeService, execution.getId(),
404 responseMapVariable);
406 if (responseMap != null) {
407 msoLogger.debug(LOGMARKER + "Obtained " + responseMapVariable
408 + " from process " + processInstanceId + " execution "
409 + execution.getId());
414 //Querying history seem to return consistent results compared to querying the runtime service
// A non-zero timeProcessEnded means an earlier poll already saw the process as ended, so the
// isProcessEnded query is skipped (short-circuit of the || below).
416 boolean alreadyEnded = timeProcessEnded.longValue() != 0;
418 if (alreadyEnded || isProcessEnded(processInstance.getId())) {
// NOTE(review): the condition guarding this assignment is not visible; presumably the end
// time is only recorded the first time the process is seen as ended - confirm.
420 timeProcessEnded.set(System.currentTimeMillis());
423 // Query the history service to see if a response map exists.
425 HistoryService historyService = getProcessEngineServices().getHistoryService();
426 @SuppressWarnings("unchecked")
427 Map<String, Object> responseMap = (Map<String, Object>)
428 getVariableFromHistory(historyService, processInstance.getId(),
429 responseMapVariable);
431 if (responseMap != null) {
432 msoLogger.debug(LOGMARKER + "Obtained " + responseMapVariable
433 + " from process " + processInstanceId + " history");
437 // Query the history service for old-style response variables.
// The old-style lookups read "<prefix>ResponseCode" etc., so they need the "prefix" variable.
439 String prefix = (String) getVariableFromHistory(historyService, processInstanceId, "prefix");
441 if (prefix != null) {
443 // Check for 'WorkflowResponse' variable
444 Object workflowResponseObject = getVariableFromHistory(historyService, processInstanceId, "WorkflowResponse");
445 String workflowResponse = workflowResponseObject == null ? null : String.valueOf(workflowResponseObject);
446 msoLogger.debug(LOGMARKER + "WorkflowResponse: " + workflowResponse);
// A WorkflowResponse variable is reported with Status "Success".
448 if (workflowResponse != null) {
449 Object responseCodeObject = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
450 String responseCode = responseCodeObject == null ? null : String.valueOf(responseCodeObject);
451 msoLogger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
452 responseMap = new HashMap<String, Object>();
453 responseMap.put("WorkflowResponse", workflowResponse);
454 responseMap.put("ResponseCode", responseCode);
455 responseMap.put("Status", "Success");
460 // Check for 'WorkflowException' variable
461 WorkflowException workflowException = null;
462 String workflowExceptionText = null;
464 Object workflowExceptionObject = getVariableFromHistory(historyService, processInstanceId, "WorkflowException");
465 if(workflowExceptionObject != null) {
// Typed exception: the response code comes from the exception's own error code.
466 if(workflowExceptionObject instanceof WorkflowException) {
467 workflowException = (WorkflowException) workflowExceptionObject;
468 workflowExceptionText = workflowException.toString();
469 responseMap = new HashMap<String, Object>();
470 responseMap.put("WorkflowException", workflowExceptionText);
471 responseMap.put("ResponseCode", workflowException.getErrorCode());
472 responseMap.put("Status", "Fail");
// String exception: the response code is read from the "<prefix>ResponseCode" history variable.
475 else if (workflowExceptionObject instanceof String) {
476 Object object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
477 String responseCode = object == null ? null : String.valueOf(object);
478 workflowExceptionText = (String) workflowExceptionObject;
479 responseMap = new HashMap<String, Object>();
480 responseMap.put("WorkflowException", workflowExceptionText);
481 responseMap.put("ResponseCode", responseCode);
482 responseMap.put("Status", "Fail");
487 msoLogger.debug(LOGMARKER + "WorkflowException: " + workflowExceptionText);
489 // BEGIN LEGACY SUPPORT. TODO: REMOVE THIS CODE
490 Object object = getVariableFromHistory(historyService, processInstanceId, processKey + "Response");
491 String response = object == null ? null : String.valueOf(object);
492 msoLogger.debug(LOGMARKER + processKey + "Response: " + response);
// Legacy success response: "<processKey>Response" is reported with Status "Success".
494 if (response != null) {
495 object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
496 String responseCode = object == null ? null : String.valueOf(object);
497 msoLogger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
498 responseMap = new HashMap<String, Object>();
499 responseMap.put("Response", response);
500 responseMap.put("ResponseCode", responseCode);
501 responseMap.put("Status", "Success");
505 object = getVariableFromHistory(historyService, processInstanceId, prefix + "ErrorResponse");
506 String errorResponse = object == null ? null : String.valueOf(object);
507 msoLogger.debug(LOGMARKER + prefix + "ErrorResponse: " + errorResponse);
// Legacy error response: "<prefix>ErrorResponse" is reported with Status "Fail".
509 if (errorResponse != null) {
510 object = getVariableFromHistory(historyService, processInstanceId, prefix + "ResponseCode");
511 String responseCode = object == null ? null : String.valueOf(object);
512 msoLogger.debug(LOGMARKER + prefix + "ResponseCode: " + responseCode);
513 responseMap = new HashMap<String, Object>();
514 responseMap.put("Response", errorResponse);
515 responseMap.put("ResponseCode", responseCode);
516 responseMap.put("Status", "Fail");
519 // END LEGACY SUPPORT. TODO: REMOVE THIS CODE
526 * Gets a variable value from the specified execution.
527 * @return the variable value, or null if the variable could not be
530 private Object getVariableFromExecution(RuntimeService runtimeService,
531 String executionId, String variableName) {
533 return runtimeService.getVariable(executionId, variableName);
534 } catch (ProcessEngineException e) {
535 // Most likely cause is that the execution no longer exists.
536 msoLogger.debug("Error retrieving execution " + executionId
537 + " variable " + variableName + ": " + e);
543 * Gets a variable value from specified historical process instance.
544 * @return the variable value, or null if the variable could not be
547 private Object getVariableFromHistory(HistoryService historyService,
548 String processInstanceId, String variableName) {
550 HistoricVariableInstance v = historyService.createHistoricVariableInstanceQuery()
551 .processInstanceId(processInstanceId).variableName(variableName).singleResult();
552 return v == null ? null : v.getValue();
553 } catch (Exception e) {
554 msoLogger.debug("Error retrieving process " + processInstanceId
555 + " variable " + variableName + " from history: " + e);
561 @Path("/services/{processKey}/{processInstanceId}")
562 @Produces("application/json")
563 @Consumes("application/json")
564 public WorkflowResponse getProcessVariables(@PathParam("processKey") String processKey, @PathParam("processInstanceId") String processInstanceId) {
565 //TODO filter only set of variables
566 WorkflowResponse response = new WorkflowResponse();
568 long startTime = System.currentTimeMillis();
570 ProcessEngineServices engine = getProcessEngineServices();
571 List<HistoricVariableInstance> variables = engine.getHistoryService().createHistoricVariableInstanceQuery().processInstanceId(processInstanceId).list();
572 Map<String,String> variablesMap = new HashMap<String,String>();
573 for (HistoricVariableInstance variableInstance: variables) {
574 variablesMap.put(variableInstance.getName(), variableInstance.getValue().toString());
577 msoLogger.debug(LOGMARKER + "***Received MSO getProcessVariables with processKey:" + processKey + " and variables: " + variablesMap.toString());
579 msoLogger.recordAuditEvent (startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, LOGMARKER
580 + "Call to MSO workflow/services in Camunda. Received MSO getProcessVariables with processKey:"
581 + processKey + " and variables: "
582 + variablesMap.toString());
585 response.setVariables(variablesMap);
586 response.setMessage("Success");
587 response.setResponse("Successfully retrieved the variables");
588 response.setProcessInstanceID(processInstanceId);
590 msoLogger.debug(LOGMARKER + response.getMessage() + " for processKey: " + processKey + " with response: " + response.getResponse());
591 } catch (Exception ex) {
592 response.setMessage("Fail");
593 response.setResponse("Failed to retrieve the variables," + ex.getMessage());
594 response.setProcessInstanceID(processInstanceId);
596 msoLogger.error (MessageEnum.BPMN_GENERAL_EXCEPTION_ARG, "BPMN", MDC.get(processKey), MsoLogger.ErrorCode.UnknownError, LOGMARKER
597 + response.getMessage()
598 + " for processKey: "
601 + response.getResponse());
605 msoLogger.recordMetricEvent ( startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc,
606 LOGMARKER + response.getMessage() + " for processKey: "
607 + processKey + " with response: " + response.getResponse(), "BPMN", MDC.get(processKey), null);
609 msoLogger.recordAuditEvent (startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc,
610 LOGMARKER + response.getMessage() + " for processKey: "
611 + processKey + " with response: " + response.getResponse());
616 private ProcessEngineServices getProcessEngineServices() {
617 if (pes4junit == null) {
618 return ProcessEngines.getDefaultProcessEngine();
624 public void setProcessEngineServices4junit(ProcessEngineServices pes) {