2  * ============LICENSE_START=======================================================
\r 
   4  * ================================================================================
\r 
   5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
\r 
   6  * ================================================================================
\r 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
\r 
   8  * you may not use this file except in compliance with the License.
\r 
   9  * You may obtain a copy of the License at
\r 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
\r 
  13  * Unless required by applicable law or agreed to in writing, software
\r 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
\r 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r 
  16  * See the License for the specific language governing permissions and
\r 
  17  * limitations under the License.
\r 
  18  * ============LICENSE_END=========================================================
\r 
  20 package org.openecomp.mso.bpmn.common.workflow.service;
\r 
  22 import java.util.HashMap;
\r 
  23 import java.util.Map;
\r 
  24 import java.util.UUID;
\r 
  26 import javax.ws.rs.Consumes;
\r 
  27 import javax.ws.rs.POST;
\r 
  28 import javax.ws.rs.Path;
\r 
  29 import javax.ws.rs.PathParam;
\r 
  30 import javax.ws.rs.Produces;
\r 
  31 import javax.ws.rs.core.Response;
\r 
  33 import org.camunda.bpm.engine.ProcessEngineServices;
\r 
  34 import org.camunda.bpm.engine.ProcessEngines;
\r 
  35 import org.camunda.bpm.engine.RuntimeService;
\r 
  36 import org.camunda.bpm.engine.runtime.ProcessInstance;
\r 
  37 import org.camunda.bpm.engine.variable.impl.VariableMapImpl;
\r 
  38 import org.jboss.resteasy.annotations.Suspend;
\r 
  39 import org.jboss.resteasy.spi.AsynchronousResponse;
\r 
  40 import org.openecomp.mso.logger.MessageEnum;
\r 
  41 import org.openecomp.mso.logger.MsoLogger;
\r 
  42 import org.slf4j.MDC;
\r 
  47  * Asynchronous Workflow processing using JAX RS RESTeasy implementation
\r 
  48  * Both Synchronous and Asynchronous BPMN process can benefit from this implementation since the workflow gets executed in the background
\r 
  49  * and the server thread is freed up, server scales better to process more incoming requests
\r 
  51  * Usage: For synchronous process, when you are ready to send the response invoke the callback to write the response
\r 
  52  * For asynchronous process - the activity may send a acknowledgement response and then proceed further on executing the process
\r 
  55 public abstract class WorkflowAsyncResource {
 
  57         private WorkflowContextHolder contextHolder = WorkflowContextHolder.getInstance();
\r 
  58         protected ProcessEngineServices pes4junit = null;
\r 
  60         private MsoLogger msoLogger = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);
\r 
  62         private static final String logMarker = "[WRKFLOW-RESOURCE]";
\r 
  63         private static final int DEFAULT_WAIT_TIME = 30000;     //default wait time
\r 
  66          * Asynchronous JAX-RS method that starts a process instance.
\r 
  67          * @param asyncResponse an object that will receive the asynchronous response
\r 
  68          * @param processKey the process key
\r 
  69          * @param variableMap input variables to the process
\r 
  72         @Path("/services/{processKey}")
\r 
  73         @Produces("application/json")
\r 
  74         @Consumes("application/json")
\r 
  75         public void startProcessInstanceByKey(final @Suspend(180000) AsynchronousResponse asyncResponse,
\r 
  76                         @PathParam("processKey") String processKey, VariableMapImpl variableMap) {
\r 
  78                 WorkflowResponse response = new WorkflowResponse();
\r 
  79                 long startTime = System.currentTimeMillis();
\r 
  80                 Map<String, Object> inputVariables = null;
\r 
  81                 WorkflowContext workflowContext = null;
\r 
  84                         inputVariables = getInputVariables(variableMap);        
\r 
  85                         setLogContext(processKey, inputVariables);
\r 
  87                         // This variable indicates that the flow was invoked asynchronously
\r 
  88                         inputVariables.put("isAsyncProcess", "true");
\r 
  90                         workflowContext = new WorkflowContext(processKey, getRequestId(inputVariables),
\r 
  91                                 asyncResponse, getWaitTime(inputVariables));
\r 
  93                         msoLogger.debug("Adding the workflow context into holder: "
\r 
  94                                         + workflowContext.getProcessKey() + ":"
\r 
  95                                         + workflowContext.getRequestId() + ":"
\r 
  96                                         + workflowContext.getTimeout());
\r 
  98                         contextHolder.put(workflowContext);
\r 
 100                         ProcessThread processThread = new ProcessThread(processKey, inputVariables);
\r 
 101                         processThread.start();
\r 
 102                 } catch (Exception e) {
\r 
 103                         setLogContext(processKey, inputVariables);
\r 
 105                         if (workflowContext != null) {
\r 
 106                                 contextHolder.remove(workflowContext);
\r 
 109                         msoLogger.debug(logMarker + "Exception in startProcessInstance by key");
\r 
 110                         response.setMessage("Fail" );
\r 
 111                         response.setResponse("Error occurred while executing the process: " + e);
\r 
 112                         response.setMessageCode(500);
\r 
 113                         recordEvents(processKey, response, startTime);
\r 
 115                         msoLogger.error (MessageEnum.BPMN_GENERAL_EXCEPTION_ARG, "BPMN", MsoLogger.getServiceName(), MsoLogger.ErrorCode.UnknownError, logMarker 
\r 
 116                                         + response.getMessage() + " for processKey: " 
\r 
 117                                         + processKey + " with response: " + response.getResponse());
\r 
 119                         Response errorResponse = Response.serverError().entity(response).build();
\r 
 120                         asyncResponse.setResponse(errorResponse);
\r 
 129         class ProcessThread extends Thread {
\r 
 130                 private final String processKey;
\r 
 131                 private final Map<String,Object> inputVariables;
\r 
 133                 public ProcessThread(String processKey, Map<String, Object> inputVariables) {
\r 
 134                         this.processKey = processKey;
\r 
 135                         this.inputVariables = inputVariables;
\r 
 138                 public void run() {
\r 
 140                         String processInstanceId = null;
\r 
 141                         long startTime = System.currentTimeMillis();
\r 
 144                                 setLogContext(processKey, inputVariables);
\r 
 146                                 // Note: this creates a random businessKey if it wasn't specified.
\r 
 147                                 String businessKey = getBusinessKey(inputVariables);
\r 
 149                                 msoLogger.debug(logMarker + "***Received MSO startProcessInstanceByKey with processKey: "
\r 
 150                                         + processKey + " and variables: " + inputVariables);
\r 
 152                                 msoLogger.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, logMarker
\r 
 153                                                 + "Call to MSO workflow/services in Camunda. Received MSO startProcessInstanceByKey with processKey:"
\r 
 154                                                 + processKey + " and variables: " + inputVariables);
\r 
 156                                 RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
\r 
 157                                 ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(
\r 
 158                                         processKey, businessKey, inputVariables);
\r 
 159                                 processInstanceId = processInstance.getId();
\r 
 161                                 msoLogger.debug(logMarker + "Process " + processKey + ":" + processInstanceId + " " +
\r 
 162                                                 (processInstance.isEnded() ? "ENDED" : "RUNNING"));
\r 
 163                         } catch (Exception e) {
\r 
 165                                 msoLogger.recordAuditEvent(startTime, MsoLogger.StatusCode.ERROR, MsoLogger.ResponseCode.InternalError, 
\r 
 166                                                 logMarker + "Error in starting the process: "+ e.getMessage());
\r 
 168                                 WorkflowCallbackResponse callbackResponse = new WorkflowCallbackResponse();
\r 
 169                                 callbackResponse.setStatusCode(500);
\r 
 170                                 callbackResponse.setMessage("Fail");
\r 
 171                                 callbackResponse.setResponse("Error occurred while executing the process: " + e);
\r 
 173                                 // TODO: is the processInstanceId used by the API handler?  I don't think so.
\r 
 174                                 // It may be null here.
\r 
 175                                 WorkflowContextHolder.getInstance().processCallback(
\r 
 176                                         processKey, processInstanceId,
\r 
 177                                         getRequestId(inputVariables),
\r 
 185          * Callback resource which is invoked from BPMN to process to send the workflow response
\r 
 187          * @param processKey
\r 
 188          * @param processInstanceId
\r 
 190          * @param callbackResponse
\r 
 194         @Path("/services/callback/{processKey}/{processInstanceId}/{requestId}")
\r 
 195         @Produces("application/json")
\r 
 196         @Consumes("application/json")
\r 
 197         public Response processWorkflowCallback(
\r 
 198                         @PathParam("processKey") String processKey,
\r 
 199                         @PathParam("processInstanceId") String processInstanceId,
\r 
 200                         @PathParam("requestId")String requestId,
\r 
 201                         WorkflowCallbackResponse callbackResponse) {
\r 
 203                 msoLogger.debug(logMarker + "Process instance ID:" + processInstanceId + ":" + requestId + ":" + processKey + ":" + isProcessEnded(processInstanceId));
\r 
 204                 msoLogger.debug(logMarker + "About to process the callback request:" + callbackResponse.getResponse() + ":" + callbackResponse.getMessage() + ":" + callbackResponse.getStatusCode());
\r 
 205                 return contextHolder.processCallback(processKey, processInstanceId, requestId, callbackResponse);
\r 
 208         // Note: the business key is used to identify the process in unit tests
\r 
 209         private String getBusinessKey(Map<String, Object> inputVariables) {
\r 
 210                 Object businessKey = inputVariables.get("mso-business-key");
\r 
 211                 if (businessKey == null ) {
\r 
 212                         businessKey = UUID.randomUUID().toString();
\r 
 213                         inputVariables.put("mso-business-key",  businessKey);
\r 
 215                 return businessKey.toString();
\r 
 218         private String getRequestId(Map<String, Object> inputVariables) {
\r 
 219                 Object requestId = inputVariables.get("mso-request-id");
\r 
 220                 if (requestId == null ) {
\r 
 221                         requestId = UUID.randomUUID().toString();
\r 
 222                         inputVariables.put("mso-request-id",  requestId);
\r 
 224                 return requestId.toString();
\r 
 227         private long getWaitTime(Map<String, Object> inputVariables)
\r 
 229                 String timeout = inputVariables.get("mso-service-request-timeout") == null
\r 
 230                                 ? null : inputVariables.get("mso-service-request-timeout").toString();          
\r 
 232                 if (timeout != null) {
\r 
 234                                 return Long.parseLong(timeout)*1000;
\r 
 235                         } catch (NumberFormatException nex) {
\r 
 236                                 msoLogger.debug("Invalid input for mso-service-request-timeout");
\r 
 240                 return DEFAULT_WAIT_TIME;
\r 
 243         private void recordEvents(String processKey, WorkflowResponse response,
\r 
 246                 msoLogger.recordMetricEvent ( startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, 
\r 
 247                                 logMarker + response.getMessage() + " for processKey: "
\r 
 248                                 + processKey + " with response: " + response.getResponse(), "BPMN", MDC.get(processKey), null);
\r 
 250                 msoLogger.recordAuditEvent (startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, 
\r 
 251                                 logMarker + response.getMessage() + "for processKey: " + processKey + " with response: " + response.getResponse());
\r 
 255         private void setLogContext(String processKey,
\r 
 256                         Map<String, Object> inputVariables) {
\r 
 257                 MsoLogger.setServiceName("MSO." + processKey);
\r 
 258                 if (inputVariables != null) {
\r 
 259                         MsoLogger.setLogContext(getKeyValueFromInputVariables(inputVariables,"mso-request-id"), getKeyValueFromInputVariables(inputVariables,"mso-service-instance-id"));
\r 
 263         private String getKeyValueFromInputVariables(Map<String,Object> inputVariables, String key) {
\r 
 264                 if (inputVariables == null) return "";
\r 
 265                 Object requestId = inputVariables.get(key);
\r 
 266                 if (requestId != null) return requestId.toString();
\r 
 270         private boolean isProcessEnded(String processInstanceId) {
\r 
 271                 ProcessEngineServices pes = getProcessEngineServices();
\r 
 272                 return pes.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstanceId).singleResult() == null ? true : false ;                
\r 
 276         protected abstract ProcessEngineServices getProcessEngineServices();
 
 278         public void setProcessEngineServices4junit(ProcessEngineServices pes) {
\r 
 282         private Map<String, Object> getInputVariables(VariableMapImpl variableMap) {
\r 
 283                 Map<String, Object> inputVariables = new HashMap<String,Object>();
\r 
 284                 @SuppressWarnings("unchecked")
\r 
 285                 Map<String, Object> vMap = (Map<String, Object>) variableMap.get("variables");
\r 
 286                 for (String vName : vMap.keySet()) {
\r 
 287                         @SuppressWarnings("unchecked")
\r 
 288                         Map<String, Object> valueMap = (Map<String,Object>)vMap.get(vName); // value, type
\r 
 289                         inputVariables.put(vName, valueMap.get("value"));
\r 
 291                 return inputVariables;
\r