import org.onap.sdc.utils.DistributionActionResultEnum;
import org.onap.sdc.utils.DistributionClientConstants;
import org.onap.sdc.utils.GeneralUtils;
+import org.onap.sdc.utils.NotificationSender;
import org.onap.sdc.utils.Pair;
import org.onap.sdc.utils.Wrapper;
import org.onap.sdc.api.notification.IVfModuleMetadata;
import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
import com.att.nsa.cambria.client.CambriaConsumer;
import com.att.nsa.cambria.client.CambriaIdentityManager;
-import com.att.nsa.cambria.client.CambriaPublisher.message;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
public class DistributionClientImpl implements IDistributionClient {
- public static final int POLLING_TIMEOUT_MULTIPLIER = 1000;
- public static final long SLEEPING_THREAD_TIME = 1000L;
- public static final long PUBLISHER_TIMEOUT = 10L;
- public static final int TERMINATION_TIMEOUT = 60;
- private static Logger log = LoggerFactory.getLogger(DistributionClientImpl.class.getName());
+ private static final int POLLING_TIMEOUT_MULTIPLIER = 1000;
+ private static final int TERMINATION_TIMEOUT = 60;
+ private static final Logger log = LoggerFactory.getLogger(DistributionClientImpl.class);
private SdcConnectorClient asdcConnector;
private ScheduledExecutorService executorPool = null;
protected CambriaIdentityManager cambriaIdentityManager = null;
private List<String> brokerServers;
- protected ApiCredential credential;
+ private ApiCredential credential;
protected Configuration configuration;
private INotificationCallback callback;
private IStatusCallback statusCallback;
private String notificationTopic;
private String statusTopic;
private boolean isConsumerGroupGenerated = false;
+ private NotificationSender notificationSender;
- private boolean isInitialized, isStarted, isTerminated;
+ private boolean isInitialized;
+ private boolean isStarted;
+ private boolean isTerminated;
@Override
public IConfiguration getConfiguration() {
}
// 1. get ueb server list from configuration
if (errorWrapper.isEmpty()) {
- initUebServerList(errorWrapper);
+ List<String> servers = initUebServerList(errorWrapper);
+ if (servers != null) {
+ this.brokerServers = servers;
+ }
}
// 2.validate artifact types against asdc server
if (errorWrapper.isEmpty()) {
// 3. create keys
if (errorWrapper.isEmpty()) {
this.callback = callback;
- createUebKeys(errorWrapper);
+ ApiCredential apiCredential = createUebKeys(errorWrapper);
+ if (apiCredential != null) {
+ this.credential = apiCredential;
+ }
}
// 4. register for topics
if (errorWrapper.isEmpty()) {
- registerForTopics(errorWrapper);
+ TopicRegistrationResponse topics = registerForTopics(errorWrapper, this.credential);
+ if (topics != null) {
+ this.notificationTopic = topics.getDistrNotificationTopicName();
+ this.statusTopic = topics.getDistrStatusTopicName();
+ this.notificationSender = createNotificationSender();
+ }
}
IDistributionClientResult result;
return new SdcConnectorClient(configuration, new HttpAsdcClient(configuration));
}
- private void registerForTopics(Wrapper<IDistributionClientResult> errorWrapper) {
+ private NotificationSender createNotificationSender() {
+ return new NotificationSender(brokerServers);
+ }
+
+ private TopicRegistrationResponse registerForTopics(Wrapper<IDistributionClientResult> errorWrapper, ApiCredential credential) {
Either<TopicRegistrationResponse, DistributionClientResultImpl> registerAsdcTopics = asdcConnector.registerAsdcTopics(credential);
if (registerAsdcTopics.isRight()) {
}
errorWrapper.setInnerElement(registerAsdcTopics.right().value());
} else {
- TopicRegistrationResponse topics = registerAsdcTopics.left().value();
- notificationTopic = topics.getDistrNotificationTopicName();
- statusTopic = topics.getDistrStatusTopicName();
+ return registerAsdcTopics.left().value();
}
-
+ return null;
}
- private void createUebKeys(Wrapper<IDistributionClientResult> errorWrapper) {
+ private ApiCredential createUebKeys(Wrapper<IDistributionClientResult> errorWrapper) {
+ ApiCredential apiCredential = null;
+
initCambriaClient(errorWrapper);
if (errorWrapper.isEmpty()) {
log.debug("create keys");
- DistributionClientResultImpl createKeysResponse = createUebKeys();
+ Pair<DistributionClientResultImpl, ApiCredential> uebKeys = createUebKeys();
+ DistributionClientResultImpl createKeysResponse = uebKeys.getFirst();
+ apiCredential = uebKeys.getSecond();
if (createKeysResponse.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) {
errorWrapper.setInnerElement(createKeysResponse);
}
}
+ return apiCredential;
}
private void validateArtifactTypesWithAsdcServer(IConfiguration conf, Wrapper<IDistributionClientResult> errorWrapper) {
}
}
- private void initUebServerList(Wrapper<IDistributionClientResult> errorWrapper) {
+ private List<String> initUebServerList(Wrapper<IDistributionClientResult> errorWrapper) {
+ List<String> brokerServers = null;
log.debug("get ueb cluster server list from component(configuration file)");
Either<List<String>, IDistributionClientResult> serverListResponse = getUEBServerList();
if (serverListResponse.isRight()) {
errorWrapper.setInnerElement(serverListResponse.right().value());
} else {
-
brokerServers = serverListResponse.left().value();
}
+ return brokerServers;
}
private void validateNotInitilized(Wrapper<IDistributionClientResult> errorWrapper) {
@Override
public IDistributionClientResult sendDownloadStatus(IDistributionStatusMessage statusMessage) {
log.info("DistributionClient - sendDownloadStatus");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
-
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
}
- private IDistributionClientResult sendStatus(IDistributionStatusMessageJsonBuilder builder) {
- DistributionClientResultImpl statusResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status");
- log.info("DistributionClient - sendStatus");
- Either<CambriaBatchingPublisher, IDistributionClientResult> eitherPublisher = getCambriaPublisher();
- if (eitherPublisher.isRight()) {
- return eitherPublisher.right().value();
- }
- CambriaBatchingPublisher pub = eitherPublisher.left().value();
-
- log.debug("after create publisher server list " + brokerServers.toString());
- String jsonRequest = builder.build();
-
- log.debug("try to send status " + jsonRequest);
-
- try {
- pub.send("MyPartitionKey", jsonRequest);
- Thread.sleep(SLEEPING_THREAD_TIME);
- } catch (IOException e) {
- log.debug("DistributionClient - sendDownloadStatus. Failed to send download status");
- } catch (InterruptedException e) {
- log.debug("DistributionClient - sendDownloadStatus. thread was interrupted");
- } finally {
-
- try {
- List<message> stuck = pub.close(PUBLISHER_TIMEOUT, TimeUnit.SECONDS);
-
- if (!stuck.isEmpty()) {
- log.debug("DistributionClient - sendDownloadStatus. " + stuck.size() + " messages unsent");
- } else {
- statusResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "messages successfully sent");
- }
- } catch (IOException | InterruptedException e) {
- log.debug("DistributionClient - sendDownloadStatus. failed to send messages and close publisher ");
- }
+ private IDistributionClientResult sendStatus(IDistributionStatusMessageJsonBuilder statusBuilder) {
+ IDistributionClientResult distributionResult;
+ Either<CambriaBatchingPublisher, IDistributionClientResult> cambriaPublisher = getCambriaPublisher(statusTopic, configuration, brokerServers, credential);
+ if (cambriaPublisher.isRight()) {
+ distributionResult = cambriaPublisher.right().value();
+ } else {
+ String statusMessage = statusBuilder.build();
+ distributionResult = notificationSender.send(cambriaPublisher.left().value(), statusMessage);
}
- return statusResult;
+
+ return distributionResult;
}
- private Either<CambriaBatchingPublisher, IDistributionClientResult> getCambriaPublisher() {
+ private Either<CambriaBatchingPublisher, IDistributionClientResult> getCambriaPublisher(String statusTopic, Configuration configuration, List<String> brokerServers, ApiCredential credential) {
CambriaBatchingPublisher cambriaPublisher = null;
try {
- cambriaPublisher = new PublisherBuilder().onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers).build();
+ cambriaPublisher = new PublisherBuilder().onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap())
+ .usingHosts(brokerServers).build();
cambriaPublisher.setApiCredentials(credential.getApiKey(), credential.getApiSecret());
} catch (MalformedURLException | GeneralSecurityException e) {
Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
@Override
public IDistributionClientResult sendDeploymentStatus(IDistributionStatusMessage statusMessage) {
log.info("DistributionClient - sendDeploymentStatus");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
}
IDistributionClientResult sendNotificationStatus(long currentTimeMillis, String distributionId, ArtifactInfoImpl artifactInfo, boolean isNotified) {
if (!errorWrapper.isEmpty()) {
return errorWrapper.getInnerElement();
}
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(getConfiguration().getConsumerID(), currentTimeMillis, distributionId, artifactInfo, isNotified));
+ IDistributionStatusMessageJsonBuilder builder = DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(getConfiguration().getConsumerID(), currentTimeMillis, distributionId, artifactInfo, isNotified);
+ return sendStatus(builder);
}
/* *************************** Private Methods *************************************************** */
- protected DistributionClientResultImpl createUebKeys() {
+ protected Pair<DistributionClientResultImpl, ApiCredential> createUebKeys() {
DistributionClientResultImpl response = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "keys created successfuly");
+ ApiCredential credential = null;
try {
String description = String.format(DistributionClientConstants.CLIENT_DESCRIPTION, configuration.getConsumerID());
credential = cambriaIdentityManager.createApiKey(DistributionClientConstants.EMAIL, description);
response = new DistributionClientResultImpl(DistributionActionResultEnum.UEB_KEYS_CREATION_FAILED, "failed to create keys: " + e.getMessage());
log.error(response.toString());
}
- return response;
+ return new Pair<>(response, credential);
}
private IDistributionClientResult restartConsumer() {
private void validateRunReady(Wrapper<IDistributionClientResult> errorWrapper) {
if (errorWrapper.isEmpty()) {
- validateInitilized(errorWrapper);
+ validateInitialized(errorWrapper);
}
if (errorWrapper.isEmpty()) {
validateNotTerminated(errorWrapper);
}
- private void validateInitilized(Wrapper<IDistributionClientResult> errorWrapper) {
+ private void validateInitialized(Wrapper<IDistributionClientResult> errorWrapper) {
if (!isInitialized) {
log.debug("client was not initialized");
IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED, "distribution client was not initialized");
@Override
public IDistributionClientResult sendDownloadStatus(IDistributionStatusMessage statusMessage, String errorReason) {
log.info("DistributionClient - sendDownloadStatus with errorReason");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
-
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
}
@Override
public IDistributionClientResult sendDeploymentStatus(IDistributionStatusMessage statusMessage, String errorReason) {
log.info("DistributionClient - sendDeploymentStatus with errorReason");
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
+
+ }
+
+ private IDistributionClientResult sendErrorStatus(IDistributionStatusMessageJsonBuilder builder) {
Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
validateRunReady(errorWrapper);
if (!errorWrapper.isEmpty()) {
return errorWrapper.getInnerElement();
}
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
-
+ return sendStatus(builder);
}
@Override
public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage) {
log.info("DistributionClient - sendComponentDone status");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
}
public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage,
String errorReason) {
log.info("DistributionClient - sendComponentDone status with errorReason");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
}
public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage) {
log.info("DistributionClient - sendFinalDistributionStatus status");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
}
public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage,
String errorReason) {
log.info("DistributionClient - sendFinalDistributionStatus status with errorReason");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
}
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * sdc-distribution-client
+ * ================================================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.sdc.utils;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaPublisher;
+import org.onap.sdc.api.results.IDistributionClientResult;
+import org.onap.sdc.impl.DistributionClientResultImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class NotificationSender {
+
+ private static final Logger log = LoggerFactory.getLogger(NotificationSender.class);
+ private static final long PUBLISHER_CLOSING_TIMEOUT = 10L;
+ private static final long SLEEP_TIME = 1;
+
+ private final List<String> brokerServers;
+
+ public NotificationSender(List<String> brokerServers) {
+ this.brokerServers = brokerServers;
+ }
+
+ public IDistributionClientResult send(CambriaBatchingPublisher publisher, String status) {
+ log.info("DistributionClient - sendStatus");
+ DistributionClientResultImpl distributionResult;
+ try {
+ log.debug("Publisher server list: {}", brokerServers);
+ log.debug("Trying to send status: {}", status);
+ publisher.send("MyPartitionKey", status);
+ TimeUnit.SECONDS.sleep(SLEEP_TIME);
+ } catch (IOException | InterruptedException e) {
+ log.error("DistributionClient - sendDownloadStatus. Failed to send download status", e);
+ } finally {
+ distributionResult = closePublisher(publisher);
+ }
+ return distributionResult;
+ }
+
+ private DistributionClientResultImpl closePublisher(CambriaBatchingPublisher publisher) {
+ DistributionClientResultImpl distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status");
+ try {
+ List<CambriaPublisher.message> notSentMessages = publisher.close(PUBLISHER_CLOSING_TIMEOUT, TimeUnit.SECONDS);
+ if (notSentMessages.isEmpty()) {
+ distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent");
+ } else {
+ log.debug("DistributionClient - sendDownloadStatus. {} messages were not sent", notSentMessages.size());
+ }
+ } catch (IOException | InterruptedException e) {
+ log.error("DistributionClient - sendDownloadStatus. Failed to send messages and close publisher.", e);
+ }
+ return distributionResult;
+ }
+}
--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * sdc-distribution-client
+ * ================================================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.sdc.utils;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaPublisher;
+import fj.data.Either;
+import org.junit.Test;
+import org.onap.sdc.api.results.IDistributionClientResult;
+import org.onap.sdc.impl.DistributionClientResultImpl;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class NotificationSenderTest {
+
+ private final String status = "status";
+ private final CambriaPublisher.message message = new CambriaPublisher.message("sample-partition", "sample-message");
+ private final List<CambriaPublisher.message> notEmptySendingFailedMessages = Collections.singletonList(message);
+ private final DistributionClientResultImpl successResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent");
+ private final DistributionClientResultImpl generalErrorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status");
+
+ private final CambriaBatchingPublisher publisher = mock(CambriaBatchingPublisher.class);
+ private final List<String> emptyServers = Collections.emptyList();
+ private final NotificationSender validNotificationSender = new NotificationSender(emptyServers);;
+
+
+ @Test
+ public void whenPublisherIsValidAndNoExceptionsAreThrownShouldReturnSuccessStatus() throws IOException, InterruptedException {
+ //given
+ when(publisher.send(anyString(), anyString())).thenReturn(0);
+ when(publisher.close(anyLong(), any())).thenReturn(Collections.emptyList());
+
+ //when
+ IDistributionClientResult result = validNotificationSender.send(publisher, status);
+
+ //then
+ assertEquals(successResponse.getDistributionActionResult(), result.getDistributionActionResult());
+ }
+
+ @Test
+ public void whenPublisherCouldNotSendShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
+ //given
+ when(publisher.send(anyString(), anyString())).thenReturn(0);
+ when(publisher.close(anyLong(), any())).thenReturn(notEmptySendingFailedMessages);
+
+ //when
+ IDistributionClientResult result = validNotificationSender.send(publisher, status);
+
+ //then
+ assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
+ }
+
+ @Test
+ public void whenSendingThrowsIOExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
+ //given
+ when(publisher.send(anyString(), anyString())).thenThrow(new IOException());
+ when(publisher.close(anyLong(), any())).thenReturn(notEmptySendingFailedMessages);
+
+ //when
+ IDistributionClientResult result = validNotificationSender.send(publisher, status);
+
+ //then
+ assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
+ }
+
+ @Test
+ public void whenSendingThrowsInterruptedExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
+ //given
+ when(publisher.send(anyString(), anyString())).thenAnswer(invocationOnMock -> {throw new InterruptedException();});
+ when(publisher.close(anyLong(), any())).thenReturn(notEmptySendingFailedMessages);
+
+ //when
+ IDistributionClientResult result = validNotificationSender.send(publisher, status);
+
+ //then
+ assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
+ }
+
+ @Test
+ public void whenClosingThrowsIOExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
+ //given
+ when(publisher.send(anyString(), anyString())).thenReturn(0);
+ when(publisher.close(anyLong(), any())).thenThrow(new IOException());
+
+ //when
+ IDistributionClientResult result = validNotificationSender.send(publisher, status);
+
+ //then
+ assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
+ }
+
+ @Test
+ public void whenClosingThrowsInterruptedExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
+ //given
+ when(publisher.send(anyString(), anyString())).thenReturn(0);
+ when(publisher.close(anyLong(), any())).thenAnswer(invocationOnMock -> {throw new InterruptedException();});
+
+ //when
+ IDistributionClientResult result = validNotificationSender.send(publisher, status);
+
+ //then
+ assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
+ }
+}
\ No newline at end of file