15064e501d46b9fd1f0660be535aadf0a6b9c1a2
[so.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Modifications Copyright (c) 2019 Samsung
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.so.bpmn.common.workflow.service;
24
25 import java.util.HashMap;
26 import java.util.Map;
27 import java.util.Objects;
28 import java.util.UUID;
29 import javax.ws.rs.Consumes;
30 import javax.ws.rs.POST;
31 import javax.ws.rs.Path;
32 import javax.ws.rs.PathParam;
33 import javax.ws.rs.Produces;
34 import javax.ws.rs.core.Response;
35 import javax.ws.rs.ext.Provider;
36 import org.camunda.bpm.engine.ProcessEngineServices;
37 import org.camunda.bpm.engine.variable.impl.VariableMapImpl;
38 import org.onap.logging.ref.slf4j.ONAPLogConstants;
39 import org.onap.so.bpmn.common.workflow.context.WorkflowContext;
40 import org.onap.so.bpmn.common.workflow.context.WorkflowContextHolder;
41 import org.onap.so.bpmn.common.workflow.context.WorkflowResponse;
42 import org.openecomp.mso.bpmn.common.workflow.service.WorkflowProcessorException;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45 import org.slf4j.MDC;
46 import org.springframework.beans.factory.annotation.Autowired;
47 import org.springframework.stereotype.Component;
48 import org.springframework.core.env.Environment;
49 import io.swagger.v3.oas.annotations.OpenAPIDefinition;
50 import io.swagger.v3.oas.annotations.Operation;
51 import io.swagger.v3.oas.annotations.info.Info;
52
53
54 /**
55  * 
56  * @version 1.0 Asynchronous Workflow processing using JAX RS RESTeasy implementation Both Synchronous and Asynchronous
57  *          BPMN process can benefit from this implementation since the workflow gets executed in the background and the
58  *          server thread is freed up, server scales better to process more incoming requests
59  * 
60  *          Usage: For synchronous process, when you are ready to send the response invoke the callback to write the
61  *          response For asynchronous process - the activity may send a acknowledgement response and then proceed
62  *          further on executing the process
63  */
64 @Path("/async")
65 @OpenAPIDefinition(info = @Info(title = "/async", description = "Provides asynchronous starting of a bpmn process"))
66 @Provider
67 @Component
68 public class WorkflowAsyncResource extends ProcessEngineAwareService {
69
70     private static final WorkflowContextHolder contextHolder = WorkflowContextHolder.getInstance();
71
72     long workflowPollInterval = 1000;
73     private static final String ASYNC_WAIT_TIME = "mso.workflow.async.waitTime";
74
75     @Autowired
76     private WorkflowProcessor processor;
77
78     @Autowired
79     private WorkflowContextHolder workflowContext;
80
81     @Autowired
82     private Environment env;
83
84     public void setProcessor(WorkflowProcessor processor) {
85         this.processor = processor;
86     }
87
88     protected static final Logger logger = LoggerFactory.getLogger(WorkflowAsyncResource.class);
89     protected static final long DEFAULT_WAIT_TIME = 60000; // default wait time
90
91     /**
92      * Asynchronous JAX-RS method that starts a process instance.
93      * 
94      * @param processKey the process key
95      * @param variableMap input variables to the process
96      * @return
97      */
98
99     @POST
100     @Path("/services/{processKey}")
101     @Operation(description = "Starts a new process with the appropriate process Key. Aysnc fall outs are only logged")
102     @Produces("application/json")
103     @Consumes("application/json")
104     public Response startProcessInstanceByKey(@PathParam("processKey") String processKey, VariableMapImpl variableMap) {
105         Map<String, Object> inputVariables = getInputVariables(variableMap);
106         try {
107             MDC.put(ONAPLogConstants.MDCs.REQUEST_ID, getRequestId(inputVariables));
108             processor.startProcess(processKey, variableMap);
109             WorkflowResponse response = waitForResponse(inputVariables);
110             if (response.getMessageCode() == 500) {
111                 return Response.status(500).entity(response).build();
112             } else {
113                 return Response.status(202).entity(response).build();
114             }
115         } catch (WorkflowProcessorException e) {
116             WorkflowResponse response = e.getWorkflowResponse();
117             return Response.status(500).entity(response).build();
118         } catch (Exception e) {
119             WorkflowResponse response = buildUnkownError(getRequestId(inputVariables), e.getMessage());
120             return Response.status(500).entity(response).build();
121         }
122     }
123
124     protected WorkflowResponse waitForResponse(Map<String, Object> inputVariables) throws Exception {
125         String requestId = getRequestId(inputVariables);
126         long currentWaitTime = 0;
127         long waitTime = getWaitTime();
128         logger.debug("WorkflowAsyncResource.waitForResponse using timeout: " + waitTime);
129         while (waitTime > currentWaitTime) {
130             Thread.sleep(workflowPollInterval);
131             currentWaitTime = currentWaitTime + workflowPollInterval;
132             WorkflowContext foundContext = contextHolder.getWorkflowContext(requestId);
133             if (foundContext != null) {
134                 contextHolder.remove(foundContext);
135                 return buildResponse(foundContext);
136             }
137         }
138         throw new Exception("TimeOutOccured in WorkflowAsyncResource.waitForResponse for time " + waitTime + "ms");
139     }
140
141     private WorkflowResponse buildUnkownError(String requestId, String error) {
142         WorkflowResponse response = new WorkflowResponse();
143         response.setMessage(error);
144         response.setResponse("UnknownError, request id:" + requestId);
145         response.setMessageCode(500);
146         return response;
147     }
148
149     private WorkflowResponse buildResponse(WorkflowContext foundContext) {
150         return foundContext.getWorkflowResponse();
151     }
152
153     protected static String getOrCreate(Map<String, Object> inputVariables, String key) {
154         String value = Objects.toString(inputVariables.get(key), null);
155         if (value == null) {
156             value = UUID.randomUUID().toString();
157             inputVariables.put(key, value);
158         }
159         return value;
160     }
161
162     protected static String getRequestId(Map<String, Object> inputVariables) {
163         return getOrCreate(inputVariables, "mso-request-id");
164     }
165
166     protected boolean isProcessEnded(String processInstanceId) {
167         ProcessEngineServices pes = getProcessEngineServices();
168         return pes.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstanceId)
169                 .singleResult() == null;
170     }
171
172     protected static Map<String, Object> getInputVariables(VariableMapImpl variableMap) {
173         Map<String, Object> inputVariables = new HashMap<>();
174         @SuppressWarnings("unchecked")
175         Map<String, Object> vMap = (Map<String, Object>) variableMap.get("variables");
176         for (Map.Entry<String, Object> entry : vMap.entrySet()) {
177             String vName = entry.getKey();
178             Object value = entry.getValue();
179             @SuppressWarnings("unchecked")
180             Map<String, Object> valueMap = (Map<String, Object>) value; // value, type
181             inputVariables.put(vName, valueMap.get("value"));
182         }
183         return inputVariables;
184     }
185
186     /**
187      * Returns the wait time, this is used by the resource on how long it should wait to send a response If none
188      * specified DEFAULT_WAIT_TIME is used
189      *
190      * @param inputVariables
191      * @return
192      */
193     private long getWaitTime() {
194         return env.getProperty(ASYNC_WAIT_TIME, Long.class, new Long(DEFAULT_WAIT_TIME));
195     }
196
197 }