/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
\r
20 package org.openecomp.mso.bpmn.common.workflow.service;
\r
22 import java.util.HashMap;
\r
23 import java.util.Map;
\r
24 import java.util.Objects;
\r
25 import java.util.Optional;
\r
26 import java.util.UUID;
\r
28 import javax.ws.rs.Consumes;
\r
29 import javax.ws.rs.POST;
\r
30 import javax.ws.rs.Path;
\r
31 import javax.ws.rs.PathParam;
\r
32 import javax.ws.rs.Produces;
\r
33 import javax.ws.rs.core.Response;
\r
35 import org.camunda.bpm.engine.ProcessEngineServices;
\r
36 import org.camunda.bpm.engine.RuntimeService;
\r
37 import org.camunda.bpm.engine.runtime.ProcessInstance;
\r
38 import org.camunda.bpm.engine.variable.impl.VariableMapImpl;
\r
39 import org.jboss.resteasy.annotations.Suspend;
\r
40 import org.jboss.resteasy.spi.AsynchronousResponse;
\r
41 import org.openecomp.mso.logger.MessageEnum;
\r
42 import org.openecomp.mso.logger.MsoLogger;
\r
43 import org.slf4j.MDC;
\r
48 * Asynchronous Workflow processing using JAX RS RESTeasy implementation
\r
49 * Both Synchronous and Asynchronous BPMN process can benefit from this implementation since the workflow gets executed in the background
\r
50 * and the server thread is freed up, server scales better to process more incoming requests
\r
52 * Usage: For synchronous process, when you are ready to send the response invoke the callback to write the response
\r
53 * For asynchronous process - the activity may send a acknowledgement response and then proceed further on executing the process
\r
56 public abstract class WorkflowAsyncResource {
\r
58 private static final WorkflowContextHolder contextHolder = WorkflowContextHolder.getInstance();
\r
59 protected Optional<ProcessEngineServices> pes4junit = Optional.empty();
\r
61 private final MsoLogger msoLogger = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);
\r
63 private static final String logMarker = "[WRKFLOW-RESOURCE]";
\r
64 private static final long DEFAULT_WAIT_TIME = 30000; //default wait time
\r
67 * Asynchronous JAX-RS method that starts a process instance.
\r
68 * @param asyncResponse an object that will receive the asynchronous response
\r
69 * @param processKey the process key
\r
70 * @param variableMap input variables to the process
\r
73 @Path("/services/{processKey}")
\r
74 @Produces("application/json")
\r
75 @Consumes("application/json")
\r
76 public void startProcessInstanceByKey(final @Suspend(180000) AsynchronousResponse asyncResponse,
\r
77 @PathParam("processKey") String processKey, VariableMapImpl variableMap) {
\r
79 long startTime = System.currentTimeMillis();
\r
80 Map<String, Object> inputVariables = null;
\r
81 WorkflowContext workflowContext = null;
\r
84 inputVariables = getInputVariables(variableMap);
\r
85 setLogContext(processKey, inputVariables);
\r
87 // This variable indicates that the flow was invoked asynchronously
\r
88 inputVariables.put("isAsyncProcess", "true");
\r
90 workflowContext = new WorkflowContext(processKey, getRequestId(inputVariables),
\r
91 asyncResponse, getWaitTime(inputVariables));
\r
93 msoLogger.debug("Adding the workflow context into holder: "
\r
94 + workflowContext.getProcessKey() + ":"
\r
95 + workflowContext.getRequestId() + ":"
\r
96 + workflowContext.getTimeout());
\r
98 contextHolder.put(workflowContext);
\r
100 ProcessThread processThread = new ProcessThread(processKey, inputVariables);
\r
101 processThread.start();
\r
102 } catch (Exception e) {
\r
103 setLogContext(processKey, inputVariables);
\r
105 if (workflowContext != null) {
\r
106 contextHolder.remove(workflowContext);
\r
109 msoLogger.debug(logMarker + "Exception in startProcessInstance by key");
\r
110 WorkflowResponse response = new WorkflowResponse();
\r
111 response.setMessage("Fail" );
\r
112 response.setResponse("Error occurred while executing the process: " + e);
\r
113 response.setMessageCode(500);
\r
114 recordEvents(processKey, response, startTime);
\r
116 msoLogger.error (MessageEnum.BPMN_GENERAL_EXCEPTION_ARG, "BPMN", MsoLogger.getServiceName(), MsoLogger.ErrorCode.UnknownError, logMarker
\r
117 + response.getMessage() + " for processKey: "
\r
118 + processKey + " with response: " + response.getResponse());
\r
120 Response errorResponse = Response.serverError().entity(response).build();
\r
121 asyncResponse.setResponse(errorResponse);
\r
130 class ProcessThread extends Thread {
\r
131 private final String processKey;
\r
132 private final Map<String,Object> inputVariables;
\r
134 public ProcessThread(String processKey, Map<String, Object> inputVariables) {
\r
135 this.processKey = processKey;
\r
136 this.inputVariables = inputVariables;
\r
139 public void run() {
\r
141 String processInstanceId = null;
\r
142 long startTime = System.currentTimeMillis();
\r
145 setLogContext(processKey, inputVariables);
\r
147 // Note: this creates a random businessKey if it wasn't specified.
\r
148 String businessKey = getBusinessKey(inputVariables);
\r
150 msoLogger.debug(logMarker + "***Received MSO startProcessInstanceByKey with processKey: "
\r
151 + processKey + " and variables: " + inputVariables);
\r
153 msoLogger.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, logMarker
\r
154 + "Call to MSO workflow/services in Camunda. Received MSO startProcessInstanceByKey with processKey:"
\r
155 + processKey + " and variables: " + inputVariables);
\r
157 RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
\r
158 ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(
\r
159 processKey, businessKey, inputVariables);
\r
160 processInstanceId = processInstance.getId();
\r
162 msoLogger.debug(logMarker + "Process " + processKey + ":" + processInstanceId + " " +
\r
163 (processInstance.isEnded() ? "ENDED" : "RUNNING"));
\r
164 } catch (Exception e) {
\r
166 msoLogger.recordAuditEvent(startTime, MsoLogger.StatusCode.ERROR, MsoLogger.ResponseCode.InternalError,
\r
167 logMarker + "Error in starting the process: "+ e.getMessage());
\r
169 WorkflowCallbackResponse callbackResponse = new WorkflowCallbackResponse();
\r
170 callbackResponse.setStatusCode(500);
\r
171 callbackResponse.setMessage("Fail");
\r
172 callbackResponse.setResponse("Error occurred while executing the process: " + e);
\r
174 // TODO: is the processInstanceId used by the API handler? I don't think so.
\r
175 // It may be null here.
\r
176 WorkflowContextHolder.getInstance().processCallback(
\r
177 processKey, processInstanceId,
\r
178 getRequestId(inputVariables),
\r
186 * Callback resource which is invoked from BPMN to process to send the workflow response
\r
188 * @param processKey
\r
189 * @param processInstanceId
\r
191 * @param callbackResponse
\r
195 @Path("/services/callback/{processKey}/{processInstanceId}/{requestId}")
\r
196 @Produces("application/json")
\r
197 @Consumes("application/json")
\r
198 public Response processWorkflowCallback(
\r
199 @PathParam("processKey") String processKey,
\r
200 @PathParam("processInstanceId") String processInstanceId,
\r
201 @PathParam("requestId")String requestId,
\r
202 WorkflowCallbackResponse callbackResponse) {
\r
204 msoLogger.debug(logMarker + "Process instance ID:" + processInstanceId + ":" + requestId + ":" + processKey + ":" + isProcessEnded(processInstanceId));
\r
205 msoLogger.debug(logMarker + "About to process the callback request:" + callbackResponse.getResponse() + ":" + callbackResponse.getMessage() + ":" + callbackResponse.getStatusCode());
\r
206 return contextHolder.processCallback(processKey, processInstanceId, requestId, callbackResponse);
\r
209 private static String getOrCreate(Map<String, Object> inputVariables, String key) {
\r
210 String value = Objects.toString(inputVariables.get(key), null);
\r
211 if (value == null) {
\r
212 value = UUID.randomUUID().toString();
\r
213 inputVariables.put(key, value);
\r
218 // Note: the business key is used to identify the process in unit tests
\r
219 private static String getBusinessKey(Map<String, Object> inputVariables) {
\r
220 return getOrCreate(inputVariables, "mso-business-key");
\r
223 private static String getRequestId(Map<String, Object> inputVariables) {
\r
224 return getOrCreate(inputVariables, "mso-request-id");
\r
227 private long getWaitTime(Map<String, Object> inputVariables)
\r
230 String timeout = Objects.toString(inputVariables.get("mso-service-request-timeout"), null);
\r
232 if (timeout != null) {
\r
234 return Long.parseLong(timeout)*1000;
\r
235 } catch (NumberFormatException nex) {
\r
236 msoLogger.debug("Invalid input for mso-service-request-timeout");
\r
240 return DEFAULT_WAIT_TIME;
\r
243 private void recordEvents(String processKey, WorkflowResponse response,
\r
246 msoLogger.recordMetricEvent ( startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc,
\r
247 logMarker + response.getMessage() + " for processKey: "
\r
248 + processKey + " with response: " + response.getResponse(), "BPMN", MDC.get(processKey), null);
\r
250 msoLogger.recordAuditEvent (startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc,
\r
251 logMarker + response.getMessage() + "for processKey: " + processKey + " with response: " + response.getResponse());
\r
255 private static void setLogContext(String processKey,
\r
256 Map<String, Object> inputVariables) {
\r
257 MsoLogger.setServiceName("MSO." + processKey);
\r
258 if (inputVariables != null) {
\r
259 MsoLogger.setLogContext(getKeyValueFromInputVariables(inputVariables,"mso-request-id"), getKeyValueFromInputVariables(inputVariables,"mso-service-instance-id"));
\r
263 private static String getKeyValueFromInputVariables(Map<String,Object> inputVariables, String key) {
\r
264 if (inputVariables == null) {
\r
268 return Objects.toString(inputVariables.get(key), "N/A");
\r
271 private boolean isProcessEnded(String processInstanceId) {
\r
272 ProcessEngineServices pes = getProcessEngineServices();
\r
273 return pes.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstanceId).singleResult() == null;
\r
277 protected abstract ProcessEngineServices getProcessEngineServices();
\r
279 public void setProcessEngineServices4junit(ProcessEngineServices pes) {
\r
280 pes4junit = Optional.ofNullable(pes);
\r
283 private static Map<String, Object> getInputVariables(VariableMapImpl variableMap) {
\r
284 Map<String, Object> inputVariables = new HashMap<>();
\r
285 @SuppressWarnings("unchecked")
\r
286 Map<String, Object> vMap = (Map<String, Object>) variableMap.get("variables");
\r
287 for (Map.Entry<String, Object> entry : vMap.entrySet()) {
\r
288 String vName = entry.getKey();
\r
289 Object value = entry.getValue();
\r
290 @SuppressWarnings("unchecked")
\r
291 Map<String, Object> valueMap = (Map<String,Object>)value; // value, type
\r
292 inputVariables.put(vName, valueMap.get("value"));
\r
294 return inputVariables;
\r