From daa7b5d2cbda826ce8216875cffa2692ffa789c2 Mon Sep 17 00:00:00 2001 From: "aakansha.maltare" Date: Tue, 26 Aug 2025 14:19:35 +0200 Subject: [PATCH] Publish kafka notification for delete service Issue-ID: SDC-4767 Change-Id: I8f8e694d3aeb683da5f0c91171a5de5de2e220c6 Signed-off-by: aakansha.maltare --- .../DistributionDeleteNotificationSender.java | 115 +++++++++++++++++++++ .../distribution/engine/DistributionEngine.java | 67 ++++++++++++ .../distribution/engine/IDistributionEngine.java | 10 ++ .../be/components/impl/ServiceBusinessLogic.java | 23 +++++ .../DistributionDeleteNotificationSenderTest.java | 113 ++++++++++++++++++++ .../engine/DistributionEngineConfigTest.java | 1 + .../engine/DistributionEngineTest.java | 85 +++++++++++++++ .../components/impl/ServiceBusinessLogicTest.java | 29 ++++++ .../be/config/DistributionEngineConfiguration.java | 12 +++ .../sdc/common/util/YamlToObjectConverter.java | 1 + .../sdc/common/util/YamlToObjectConverterTest.java | 56 ++++++++++ 11 files changed, 512 insertions(+) create mode 100644 catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionDeleteNotificationSender.java create mode 100644 catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionDeleteNotificationSenderTest.java diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionDeleteNotificationSender.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionDeleteNotificationSender.java new file mode 100644 index 0000000000..4189e04dec --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionDeleteNotificationSender.java @@ -0,0 +1,115 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright © 2025 Deutsche Telekom. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.be.components.distribution.engine; + +import org.openecomp.sdc.be.components.kafka.KafkaHandler; +import org.openecomp.sdc.be.config.ConfigurationManager; +import org.openecomp.sdc.be.config.DistributionEngineConfiguration; +import org.openecomp.sdc.be.dao.api.ActionStatus; +import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; +import org.openecomp.sdc.common.log.wrappers.Logger; +import org.springframework.stereotype.Component; + +@Component +public class DistributionDeleteNotificationSender { + private static final Logger logger = Logger.getLogger(DistributionDeleteNotificationSender.class.getName()); + + private final CambriaHandler cambriaHandler; + + private final KafkaHandler kafkaHandler; + + private final DistributionEngineConfiguration deConfiguration; + + public DistributionDeleteNotificationSender() { + this.kafkaHandler = new KafkaHandler(); + this.deConfiguration = ConfigurationManager.getConfigurationManager() + .getDistributionEngineConfiguration(); + this.cambriaHandler = new CambriaHandler(); + } + + public DistributionDeleteNotificationSender(KafkaHandler kafkaHandler, + DistributionEngineConfiguration deConfiguration, CambriaHandler cambriaHandler) { + this.kafkaHandler = kafkaHandler; + this.deConfiguration = deConfiguration; + this.cambriaHandler = cambriaHandler; + } + + /** + * This method is used to send the notification for the delete service. + * + * @param topicName name of the Kafka topic for delete service + * @param distributionId random UUID generated for the notification + * @param messageBusData environment message bus data + * @param notificationData the payload data to be sent in the Kafka notification + * @return the action status of the notification + */ + public ActionStatus sendNotificationForDeleteService(String topicName, String distributionId, + EnvironmentMessageBusData messageBusData, + INotificationData notificationData) { + logger.debug("sending notification with topicName={}, distributionId={}", + topicName, + distributionId); + + CambriaErrorResponse status; + if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) { + logger.debug("Kafka is inactive. Using CambriaHandler to send notification for topic={}", topicName); + status = cambriaHandler + .sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(), + messageBusData.getUebPrivateKey(), + messageBusData.getDmaaPuebEndpoints(), notificationData, + deConfiguration.getDistributionDeleteTopic().getMaxWaitingAfterSendingSeconds()); + } else { + logger.debug("Kafka is active. Using KafkaHandler to send notification for topic={}", topicName); + status = kafkaHandler.sendNotification(topicName, notificationData); + } + + logger.info("sending notification is completed for topicName={}, with status = {}", + topicName, status); + return convertCambriaResponse(status); + } + + /** + * This method is used to convert a {@link CambriaErrorResponse} into the + * corresponding {@link ActionStatus}. + * + * @param status the operation status + * @return the mapped ActionStatus based on the operation status + */ + private ActionStatus convertCambriaResponse(CambriaErrorResponse status) { + CambriaOperationStatus operationStatus = status.getOperationStatus(); + switch (operationStatus) { + case OK: + return ActionStatus.OK; + case AUTHENTICATION_ERROR: + return ActionStatus.AUTHENTICATION_ERROR; + case INTERNAL_SERVER_ERROR: + return ActionStatus.GENERAL_ERROR; + case UNKNOWN_HOST_ERROR: + return ActionStatus.UNKNOWN_HOST; + case CONNNECTION_ERROR: + return ActionStatus.CONNNECTION_ERROR; + case OBJECT_NOT_FOUND: + return ActionStatus.OBJECT_NOT_FOUND; + default: + return ActionStatus.GENERAL_ERROR; + } + } +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngine.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngine.java index cebedadf10..91d6bc00d8 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngine.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngine.java @@ -16,6 +16,8 @@ * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= + * Modifications copyright (c) 2025 Deutsche Telekom. + * ================================================================================ */ package org.openecomp.sdc.be.components.distribution.engine; @@ -67,6 +69,9 @@ public class DistributionEngine implements IDistributionEngine { private DistributionEngineClusterHealth distributionEngineClusterHealth; @Resource private ServiceDistributionValidation serviceDistributionValidation; + @Resource + private DistributionDeleteNotificationSender distributionDeleteNotificationSender; + private Map envNamePerInitTask = new HashMap<>(); private Map envNamePerPollingTask = new HashMap<>(); private Map envNamePerStatus = new HashMap<>(); @@ -150,6 +155,8 @@ public class DistributionEngine implements IDistributionEngine { result = isValidParam(deConfiguration.getDistributionNotifTopicName(), methodName, "distributionNotifTopicName") && result; result = isValidParam(deConfiguration.getDistributionStatusTopicName(), methodName, "distributionStatusTopicName") && result; result = isValidObject(deConfiguration.getDistributionStatusTopic(), methodName, "distributionStatusTopic") && result; + result = isValidParam(deConfiguration.getDistributionDeleteTopicName(), methodName, + "distributionDeleteTopicName") && result; result = isValidObject(deConfiguration.getInitMaxIntervalSec(), methodName, "initMaxIntervalSec") && result; result = isValidObject(deConfiguration.getInitRetryIntervalSec(), methodName, "initRetryIntervalSec") && result; result = isValidParam(deConfiguration.getDistributionStatusTopic().getConsumerId(), methodName, "consumerId") && result; @@ -304,4 +311,64 @@ public class DistributionEngine implements IDistributionEngine { value = serviceDistributionArtifactsBuilder.buildServiceForDistribution(value, service); return value; } + + /** + * This method is used to build the notification payload for delete service + * + * @param service service to be deleted + * @param distributionId random UUID generated for the notification + * @return notification payload of INotificationData type. + */ + @Override + public INotificationData buildServiceForDeleteNotification(Service service, String distributionId) { + INotificationData value = serviceDistributionArtifactsBuilder.buildResourceInstanceForDistribution(service, + distributionId, null); + logger.debug("Payload for delete kafka notification is - {}", value); + return value; + } + + /** + * This method is used to notify the service for delete operation. + * + * @param distributionId random UUID generated for the notification + * @param notificationData the payload data to be sent in the notification + * @param service the service to be deleted + * @param envName the environment name + * @param user the user performing the delete operation + * @return the action status of the notification. + */ + @Override + public ActionStatus notifyServiceForDelete(String distributionId, INotificationData notificationData, + Service service, + String envName, User user) { + logger.debug( + "Received notify service request for delete. distributionId = {}, serviceUuid = {} serviceUid = {}, envName = {}, userId = {}, modifierName {}", + distributionId, service.getUUID(), service.getUniqueId(), envName, service.getLastUpdaterUserId(), + user); + + String topicName = buildDeleteNotificationTopicName(envName); + logger.debug("topic name for delete - {}", topicName); + + ActionStatus deleteServiceNotifStatus = Optional + .ofNullable(environmentsEngine.getEnvironmentById(envName)).map(EnvironmentMessageBusData::new).map( + messageBusData -> distributionDeleteNotificationSender + .sendNotificationForDeleteService(topicName, distributionId, messageBusData, + notificationData)) + .orElse(ActionStatus.DISTRIBUTION_ENVIRONMENT_NOT_AVAILABLE); + logger.debug("Finish delete notification service. notification status is {}", deleteServiceNotifStatus); + return deleteServiceNotifStatus; + } + + /** + * This method is used to create topic name for delete operation based on + * environment name. + * + * @param envName the environment name + * @return the delete topic name + */ + private String buildDeleteNotificationTopicName(String envName) { + String deleteTopicName = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration() + .getDistributionDeleteTopicName(); + return DistributionEngineInitTask.buildTopicName(deleteTopicName, envName); + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDistributionEngine.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDistributionEngine.java index f08043cf76..078fb83c00 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDistributionEngine.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/IDistributionEngine.java @@ -72,4 +72,14 @@ public interface IDistributionEngine { default OperationalEnvironmentEntry getEnvironmentByDmaapUebAddress(List dmaapUebAddress) { return null; } + + default ActionStatus notifyServiceForDelete(String distributionId, INotificationData notificationData, + Service service, + String envName, User user) { + return null; + } + + default INotificationData buildServiceForDeleteNotification(Service service, String distributionId) { + return null; + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/impl/ServiceBusinessLogic.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/impl/ServiceBusinessLogic.java index 9874020b49..7fa58be94b 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/impl/ServiceBusinessLogic.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/impl/ServiceBusinessLogic.java @@ -18,6 +18,8 @@ * ============LICENSE_END========================================================= * Modifications copyright (c) 2019 Nokia * ================================================================================ + * Modifications copyright (c) 2025 Deutsche Telekom. + * ================================================================================ */ package org.openecomp.sdc.be.components.impl; @@ -1532,6 +1534,27 @@ public class ServiceBusinessLogic extends ComponentBusinessLogic { } toscaOperationFacade.commitAndCheck(service.getUniqueId()); updateCatalog(service, ChangeTypeEnum.DELETE); + + String distributionId = ThreadLocalsHolder.getUuid(); + String envName = getEnvNameFromConfiguration(); + INotificationData notificationData = distributionEngine.buildServiceForDeleteNotification(service, + distributionId); + log.debug("for kafka notification envName = {} and notificationData = {}", envName, notificationData); + ActionStatus notifyServiceResponse = distributionEngine.notifyServiceForDelete(distributionId, + notificationData, service, envName, user); + if (notifyServiceResponse == ActionStatus.OK) { + log.debug( + "Kafka notification successfully generated for delete service. Notification response details are - response={}, serviceName={}, serviceId={}", + notifyServiceResponse, + service.getName(), + service.getUniqueId()); + } else { + log.debug( + "Kafka notification failed for delete service. Notification response details are - response={}, serviceName={}, serviceId={}", + notifyServiceResponse, + service.getName(), + service.getUniqueId()); + } } catch (ComponentException exception) { log.debug("Failed to delete service, {}, in ServiceServlet", serviceId); janusGraphDao.rollback(); diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionDeleteNotificationSenderTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionDeleteNotificationSenderTest.java new file mode 100644 index 0000000000..f5faa7f68b --- /dev/null +++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionDeleteNotificationSenderTest.java @@ -0,0 +1,113 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright © 2025 Deutsche Telekom. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.be.components.distribution.engine; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.List; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.openecomp.sdc.be.components.kafka.KafkaHandler; +import org.openecomp.sdc.be.config.DistributionEngineConfiguration; +import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionDeleteTopicConfig; +import org.openecomp.sdc.be.dao.api.ActionStatus; +import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; +import org.openecomp.sdc.common.util.ThreadLocalsHolder; + +public class DistributionDeleteNotificationSenderTest { + private KafkaHandler kafkaHandler; + private DistributionEngineConfiguration distributionConfig; + private DistributionDeleteNotificationSender deleteNotificationSender; + private CambriaHandler cambriaHandler; + private static final String TOPIC_NAME = "delete_topic"; + + @Before + public void setUp() { + kafkaHandler = Mockito.mock(KafkaHandler.class); + distributionConfig = Mockito.mock(DistributionEngineConfiguration.class); + cambriaHandler = Mockito.mock(CambriaHandler.class); + deleteNotificationSender = new DistributionDeleteNotificationSender(kafkaHandler, distributionConfig, + cambriaHandler); + } + + @Test + public void testSendNotificationForDeleteService_WhenKafkaIsInactive() { + when(kafkaHandler.isKafkaActive()).thenReturn(false); + DistributionDeleteTopicConfig deleteTopicConfig = mock( + DistributionEngineConfiguration.DistributionDeleteTopicConfig.class); + when(distributionConfig.getDistributionDeleteTopic()).thenReturn(deleteTopicConfig); + when(deleteTopicConfig.getMaxWaitingAfterSendingSeconds()).thenReturn(10); + + EnvironmentMessageBusData messageBusData = mock(EnvironmentMessageBusData.class); + List serverList = Collections.singletonList("uebEndpoint"); + when(messageBusData.getUebPublicKey()).thenReturn("uebPublicKey"); + when(messageBusData.getUebPrivateKey()).thenReturn("uebPrivateKey"); + when(messageBusData.getDmaaPuebEndpoints()).thenReturn(serverList); + + INotificationData notificationData = new NotificationDataImpl(); + + when(cambriaHandler.sendNotificationAndClose(TOPIC_NAME, "uebPublicKey", "uebPrivateKey", serverList, + notificationData, 10)).thenReturn(new CambriaErrorResponse(CambriaOperationStatus.OK, 200)); + + ActionStatus result = deleteNotificationSender.sendNotificationForDeleteService( + TOPIC_NAME, ThreadLocalsHolder.getUuid(), messageBusData, notificationData); + + assertEquals(ActionStatus.OK, result); + verify(cambriaHandler, times(1)).sendNotificationAndClose(eq(TOPIC_NAME), eq("uebPublicKey"), + eq("uebPrivateKey"), eq(serverList), eq(notificationData), eq(10L)); + } + + @Test + public void testSendNotificationForDeleteService_WhenKafkaIsActive() { + INotificationData notificationData = new NotificationDataImpl(); + when(kafkaHandler.isKafkaActive()).thenReturn(true); + when(kafkaHandler.sendNotification(TOPIC_NAME, notificationData)) + .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.OK, 200)); + + ActionStatus result = deleteNotificationSender.sendNotificationForDeleteService( + TOPIC_NAME, ThreadLocalsHolder.getUuid(), null, notificationData); + + assertEquals(ActionStatus.OK, result); + verify(kafkaHandler, times(1)).sendNotification(eq(TOPIC_NAME), eq(notificationData)); + } + + @Test + public void testSendNotificationForDeleteService_WhenKafkaNotificationFails() { + INotificationData notificationData = new NotificationDataImpl(); + when(kafkaHandler.isKafkaActive()).thenReturn(true); + when(kafkaHandler.sendNotification(TOPIC_NAME, notificationData)) + .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 200)); + + ActionStatus result = deleteNotificationSender.sendNotificationForDeleteService( + TOPIC_NAME, ThreadLocalsHolder.getUuid(), null, notificationData); + + assertEquals(ActionStatus.GENERAL_ERROR, result); + verify(kafkaHandler, times(1)).sendNotification(eq(TOPIC_NAME), eq(notificationData)); + } +} diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineConfigTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineConfigTest.java index 21a9b198f1..0832150e2f 100644 --- a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineConfigTest.java +++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineConfigTest.java @@ -92,6 +92,7 @@ class DistributionEngineConfigTest { deConfiguration.setDistributionNotifTopicName("distributionNotifTopicName"); deConfiguration.setDistributionStatusTopicName("statusTopic"); + deConfiguration.setDistributionDeleteTopicName("distributionDeleteTopicName"); DistributionStatusTopicConfig distributionStatusTopic = new DistributionStatusTopicConfig(); distributionStatusTopic.setConsumerGroup("asdc-group"); diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineTest.java index c5d8d37d39..52a0a8b597 100644 --- a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineTest.java +++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineTest.java @@ -36,6 +36,7 @@ import org.openecomp.sdc.be.model.Service; import org.openecomp.sdc.be.model.User; import org.openecomp.sdc.be.model.operations.api.StorageOperationStatus; import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry; +import org.openecomp.sdc.common.util.ThreadLocalsHolder; import java.util.HashSet; import java.util.List; @@ -49,7 +50,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @@ -59,6 +63,7 @@ public class DistributionEngineTest { public static final String ENV_ID = "envId"; public static final String USER_ID = "userId"; public static final String MODIFIER = "modifier"; + public static final String DELETE_TOPIC = "deleteTopic"; @InjectMocks private DistributionEngine testInstance; @@ -72,6 +77,9 @@ public class DistributionEngineTest { @Mock private ServiceDistributionArtifactsBuilder serviceDistributionArtifactsBuilder; + @Mock + DistributionDeleteNotificationSender distributionDeleteNotificationSender; + private DummyDistributionConfigurationManager distributionEngineConfigurationMock; private Map envs; @@ -348,4 +356,81 @@ public class DistributionEngineTest { when(serviceDistributionArtifactsBuilder.buildResourceInstanceForDistribution(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new NotificationDataImpl()); result = testInstance.buildServiceForDistribution(service, distributionId, workloadContext); } + + @Test + void testBuildServiceForDeleteNotification() { + Service service = new Service(); + String distributionId = ThreadLocalsHolder.getUuid(); + INotificationData expectedNotificationData = new NotificationDataImpl(); + expectedNotificationData.setDistributionID(distributionId); + + when(serviceDistributionArtifactsBuilder.buildResourceInstanceForDistribution( + eq(service), eq(distributionId), isNull())) + .thenReturn(expectedNotificationData); + + INotificationData actual = testInstance.buildServiceForDeleteNotification(service, distributionId); + verify(serviceDistributionArtifactsBuilder, times(1)) + .buildResourceInstanceForDistribution(eq(service), eq(distributionId), isNull()); + assertEquals(expectedNotificationData, actual); + assertEquals(expectedNotificationData.getDistributionID(), actual.getDistributionID()); + } + + @Test + void testNotifyServiceForDelete() { + String distributionId = ThreadLocalsHolder.getUuid(); + Service service = mock(Service.class); + User user = mock(User.class); + INotificationData notificationData = mock(INotificationData.class); + + when(environmentsEngine.getEnvironmentById(ENV_ID)).thenReturn(envs.get(ENV_ID)); + when(distributionEngineConfigurationMock.getConfigurationMock().getDistributionDeleteTopicName()) + .thenReturn(DELETE_TOPIC); + + String expectedTopicName = DistributionEngineInitTask.buildTopicName(DELETE_TOPIC, ENV_ID); + when(distributionDeleteNotificationSender.sendNotificationForDeleteService( + eq(expectedTopicName), eq(distributionId), any(EnvironmentMessageBusData.class), + eq(notificationData))) + .thenReturn(ActionStatus.OK); + + ActionStatus action = testInstance.notifyServiceForDelete(distributionId, notificationData, service, ENV_ID, + user); + assertEquals(ActionStatus.OK, action); + verify(distributionDeleteNotificationSender, times(1)).sendNotificationForDeleteService( + eq(expectedTopicName), eq(distributionId), any(EnvironmentMessageBusData.class), + eq(notificationData)); + } + + @Test + void testNotifyServiceForDeleteWhenEnvironmentDoesNotExist() { + when(environmentsEngine.getEnvironments()).thenReturn(envs); + ActionStatus actionStatus = testInstance.notifyServiceForDelete(DISTRIBUTION_ID, new NotificationDataImpl(), + new Service(), "ENV_123", modifier); + assertEquals(ActionStatus.DISTRIBUTION_ENVIRONMENT_NOT_AVAILABLE, actionStatus); + verifyNoInteractions(distributionNotificationSender); + } + + @Test + void testNotifyServiceForDeleteWhenKafkaNotificationFailed() { + String distributionId = ThreadLocalsHolder.getUuid(); + Service service = mock(Service.class); + User user = mock(User.class); + INotificationData notificationData = mock(INotificationData.class); + + when(environmentsEngine.getEnvironmentById(ENV_ID)).thenReturn(envs.get(ENV_ID)); + when(distributionEngineConfigurationMock.getConfigurationMock().getDistributionDeleteTopicName()) + .thenReturn(DELETE_TOPIC); + + String expectedTopicName = DistributionEngineInitTask.buildTopicName(DELETE_TOPIC, ENV_ID); + when(distributionDeleteNotificationSender.sendNotificationForDeleteService( + eq(expectedTopicName), eq(distributionId), any(EnvironmentMessageBusData.class), + eq(notificationData))) + .thenReturn(ActionStatus.GENERAL_ERROR); + + ActionStatus action = testInstance.notifyServiceForDelete(distributionId, notificationData, service, ENV_ID, + user); + assertEquals(ActionStatus.GENERAL_ERROR, action); + verify(distributionDeleteNotificationSender, times(1)).sendNotificationForDeleteService( + eq(expectedTopicName), eq(distributionId), any(EnvironmentMessageBusData.class), + eq(notificationData)); + } } diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/impl/ServiceBusinessLogicTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/impl/ServiceBusinessLogicTest.java index 587b902f4c..e62fb41993 100644 --- a/catalog-be/src/test/java/org/openecomp/sdc/be/components/impl/ServiceBusinessLogicTest.java +++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/impl/ServiceBusinessLogicTest.java @@ -52,6 +52,7 @@ import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import org.openecomp.sdc.be.components.distribution.engine.INotificationData; import org.openecomp.sdc.be.components.impl.exceptions.ComponentException; import org.openecomp.sdc.be.dao.api.ActionStatus; import org.openecomp.sdc.be.datatypes.elements.ArtifactDataDefinition; @@ -84,6 +85,7 @@ import org.openecomp.sdc.be.resources.data.auditing.DistributionDeployEvent; import org.openecomp.sdc.be.resources.data.auditing.ResourceAdminEvent; import org.openecomp.sdc.be.types.ServiceConsumptionData; import org.openecomp.sdc.be.user.Role; +import org.openecomp.sdc.common.util.ThreadLocalsHolder; import org.openecomp.sdc.common.util.ValidationUtils; import org.openecomp.sdc.exception.ResponseFormat; import org.springframework.http.HttpStatus; @@ -715,8 +717,35 @@ class ServiceBusinessLogicTest extends ServiceBusinessLogicBaseTestSetup { Mockito.when(toscaOperationFacade.getToscaElement(Mockito.anyString())).thenReturn(eitherService); Mockito.when(toscaOperationFacade.deleteService(Mockito.anyString(), Mockito.eq(true))).thenReturn(deletedServcies); Mockito.when(modelOperation.findModelByName(model)).thenReturn(Optional.of(normativeExtensionModel)); + + String mockDistributionId = ThreadLocalsHolder.getUuid(); + INotificationData mockNotificationData = Mockito.mock(INotificationData.class); + Mockito.when(distributionEngine.buildServiceForDeleteNotification( + (Service) Mockito.eq(eitherService.left().value()), Mockito.eq(mockDistributionId))) + .thenReturn(mockNotificationData); + + Mockito.when(distributionEngine.notifyServiceForDelete( + Mockito.eq(mockDistributionId), + Mockito.eq(mockNotificationData), + (Service) Mockito.eq(eitherService.left().value()), + Mockito.anyString(), + Mockito.eq(user))).thenReturn(ActionStatus.OK); + bl.deleteServiceAllVersions(serviceId, user); Mockito.verify(modelOperation, Mockito.times(1)).deleteModel(normativeExtensionModel, false); + + // Verify Kafka notification payload building for delete service. + Mockito.verify(distributionEngine, Mockito.times(1)) + .buildServiceForDeleteNotification((Service) eitherService.left().value(), mockDistributionId); + + // Verify Kafka notification sending for delete service. + Mockito.verify(distributionEngine, Mockito.times(1)) + .notifyServiceForDelete( + Mockito.eq(mockDistributionId), + Mockito.eq(mockNotificationData), + (Service) Mockito.eq(eitherService.left().value()), + Mockito.anyString(), + Mockito.eq(user)); } @SuppressWarnings({"unchecked", "rawtypes"}) diff --git a/common-app-api/src/main/java/org/openecomp/sdc/be/config/DistributionEngineConfiguration.java b/common-app-api/src/main/java/org/openecomp/sdc/be/config/DistributionEngineConfiguration.java index 72870ea1d9..92d6ee5840 100644 --- a/common-app-api/src/main/java/org/openecomp/sdc/be/config/DistributionEngineConfiguration.java +++ b/common-app-api/src/main/java/org/openecomp/sdc/be/config/DistributionEngineConfiguration.java @@ -58,6 +58,8 @@ public class DistributionEngineConfiguration extends BasicConfiguration { private ExternalServiceConfig msoConfig; private Integer opEnvRecoveryIntervalSec; private Integer allowedTimeBeforeStaleSec; + private String distributionDeleteTopicName; + private DistributionDeleteTopicConfig distributionDeleteTopic; public void setEnvironments(List environments) { Set set = new HashSet<>(); @@ -156,4 +158,14 @@ public class DistributionEngineConfiguration extends BasicConfiguration { private List lifecycle; } + + @Getter + @Setter + @ToString + public static class DistributionDeleteTopicConfig { + + private Integer maxWaitingAfterSendingSeconds; + private Integer maxThreadPoolSize; + private Integer minThreadPoolSize; + } } diff --git a/common-app-api/src/main/java/org/openecomp/sdc/common/util/YamlToObjectConverter.java b/common-app-api/src/main/java/org/openecomp/sdc/common/util/YamlToObjectConverter.java index b1f68c9b82..a4d7b88ec8 100644 --- a/common-app-api/src/main/java/org/openecomp/sdc/common/util/YamlToObjectConverter.java +++ b/common-app-api/src/main/java/org/openecomp/sdc/common/util/YamlToObjectConverter.java @@ -64,6 +64,7 @@ public class YamlToObjectConverter { deDescription.putListPropertyType("distribNotifResourceArtifactTypes", ComponentArtifactTypesConfig.class); deDescription.putListPropertyType("createTopic", CreateTopicConfig.class); deDescription.putListPropertyType("distributionNotificationTopic", DistributionNotificationTopicConfig.class); + deDescription.putListPropertyType("distributionDeleteTopic", DistributionNotificationTopicConfig.class); deConstructor.addTypeDescription(deDescription); yamlConstructors.put(DistributionEngineConfiguration.class.getName(), deConstructor); // FE conf diff --git a/common-app-api/src/test/java/org/openecomp/sdc/common/util/YamlToObjectConverterTest.java b/common-app-api/src/test/java/org/openecomp/sdc/common/util/YamlToObjectConverterTest.java index 35af78bd4e..c3cdddd804 100644 --- a/common-app-api/src/test/java/org/openecomp/sdc/common/util/YamlToObjectConverterTest.java +++ b/common-app-api/src/test/java/org/openecomp/sdc/common/util/YamlToObjectConverterTest.java @@ -21,20 +21,29 @@ package org.openecomp.sdc.common.util; import org.apache.commons.codec.binary.Base64; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.openecomp.sdc.be.config.Configuration; +import org.openecomp.sdc.be.config.DistributionEngineConfiguration; +import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionDeleteTopicConfig; import org.openecomp.sdc.common.http.client.api.HttpClient; import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.Field; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.HashMap; + import org.openecomp.sdc.exception.YamlConversionException; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.Constructor; import static junit.framework.TestCase.assertEquals; import static junit.framework.TestCase.assertFalse; import static junit.framework.TestCase.assertNull; import static junit.framework.TestCase.assertTrue; +import static org.junit.jupiter.api.Assertions.assertNotNull; public class YamlToObjectConverterTest { @@ -46,6 +55,21 @@ public class YamlToObjectConverterTest { " key: value"; private YamlToObjectConverter yamlToObjectConverter; + private static HashMap yamlConstructors; + + /** + * This method is to initialize the static block before class loads. + * + * @throws Exception if exception occurs while getting the fields from class. + */ + @BeforeClass + public static void init() throws Exception { + Class.forName("org.openecomp.sdc.common.util.YamlToObjectConverter"); + Field field = Class.forName("org.openecomp.sdc.common.util.YamlToObjectConverter") + .getDeclaredField("yamlConstructors"); + field.setAccessible(true); + yamlConstructors = (HashMap) field.get(null); + } @Before public void setUp() { @@ -169,4 +193,36 @@ public class YamlToObjectConverterTest { assertEquals(result.getUsers().size(),1); assertEquals(result.getUsers().get("tom"),"passwd"); } + + /** + * This method is to test getting the delete-topic configurations from yaml + * file. + * + * @throws Exception If any exception occurs while getting the configuration + * from yaml file. + */ + @Test + public void testContainsDistributionDeleteTopic() throws Exception { + assertTrue(yamlConstructors.containsKey("org.openecomp.sdc.be.config.DistributionEngineConfiguration")); + + Constructor constructor = yamlConstructors.get("org.openecomp.sdc.be.config.DistributionEngineConfiguration"); + assertNotNull(constructor); + + String yamlStr = "distributionDeleteTopicName: SDC-DELETE-TOPIC\n" + + "distributionDeleteTopic:\n" + + " maxWaitingAfterSendingSeconds: 10\n" + + " maxThreadPoolSize: 20\n" + + " minThreadPoolSize: 5\n"; + + Yaml yaml = new Yaml(constructor); + DistributionEngineConfiguration distributionEngineConfiguration = yaml.load(yamlStr); + assertEquals("SDC-DELETE-TOPIC", distributionEngineConfiguration.getDistributionDeleteTopicName()); + + DistributionDeleteTopicConfig deleteTopicConfig = distributionEngineConfiguration.getDistributionDeleteTopic(); + + assertNotNull(deleteTopicConfig); + assertEquals(Integer.valueOf(10), deleteTopicConfig.getMaxWaitingAfterSendingSeconds()); + assertEquals(Integer.valueOf(20), deleteTopicConfig.getMaxThreadPoolSize()); + assertEquals(Integer.valueOf(5), deleteTopicConfig.getMinThreadPoolSize()); + } } -- 2.16.6