Publish kafka notification for delete service 45/141945/6
authoraakansha.maltare <aakansha.maltare@t-systems.com>
Tue, 26 Aug 2025 12:19:35 +0000 (14:19 +0200)
committerLukasz Rajewski <lukasz.rajewski@t-mobile.pl>
Tue, 9 Sep 2025 16:11:48 +0000 (16:11 +0000)
Issue-ID: SDC-4767
Change-Id: I8f8e694d3aeb683da5f0c91171a5de5de2e220c6
Signed-off-by: aakansha.maltare <aakansha.maltare@t-systems.com>
catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionDeleteNotificationSender.java [new file with mode: 0644]
catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngine.java
catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDistributionEngine.java
catalog-be/src/main/java/org/openecomp/sdc/be/components/impl/ServiceBusinessLogic.java
catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionDeleteNotificationSenderTest.java [new file with mode: 0644]
catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineConfigTest.java
catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineTest.java
catalog-be/src/test/java/org/openecomp/sdc/be/components/impl/ServiceBusinessLogicTest.java
common-app-api/src/main/java/org/openecomp/sdc/be/config/DistributionEngineConfiguration.java
common-app-api/src/main/java/org/openecomp/sdc/common/util/YamlToObjectConverter.java
common-app-api/src/test/java/org/openecomp/sdc/common/util/YamlToObjectConverterTest.java

diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionDeleteNotificationSender.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionDeleteNotificationSender.java
new file mode 100644 (file)
index 0000000..4189e04
--- /dev/null
@@ -0,0 +1,115 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright © 2025 Deutsche Telekom. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.sdc.be.components.distribution.engine;
+
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
+import org.openecomp.sdc.be.config.ConfigurationManager;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.openecomp.sdc.be.dao.api.ActionStatus;
+import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
+import org.openecomp.sdc.common.log.wrappers.Logger;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DistributionDeleteNotificationSender {
+    private static final Logger logger = Logger.getLogger(DistributionDeleteNotificationSender.class.getName());
+
+    private final CambriaHandler cambriaHandler;
+
+    private final KafkaHandler kafkaHandler;
+
+    private final DistributionEngineConfiguration deConfiguration;
+
+    public DistributionDeleteNotificationSender() {
+        this.kafkaHandler = new KafkaHandler();
+        this.deConfiguration = ConfigurationManager.getConfigurationManager()
+                .getDistributionEngineConfiguration();
+        this.cambriaHandler = new CambriaHandler();
+    }
+
+    public DistributionDeleteNotificationSender(KafkaHandler kafkaHandler,
+            DistributionEngineConfiguration deConfiguration, CambriaHandler cambriaHandler) {
+        this.kafkaHandler = kafkaHandler;
+        this.deConfiguration = deConfiguration;
+        this.cambriaHandler = cambriaHandler;
+    }
+
+    /**
+     * This method is used to send the notification for the delete service.
+     * 
+     * @param topicName        name of the Kafka topic for delete service
+     * @param distributionId   random UUID generated for the notification
+     * @param messageBusData   environment message bus data
+     * @param notificationData the payload data to be sent in the Kafka notification
+     * @return the action status of the notification
+     */
+    public ActionStatus sendNotificationForDeleteService(String topicName, String distributionId,
+            EnvironmentMessageBusData messageBusData,
+            INotificationData notificationData) {
+        logger.debug("sending notification with topicName={}, distributionId={}",
+                topicName,
+                distributionId);
+
+        CambriaErrorResponse status;
+        if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+            logger.debug("Kafka is inactive. Using CambriaHandler to send notification for topic={}", topicName);
+            status = cambriaHandler
+                    .sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(),
+                            messageBusData.getUebPrivateKey(),
+                            messageBusData.getDmaaPuebEndpoints(), notificationData,
+                            deConfiguration.getDistributionDeleteTopic().getMaxWaitingAfterSendingSeconds());
+        } else {
+            logger.debug("Kafka is active. Using KafkaHandler to send notification for topic={}", topicName);
+            status = kafkaHandler.sendNotification(topicName, notificationData);
+        }
+
+        logger.info("sending notification is completed for topicName={}, with status = {}",
+                topicName, status);
+        return convertCambriaResponse(status);
+    }
+
+    /**
+     * This method is used to convert a {@link CambriaErrorResponse} into the
+     * corresponding {@link ActionStatus}.
+     * 
+     * @param status the operation status
+     * @return the mapped ActionStatus based on the operation status
+     */
+    private ActionStatus convertCambriaResponse(CambriaErrorResponse status) {
+        CambriaOperationStatus operationStatus = status.getOperationStatus();
+        switch (operationStatus) {
+            case OK:
+                return ActionStatus.OK;
+            case AUTHENTICATION_ERROR:
+                return ActionStatus.AUTHENTICATION_ERROR;
+            case INTERNAL_SERVER_ERROR:
+                return ActionStatus.GENERAL_ERROR;
+            case UNKNOWN_HOST_ERROR:
+                return ActionStatus.UNKNOWN_HOST;
+            case CONNNECTION_ERROR:
+                return ActionStatus.CONNNECTION_ERROR;
+            case OBJECT_NOT_FOUND:
+                return ActionStatus.OBJECT_NOT_FOUND;
+            default:
+                return ActionStatus.GENERAL_ERROR;
+        }
+    }
+}
index cebedad..91d6bc0 100644 (file)
@@ -16,6 +16,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  * ============LICENSE_END=========================================================
+ * Modifications copyright (c) 2025 Deutsche Telekom.
+ * ================================================================================
  */
 package org.openecomp.sdc.be.components.distribution.engine;
 
@@ -67,6 +69,9 @@ public class DistributionEngine implements IDistributionEngine {
     private DistributionEngineClusterHealth distributionEngineClusterHealth;
     @Resource
     private ServiceDistributionValidation serviceDistributionValidation;
+    @Resource
+    private DistributionDeleteNotificationSender distributionDeleteNotificationSender;
+
     private Map<String, DistributionEngineInitTask> envNamePerInitTask = new HashMap<>();
     private Map<String, DistributionEnginePollingTask> envNamePerPollingTask = new HashMap<>();
     private Map<String, AtomicBoolean> envNamePerStatus = new HashMap<>();
@@ -150,6 +155,8 @@ public class DistributionEngine implements IDistributionEngine {
         result = isValidParam(deConfiguration.getDistributionNotifTopicName(), methodName, "distributionNotifTopicName") && result;
         result = isValidParam(deConfiguration.getDistributionStatusTopicName(), methodName, "distributionStatusTopicName") && result;
         result = isValidObject(deConfiguration.getDistributionStatusTopic(), methodName, "distributionStatusTopic") && result;
+        result = isValidParam(deConfiguration.getDistributionDeleteTopicName(), methodName,
+                "distributionDeleteTopicName") && result;
         result = isValidObject(deConfiguration.getInitMaxIntervalSec(), methodName, "initMaxIntervalSec") && result;
         result = isValidObject(deConfiguration.getInitRetryIntervalSec(), methodName, "initRetryIntervalSec") && result;
         result = isValidParam(deConfiguration.getDistributionStatusTopic().getConsumerId(), methodName, "consumerId") && result;
@@ -304,4 +311,64 @@ public class DistributionEngine implements IDistributionEngine {
         value = serviceDistributionArtifactsBuilder.buildServiceForDistribution(value, service);
         return value;
     }
+
+    /**
+     * This method is used to build the notification payload for delete service
+     * 
+     * @param service        service to be deleted
+     * @param distributionId random UUID generated for the notification
+     * @return notification payload of INotificationData type.
+     */
+    @Override
+    public INotificationData buildServiceForDeleteNotification(Service service, String distributionId) {
+        INotificationData value = serviceDistributionArtifactsBuilder.buildResourceInstanceForDistribution(service,
+                distributionId, null);
+        logger.debug("Payload for delete kafka notification is -  {}", value);
+        return value;
+    }
+
+    /**
+     * This method is used to notify the service for delete operation.
+     * 
+     * @param distributionId   random UUID generated for the notification
+     * @param notificationData the payload data to be sent in the notification
+     * @param service          the service to be deleted
+     * @param envName          the environment name
+     * @param user             the user performing the delete operation
+     * @return the action status of the notification.
+     */
+    @Override
+    public ActionStatus notifyServiceForDelete(String distributionId, INotificationData notificationData,
+            Service service,
+            String envName, User user) {
+        logger.debug(
+                "Received notify service request for delete. distributionId = {}, serviceUuid = {} serviceUid = {}, envName = {}, userId = {}, modifierName {}",
+                distributionId, service.getUUID(), service.getUniqueId(), envName, service.getLastUpdaterUserId(),
+                user);
+
+        String topicName = buildDeleteNotificationTopicName(envName);
+        logger.debug("topic name for delete - {}", topicName);
+
+        ActionStatus deleteServiceNotifStatus = Optional
+                .ofNullable(environmentsEngine.getEnvironmentById(envName)).map(EnvironmentMessageBusData::new).map(
+                        messageBusData -> distributionDeleteNotificationSender
+                                .sendNotificationForDeleteService(topicName, distributionId, messageBusData,
+                                        notificationData))
+                .orElse(ActionStatus.DISTRIBUTION_ENVIRONMENT_NOT_AVAILABLE);
+        logger.debug("Finish delete notification service. notification status is {}", deleteServiceNotifStatus);
+        return deleteServiceNotifStatus;
+    }
+
+    /**
+     * This method is used to create topic name for delete operation based on
+     * environment name.
+     * 
+     * @param envName the environment name
+     * @return the delete topic name
+     */
+    private String buildDeleteNotificationTopicName(String envName) {
+        String deleteTopicName = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration()
+                .getDistributionDeleteTopicName();
+        return DistributionEngineInitTask.buildTopicName(deleteTopicName, envName);
+    }
 }
index f08043c..078fb83 100644 (file)
@@ -72,4 +72,14 @@ public interface IDistributionEngine {
     default OperationalEnvironmentEntry getEnvironmentByDmaapUebAddress(List<String> dmaapUebAddress) {
         return null;
     }
+
+    default ActionStatus notifyServiceForDelete(String distributionId, INotificationData notificationData,
+            Service service,
+            String envName, User user) {
+        return null;
+    }
+
+    default INotificationData buildServiceForDeleteNotification(Service service, String distributionId) {
+        return null;
+    }
 }
index 9874020..7fa58be 100644 (file)
@@ -18,6 +18,8 @@
  * ============LICENSE_END=========================================================
  * Modifications copyright (c) 2019 Nokia
  * ================================================================================
+ * Modifications copyright (c) 2025 Deutsche Telekom.
+ * ================================================================================
  */
 
 package org.openecomp.sdc.be.components.impl;
@@ -1532,6 +1534,27 @@ public class ServiceBusinessLogic extends ComponentBusinessLogic {
             }
             toscaOperationFacade.commitAndCheck(service.getUniqueId());
             updateCatalog(service, ChangeTypeEnum.DELETE);
+
+            String distributionId = ThreadLocalsHolder.getUuid();
+            String envName = getEnvNameFromConfiguration();
+            INotificationData notificationData = distributionEngine.buildServiceForDeleteNotification(service,
+                    distributionId);
+            log.debug("for kafka notification envName = {} and  notificationData = {}", envName, notificationData);
+            ActionStatus notifyServiceResponse = distributionEngine.notifyServiceForDelete(distributionId,
+                    notificationData, service, envName, user);
+            if (notifyServiceResponse == ActionStatus.OK) {
+                log.debug(
+                        "Kafka notification successfully generated for delete service. Notification response details are - response={}, serviceName={}, serviceId={}",
+                        notifyServiceResponse,
+                        service.getName(),
+                        service.getUniqueId());
+            } else {
+                log.debug(
+                        "Kafka notification failed for delete service. Notification response details are - response={}, serviceName={}, serviceId={}",
+                        notifyServiceResponse,
+                        service.getName(),
+                        service.getUniqueId());
+            }
         } catch (ComponentException exception) {
             log.debug("Failed to delete service, {}, in ServiceServlet", serviceId);
             janusGraphDao.rollback();
diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionDeleteNotificationSenderTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionDeleteNotificationSenderTest.java
new file mode 100644 (file)
index 0000000..f5faa7f
--- /dev/null
@@ -0,0 +1,113 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright © 2025 Deutsche Telekom. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.sdc.be.components.distribution.engine;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionDeleteTopicConfig;
+import org.openecomp.sdc.be.dao.api.ActionStatus;
+import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
+import org.openecomp.sdc.common.util.ThreadLocalsHolder;
+
+public class DistributionDeleteNotificationSenderTest {
+    private KafkaHandler kafkaHandler;
+    private DistributionEngineConfiguration distributionConfig;
+    private DistributionDeleteNotificationSender deleteNotificationSender;
+    private CambriaHandler cambriaHandler;
+    private static final String TOPIC_NAME = "delete_topic";
+
+    @Before
+    public void setUp() {
+        kafkaHandler = Mockito.mock(KafkaHandler.class);
+        distributionConfig = Mockito.mock(DistributionEngineConfiguration.class);
+        cambriaHandler = Mockito.mock(CambriaHandler.class);
+        deleteNotificationSender = new DistributionDeleteNotificationSender(kafkaHandler, distributionConfig,
+                cambriaHandler);
+    }
+
+    @Test
+    public void testSendNotificationForDeleteService_WhenKafkaIsInactive() {
+        when(kafkaHandler.isKafkaActive()).thenReturn(false);
+        DistributionDeleteTopicConfig deleteTopicConfig = mock(
+                DistributionEngineConfiguration.DistributionDeleteTopicConfig.class);
+        when(distributionConfig.getDistributionDeleteTopic()).thenReturn(deleteTopicConfig);
+        when(deleteTopicConfig.getMaxWaitingAfterSendingSeconds()).thenReturn(10);
+
+        EnvironmentMessageBusData messageBusData = mock(EnvironmentMessageBusData.class);
+        List<String> serverList = Collections.singletonList("uebEndpoint");
+        when(messageBusData.getUebPublicKey()).thenReturn("uebPublicKey");
+        when(messageBusData.getUebPrivateKey()).thenReturn("uebPrivateKey");
+        when(messageBusData.getDmaaPuebEndpoints()).thenReturn(serverList);
+
+        INotificationData notificationData = new NotificationDataImpl();
+
+        when(cambriaHandler.sendNotificationAndClose(TOPIC_NAME, "uebPublicKey", "uebPrivateKey", serverList,
+                notificationData, 10)).thenReturn(new CambriaErrorResponse(CambriaOperationStatus.OK, 200));
+
+        ActionStatus result = deleteNotificationSender.sendNotificationForDeleteService(
+                TOPIC_NAME, ThreadLocalsHolder.getUuid(), messageBusData, notificationData);
+
+        assertEquals(ActionStatus.OK, result);
+        verify(cambriaHandler, times(1)).sendNotificationAndClose(eq(TOPIC_NAME), eq("uebPublicKey"),
+                eq("uebPrivateKey"), eq(serverList), eq(notificationData), eq(10L));
+    }
+
+    @Test
+    public void testSendNotificationForDeleteService_WhenKafkaIsActive() {
+        INotificationData notificationData = new NotificationDataImpl();
+        when(kafkaHandler.isKafkaActive()).thenReturn(true);
+        when(kafkaHandler.sendNotification(TOPIC_NAME, notificationData))
+                .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.OK, 200));
+
+        ActionStatus result = deleteNotificationSender.sendNotificationForDeleteService(
+                TOPIC_NAME, ThreadLocalsHolder.getUuid(), null, notificationData);
+
+        assertEquals(ActionStatus.OK, result);
+        verify(kafkaHandler, times(1)).sendNotification(eq(TOPIC_NAME), eq(notificationData));
+    }
+
+    @Test
+    public void testSendNotificationForDeleteService_WhenKafkaNotificationFails() {
+        INotificationData notificationData = new NotificationDataImpl();
+        when(kafkaHandler.isKafkaActive()).thenReturn(true);
+        when(kafkaHandler.sendNotification(TOPIC_NAME, notificationData))
+                .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 200));
+
+        ActionStatus result = deleteNotificationSender.sendNotificationForDeleteService(
+                TOPIC_NAME, ThreadLocalsHolder.getUuid(), null, notificationData);
+
+        assertEquals(ActionStatus.GENERAL_ERROR, result);
+        verify(kafkaHandler, times(1)).sendNotification(eq(TOPIC_NAME), eq(notificationData));
+    }
+}
index 21a9b19..0832150 100644 (file)
@@ -92,6 +92,7 @@ class DistributionEngineConfigTest {
 
         deConfiguration.setDistributionNotifTopicName("distributionNotifTopicName");
         deConfiguration.setDistributionStatusTopicName("statusTopic");
+        deConfiguration.setDistributionDeleteTopicName("distributionDeleteTopicName");
 
         DistributionStatusTopicConfig distributionStatusTopic = new DistributionStatusTopicConfig();
         distributionStatusTopic.setConsumerGroup("asdc-group");
index c5d8d37..52a0a8b 100644 (file)
@@ -36,6 +36,7 @@ import org.openecomp.sdc.be.model.Service;
 import org.openecomp.sdc.be.model.User;
 import org.openecomp.sdc.be.model.operations.api.StorageOperationStatus;
 import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
+import org.openecomp.sdc.common.util.ThreadLocalsHolder;
 
 import java.util.HashSet;
 import java.util.List;
@@ -49,7 +50,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
 
@@ -59,6 +63,7 @@ public class DistributionEngineTest {
     public static final String ENV_ID = "envId";
     public static final String USER_ID = "userId";
     public static final String MODIFIER = "modifier";
+    public static final String DELETE_TOPIC = "deleteTopic";
 
     @InjectMocks
     private DistributionEngine testInstance;
@@ -72,6 +77,9 @@ public class DistributionEngineTest {
     @Mock
     private ServiceDistributionArtifactsBuilder serviceDistributionArtifactsBuilder;
 
+    @Mock
+    DistributionDeleteNotificationSender distributionDeleteNotificationSender;
+
     private DummyDistributionConfigurationManager distributionEngineConfigurationMock;
 
     private Map<String, OperationalEnvironmentEntry> envs;
@@ -348,4 +356,81 @@ public class DistributionEngineTest {
         when(serviceDistributionArtifactsBuilder.buildResourceInstanceForDistribution(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new NotificationDataImpl());
         result = testInstance.buildServiceForDistribution(service, distributionId, workloadContext);
     }
+
+    @Test
+    void testBuildServiceForDeleteNotification() {
+        Service service = new Service();
+        String distributionId = ThreadLocalsHolder.getUuid();
+        INotificationData expectedNotificationData = new NotificationDataImpl();
+        expectedNotificationData.setDistributionID(distributionId);
+
+        when(serviceDistributionArtifactsBuilder.buildResourceInstanceForDistribution(
+                eq(service), eq(distributionId), isNull()))
+                .thenReturn(expectedNotificationData);
+
+        INotificationData actual = testInstance.buildServiceForDeleteNotification(service, distributionId);
+        verify(serviceDistributionArtifactsBuilder, times(1))
+                .buildResourceInstanceForDistribution(eq(service), eq(distributionId), isNull());
+        assertEquals(expectedNotificationData, actual);
+        assertEquals(expectedNotificationData.getDistributionID(), actual.getDistributionID());
+    }
+
+    @Test
+    void testNotifyServiceForDelete() {
+        String distributionId = ThreadLocalsHolder.getUuid();
+        Service service = mock(Service.class);
+        User user = mock(User.class);
+        INotificationData notificationData = mock(INotificationData.class);
+
+        when(environmentsEngine.getEnvironmentById(ENV_ID)).thenReturn(envs.get(ENV_ID));
+        when(distributionEngineConfigurationMock.getConfigurationMock().getDistributionDeleteTopicName())
+                .thenReturn(DELETE_TOPIC);
+
+        String expectedTopicName = DistributionEngineInitTask.buildTopicName(DELETE_TOPIC, ENV_ID);
+        when(distributionDeleteNotificationSender.sendNotificationForDeleteService(
+                eq(expectedTopicName), eq(distributionId), any(EnvironmentMessageBusData.class),
+                eq(notificationData)))
+                .thenReturn(ActionStatus.OK);
+
+        ActionStatus action = testInstance.notifyServiceForDelete(distributionId, notificationData, service, ENV_ID,
+                user);
+        assertEquals(ActionStatus.OK, action);
+        verify(distributionDeleteNotificationSender, times(1)).sendNotificationForDeleteService(
+                eq(expectedTopicName), eq(distributionId), any(EnvironmentMessageBusData.class),
+                eq(notificationData));
+    }
+
+    @Test
+    void testNotifyServiceForDeleteWhenEnvironmentDoesNotExist() {
+        when(environmentsEngine.getEnvironments()).thenReturn(envs);
+        ActionStatus actionStatus = testInstance.notifyServiceForDelete(DISTRIBUTION_ID, new NotificationDataImpl(),
+                new Service(), "ENV_123", modifier);
+        assertEquals(ActionStatus.DISTRIBUTION_ENVIRONMENT_NOT_AVAILABLE, actionStatus);
+        verifyNoInteractions(distributionNotificationSender);
+    }
+
+    @Test
+    void testNotifyServiceForDeleteWhenKafkaNotificationFailed() {
+        String distributionId = ThreadLocalsHolder.getUuid();
+        Service service = mock(Service.class);
+        User user = mock(User.class);
+        INotificationData notificationData = mock(INotificationData.class);
+
+        when(environmentsEngine.getEnvironmentById(ENV_ID)).thenReturn(envs.get(ENV_ID));
+        when(distributionEngineConfigurationMock.getConfigurationMock().getDistributionDeleteTopicName())
+                .thenReturn(DELETE_TOPIC);
+
+        String expectedTopicName = DistributionEngineInitTask.buildTopicName(DELETE_TOPIC, ENV_ID);
+        when(distributionDeleteNotificationSender.sendNotificationForDeleteService(
+                eq(expectedTopicName), eq(distributionId), any(EnvironmentMessageBusData.class),
+                eq(notificationData)))
+                .thenReturn(ActionStatus.GENERAL_ERROR);
+
+        ActionStatus action = testInstance.notifyServiceForDelete(distributionId, notificationData, service, ENV_ID,
+                user);
+        assertEquals(ActionStatus.GENERAL_ERROR, action);
+        verify(distributionDeleteNotificationSender, times(1)).sendNotificationForDeleteService(
+                eq(expectedTopicName), eq(distributionId), any(EnvironmentMessageBusData.class),
+                eq(notificationData));
+    }
 }
index 587b902..e62fb41 100644 (file)
@@ -52,6 +52,7 @@ import org.hamcrest.MatcherAssert;
 import org.junit.Assert;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
+import org.openecomp.sdc.be.components.distribution.engine.INotificationData;
 import org.openecomp.sdc.be.components.impl.exceptions.ComponentException;
 import org.openecomp.sdc.be.dao.api.ActionStatus;
 import org.openecomp.sdc.be.datatypes.elements.ArtifactDataDefinition;
@@ -84,6 +85,7 @@ import org.openecomp.sdc.be.resources.data.auditing.DistributionDeployEvent;
 import org.openecomp.sdc.be.resources.data.auditing.ResourceAdminEvent;
 import org.openecomp.sdc.be.types.ServiceConsumptionData;
 import org.openecomp.sdc.be.user.Role;
+import org.openecomp.sdc.common.util.ThreadLocalsHolder;
 import org.openecomp.sdc.common.util.ValidationUtils;
 import org.openecomp.sdc.exception.ResponseFormat;
 import org.springframework.http.HttpStatus;
@@ -715,8 +717,35 @@ class ServiceBusinessLogicTest extends ServiceBusinessLogicBaseTestSetup {
         Mockito.when(toscaOperationFacade.getToscaElement(Mockito.anyString())).thenReturn(eitherService);
         Mockito.when(toscaOperationFacade.deleteService(Mockito.anyString(), Mockito.eq(true))).thenReturn(deletedServcies);
         Mockito.when(modelOperation.findModelByName(model)).thenReturn(Optional.of(normativeExtensionModel));
+
+        String mockDistributionId = ThreadLocalsHolder.getUuid();
+        INotificationData mockNotificationData = Mockito.mock(INotificationData.class);
+        Mockito.when(distributionEngine.buildServiceForDeleteNotification(
+                (Service) Mockito.eq(eitherService.left().value()), Mockito.eq(mockDistributionId)))
+                .thenReturn(mockNotificationData);
+
+        Mockito.when(distributionEngine.notifyServiceForDelete(
+                Mockito.eq(mockDistributionId),
+                Mockito.eq(mockNotificationData),
+                (Service) Mockito.eq(eitherService.left().value()),
+                Mockito.anyString(),
+                Mockito.eq(user))).thenReturn(ActionStatus.OK);
+
         bl.deleteServiceAllVersions(serviceId, user);
         Mockito.verify(modelOperation, Mockito.times(1)).deleteModel(normativeExtensionModel, false);
+
+        // Verify Kafka notification payload building for delete service.
+        Mockito.verify(distributionEngine, Mockito.times(1))
+                .buildServiceForDeleteNotification((Service) eitherService.left().value(), mockDistributionId);
+
+        // Verify Kafka notification sending for delete service.
+        Mockito.verify(distributionEngine, Mockito.times(1))
+                .notifyServiceForDelete(
+                        Mockito.eq(mockDistributionId),
+                        Mockito.eq(mockNotificationData),
+                        (Service) Mockito.eq(eitherService.left().value()),
+                        Mockito.anyString(),
+                        Mockito.eq(user));
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
index 72870ea..92d6ee5 100644 (file)
@@ -58,6 +58,8 @@ public class DistributionEngineConfiguration extends BasicConfiguration {
     private ExternalServiceConfig msoConfig;
     private Integer opEnvRecoveryIntervalSec;
     private Integer allowedTimeBeforeStaleSec;
+    private String distributionDeleteTopicName;
+    private DistributionDeleteTopicConfig distributionDeleteTopic;
 
     public void setEnvironments(List<String> environments) {
         Set<String> set = new HashSet<>();
@@ -156,4 +158,14 @@ public class DistributionEngineConfiguration extends BasicConfiguration {
         private List<String> lifecycle;
 
     }
+
+    @Getter
+    @Setter
+    @ToString
+    public static class DistributionDeleteTopicConfig {
+
+        private Integer maxWaitingAfterSendingSeconds;
+        private Integer maxThreadPoolSize;
+        private Integer minThreadPoolSize;
+    }
 }
index b1f68c9..a4d7b88 100644 (file)
@@ -64,6 +64,7 @@ public class YamlToObjectConverter {
         deDescription.putListPropertyType("distribNotifResourceArtifactTypes", ComponentArtifactTypesConfig.class);
         deDescription.putListPropertyType("createTopic", CreateTopicConfig.class);
         deDescription.putListPropertyType("distributionNotificationTopic", DistributionNotificationTopicConfig.class);
+        deDescription.putListPropertyType("distributionDeleteTopic", DistributionNotificationTopicConfig.class);
         deConstructor.addTypeDescription(deDescription);
         yamlConstructors.put(DistributionEngineConfiguration.class.getName(), deConstructor);
         // FE conf
index 35af78b..c3cdddd 100644 (file)
@@ -21,20 +21,29 @@ package org.openecomp.sdc.common.util;
 
 import org.apache.commons.codec.binary.Base64;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.openecomp.sdc.be.config.Configuration;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionDeleteTopicConfig;
 import org.openecomp.sdc.common.http.client.api.HttpClient;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Field;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.HashMap;
+
 import org.openecomp.sdc.exception.YamlConversionException;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.Constructor;
 
 import static junit.framework.TestCase.assertEquals;
 import static junit.framework.TestCase.assertFalse;
 import static junit.framework.TestCase.assertNull;
 import static junit.framework.TestCase.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 public class YamlToObjectConverterTest {
 
@@ -46,6 +55,21 @@ public class YamlToObjectConverterTest {
             "  key: value";
 
     private YamlToObjectConverter yamlToObjectConverter;
+    private static HashMap<String, Constructor> yamlConstructors;
+
+    /**
+     * This method is to initialize the static block before class loads.
+     * 
+     * @throws Exception if exception occurs while getting the fields from class.
+     */
+    @BeforeClass
+    public static void init() throws Exception {
+        Class.forName("org.openecomp.sdc.common.util.YamlToObjectConverter");
+        Field field = Class.forName("org.openecomp.sdc.common.util.YamlToObjectConverter")
+                .getDeclaredField("yamlConstructors");
+        field.setAccessible(true);
+        yamlConstructors = (HashMap<String, Constructor>) field.get(null);
+    }
 
     @Before
     public void setUp() {
@@ -169,4 +193,36 @@ public class YamlToObjectConverterTest {
         assertEquals(result.getUsers().size(),1);
         assertEquals(result.getUsers().get("tom"),"passwd");
     }
+
+    /**
+     * This method is to test getting the delete-topic configurations from yaml
+     * file.
+     * 
+     * @throws Exception If any exception occurs while getting the configuration
+     *                   from yaml file.
+     */
+    @Test
+    public void testContainsDistributionDeleteTopic() throws Exception {
+        assertTrue(yamlConstructors.containsKey("org.openecomp.sdc.be.config.DistributionEngineConfiguration"));
+
+        Constructor constructor = yamlConstructors.get("org.openecomp.sdc.be.config.DistributionEngineConfiguration");
+        assertNotNull(constructor);
+
+        String yamlStr = "distributionDeleteTopicName: SDC-DELETE-TOPIC\n" +
+                "distributionDeleteTopic:\n" +
+                "  maxWaitingAfterSendingSeconds: 10\n" +
+                "  maxThreadPoolSize: 20\n" +
+                "  minThreadPoolSize: 5\n";
+
+        Yaml yaml = new Yaml(constructor);
+        DistributionEngineConfiguration distributionEngineConfiguration = yaml.load(yamlStr);
+        assertEquals("SDC-DELETE-TOPIC", distributionEngineConfiguration.getDistributionDeleteTopicName());
+
+        DistributionDeleteTopicConfig deleteTopicConfig = distributionEngineConfiguration.getDistributionDeleteTopic();
+
+        assertNotNull(deleteTopicConfig);
+        assertEquals(Integer.valueOf(10), deleteTopicConfig.getMaxWaitingAfterSendingSeconds());
+        assertEquals(Integer.valueOf(20), deleteTopicConfig.getMaxThreadPoolSize());
+        assertEquals(Integer.valueOf(5), deleteTopicConfig.getMinThreadPoolSize());
+    }
 }