d293e9b61522cb6347fca48b72fd4c841be21380
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / distribution / DistributionBusinessLogic.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.openecomp.sdc.be.distribution;
21
22 import static org.apache.commons.lang.BooleanUtils.isTrue;
23 import static org.openecomp.sdc.be.components.distribution.engine.DistributionEngineInitTask.buildTopicName;
24 import static org.openecomp.sdc.be.config.ConfigurationManager.getConfigurationManager;
25
26 import com.google.gson.Gson;
27 import com.google.gson.GsonBuilder;
28 import fj.data.Either;
29 import java.util.List;
30 import javax.annotation.Resource;
31 import javax.ws.rs.core.Response;
32 import org.apache.commons.collections.CollectionUtils;
33 import org.apache.http.HttpStatus;
34 import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse;
35 import org.openecomp.sdc.be.components.distribution.engine.DistributionEngineInitTask;
36 import org.openecomp.sdc.be.components.distribution.engine.ICambriaHandler;
37 import org.openecomp.sdc.be.components.distribution.engine.IDistributionEngine;
38 import org.openecomp.sdc.be.components.distribution.engine.SubscriberTypeEnum;
39 import org.openecomp.sdc.be.components.impl.ResponseFormatManager;
40 import org.openecomp.sdc.be.config.BeEcompErrorManager;
41 import org.openecomp.sdc.be.config.ConfigurationManager;
42 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
43 import org.openecomp.sdc.be.dao.api.ActionStatus;
44 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
45 import org.openecomp.sdc.be.distribution.api.client.RegistrationRequest;
46 import org.openecomp.sdc.be.distribution.api.client.ServerListResponse;
47 import org.openecomp.sdc.be.distribution.api.client.TopicRegistrationResponse;
48 import org.openecomp.sdc.be.distribution.api.client.TopicUnregistrationResponse;
49 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
50 import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData;
51 import org.openecomp.sdc.common.datastructure.Wrapper;
52 import org.openecomp.sdc.common.log.wrappers.Logger;
53 import org.openecomp.sdc.exception.ResponseFormat;
54 import org.springframework.stereotype.Component;
55
56 @Component("distributionBusinessLogic")
57 public class DistributionBusinessLogic {
58
59     public static final String REGISTER_IN_DISTRIBUTION_ENGINE = "registerInDistributionEngine";
60     public static final String UN_REGISTER_IN_DISTRIBUTION_ENGINE = "unregisterInDistributionEngine";
61     private static final Logger log = Logger.getLogger(DistributionBusinessLogic.class);
62     private Gson gson = new GsonBuilder().setPrettyPrinting().create();
63     @Resource
64     private IDistributionEngine distributionEngine;
65     private ResponseFormatManager responseFormatManager = ResponseFormatManager.getInstance();
66     @Resource
67     private ICambriaHandler cambriaHandler;
68
69     public static String getNotificationTopicName(String envName) {
70         DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
71         return DistributionEngineInitTask.buildTopicName(config.getDistributionNotifTopicName(), envName);
72     }
73
74     public static String getStatusTopicName(String envName) {
75         DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
76         return DistributionEngineInitTask.buildTopicName(config.getDistributionStatusTopicName(), envName);
77     }
78
79     private void initRequestEnvEndPointsAndKeys(RegistrationRequest registrationRequest, DistributionEngineConfiguration config) {
80         if (CollectionUtils.isEmpty(registrationRequest.getDistEnvEndPoints())) {
81             registrationRequest.setDistEnvEndPoints(config.getUebServers());
82             registrationRequest.setManagerApiPublicKey(config.getUebPublicKey());
83             registrationRequest.setManagerApiSecretKey(config.getUebSecretKey());
84         } else {
85             OperationalEnvironmentEntry environment = distributionEngine.getEnvironmentByDmaapUebAddress(registrationRequest.getDistEnvEndPoints());
86             registrationRequest.setManagerApiPublicKey(environment.getUebApikey());
87             registrationRequest.setManagerApiSecretKey(environment.getUebSecretKey());
88         }
89     }
90
91     public Either<ServerListResponse, ResponseFormat> getUebServerList() {
92         DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
93             .getDistributionEngineConfiguration();
94         List<String> serverList = distributionEngineConfiguration.getUebServers();
95         if (serverList != null && !serverList.isEmpty()) {
96             ServerListResponse serverListResponse = new ServerListResponse();
97             serverListResponse.setUebServerList(serverList);
98             return Either.left(serverListResponse);
99         } else {
100             ResponseFormat errorResponseWrapper = getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR);
101             return Either.right(errorResponseWrapper);
102         }
103     }
104
105     public void handleRegistration(Wrapper<Response> responseWrapper, RegistrationRequest registrationRequest, AuditHandler auditHandler) {
106         CambriaErrorResponse registerResponse = null;
107         try {
108             DistributionEngineConfiguration config = getConfigurationManager().getDistributionEngineConfiguration();
109             String statusTopicName = buildTopicName(config.getDistributionStatusTopicName(), registrationRequest.getDistrEnvName());
110             initRequestEnvEndPointsAndKeys(registrationRequest, config);
111             registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest, SubscriberTypeEnum.PRODUCER, statusTopicName);
112             auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.PRODUCER,
113                 DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
114             boolean isRegisteredAsProducerOnStatusSuccess = responseWrapper.isEmpty();
115             // Story [347698] Distribution Client Get Indication from
116
117             // component whether to register as consumer and producer on
118
119             // status topic
120             boolean registeredAsConsumerOnStatus = false;
121             if (isRegisteredAsProducerOnStatusSuccess && isTrue(registrationRequest.getIsConsumerToSdcDistrStatusTopic())) {
122                 registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest, SubscriberTypeEnum.CONSUMER,
123                     statusTopicName);
124                 auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.CONSUMER,
125                     DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
126                 registeredAsConsumerOnStatus = responseWrapper.isEmpty();
127             }
128             if (responseWrapper.isEmpty()) {
129                 String notificationTopicName = buildTopicName(config.getDistributionNotifTopicName(), registrationRequest.getDistrEnvName());
130                 registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest, SubscriberTypeEnum.CONSUMER,
131                     notificationTopicName);
132                 auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.CONSUMER,
133                     DistributionTopicData.newBuilder().notificationTopic(notificationTopicName).build());
134             }
135             // Unregister Rollback
136             if (!responseWrapper.isEmpty()) {
137                 if (isRegisteredAsProducerOnStatusSuccess) {
138                     CambriaErrorResponse unRegisterResponse = unRegisterDistributionClientFromTopic(registrationRequest, SubscriberTypeEnum.PRODUCER,
139                         statusTopicName);
140                     auditHandler.auditUnRegisterACL(unRegisterResponse, SubscriberTypeEnum.PRODUCER,
141                         DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
142                 }
143                 if (registeredAsConsumerOnStatus) {
144                     CambriaErrorResponse unRegisterResponse = unRegisterDistributionClientFromTopic(registrationRequest, SubscriberTypeEnum.CONSUMER,
145                         statusTopicName);
146                     auditHandler.auditUnRegisterACL(unRegisterResponse, SubscriberTypeEnum.CONSUMER,
147                         DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
148                 }
149             }
150             if (responseWrapper.isEmpty()) {
151                 TopicRegistrationResponse okTopicResponse = buildTopicResponse(registrationRequest);
152                 responseWrapper.setInnerElement(Response.status(HttpStatus.SC_OK).entity(okTopicResponse).build());
153             }
154         } catch (Exception e) {
155             log.error("registration to topic failed", e);
156             BeEcompErrorManager.getInstance()
157                 .logBeDistributionEngineSystemError(REGISTER_IN_DISTRIBUTION_ENGINE, "registration of subscriber to topic");
158             Response errorResponse = buildErrorResponse(getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
159             responseWrapper.setInnerElement(errorResponse);
160         } finally {
161             auditHandler.auditRegisterRequest(registerResponse);
162         }
163     }
164
165     public void handleUnRegistration(Wrapper<Response> responseWrapper, RegistrationRequest unRegistrationRequest, AuditHandler auditHandler) {
166         Wrapper<CambriaErrorResponse> cambriaResponseWrapper = new Wrapper<>();
167         try {
168             String statusTopicName = getStatusTopicName(unRegistrationRequest.getDistrEnvName());
169             DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
170             initRequestEnvEndPointsAndKeys(unRegistrationRequest, config);
171             CambriaErrorResponse unregisterClientProducerTopicResponse = unRegisterDistributionClientFromTopic(unRegistrationRequest,
172                 SubscriberTypeEnum.PRODUCER, statusTopicName);
173             auditHandler.auditUnRegisterACL(unregisterClientProducerTopicResponse, SubscriberTypeEnum.PRODUCER,
174                 DistributionTopicData.newBuilder().statusTopic(statusTopicName).build());
175             updateResponseWrapper(cambriaResponseWrapper, unregisterClientProducerTopicResponse);
176             String notificationTopicName = getNotificationTopicName(unRegistrationRequest.getDistrEnvName());
177             CambriaErrorResponse unregisterClientConsumerTopicResponse = unRegisterDistributionClientFromTopic(unRegistrationRequest,
178                 SubscriberTypeEnum.CONSUMER, notificationTopicName);
179             auditHandler.auditUnRegisterACL(unregisterClientConsumerTopicResponse, SubscriberTypeEnum.CONSUMER,
180                 DistributionTopicData.newBuilder().notificationTopic(notificationTopicName).build());
181             updateResponseWrapper(cambriaResponseWrapper, unregisterClientConsumerTopicResponse);
182             // Success unregister both topics
183             TopicUnregistrationResponse unregisterResponse = new TopicUnregistrationResponse(
184                 getNotificationTopicName(unRegistrationRequest.getDistrEnvName()), getStatusTopicName(unRegistrationRequest.getDistrEnvName()),
185                 unregisterClientConsumerTopicResponse.getOperationStatus(), unregisterClientProducerTopicResponse.getOperationStatus());
186             if (cambriaResponseWrapper.getInnerElement().getOperationStatus() == CambriaOperationStatus.OK) {
187                 responseWrapper.setInnerElement(Response.status(HttpStatus.SC_OK).entity(unregisterResponse).build());
188             } else {
189                 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(UN_REGISTER_IN_DISTRIBUTION_ENGINE, "unregistration failed");
190                 responseWrapper.setInnerElement(Response.status(HttpStatus.SC_INTERNAL_SERVER_ERROR).entity(unregisterResponse).build());
191             }
192         } catch (Exception e) {
193             log.error("unregistered to topic failed", e);
194             Response errorResponse = buildErrorResponse(getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
195             responseWrapper.setInnerElement(errorResponse);
196         } finally {
197             auditHandler.auditUnRegisterRequest(cambriaResponseWrapper.getInnerElement());
198         }
199     }
200
201     private void updateResponseWrapper(Wrapper<CambriaErrorResponse> cambriaResponseWrapper, CambriaErrorResponse currentResponse) {
202         if (cambriaResponseWrapper.isEmpty()) {
203             cambriaResponseWrapper.setInnerElement(currentResponse);
204         } else if (currentResponse.getOperationStatus() != CambriaOperationStatus.OK) {
205             cambriaResponseWrapper.setInnerElement(currentResponse);
206         }
207     }
208
209     protected CambriaErrorResponse unRegisterDistributionClientFromTopic(RegistrationRequest unRegistrationRequest, SubscriberTypeEnum subscriberType,
210                                                                          String topicName) {
211         log.debug("unregistering client as {} , from topic: {}, using DistEnvPoints: {}", subscriberType, topicName,
212             unRegistrationRequest.getDistEnvEndPoints());
213         return cambriaHandler.unRegisterFromTopic(unRegistrationRequest.getDistEnvEndPoints(), unRegistrationRequest.getManagerApiPublicKey(),
214             unRegistrationRequest.getManagerApiSecretKey(), unRegistrationRequest.getApiPublicKey(), subscriberType, topicName);
215     }
216
217     private TopicRegistrationResponse buildTopicResponse(RegistrationRequest registrationRequest) {
218         DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
219         String statusTopicName = DistributionEngineInitTask
220             .buildTopicName(config.getDistributionStatusTopicName(), registrationRequest.getDistrEnvName());
221         String notificationTopicName = DistributionEngineInitTask
222             .buildTopicName(config.getDistributionNotifTopicName(), registrationRequest.getDistrEnvName());
223         TopicRegistrationResponse topicResponse = new TopicRegistrationResponse();
224         topicResponse.setDistrNotificationTopicName(notificationTopicName);
225         topicResponse.setDistrStatusTopicName(statusTopicName);
226         return topicResponse;
227     }
228
229     protected CambriaErrorResponse registerDistributionClientToTopic(Wrapper<Response> responseWrapper, RegistrationRequest registrationRequest,
230                                                                      SubscriberTypeEnum subscriberType, String topicName) {
231         String errorMsg;
232         // Register for notifications as consumer
233         if (subscriberType == SubscriberTypeEnum.CONSUMER) {
234             errorMsg = "registration of subscriber to topic:" + topicName + " as consumer failed";
235         }
236         // Register for status as producer
237         else {
238             errorMsg = "registration of subscriber to topic:" + topicName + " as producer failed";
239         }
240         log.debug("registering client as {} , from topic: {}, using DistEnvPoints: {}", subscriberType, topicName,
241             registrationRequest.getDistEnvEndPoints());
242         CambriaErrorResponse registerToTopic = cambriaHandler
243             .registerToTopic(registrationRequest.getDistEnvEndPoints(), registrationRequest.getManagerApiPublicKey(),
244                 registrationRequest.getManagerApiSecretKey(), registrationRequest.getApiPublicKey(), subscriberType, topicName);
245         if (registerToTopic.getOperationStatus() != CambriaOperationStatus.OK) {
246             Response failedRegistrationResponse = buildErrorResponse(getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
247             BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(REGISTER_IN_DISTRIBUTION_ENGINE, errorMsg);
248             responseWrapper.setInnerElement(failedRegistrationResponse);
249         }
250         return registerToTopic;
251     }
252
253     protected Response buildErrorResponse(ResponseFormat requestErrorWrapper) {
254         return Response.status(requestErrorWrapper.getStatus()).entity(gson.toJson(requestErrorWrapper.getRequestError())).build();
255     }
256
257     public ResponseFormatManager getResponseFormatManager() {
258         return responseFormatManager;
259     }
260
261     public IDistributionEngine getDistributionEngine() {
262         return distributionEngine;
263     }
264 }