Catalog alignment
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / distribution / DistributionBusinessLogic.java
index 1589b93..81c41bc 100644 (file)
-/*-\r
- * ============LICENSE_START=======================================================\r
- * SDC\r
- * ================================================================================\r
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.\r
- * ================================================================================\r
- * Licensed under the Apache License, Version 2.0 (the "License");\r
- * you may not use this file except in compliance with the License.\r
- * You may obtain a copy of the License at\r
- * \r
- *      http://www.apache.org/licenses/LICENSE-2.0\r
- * \r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- * ============LICENSE_END=========================================================\r
- */\r
-\r
-package org.openecomp.sdc.be.distribution;\r
-\r
-import com.google.gson.Gson;\r
-import com.google.gson.GsonBuilder;\r
-import fj.data.Either;\r
-import org.apache.http.HttpStatus;\r
-import org.openecomp.sdc.be.components.distribution.engine.*;\r
-import org.openecomp.sdc.be.components.impl.ResponseFormatManager;\r
-import org.openecomp.sdc.be.config.BeEcompErrorManager;\r
-import org.openecomp.sdc.be.config.ConfigurationManager;\r
-import org.openecomp.sdc.be.config.DistributionEngineConfiguration;\r
-import org.openecomp.sdc.be.dao.api.ActionStatus;\r
-import org.openecomp.sdc.be.distribution.api.client.*;\r
-import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData;\r
-import org.openecomp.sdc.common.datastructure.Wrapper;\r
-import org.openecomp.sdc.common.log.wrappers.Logger;\r
-import org.openecomp.sdc.exception.ResponseFormat;\r
-import org.springframework.stereotype.Component;\r
-\r
-import javax.annotation.Resource;\r
-import javax.ws.rs.core.Response;\r
-import java.util.List;\r
-\r
-import static org.apache.commons.lang.BooleanUtils.isTrue;\r
-import static org.openecomp.sdc.be.components.distribution.engine.DistributionEngineInitTask.buildTopicName;\r
-import static org.openecomp.sdc.be.config.ConfigurationManager.getConfigurationManager;\r
-\r
-@Component("distributionBusinessLogic")\r
-public class DistributionBusinessLogic {\r
-    public static final String REGISTER_IN_DISTRIBUTION_ENGINE = "registerInDistributionEngine";\r
-    public static final String UN_REGISTER_IN_DISTRIBUTION_ENGINE = "unregisterInDistributionEngine";\r
-    private Gson gson = new GsonBuilder().setPrettyPrinting().create();\r
-    private static final Logger log = Logger.getLogger(DistributionBusinessLogic.class);\r
-    @Resource\r
-    private IDistributionEngine distributionEngine;\r
-\r
-    private ResponseFormatManager responseFormatManager = ResponseFormatManager.getInstance();\r
-    private CambriaHandler cambriaHandler;\r
-\r
-    private void initRequestEnvEndPoints(RegistrationRequest registrationRequest, DistributionEngineConfiguration config) {\r
-        if(registrationRequest.getDistEnvEndPoints() == null || registrationRequest.getDistEnvEndPoints().isEmpty()){\r
-            registrationRequest.setDistEnvEndPoints(config.getUebServers());\r
-        }\r
-    }\r
-    public Either<ServerListResponse, ResponseFormat> getUebServerList() {\r
-\r
-        DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()\r
-                .getDistributionEngineConfiguration();\r
-\r
-        List<String> serverList = distributionEngineConfiguration.getUebServers();\r
-\r
-        if (serverList != null && !serverList.isEmpty()) {\r
-\r
-            ServerListResponse serverListResponse = new ServerListResponse();\r
-\r
-            serverListResponse.setUebServerList(serverList);\r
-\r
-            return Either.left(serverListResponse);\r
-        } else {\r
-            ResponseFormat errorResponseWrapper = getResponseFormatManager()\r
-                    .getResponseFormat(ActionStatus.GENERAL_ERROR);\r
-            return Either.right(errorResponseWrapper);\r
-        }\r
-\r
-    }\r
-\r
-    public void handleRegistration(Wrapper<Response> responseWrapper, RegistrationRequest registrationRequest,\r
-            AuditHandler auditHandler) {\r
-        CambriaErrorResponse registerResponse = null;\r
-        try {\r
-            DistributionEngineConfiguration config = getConfigurationManager().getDistributionEngineConfiguration();\r
-            String statusTopicName = buildTopicName(config.getDistributionStatusTopicName(),\r
-                    registrationRequest.getDistrEnvName());\r
-            registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest,\r
-                    SubscriberTypeEnum.PRODUCER, statusTopicName);\r
-\r
-            auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.PRODUCER,\r
-                    DistributionTopicData.newBuilder()\r
-                        .statusTopic(statusTopicName)\r
-                        .build());\r
-            boolean isRegisteredAsProducerOnStatusSuccess = responseWrapper.isEmpty();\r
-\r
-            // Story [347698] Distribution Client Get Indication from\r
-            // component whether to register as consumer and producer on\r
-            // status topic\r
-            boolean registeredAsConsumerOnStatus = false;\r
-            if (isRegisteredAsProducerOnStatusSuccess && isTrue(registrationRequest.getIsConsumerToSdcDistrStatusTopic())) {\r
-                registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest,\r
-                        SubscriberTypeEnum.CONSUMER, statusTopicName);\r
-                auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.CONSUMER,\r
-                        DistributionTopicData.newBuilder()\r
-                                .statusTopic(statusTopicName)\r
-                                .build());\r
-                registeredAsConsumerOnStatus = responseWrapper.isEmpty();\r
-\r
-            }\r
-\r
-            if (responseWrapper.isEmpty()) {\r
-                String notificationTopicName = buildTopicName(config.getDistributionNotifTopicName(),\r
-                        registrationRequest.getDistrEnvName());\r
-                registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest,\r
-                        SubscriberTypeEnum.CONSUMER, notificationTopicName);\r
-                auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.CONSUMER,\r
-                        DistributionTopicData.newBuilder()\r
-                            .notificationTopic(notificationTopicName)\r
-                            .build());\r
-            }\r
-            // Unregister Rollback\r
-            if (!responseWrapper.isEmpty()) {\r
-                if (isRegisteredAsProducerOnStatusSuccess) {\r
-                    CambriaErrorResponse unRegisterResponse = unRegisterDistributionClientFromTopic(registrationRequest,\r
-                            SubscriberTypeEnum.PRODUCER, statusTopicName);\r
-                    auditHandler.auditUnRegisterACL(unRegisterResponse, SubscriberTypeEnum.PRODUCER,\r
-                            DistributionTopicData.newBuilder()\r
-                                    .statusTopic(statusTopicName)\r
-                                    .build());\r
-                }\r
-                if (registeredAsConsumerOnStatus) {\r
-                    CambriaErrorResponse unRegisterResponse = unRegisterDistributionClientFromTopic(registrationRequest,\r
-                            SubscriberTypeEnum.CONSUMER, statusTopicName);\r
-                    auditHandler.auditUnRegisterACL(unRegisterResponse, SubscriberTypeEnum.CONSUMER,\r
-                            DistributionTopicData.newBuilder()\r
-                            .statusTopic(statusTopicName)\r
-                            .build());\r
-                }\r
-            }\r
-\r
-            if (responseWrapper.isEmpty()) {\r
-                TopicRegistrationResponse okTopicResponse = buildTopicResponse(registrationRequest);\r
-                responseWrapper.setInnerElement(Response.status(HttpStatus.SC_OK).entity(okTopicResponse).build());\r
-            }\r
-\r
-        } catch (Exception e) {\r
-            log.error("registration to topic failed", e);\r
-            BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(REGISTER_IN_DISTRIBUTION_ENGINE,\r
-                    "registration of subscriber to topic");\r
-            Response errorResponse = buildErrorResponse(\r
-                    getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));\r
-            responseWrapper.setInnerElement(errorResponse);\r
-        } finally {\r
-            auditHandler.auditRegisterRequest(registerResponse);\r
-        }\r
-    }\r
-\r
-    public void handleUnRegistration(Wrapper<Response> responseWrapper, RegistrationRequest unRegistrationRequest,\r
-            AuditHandler auditHandler) {\r
-        Wrapper<CambriaErrorResponse> cambriaResponseWrapper = new Wrapper<>();\r
-        try {\r
-            String statusTopicName = getStatusTopicName(unRegistrationRequest.getDistrEnvName());\r
-            CambriaErrorResponse unregisterClientProducerTopicResponse = unRegisterDistributionClientFromTopic(\r
-                    unRegistrationRequest, SubscriberTypeEnum.PRODUCER, statusTopicName);\r
-            auditHandler.auditUnRegisterACL(unregisterClientProducerTopicResponse, SubscriberTypeEnum.PRODUCER,\r
-                    DistributionTopicData.newBuilder()\r
-                            .statusTopic(statusTopicName)\r
-                            .build());\r
-            updateResponseWrapper(cambriaResponseWrapper, unregisterClientProducerTopicResponse);\r
-\r
-            String notificationTopicName = getNotificationTopicName(unRegistrationRequest.getDistrEnvName());\r
-            CambriaErrorResponse unregisterClientConsumerTopicResponse = unRegisterDistributionClientFromTopic(\r
-                    unRegistrationRequest, SubscriberTypeEnum.CONSUMER, notificationTopicName);\r
-            auditHandler.auditUnRegisterACL(unregisterClientConsumerTopicResponse, SubscriberTypeEnum.CONSUMER,\r
-                    DistributionTopicData.newBuilder()\r
-                            .notificationTopic(notificationTopicName)\r
-                            .build());\r
-            updateResponseWrapper(cambriaResponseWrapper, unregisterClientConsumerTopicResponse);\r
-\r
-            // Success unregister both topics\r
-            TopicUnregistrationResponse unregisterResponse = new TopicUnregistrationResponse(\r
-                    getNotificationTopicName(unRegistrationRequest.getDistrEnvName()),\r
-                    getStatusTopicName(unRegistrationRequest.getDistrEnvName()),\r
-                    unregisterClientConsumerTopicResponse.getOperationStatus(),\r
-                    unregisterClientProducerTopicResponse.getOperationStatus());\r
-\r
-            if (cambriaResponseWrapper.getInnerElement().getOperationStatus() == CambriaOperationStatus.OK) {\r
-                responseWrapper.setInnerElement(Response.status(HttpStatus.SC_OK).entity(unregisterResponse).build());\r
-            } else {\r
-                BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(UN_REGISTER_IN_DISTRIBUTION_ENGINE,\r
-                        "unregistration failed");\r
-                responseWrapper.setInnerElement(\r
-                        Response.status(HttpStatus.SC_INTERNAL_SERVER_ERROR).entity(unregisterResponse).build());\r
-            }\r
-        } catch (Exception e) {\r
-            log.error("unregistered to topic failed", e);\r
-            Response errorResponse = buildErrorResponse(\r
-                    getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));\r
-            responseWrapper.setInnerElement(errorResponse);\r
-\r
-        } finally {\r
-            auditHandler.auditUnRegisterRequest(cambriaResponseWrapper.getInnerElement());\r
-        }\r
-    }\r
-\r
-    private void updateResponseWrapper(Wrapper<CambriaErrorResponse> cambriaResponseWrapper,\r
-            CambriaErrorResponse currentResponse) {\r
-        if (cambriaResponseWrapper.isEmpty()) {\r
-            cambriaResponseWrapper.setInnerElement(currentResponse);\r
-        } else if (currentResponse.getOperationStatus() != CambriaOperationStatus.OK) {\r
-            cambriaResponseWrapper.setInnerElement(currentResponse);\r
-\r
-        }\r
-\r
-    }\r
-\r
-    public static String getNotificationTopicName(String envName) {\r
-        DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()\r
-                .getDistributionEngineConfiguration();\r
-        return DistributionEngineInitTask.buildTopicName(config.getDistributionNotifTopicName(), envName);\r
-\r
-    }\r
-\r
-    public static String getStatusTopicName(String envName) {\r
-        DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()\r
-                .getDistributionEngineConfiguration();\r
-        return DistributionEngineInitTask.buildTopicName(config.getDistributionStatusTopicName(), envName);\r
-\r
-    }\r
-\r
-    protected CambriaErrorResponse unRegisterDistributionClientFromTopic(RegistrationRequest unRegistrationRequest,\r
-            SubscriberTypeEnum subscriberType, String topicName) {\r
-        DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()\r
-                .getDistributionEngineConfiguration();\r
-        initRequestEnvEndPoints(unRegistrationRequest, config);\r
-\r
-        log.debug("unregistering client as {} , from topic: {}, using DistEnvPoints: {}", subscriberType, topicName, unRegistrationRequest.getDistEnvEndPoints());\r
-        return getCambriaHandler().unRegisterFromTopic(unRegistrationRequest.getDistEnvEndPoints(), config.getUebPublicKey(),\r
-                config.getUebSecretKey(), unRegistrationRequest.getApiPublicKey(), subscriberType, topicName);\r
-    }\r
-\r
-    private TopicRegistrationResponse buildTopicResponse(RegistrationRequest registrationRequest) {\r
-        DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()\r
-                .getDistributionEngineConfiguration();\r
-        String statusTopicName = DistributionEngineInitTask.buildTopicName(config.getDistributionStatusTopicName(),\r
-                registrationRequest.getDistrEnvName());\r
-        String notificationTopicName = DistributionEngineInitTask.buildTopicName(config.getDistributionNotifTopicName(),\r
-                registrationRequest.getDistrEnvName());\r
-\r
-        TopicRegistrationResponse topicResponse = new TopicRegistrationResponse();\r
-        topicResponse.setDistrNotificationTopicName(notificationTopicName);\r
-        topicResponse.setDistrStatusTopicName(statusTopicName);\r
-        return topicResponse;\r
-    }\r
-\r
-    protected CambriaErrorResponse registerDistributionClientToTopic(Wrapper<Response> responseWrapper,\r
-            RegistrationRequest registrationRequest, SubscriberTypeEnum subscriberType, String topicName) {\r
-        DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()\r
-                .getDistributionEngineConfiguration();\r
-        initRequestEnvEndPoints(registrationRequest, config);\r
-        String errorMsg;\r
-\r
-        // Register for notifications as consumer\r
-        if (subscriberType == SubscriberTypeEnum.CONSUMER) {\r
-            errorMsg = "registration of subscriber to topic:" + topicName + " as consumer failed";\r
-        }\r
-        // Register for status as producer\r
-        else {\r
-            errorMsg = "registration of subscriber to topic:" + topicName + " as producer failed";\r
-        }\r
-        log.debug("registering client as {} , from topic: {}, using DistEnvPoints: {}", subscriberType, topicName, registrationRequest.getDistEnvEndPoints());\r
-        CambriaErrorResponse registerToTopic = getCambriaHandler().registerToTopic(registrationRequest.getDistEnvEndPoints(),\r
-                config.getUebPublicKey(), config.getUebSecretKey(), registrationRequest.getApiPublicKey(),\r
-                subscriberType, topicName);\r
-\r
-        if (registerToTopic.getOperationStatus() != CambriaOperationStatus.OK) {\r
-            Response failedRegistrationResponse = buildErrorResponse(\r
-                    getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));\r
-            BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(REGISTER_IN_DISTRIBUTION_ENGINE,\r
-                    errorMsg);\r
-            responseWrapper.setInnerElement(failedRegistrationResponse);\r
-        }\r
-        return registerToTopic;\r
-    }\r
-\r
-    protected Response buildErrorResponse(ResponseFormat requestErrorWrapper) {\r
-        return Response.status(requestErrorWrapper.getStatus())\r
-                .entity(gson.toJson(requestErrorWrapper.getRequestError())).build();\r
-    }\r
-\r
-    public ResponseFormatManager getResponseFormatManager() {\r
-        return responseFormatManager;\r
-    }\r
-\r
-    public IDistributionEngine getDistributionEngine() {\r
-        return distributionEngine;\r
-    }\r
-\r
-    public CambriaHandler getCambriaHandler() {\r
-        if (cambriaHandler == null) {\r
-            cambriaHandler = new CambriaHandler();\r
-        }\r
-        return cambriaHandler;\r
-    }\r
-\r
-}\r
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.sdc.be.distribution;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import fj.data.Either;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.http.HttpStatus;
+import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse;
+import org.openecomp.sdc.be.components.distribution.engine.DistributionEngineInitTask;
+import org.openecomp.sdc.be.components.distribution.engine.ICambriaHandler;
+import org.openecomp.sdc.be.components.distribution.engine.IDistributionEngine;
+import org.openecomp.sdc.be.components.distribution.engine.SubscriberTypeEnum;
+import org.openecomp.sdc.be.components.impl.ResponseFormatManager;
+import org.openecomp.sdc.be.config.BeEcompErrorManager;
+import org.openecomp.sdc.be.config.ConfigurationManager;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.openecomp.sdc.be.dao.api.ActionStatus;
+import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
+import org.openecomp.sdc.be.distribution.api.client.RegistrationRequest;
+import org.openecomp.sdc.be.distribution.api.client.ServerListResponse;
+import org.openecomp.sdc.be.distribution.api.client.TopicRegistrationResponse;
+import org.openecomp.sdc.be.distribution.api.client.TopicUnregistrationResponse;
+import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
+import org.openecomp.sdc.be.resources.data.auditing.model.DistributionTopicData;
+import org.openecomp.sdc.common.datastructure.Wrapper;
+import org.openecomp.sdc.common.log.wrappers.Logger;
+import org.openecomp.sdc.exception.ResponseFormat;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import javax.ws.rs.core.Response;
+import java.util.List;
+
+import static org.apache.commons.lang.BooleanUtils.isTrue;
+import static org.openecomp.sdc.be.components.distribution.engine.DistributionEngineInitTask.buildTopicName;
+import static org.openecomp.sdc.be.config.ConfigurationManager.getConfigurationManager;
+
+@Component("distributionBusinessLogic")
+public class DistributionBusinessLogic {
+    public static final String REGISTER_IN_DISTRIBUTION_ENGINE = "registerInDistributionEngine";
+    public static final String UN_REGISTER_IN_DISTRIBUTION_ENGINE = "unregisterInDistributionEngine";
+    private Gson gson = new GsonBuilder().setPrettyPrinting().create();
+    private static final Logger log = Logger.getLogger(DistributionBusinessLogic.class);
+    @Resource
+    private IDistributionEngine distributionEngine;
+
+    private ResponseFormatManager responseFormatManager = ResponseFormatManager.getInstance();
+    @Resource
+    private ICambriaHandler cambriaHandler;
+
+    private void initRequestEnvEndPointsAndKeys(RegistrationRequest registrationRequest, DistributionEngineConfiguration config) {
+        if(CollectionUtils.isEmpty(registrationRequest.getDistEnvEndPoints())){
+            registrationRequest.setDistEnvEndPoints(config.getUebServers());
+            registrationRequest.setManagerApiPublicKey(config.getUebPublicKey());
+            registrationRequest.setManagerApiSecretKey(config.getUebSecretKey());
+        } else {
+            OperationalEnvironmentEntry environment = distributionEngine.getEnvironmentByDmaapUebAddress(registrationRequest.getDistEnvEndPoints());
+            registrationRequest.setManagerApiPublicKey(environment.getUebApikey());
+            registrationRequest.setManagerApiSecretKey(environment.getUebSecretKey());
+        }
+    }
+    public Either<ServerListResponse, ResponseFormat> getUebServerList() {
+
+        DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
+                .getDistributionEngineConfiguration();
+
+        List<String> serverList = distributionEngineConfiguration.getUebServers();
+
+        if (serverList != null && !serverList.isEmpty()) {
+
+            ServerListResponse serverListResponse = new ServerListResponse();
+
+            serverListResponse.setUebServerList(serverList);
+
+            return Either.left(serverListResponse);
+        } else {
+            ResponseFormat errorResponseWrapper = getResponseFormatManager()
+                    .getResponseFormat(ActionStatus.GENERAL_ERROR);
+            return Either.right(errorResponseWrapper);
+        }
+
+    }
+
+    public void handleRegistration(Wrapper<Response> responseWrapper, RegistrationRequest registrationRequest,
+            AuditHandler auditHandler) {
+        CambriaErrorResponse registerResponse = null;
+        try {
+            DistributionEngineConfiguration config = getConfigurationManager().getDistributionEngineConfiguration();
+            String statusTopicName = buildTopicName(config.getDistributionStatusTopicName(),
+                    registrationRequest.getDistrEnvName());
+            initRequestEnvEndPointsAndKeys(registrationRequest, config);
+            registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest,
+                    SubscriberTypeEnum.PRODUCER, statusTopicName);
+
+            auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.PRODUCER,
+                    DistributionTopicData.newBuilder()
+                        .statusTopic(statusTopicName)
+                        .build());
+            boolean isRegisteredAsProducerOnStatusSuccess = responseWrapper.isEmpty();
+
+            // Story [347698] Distribution Client Get Indication from
+            // component whether to register as consumer and producer on
+            // status topic
+            boolean registeredAsConsumerOnStatus = false;
+            if (isRegisteredAsProducerOnStatusSuccess && isTrue(registrationRequest.getIsConsumerToSdcDistrStatusTopic())) {
+                registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest,
+                        SubscriberTypeEnum.CONSUMER, statusTopicName);
+                auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.CONSUMER,
+                        DistributionTopicData.newBuilder()
+                                .statusTopic(statusTopicName)
+                                .build());
+                registeredAsConsumerOnStatus = responseWrapper.isEmpty();
+
+            }
+
+            if (responseWrapper.isEmpty()) {
+                String notificationTopicName = buildTopicName(config.getDistributionNotifTopicName(),
+                        registrationRequest.getDistrEnvName());
+                registerResponse = registerDistributionClientToTopic(responseWrapper, registrationRequest,
+                        SubscriberTypeEnum.CONSUMER, notificationTopicName);
+                auditHandler.auditRegisterACL(registerResponse, SubscriberTypeEnum.CONSUMER,
+                        DistributionTopicData.newBuilder()
+                            .notificationTopic(notificationTopicName)
+                            .build());
+            }
+            // Unregister Rollback
+            if (!responseWrapper.isEmpty()) {
+                if (isRegisteredAsProducerOnStatusSuccess) {
+                    CambriaErrorResponse unRegisterResponse = unRegisterDistributionClientFromTopic(registrationRequest,
+                            SubscriberTypeEnum.PRODUCER, statusTopicName);
+                    auditHandler.auditUnRegisterACL(unRegisterResponse, SubscriberTypeEnum.PRODUCER,
+                            DistributionTopicData.newBuilder()
+                                    .statusTopic(statusTopicName)
+                                    .build());
+                }
+                if (registeredAsConsumerOnStatus) {
+                    CambriaErrorResponse unRegisterResponse = unRegisterDistributionClientFromTopic(registrationRequest,
+                            SubscriberTypeEnum.CONSUMER, statusTopicName);
+                    auditHandler.auditUnRegisterACL(unRegisterResponse, SubscriberTypeEnum.CONSUMER,
+                            DistributionTopicData.newBuilder()
+                            .statusTopic(statusTopicName)
+                            .build());
+                }
+            }
+
+            if (responseWrapper.isEmpty()) {
+                TopicRegistrationResponse okTopicResponse = buildTopicResponse(registrationRequest);
+                responseWrapper.setInnerElement(Response.status(HttpStatus.SC_OK).entity(okTopicResponse).build());
+            }
+
+        } catch (Exception e) {
+            log.error("registration to topic failed", e);
+            BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(REGISTER_IN_DISTRIBUTION_ENGINE,
+                    "registration of subscriber to topic");
+            Response errorResponse = buildErrorResponse(
+                    getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
+            responseWrapper.setInnerElement(errorResponse);
+        } finally {
+            auditHandler.auditRegisterRequest(registerResponse);
+        }
+    }
+
+    public void handleUnRegistration(Wrapper<Response> responseWrapper, RegistrationRequest unRegistrationRequest,
+            AuditHandler auditHandler) {
+        Wrapper<CambriaErrorResponse> cambriaResponseWrapper = new Wrapper<>();
+        try {
+            String statusTopicName = getStatusTopicName(unRegistrationRequest.getDistrEnvName());
+            DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()
+                    .getDistributionEngineConfiguration();
+            initRequestEnvEndPointsAndKeys(unRegistrationRequest, config);
+            CambriaErrorResponse unregisterClientProducerTopicResponse = unRegisterDistributionClientFromTopic(
+                    unRegistrationRequest, SubscriberTypeEnum.PRODUCER, statusTopicName);
+            auditHandler.auditUnRegisterACL(unregisterClientProducerTopicResponse, SubscriberTypeEnum.PRODUCER,
+                    DistributionTopicData.newBuilder()
+                            .statusTopic(statusTopicName)
+                            .build());
+            updateResponseWrapper(cambriaResponseWrapper, unregisterClientProducerTopicResponse);
+
+            String notificationTopicName = getNotificationTopicName(unRegistrationRequest.getDistrEnvName());
+            CambriaErrorResponse unregisterClientConsumerTopicResponse = unRegisterDistributionClientFromTopic(
+                    unRegistrationRequest, SubscriberTypeEnum.CONSUMER, notificationTopicName);
+            auditHandler.auditUnRegisterACL(unregisterClientConsumerTopicResponse, SubscriberTypeEnum.CONSUMER,
+                    DistributionTopicData.newBuilder()
+                            .notificationTopic(notificationTopicName)
+                            .build());
+            updateResponseWrapper(cambriaResponseWrapper, unregisterClientConsumerTopicResponse);
+
+            // Success unregister both topics
+            TopicUnregistrationResponse unregisterResponse = new TopicUnregistrationResponse(
+                    getNotificationTopicName(unRegistrationRequest.getDistrEnvName()),
+                    getStatusTopicName(unRegistrationRequest.getDistrEnvName()),
+                    unregisterClientConsumerTopicResponse.getOperationStatus(),
+                    unregisterClientProducerTopicResponse.getOperationStatus());
+
+            if (cambriaResponseWrapper.getInnerElement().getOperationStatus() == CambriaOperationStatus.OK) {
+                responseWrapper.setInnerElement(Response.status(HttpStatus.SC_OK).entity(unregisterResponse).build());
+            } else {
+                BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(UN_REGISTER_IN_DISTRIBUTION_ENGINE,
+                        "unregistration failed");
+                responseWrapper.setInnerElement(
+                        Response.status(HttpStatus.SC_INTERNAL_SERVER_ERROR).entity(unregisterResponse).build());
+            }
+        } catch (Exception e) {
+            log.error("unregistered to topic failed", e);
+            Response errorResponse = buildErrorResponse(
+                    getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
+            responseWrapper.setInnerElement(errorResponse);
+
+        } finally {
+            auditHandler.auditUnRegisterRequest(cambriaResponseWrapper.getInnerElement());
+        }
+    }
+
+    private void updateResponseWrapper(Wrapper<CambriaErrorResponse> cambriaResponseWrapper,
+            CambriaErrorResponse currentResponse) {
+        if (cambriaResponseWrapper.isEmpty()) {
+            cambriaResponseWrapper.setInnerElement(currentResponse);
+        } else if (currentResponse.getOperationStatus() != CambriaOperationStatus.OK) {
+            cambriaResponseWrapper.setInnerElement(currentResponse);
+
+        }
+
+    }
+
+    public static String getNotificationTopicName(String envName) {
+        DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()
+                .getDistributionEngineConfiguration();
+        return DistributionEngineInitTask.buildTopicName(config.getDistributionNotifTopicName(), envName);
+
+    }
+
+    public static String getStatusTopicName(String envName) {
+        DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()
+                .getDistributionEngineConfiguration();
+        return DistributionEngineInitTask.buildTopicName(config.getDistributionStatusTopicName(), envName);
+
+    }
+
+    protected CambriaErrorResponse unRegisterDistributionClientFromTopic(RegistrationRequest unRegistrationRequest,
+            SubscriberTypeEnum subscriberType, String topicName) {
+
+        log.debug("unregistering client as {} , from topic: {}, using DistEnvPoints: {}", subscriberType, topicName, unRegistrationRequest.getDistEnvEndPoints());
+        return cambriaHandler.unRegisterFromTopic(unRegistrationRequest.getDistEnvEndPoints(), unRegistrationRequest.getManagerApiPublicKey(),
+                unRegistrationRequest.getManagerApiSecretKey(), unRegistrationRequest.getApiPublicKey(), subscriberType, topicName);
+    }
+
+    private TopicRegistrationResponse buildTopicResponse(RegistrationRequest registrationRequest) {
+        DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager()
+                .getDistributionEngineConfiguration();
+        String statusTopicName = DistributionEngineInitTask.buildTopicName(config.getDistributionStatusTopicName(),
+                registrationRequest.getDistrEnvName());
+        String notificationTopicName = DistributionEngineInitTask.buildTopicName(config.getDistributionNotifTopicName(),
+                registrationRequest.getDistrEnvName());
+
+        TopicRegistrationResponse topicResponse = new TopicRegistrationResponse();
+        topicResponse.setDistrNotificationTopicName(notificationTopicName);
+        topicResponse.setDistrStatusTopicName(statusTopicName);
+        return topicResponse;
+    }
+
+    protected CambriaErrorResponse registerDistributionClientToTopic(Wrapper<Response> responseWrapper,
+            RegistrationRequest registrationRequest, SubscriberTypeEnum subscriberType, String topicName) {
+
+        String errorMsg;
+
+        // Register for notifications as consumer
+        if (subscriberType == SubscriberTypeEnum.CONSUMER) {
+            errorMsg = "registration of subscriber to topic:" + topicName + " as consumer failed";
+        }
+        // Register for status as producer
+        else {
+            errorMsg = "registration of subscriber to topic:" + topicName + " as producer failed";
+        }
+        log.debug("registering client as {} , from topic: {}, using DistEnvPoints: {}", subscriberType, topicName, registrationRequest.getDistEnvEndPoints());
+        CambriaErrorResponse registerToTopic = cambriaHandler.registerToTopic(registrationRequest.getDistEnvEndPoints(),
+                registrationRequest.getManagerApiPublicKey(), registrationRequest.getManagerApiSecretKey(), registrationRequest.getApiPublicKey(),
+                subscriberType, topicName);
+
+        if (registerToTopic.getOperationStatus() != CambriaOperationStatus.OK) {
+            Response failedRegistrationResponse = buildErrorResponse(
+                    getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR));
+            BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError(REGISTER_IN_DISTRIBUTION_ENGINE,
+                    errorMsg);
+            responseWrapper.setInnerElement(failedRegistrationResponse);
+        }
+        return registerToTopic;
+    }
+
+    protected Response buildErrorResponse(ResponseFormat requestErrorWrapper) {
+        return Response.status(requestErrorWrapper.getStatus())
+                .entity(gson.toJson(requestErrorWrapper.getRequestError())).build();
+    }
+
+    public ResponseFormatManager getResponseFormatManager() {
+        return responseFormatManager;
+    }
+
+    public IDistributionEngine getDistributionEngine() {
+        return distributionEngine;
+    }
+
+}