5afbded982889cd114ce04f92171f49bc850011a
[so.git] /
1 package org.openecomp.mso.bpmn.common.workflow.service;\r
2 \r
3 import java.util.HashMap;\r
4 import java.util.Map;\r
5 \r
6 import javax.ws.rs.Consumes;\r
7 import javax.ws.rs.POST;\r
8 import javax.ws.rs.Path;\r
9 import javax.ws.rs.PathParam;\r
10 import javax.ws.rs.Produces;\r
11 import javax.ws.rs.core.MediaType;\r
12 import javax.ws.rs.core.Response;\r
13 \r
14 import org.camunda.bpm.BpmPlatform;\r
15 import org.camunda.bpm.engine.ProcessEngineServices;\r
16 import org.camunda.bpm.engine.RuntimeService;\r
17 \r
18 import org.openecomp.mso.logger.MessageEnum;\r
19 import org.openecomp.mso.logger.MsoLogger;\r
20 \r
21 /**\r
22  * Generalized REST interface that injects a message event into a waiting BPMN process.\r
23  * Examples:\r
24  * <pre>\r
25  *     /WorkflowMessage/SDNCAResponse/6d10d075-100c-42d0-9d84-a52432681cae-1478486185286\r
26  *     /WorkflowMessage/SDNCAEvent/USOSTCDALTX0101UJZZ01\r
27  * </pre>\r
28  */\r
29 @Path("/")\r
30 public class WorkflowMessageResource {\r
31         private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);\r
32         private static final String LOGMARKER = "[WORKFLOW-MESSAGE]";\r
33 \r
34         private ProcessEngineServices pes4junit = null;\r
35         \r
36         @POST\r
37         @Path("/WorkflowMessage/{messageType}/{correlator}")\r
38         @Consumes("*/*")\r
39         @Produces(MediaType.TEXT_PLAIN)\r
40         public Response deliver(@PathParam("messageType") String messageType,\r
41                         @PathParam("correlator") String correlator, String message) {\r
42 \r
43                 LOGGER.debug(LOGMARKER + " Received workflow message"\r
44                         + " type='" + messageType + "'"\r
45                         + " correlator='" + correlator + "'"\r
46                         + System.lineSeparator() + message);\r
47 \r
48                 MsoLogger.setServiceName("MSO." + "WorkflowMessage");\r
49 \r
50                 if (messageType == null || messageType.isEmpty()) {\r
51                         String msg = "Missing message type";\r
52                         LOGGER.debug(LOGMARKER + " " + msg);\r
53                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),\r
54                                 MsoLogger.ErrorCode.DataError, LOGMARKER + ":" + msg);\r
55                         return Response.status(400).entity(msg).build();\r
56                 }\r
57 \r
58                 if (correlator == null || correlator.isEmpty()) {\r
59                         String msg = "Missing correlator";\r
60                         LOGGER.debug(LOGMARKER + " " + msg);\r
61                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),\r
62                                 MsoLogger.ErrorCode.DataError, LOGMARKER + ":" + msg);\r
63                         return Response.status(400).entity(msg).build();\r
64                 }\r
65 \r
66                 String correlatorVariable = messageType + "_CORRELATOR";\r
67                 String messageVariable = messageType + "_MESSAGE";\r
68 \r
69                 long startTime = System.currentTimeMillis();\r
70 \r
71                 try {\r
72                         RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();\r
73 \r
74                         if (!isReadyforCorrelation(runtimeService, correlatorVariable, correlator)) {\r
75                                 String msg = "No process is waiting to receive '" + messageType + "' WorkflowMessage with correlator '" + correlator + "'";\r
76                                 LOGGER.debug(LOGMARKER + " " + msg);\r
77                                 LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(), MsoLogger.ErrorCode.UnknownError, LOGMARKER + ":" + msg);\r
78                                 LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.ERROR, MsoLogger.ResponseCode.BusinessProcesssError, msg, "BPMN", MsoLogger.getServiceName(), messageType);\r
79                                 LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.ERROR, MsoLogger.ResponseCode.BusinessProcesssError, msg);\r
80                                 return Response.status(500).entity(msg).build();\r
81                         }\r
82 \r
83                         Map<String, Object> variables = new HashMap<String, Object>();\r
84                         variables.put(correlatorVariable, correlator);\r
85                         variables.put(messageVariable, message);\r
86 \r
87                         runtimeService.createMessageCorrelation("WorkflowMessage").setVariables(variables)\r
88                                 .processInstanceVariableEquals(correlatorVariable, correlator).correlate();\r
89 \r
90                         String msg = "Completed delivery of '" + messageType + "' WorkflowMessage with correlator '" + correlator + "'";\r
91                         LOGGER.debug(LOGMARKER + msg);\r
92                         LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, msg, "BPMN", MsoLogger.getServiceName(), messageType);\r
93                         LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, msg);\r
94                         return Response.status(204).build();\r
95                 } catch (Exception e) {\r
96                         // For example: MismatchingMessageCorrelationException\r
97                         String msg = "Caught " + e.getClass().getSimpleName() + " processing '" + messageType + "' WorkflowMessage with " + correlatorVariable + "='" + correlator + "'";\r
98                         LOGGER.debug(LOGMARKER + " " + msg);\r
99                         LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(), MsoLogger.ErrorCode.UnknownError, LOGMARKER + ":" + msg, e);\r
100                         LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.ERROR, MsoLogger.ResponseCode.UnknownError, msg, "BPMN", MsoLogger.getServiceName(), messageType);\r
101                         LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.ERROR, MsoLogger.ResponseCode.UnknownError, msg);\r
102                         return Response.status(500).entity(msg).build();\r
103                 }\r
104         }\r
105         \r
106         private boolean isReadyforCorrelation(RuntimeService runtimeService,\r
107                         String correlationVariable, String correlationValue)\r
108                         throws InterruptedException {\r
109 \r
110                 long waitingInstances = runtimeService.createExecutionQuery()\r
111                         .messageEventSubscriptionName("WorkflowMessage")\r
112                         .processVariableValueEquals(correlationVariable, correlationValue)\r
113                         .count();\r
114 \r
115                 int retries = 50;\r
116                 while (waitingInstances == 0 && retries > 0) {\r
117                         Thread.sleep(100);\r
118 \r
119                         waitingInstances = runtimeService.createExecutionQuery()\r
120                                 .messageEventSubscriptionName("WorkflowMessage")\r
121                                 .processVariableValueEquals(correlationVariable, correlationValue)\r
122                                 .count();\r
123 \r
124                         retries--;\r
125                 }\r
126 \r
127                 return waitingInstances != 0;\r
128         }\r
129         \r
130         private ProcessEngineServices getProcessEngineServices() {\r
131                 if (pes4junit == null) {\r
132                         return BpmPlatform.getDefaultProcessEngine();\r
133                 } else {\r
134                         return pes4junit;\r
135                 }\r
136         }\r
137 \r
138         public void setProcessEngineServices4junit(ProcessEngineServices pes) {\r
139                 pes4junit = pes;\r
140         }\r
141 }