2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.openecomp.mso.bpmn.common.workflow.service;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response;

import org.camunda.bpm.engine.ProcessEngineServices;
import org.camunda.bpm.engine.ProcessEngines;
import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.runtime.ProcessInstance;
import org.camunda.bpm.engine.variable.impl.VariableMapImpl;
import org.jboss.resteasy.annotations.Suspend;
import org.jboss.resteasy.spi.AsynchronousResponse;
import org.openecomp.mso.logger.MessageEnum;
import org.openecomp.mso.logger.MsoLogger;
/**
 * Asynchronous workflow processing using the JAX-RS RESTEasy implementation.
 * Both synchronous and asynchronous BPMN processes can benefit from this implementation: the workflow is executed
 * in the background and the server thread is freed up, so the server scales better to process more incoming requests.
 *
 * Usage: for a synchronous process, invoke the callback to write the response once you are ready to send it.
 * For an asynchronous process, the activity may send an acknowledgement response and then proceed with executing the process.
 */
55 public abstract class WorkflowAsyncResource {
57 private WorkflowContextHolder contextHolder = WorkflowContextHolder.getInstance();
58 protected ProcessEngineServices pes4junit = null;
60 private MsoLogger msoLogger = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);
62 private static final String logMarker = "[WRKFLOW-RESOURCE]";
63 private static final int DEFAULT_WAIT_TIME = 30000; //default wait time
66 * Asynchronous JAX-RS method that starts a process instance.
67 * @param asyncResponse an object that will receive the asynchronous response
68 * @param processKey the process key
69 * @param variableMap input variables to the process
72 @Path("/services/{processKey}")
73 @Produces("application/json")
74 @Consumes("application/json")
75 public void startProcessInstanceByKey(final @Suspend(180000) AsynchronousResponse asyncResponse,
76 @PathParam("processKey") String processKey, VariableMapImpl variableMap) {
78 WorkflowResponse response = new WorkflowResponse();
79 long startTime = System.currentTimeMillis();
80 Map<String, Object> inputVariables = null;
81 WorkflowContext workflowContext = null;
84 inputVariables = getInputVariables(variableMap);
85 setLogContext(processKey, inputVariables);
87 // This variable indicates that the flow was invoked asynchronously
88 inputVariables.put("isAsyncProcess", "true");
90 workflowContext = new WorkflowContext(processKey, getRequestId(inputVariables),
91 asyncResponse, getWaitTime(inputVariables));
93 msoLogger.debug("Adding the workflow context into holder: "
94 + workflowContext.getProcessKey() + ":"
95 + workflowContext.getRequestId() + ":"
96 + workflowContext.getTimeout());
98 contextHolder.put(workflowContext);
100 ProcessThread processThread = new ProcessThread(processKey, inputVariables);
101 processThread.start();
102 } catch (Exception e) {
103 setLogContext(processKey, inputVariables);
105 if (workflowContext != null) {
106 contextHolder.remove(workflowContext);
109 msoLogger.debug(logMarker + "Exception in startProcessInstance by key");
110 response.setMessage("Fail" );
111 response.setResponse("Error occurred while executing the process: " + e);
112 response.setMessageCode(500);
113 recordEvents(processKey, response, startTime);
115 msoLogger.error (MessageEnum.BPMN_GENERAL_EXCEPTION_ARG, "BPMN", MsoLogger.getServiceName(), MsoLogger.ErrorCode.UnknownError, logMarker
116 + response.getMessage() + " for processKey: "
117 + processKey + " with response: " + response.getResponse());
119 Response errorResponse = Response.serverError().entity(response).build();
120 asyncResponse.setResponse(errorResponse);
129 class ProcessThread extends Thread {
130 private final String processKey;
131 private final Map<String,Object> inputVariables;
133 public ProcessThread(String processKey, Map<String, Object> inputVariables) {
134 this.processKey = processKey;
135 this.inputVariables = inputVariables;
140 String processInstanceId = null;
141 long startTime = System.currentTimeMillis();
144 setLogContext(processKey, inputVariables);
146 // Note: this creates a random businessKey if it wasn't specified.
147 String businessKey = getBusinessKey(inputVariables);
149 msoLogger.debug(logMarker + "***Received MSO startProcessInstanceByKey with processKey: "
150 + processKey + " and variables: " + inputVariables);
152 msoLogger.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, logMarker
153 + "Call to MSO workflow/services in Camunda. Received MSO startProcessInstanceByKey with processKey:"
154 + processKey + " and variables: " + inputVariables);
156 RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
157 ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(
158 processKey, businessKey, inputVariables);
159 processInstanceId = processInstance.getId();
161 msoLogger.debug(logMarker + "Process " + processKey + ":" + processInstanceId + " " +
162 (processInstance.isEnded() ? "ENDED" : "RUNNING"));
163 } catch (Exception e) {
165 msoLogger.recordAuditEvent(startTime, MsoLogger.StatusCode.ERROR, MsoLogger.ResponseCode.InternalError,
166 logMarker + "Error in starting the process: "+ e.getMessage());
168 WorkflowCallbackResponse callbackResponse = new WorkflowCallbackResponse();
169 callbackResponse.setStatusCode(500);
170 callbackResponse.setMessage("Fail");
171 callbackResponse.setResponse("Error occurred while executing the process: " + e);
173 // TODO: is the processInstanceId used by the API handler? I don't think so.
174 // It may be null here.
175 WorkflowContextHolder.getInstance().processCallback(
176 processKey, processInstanceId,
177 getRequestId(inputVariables),
185 * Callback resource which is invoked from BPMN to process to send the workflow response
188 * @param processInstanceId
190 * @param callbackResponse
194 @Path("/services/callback/{processKey}/{processInstanceId}/{requestId}")
195 @Produces("application/json")
196 @Consumes("application/json")
197 public Response processWorkflowCallback(
198 @PathParam("processKey") String processKey,
199 @PathParam("processInstanceId") String processInstanceId,
200 @PathParam("requestId")String requestId,
201 WorkflowCallbackResponse callbackResponse) {
203 msoLogger.debug(logMarker + "Process instance ID:" + processInstanceId + ":" + requestId + ":" + processKey + ":" + isProcessEnded(processInstanceId));
204 msoLogger.debug(logMarker + "About to process the callback request:" + callbackResponse.getResponse() + ":" + callbackResponse.getMessage() + ":" + callbackResponse.getStatusCode());
205 return contextHolder.processCallback(processKey, processInstanceId, requestId, callbackResponse);
208 // Note: the business key is used to identify the process in unit tests
209 private String getBusinessKey(Map<String, Object> inputVariables) {
210 Object businessKey = inputVariables.get("mso-business-key");
211 if (businessKey == null ) {
212 businessKey = UUID.randomUUID().toString();
213 inputVariables.put("mso-business-key", businessKey);
215 return businessKey.toString();
218 private String getRequestId(Map<String, Object> inputVariables) {
219 Object requestId = inputVariables.get("mso-request-id");
220 if (requestId == null ) {
221 requestId = UUID.randomUUID().toString();
222 inputVariables.put("mso-request-id", requestId);
224 return requestId.toString();
227 private long getWaitTime(Map<String, Object> inputVariables)
229 String timeout = inputVariables.get("mso-service-request-timeout") == null
230 ? null : inputVariables.get("mso-service-request-timeout").toString();
232 if (timeout != null) {
234 return Long.parseLong(timeout)*1000;
235 } catch (NumberFormatException nex) {
236 msoLogger.debug("Invalid input for mso-service-request-timeout");
240 return DEFAULT_WAIT_TIME;
243 private void recordEvents(String processKey, WorkflowResponse response,
246 msoLogger.recordMetricEvent ( startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc,
247 logMarker + response.getMessage() + " for processKey: "
248 + processKey + " with response: " + response.getResponse(), "BPMN", MDC.get(processKey), null);
250 msoLogger.recordAuditEvent (startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc,
251 logMarker + response.getMessage() + "for processKey: " + processKey + " with response: " + response.getResponse());
255 private void setLogContext(String processKey,
256 Map<String, Object> inputVariables) {
257 MsoLogger.setServiceName("MSO." + processKey);
258 if (inputVariables != null) {
259 MsoLogger.setLogContext(getKeyValueFromInputVariables(inputVariables,"mso-request-id"), getKeyValueFromInputVariables(inputVariables,"mso-service-instance-id"));
263 private String getKeyValueFromInputVariables(Map<String,Object> inputVariables, String key) {
264 if (inputVariables == null) return "";
265 Object requestId = inputVariables.get(key);
266 if (requestId != null) return requestId.toString();
270 private boolean isProcessEnded(String processInstanceId) {
271 ProcessEngineServices pes = getProcessEngineServices();
272 return pes.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstanceId).singleResult() == null ? true : false ;
276 protected abstract ProcessEngineServices getProcessEngineServices();
278 public void setProcessEngineServices4junit(ProcessEngineServices pes) {
282 private Map<String, Object> getInputVariables(VariableMapImpl variableMap) {
283 Map<String, Object> inputVariables = new HashMap<String,Object>();
284 @SuppressWarnings("unchecked")
285 Map<String, Object> vMap = (Map<String, Object>) variableMap.get("variables");
286 for (String vName : vMap.keySet()) {
287 @SuppressWarnings("unchecked")
288 Map<String, Object> valueMap = (Map<String,Object>)vMap.get(vName); // value, type
289 inputVariables.put(vName, valueMap.get("value"));
291 return inputVariables;