[KAFKA] Allow kafka params to be passed as config
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / distribution / DistributionBusinessLogic.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.openecomp.sdc.be.distribution;
21
22 import static org.apache.commons.lang.BooleanUtils.isTrue;
23 import static org.openecomp.sdc.be.components.distribution.engine.DistributionEngineInitTask.buildTopicName;
24 import static org.openecomp.sdc.be.config.ConfigurationManager.getConfigurationManager;
25
26 import com.google.gson.Gson;
27 import com.google.gson.GsonBuilder;
28 import fj.data.Either;
29 import java.util.List;
30 import javax.annotation.Resource;
31 import javax.ws.rs.core.Response;
32 import org.apache.commons.collections.CollectionUtils;
33 import org.apache.http.HttpStatus;
34 import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse;
35 import org.openecomp.sdc.be.components.distribution.engine.DistributionEngineInitTask;
36 import org.openecomp.sdc.be.components.distribution.engine.ICambriaHandler;
37 import org.openecomp.sdc.be.components.distribution.engine.IDistributionEngine;
38 import org.openecomp.sdc.be.components.distribution.engine.SubscriberTypeEnum;
39 import org.openecomp.sdc.be.components.impl.ResponseFormatManager;
40 import org.openecomp.sdc.be.config.BeEcompErrorManager;
41 import org.openecomp.sdc.be.config.ConfigurationManager;
42 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
43 import org.openecomp.sdc.be.dao.api.ActionStatus;
44 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
45 import org.openecomp.sdc.be.distribution.api.client.KafkaDataResponse;
46 import org.openecomp.sdc.be.distribution.api.client.RegistrationRequest;
47 import org.openecomp.sdc.be.distribution.api.client.ServerListResponse;
48 import org.openecomp.sdc.be.distribution.api.client.TopicRegistrationResponse;
49 import org.openecomp.sdc.be.distribution.api.client.TopicUnregistrationResponse;
50 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
51 import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData;
52 import org.openecomp.sdc.common.datastructure.Wrapper;
53 import org.openecomp.sdc.common.log.wrappers.Logger;
54 import org.openecomp.sdc.exception.ResponseFormat;
55 import org.springframework.stereotype.Component;
56
57 @Component("distributionBusinessLogic")
58 public class DistributionBusinessLogic {
59
60     public static final String REGISTER_IN_DISTRIBUTION_ENGINE = "registerInDistributionEngine";
61     public static final String UN_REGISTER_IN_DISTRIBUTION_ENGINE = "unregisterInDistributionEngine";
62     private static final Logger log = Logger.getLogger(DistributionBusinessLogic.class);
63     private Gson gson = new GsonBuilder().setPrettyPrinting().create();
64     @Resource
65     private IDistributionEngine distributionEngine;
66     private ResponseFormatManager responseFormatManager = ResponseFormatManager.getInstance();
67     @Resource
68     private ICambriaHandler cambriaHandler;
69
70     public static String getNotificationTopicName(String envName) {
71         DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
72         return DistributionEngineInitTask.buildTopicName(config.getDistributionNotifTopicName(), envName);
73     }
74
75     public static String getStatusTopicName(String envName) {
76         DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
77         return DistributionEngineInitTask.buildTopicName(config.getDistributionStatusTopicName(), envName);
78     }
79
80     private void initRequestEnvEndPointsAndKeys(RegistrationRequest registrationRequest, DistributionEngineConfiguration config) {
81         if (CollectionUtils.isEmpty(registrationRequest.getDistEnvEndPoints())) {
82             registrationRequest.setDistEnvEndPoints(config.getUebServers());
83             registrationRequest.setManagerApiPublicKey(config.getUebPublicKey());
84             registrationRequest.setManagerApiSecretKey(config.getUebSecretKey());
85         } else {
86             OperationalEnvironmentEntry environment = distributionEngine.getEnvironmentByDmaapUebAddress(registrationRequest.getDistEnvEndPoints());
87             registrationRequest.setManagerApiPublicKey(environment.getUebApikey());
88             registrationRequest.setManagerApiSecretKey(environment.getUebSecretKey());
89         }
90     }
91
92     public Either<ServerListResponse, ResponseFormat> getUebServerList() {
93         DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
94             .getDistributionEngineConfiguration();
95         List<String> serverList = distributionEngineConfiguration.getUebServers();
96         if (serverList != null && !serverList.isEmpty()) {
97             ServerListResponse serverListResponse = new ServerListResponse();
98             serverListResponse.setUebServerList(serverList);
99             return Either.left(serverListResponse);
100         } else {
101             ResponseFormat errorResponseWrapper = getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR);
102             return Either.right(errorResponseWrapper);
103         }
104     }
105
106     public Either<KafkaDataResponse, ResponseFormat> getKafkaData() {
107         DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
108             .getDistributionEngineConfiguration();
109         String bootStrapServers = distributionEngineConfiguration.getKafkaBootStrapServers();
110         if (bootStrapServers != null) {
111             String statusTopicName = DistributionEngineInitTask
112                 .buildTopicName(distributionEngineConfiguration.getDistributionStatusTopicName(), distributionEngineConfiguration.getEnvironments().get(0));
113             String notificationTopicName = DistributionEngineInitTask
114                 .buildTopicName(distributionEngineConfiguration.getDistributionNotifTopicName(), distributionEngineConfiguration.getEnvironments().get(0));
115             KafkaDataResponse kafkaDataResponse = new KafkaDataResponse();
116             kafkaDataResponse.setKafkaBootStrapServer(bootStrapServers);
117             kafkaDataResponse.setDistrStatusTopicName(statusTopicName);
118             kafkaDataResponse.setDistrNotificationTopicName(notificationTopicName);
119             return Either.left(kafkaDataResponse);
120         } else {
121             ResponseFormat errorResponseWrapper = getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR);
122             return Either.right(errorResponseWrapper);
123         }
124     }
125
126     public void handleRegistration(Wrapper<Response> responseWrapper, RegistrationRequest registrationRequest, AuditHandler auditHandler) {
127         CambriaErrorResponse registerResponse = null;
128         try {
129             DistributionEngineConfiguration config = getConfigurationManager().getDistributionEngineConfiguration();
130             String statusTopicName = buildTopicName(config.getDistributionStatusTopicName(), registrationRequest.getDistrEnvName());
131             initRequestEnvEndPointsAndKeys(registrationRequest, config);
132             registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest, SubscriberTypeEnum.PRODUCER, statusTopicName);
133             auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.PRODUCER,
134                 DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
135             boolean isRegisteredAsProducerOnStatusSuccess = responseWrapper.isEmpty();
136             // Story [347698] Distribution Client Get Indication from
137
138             // component whether to register as consumer and producer on
139
140             // status topic
141             boolean registeredAsConsumerOnStatus = false;
142             if (isRegisteredAsProducerOnStatusSuccess && isTrue(registrationRequest.getIsConsumerToSdcDistrStatusTopic())) {
143                 registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest, SubscriberTypeEnum.CONSUMER,
144                     statusTopicName);
145                 auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.CONSUMER,
146                     DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
147                 registeredAsConsumerOnStatus = responseWrapper.isEmpty();
148             }
149             if (responseWrapper.isEmpty()) {
150                 String notificationTopicName = buildTopicName(config.getDistributionNotifTopicName(), registrationRequest.getDistrEnvName());
151                 registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest, SubscriberTypeEnum.CONSUMER,
152                     notificationTopicName);
153                 auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.CONSUMER,
154                     DistributionTopicData.newBuilder().notificationTopic(notificationTopicName).build());
155             }
156             // Unregister Rollback
157             if (!responseWrapper.isEmpty()) {
158                 if (isRegisteredAsProducerOnStatusSuccess) {
159                     CambriaErrorResponse unRegisterResponse = unRegisterDistributionClientFromTopic(registrationRequest, SubscriberTypeEnum.PRODUCER,
160                         statusTopicName);
161                     auditHandler.auditUnRegisterACL(unRegisterResponse, SubscriberTypeEnum.PRODUCER,
162                         DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
163                 }
164                 if (registeredAsConsumerOnStatus) {
165                     CambriaErrorResponse unRegisterResponse = unRegisterDistributionClientFromTopic(registrationRequest, SubscriberTypeEnum.CONSUMER,
166                         statusTopicName);
167                     auditHandler.auditUnRegisterACL(unRegisterResponse, SubscriberTypeEnum.CONSUMER,
168                         DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
169                 }
170             }
171             if (responseWrapper.isEmpty()) {
172                 TopicRegistrationResponse okTopicResponse = buildTopicResponse(registrationRequest);
173                 responseWrapper.setInnerElement(Response.status(HttpStatus.SC_OK).entity(okTopicResponse).build());
174             }
175         } catch (Exception e) {
176             log.error("registration to topic failed", e);
177             BeEcompErrorManager.getInstance()
178                 .logBeDistributionEngineSystemError(REGISTER_IN_DISTRIBUTION_ENGINE, "registration of subscriber to topic");
179             Response errorResponse = buildErrorResponse(getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
180             responseWrapper.setInnerElement(errorResponse);
181         } finally {
182             auditHandler.auditRegisterRequest(registerResponse);
183         }
184     }
185
186     public void handleUnRegistration(Wrapper<Response> responseWrapper, RegistrationRequest unRegistrationRequest, AuditHandler auditHandler) {
187         Wrapper<CambriaErrorResponse> cambriaResponseWrapper = new Wrapper<>();
188         try {
189             String statusTopicName = getStatusTopicName(unRegistrationRequest.getDistrEnvName());
190             DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
191             initRequestEnvEndPointsAndKeys(unRegistrationRequest, config);
192             CambriaErrorResponse unregisterClientProducerTopicResponse = unRegisterDistributionClientFromTopic(unRegistrationRequest,
193                 SubscriberTypeEnum.PRODUCER, statusTopicName);
194             auditHandler.auditUnRegisterACL(unregisterClientProducerTopicResponse, SubscriberTypeEnum.PRODUCER,
195                 DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
196             updateResponseWrapper(cambriaResponseWrapper, unregisterClientProducerTopicResponse);
197             String notificationTopicName = getNotificationTopicName(unRegistrationRequest.getDistrEnvName());
198             CambriaErrorResponse unregisterClientConsumerTopicResponse = unRegisterDistributionClientFromTopic(unRegistrationRequest,
199                 SubscriberTypeEnum.CONSUMER, notificationTopicName);
200             auditHandler.auditUnRegisterACL(unregisterClientConsumerTopicResponse, SubscriberTypeEnum.CONSUMER,
201                 DistributionTopicData.newBuilder().notificationTopic(notificationTopicName).build());
202             updateResponseWrapper(cambriaResponseWrapper, unregisterClientConsumerTopicResponse);
203             // Success unregister both topics
204             TopicUnregistrationResponse unregisterResponse = new TopicUnregistrationResponse(
205                 getNotificationTopicName(unRegistrationRequest.getDistrEnvName()), getStatusTopicName(unRegistrationRequest.getDistrEnvName()),
206                 unregisterClientConsumerTopicResponse.getOperationStatus(), unregisterClientProducerTopicResponse.getOperationStatus());
207             if (cambriaResponseWrapper.getInnerElement().getOperationStatus() == CambriaOperationStatus.OK) {
208                 responseWrapper.setInnerElement(Response.status(HttpStatus.SC_OK).entity(unregisterResponse).build());
209             } else {
210                 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(UN_REGISTER_IN_DISTRIBUTION_ENGINE, "unregistration failed");
211                 responseWrapper.setInnerElement(Response.status(HttpStatus.SC_INTERNAL_SERVER_ERROR).entity(unregisterResponse).build());
212             }
213         } catch (Exception e) {
214             log.error("unregistered to topic failed", e);
215             Response errorResponse = buildErrorResponse(getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
216             responseWrapper.setInnerElement(errorResponse);
217         } finally {
218             auditHandler.auditUnRegisterRequest(cambriaResponseWrapper.getInnerElement());
219         }
220     }
221
222     private void updateResponseWrapper(Wrapper<CambriaErrorResponse> cambriaResponseWrapper, CambriaErrorResponse currentResponse) {
223         if (cambriaResponseWrapper.isEmpty()) {
224             cambriaResponseWrapper.setInnerElement(currentResponse);
225         } else if (currentResponse.getOperationStatus() != CambriaOperationStatus.OK) {
226             cambriaResponseWrapper.setInnerElement(currentResponse);
227         }
228     }
229
230     protected CambriaErrorResponse unRegisterDistributionClientFromTopic(RegistrationRequest unRegistrationRequest, SubscriberTypeEnum subscriberType,
231                                                                          String topicName) {
232         log.debug("unregistering client as {} , from topic: {}, using DistEnvPoints: {}", subscriberType, topicName,
233             unRegistrationRequest.getDistEnvEndPoints());
234         return cambriaHandler.unRegisterFromTopic(unRegistrationRequest.getDistEnvEndPoints(), unRegistrationRequest.getManagerApiPublicKey(),
235             unRegistrationRequest.getManagerApiSecretKey(), unRegistrationRequest.getApiPublicKey(), subscriberType, topicName);
236     }
237
238     private TopicRegistrationResponse buildTopicResponse(RegistrationRequest registrationRequest) {
239         DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
240         String statusTopicName = DistributionEngineInitTask
241             .buildTopicName(config.getDistributionStatusTopicName(), registrationRequest.getDistrEnvName());
242         String notificationTopicName = DistributionEngineInitTask
243             .buildTopicName(config.getDistributionNotifTopicName(), registrationRequest.getDistrEnvName());
244         TopicRegistrationResponse topicResponse = new TopicRegistrationResponse();
245         topicResponse.setDistrNotificationTopicName(notificationTopicName);
246         topicResponse.setDistrStatusTopicName(statusTopicName);
247         return topicResponse;
248     }
249
250     protected CambriaErrorResponse registerDistributionClientToTopic(Wrapper<Response> responseWrapper, RegistrationRequest registrationRequest,
251                                                                      SubscriberTypeEnum subscriberType, String topicName) {
252         String errorMsg;
253         // Register for notifications as consumer
254         if (subscriberType == SubscriberTypeEnum.CONSUMER) {
255             errorMsg = "registration of subscriber to topic:" + topicName + " as consumer failed";
256         }
257         // Register for status as producer
258         else {
259             errorMsg = "registration of subscriber to topic:" + topicName + " as producer failed";
260         }
261         log.debug("registering client as {} , from topic: {}, using DistEnvPoints: {}", subscriberType, topicName,
262             registrationRequest.getDistEnvEndPoints());
263         CambriaErrorResponse registerToTopic = cambriaHandler
264             .registerToTopic(registrationRequest.getDistEnvEndPoints(), registrationRequest.getManagerApiPublicKey(),
265                 registrationRequest.getManagerApiSecretKey(), registrationRequest.getApiPublicKey(), subscriberType, topicName);
266         if (registerToTopic.getOperationStatus() != CambriaOperationStatus.OK) {
267             Response failedRegistrationResponse = buildErrorResponse(getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
268             BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(REGISTER_IN_DISTRIBUTION_ENGINE, errorMsg);
269             responseWrapper.setInnerElement(failedRegistrationResponse);
270         }
271         return registerToTopic;
272     }
273
274     protected Response buildErrorResponse(ResponseFormat requestErrorWrapper) {
275         return Response.status(requestErrorWrapper.getStatus()).entity(gson.toJson(requestErrorWrapper.getRequestError())).build();
276     }
277
278     public ResponseFormatManager getResponseFormatManager() {
279         return responseFormatManager;
280     }
281
282     public IDistributionEngine getDistributionEngine() {
283         return distributionEngine;
284     }
285 }