/*-
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
20 package org.openecomp.sdc.be.distribution;
22 import static org.apache.commons.lang.BooleanUtils.isTrue;
23 import static org.openecomp.sdc.be.components.distribution.engine.DistributionEngineInitTask.buildTopicName;
24 import static org.openecomp.sdc.be.config.ConfigurationManager.getConfigurationManager;
26 import com.google.gson.Gson;
27 import com.google.gson.GsonBuilder;
28 import fj.data.Either;
29 import java.util.List;
30 import javax.annotation.Resource;
31 import javax.ws.rs.core.Response;
32 import org.apache.commons.collections.CollectionUtils;
33 import org.apache.http.HttpStatus;
34 import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse;
35 import org.openecomp.sdc.be.components.distribution.engine.DistributionEngineInitTask;
36 import org.openecomp.sdc.be.components.distribution.engine.ICambriaHandler;
37 import org.openecomp.sdc.be.components.distribution.engine.IDistributionEngine;
38 import org.openecomp.sdc.be.components.distribution.engine.SubscriberTypeEnum;
39 import org.openecomp.sdc.be.components.impl.ResponseFormatManager;
40 import org.openecomp.sdc.be.config.BeEcompErrorManager;
41 import org.openecomp.sdc.be.config.ConfigurationManager;
42 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
43 import org.openecomp.sdc.be.dao.api.ActionStatus;
44 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
45 import org.openecomp.sdc.be.distribution.api.client.RegistrationRequest;
46 import org.openecomp.sdc.be.distribution.api.client.ServerListResponse;
47 import org.openecomp.sdc.be.distribution.api.client.TopicRegistrationResponse;
48 import org.openecomp.sdc.be.distribution.api.client.TopicUnregistrationResponse;
49 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
50 import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData;
51 import org.openecomp.sdc.common.datastructure.Wrapper;
52 import org.openecomp.sdc.common.log.wrappers.Logger;
53 import org.openecomp.sdc.exception.ResponseFormat;
54 import org.springframework.stereotype.Component;
56 @Component("distributionBusinessLogic")
57 public class DistributionBusinessLogic {
59 public static final String REGISTER_IN_DISTRIBUTION_ENGINE = "registerInDistributionEngine";
60 public static final String UN_REGISTER_IN_DISTRIBUTION_ENGINE = "unregisterInDistributionEngine";
61 private static final Logger log = Logger.getLogger(DistributionBusinessLogic.class);
62 private Gson gson = new GsonBuilder().setPrettyPrinting().create();
64 private IDistributionEngine distributionEngine;
65 private ResponseFormatManager responseFormatManager = ResponseFormatManager.getInstance();
67 private ICambriaHandler cambriaHandler;
69 public static String getNotificationTopicName(String envName) {
70 DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
71 return DistributionEngineInitTask.buildTopicName(config.getDistributionNotifTopicName(), envName);
74 public static String getStatusTopicName(String envName) {
75 DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
76 return DistributionEngineInitTask.buildTopicName(config.getDistributionStatusTopicName(), envName);
79 private void initRequestEnvEndPointsAndKeys(RegistrationRequest registrationRequest, DistributionEngineConfiguration config) {
80 if (CollectionUtils.isEmpty(registrationRequest.getDistEnvEndPoints())) {
81 registrationRequest.setDistEnvEndPoints(config.getUebServers());
82 registrationRequest.setManagerApiPublicKey(config.getUebPublicKey());
83 registrationRequest.setManagerApiSecretKey(config.getUebSecretKey());
85 OperationalEnvironmentEntry environment = distributionEngine.getEnvironmentByDmaapUebAddress(registrationRequest.getDistEnvEndPoints());
86 registrationRequest.setManagerApiPublicKey(environment.getUebApikey());
87 registrationRequest.setManagerApiSecretKey(environment.getUebSecretKey());
91 public Either<ServerListResponse, ResponseFormat> getUebServerList() {
92 DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
93 .getDistributionEngineConfiguration();
94 List<String> serverList = distributionEngineConfiguration.getUebServers();
95 if (serverList != null && !serverList.isEmpty()) {
96 ServerListResponse serverListResponse = new ServerListResponse();
97 serverListResponse.setUebServerList(serverList);
98 return Either.left(serverListResponse);
100 ResponseFormat errorResponseWrapper = getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR);
101 return Either.right(errorResponseWrapper);
105 public void handleRegistration(Wrapper<Response> responseWrapper, RegistrationRequest registrationRequest, AuditHandler auditHandler) {
106 CambriaErrorResponse registerResponse = null;
108 DistributionEngineConfiguration config = getConfigurationManager().getDistributionEngineConfiguration();
109 String statusTopicName = buildTopicName(config.getDistributionStatusTopicName(), registrationRequest.getDistrEnvName());
110 initRequestEnvEndPointsAndKeys(registrationRequest, config);
111 registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest, SubscriberTypeEnum.PRODUCER, statusTopicName);
112 auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.PRODUCER,
113 DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
114 boolean isRegisteredAsProducerOnStatusSuccess = responseWrapper.isEmpty();
115 // Story [347698] Distribution Client Get Indication from
117 // component whether to register as consumer and producer on
120 boolean registeredAsConsumerOnStatus = false;
121 if (isRegisteredAsProducerOnStatusSuccess && isTrue(registrationRequest.getIsConsumerToSdcDistrStatusTopic())) {
122 registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest, SubscriberTypeEnum.CONSUMER,
124 auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.CONSUMER,
125 DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
126 registeredAsConsumerOnStatus = responseWrapper.isEmpty();
128 if (responseWrapper.isEmpty()) {
129 String notificationTopicName = buildTopicName(config.getDistributionNotifTopicName(), registrationRequest.getDistrEnvName());
130 registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest, SubscriberTypeEnum.CONSUMER,
131 notificationTopicName);
132 auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.CONSUMER,
133 DistributionTopicData.newBuilder().notificationTopic(notificationTopicName).build());
135 // Unregister Rollback
136 if (!responseWrapper.isEmpty()) {
137 if (isRegisteredAsProducerOnStatusSuccess) {
138 CambriaErrorResponse unRegisterResponse = unRegisterDistributionClientFromTopic(registrationRequest, SubscriberTypeEnum.PRODUCER,
140 auditHandler.auditUnRegisterACL(unRegisterResponse, SubscriberTypeEnum.PRODUCER,
141 DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
143 if (registeredAsConsumerOnStatus) {
144 CambriaErrorResponse unRegisterResponse = unRegisterDistributionClientFromTopic(registrationRequest, SubscriberTypeEnum.CONSUMER,
146 auditHandler.auditUnRegisterACL(unRegisterResponse, SubscriberTypeEnum.CONSUMER,
147 DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
150 if (responseWrapper.isEmpty()) {
151 TopicRegistrationResponse okTopicResponse = buildTopicResponse(registrationRequest);
152 responseWrapper.setInnerElement(Response.status(HttpStatus.SC_OK).entity(okTopicResponse).build());
154 } catch (Exception e) {
155 log.error("registration to topic failed", e);
156 BeEcompErrorManager.getInstance()
157 .logBeDistributionEngineSystemError(REGISTER_IN_DISTRIBUTION_ENGINE, "registration of subscriber to topic");
158 Response errorResponse = buildErrorResponse(getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
159 responseWrapper.setInnerElement(errorResponse);
161 auditHandler.auditRegisterRequest(registerResponse);
165 public void handleUnRegistration(Wrapper<Response> responseWrapper, RegistrationRequest unRegistrationRequest, AuditHandler auditHandler) {
166 Wrapper<CambriaErrorResponse> cambriaResponseWrapper = new Wrapper<>();
168 String statusTopicName = getStatusTopicName(unRegistrationRequest.getDistrEnvName());
169 DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
170 initRequestEnvEndPointsAndKeys(unRegistrationRequest, config);
171 CambriaErrorResponse unregisterClientProducerTopicResponse = unRegisterDistributionClientFromTopic(unRegistrationRequest,
172 SubscriberTypeEnum.PRODUCER, statusTopicName);
173 auditHandler.auditUnRegisterACL(unregisterClientProducerTopicResponse, SubscriberTypeEnum.PRODUCER,
174 DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
175 updateResponseWrapper(cambriaResponseWrapper, unregisterClientProducerTopicResponse);
176 String notificationTopicName = getNotificationTopicName(unRegistrationRequest.getDistrEnvName());
177 CambriaErrorResponse unregisterClientConsumerTopicResponse = unRegisterDistributionClientFromTopic(unRegistrationRequest,
178 SubscriberTypeEnum.CONSUMER, notificationTopicName);
179 auditHandler.auditUnRegisterACL(unregisterClientConsumerTopicResponse, SubscriberTypeEnum.CONSUMER,
180 DistributionTopicData.newBuilder().notificationTopic(notificationTopicName).build());
181 updateResponseWrapper(cambriaResponseWrapper, unregisterClientConsumerTopicResponse);
182 // Success unregister both topics
183 TopicUnregistrationResponse unregisterResponse = new TopicUnregistrationResponse(
184 getNotificationTopicName(unRegistrationRequest.getDistrEnvName()), getStatusTopicName(unRegistrationRequest.getDistrEnvName()),
185 unregisterClientConsumerTopicResponse.getOperationStatus(), unregisterClientProducerTopicResponse.getOperationStatus());
186 if (cambriaResponseWrapper.getInnerElement().getOperationStatus() == CambriaOperationStatus.OK) {
187 responseWrapper.setInnerElement(Response.status(HttpStatus.SC_OK).entity(unregisterResponse).build());
189 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(UN_REGISTER_IN_DISTRIBUTION_ENGINE, "unregistration failed");
190 responseWrapper.setInnerElement(Response.status(HttpStatus.SC_INTERNAL_SERVER_ERROR).entity(unregisterResponse).build());
192 } catch (Exception e) {
193 log.error("unregistered to topic failed", e);
194 Response errorResponse = buildErrorResponse(getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
195 responseWrapper.setInnerElement(errorResponse);
197 auditHandler.auditUnRegisterRequest(cambriaResponseWrapper.getInnerElement());
201 private void updateResponseWrapper(Wrapper<CambriaErrorResponse> cambriaResponseWrapper, CambriaErrorResponse currentResponse) {
202 if (cambriaResponseWrapper.isEmpty()) {
203 cambriaResponseWrapper.setInnerElement(currentResponse);
204 } else if (currentResponse.getOperationStatus() != CambriaOperationStatus.OK) {
205 cambriaResponseWrapper.setInnerElement(currentResponse);
209 protected CambriaErrorResponse unRegisterDistributionClientFromTopic(RegistrationRequest unRegistrationRequest, SubscriberTypeEnum subscriberType,
211 log.debug("unregistering client as {} , from topic: {}, using DistEnvPoints: {}", subscriberType, topicName,
212 unRegistrationRequest.getDistEnvEndPoints());
213 return cambriaHandler.unRegisterFromTopic(unRegistrationRequest.getDistEnvEndPoints(), unRegistrationRequest.getManagerApiPublicKey(),
214 unRegistrationRequest.getManagerApiSecretKey(), unRegistrationRequest.getApiPublicKey(), subscriberType, topicName);
217 private TopicRegistrationResponse buildTopicResponse(RegistrationRequest registrationRequest) {
218 DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
219 String statusTopicName = DistributionEngineInitTask
220 .buildTopicName(config.getDistributionStatusTopicName(), registrationRequest.getDistrEnvName());
221 String notificationTopicName = DistributionEngineInitTask
222 .buildTopicName(config.getDistributionNotifTopicName(), registrationRequest.getDistrEnvName());
223 TopicRegistrationResponse topicResponse = new TopicRegistrationResponse();
224 topicResponse.setDistrNotificationTopicName(notificationTopicName);
225 topicResponse.setDistrStatusTopicName(statusTopicName);
226 return topicResponse;
229 protected CambriaErrorResponse registerDistributionClientToTopic(Wrapper<Response> responseWrapper, RegistrationRequest registrationRequest,
230 SubscriberTypeEnum subscriberType, String topicName) {
232 // Register for notifications as consumer
233 if (subscriberType == SubscriberTypeEnum.CONSUMER) {
234 errorMsg = "registration of subscriber to topic:" + topicName + " as consumer failed";
236 // Register for status as producer
238 errorMsg = "registration of subscriber to topic:" + topicName + " as producer failed";
240 log.debug("registering client as {} , from topic: {}, using DistEnvPoints: {}", subscriberType, topicName,
241 registrationRequest.getDistEnvEndPoints());
242 CambriaErrorResponse registerToTopic = cambriaHandler
243 .registerToTopic(registrationRequest.getDistEnvEndPoints(), registrationRequest.getManagerApiPublicKey(),
244 registrationRequest.getManagerApiSecretKey(), registrationRequest.getApiPublicKey(), subscriberType, topicName);
245 if (registerToTopic.getOperationStatus() != CambriaOperationStatus.OK) {
246 Response failedRegistrationResponse = buildErrorResponse(getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
247 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(REGISTER_IN_DISTRIBUTION_ENGINE, errorMsg);
248 responseWrapper.setInnerElement(failedRegistrationResponse);
250 return registerToTopic;
253 protected Response buildErrorResponse(ResponseFormat requestErrorWrapper) {
254 return Response.status(requestErrorWrapper.getStatus()).entity(gson.toJson(requestErrorWrapper.getRequestError())).build();
257 public ResponseFormatManager getResponseFormatManager() {
258 return responseFormatManager;
261 public IDistributionEngine getDistributionEngine() {
262 return distributionEngine;