2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.mso.bpmn.gamma.workflow.service;
23 import java.util.HashMap;
25 import java.util.UUID;
27 import javax.ws.rs.Consumes;
28 import javax.ws.rs.POST;
29 import javax.ws.rs.Path;
30 import javax.ws.rs.PathParam;
31 import javax.ws.rs.Produces;
32 import javax.ws.rs.core.Response;
34 import org.camunda.bpm.engine.ProcessEngineServices;
35 import org.camunda.bpm.engine.ProcessEngines;
36 import org.camunda.bpm.engine.RuntimeService;
37 import org.camunda.bpm.engine.impl.core.variable.VariableMapImpl;
38 import org.camunda.bpm.engine.runtime.ProcessInstance;
39 import org.jboss.resteasy.annotations.Suspend;
40 import org.jboss.resteasy.spi.AsynchronousResponse;
43 import org.openecomp.mso.logger.MessageEnum;
44 import org.openecomp.mso.logger.MsoLogger;
50 * Asynchronous Workflow processing using JAX RS RESTeasy implementation
51 * Both Synchronous and Asynchronous BPMN process can benefit from this implementation since the workflow gets executed in the background
52 * and the server thread is freed up, server scales better to process more incoming requests
54 * Usage: For synchronous process, when you are ready to send the response invoke the callback to write the response
55 * For asynchronous process - the activity may send a acknowledgement response and then proceed further on executing the process
58 public class WorkflowAsyncResource {
60 private WorkflowContextHolder contextHolder = WorkflowContextHolder.getInstance();
61 private ProcessEngineServices pes4junit = null;
63 private MsoLogger msoLogger = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);
65 private static final String logMarker = "[WRKFLOW-RESOURCE]";
66 private static final int DEFAULT_WAIT_TIME = 30000; //default wait time
69 * Asynchronous JAX-RS method that starts a process instance.
70 * @param asyncResponse an object that will receive the asynchronous response
71 * @param processKey the process key
72 * @param variableMap input variables to the process
75 @Path("/services/{processKey}")
76 @Produces("application/json")
77 @Consumes("application/json")
78 public void startProcessInstanceByKey(final @Suspend(180000) AsynchronousResponse asyncResponse,
79 @PathParam("processKey") String processKey, VariableMapImpl variableMap) {
81 WorkflowResponse response = new WorkflowResponse();
82 long startTime = System.currentTimeMillis();
83 Map<String, Object> inputVariables = null;
84 WorkflowContext workflowContext = null;
87 inputVariables = getInputVariables(variableMap);
88 setLogContext(processKey, inputVariables);
90 // This variable indicates that the flow was invoked asynchronously
91 inputVariables.put("isAsyncProcess", "true");
93 workflowContext = new WorkflowContext(processKey, getRequestId(inputVariables),
94 asyncResponse, getWaitTime(inputVariables));
96 msoLogger.debug("Adding the workflow context into holder: "
97 + workflowContext.getProcessKey() + ":"
98 + workflowContext.getRequestId() + ":"
99 + workflowContext.getTimeout());
101 contextHolder.put(workflowContext);
103 ProcessThread processThread = new ProcessThread(processKey, inputVariables);
104 processThread.start();
105 } catch (Exception e) {
106 setLogContext(processKey, inputVariables);
108 if (workflowContext != null) {
109 contextHolder.remove(workflowContext);
112 msoLogger.debug(logMarker + "Exception in startProcessInstance by key");
113 response.setMessage("Fail" );
114 response.setResponse("Error occurred while executing the process: " + e);
115 response.setMessageCode(500);
116 recordEvents(processKey, response, startTime);
118 msoLogger.error (MessageEnum.BPMN_GENERAL_EXCEPTION_ARG, "BPMN", MsoLogger.getServiceName(), MsoLogger.ErrorCode.UnknownError, logMarker
119 + response.getMessage() + " for processKey: "
120 + processKey + " with response: " + response.getResponse());
122 Response errorResponse = Response.serverError().entity(response).build();
123 asyncResponse.setResponse(errorResponse);
132 class ProcessThread extends Thread {
133 private final String processKey;
134 private final Map<String,Object> inputVariables;
136 public ProcessThread(String processKey, Map<String, Object> inputVariables) {
137 this.processKey = processKey;
138 this.inputVariables = inputVariables;
143 String processInstanceId = null;
144 long startTime = System.currentTimeMillis();
147 setLogContext(processKey, inputVariables);
149 // Note: this creates a random businessKey if it wasn't specified.
150 String businessKey = getBusinessKey(inputVariables);
152 msoLogger.debug(logMarker + "***Received MSO startProcessInstanceByKey with processKey: "
153 + processKey + " and variables: " + inputVariables);
155 msoLogger.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, logMarker
156 + "Call to MSO workflow/services in Camunda. Received MSO startProcessInstanceByKey with processKey:"
157 + processKey + " and variables: " + inputVariables);
159 RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
160 ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(
161 processKey, businessKey, inputVariables);
162 processInstanceId = processInstance.getId();
164 msoLogger.debug(logMarker + "Process " + processKey + ":" + processInstanceId + " " +
165 (processInstance.isEnded() ? "ENDED" : "RUNNING"));
166 } catch (Exception e) {
168 msoLogger.recordAuditEvent(startTime, MsoLogger.StatusCode.ERROR, MsoLogger.ResponseCode.InternalError,
169 logMarker + "Error in starting the process: "+ e.getMessage());
171 WorkflowCallbackResponse callbackResponse = new WorkflowCallbackResponse();
172 callbackResponse.setStatusCode(500);
173 callbackResponse.setMessage("Fail");
174 callbackResponse.setResponse("Error occurred while executing the process: " + e);
176 // TODO: is the processInstanceId used by the API handler? I don't think so.
177 // It may be null here.
178 WorkflowContextHolder.getInstance().processCallback(
179 processKey, processInstanceId,
180 getRequestId(inputVariables),
188 * Callback resource which is invoked from BPMN to process to send the workflow response
191 * @param processInstanceId
193 * @param callbackResponse
197 @Path("/services/callback/{processKey}/{processInstanceId}/{requestId}")
198 @Produces("application/json")
199 @Consumes("application/json")
200 public Response processWorkflowCallback(
201 @PathParam("processKey") String processKey,
202 @PathParam("processInstanceId") String processInstanceId,
203 @PathParam("requestId")String requestId,
204 WorkflowCallbackResponse callbackResponse) {
206 msoLogger.debug(logMarker + "Process instance ID:" + processInstanceId + ":" + requestId + ":" + processKey + ":" + isProcessEnded(processInstanceId));
207 msoLogger.debug(logMarker + "About to process the callback request:" + callbackResponse.getResponse() + ":" + callbackResponse.getMessage() + ":" + callbackResponse.getStatusCode());
208 return contextHolder.processCallback(processKey, processInstanceId, requestId, callbackResponse);
211 // Note: the business key is used to identify the process in unit tests
212 private String getBusinessKey(Map<String, Object> inputVariables) {
213 Object businessKey = inputVariables.get("att-mso-business-key");
214 if (businessKey == null ) {
215 businessKey = UUID.randomUUID().toString();
216 inputVariables.put("att-mso-business-key", businessKey);
218 return businessKey.toString();
221 private String getRequestId(Map<String, Object> inputVariables) {
222 Object requestId = inputVariables.get("att-mso-request-id");
223 if (requestId == null ) {
224 requestId = UUID.randomUUID().toString();
225 inputVariables.put("att-mso-request-id", requestId);
227 return requestId.toString();
230 private long getWaitTime(Map<String, Object> inputVariables)
232 String timeout = inputVariables.get("att-mso-service-request-timeout") == null
233 ? null : inputVariables.get("att-mso-service-request-timeout").toString();
235 if (timeout != null) {
237 return Long.parseLong(timeout)*1000;
238 } catch (NumberFormatException nex) {
239 msoLogger.debug("Invalid input for att-mso-service-request-timeout");
243 return DEFAULT_WAIT_TIME;
246 private void recordEvents(String processKey, WorkflowResponse response,
249 msoLogger.recordMetricEvent ( startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc,
250 logMarker + response.getMessage() + " for processKey: "
251 + processKey + " with response: " + response.getResponse(), "BPMN", MDC.get(processKey), null);
253 msoLogger.recordAuditEvent (startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc,
254 logMarker + response.getMessage() + "for processKey: " + processKey + " with response: " + response.getResponse());
258 private void setLogContext(String processKey,
259 Map<String, Object> inputVariables) {
260 MsoLogger.setServiceName("MSO." + processKey);
261 if (inputVariables != null) {
262 MsoLogger.setLogContext(getKeyValueFromInputVariables(inputVariables,"att-mso-request-id"), getKeyValueFromInputVariables(inputVariables,"att-mso-service-instance-id"));
266 private String getKeyValueFromInputVariables(Map<String,Object> inputVariables, String key) {
267 if (inputVariables == null) return "";
268 Object requestId = inputVariables.get(key);
269 if (requestId != null) return requestId.toString();
273 private boolean isProcessEnded(String processInstanceId) {
274 ProcessEngineServices pes = getProcessEngineServices();
275 return pes.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstanceId).singleResult() == null ? true : false ;
279 private ProcessEngineServices getProcessEngineServices() {
280 if (pes4junit == null) {
281 return ProcessEngines.getDefaultProcessEngine();
287 public void setProcessEngineServices4junit(ProcessEngineServices pes) {
291 private Map<String, Object> getInputVariables(VariableMapImpl variableMap) {
292 Map<String, Object> inputVariables = new HashMap<String,Object>();
293 @SuppressWarnings("unchecked")
294 Map<String, Object> vMap = (Map<String, Object>) variableMap.get("variables");
295 for (String vName : vMap.keySet()) {
296 @SuppressWarnings("unchecked")
297 Map<String, Object> valueMap = (Map<String,Object>)vMap.get(vName); // value, type
298 inputVariables.put(vName, valueMap.get("value"));
300 return inputVariables;