e5d8a921a56d1b7f54f74f5a1dbeb8c35e76b916
[so.git] / bpmn / MSOCommonBPMN / src / main / java / org / onap / so / client / cds / AbstractCDSProcessingBBUtils.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2019 TechMahindra
6  * ================================================================================
7  * Modifications Copyright (c) 2019 Samsung
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.so.client.cds;
24
25 import com.google.protobuf.InvalidProtocolBufferException;
26 import com.google.protobuf.Struct;
27 import com.google.protobuf.Struct.Builder;
28 import com.google.protobuf.util.JsonFormat;
29 import io.grpc.Status;
30 import org.camunda.bpm.engine.delegate.DelegateExecution;
31 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
32 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
33 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
34 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
35 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
36 import org.onap.so.bpmn.common.BuildingBlockExecution;
37 import org.onap.so.client.PreconditionFailedException;
38 import org.onap.so.client.RestPropertiesLoader;
39 import org.onap.so.client.cds.beans.AbstractCDSPropertiesBean;
40 import org.onap.so.client.exception.BadResponseException;
41 import org.onap.so.client.exception.ExceptionBuilder;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44 import org.springframework.beans.factory.annotation.Autowired;
45 import org.springframework.stereotype.Component;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.TimeUnit;
48
49 /**
50  * Util class to support Call to CDS client
51  */
52 @Component
53 public class AbstractCDSProcessingBBUtils {
54
55     private static final Logger logger = LoggerFactory.getLogger(AbstractCDSProcessingBBUtils.class);
56
57     private static final String SUCCESS = "Success";
58     private static final String FAILED = "Failed";
59     private static final String PROCESSING = "Processing";
60     private static final String RESPONSE_PAYLOAD = "CDSResponsePayload";
61     private static final String CDS_STATUS = "ControllerStatus";
62     private static final String EXEC_INPUT = "executionServiceInput";
63     private static final String EXECUTION_OBJECT = "executionObject";
64     private static final String EXCEPTION = "Exception";
65
66     @Autowired
67     protected ExceptionBuilder exceptionUtil;
68
69     /**
70      * Extracting data from execution object and building the ExecutionServiceInput Object
71      *
72      * @param execution DelegateExecution object
73      */
74     public void constructExecutionServiceInputObject(DelegateExecution execution) {
75         logger.trace("Start AbstractCDSProcessingBBUtils.preProcessRequest for DelegateExecution object.");
76
77         try {
78             AbstractCDSPropertiesBean executionObject =
79                     (AbstractCDSPropertiesBean) execution.getVariable(EXECUTION_OBJECT);
80
81             ExecutionServiceInput executionServiceInput = prepareExecutionServiceInput(executionObject);
82
83             execution.setVariable(EXEC_INPUT, executionServiceInput);
84
85         } catch (Exception ex) {
86             exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
87         }
88     }
89
90     /**
91      * Extracting data from execution object and building the ExecutionServiceInput Object
92      *
93      * @param execution BuildingBlockExecution object
94      */
95     public void constructExecutionServiceInputObjectBB(BuildingBlockExecution execution) {
96         logger.trace("Start AbstractCDSProcessingBBUtils.preProcessRequest for BuildingBlockExecution object.");
97
98         try {
99             AbstractCDSPropertiesBean executionObject = execution.getVariable(EXECUTION_OBJECT);
100
101             ExecutionServiceInput executionServiceInput = prepareExecutionServiceInput(executionObject);
102
103             execution.setVariable(EXEC_INPUT, executionServiceInput);
104             logger.debug("Input payload: " + executionServiceInput.getPayload());
105
106         } catch (Exception ex) {
107             exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
108         }
109     }
110
111     /**
112      * get the executionServiceInput object from execution and send a request to CDS Client and wait for TIMEOUT period
113      *
114      * @param execution DelegateExecution object
115      */
116     public void sendRequestToCDSClient(DelegateExecution execution) {
117
118         logger.trace("Start AbstractCDSProcessingBBUtils.sendRequestToCDSClient for DelegateExecution object.");
119         try {
120             ExecutionServiceInput executionServiceInput = (ExecutionServiceInput) execution.getVariable(EXEC_INPUT);
121             CDSResponse cdsResponse = getCdsResponse(executionServiceInput);
122             execution.setVariable(CDS_STATUS, cdsResponse.status);
123
124             if (cdsResponse.payload != null) {
125                 String payload = JsonFormat.printer().print(cdsResponse.payload);
126                 execution.setVariable(RESPONSE_PAYLOAD, payload);
127             }
128
129         } catch (Exception ex) {
130             exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
131         }
132     }
133
134     /**
135      * get the executionServiceInput object from execution and send a request to CDS Client and wait for TIMEOUT period
136      *
137      * @param execution BuildingBlockExecution object
138      */
139     public void sendRequestToCDSClientBB(BuildingBlockExecution execution) {
140
141         logger.trace("Start AbstractCDSProcessingBBUtils.sendRequestToCDSClient for BuildingBlockExecution object.");
142         try {
143             ExecutionServiceInput executionServiceInput = execution.getVariable(EXEC_INPUT);
144             CDSResponse cdsResponse = getCdsResponse(executionServiceInput);
145             execution.setVariable(CDS_STATUS, cdsResponse.status);
146
147             if (cdsResponse.payload != null) {
148                 String payload = JsonFormat.printer().print(cdsResponse.payload);
149                 execution.setVariable(RESPONSE_PAYLOAD, payload);
150             }
151
152         } catch (Exception ex) {
153             exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
154         }
155     }
156
157     private CDSResponse getCdsResponse(ExecutionServiceInput executionServiceInput) throws BadResponseException {
158         CDSProperties props = RestPropertiesLoader.getInstance().getNewImpl(CDSProperties.class);
159         if (props == null) {
160             throw new PreconditionFailedException(
161                     "No RestProperty.CDSProperties implementation found on classpath, can't create client.");
162         }
163
164         CDSResponse cdsResponse = new CDSResponse();
165
166         try (CDSProcessingClient cdsClient = new CDSProcessingClient(new ResponseHandler(cdsResponse))) {
167             CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
168             countDownLatch.await(props.getTimeout(), TimeUnit.SECONDS);
169         } catch (InterruptedException ex) {
170             logger.error("Caught exception in sendRequestToCDSClient in AbstractCDSProcessingBBUtils : ", ex);
171             Thread.currentThread().interrupt();
172         }
173
174         String cdsResponseStatus = cdsResponse.status;
175
176         /**
177          * throw CDS failed exception.
178          */
179         if (!cdsResponseStatus.equals(SUCCESS)) {
180             throw new BadResponseException("CDS call failed with status: " + cdsResponse.status + " and errorMessage: "
181                     + cdsResponse.errorMessage);
182         }
183         return cdsResponse;
184     }
185
186     private ExecutionServiceInput prepareExecutionServiceInput(AbstractCDSPropertiesBean executionObject) {
187         String payload = executionObject.getRequestObject();
188
189         CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(executionObject.getOriginatorId())
190                 .setRequestId(executionObject.getRequestId()).setSubRequestId(executionObject.getSubRequestId())
191                 .build();
192         ActionIdentifiers actionIdentifiers =
193                 ActionIdentifiers.newBuilder().setBlueprintName(executionObject.getBlueprintName())
194                         .setBlueprintVersion(executionObject.getBlueprintVersion())
195                         .setActionName(executionObject.getActionName()).setMode(executionObject.getMode()).build();
196
197         Builder struct = Struct.newBuilder();
198         try {
199             JsonFormat.parser().merge(payload, struct);
200         } catch (InvalidProtocolBufferException e) {
201             logger.error("Failed to parse received message. blueprint({}:{}) for action({}). {}",
202                     executionObject.getBlueprintVersion(), executionObject.getBlueprintName(),
203                     executionObject.getActionName(), e);
204         }
205
206         return ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader).setActionIdentifiers(actionIdentifiers)
207                 .setPayload(struct.build()).build();
208     }
209
210     private class ResponseHandler implements CDSProcessingListener {
211
212         private CDSResponse cdsResponse;
213
214         ResponseHandler(CDSResponse cdsResponse) {
215             this.cdsResponse = cdsResponse;
216         }
217
218         /**
219          * Get Response from CDS Client
220          */
221         @Override
222         public void onMessage(ExecutionServiceOutput message) {
223             logger.info("Received notification from CDS: {}", message);
224             EventType eventType = message.getStatus().getEventType();
225
226             switch (eventType) {
227                 case EVENT_COMPONENT_PROCESSING:
228                     cdsResponse.status = PROCESSING;
229                     break;
230                 case EVENT_COMPONENT_EXECUTED:
231                     cdsResponse.status = SUCCESS;
232                     break;
233                 default:
234                     cdsResponse.status = FAILED;
235                     cdsResponse.errorMessage = message.getStatus().getErrorMessage();
236                     break;
237             }
238             cdsResponse.payload = message.getPayload();
239         }
240
241         /**
242          * On error at CDS, log the error
243          */
244         @Override
245         public void onError(Throwable t) {
246             Status status = Status.fromThrowable(t);
247             logger.error("Failed processing blueprint {}", status, t);
248             cdsResponse.status = EXCEPTION;
249         }
250     }
251
252     private class CDSResponse {
253
254         String status;
255         String errorMessage;
256         Struct payload;
257
258         @Override
259         public String toString() {
260             return "CDSResponse{" + "status='" + status + '\'' + ", errorMessage='" + errorMessage + '\'' + ", payload="
261                     + payload + '}';
262         }
263     }
264 }