2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 package org.openecomp.sdc.be.distribution;
22 import static org.apache.commons.lang.BooleanUtils.isTrue;
23 import static org.openecomp.sdc.be.components.distribution.engine.DistributionEngineInitTask.buildTopicName;
24 import static org.openecomp.sdc.be.config.ConfigurationManager.getConfigurationManager;
26 import com.google.gson.Gson;
27 import com.google.gson.GsonBuilder;
28 import fj.data.Either;
29 import java.util.List;
30 import javax.annotation.Resource;
31 import javax.ws.rs.core.Response;
32 import org.apache.commons.collections.CollectionUtils;
33 import org.apache.http.HttpStatus;
34 import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse;
35 import org.openecomp.sdc.be.components.distribution.engine.DistributionEngineInitTask;
36 import org.openecomp.sdc.be.components.distribution.engine.ICambriaHandler;
37 import org.openecomp.sdc.be.components.distribution.engine.IDistributionEngine;
38 import org.openecomp.sdc.be.components.distribution.engine.SubscriberTypeEnum;
39 import org.openecomp.sdc.be.components.impl.ResponseFormatManager;
40 import org.openecomp.sdc.be.config.BeEcompErrorManager;
41 import org.openecomp.sdc.be.config.ConfigurationManager;
42 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
43 import org.openecomp.sdc.be.dao.api.ActionStatus;
44 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
45 import org.openecomp.sdc.be.distribution.api.client.KafkaDataResponse;
46 import org.openecomp.sdc.be.distribution.api.client.RegistrationRequest;
47 import org.openecomp.sdc.be.distribution.api.client.ServerListResponse;
48 import org.openecomp.sdc.be.distribution.api.client.TopicRegistrationResponse;
49 import org.openecomp.sdc.be.distribution.api.client.TopicUnregistrationResponse;
50 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
51 import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData;
52 import org.openecomp.sdc.common.datastructure.Wrapper;
53 import org.openecomp.sdc.common.log.wrappers.Logger;
54 import org.openecomp.sdc.exception.ResponseFormat;
55 import org.springframework.stereotype.Component;
/**
 * Spring component, registered under the bean name distributionBusinessLogic, that handles
 * registration and unregistration of distribution clients on the distribution status and
 * notification topics, and exposes the configured UEB server list and Kafka data.
 *
 * NOTE(review): this listing carries embedded line numbers and skips some of them (for
 * example 64 and 67, directly above two of the collaborator fields). Whatever sits on those
 * lines -- presumably injection annotations, given the javax.annotation.Resource import --
 * is not visible here and should be confirmed against the full file.
 */
57 @Component("distributionBusinessLogic")
58 public class DistributionBusinessLogic {
// Operation names handed to BeEcompErrorManager when a distribution engine system error is logged.
60 public static final String REGISTER_IN_DISTRIBUTION_ENGINE = "registerInDistributionEngine";
61 public static final String UN_REGISTER_IN_DISTRIBUTION_ENGINE = "unregisterInDistributionEngine";
62 private static final Logger log = Logger.getLogger(DistributionBusinessLogic.class);
// Pretty-printing Gson instance; buildErrorResponse uses it to serialise the request error.
63 private Gson gson = new GsonBuilder().setPrettyPrinting().create();
// Used by initRequestEnvEndPointsAndKeys to look up an operational environment by its endpoints.
65 private IDistributionEngine distributionEngine;
// Source of ResponseFormat objects for ActionStatus codes (GENERAL_ERROR in this class).
66 private ResponseFormatManager responseFormatManager = ResponseFormatManager.getInstance();
// Client through which subscribers are registered to and unregistered from topics.
68 private ICambriaHandler cambriaHandler;
/**
 * Builds the distribution notification topic name for an environment.
 *
 * The configured notification topic name and the environment name are handed to
 * DistributionEngineInitTask.buildTopicName, which decides the final format.
 *
 * @param envName name of the distribution environment
 * @return the value produced by buildTopicName for the notification topic
 */
70 public static String getNotificationTopicName(String envName) {
71 DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
72 return DistributionEngineInitTask.buildTopicName(config.getDistributionNotifTopicName(), envName);
/**
 * Builds the distribution status topic name for an environment.
 *
 * The configured status topic name and the environment name are handed to
 * DistributionEngineInitTask.buildTopicName, which decides the final format.
 *
 * @param envName name of the distribution environment
 * @return the value produced by buildTopicName for the status topic
 */
75 public static String getStatusTopicName(String envName) {
76 DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
77 return DistributionEngineInitTask.buildTopicName(config.getDistributionStatusTopicName(), envName);
/**
 * Completes the endpoints and manager API keys on the given request, modifying it in place.
 *
 * When the request carries no distribution environment endpoints, the UEB servers and the
 * UEB public and secret keys from the configuration are applied. The last three statements
 * take the manager keys from the operational environment that distributionEngine resolves
 * from the endpoints of the request.
 *
 * NOTE(review): the embedded numbering skips line 85 between the two groups of statements;
 * presumably that line opens the else branch of the if below -- confirm against the full file.
 *
 * @param registrationRequest request whose endpoints and manager keys are filled in
 * @param config distribution engine configuration supplying the default servers and keys
 */
80 private void initRequestEnvEndPointsAndKeys(RegistrationRequest registrationRequest, DistributionEngineConfiguration config) {
81 if (CollectionUtils.isEmpty(registrationRequest.getDistEnvEndPoints())) {
// No endpoints supplied by the caller: fall back to the configured UEB servers and keys.
82 registrationRequest.setDistEnvEndPoints(config.getUebServers());
83 registrationRequest.setManagerApiPublicKey(config.getUebPublicKey());
84 registrationRequest.setManagerApiSecretKey(config.getUebSecretKey());
// Keys come from the environment matching the endpoints of the request.
// NOTE(review): the lookup result is dereferenced without a null check; whether
// getEnvironmentByDmaapUebAddress can return null is not visible here -- TODO confirm.
86 OperationalEnvironmentEntry environment = distributionEngine.getEnvironmentByDmaapUebAddress(registrationRequest.getDistEnvEndPoints());
87 registrationRequest.setManagerApiPublicKey(environment.getUebApikey());
88 registrationRequest.setManagerApiSecretKey(environment.getUebSecretKey());
/**
 * Returns the UEB servers listed in the distribution engine configuration.
 *
 * @return left with a ServerListResponse holding the configured servers when the list is
 *         neither null nor empty; otherwise right with the GENERAL_ERROR response format
 */
92 public Either<ServerListResponse, ResponseFormat> getUebServerList() {
93 DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
94 .getDistributionEngineConfiguration();
95 List<String> serverList = distributionEngineConfiguration.getUebServers();
96 if (serverList != null && !serverList.isEmpty()) {
97 ServerListResponse serverListResponse = new ServerListResponse();
98 serverListResponse.setUebServerList(serverList);
99 return Either.left(serverListResponse);
// Reached only when no server is configured, since the branch above returns.
101 ResponseFormat errorResponseWrapper = getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR);
102 return Either.right(errorResponseWrapper);
/**
 * Returns the Kafka bootstrap servers together with the status and notification topic names.
 *
 * Both topic names are built with DistributionEngineInitTask.buildTopicName from the
 * configured topic names and the first entry of the configured environments.
 *
 * @return left with a KafkaDataResponse when the configured bootstrap servers value is not
 *         null; otherwise right with the GENERAL_ERROR response format
 */
106 public Either<KafkaDataResponse, ResponseFormat> getKafkaData() {
107 DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
108 .getDistributionEngineConfiguration();
109 String bootStrapServers = distributionEngineConfiguration.getKafkaBootStrapServers();
110 if (bootStrapServers != null) {
// NOTE(review): getEnvironments().get(0) is not guarded in this block; an empty or null
// environments list would presumably make it throw -- confirm the configuration is
// validated elsewhere.
111 String statusTopicName = DistributionEngineInitTask
112 .buildTopicName(distributionEngineConfiguration.getDistributionStatusTopicName(), distributionEngineConfiguration.getEnvironments().get(0));
113 String notificationTopicName = DistributionEngineInitTask
114 .buildTopicName(distributionEngineConfiguration.getDistributionNotifTopicName(), distributionEngineConfiguration.getEnvironments().get(0));
115 KafkaDataResponse kafkaDataResponse = new KafkaDataResponse();
116 kafkaDataResponse.setKafkaBootStrapServer(bootStrapServers);
117 kafkaDataResponse.setDistrStatusTopicName(statusTopicName);
118 kafkaDataResponse.setDistrNotificationTopicName(notificationTopicName);
119 return Either.left(kafkaDataResponse);
// Reached only when no bootstrap servers value is configured, since the branch above returns.
121 ResponseFormat errorResponseWrapper = getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR);
122 return Either.right(errorResponseWrapper);
/**
 * Registers a distribution client on the status and notification topics and writes the
 * resulting HTTP response into responseWrapper.
 *
 * Sequence: register as PRODUCER on the status topic; when that succeeded and the request
 * asks for it, register as CONSUMER on the status topic; when no failure was recorded,
 * register as CONSUMER on the notification topic. If any step recorded a failure, the
 * status topic registrations that did succeed are unregistered again. On overall success
 * the wrapper receives a 200 response with the topic names; on an exception it receives
 * the GENERAL_ERROR response. Each step is audited through auditHandler.
 *
 * An empty responseWrapper is used as the success indicator throughout, because
 * registerDistributionClientToTopic only fills it when a registration fails.
 *
 * NOTE(review): the embedded numbering skips several lines in this method (for example
 * 128, 144, 160, 166, 181). Presumably they hold the try and finally keywords and the
 * trailing topic name arguments of the calls left open below -- confirm against the full file.
 *
 * @param responseWrapper receives the HTTP response to send back
 * @param registrationRequest registration details sent by the client; completed in place
 *        by initRequestEnvEndPointsAndKeys
 * @param auditHandler receives one audit call per topic operation and one for the request
 */
126 public void handleRegistration(Wrapper<Response> responseWrapper, RegistrationRequest registrationRequest, AuditHandler auditHandler) {
// Stays null until the first registration call returns; it is what the final audit receives.
127 CambriaErrorResponse registerResponse = null;
129 DistributionEngineConfiguration config = getConfigurationManager().getDistributionEngineConfiguration();
130 String statusTopicName = buildTopicName(config.getDistributionStatusTopicName(), registrationRequest.getDistrEnvName());
131 initRequestEnvEndPointsAndKeys(registrationRequest, config);
// Step 1: producer on the status topic.
132 registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest, SubscriberTypeEnum.PRODUCER, statusTopicName);
133 auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.PRODUCER,
134 DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
// Remembered so the rollback below knows whether there is a producer registration to undo.
135 boolean isRegisteredAsProducerOnStatusSuccess = responseWrapper.isEmpty();
136 // Story [347698] Distribution Client Get Indication from
138 // component whether to register as consumer and producer on
// Step 2 (optional): consumer on the status topic, only when the request flag is true.
141 boolean registeredAsConsumerOnStatus = false;
142 if (isRegisteredAsProducerOnStatusSuccess && isTrue(registrationRequest.getIsConsumerToSdcDistrStatusTopic())) {
143 registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest, SubscriberTypeEnum.CONSUMER,
145 auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.CONSUMER,
146 DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
147 registeredAsConsumerOnStatus = responseWrapper.isEmpty();
// Step 3: consumer on the notification topic, skipped once a failure has been recorded.
149 if (responseWrapper.isEmpty()) {
150 String notificationTopicName = buildTopicName(config.getDistributionNotifTopicName(), registrationRequest.getDistrEnvName());
151 registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest, SubscriberTypeEnum.CONSUMER,
152 notificationTopicName);
153 auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.CONSUMER,
154 DistributionTopicData.newBuilder().notificationTopic(notificationTopicName).build());
156 // Unregister Rollback
// A filled wrapper means some step failed: undo the status topic registrations that succeeded.
157 if (!responseWrapper.isEmpty()) {
158 if (isRegisteredAsProducerOnStatusSuccess) {
159 CambriaErrorResponse unRegisterResponse = unRegisterDistributionClientFromTopic(registrationRequest, SubscriberTypeEnum.PRODUCER,
161 auditHandler.auditUnRegisterACL(unRegisterResponse, SubscriberTypeEnum.PRODUCER,
162 DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
164 if (registeredAsConsumerOnStatus) {
165 CambriaErrorResponse unRegisterResponse = unRegisterDistributionClientFromTopic(registrationRequest, SubscriberTypeEnum.CONSUMER,
167 auditHandler.auditUnRegisterACL(unRegisterResponse, SubscriberTypeEnum.CONSUMER,
168 DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
// Still empty after all steps: every registration succeeded, answer 200 with the topic names.
171 if (responseWrapper.isEmpty()) {
172 TopicRegistrationResponse okTopicResponse = buildTopicResponse(registrationRequest);
173 responseWrapper.setInnerElement(Response.status(HttpStatus.SC_OK).entity(okTopicResponse).build());
175 } catch (Exception e) {
176 log.error("registration to topic failed", e);
177 BeEcompErrorManager.getInstance()
178 .logBeDistributionEngineSystemError(REGISTER_IN_DISTRIBUTION_ENGINE, "registration of subscriber to topic");
179 Response errorResponse = buildErrorResponse(getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
180 responseWrapper.setInnerElement(errorResponse);
// Audits the overall request with the last Cambria response obtained.
// NOTE(review): registerResponse is still null here when an exception was thrown before the
// first registration returned; whether auditRegisterRequest accepts null is not visible.
182 auditHandler.auditRegisterRequest(registerResponse);
/**
 * Unregisters a distribution client as PRODUCER from the status topic and as CONSUMER from
 * the notification topic, and writes the resulting HTTP response into responseWrapper.
 *
 * Both unregistrations are attempted one after the other with no check in between, and each
 * one is audited. Their results are folded into cambriaResponseWrapper by
 * updateResponseWrapper. The response entity always carries both topic names and both
 * per-topic statuses; the folded status selects between 200 and 500. On an exception the
 * wrapper receives the GENERAL_ERROR response.
 *
 * NOTE(review): the embedded numbering skips lines 188, 209, 212 and 217; presumably they
 * hold the try, else and finally keywords -- confirm against the full file.
 *
 * @param responseWrapper receives the HTTP response to send back
 * @param unRegistrationRequest unregistration details sent by the client; completed in
 *        place by initRequestEnvEndPointsAndKeys
 * @param auditHandler receives one audit call per topic operation and one for the request
 */
186 public void handleUnRegistration(Wrapper<Response> responseWrapper, RegistrationRequest unRegistrationRequest, AuditHandler auditHandler) {
// Holds the folded outcome of both unregistrations; read by the final audit call.
187 Wrapper<CambriaErrorResponse> cambriaResponseWrapper = new Wrapper<>();
189 String statusTopicName = getStatusTopicName(unRegistrationRequest.getDistrEnvName());
190 DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
191 initRequestEnvEndPointsAndKeys(unRegistrationRequest, config);
// Producer side: status topic.
192 CambriaErrorResponse unregisterClientProducerTopicResponse = unRegisterDistributionClientFromTopic(unRegistrationRequest,
193 SubscriberTypeEnum.PRODUCER, statusTopicName);
194 auditHandler.auditUnRegisterACL(unregisterClientProducerTopicResponse, SubscriberTypeEnum.PRODUCER,
195 DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
196 updateResponseWrapper(cambriaResponseWrapper, unregisterClientProducerTopicResponse);
// Consumer side: notification topic.
197 String notificationTopicName = getNotificationTopicName(unRegistrationRequest.getDistrEnvName());
198 CambriaErrorResponse unregisterClientConsumerTopicResponse = unRegisterDistributionClientFromTopic(unRegistrationRequest,
199 SubscriberTypeEnum.CONSUMER, notificationTopicName);
200 auditHandler.auditUnRegisterACL(unregisterClientConsumerTopicResponse, SubscriberTypeEnum.CONSUMER,
201 DistributionTopicData.newBuilder().notificationTopic(notificationTopicName).build());
202 updateResponseWrapper(cambriaResponseWrapper, unregisterClientConsumerTopicResponse);
// The entity below is built whatever the outcome; the status check after it picks the HTTP code.
203 // Success unregister both topics
204 TopicUnregistrationResponse unregisterResponse = new TopicUnregistrationResponse(
205 getNotificationTopicName(unRegistrationRequest.getDistrEnvName()), getStatusTopicName(unRegistrationRequest.getDistrEnvName()),
206 unregisterClientConsumerTopicResponse.getOperationStatus(), unregisterClientProducerTopicResponse.getOperationStatus());
207 if (cambriaResponseWrapper.getInnerElement().getOperationStatus() == CambriaOperationStatus.OK) {
208 responseWrapper.setInnerElement(Response.status(HttpStatus.SC_OK).entity(unregisterResponse).build());
// Failure path: same entity, returned with status 500.
210 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(UN_REGISTER_IN_DISTRIBUTION_ENGINE, "unregistration failed");
211 responseWrapper.setInnerElement(Response.status(HttpStatus.SC_INTERNAL_SERVER_ERROR).entity(unregisterResponse).build());
213 } catch (Exception e) {
214 log.error("unregistered to topic failed", e);
215 Response errorResponse = buildErrorResponse(getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
216 responseWrapper.setInnerElement(errorResponse);
// NOTE(review): the wrapper is still empty here when an exception was thrown before the first
// updateResponseWrapper call; whether auditUnRegisterRequest accepts null is not visible.
218 auditHandler.auditUnRegisterRequest(cambriaResponseWrapper.getInnerElement());
/**
 * Folds one Cambria response into the aggregate wrapper.
 *
 * The current response is stored when the wrapper is still empty or when the current
 * response is not OK. A response already stored is therefore kept only while the later
 * responses are OK.
 *
 * @param cambriaResponseWrapper aggregate holder, updated in place
 * @param currentResponse response of the operation that just ran
 */
222 private void updateResponseWrapper(Wrapper<CambriaErrorResponse> cambriaResponseWrapper, CambriaErrorResponse currentResponse) {
// Both branches perform the same assignment; only the triggering condition differs.
223 if (cambriaResponseWrapper.isEmpty()) {
224 cambriaResponseWrapper.setInnerElement(currentResponse);
225 } else if (currentResponse.getOperationStatus() != CambriaOperationStatus.OK) {
226 cambriaResponseWrapper.setInnerElement(currentResponse);
/**
 * Unregisters the client described by the request from a topic through cambriaHandler.
 *
 * The call passes the endpoints, the manager public and secret keys and the client API
 * public key taken from the request, together with the subscriber type and the topic name.
 *
 * NOTE(review): the embedded numbering skips line 231, which presumably declares the
 * topicName parameter used in the body -- confirm against the full file.
 *
 * @param unRegistrationRequest request supplying the endpoints and keys
 * @param subscriberType role to unregister
 * @return the response returned by cambriaHandler.unRegisterFromTopic
 */
230 protected CambriaErrorResponse unRegisterDistributionClientFromTopic(RegistrationRequest unRegistrationRequest, SubscriberTypeEnum subscriberType,
232 log.debug("unregistering client as {} , from topic: {}, using DistEnvPoints: {}", subscriberType, topicName,
233 unRegistrationRequest.getDistEnvEndPoints());
234 return cambriaHandler.unRegisterFromTopic(unRegistrationRequest.getDistEnvEndPoints(), unRegistrationRequest.getManagerApiPublicKey(),
235 unRegistrationRequest.getManagerApiSecretKey(), unRegistrationRequest.getApiPublicKey(), subscriberType, topicName);
/**
 * Builds the success payload for a registration: the status and notification topic names
 * for the environment named in the request.
 *
 * Both names are produced by DistributionEngineInitTask.buildTopicName from the configured
 * topic names and the environment name of the request.
 *
 * @param registrationRequest request supplying the environment name
 * @return a TopicRegistrationResponse carrying both topic names
 */
238 private TopicRegistrationResponse buildTopicResponse(RegistrationRequest registrationRequest) {
239 DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
240 String statusTopicName = DistributionEngineInitTask
241 .buildTopicName(config.getDistributionStatusTopicName(), registrationRequest.getDistrEnvName());
242 String notificationTopicName = DistributionEngineInitTask
243 .buildTopicName(config.getDistributionNotifTopicName(), registrationRequest.getDistrEnvName());
244 TopicRegistrationResponse topicResponse = new TopicRegistrationResponse();
245 topicResponse.setDistrNotificationTopicName(notificationTopicName);
246 topicResponse.setDistrStatusTopicName(statusTopicName);
247 return topicResponse;
/**
 * Registers the client described by the request to a topic through cambriaHandler.
 *
 * When the returned operation status is not OK, the failure is logged through
 * BeEcompErrorManager and the GENERAL_ERROR response is stored in responseWrapper. On
 * success responseWrapper is left untouched, which is how callers detect success.
 *
 * NOTE(review): the embedded numbering skips lines 252, 256 and 258; presumably they hold
 * the declaration of errorMsg and the else of the subscriber type check -- confirm against
 * the full file.
 *
 * @param responseWrapper receives the error response when the registration fails
 * @param registrationRequest request supplying the endpoints and keys
 * @param subscriberType role to register; also selects the error message
 * @param topicName topic to register to
 * @return the response returned by cambriaHandler.registerToTopic
 */
250 protected CambriaErrorResponse registerDistributionClientToTopic(Wrapper<Response> responseWrapper, RegistrationRequest registrationRequest,
251 SubscriberTypeEnum subscriberType, String topicName) {
// The message is prepared up front and only used if the registration fails.
253 // Register for notifications as consumer
254 if (subscriberType == SubscriberTypeEnum.CONSUMER) {
255 errorMsg = "registration of subscriber to topic:" + topicName + " as consumer failed";
257 // Register for status as producer
259 errorMsg = "registration of subscriber to topic:" + topicName + " as producer failed";
261 log.debug("registering client as {} , from topic: {}, using DistEnvPoints: {}", subscriberType, topicName,
262 registrationRequest.getDistEnvEndPoints());
263 CambriaErrorResponse registerToTopic = cambriaHandler
264 .registerToTopic(registrationRequest.getDistEnvEndPoints(), registrationRequest.getManagerApiPublicKey(),
265 registrationRequest.getManagerApiSecretKey(), registrationRequest.getApiPublicKey(), subscriberType, topicName);
// Any status other than OK is reported to the caller as the GENERAL_ERROR response.
266 if (registerToTopic.getOperationStatus() != CambriaOperationStatus.OK) {
267 Response failedRegistrationResponse = buildErrorResponse(getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
268 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(REGISTER_IN_DISTRIBUTION_ENGINE, errorMsg);
269 responseWrapper.setInnerElement(failedRegistrationResponse);
271 return registerToTopic;
/**
 * Converts a ResponseFormat into a JAX-RS Response.
 *
 * The HTTP status is taken from the response format and the entity is the request error
 * serialised to JSON with the gson field of this class.
 *
 * @param requestErrorWrapper response format supplying the status and the request error
 * @return the built Response
 */
274 protected Response buildErrorResponse(ResponseFormat requestErrorWrapper) {
275 return Response.status(requestErrorWrapper.getStatus()).entity(gson.toJson(requestErrorWrapper.getRequestError())).build();
/**
 * Returns the ResponseFormatManager held by this component.
 *
 * The methods of this class obtain the manager through this accessor rather than reading
 * the field directly.
 *
 * @return the responseFormatManager field
 */
278 public ResponseFormatManager getResponseFormatManager() {
279 return responseFormatManager;
/**
 * Returns the distribution engine held by this component.
 *
 * @return the distributionEngine field
 */
282 public IDistributionEngine getDistributionEngine() {
283 return distributionEngine;