2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.distribution;
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import fj.data.Either;
26 import org.apache.commons.collections.CollectionUtils;
27 import org.apache.http.HttpStatus;
28 import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse;
29 import org.openecomp.sdc.be.components.distribution.engine.DistributionEngineInitTask;
30 import org.openecomp.sdc.be.components.distribution.engine.ICambriaHandler;
31 import org.openecomp.sdc.be.components.distribution.engine.IDistributionEngine;
32 import org.openecomp.sdc.be.components.distribution.engine.SubscriberTypeEnum;
33 import org.openecomp.sdc.be.components.impl.ResponseFormatManager;
34 import org.openecomp.sdc.be.config.BeEcompErrorManager;
35 import org.openecomp.sdc.be.config.ConfigurationManager;
36 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
37 import org.openecomp.sdc.be.dao.api.ActionStatus;
38 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
39 import org.openecomp.sdc.be.distribution.api.client.RegistrationRequest;
40 import org.openecomp.sdc.be.distribution.api.client.ServerListResponse;
41 import org.openecomp.sdc.be.distribution.api.client.TopicRegistrationResponse;
42 import org.openecomp.sdc.be.distribution.api.client.TopicUnregistrationResponse;
43 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
44 import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData;
45 import org.openecomp.sdc.common.datastructure.Wrapper;
46 import org.openecomp.sdc.common.log.wrappers.Logger;
47 import org.openecomp.sdc.exception.ResponseFormat;
48 import org.springframework.stereotype.Component;
50 import javax.annotation.Resource;
51 import javax.ws.rs.core.Response;
52 import java.util.List;
54 import static org.apache.commons.lang.BooleanUtils.isTrue;
55 import static org.openecomp.sdc.be.components.distribution.engine.DistributionEngineInitTask.buildTopicName;
56 import static org.openecomp.sdc.be.config.ConfigurationManager.getConfigurationManager;
58 @Component("distributionBusinessLogic")
59 public class DistributionBusinessLogic {
60 public static final String REGISTER_IN_DISTRIBUTION_ENGINE = "registerInDistributionEngine";
61 public static final String UN_REGISTER_IN_DISTRIBUTION_ENGINE = "unregisterInDistributionEngine";
62 private Gson gson = new GsonBuilder().setPrettyPrinting().create();
63 private static final Logger log = Logger.getLogger(DistributionBusinessLogic.class);
65 private IDistributionEngine distributionEngine;
67 private ResponseFormatManager responseFormatManager = ResponseFormatManager.getInstance();
69 private ICambriaHandler cambriaHandler;
71 private void initRequestEnvEndPointsAndKeys(RegistrationRequest registrationRequest, DistributionEngineConfiguration config) {
72 if(CollectionUtils.isEmpty(registrationRequest.getDistEnvEndPoints())){
73 registrationRequest.setDistEnvEndPoints(config.getUebServers());
74 registrationRequest.setManagerApiPublicKey(config.getUebPublicKey());
75 registrationRequest.setManagerApiSecretKey(config.getUebSecretKey());
77 OperationalEnvironmentEntry environment = distributionEngine.getEnvironmentByDmaapUebAddress(registrationRequest.getDistEnvEndPoints());
78 registrationRequest.setManagerApiPublicKey(environment.getUebApikey());
79 registrationRequest.setManagerApiSecretKey(environment.getUebSecretKey());
82 public Either<ServerListResponse, ResponseFormat> getUebServerList() {
84 DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
85 .getDistributionEngineConfiguration();
87 List<String> serverList = distributionEngineConfiguration.getUebServers();
89 if (serverList != null && !serverList.isEmpty()) {
91 ServerListResponse serverListResponse = new ServerListResponse();
93 serverListResponse.setUebServerList(serverList);
95 return Either.left(serverListResponse);
97 ResponseFormat errorResponseWrapper = getResponseFormatManager()
98 .getResponseFormat(ActionStatus.GENERAL_ERROR);
99 return Either.right(errorResponseWrapper);
104 public void handleRegistration(Wrapper<Response> responseWrapper, RegistrationRequest registrationRequest,
105 AuditHandler auditHandler) {
106 CambriaErrorResponse registerResponse = null;
108 DistributionEngineConfiguration config = getConfigurationManager().getDistributionEngineConfiguration();
109 String statusTopicName = buildTopicName(config.getDistributionStatusTopicName(),
110 registrationRequest.getDistrEnvName());
111 initRequestEnvEndPointsAndKeys(registrationRequest, config);
112 registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest,
113 SubscriberTypeEnum.PRODUCER, statusTopicName);
115 auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.PRODUCER,
116 DistributionTopicData.newBuilder()
117 .statusTopic(statusTopicName)
119 boolean isRegisteredAsProducerOnStatusSuccess = responseWrapper.isEmpty();
121 // Story [347698] Distribution Client Get Indication from
122 // component whether to register as consumer and producer on
124 boolean registeredAsConsumerOnStatus = false;
125 if (isRegisteredAsProducerOnStatusSuccess && isTrue(registrationRequest.getIsConsumerToSdcDistrStatusTopic())) {
126 registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest,
127 SubscriberTypeEnum.CONSUMER, statusTopicName);
128 auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.CONSUMER,
129 DistributionTopicData.newBuilder()
130 .statusTopic(statusTopicName)
132 registeredAsConsumerOnStatus = responseWrapper.isEmpty();
136 if (responseWrapper.isEmpty()) {
137 String notificationTopicName = buildTopicName(config.getDistributionNotifTopicName(),
138 registrationRequest.getDistrEnvName());
139 registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest,
140 SubscriberTypeEnum.CONSUMER, notificationTopicName);
141 auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.CONSUMER,
142 DistributionTopicData.newBuilder()
143 .notificationTopic(notificationTopicName)
146 // Unregister Rollback
147 if (!responseWrapper.isEmpty()) {
148 if (isRegisteredAsProducerOnStatusSuccess) {
149 CambriaErrorResponse unRegisterResponse = unRegisterDistributionClientFromTopic(registrationRequest,
150 SubscriberTypeEnum.PRODUCER, statusTopicName);
151 auditHandler.auditUnRegisterACL(unRegisterResponse, SubscriberTypeEnum.PRODUCER,
152 DistributionTopicData.newBuilder()
153 .statusTopic(statusTopicName)
156 if (registeredAsConsumerOnStatus) {
157 CambriaErrorResponse unRegisterResponse = unRegisterDistributionClientFromTopic(registrationRequest,
158 SubscriberTypeEnum.CONSUMER, statusTopicName);
159 auditHandler.auditUnRegisterACL(unRegisterResponse, SubscriberTypeEnum.CONSUMER,
160 DistributionTopicData.newBuilder()
161 .statusTopic(statusTopicName)
166 if (responseWrapper.isEmpty()) {
167 TopicRegistrationResponse okTopicResponse = buildTopicResponse(registrationRequest);
168 responseWrapper.setInnerElement(Response.status(HttpStatus.SC_OK).entity(okTopicResponse).build());
171 } catch (Exception e) {
172 log.error("registration to topic failed", e);
173 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(REGISTER_IN_DISTRIBUTION_ENGINE,
174 "registration of subscriber to topic");
175 Response errorResponse = buildErrorResponse(
176 getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
177 responseWrapper.setInnerElement(errorResponse);
179 auditHandler.auditRegisterRequest(registerResponse);
183 public void handleUnRegistration(Wrapper<Response> responseWrapper, RegistrationRequest unRegistrationRequest,
184 AuditHandler auditHandler) {
185 Wrapper<CambriaErrorResponse> cambriaResponseWrapper = new Wrapper<>();
187 String statusTopicName = getStatusTopicName(unRegistrationRequest.getDistrEnvName());
188 DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()
189 .getDistributionEngineConfiguration();
190 initRequestEnvEndPointsAndKeys(unRegistrationRequest, config);
191 CambriaErrorResponse unregisterClientProducerTopicResponse = unRegisterDistributionClientFromTopic(
192 unRegistrationRequest, SubscriberTypeEnum.PRODUCER, statusTopicName);
193 auditHandler.auditUnRegisterACL(unregisterClientProducerTopicResponse, SubscriberTypeEnum.PRODUCER,
194 DistributionTopicData.newBuilder()
195 .statusTopic(statusTopicName)
197 updateResponseWrapper(cambriaResponseWrapper, unregisterClientProducerTopicResponse);
199 String notificationTopicName = getNotificationTopicName(unRegistrationRequest.getDistrEnvName());
200 CambriaErrorResponse unregisterClientConsumerTopicResponse = unRegisterDistributionClientFromTopic(
201 unRegistrationRequest, SubscriberTypeEnum.CONSUMER, notificationTopicName);
202 auditHandler.auditUnRegisterACL(unregisterClientConsumerTopicResponse, SubscriberTypeEnum.CONSUMER,
203 DistributionTopicData.newBuilder()
204 .notificationTopic(notificationTopicName)
206 updateResponseWrapper(cambriaResponseWrapper, unregisterClientConsumerTopicResponse);
208 // Success unregister both topics
209 TopicUnregistrationResponse unregisterResponse = new TopicUnregistrationResponse(
210 getNotificationTopicName(unRegistrationRequest.getDistrEnvName()),
211 getStatusTopicName(unRegistrationRequest.getDistrEnvName()),
212 unregisterClientConsumerTopicResponse.getOperationStatus(),
213 unregisterClientProducerTopicResponse.getOperationStatus());
215 if (cambriaResponseWrapper.getInnerElement().getOperationStatus() == CambriaOperationStatus.OK) {
216 responseWrapper.setInnerElement(Response.status(HttpStatus.SC_OK).entity(unregisterResponse).build());
218 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(UN_REGISTER_IN_DISTRIBUTION_ENGINE,
219 "unregistration failed");
220 responseWrapper.setInnerElement(
221 Response.status(HttpStatus.SC_INTERNAL_SERVER_ERROR).entity(unregisterResponse).build());
223 } catch (Exception e) {
224 log.error("unregistered to topic failed", e);
225 Response errorResponse = buildErrorResponse(
226 getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
227 responseWrapper.setInnerElement(errorResponse);
230 auditHandler.auditUnRegisterRequest(cambriaResponseWrapper.getInnerElement());
234 private void updateResponseWrapper(Wrapper<CambriaErrorResponse> cambriaResponseWrapper,
235 CambriaErrorResponse currentResponse) {
236 if (cambriaResponseWrapper.isEmpty()) {
237 cambriaResponseWrapper.setInnerElement(currentResponse);
238 } else if (currentResponse.getOperationStatus() != CambriaOperationStatus.OK) {
239 cambriaResponseWrapper.setInnerElement(currentResponse);
245 public static String getNotificationTopicName(String envName) {
246 DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()
247 .getDistributionEngineConfiguration();
248 return DistributionEngineInitTask.buildTopicName(config.getDistributionNotifTopicName(), envName);
252 public static String getStatusTopicName(String envName) {
253 DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()
254 .getDistributionEngineConfiguration();
255 return DistributionEngineInitTask.buildTopicName(config.getDistributionStatusTopicName(), envName);
259 protected CambriaErrorResponse unRegisterDistributionClientFromTopic(RegistrationRequest unRegistrationRequest,
260 SubscriberTypeEnum subscriberType, String topicName) {
262 log.debug("unregistering client as {} , from topic: {}, using DistEnvPoints: {}", subscriberType, topicName, unRegistrationRequest.getDistEnvEndPoints());
263 return cambriaHandler.unRegisterFromTopic(unRegistrationRequest.getDistEnvEndPoints(), unRegistrationRequest.getManagerApiPublicKey(),
264 unRegistrationRequest.getManagerApiSecretKey(), unRegistrationRequest.getApiPublicKey(), subscriberType, topicName);
267 private TopicRegistrationResponse buildTopicResponse(RegistrationRequest registrationRequest) {
268 DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()
269 .getDistributionEngineConfiguration();
270 String statusTopicName = DistributionEngineInitTask.buildTopicName(config.getDistributionStatusTopicName(),
271 registrationRequest.getDistrEnvName());
272 String notificationTopicName = DistributionEngineInitTask.buildTopicName(config.getDistributionNotifTopicName(),
273 registrationRequest.getDistrEnvName());
275 TopicRegistrationResponse topicResponse = new TopicRegistrationResponse();
276 topicResponse.setDistrNotificationTopicName(notificationTopicName);
277 topicResponse.setDistrStatusTopicName(statusTopicName);
278 return topicResponse;
281 protected CambriaErrorResponse registerDistributionClientToTopic(Wrapper<Response> responseWrapper,
282 RegistrationRequest registrationRequest, SubscriberTypeEnum subscriberType, String topicName) {
286 // Register for notifications as consumer
287 if (subscriberType == SubscriberTypeEnum.CONSUMER) {
288 errorMsg = "registration of subscriber to topic:" + topicName + " as consumer failed";
290 // Register for status as producer
292 errorMsg = "registration of subscriber to topic:" + topicName + " as producer failed";
294 log.debug("registering client as {} , from topic: {}, using DistEnvPoints: {}", subscriberType, topicName, registrationRequest.getDistEnvEndPoints());
295 CambriaErrorResponse registerToTopic = cambriaHandler.registerToTopic(registrationRequest.getDistEnvEndPoints(),
296 registrationRequest.getManagerApiPublicKey(), registrationRequest.getManagerApiSecretKey(), registrationRequest.getApiPublicKey(),
297 subscriberType, topicName);
299 if (registerToTopic.getOperationStatus() != CambriaOperationStatus.OK) {
300 Response failedRegistrationResponse = buildErrorResponse(
301 getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
302 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(REGISTER_IN_DISTRIBUTION_ENGINE,
304 responseWrapper.setInnerElement(failedRegistrationResponse);
306 return registerToTopic;
309 protected Response buildErrorResponse(ResponseFormat requestErrorWrapper) {
310 return Response.status(requestErrorWrapper.getStatus())
311 .entity(gson.toJson(requestErrorWrapper.getRequestError())).build();
314 public ResponseFormatManager getResponseFormatManager() {
315 return responseFormatManager;
318 public IDistributionEngine getDistributionEngine() {
319 return distributionEngine;