Clean up Process Engine selection logic
[so.git] / bpmn / MSOCommonBPMN / src / main / java / org / openecomp / mso / bpmn / common / workflow / service / WorkflowAsyncResource.java
1 /*-\r
2  * ============LICENSE_START=======================================================\r
3  * ONAP - SO\r
4  * ================================================================================\r
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  * Licensed under the Apache License, Version 2.0 (the "License");\r
8  * you may not use this file except in compliance with the License.\r
9  * You may obtain a copy of the License at\r
10  * \r
11  *      http://www.apache.org/licenses/LICENSE-2.0\r
12  * \r
13  * Unless required by applicable law or agreed to in writing, software\r
14  * distributed under the License is distributed on an "AS IS" BASIS,\r
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  * See the License for the specific language governing permissions and\r
17  * limitations under the License.\r
18  * ============LICENSE_END=========================================================\r
19  */\r
20 package org.openecomp.mso.bpmn.common.workflow.service;\r
21 \r
22 import java.util.HashMap;\r
23 import java.util.Map;\r
24 import java.util.Objects;\r
25 import java.util.Optional;\r
26 import java.util.UUID;\r
27 \r
28 import javax.ws.rs.Consumes;\r
29 import javax.ws.rs.POST;\r
30 import javax.ws.rs.Path;\r
31 import javax.ws.rs.PathParam;\r
32 import javax.ws.rs.Produces;\r
33 import javax.ws.rs.core.Response;\r
34 \r
35 import org.camunda.bpm.engine.ProcessEngineServices;\r
36 import org.camunda.bpm.engine.RuntimeService;\r
37 import org.camunda.bpm.engine.runtime.ProcessInstance;\r
38 import org.camunda.bpm.engine.variable.impl.VariableMapImpl;\r
39 import org.jboss.resteasy.annotations.Suspend;\r
40 import org.jboss.resteasy.spi.AsynchronousResponse;\r
41 import org.openecomp.mso.logger.MessageEnum;\r
42 import org.openecomp.mso.logger.MsoLogger;\r
43 import org.slf4j.MDC;\r
44 \r
45 /**\r
46  * \r
47  * @version 1.0\r
48  * Asynchronous Workflow processing using JAX RS RESTeasy implementation\r
49  * Both Synchronous and Asynchronous BPMN process can benefit from this implementation since the workflow gets executed in the background\r
50  * and the server thread is freed up, server scales better to process more incoming requests\r
51  * \r
52  * Usage: For synchronous process, when you are ready to send the response invoke the callback to write the response\r
53  * For asynchronous process - the activity may send a acknowledgement response and then proceed further on executing the process\r
54  */\r
55 @Path("/async")\r
56 public class WorkflowAsyncResource extends ProcessEngineAwareService {\r
57 \r
58         private static final WorkflowContextHolder contextHolder = WorkflowContextHolder.getInstance();\r
59         protected Optional<ProcessEngineServices> pes4junit = Optional.empty();\r
60 \r
61         private final MsoLogger msoLogger = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);\r
62 \r
63         private static final String logMarker = "[WRKFLOW-RESOURCE]";\r
64         private static final long DEFAULT_WAIT_TIME = 30000;    //default wait time\r
65         \r
66         /**\r
67          * Asynchronous JAX-RS method that starts a process instance.\r
68          * @param asyncResponse an object that will receive the asynchronous response\r
69          * @param processKey the process key\r
70          * @param variableMap input variables to the process\r
71          */\r
72         @POST\r
73         @Path("/services/{processKey}")\r
74         @Produces("application/json")\r
75         @Consumes("application/json")\r
76         public void startProcessInstanceByKey(final @Suspend(180000) AsynchronousResponse asyncResponse,\r
77                         @PathParam("processKey") String processKey, VariableMapImpl variableMap) {\r
78         \r
79                 long startTime = System.currentTimeMillis();\r
80                 Map<String, Object> inputVariables = null;\r
81                 WorkflowContext workflowContext = null;\r
82 \r
83                 try {\r
84                         inputVariables = getInputVariables(variableMap);        \r
85                         setLogContext(processKey, inputVariables);\r
86 \r
87                         // This variable indicates that the flow was invoked asynchronously\r
88                         inputVariables.put("isAsyncProcess", "true");\r
89 \r
90                         workflowContext = new WorkflowContext(processKey, getRequestId(inputVariables),\r
91                                 asyncResponse, getWaitTime(inputVariables));\r
92 \r
93                         msoLogger.debug("Adding the workflow context into holder: "\r
94                                         + workflowContext.getProcessKey() + ":"\r
95                                         + workflowContext.getRequestId() + ":"\r
96                                         + workflowContext.getTimeout());\r
97 \r
98                         contextHolder.put(workflowContext);\r
99 \r
100                         ProcessThread processThread = new ProcessThread(processKey, inputVariables);\r
101                         processThread.start();\r
102                 } catch (Exception e) {\r
103                         setLogContext(processKey, inputVariables);\r
104 \r
105                         if (workflowContext != null) {\r
106                                 contextHolder.remove(workflowContext);\r
107                         }\r
108 \r
109                         msoLogger.debug(logMarker + "Exception in startProcessInstance by key");\r
110                 WorkflowResponse response = new WorkflowResponse();\r
111                         response.setMessage("Fail" );\r
112                         response.setResponse("Error occurred while executing the process: " + e);\r
113                         response.setMessageCode(500);\r
114                         recordEvents(processKey, response, startTime);\r
115                         \r
116                         msoLogger.error (MessageEnum.BPMN_GENERAL_EXCEPTION_ARG, "BPMN", MsoLogger.getServiceName(), MsoLogger.ErrorCode.UnknownError, logMarker \r
117                                         + response.getMessage() + " for processKey: " \r
118                                         + processKey + " with response: " + response.getResponse());\r
119                         \r
120                         Response errorResponse = Response.serverError().entity(response).build();\r
121                         asyncResponse.setResponse(errorResponse);\r
122                 }\r
123         }\r
124         \r
125         /**\r
126          * \r
127          * @version 1.0\r
128          *\r
129          */\r
130         class ProcessThread extends Thread {\r
131                 private final String processKey;\r
132                 private final Map<String,Object> inputVariables;\r
133 \r
134                 public ProcessThread(String processKey, Map<String, Object> inputVariables) {\r
135                         this.processKey = processKey;\r
136                         this.inputVariables = inputVariables;\r
137                 }\r
138                 \r
139                 public void run() {\r
140 \r
141                         String processInstanceId = null;\r
142                         long startTime = System.currentTimeMillis();\r
143                         \r
144                         try {\r
145                                 setLogContext(processKey, inputVariables);\r
146 \r
147                                 // Note: this creates a random businessKey if it wasn't specified.\r
148                                 String businessKey = getBusinessKey(inputVariables);\r
149                                 \r
150                                 msoLogger.debug(logMarker + "***Received MSO startProcessInstanceByKey with processKey: "\r
151                                         + processKey + " and variables: " + inputVariables);\r
152 \r
153                                 msoLogger.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, logMarker\r
154                                                 + "Call to MSO workflow/services in Camunda. Received MSO startProcessInstanceByKey with processKey:"\r
155                                                 + processKey + " and variables: " + inputVariables);\r
156                                 \r
157                                 RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();\r
158                                 ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(\r
159                                         processKey, businessKey, inputVariables);\r
160                                 processInstanceId = processInstance.getId();\r
161 \r
162                                 msoLogger.debug(logMarker + "Process " + processKey + ":" + processInstanceId + " " +\r
163                                                 (processInstance.isEnded() ? "ENDED" : "RUNNING"));\r
164                         } catch (Exception e) {\r
165 \r
166                                 msoLogger.recordAuditEvent(startTime, MsoLogger.StatusCode.ERROR, MsoLogger.ResponseCode.InternalError, \r
167                                                 logMarker + "Error in starting the process: "+ e.getMessage());\r
168                                 \r
169                                 WorkflowCallbackResponse callbackResponse = new WorkflowCallbackResponse();\r
170                                 callbackResponse.setStatusCode(500);\r
171                                 callbackResponse.setMessage("Fail");\r
172                                 callbackResponse.setResponse("Error occurred while executing the process: " + e);\r
173 \r
174                                 // TODO: is the processInstanceId used by the API handler?  I don't think so.\r
175                                 // It may be null here.\r
176                                 WorkflowContextHolder.getInstance().processCallback(\r
177                                         processKey, processInstanceId,\r
178                                         getRequestId(inputVariables),\r
179                                         callbackResponse);\r
180                         }\r
181                 }\r
182         }\r
183         \r
184         \r
185         /**\r
186          * Callback resource which is invoked from BPMN to process to send the workflow response\r
187          * \r
188          * @param processKey\r
189          * @param processInstanceId\r
190          * @param requestId\r
191          * @param callbackResponse\r
192          * @return\r
193          */\r
194         @POST\r
195         @Path("/services/callback/{processKey}/{processInstanceId}/{requestId}")\r
196         @Produces("application/json")\r
197         @Consumes("application/json")\r
198         public Response processWorkflowCallback(\r
199                         @PathParam("processKey") String processKey,\r
200                         @PathParam("processInstanceId") String processInstanceId,\r
201                         @PathParam("requestId")String requestId,\r
202                         WorkflowCallbackResponse callbackResponse) {\r
203 \r
204                 msoLogger.debug(logMarker + "Process instance ID:" + processInstanceId + ":" + requestId + ":" + processKey + ":" + isProcessEnded(processInstanceId));\r
205                 msoLogger.debug(logMarker + "About to process the callback request:" + callbackResponse.getResponse() + ":" + callbackResponse.getMessage() + ":" + callbackResponse.getStatusCode());\r
206                 return contextHolder.processCallback(processKey, processInstanceId, requestId, callbackResponse);\r
207         }\r
208         \r
209     private static String getOrCreate(Map<String, Object> inputVariables, String key) {\r
210         String value = Objects.toString(inputVariables.get(key), null);\r
211         if (value == null) {\r
212             value = UUID.randomUUID().toString();\r
213             inputVariables.put(key, value);\r
214         }\r
215         return value;\r
216     }\r
217         \r
218         // Note: the business key is used to identify the process in unit tests\r
219         private static String getBusinessKey(Map<String, Object> inputVariables) {\r
220         return getOrCreate(inputVariables, "mso-business-key");\r
221         }\r
222 \r
223         private static String getRequestId(Map<String, Object> inputVariables) {\r
224         return getOrCreate(inputVariables, "mso-request-id");\r
225         }\r
226 \r
227         private long getWaitTime(Map<String, Object> inputVariables)\r
228         {\r
229             \r
230                 String timeout = Objects.toString(inputVariables.get("mso-service-request-timeout"), null);\r
231 \r
232                 if (timeout != null) {\r
233                         try {\r
234                                 return Long.parseLong(timeout)*1000;\r
235                         } catch (NumberFormatException nex) {\r
236                                 msoLogger.debug("Invalid input for mso-service-request-timeout");\r
237                         }\r
238                 }\r
239 \r
240                 return DEFAULT_WAIT_TIME;\r
241         }\r
242         \r
243         private void recordEvents(String processKey, WorkflowResponse response,\r
244                         long startTime) {\r
245                 \r
246                 msoLogger.recordMetricEvent ( startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, \r
247                                 logMarker + response.getMessage() + " for processKey: "\r
248                                 + processKey + " with response: " + response.getResponse(), "BPMN", MDC.get(processKey), null);\r
249                 \r
250                 msoLogger.recordAuditEvent (startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, \r
251                                 logMarker + response.getMessage() + "for processKey: " + processKey + " with response: " + response.getResponse());\r
252                 \r
253         }\r
254 \r
255         private static void setLogContext(String processKey,\r
256                         Map<String, Object> inputVariables) {\r
257                 MsoLogger.setServiceName("MSO." + processKey);\r
258                 if (inputVariables != null) {\r
259                         MsoLogger.setLogContext(getKeyValueFromInputVariables(inputVariables,"mso-request-id"), getKeyValueFromInputVariables(inputVariables,"mso-service-instance-id"));\r
260                 }\r
261         }\r
262 \r
263         private static String getKeyValueFromInputVariables(Map<String,Object> inputVariables, String key) {\r
264                 if (inputVariables == null) {\r
265                         return "";\r
266                 }\r
267 \r
268                 return Objects.toString(inputVariables.get(key), "N/A");\r
269         }\r
270 \r
271         private boolean isProcessEnded(String processInstanceId) {\r
272                 ProcessEngineServices pes = getProcessEngineServices();\r
273                 return pes.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstanceId).singleResult() == null;\r
274         }\r
275         \r
276         \r
277         private static Map<String, Object> getInputVariables(VariableMapImpl variableMap) {\r
278                 Map<String, Object> inputVariables = new HashMap<>();\r
279                 @SuppressWarnings("unchecked")\r
280                 Map<String, Object> vMap = (Map<String, Object>) variableMap.get("variables");\r
281                 for (Map.Entry<String, Object> entry : vMap.entrySet()) {\r
282                         String vName = entry.getKey();\r
283                         Object value = entry.getValue();\r
284                         @SuppressWarnings("unchecked")\r
285                         Map<String, Object> valueMap = (Map<String,Object>)value; // value, type\r
286                         inputVariables.put(vName, valueMap.get("value"));\r
287                 }\r
288                 return inputVariables;\r
289         }\r
290 }\r