Change the header to SO
[so.git] / bpmn / MSOCommonBPMN / src / main / java / org / openecomp / mso / bpmn / common / workflow / service / WorkflowAsyncResource.java
1 /*-\r
2  * ============LICENSE_START=======================================================\r
3  * ONAP - SO\r
4  * ================================================================================\r
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  * Licensed under the Apache License, Version 2.0 (the "License");\r
8  * you may not use this file except in compliance with the License.\r
9  * You may obtain a copy of the License at\r
10  * \r
11  *      http://www.apache.org/licenses/LICENSE-2.0\r
12  * \r
13  * Unless required by applicable law or agreed to in writing, software\r
14  * distributed under the License is distributed on an "AS IS" BASIS,\r
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  * See the License for the specific language governing permissions and\r
17  * limitations under the License.\r
18  * ============LICENSE_END=========================================================\r
19  */\r
20 package org.openecomp.mso.bpmn.common.workflow.service;\r
21 \r
22 import java.util.HashMap;\r
23 import java.util.Map;\r
24 import java.util.UUID;\r
25 \r
26 import javax.ws.rs.Consumes;\r
27 import javax.ws.rs.POST;\r
28 import javax.ws.rs.Path;\r
29 import javax.ws.rs.PathParam;\r
30 import javax.ws.rs.Produces;\r
31 import javax.ws.rs.core.Response;\r
32 \r
33 import org.camunda.bpm.engine.ProcessEngineServices;\r
34 import org.camunda.bpm.engine.ProcessEngines;\r
35 import org.camunda.bpm.engine.RuntimeService;\r
36 import org.camunda.bpm.engine.runtime.ProcessInstance;\r
37 import org.camunda.bpm.engine.variable.impl.VariableMapImpl;\r
38 import org.jboss.resteasy.annotations.Suspend;\r
39 import org.jboss.resteasy.spi.AsynchronousResponse;\r
40 import org.openecomp.mso.logger.MessageEnum;\r
41 import org.openecomp.mso.logger.MsoLogger;\r
42 import org.slf4j.MDC;\r
43 \r
44 /**\r
45  * \r
46  * @version 1.0\r
47  * Asynchronous Workflow processing using JAX RS RESTeasy implementation\r
48  * Both Synchronous and Asynchronous BPMN process can benefit from this implementation since the workflow gets executed in the background\r
49  * and the server thread is freed up, server scales better to process more incoming requests\r
50  * \r
51  * Usage: For synchronous process, when you are ready to send the response invoke the callback to write the response\r
52  * For asynchronous process - the activity may send a acknowledgement response and then proceed further on executing the process\r
53  */\r
54 @Path("/async")\r
55 public class WorkflowAsyncResource {\r
56 \r
57         private WorkflowContextHolder contextHolder = WorkflowContextHolder.getInstance();\r
58         protected ProcessEngineServices pes4junit = null;\r
59 \r
60         private MsoLogger msoLogger = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);\r
61 \r
62         private static final String logMarker = "[WRKFLOW-RESOURCE]";\r
63         private static final int DEFAULT_WAIT_TIME = 30000;     //default wait time\r
64         \r
65         /**\r
66          * Asynchronous JAX-RS method that starts a process instance.\r
67          * @param asyncResponse an object that will receive the asynchronous response\r
68          * @param processKey the process key\r
69          * @param variableMap input variables to the process\r
70          */\r
71         @POST\r
72         @Path("/services/{processKey}")\r
73         @Produces("application/json")\r
74         @Consumes("application/json")\r
75         public void startProcessInstanceByKey(final @Suspend(180000) AsynchronousResponse asyncResponse,\r
76                         @PathParam("processKey") String processKey, VariableMapImpl variableMap) {\r
77         \r
78                 WorkflowResponse response = new WorkflowResponse();\r
79                 long startTime = System.currentTimeMillis();\r
80                 Map<String, Object> inputVariables = null;\r
81                 WorkflowContext workflowContext = null;\r
82 \r
83                 try {\r
84                         inputVariables = getInputVariables(variableMap);        \r
85                         setLogContext(processKey, inputVariables);\r
86 \r
87                         // This variable indicates that the flow was invoked asynchronously\r
88                         inputVariables.put("isAsyncProcess", "true");\r
89 \r
90                         workflowContext = new WorkflowContext(processKey, getRequestId(inputVariables),\r
91                                 asyncResponse, getWaitTime(inputVariables));\r
92 \r
93                         msoLogger.debug("Adding the workflow context into holder: "\r
94                                         + workflowContext.getProcessKey() + ":"\r
95                                         + workflowContext.getRequestId() + ":"\r
96                                         + workflowContext.getTimeout());\r
97 \r
98                         contextHolder.put(workflowContext);\r
99 \r
100                         ProcessThread processThread = new ProcessThread(processKey, inputVariables);\r
101                         processThread.start();\r
102                 } catch (Exception e) {\r
103                         setLogContext(processKey, inputVariables);\r
104 \r
105                         if (workflowContext != null) {\r
106                                 contextHolder.remove(workflowContext);\r
107                         }\r
108 \r
109                         msoLogger.debug(logMarker + "Exception in startProcessInstance by key");\r
110                         response.setMessage("Fail" );\r
111                         response.setResponse("Error occurred while executing the process: " + e);\r
112                         response.setMessageCode(500);\r
113                         recordEvents(processKey, response, startTime);\r
114                         \r
115                         msoLogger.error (MessageEnum.BPMN_GENERAL_EXCEPTION_ARG, "BPMN", MsoLogger.getServiceName(), MsoLogger.ErrorCode.UnknownError, logMarker \r
116                                         + response.getMessage() + " for processKey: " \r
117                                         + processKey + " with response: " + response.getResponse());\r
118                         \r
119                         Response errorResponse = Response.serverError().entity(response).build();\r
120                         asyncResponse.setResponse(errorResponse);\r
121                 }\r
122         }\r
123         \r
124         /**\r
125          * \r
126          * @version 1.0\r
127          *\r
128          */\r
129         class ProcessThread extends Thread {\r
130                 private final String processKey;\r
131                 private final Map<String,Object> inputVariables;\r
132 \r
133                 public ProcessThread(String processKey, Map<String, Object> inputVariables) {\r
134                         this.processKey = processKey;\r
135                         this.inputVariables = inputVariables;\r
136                 }\r
137                 \r
138                 public void run() {\r
139 \r
140                         String processInstanceId = null;\r
141                         long startTime = System.currentTimeMillis();\r
142                         \r
143                         try {\r
144                                 setLogContext(processKey, inputVariables);\r
145 \r
146                                 // Note: this creates a random businessKey if it wasn't specified.\r
147                                 String businessKey = getBusinessKey(inputVariables);\r
148                                 \r
149                                 msoLogger.debug(logMarker + "***Received MSO startProcessInstanceByKey with processKey: "\r
150                                         + processKey + " and variables: " + inputVariables);\r
151 \r
152                                 msoLogger.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, logMarker\r
153                                                 + "Call to MSO workflow/services in Camunda. Received MSO startProcessInstanceByKey with processKey:"\r
154                                                 + processKey + " and variables: " + inputVariables);\r
155                                 \r
156                                 RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();\r
157                                 ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(\r
158                                         processKey, businessKey, inputVariables);\r
159                                 processInstanceId = processInstance.getId();\r
160 \r
161                                 msoLogger.debug(logMarker + "Process " + processKey + ":" + processInstanceId + " " +\r
162                                                 (processInstance.isEnded() ? "ENDED" : "RUNNING"));\r
163                         } catch (Exception e) {\r
164 \r
165                                 msoLogger.recordAuditEvent(startTime, MsoLogger.StatusCode.ERROR, MsoLogger.ResponseCode.InternalError, \r
166                                                 logMarker + "Error in starting the process: "+ e.getMessage());\r
167                                 \r
168                                 WorkflowCallbackResponse callbackResponse = new WorkflowCallbackResponse();\r
169                                 callbackResponse.setStatusCode(500);\r
170                                 callbackResponse.setMessage("Fail");\r
171                                 callbackResponse.setResponse("Error occurred while executing the process: " + e);\r
172 \r
173                                 // TODO: is the processInstanceId used by the API handler?  I don't think so.\r
174                                 // It may be null here.\r
175                                 WorkflowContextHolder.getInstance().processCallback(\r
176                                         processKey, processInstanceId,\r
177                                         getRequestId(inputVariables),\r
178                                         callbackResponse);\r
179                         }\r
180                 }\r
181         }\r
182         \r
183         \r
184         /**\r
185          * Callback resource which is invoked from BPMN to process to send the workflow response\r
186          * \r
187          * @param processKey\r
188          * @param processInstanceId\r
189          * @param requestId\r
190          * @param callbackResponse\r
191          * @return\r
192          */\r
193         @POST\r
194         @Path("/services/callback/{processKey}/{processInstanceId}/{requestId}")\r
195         @Produces("application/json")\r
196         @Consumes("application/json")\r
197         public Response processWorkflowCallback(\r
198                         @PathParam("processKey") String processKey,\r
199                         @PathParam("processInstanceId") String processInstanceId,\r
200                         @PathParam("requestId")String requestId,\r
201                         WorkflowCallbackResponse callbackResponse) {\r
202 \r
203                 msoLogger.debug(logMarker + "Process instance ID:" + processInstanceId + ":" + requestId + ":" + processKey + ":" + isProcessEnded(processInstanceId));\r
204                 msoLogger.debug(logMarker + "About to process the callback request:" + callbackResponse.getResponse() + ":" + callbackResponse.getMessage() + ":" + callbackResponse.getStatusCode());\r
205                 return contextHolder.processCallback(processKey, processInstanceId, requestId, callbackResponse);\r
206         }\r
207         \r
208         // Note: the business key is used to identify the process in unit tests\r
209         private String getBusinessKey(Map<String, Object> inputVariables) {\r
210                 Object businessKey = inputVariables.get("mso-business-key");\r
211                 if (businessKey == null ) {\r
212                         businessKey = UUID.randomUUID().toString();\r
213                         inputVariables.put("mso-business-key",  businessKey);\r
214                 }\r
215                 return businessKey.toString();\r
216         }\r
217 \r
218         private String getRequestId(Map<String, Object> inputVariables) {\r
219                 Object requestId = inputVariables.get("mso-request-id");\r
220                 if (requestId == null ) {\r
221                         requestId = UUID.randomUUID().toString();\r
222                         inputVariables.put("mso-request-id",  requestId);\r
223                 } \r
224                 return requestId.toString();\r
225         }\r
226 \r
227         private long getWaitTime(Map<String, Object> inputVariables)\r
228         {\r
229                 String timeout = inputVariables.get("mso-service-request-timeout") == null\r
230                                 ? null : inputVariables.get("mso-service-request-timeout").toString();          \r
231 \r
232                 if (timeout != null) {\r
233                         try {\r
234                                 return Long.parseLong(timeout)*1000;\r
235                         } catch (NumberFormatException nex) {\r
236                                 msoLogger.debug("Invalid input for mso-service-request-timeout");\r
237                         }\r
238                 }\r
239 \r
240                 return DEFAULT_WAIT_TIME;\r
241         }\r
242         \r
243         private void recordEvents(String processKey, WorkflowResponse response,\r
244                         long startTime) {\r
245                 \r
246                 msoLogger.recordMetricEvent ( startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, \r
247                                 logMarker + response.getMessage() + " for processKey: "\r
248                                 + processKey + " with response: " + response.getResponse(), "BPMN", MDC.get(processKey), null);\r
249                 \r
250                 msoLogger.recordAuditEvent (startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, \r
251                                 logMarker + response.getMessage() + "for processKey: " + processKey + " with response: " + response.getResponse());\r
252                 \r
253         }\r
254 \r
255         private void setLogContext(String processKey,\r
256                         Map<String, Object> inputVariables) {\r
257                 MsoLogger.setServiceName("MSO." + processKey);\r
258                 if (inputVariables != null) {\r
259                         MsoLogger.setLogContext(getKeyValueFromInputVariables(inputVariables,"mso-request-id"), getKeyValueFromInputVariables(inputVariables,"mso-service-instance-id"));\r
260                 }\r
261         }\r
262 \r
263         private String getKeyValueFromInputVariables(Map<String,Object> inputVariables, String key) {\r
264                 if (inputVariables == null) return "";\r
265                 Object requestId = inputVariables.get(key);\r
266                 if (requestId != null) return requestId.toString();\r
267                 return "N/A";\r
268         }\r
269 \r
270         private boolean isProcessEnded(String processInstanceId) {\r
271                 ProcessEngineServices pes = getProcessEngineServices();\r
272                 return pes.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstanceId).singleResult() == null ? true : false ;                \r
273         }\r
274         \r
275         \r
276         protected ProcessEngineServices getProcessEngineServices() {\r
277                 if (pes4junit == null) {\r
278                         return ProcessEngines.getDefaultProcessEngine();\r
279                 } else {\r
280                         return pes4junit;\r
281                 }\r
282         }\r
283         \r
284         public void setProcessEngineServices4junit(ProcessEngineServices pes) {\r
285                 pes4junit = pes;\r
286         }\r
287 \r
288         private Map<String, Object> getInputVariables(VariableMapImpl variableMap) {\r
289                 Map<String, Object> inputVariables = new HashMap<String,Object>();\r
290                 @SuppressWarnings("unchecked")\r
291                 Map<String, Object> vMap = (Map<String, Object>) variableMap.get("variables");\r
292                 for (String vName : vMap.keySet()) {\r
293                         @SuppressWarnings("unchecked")\r
294                         Map<String, Object> valueMap = (Map<String,Object>)vMap.get(vName); // value, type\r
295                         inputVariables.put(vName, valueMap.get("value"));\r
296                 }\r
297                 return inputVariables;\r
298         }\r
299 }\r