--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright © 2025 Deutsche Telekom. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.sdc.be.components.distribution.engine;
+
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
+import org.openecomp.sdc.be.config.ConfigurationManager;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.openecomp.sdc.be.dao.api.ActionStatus;
+import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
+import org.openecomp.sdc.common.log.wrappers.Logger;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DistributionDeleteNotificationSender {
+ private static final Logger logger = Logger.getLogger(DistributionDeleteNotificationSender.class.getName());
+
+ private final CambriaHandler cambriaHandler;
+
+ private final KafkaHandler kafkaHandler;
+
+ private final DistributionEngineConfiguration deConfiguration;
+
+ public DistributionDeleteNotificationSender() {
+ this.kafkaHandler = new KafkaHandler();
+ this.deConfiguration = ConfigurationManager.getConfigurationManager()
+ .getDistributionEngineConfiguration();
+ this.cambriaHandler = new CambriaHandler();
+ }
+
+ public DistributionDeleteNotificationSender(KafkaHandler kafkaHandler,
+ DistributionEngineConfiguration deConfiguration, CambriaHandler cambriaHandler) {
+ this.kafkaHandler = kafkaHandler;
+ this.deConfiguration = deConfiguration;
+ this.cambriaHandler = cambriaHandler;
+ }
+
+ /**
+ * This method is used to send the notification for the delete service.
+ *
+ * @param topicName name of the Kafka topic for delete service
+ * @param distributionId random UUID generated for the notification
+ * @param messageBusData environment message bus data
+ * @param notificationData the payload data to be sent in the Kafka notification
+ * @return the action status of the notification
+ */
+ public ActionStatus sendNotificationForDeleteService(String topicName, String distributionId,
+ EnvironmentMessageBusData messageBusData,
+ INotificationData notificationData) {
+ logger.debug("sending notification with topicName={}, distributionId={}",
+ topicName,
+ distributionId);
+
+ CambriaErrorResponse status;
+ if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+ logger.debug("Kafka is inactive. Using CambriaHandler to send notification for topic={}", topicName);
+ status = cambriaHandler
+ .sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(),
+ messageBusData.getUebPrivateKey(),
+ messageBusData.getDmaaPuebEndpoints(), notificationData,
+ deConfiguration.getDistributionDeleteTopic().getMaxWaitingAfterSendingSeconds());
+ } else {
+ logger.debug("Kafka is active. Using KafkaHandler to send notification for topic={}", topicName);
+ status = kafkaHandler.sendNotification(topicName, notificationData);
+ }
+
+ logger.info("sending notification is completed for topicName={}, with status = {}",
+ topicName, status);
+ return convertCambriaResponse(status);
+ }
+
+ /**
+ * This method is used to convert a {@link CambriaErrorResponse} into the
+ * corresponding {@link ActionStatus}.
+ *
+ * @param status the operation status
+ * @return the mapped ActionStatus based on the operation status
+ */
+ private ActionStatus convertCambriaResponse(CambriaErrorResponse status) {
+ CambriaOperationStatus operationStatus = status.getOperationStatus();
+ switch (operationStatus) {
+ case OK:
+ return ActionStatus.OK;
+ case AUTHENTICATION_ERROR:
+ return ActionStatus.AUTHENTICATION_ERROR;
+ case INTERNAL_SERVER_ERROR:
+ return ActionStatus.GENERAL_ERROR;
+ case UNKNOWN_HOST_ERROR:
+ return ActionStatus.UNKNOWN_HOST;
+ case CONNNECTION_ERROR:
+ return ActionStatus.CONNNECTION_ERROR;
+ case OBJECT_NOT_FOUND:
+ return ActionStatus.OBJECT_NOT_FOUND;
+ default:
+ return ActionStatus.GENERAL_ERROR;
+ }
+ }
+}
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
+ * Modifications copyright (c) 2025 Deutsche Telekom.
+ * ================================================================================
*/
package org.openecomp.sdc.be.components.distribution.engine;
private DistributionEngineClusterHealth distributionEngineClusterHealth;
@Resource
private ServiceDistributionValidation serviceDistributionValidation;
+ @Resource
+ private DistributionDeleteNotificationSender distributionDeleteNotificationSender;
+
private Map<String, DistributionEngineInitTask> envNamePerInitTask = new HashMap<>();
private Map<String, DistributionEnginePollingTask> envNamePerPollingTask = new HashMap<>();
private Map<String, AtomicBoolean> envNamePerStatus = new HashMap<>();
result = isValidParam(deConfiguration.getDistributionNotifTopicName(), methodName, "distributionNotifTopicName") && result;
result = isValidParam(deConfiguration.getDistributionStatusTopicName(), methodName, "distributionStatusTopicName") && result;
result = isValidObject(deConfiguration.getDistributionStatusTopic(), methodName, "distributionStatusTopic") && result;
+ result = isValidParam(deConfiguration.getDistributionDeleteTopicName(), methodName,
+ "distributionDeleteTopicName") && result;
result = isValidObject(deConfiguration.getInitMaxIntervalSec(), methodName, "initMaxIntervalSec") && result;
result = isValidObject(deConfiguration.getInitRetryIntervalSec(), methodName, "initRetryIntervalSec") && result;
result = isValidParam(deConfiguration.getDistributionStatusTopic().getConsumerId(), methodName, "consumerId") && result;
value = serviceDistributionArtifactsBuilder.buildServiceForDistribution(value, service);
return value;
}
+
+ /**
+ * This method is used to build the notification payload for delete service
+ *
+ * @param service service to be deleted
+ * @param distributionId random UUID generated for the notification
+ * @return notification payload of INotificationData type.
+ */
+ @Override
+ public INotificationData buildServiceForDeleteNotification(Service service, String distributionId) {
+ INotificationData value = serviceDistributionArtifactsBuilder.buildResourceInstanceForDistribution(service,
+ distributionId, null);
+ logger.debug("Payload for delete kafka notification is - {}", value);
+ return value;
+ }
+
+ /**
+ * This method is used to notify the service for delete operation.
+ *
+ * @param distributionId random UUID generated for the notification
+ * @param notificationData the payload data to be sent in the notification
+ * @param service the service to be deleted
+ * @param envName the environment name
+ * @param user the user performing the delete operation
+ * @return the action status of the notification.
+ */
+ @Override
+ public ActionStatus notifyServiceForDelete(String distributionId, INotificationData notificationData,
+ Service service,
+ String envName, User user) {
+ logger.debug(
+ "Received notify service request for delete. distributionId = {}, serviceUuid = {} serviceUid = {}, envName = {}, userId = {}, modifierName {}",
+ distributionId, service.getUUID(), service.getUniqueId(), envName, service.getLastUpdaterUserId(),
+ user);
+
+ String topicName = buildDeleteNotificationTopicName(envName);
+ logger.debug("topic name for delete - {}", topicName);
+
+ ActionStatus deleteServiceNotifStatus = Optional
+ .ofNullable(environmentsEngine.getEnvironmentById(envName)).map(EnvironmentMessageBusData::new).map(
+ messageBusData -> distributionDeleteNotificationSender
+ .sendNotificationForDeleteService(topicName, distributionId, messageBusData,
+ notificationData))
+ .orElse(ActionStatus.DISTRIBUTION_ENVIRONMENT_NOT_AVAILABLE);
+ logger.debug("Finish delete notification service. notification status is {}", deleteServiceNotifStatus);
+ return deleteServiceNotifStatus;
+ }
+
+ /**
+ * This method is used to create topic name for delete operation based on
+ * environment name.
+ *
+ * @param envName the environment name
+ * @return the delete topic name
+ */
+ private String buildDeleteNotificationTopicName(String envName) {
+ String deleteTopicName = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration()
+ .getDistributionDeleteTopicName();
+ return DistributionEngineInitTask.buildTopicName(deleteTopicName, envName);
+ }
}
default OperationalEnvironmentEntry getEnvironmentByDmaapUebAddress(List<String> dmaapUebAddress) {
return null;
}
+
+ default ActionStatus notifyServiceForDelete(String distributionId, INotificationData notificationData,
+ Service service,
+ String envName, User user) {
+ return null;
+ }
+
+ default INotificationData buildServiceForDeleteNotification(Service service, String distributionId) {
+ return null;
+ }
}
* ============LICENSE_END=========================================================
* Modifications copyright (c) 2019 Nokia
* ================================================================================
+ * Modifications copyright (c) 2025 Deutsche Telekom.
+ * ================================================================================
*/
package org.openecomp.sdc.be.components.impl;
}
toscaOperationFacade.commitAndCheck(service.getUniqueId());
updateCatalog(service, ChangeTypeEnum.DELETE);
+
+ String distributionId = ThreadLocalsHolder.getUuid();
+ String envName = getEnvNameFromConfiguration();
+ INotificationData notificationData = distributionEngine.buildServiceForDeleteNotification(service,
+ distributionId);
+ log.debug("for kafka notification envName = {} and notificationData = {}", envName, notificationData);
+ ActionStatus notifyServiceResponse = distributionEngine.notifyServiceForDelete(distributionId,
+ notificationData, service, envName, user);
+ if (notifyServiceResponse == ActionStatus.OK) {
+ log.debug(
+ "Kafka notification successfully generated for delete service. Notification response details are - response={}, serviceName={}, serviceId={}",
+ notifyServiceResponse,
+ service.getName(),
+ service.getUniqueId());
+ } else {
+ log.debug(
+ "Kafka notification failed for delete service. Notification response details are - response={}, serviceName={}, serviceId={}",
+ notifyServiceResponse,
+ service.getName(),
+ service.getUniqueId());
+ }
} catch (ComponentException exception) {
log.debug("Failed to delete service, {}, in ServiceServlet", serviceId);
janusGraphDao.rollback();
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright © 2025 Deutsche Telekom. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.sdc.be.components.distribution.engine;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionDeleteTopicConfig;
+import org.openecomp.sdc.be.dao.api.ActionStatus;
+import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
+import org.openecomp.sdc.common.util.ThreadLocalsHolder;
+
+public class DistributionDeleteNotificationSenderTest {
+ private KafkaHandler kafkaHandler;
+ private DistributionEngineConfiguration distributionConfig;
+ private DistributionDeleteNotificationSender deleteNotificationSender;
+ private CambriaHandler cambriaHandler;
+ private static final String TOPIC_NAME = "delete_topic";
+
+ @Before
+ public void setUp() {
+ kafkaHandler = Mockito.mock(KafkaHandler.class);
+ distributionConfig = Mockito.mock(DistributionEngineConfiguration.class);
+ cambriaHandler = Mockito.mock(CambriaHandler.class);
+ deleteNotificationSender = new DistributionDeleteNotificationSender(kafkaHandler, distributionConfig,
+ cambriaHandler);
+ }
+
+ @Test
+ public void testSendNotificationForDeleteService_WhenKafkaIsInactive() {
+ when(kafkaHandler.isKafkaActive()).thenReturn(false);
+ DistributionDeleteTopicConfig deleteTopicConfig = mock(
+ DistributionEngineConfiguration.DistributionDeleteTopicConfig.class);
+ when(distributionConfig.getDistributionDeleteTopic()).thenReturn(deleteTopicConfig);
+ when(deleteTopicConfig.getMaxWaitingAfterSendingSeconds()).thenReturn(10);
+
+ EnvironmentMessageBusData messageBusData = mock(EnvironmentMessageBusData.class);
+ List<String> serverList = Collections.singletonList("uebEndpoint");
+ when(messageBusData.getUebPublicKey()).thenReturn("uebPublicKey");
+ when(messageBusData.getUebPrivateKey()).thenReturn("uebPrivateKey");
+ when(messageBusData.getDmaaPuebEndpoints()).thenReturn(serverList);
+
+ INotificationData notificationData = new NotificationDataImpl();
+
+ when(cambriaHandler.sendNotificationAndClose(TOPIC_NAME, "uebPublicKey", "uebPrivateKey", serverList,
+ notificationData, 10)).thenReturn(new CambriaErrorResponse(CambriaOperationStatus.OK, 200));
+
+ ActionStatus result = deleteNotificationSender.sendNotificationForDeleteService(
+ TOPIC_NAME, ThreadLocalsHolder.getUuid(), messageBusData, notificationData);
+
+ assertEquals(ActionStatus.OK, result);
+ verify(cambriaHandler, times(1)).sendNotificationAndClose(eq(TOPIC_NAME), eq("uebPublicKey"),
+ eq("uebPrivateKey"), eq(serverList), eq(notificationData), eq(10L));
+ }
+
+ @Test
+ public void testSendNotificationForDeleteService_WhenKafkaIsActive() {
+ INotificationData notificationData = new NotificationDataImpl();
+ when(kafkaHandler.isKafkaActive()).thenReturn(true);
+ when(kafkaHandler.sendNotification(TOPIC_NAME, notificationData))
+ .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.OK, 200));
+
+ ActionStatus result = deleteNotificationSender.sendNotificationForDeleteService(
+ TOPIC_NAME, ThreadLocalsHolder.getUuid(), null, notificationData);
+
+ assertEquals(ActionStatus.OK, result);
+ verify(kafkaHandler, times(1)).sendNotification(eq(TOPIC_NAME), eq(notificationData));
+ }
+
+ @Test
+ public void testSendNotificationForDeleteService_WhenKafkaNotificationFails() {
+ INotificationData notificationData = new NotificationDataImpl();
+ when(kafkaHandler.isKafkaActive()).thenReturn(true);
+ when(kafkaHandler.sendNotification(TOPIC_NAME, notificationData))
+ .thenReturn(new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 200));
+
+ ActionStatus result = deleteNotificationSender.sendNotificationForDeleteService(
+ TOPIC_NAME, ThreadLocalsHolder.getUuid(), null, notificationData);
+
+ assertEquals(ActionStatus.GENERAL_ERROR, result);
+ verify(kafkaHandler, times(1)).sendNotification(eq(TOPIC_NAME), eq(notificationData));
+ }
+}
deConfiguration.setDistributionNotifTopicName("distributionNotifTopicName");
deConfiguration.setDistributionStatusTopicName("statusTopic");
+ deConfiguration.setDistributionDeleteTopicName("distributionDeleteTopicName");
DistributionStatusTopicConfig distributionStatusTopic = new DistributionStatusTopicConfig();
distributionStatusTopic.setConsumerGroup("asdc-group");
import org.openecomp.sdc.be.model.User;
import org.openecomp.sdc.be.model.operations.api.StorageOperationStatus;
import org.openecomp.sdc.be.resources.data.OperationalEnvironmentEntry;
+import org.openecomp.sdc.common.util.ThreadLocalsHolder;
import java.util.HashSet;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
public static final String ENV_ID = "envId";
public static final String USER_ID = "userId";
public static final String MODIFIER = "modifier";
+ public static final String DELETE_TOPIC = "deleteTopic";
@InjectMocks
private DistributionEngine testInstance;
@Mock
private ServiceDistributionArtifactsBuilder serviceDistributionArtifactsBuilder;
+ @Mock
+ DistributionDeleteNotificationSender distributionDeleteNotificationSender;
+
private DummyDistributionConfigurationManager distributionEngineConfigurationMock;
private Map<String, OperationalEnvironmentEntry> envs;
when(serviceDistributionArtifactsBuilder.buildResourceInstanceForDistribution(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(new NotificationDataImpl());
result = testInstance.buildServiceForDistribution(service, distributionId, workloadContext);
}
+
+ @Test
+ void testBuildServiceForDeleteNotification() {
+ Service service = new Service();
+ String distributionId = ThreadLocalsHolder.getUuid();
+ INotificationData expectedNotificationData = new NotificationDataImpl();
+ expectedNotificationData.setDistributionID(distributionId);
+
+ when(serviceDistributionArtifactsBuilder.buildResourceInstanceForDistribution(
+ eq(service), eq(distributionId), isNull()))
+ .thenReturn(expectedNotificationData);
+
+ INotificationData actual = testInstance.buildServiceForDeleteNotification(service, distributionId);
+ verify(serviceDistributionArtifactsBuilder, times(1))
+ .buildResourceInstanceForDistribution(eq(service), eq(distributionId), isNull());
+ assertEquals(expectedNotificationData, actual);
+ assertEquals(expectedNotificationData.getDistributionID(), actual.getDistributionID());
+ }
+
+ @Test
+ void testNotifyServiceForDelete() {
+ String distributionId = ThreadLocalsHolder.getUuid();
+ Service service = mock(Service.class);
+ User user = mock(User.class);
+ INotificationData notificationData = mock(INotificationData.class);
+
+ when(environmentsEngine.getEnvironmentById(ENV_ID)).thenReturn(envs.get(ENV_ID));
+ when(distributionEngineConfigurationMock.getConfigurationMock().getDistributionDeleteTopicName())
+ .thenReturn(DELETE_TOPIC);
+
+ String expectedTopicName = DistributionEngineInitTask.buildTopicName(DELETE_TOPIC, ENV_ID);
+ when(distributionDeleteNotificationSender.sendNotificationForDeleteService(
+ eq(expectedTopicName), eq(distributionId), any(EnvironmentMessageBusData.class),
+ eq(notificationData)))
+ .thenReturn(ActionStatus.OK);
+
+ ActionStatus action = testInstance.notifyServiceForDelete(distributionId, notificationData, service, ENV_ID,
+ user);
+ assertEquals(ActionStatus.OK, action);
+ verify(distributionDeleteNotificationSender, times(1)).sendNotificationForDeleteService(
+ eq(expectedTopicName), eq(distributionId), any(EnvironmentMessageBusData.class),
+ eq(notificationData));
+ }
+
+ @Test
+ void testNotifyServiceForDeleteWhenEnvironmentDoesNotExist() {
+ when(environmentsEngine.getEnvironments()).thenReturn(envs);
+ ActionStatus actionStatus = testInstance.notifyServiceForDelete(DISTRIBUTION_ID, new NotificationDataImpl(),
+ new Service(), "ENV_123", modifier);
+ assertEquals(ActionStatus.DISTRIBUTION_ENVIRONMENT_NOT_AVAILABLE, actionStatus);
+ verifyNoInteractions(distributionNotificationSender);
+ }
+
+ @Test
+ void testNotifyServiceForDeleteWhenKafkaNotificationFailed() {
+ String distributionId = ThreadLocalsHolder.getUuid();
+ Service service = mock(Service.class);
+ User user = mock(User.class);
+ INotificationData notificationData = mock(INotificationData.class);
+
+ when(environmentsEngine.getEnvironmentById(ENV_ID)).thenReturn(envs.get(ENV_ID));
+ when(distributionEngineConfigurationMock.getConfigurationMock().getDistributionDeleteTopicName())
+ .thenReturn(DELETE_TOPIC);
+
+ String expectedTopicName = DistributionEngineInitTask.buildTopicName(DELETE_TOPIC, ENV_ID);
+ when(distributionDeleteNotificationSender.sendNotificationForDeleteService(
+ eq(expectedTopicName), eq(distributionId), any(EnvironmentMessageBusData.class),
+ eq(notificationData)))
+ .thenReturn(ActionStatus.GENERAL_ERROR);
+
+ ActionStatus action = testInstance.notifyServiceForDelete(distributionId, notificationData, service, ENV_ID,
+ user);
+ assertEquals(ActionStatus.GENERAL_ERROR, action);
+ verify(distributionDeleteNotificationSender, times(1)).sendNotificationForDeleteService(
+ eq(expectedTopicName), eq(distributionId), any(EnvironmentMessageBusData.class),
+ eq(notificationData));
+ }
}
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import org.openecomp.sdc.be.components.distribution.engine.INotificationData;
import org.openecomp.sdc.be.components.impl.exceptions.ComponentException;
import org.openecomp.sdc.be.dao.api.ActionStatus;
import org.openecomp.sdc.be.datatypes.elements.ArtifactDataDefinition;
import org.openecomp.sdc.be.resources.data.auditing.ResourceAdminEvent;
import org.openecomp.sdc.be.types.ServiceConsumptionData;
import org.openecomp.sdc.be.user.Role;
+import org.openecomp.sdc.common.util.ThreadLocalsHolder;
import org.openecomp.sdc.common.util.ValidationUtils;
import org.openecomp.sdc.exception.ResponseFormat;
import org.springframework.http.HttpStatus;
Mockito.when(toscaOperationFacade.getToscaElement(Mockito.anyString())).thenReturn(eitherService);
Mockito.when(toscaOperationFacade.deleteService(Mockito.anyString(), Mockito.eq(true))).thenReturn(deletedServcies);
Mockito.when(modelOperation.findModelByName(model)).thenReturn(Optional.of(normativeExtensionModel));
+
+ String mockDistributionId = ThreadLocalsHolder.getUuid();
+ INotificationData mockNotificationData = Mockito.mock(INotificationData.class);
+ Mockito.when(distributionEngine.buildServiceForDeleteNotification(
+ (Service) Mockito.eq(eitherService.left().value()), Mockito.eq(mockDistributionId)))
+ .thenReturn(mockNotificationData);
+
+ Mockito.when(distributionEngine.notifyServiceForDelete(
+ Mockito.eq(mockDistributionId),
+ Mockito.eq(mockNotificationData),
+ (Service) Mockito.eq(eitherService.left().value()),
+ Mockito.anyString(),
+ Mockito.eq(user))).thenReturn(ActionStatus.OK);
+
bl.deleteServiceAllVersions(serviceId, user);
Mockito.verify(modelOperation, Mockito.times(1)).deleteModel(normativeExtensionModel, false);
+
+ // Verify Kafka notification payload building for delete service.
+ Mockito.verify(distributionEngine, Mockito.times(1))
+ .buildServiceForDeleteNotification((Service) eitherService.left().value(), mockDistributionId);
+
+ // Verify Kafka notification sending for delete service.
+ Mockito.verify(distributionEngine, Mockito.times(1))
+ .notifyServiceForDelete(
+ Mockito.eq(mockDistributionId),
+ Mockito.eq(mockNotificationData),
+ (Service) Mockito.eq(eitherService.left().value()),
+ Mockito.anyString(),
+ Mockito.eq(user));
}
@SuppressWarnings({"unchecked", "rawtypes"})
private ExternalServiceConfig msoConfig;
private Integer opEnvRecoveryIntervalSec;
private Integer allowedTimeBeforeStaleSec;
+ private String distributionDeleteTopicName;
+ private DistributionDeleteTopicConfig distributionDeleteTopic;
public void setEnvironments(List<String> environments) {
Set<String> set = new HashSet<>();
private List<String> lifecycle;
}
+
+ @Getter
+ @Setter
+ @ToString
+ public static class DistributionDeleteTopicConfig {
+
+ private Integer maxWaitingAfterSendingSeconds;
+ private Integer maxThreadPoolSize;
+ private Integer minThreadPoolSize;
+ }
}
deDescription.putListPropertyType("distribNotifResourceArtifactTypes", ComponentArtifactTypesConfig.class);
deDescription.putListPropertyType("createTopic", CreateTopicConfig.class);
deDescription.putListPropertyType("distributionNotificationTopic", DistributionNotificationTopicConfig.class);
+ deDescription.putListPropertyType("distributionDeleteTopic", DistributionNotificationTopicConfig.class);
deConstructor.addTypeDescription(deDescription);
yamlConstructors.put(DistributionEngineConfiguration.class.getName(), deConstructor);
// FE conf
import org.apache.commons.codec.binary.Base64;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.openecomp.sdc.be.config.Configuration;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionDeleteTopicConfig;
import org.openecomp.sdc.common.http.client.api.HttpClient;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.HashMap;
+
import org.openecomp.sdc.exception.YamlConversionException;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.Constructor;
import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertFalse;
import static junit.framework.TestCase.assertNull;
import static junit.framework.TestCase.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
public class YamlToObjectConverterTest {
" key: value";
private YamlToObjectConverter yamlToObjectConverter;
+ private static HashMap<String, Constructor> yamlConstructors;
+
+ /**
+ * This method is to initialize the static block before class loads.
+ *
+ * @throws Exception if exception occurs while getting the fields from class.
+ */
+ @BeforeClass
+ public static void init() throws Exception {
+ Class.forName("org.openecomp.sdc.common.util.YamlToObjectConverter");
+ Field field = Class.forName("org.openecomp.sdc.common.util.YamlToObjectConverter")
+ .getDeclaredField("yamlConstructors");
+ field.setAccessible(true);
+ yamlConstructors = (HashMap<String, Constructor>) field.get(null);
+ }
@Before
public void setUp() {
assertEquals(result.getUsers().size(),1);
assertEquals(result.getUsers().get("tom"),"passwd");
}
+
+ /**
+ * This method is to test getting the delete-topic configurations from yaml
+ * file.
+ *
+ * @throws Exception If any exception occurs while getting the configuration
+ * from yaml file.
+ */
+ @Test
+ public void testContainsDistributionDeleteTopic() throws Exception {
+ assertTrue(yamlConstructors.containsKey("org.openecomp.sdc.be.config.DistributionEngineConfiguration"));
+
+ Constructor constructor = yamlConstructors.get("org.openecomp.sdc.be.config.DistributionEngineConfiguration");
+ assertNotNull(constructor);
+
+ String yamlStr = "distributionDeleteTopicName: SDC-DELETE-TOPIC\n" +
+ "distributionDeleteTopic:\n" +
+ " maxWaitingAfterSendingSeconds: 10\n" +
+ " maxThreadPoolSize: 20\n" +
+ " minThreadPoolSize: 5\n";
+
+ Yaml yaml = new Yaml(constructor);
+ DistributionEngineConfiguration distributionEngineConfiguration = yaml.load(yamlStr);
+ assertEquals("SDC-DELETE-TOPIC", distributionEngineConfiguration.getDistributionDeleteTopicName());
+
+ DistributionDeleteTopicConfig deleteTopicConfig = distributionEngineConfiguration.getDistributionDeleteTopic();
+
+ assertNotNull(deleteTopicConfig);
+ assertEquals(Integer.valueOf(10), deleteTopicConfig.getMaxWaitingAfterSendingSeconds());
+ assertEquals(Integer.valueOf(20), deleteTopicConfig.getMaxThreadPoolSize());
+ assertEquals(Integer.valueOf(5), deleteTopicConfig.getMinThreadPoolSize());
+ }
}