Catalog alignment
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / distribution / DistributionBusinessLogic.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.distribution;
22
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import fj.data.Either;
26 import org.apache.commons.collections.CollectionUtils;
27 import org.apache.http.HttpStatus;
28 import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse;
29 import org.openecomp.sdc.be.components.distribution.engine.DistributionEngineInitTask;
30 import org.openecomp.sdc.be.components.distribution.engine.ICambriaHandler;
31 import org.openecomp.sdc.be.components.distribution.engine.IDistributionEngine;
32 import org.openecomp.sdc.be.components.distribution.engine.SubscriberTypeEnum;
33 import org.openecomp.sdc.be.components.impl.ResponseFormatManager;
34 import org.openecomp.sdc.be.config.BeEcompErrorManager;
35 import org.openecomp.sdc.be.config.ConfigurationManager;
36 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
37 import org.openecomp.sdc.be.dao.api.ActionStatus;
38 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
39 import org.openecomp.sdc.be.distribution.api.client.RegistrationRequest;
40 import org.openecomp.sdc.be.distribution.api.client.ServerListResponse;
41 import org.openecomp.sdc.be.distribution.api.client.TopicRegistrationResponse;
42 import org.openecomp.sdc.be.distribution.api.client.TopicUnregistrationResponse;
43 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
44 import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData;
45 import org.openecomp.sdc.common.datastructure.Wrapper;
46 import org.openecomp.sdc.common.log.wrappers.Logger;
47 import org.openecomp.sdc.exception.ResponseFormat;
48 import org.springframework.stereotype.Component;
49
50 import javax.annotation.Resource;
51 import javax.ws.rs.core.Response;
52 import java.util.List;
53
54 import static org.apache.commons.lang.BooleanUtils.isTrue;
55 import static org.openecomp.sdc.be.components.distribution.engine.DistributionEngineInitTask.buildTopicName;
56 import static org.openecomp.sdc.be.config.ConfigurationManager.getConfigurationManager;
57
58 @Component("distributionBusinessLogic")
59 public class DistributionBusinessLogic {
    /** Context label passed to {@link BeEcompErrorManager} when a registration failure is logged. */
    public static final String REGISTER_IN_DISTRIBUTION_ENGINE = "registerInDistributionEngine";
    /** Context label passed to {@link BeEcompErrorManager} when an unregistration failure is logged. */
    public static final String UN_REGISTER_IN_DISTRIBUTION_ENGINE = "unregisterInDistributionEngine";
    // Pretty-printing serializer; buildErrorResponse uses it to render the error entity.
    private Gson gson = new GsonBuilder().setPrettyPrinting().create();
    private static final Logger log = Logger.getLogger(DistributionBusinessLogic.class);
    // Queried for the operational environment that matches caller-supplied endpoints.
    @Resource
    private IDistributionEngine distributionEngine;

    // Supplies the ResponseFormat objects returned to callers on failure.
    private ResponseFormatManager responseFormatManager = ResponseFormatManager.getInstance();
    // Performs the actual register/unregister calls against a topic.
    @Resource
    private ICambriaHandler cambriaHandler;
70
71     private void initRequestEnvEndPointsAndKeys(RegistrationRequest registrationRequest, DistributionEngineConfiguration config) {
72         if(CollectionUtils.isEmpty(registrationRequest.getDistEnvEndPoints())){
73             registrationRequest.setDistEnvEndPoints(config.getUebServers());
74             registrationRequest.setManagerApiPublicKey(config.getUebPublicKey());
75             registrationRequest.setManagerApiSecretKey(config.getUebSecretKey());
76         } else {
77             OperationalEnvironmentEntry environment = distributionEngine.getEnvironmentByDmaapUebAddress(registrationRequest.getDistEnvEndPoints());
78             registrationRequest.setManagerApiPublicKey(environment.getUebApikey());
79             registrationRequest.setManagerApiSecretKey(environment.getUebSecretKey());
80         }
81     }
82     public Either<ServerListResponse, ResponseFormat> getUebServerList() {
83
84         DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
85                 .getDistributionEngineConfiguration();
86
87         List<String> serverList = distributionEngineConfiguration.getUebServers();
88
89         if (serverList != null && !serverList.isEmpty()) {
90
91             ServerListResponse serverListResponse = new ServerListResponse();
92
93             serverListResponse.setUebServerList(serverList);
94
95             return Either.left(serverListResponse);
96         } else {
97             ResponseFormat errorResponseWrapper = getResponseFormatManager()
98                     .getResponseFormat(ActionStatus.GENERAL_ERROR);
99             return Either.right(errorResponseWrapper);
100         }
101
102     }
103
104     public void handleRegistration(Wrapper<Response> responseWrapper, RegistrationRequest registrationRequest,
105             AuditHandler auditHandler) {
106         CambriaErrorResponse registerResponse = null;
107         try {
108             DistributionEngineConfiguration config = getConfigurationManager().getDistributionEngineConfiguration();
109             String statusTopicName = buildTopicName(config.getDistributionStatusTopicName(),
110                     registrationRequest.getDistrEnvName());
111             initRequestEnvEndPointsAndKeys(registrationRequest, config);
112             registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest,
113                     SubscriberTypeEnum.PRODUCER, statusTopicName);
114
115             auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.PRODUCER,
116                     DistributionTopicData.newBuilder()
117                         .statusTopic(statusTopicName)
118                         .build());
119             boolean isRegisteredAsProducerOnStatusSuccess = responseWrapper.isEmpty();
120
121             // Story [347698] Distribution Client Get Indication from
122             // component whether to register as consumer and producer on
123             // status topic
124             boolean registeredAsConsumerOnStatus = false;
125             if (isRegisteredAsProducerOnStatusSuccess && isTrue(registrationRequest.getIsConsumerToSdcDistrStatusTopic())) {
126                 registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest,
127                         SubscriberTypeEnum.CONSUMER, statusTopicName);
128                 auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.CONSUMER,
129                         DistributionTopicData.newBuilder()
130                                 .statusTopic(statusTopicName)
131                                 .build());
132                 registeredAsConsumerOnStatus = responseWrapper.isEmpty();
133
134             }
135
136             if (responseWrapper.isEmpty()) {
137                 String notificationTopicName = buildTopicName(config.getDistributionNotifTopicName(),
138                         registrationRequest.getDistrEnvName());
139                 registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest,
140                         SubscriberTypeEnum.CONSUMER, notificationTopicName);
141                 auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.CONSUMER,
142                         DistributionTopicData.newBuilder()
143                             .notificationTopic(notificationTopicName)
144                             .build());
145             }
146             // Unregister Rollback
147             if (!responseWrapper.isEmpty()) {
148                 if (isRegisteredAsProducerOnStatusSuccess) {
149                     CambriaErrorResponse unRegisterResponse = unRegisterDistributionClientFromTopic(registrationRequest,
150                             SubscriberTypeEnum.PRODUCER, statusTopicName);
151                     auditHandler.auditUnRegisterACL(unRegisterResponse, SubscriberTypeEnum.PRODUCER,
152                             DistributionTopicData.newBuilder()
153                                     .statusTopic(statusTopicName)
154                                     .build());
155                 }
156                 if (registeredAsConsumerOnStatus) {
157                     CambriaErrorResponse unRegisterResponse = unRegisterDistributionClientFromTopic(registrationRequest,
158                             SubscriberTypeEnum.CONSUMER, statusTopicName);
159                     auditHandler.auditUnRegisterACL(unRegisterResponse, SubscriberTypeEnum.CONSUMER,
160                             DistributionTopicData.newBuilder()
161                             .statusTopic(statusTopicName)
162                             .build());
163                 }
164             }
165
166             if (responseWrapper.isEmpty()) {
167                 TopicRegistrationResponse okTopicResponse = buildTopicResponse(registrationRequest);
168                 responseWrapper.setInnerElement(Response.status(HttpStatus.SC_OK).entity(okTopicResponse).build());
169             }
170
171         } catch (Exception e) {
172             log.error("registration to topic failed", e);
173             BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(REGISTER_IN_DISTRIBUTION_ENGINE,
174                     "registration of subscriber to topic");
175             Response errorResponse = buildErrorResponse(
176                     getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
177             responseWrapper.setInnerElement(errorResponse);
178         } finally {
179             auditHandler.auditRegisterRequest(registerResponse);
180         }
181     }
182
183     public void handleUnRegistration(Wrapper<Response> responseWrapper, RegistrationRequest unRegistrationRequest,
184             AuditHandler auditHandler) {
185         Wrapper<CambriaErrorResponse> cambriaResponseWrapper = new Wrapper<>();
186         try {
187             String statusTopicName = getStatusTopicName(unRegistrationRequest.getDistrEnvName());
188             DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()
189                     .getDistributionEngineConfiguration();
190             initRequestEnvEndPointsAndKeys(unRegistrationRequest, config);
191             CambriaErrorResponse unregisterClientProducerTopicResponse = unRegisterDistributionClientFromTopic(
192                     unRegistrationRequest, SubscriberTypeEnum.PRODUCER, statusTopicName);
193             auditHandler.auditUnRegisterACL(unregisterClientProducerTopicResponse, SubscriberTypeEnum.PRODUCER,
194                     DistributionTopicData.newBuilder()
195                             .statusTopic(statusTopicName)
196                             .build());
197             updateResponseWrapper(cambriaResponseWrapper, unregisterClientProducerTopicResponse);
198
199             String notificationTopicName = getNotificationTopicName(unRegistrationRequest.getDistrEnvName());
200             CambriaErrorResponse unregisterClientConsumerTopicResponse = unRegisterDistributionClientFromTopic(
201                     unRegistrationRequest, SubscriberTypeEnum.CONSUMER, notificationTopicName);
202             auditHandler.auditUnRegisterACL(unregisterClientConsumerTopicResponse, SubscriberTypeEnum.CONSUMER,
203                     DistributionTopicData.newBuilder()
204                             .notificationTopic(notificationTopicName)
205                             .build());
206             updateResponseWrapper(cambriaResponseWrapper, unregisterClientConsumerTopicResponse);
207
208             // Success unregister both topics
209             TopicUnregistrationResponse unregisterResponse = new TopicUnregistrationResponse(
210                     getNotificationTopicName(unRegistrationRequest.getDistrEnvName()),
211                     getStatusTopicName(unRegistrationRequest.getDistrEnvName()),
212                     unregisterClientConsumerTopicResponse.getOperationStatus(),
213                     unregisterClientProducerTopicResponse.getOperationStatus());
214
215             if (cambriaResponseWrapper.getInnerElement().getOperationStatus() == CambriaOperationStatus.OK) {
216                 responseWrapper.setInnerElement(Response.status(HttpStatus.SC_OK).entity(unregisterResponse).build());
217             } else {
218                 BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(UN_REGISTER_IN_DISTRIBUTION_ENGINE,
219                         "unregistration failed");
220                 responseWrapper.setInnerElement(
221                         Response.status(HttpStatus.SC_INTERNAL_SERVER_ERROR).entity(unregisterResponse).build());
222             }
223         } catch (Exception e) {
224             log.error("unregistered to topic failed", e);
225             Response errorResponse = buildErrorResponse(
226                     getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
227             responseWrapper.setInnerElement(errorResponse);
228
229         } finally {
230             auditHandler.auditUnRegisterRequest(cambriaResponseWrapper.getInnerElement());
231         }
232     }
233
234     private void updateResponseWrapper(Wrapper<CambriaErrorResponse> cambriaResponseWrapper,
235             CambriaErrorResponse currentResponse) {
236         if (cambriaResponseWrapper.isEmpty()) {
237             cambriaResponseWrapper.setInnerElement(currentResponse);
238         } else if (currentResponse.getOperationStatus() != CambriaOperationStatus.OK) {
239             cambriaResponseWrapper.setInnerElement(currentResponse);
240
241         }
242
243     }
244
245     public static String getNotificationTopicName(String envName) {
246         DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()
247                 .getDistributionEngineConfiguration();
248         return DistributionEngineInitTask.buildTopicName(config.getDistributionNotifTopicName(), envName);
249
250     }
251
252     public static String getStatusTopicName(String envName) {
253         DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()
254                 .getDistributionEngineConfiguration();
255         return DistributionEngineInitTask.buildTopicName(config.getDistributionStatusTopicName(), envName);
256
257     }
258
259     protected CambriaErrorResponse unRegisterDistributionClientFromTopic(RegistrationRequest unRegistrationRequest,
260             SubscriberTypeEnum subscriberType, String topicName) {
261
262         log.debug("unregistering client as {} , from topic: {}, using DistEnvPoints: {}", subscriberType, topicName, unRegistrationRequest.getDistEnvEndPoints());
263         return cambriaHandler.unRegisterFromTopic(unRegistrationRequest.getDistEnvEndPoints(), unRegistrationRequest.getManagerApiPublicKey(),
264                 unRegistrationRequest.getManagerApiSecretKey(), unRegistrationRequest.getApiPublicKey(), subscriberType, topicName);
265     }
266
267     private TopicRegistrationResponse buildTopicResponse(RegistrationRequest registrationRequest) {
268         DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()
269                 .getDistributionEngineConfiguration();
270         String statusTopicName = DistributionEngineInitTask.buildTopicName(config.getDistributionStatusTopicName(),
271                 registrationRequest.getDistrEnvName());
272         String notificationTopicName = DistributionEngineInitTask.buildTopicName(config.getDistributionNotifTopicName(),
273                 registrationRequest.getDistrEnvName());
274
275         TopicRegistrationResponse topicResponse = new TopicRegistrationResponse();
276         topicResponse.setDistrNotificationTopicName(notificationTopicName);
277         topicResponse.setDistrStatusTopicName(statusTopicName);
278         return topicResponse;
279     }
280
281     protected CambriaErrorResponse registerDistributionClientToTopic(Wrapper<Response> responseWrapper,
282             RegistrationRequest registrationRequest, SubscriberTypeEnum subscriberType, String topicName) {
283
284         String errorMsg;
285
286         // Register for notifications as consumer
287         if (subscriberType == SubscriberTypeEnum.CONSUMER) {
288             errorMsg = "registration of subscriber to topic:" + topicName + " as consumer failed";
289         }
290         // Register for status as producer
291         else {
292             errorMsg = "registration of subscriber to topic:" + topicName + " as producer failed";
293         }
294         log.debug("registering client as {} , from topic: {}, using DistEnvPoints: {}", subscriberType, topicName, registrationRequest.getDistEnvEndPoints());
295         CambriaErrorResponse registerToTopic = cambriaHandler.registerToTopic(registrationRequest.getDistEnvEndPoints(),
296                 registrationRequest.getManagerApiPublicKey(), registrationRequest.getManagerApiSecretKey(), registrationRequest.getApiPublicKey(),
297                 subscriberType, topicName);
298
299         if (registerToTopic.getOperationStatus() != CambriaOperationStatus.OK) {
300             Response failedRegistrationResponse = buildErrorResponse(
301                     getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
302             BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(REGISTER_IN_DISTRIBUTION_ENGINE,
303                     errorMsg);
304             responseWrapper.setInnerElement(failedRegistrationResponse);
305         }
306         return registerToTopic;
307     }
308
309     protected Response buildErrorResponse(ResponseFormat requestErrorWrapper) {
310         return Response.status(requestErrorWrapper.getStatus())
311                 .entity(gson.toJson(requestErrorWrapper.getRequestError())).build();
312     }
313
314     public ResponseFormatManager getResponseFormatManager() {
315         return responseFormatManager;
316     }
317
318     public IDistributionEngine getDistributionEngine() {
319         return distributionEngine;
320     }
321
322 }