2 * ============LICENSE_START=======================================================
\r
4 * ================================================================================
\r
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * ================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
11 * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * Unless required by applicable law or agreed to in writing, software
\r
14 * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * See the License for the specific language governing permissions and
\r
17 * limitations under the License.
\r
18 * ============LICENSE_END=========================================================
\r
20 package org.openecomp.mso.bpmn.common.workflow.service;
\r
22 import java.util.HashMap;
\r
23 import java.util.Map;
\r
24 import java.util.UUID;
\r
26 import javax.ws.rs.Consumes;
\r
27 import javax.ws.rs.POST;
\r
28 import javax.ws.rs.Path;
\r
29 import javax.ws.rs.PathParam;
\r
30 import javax.ws.rs.Produces;
\r
31 import javax.ws.rs.core.Response;
\r
33 import org.camunda.bpm.engine.ProcessEngineServices;
\r
34 import org.camunda.bpm.engine.ProcessEngines;
\r
35 import org.camunda.bpm.engine.RuntimeService;
\r
36 import org.camunda.bpm.engine.runtime.ProcessInstance;
\r
37 import org.camunda.bpm.engine.variable.impl.VariableMapImpl;
\r
38 import org.jboss.resteasy.annotations.Suspend;
\r
39 import org.jboss.resteasy.spi.AsynchronousResponse;
\r
40 import org.openecomp.mso.logger.MessageEnum;
\r
41 import org.openecomp.mso.logger.MsoLogger;
\r
42 import org.slf4j.MDC;
\r
47 * Asynchronous Workflow processing using JAX RS RESTeasy implementation
\r
48 * Both Synchronous and Asynchronous BPMN process can benefit from this implementation since the workflow gets executed in the background
\r
49 * and the server thread is freed up, server scales better to process more incoming requests
\r
51 * Usage: For synchronous process, when you are ready to send the response invoke the callback to write the response
\r
52 * For asynchronous process - the activity may send a acknowledgement response and then proceed further on executing the process
\r
55 public class WorkflowAsyncResource {
\r
57 private WorkflowContextHolder contextHolder = WorkflowContextHolder.getInstance();
\r
58 protected ProcessEngineServices pes4junit = null;
\r
60 private MsoLogger msoLogger = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);
\r
62 private static final String logMarker = "[WRKFLOW-RESOURCE]";
\r
63 private static final int DEFAULT_WAIT_TIME = 30000; //default wait time
\r
66 * Asynchronous JAX-RS method that starts a process instance.
\r
67 * @param asyncResponse an object that will receive the asynchronous response
\r
68 * @param processKey the process key
\r
69 * @param variableMap input variables to the process
\r
72 @Path("/services/{processKey}")
\r
73 @Produces("application/json")
\r
74 @Consumes("application/json")
\r
75 public void startProcessInstanceByKey(final @Suspend(180000) AsynchronousResponse asyncResponse,
\r
76 @PathParam("processKey") String processKey, VariableMapImpl variableMap) {
\r
78 WorkflowResponse response = new WorkflowResponse();
\r
79 long startTime = System.currentTimeMillis();
\r
80 Map<String, Object> inputVariables = null;
\r
81 WorkflowContext workflowContext = null;
\r
84 inputVariables = getInputVariables(variableMap);
\r
85 setLogContext(processKey, inputVariables);
\r
87 // This variable indicates that the flow was invoked asynchronously
\r
88 inputVariables.put("isAsyncProcess", "true");
\r
90 workflowContext = new WorkflowContext(processKey, getRequestId(inputVariables),
\r
91 asyncResponse, getWaitTime(inputVariables));
\r
93 msoLogger.debug("Adding the workflow context into holder: "
\r
94 + workflowContext.getProcessKey() + ":"
\r
95 + workflowContext.getRequestId() + ":"
\r
96 + workflowContext.getTimeout());
\r
98 contextHolder.put(workflowContext);
\r
100 ProcessThread processThread = new ProcessThread(processKey, inputVariables);
\r
101 processThread.start();
\r
102 } catch (Exception e) {
\r
103 setLogContext(processKey, inputVariables);
\r
105 if (workflowContext != null) {
\r
106 contextHolder.remove(workflowContext);
\r
109 msoLogger.debug(logMarker + "Exception in startProcessInstance by key");
\r
110 response.setMessage("Fail" );
\r
111 response.setResponse("Error occurred while executing the process: " + e);
\r
112 response.setMessageCode(500);
\r
113 recordEvents(processKey, response, startTime);
\r
115 msoLogger.error (MessageEnum.BPMN_GENERAL_EXCEPTION_ARG, "BPMN", MsoLogger.getServiceName(), MsoLogger.ErrorCode.UnknownError, logMarker
\r
116 + response.getMessage() + " for processKey: "
\r
117 + processKey + " with response: " + response.getResponse());
\r
119 Response errorResponse = Response.serverError().entity(response).build();
\r
120 asyncResponse.setResponse(errorResponse);
\r
129 class ProcessThread extends Thread {
\r
130 private final String processKey;
\r
131 private final Map<String,Object> inputVariables;
\r
133 public ProcessThread(String processKey, Map<String, Object> inputVariables) {
\r
134 this.processKey = processKey;
\r
135 this.inputVariables = inputVariables;
\r
138 public void run() {
\r
140 String processInstanceId = null;
\r
141 long startTime = System.currentTimeMillis();
\r
144 setLogContext(processKey, inputVariables);
\r
146 // Note: this creates a random businessKey if it wasn't specified.
\r
147 String businessKey = getBusinessKey(inputVariables);
\r
149 msoLogger.debug(logMarker + "***Received MSO startProcessInstanceByKey with processKey: "
\r
150 + processKey + " and variables: " + inputVariables);
\r
152 msoLogger.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, logMarker
\r
153 + "Call to MSO workflow/services in Camunda. Received MSO startProcessInstanceByKey with processKey:"
\r
154 + processKey + " and variables: " + inputVariables);
\r
156 RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
\r
157 ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(
\r
158 processKey, businessKey, inputVariables);
\r
159 processInstanceId = processInstance.getId();
\r
161 msoLogger.debug(logMarker + "Process " + processKey + ":" + processInstanceId + " " +
\r
162 (processInstance.isEnded() ? "ENDED" : "RUNNING"));
\r
163 } catch (Exception e) {
\r
165 msoLogger.recordAuditEvent(startTime, MsoLogger.StatusCode.ERROR, MsoLogger.ResponseCode.InternalError,
\r
166 logMarker + "Error in starting the process: "+ e.getMessage());
\r
168 WorkflowCallbackResponse callbackResponse = new WorkflowCallbackResponse();
\r
169 callbackResponse.setStatusCode(500);
\r
170 callbackResponse.setMessage("Fail");
\r
171 callbackResponse.setResponse("Error occurred while executing the process: " + e);
\r
173 // TODO: is the processInstanceId used by the API handler? I don't think so.
\r
174 // It may be null here.
\r
175 WorkflowContextHolder.getInstance().processCallback(
\r
176 processKey, processInstanceId,
\r
177 getRequestId(inputVariables),
\r
185 * Callback resource which is invoked from BPMN to process to send the workflow response
\r
187 * @param processKey
\r
188 * @param processInstanceId
\r
190 * @param callbackResponse
\r
194 @Path("/services/callback/{processKey}/{processInstanceId}/{requestId}")
\r
195 @Produces("application/json")
\r
196 @Consumes("application/json")
\r
197 public Response processWorkflowCallback(
\r
198 @PathParam("processKey") String processKey,
\r
199 @PathParam("processInstanceId") String processInstanceId,
\r
200 @PathParam("requestId")String requestId,
\r
201 WorkflowCallbackResponse callbackResponse) {
\r
203 msoLogger.debug(logMarker + "Process instance ID:" + processInstanceId + ":" + requestId + ":" + processKey + ":" + isProcessEnded(processInstanceId));
\r
204 msoLogger.debug(logMarker + "About to process the callback request:" + callbackResponse.getResponse() + ":" + callbackResponse.getMessage() + ":" + callbackResponse.getStatusCode());
\r
205 return contextHolder.processCallback(processKey, processInstanceId, requestId, callbackResponse);
\r
208 // Note: the business key is used to identify the process in unit tests
\r
209 private String getBusinessKey(Map<String, Object> inputVariables) {
\r
210 Object businessKey = inputVariables.get("mso-business-key");
\r
211 if (businessKey == null ) {
\r
212 businessKey = UUID.randomUUID().toString();
\r
213 inputVariables.put("mso-business-key", businessKey);
\r
215 return businessKey.toString();
\r
218 private String getRequestId(Map<String, Object> inputVariables) {
\r
219 Object requestId = inputVariables.get("mso-request-id");
\r
220 if (requestId == null ) {
\r
221 requestId = UUID.randomUUID().toString();
\r
222 inputVariables.put("mso-request-id", requestId);
\r
224 return requestId.toString();
\r
227 private long getWaitTime(Map<String, Object> inputVariables)
\r
229 String timeout = inputVariables.get("mso-service-request-timeout") == null
\r
230 ? null : inputVariables.get("mso-service-request-timeout").toString();
\r
232 if (timeout != null) {
\r
234 return Long.parseLong(timeout)*1000;
\r
235 } catch (NumberFormatException nex) {
\r
236 msoLogger.debug("Invalid input for mso-service-request-timeout");
\r
240 return DEFAULT_WAIT_TIME;
\r
243 private void recordEvents(String processKey, WorkflowResponse response,
\r
246 msoLogger.recordMetricEvent ( startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc,
\r
247 logMarker + response.getMessage() + " for processKey: "
\r
248 + processKey + " with response: " + response.getResponse(), "BPMN", MDC.get(processKey), null);
\r
250 msoLogger.recordAuditEvent (startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc,
\r
251 logMarker + response.getMessage() + "for processKey: " + processKey + " with response: " + response.getResponse());
\r
255 private void setLogContext(String processKey,
\r
256 Map<String, Object> inputVariables) {
\r
257 MsoLogger.setServiceName("MSO." + processKey);
\r
258 if (inputVariables != null) {
\r
259 MsoLogger.setLogContext(getKeyValueFromInputVariables(inputVariables,"mso-request-id"), getKeyValueFromInputVariables(inputVariables,"mso-service-instance-id"));
\r
263 private String getKeyValueFromInputVariables(Map<String,Object> inputVariables, String key) {
\r
264 if (inputVariables == null) return "";
\r
265 Object requestId = inputVariables.get(key);
\r
266 if (requestId != null) return requestId.toString();
\r
270 private boolean isProcessEnded(String processInstanceId) {
\r
271 ProcessEngineServices pes = getProcessEngineServices();
\r
272 return pes.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstanceId).singleResult() == null ? true : false ;
\r
276 protected ProcessEngineServices getProcessEngineServices() {
\r
277 if (pes4junit == null) {
\r
278 return ProcessEngines.getDefaultProcessEngine();
\r
284 public void setProcessEngineServices4junit(ProcessEngineServices pes) {
\r
288 private Map<String, Object> getInputVariables(VariableMapImpl variableMap) {
\r
289 Map<String, Object> inputVariables = new HashMap<String,Object>();
\r
290 @SuppressWarnings("unchecked")
\r
291 Map<String, Object> vMap = (Map<String, Object>) variableMap.get("variables");
\r
292 for (String vName : vMap.keySet()) {
\r
293 @SuppressWarnings("unchecked")
\r
294 Map<String, Object> valueMap = (Map<String,Object>)vMap.get(vName); // value, type
\r
295 inputVariables.put(vName, valueMap.get("value"));
\r
297 return inputVariables;
\r