Merge "Notification handling for instantiate"
[so.git] / bpmn / MSOCommonBPMN / src / main / java / org / onap / so / client / cds / AbstractCDSProcessingBBUtils.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP - SO
4  * ================================================================================
5  * Copyright (C) 2019 TechMahindra
6  * ================================================================================
7  * Modifications Copyright (c) 2019 Samsung
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.so.client.cds;
24
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicReference;
28
29 import org.camunda.bpm.engine.delegate.DelegateExecution;
30 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
31 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
32 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
33 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
34 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
35 import org.onap.so.client.PreconditionFailedException;
36 import org.onap.so.client.RestPropertiesLoader;
37 import org.onap.so.client.cds.beans.AbstractCDSPropertiesBean;
38 import org.onap.so.client.exception.ExceptionBuilder;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.stereotype.Component;
43
44 import com.google.protobuf.InvalidProtocolBufferException;
45 import com.google.protobuf.Struct;
46 import com.google.protobuf.Struct.Builder;
47 import com.google.protobuf.util.JsonFormat;
48
49 import io.grpc.Status;
50
51 /**
52  * Util class to support Call to CDS client
53  *
54  */
55 @Component
56 public class AbstractCDSProcessingBBUtils implements CDSProcessingListener {
57
58     private static final Logger logger = LoggerFactory.getLogger(AbstractCDSProcessingBBUtils.class);
59
60     private static final String SUCCESS = "Success";
61     private static final String FAILED = "Failed";
62     private static final String PROCESSING = "Processing";
63
64     private final AtomicReference<String> cdsResponse = new AtomicReference<>();
65
66     @Autowired
67     private ExceptionBuilder exceptionUtil;
68
69     /**
70      * Extracting data from execution object and building the ExecutionServiceInput
71      * Object
72      * 
73      * @param execution
74      *            DelegateExecution object
75      */
76     public void constructExecutionServiceInputObject(DelegateExecution execution) {
77         logger.trace("Start AbstractCDSProcessingBBUtils.preProcessRequest ");
78
79         try {
80             AbstractCDSPropertiesBean executionObject = (AbstractCDSPropertiesBean) execution
81                     .getVariable("executionObject");
82
83             String payload = executionObject.getRequestObject();
84
85             CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(executionObject.getOriginatorId())
86                     .setRequestId(executionObject.getRequestId()).setSubRequestId(executionObject.getSubRequestId())
87                     .build();
88             ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder()
89                     .setBlueprintName(executionObject.getBlueprintName())
90                     .setBlueprintVersion(executionObject.getBlueprintVersion())
91                     .setActionName(executionObject.getActionName()).setMode(executionObject.getMode()).build();
92
93             Builder struct = Struct.newBuilder();
94             try {
95                 JsonFormat.parser().merge(payload, struct);
96             } catch (InvalidProtocolBufferException e) {
97                 logger.error("Failed to parse received message. blueprint({}:{}) for action({}). {}",
98                         executionObject.getBlueprintVersion(), executionObject.getBlueprintName(),
99                         executionObject.getActionName(), e);
100             }
101
102             ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder()
103                     .setCommonHeader(commonHeader).setActionIdentifiers(actionIdentifiers).setPayload(struct.build())
104                     .build();
105
106             execution.setVariable("executionServiceInput", executionServiceInput);
107
108         } catch (Exception ex) {
109             exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
110         }
111     }
112
113     /**
114      * get the executionServiceInput object from execution and send a request to CDS
115      * Client and wait for TIMEOUT period
116      * 
117      * @param execution
118      *            DelegateExecution object
119      */
120     public void sendRequestToCDSClient(DelegateExecution execution) {
121
122         logger.trace("Start AbstractCDSProcessingBBUtils.sendRequestToCDSClient ");
123         try {
124             CDSProperties props = RestPropertiesLoader.getInstance().getNewImpl(CDSProperties.class);
125             if (props == null) {
126                 throw new PreconditionFailedException(
127                         "No RestProperty.CDSProperties implementation found on classpath, can't create client.");
128             }
129
130             ExecutionServiceInput executionServiceInput = (ExecutionServiceInput) execution
131                     .getVariable("executionServiceInput");
132
133             //CDSProcessingListener cdsProcessingListener = new AbstractCDSProcessingBBUtils();
134
135             CDSProcessingClient cdsClient = null;
136             CountDownLatch countDownLatch;
137             try {
138                 cdsClient = new CDSProcessingClient(this);
139                 countDownLatch = cdsClient.sendRequest(executionServiceInput);
140                 countDownLatch.await(props.getTimeout(), TimeUnit.SECONDS);
141             } catch (InterruptedException ex) {
142                 logger.error("Caught exception in sendRequestToCDSClient in AbstractCDSProcessingBBUtils : ", ex);
143                 Thread.currentThread().interrupt();
144             } finally {
145                 cdsClient.close();
146             }
147
148             if (cdsResponse != null) {
149                 execution.setVariable("CDSStatus", cdsResponse.get());
150             }
151
152         } catch (Exception ex) {
153             exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
154         }
155     }
156
157     /**
158      * Get Response from CDS Client
159      * 
160      */
161     @Override
162     public void onMessage(ExecutionServiceOutput message) {
163         logger.info("Received notification from CDS: {}", message);
164         EventType eventType = message.getStatus().getEventType();
165
166         switch (eventType) {
167
168         case EVENT_COMPONENT_FAILURE:
169             // failed processing with failure
170             cdsResponse.set(FAILED);
171             break;
172         case EVENT_COMPONENT_PROCESSING:
173             // still processing
174             cdsResponse.set(PROCESSING);
175             break;
176         case EVENT_COMPONENT_EXECUTED:
177             // done with async processing
178             cdsResponse.set(SUCCESS);
179             break;
180         default:
181             cdsResponse.set(FAILED);
182             break;
183         }
184
185     }
186
187     /**
188      * On error at CDS, log the error
189      */
190     @Override
191     public void onError(Throwable t) {
192         Status status = Status.fromThrowable(t);
193         logger.error("Failed processing blueprint {}", status, t);
194     }
195
196 }