2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.distribution;
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import fj.data.Either;
26 import org.apache.http.HttpStatus;
27 import org.openecomp.sdc.be.components.distribution.engine.*;
28 import org.openecomp.sdc.be.components.impl.ResponseFormatManager;
29 import org.openecomp.sdc.be.config.BeEcompErrorManager;
30 import org.openecomp.sdc.be.config.ConfigurationManager;
31 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
32 import org.openecomp.sdc.be.dao.api.ActionStatus;
33 import org.openecomp.sdc.be.distribution.api.client.*;
34 import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData;
35 import org.openecomp.sdc.common.datastructure.Wrapper;
36 import org.openecomp.sdc.common.log.wrappers.Logger;
37 import org.openecomp.sdc.exception.ResponseFormat;
38 import org.springframework.stereotype.Component;
40 import javax.annotation.Resource;
41 import javax.ws.rs.core.Response;
42 import java.util.List;
44 import static org.apache.commons.lang.BooleanUtils.isTrue;
45 import static org.openecomp.sdc.be.components.distribution.engine.DistributionEngineInitTask.buildTopicName;
46 import static org.openecomp.sdc.be.config.ConfigurationManager.getConfigurationManager;
48 @Component("distributionBusinessLogic")
49 public class DistributionBusinessLogic {
50 public static final String REGISTER_IN_DISTRIBUTION_ENGINE = "registerInDistributionEngine";
51 public static final String UN_REGISTER_IN_DISTRIBUTION_ENGINE = "unregisterInDistributionEngine";
52 private Gson gson = new GsonBuilder().setPrettyPrinting().create();
53 private static final Logger log = Logger.getLogger(DistributionBusinessLogic.class);
55 private IDistributionEngine distributionEngine;
57 private ResponseFormatManager responseFormatManager = ResponseFormatManager.getInstance();
58 private CambriaHandler cambriaHandler;
60 private void initRequestEnvEndPoints(RegistrationRequest registrationRequest, DistributionEngineConfiguration config) {
61 if(registrationRequest.getDistEnvEndPoints() == null || registrationRequest.getDistEnvEndPoints().isEmpty()){
62 registrationRequest.setDistEnvEndPoints(config.getUebServers());
65 public Either<ServerListResponse, ResponseFormat> getUebServerList() {
67 DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
68 .getDistributionEngineConfiguration();
70 List<String> serverList = distributionEngineConfiguration.getUebServers();
72 if (serverList != null && !serverList.isEmpty()) {
74 ServerListResponse serverListResponse = new ServerListResponse();
76 serverListResponse.setUebServerList(serverList);
78 return Either.left(serverListResponse);
80 ResponseFormat errorResponseWrapper = getResponseFormatManager()
81 .getResponseFormat(ActionStatus.GENERAL_ERROR);
82 return Either.right(errorResponseWrapper);
87 public void handleRegistration(Wrapper<Response> responseWrapper, RegistrationRequest registrationRequest,
88 AuditHandler auditHandler) {
89 CambriaErrorResponse registerResponse = null;
91 DistributionEngineConfiguration config = getConfigurationManager().getDistributionEngineConfiguration();
92 String statusTopicName = buildTopicName(config.getDistributionStatusTopicName(),
93 registrationRequest.getDistrEnvName());
94 registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest,
95 SubscriberTypeEnum.PRODUCER, statusTopicName);
97 auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.PRODUCER,
98 DistributionTopicData.newBuilder()
99 .statusTopic(statusTopicName)
101 boolean isRegisteredAsProducerOnStatusSuccess = responseWrapper.isEmpty();
103 // Story [347698] Distribution Client Get Indication from
104 // component whether to register as consumer and producer on
106 boolean registeredAsConsumerOnStatus = false;
107 if (isRegisteredAsProducerOnStatusSuccess && isTrue(registrationRequest.getIsConsumerToSdcDistrStatusTopic())) {
108 registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest,
109 SubscriberTypeEnum.CONSUMER, statusTopicName);
110 auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.CONSUMER,
111 DistributionTopicData.newBuilder()
112 .statusTopic(statusTopicName)
114 registeredAsConsumerOnStatus = responseWrapper.isEmpty();
118 if (responseWrapper.isEmpty()) {
119 String notificationTopicName = buildTopicName(config.getDistributionNotifTopicName(),
120 registrationRequest.getDistrEnvName());
121 registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest,
122 SubscriberTypeEnum.CONSUMER, notificationTopicName);
123 auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.CONSUMER,
124 DistributionTopicData.newBuilder()
125 .notificationTopic(notificationTopicName)
128 // Unregister Rollback
129 if (!responseWrapper.isEmpty()) {
130 if (isRegisteredAsProducerOnStatusSuccess) {
131 CambriaErrorResponse unRegisterResponse = unRegisterDistributionClientFromTopic(registrationRequest,
132 SubscriberTypeEnum.PRODUCER, statusTopicName);
133 auditHandler.auditUnRegisterACL(unRegisterResponse, SubscriberTypeEnum.PRODUCER,
134 DistributionTopicData.newBuilder()
135 .statusTopic(statusTopicName)
138 if (registeredAsConsumerOnStatus) {
139 CambriaErrorResponse unRegisterResponse = unRegisterDistributionClientFromTopic(registrationRequest,
140 SubscriberTypeEnum.CONSUMER, statusTopicName);
141 auditHandler.auditUnRegisterACL(unRegisterResponse, SubscriberTypeEnum.CONSUMER,
142 DistributionTopicData.newBuilder()
143 .statusTopic(statusTopicName)
148 if (responseWrapper.isEmpty()) {
149 TopicRegistrationResponse okTopicResponse = buildTopicResponse(registrationRequest);
150 responseWrapper.setInnerElement(Response.status(HttpStatus.SC_OK).entity(okTopicResponse).build());
153 } catch (Exception e) {
154 log.error("registration to topic failed", e);
155 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(REGISTER_IN_DISTRIBUTION_ENGINE,
156 "registration of subscriber to topic");
157 Response errorResponse = buildErrorResponse(
158 getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
159 responseWrapper.setInnerElement(errorResponse);
161 auditHandler.auditRegisterRequest(registerResponse);
165 public void handleUnRegistration(Wrapper<Response> responseWrapper, RegistrationRequest unRegistrationRequest,
166 AuditHandler auditHandler) {
167 Wrapper<CambriaErrorResponse> cambriaResponseWrapper = new Wrapper<>();
169 String statusTopicName = getStatusTopicName(unRegistrationRequest.getDistrEnvName());
170 CambriaErrorResponse unregisterClientProducerTopicResponse = unRegisterDistributionClientFromTopic(
171 unRegistrationRequest, SubscriberTypeEnum.PRODUCER, statusTopicName);
172 auditHandler.auditUnRegisterACL(unregisterClientProducerTopicResponse, SubscriberTypeEnum.PRODUCER,
173 DistributionTopicData.newBuilder()
174 .statusTopic(statusTopicName)
176 updateResponseWrapper(cambriaResponseWrapper, unregisterClientProducerTopicResponse);
178 String notificationTopicName = getNotificationTopicName(unRegistrationRequest.getDistrEnvName());
179 CambriaErrorResponse unregisterClientConsumerTopicResponse = unRegisterDistributionClientFromTopic(
180 unRegistrationRequest, SubscriberTypeEnum.CONSUMER, notificationTopicName);
181 auditHandler.auditUnRegisterACL(unregisterClientConsumerTopicResponse, SubscriberTypeEnum.CONSUMER,
182 DistributionTopicData.newBuilder()
183 .notificationTopic(notificationTopicName)
185 updateResponseWrapper(cambriaResponseWrapper, unregisterClientConsumerTopicResponse);
187 // Success unregister both topics
188 TopicUnregistrationResponse unregisterResponse = new TopicUnregistrationResponse(
189 getNotificationTopicName(unRegistrationRequest.getDistrEnvName()),
190 getStatusTopicName(unRegistrationRequest.getDistrEnvName()),
191 unregisterClientConsumerTopicResponse.getOperationStatus(),
192 unregisterClientProducerTopicResponse.getOperationStatus());
194 if (cambriaResponseWrapper.getInnerElement().getOperationStatus() == CambriaOperationStatus.OK) {
195 responseWrapper.setInnerElement(Response.status(HttpStatus.SC_OK).entity(unregisterResponse).build());
197 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(UN_REGISTER_IN_DISTRIBUTION_ENGINE,
198 "unregistration failed");
199 responseWrapper.setInnerElement(
200 Response.status(HttpStatus.SC_INTERNAL_SERVER_ERROR).entity(unregisterResponse).build());
202 } catch (Exception e) {
203 log.error("unregistered to topic failed", e);
204 Response errorResponse = buildErrorResponse(
205 getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
206 responseWrapper.setInnerElement(errorResponse);
209 auditHandler.auditUnRegisterRequest(cambriaResponseWrapper.getInnerElement());
213 private void updateResponseWrapper(Wrapper<CambriaErrorResponse> cambriaResponseWrapper,
214 CambriaErrorResponse currentResponse) {
215 if (cambriaResponseWrapper.isEmpty()) {
216 cambriaResponseWrapper.setInnerElement(currentResponse);
217 } else if (currentResponse.getOperationStatus() != CambriaOperationStatus.OK) {
218 cambriaResponseWrapper.setInnerElement(currentResponse);
224 public static String getNotificationTopicName(String envName) {
225 DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()
226 .getDistributionEngineConfiguration();
227 return DistributionEngineInitTask.buildTopicName(config.getDistributionNotifTopicName(), envName);
231 public static String getStatusTopicName(String envName) {
232 DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()
233 .getDistributionEngineConfiguration();
234 return DistributionEngineInitTask.buildTopicName(config.getDistributionStatusTopicName(), envName);
238 protected CambriaErrorResponse unRegisterDistributionClientFromTopic(RegistrationRequest unRegistrationRequest,
239 SubscriberTypeEnum subscriberType, String topicName) {
240 DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()
241 .getDistributionEngineConfiguration();
242 initRequestEnvEndPoints(unRegistrationRequest, config);
244 log.debug("unregistering client as {} , from topic: {}, using DistEnvPoints: {}", subscriberType, topicName, unRegistrationRequest.getDistEnvEndPoints());
245 return getCambriaHandler().unRegisterFromTopic(unRegistrationRequest.getDistEnvEndPoints(), config.getUebPublicKey(),
246 config.getUebSecretKey(), unRegistrationRequest.getApiPublicKey(), subscriberType, topicName);
249 private TopicRegistrationResponse buildTopicResponse(RegistrationRequest registrationRequest) {
250 DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()
251 .getDistributionEngineConfiguration();
252 String statusTopicName = DistributionEngineInitTask.buildTopicName(config.getDistributionStatusTopicName(),
253 registrationRequest.getDistrEnvName());
254 String notificationTopicName = DistributionEngineInitTask.buildTopicName(config.getDistributionNotifTopicName(),
255 registrationRequest.getDistrEnvName());
257 TopicRegistrationResponse topicResponse = new TopicRegistrationResponse();
258 topicResponse.setDistrNotificationTopicName(notificationTopicName);
259 topicResponse.setDistrStatusTopicName(statusTopicName);
260 return topicResponse;
263 protected CambriaErrorResponse registerDistributionClientToTopic(Wrapper<Response> responseWrapper,
264 RegistrationRequest registrationRequest, SubscriberTypeEnum subscriberType, String topicName) {
265 DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()
266 .getDistributionEngineConfiguration();
267 initRequestEnvEndPoints(registrationRequest, config);
270 // Register for notifications as consumer
271 if (subscriberType == SubscriberTypeEnum.CONSUMER) {
272 errorMsg = "registration of subscriber to topic:" + topicName + " as consumer failed";
274 // Register for status as producer
276 errorMsg = "registration of subscriber to topic:" + topicName + " as producer failed";
278 log.debug("registering client as {} , from topic: {}, using DistEnvPoints: {}", subscriberType, topicName, registrationRequest.getDistEnvEndPoints());
279 CambriaErrorResponse registerToTopic = getCambriaHandler().registerToTopic(registrationRequest.getDistEnvEndPoints(),
280 config.getUebPublicKey(), config.getUebSecretKey(), registrationRequest.getApiPublicKey(),
281 subscriberType, topicName);
283 if (registerToTopic.getOperationStatus() != CambriaOperationStatus.OK) {
284 Response failedRegistrationResponse = buildErrorResponse(
285 getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
286 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(REGISTER_IN_DISTRIBUTION_ENGINE,
288 responseWrapper.setInnerElement(failedRegistrationResponse);
290 return registerToTopic;
293 protected Response buildErrorResponse(ResponseFormat requestErrorWrapper) {
294 return Response.status(requestErrorWrapper.getStatus())
295 .entity(gson.toJson(requestErrorWrapper.getRequestError())).build();
298 public ResponseFormatManager getResponseFormatManager() {
299 return responseFormatManager;
302 public IDistributionEngine getDistributionEngine() {
303 return distributionEngine;
306 public CambriaHandler getCambriaHandler() {
307 if (cambriaHandler == null) {
308 cambriaHandler = new CambriaHandler();
310 return cambriaHandler;