Improve code quality 23/116023/9
authorMichal Banka <michal.banka@nokia.com>
Wed, 2 Dec 2020 09:41:26 +0000 (10:41 +0100)
committerMicha? Ba?ka <michal.banka@nokia.com>
Thu, 3 Dec 2020 10:26:42 +0000 (10:26 +0000)
- Extracted NotificationSender class from DistributionClientImpl
- Fixed list formatting in README
- Other small refactors
- +2% code coverage

Change-Id: I753502d13504057804fcb3557c808dae2520cc74
Signed-off-by: Michal Banka <michal.banka@nokia.com>
Issue-ID: SDC-3388

README.md
sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java
sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java
sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionStatusMessageJsonBuilderFactory.java
sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionStatusEnum.java
sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java [new file with mode: 0644]
sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java [new file with mode: 0644]

index 1f455f7..17c519c 100644 (file)
--- a/README.md
+++ b/README.md
@@ -23,18 +23,18 @@ Every client that wants to use the JAR, need to implement IConfiguration interfa
 
 Configuration parameters:
 --------------------------
-AsdcAddress                    : ASDC Distribution Engine address. Value can be either hostname (with or without port), IP:port or FQDN (Fully Qualified Domain Name).
-User                                   : User Name for ASDC distribution consumer authentication.
-Password                               : User Password for ASDC distribution consumer authentication.
-PollingInterval                        : Distribution Client Polling Interval towards UEB in seconds. Can Be reconfigured in runtime.
-PollingTimeout                 : Distribution Client Timeout in seconds waiting to UEB server response in each fetch interval. Can Be reconfigured in runtime.
-RelevantArtifactTypes  : List of artifact types. If the service contains any of the artifacts in the list, the callback will be activated. Can Be reconfigured in runtime.
-ConsumerGroup                  : Returns the consumer group defined for this ONAP component, if no consumer group is defined return null. 
-EnvironmentName                        : Returns the environment name (testing, production etc... Can Be reconfigured in runtime.
-ConsumerID                             : Unique ID of ONAP component instance (e.x INSTAR name).
-KeyStorePath                   : Return full path to Client's Key Store that contains either CA certificate or the ASDC's public key (e.g /etc/keystore/asdc-client.jks). file will be deployed with asdc-distribution jar
-KeyStorePassword               : Return client's Key Store password.
-activateServerTLSAuth  : Sets whether ASDC server TLS authentication is activated. If set to false, Key Store path and password are not needed to be set.
+- AsdcAddress                  : ASDC Distribution Engine address. Value can be either hostname (with or without port), IP:port or FQDN (Fully Qualified Domain Name).
+- User                                 : User Name for ASDC distribution consumer authentication.
+- Password                             : User Password for ASDC distribution consumer authentication.
+- PollingInterval                      : Distribution Client Polling Interval towards UEB in seconds. Can Be reconfigured in runtime.
+- PollingTimeout                       : Distribution Client Timeout in seconds waiting to UEB server response in each fetch interval. Can Be reconfigured in runtime.
+- RelevantArtifactTypes        : List of artifact types. If the service contains any of the artifacts in the list, the callback will be activated. Can Be reconfigured in runtime.
+- ConsumerGroup                        : Returns the consumer group defined for this ONAP component, if no consumer group is defined return null. 
+- EnvironmentName                      : Returns the environment name (testing, production etc... Can Be reconfigured in runtime.
+- ConsumerID                           : Unique ID of ONAP component instance (e.x INSTAR name).
+- KeyStorePath                 : Return full path to Client's Key Store that contains either CA certificate or the ASDC's public key (e.g /etc/keystore/asdc-client.jks). file will be deployed with asdc-distribution jar
+- KeyStorePassword             : Return client's Key Store password.
+- activateServerTLSAuth        : Sets whether ASDC server TLS authentication is activated. If set to false, Key Store path and password are not needed to be set.
 
 Example of configuration file implementing IConfiguration interface:
 --------------------------------------------------------------------
index 2999ebe..36044c7 100644 (file)
@@ -190,12 +190,11 @@ public class SdcConnectorClient {
         HttpAsdcResponse downloadResponse = downloadPair.getFirst();
 
         int status = downloadResponse.getStatus();
-        if (status == HttpStatus.SC_OK) {
 
+        if (status == HttpStatus.SC_OK) {
             response = parseDownloadArtifactResponse(artifactInfo, downloadResponse);
         } else {
             response = handleAsdcDownloadArtifactError(downloadResponse);
-
         }
         handeAsdcConnectionClose(downloadPair);
         return response;
index 136d43e..8e3aa9b 100644 (file)
@@ -55,6 +55,7 @@ import org.onap.sdc.http.TopicRegistrationResponse;
 import org.onap.sdc.utils.DistributionActionResultEnum;
 import org.onap.sdc.utils.DistributionClientConstants;
 import org.onap.sdc.utils.GeneralUtils;
+import org.onap.sdc.utils.NotificationSender;
 import org.onap.sdc.utils.Pair;
 import org.onap.sdc.utils.Wrapper;
 import org.onap.sdc.api.notification.IVfModuleMetadata;
@@ -71,7 +72,6 @@ import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder;
 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
 import com.att.nsa.cambria.client.CambriaConsumer;
 import com.att.nsa.cambria.client.CambriaIdentityManager;
-import com.att.nsa.cambria.client.CambriaPublisher.message;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
@@ -80,25 +80,26 @@ import fj.data.Either;
 
 public class DistributionClientImpl implements IDistributionClient {
 
-    public static final int POLLING_TIMEOUT_MULTIPLIER = 1000;
-    public static final long SLEEPING_THREAD_TIME = 1000L;
-    public static final long PUBLISHER_TIMEOUT = 10L;
-    public static final int TERMINATION_TIMEOUT = 60;
-    private static Logger log = LoggerFactory.getLogger(DistributionClientImpl.class.getName());
+    private static final int POLLING_TIMEOUT_MULTIPLIER = 1000;
+    private static final int TERMINATION_TIMEOUT = 60;
+    private static final Logger log = LoggerFactory.getLogger(DistributionClientImpl.class);
 
     private SdcConnectorClient asdcConnector;
     private ScheduledExecutorService executorPool = null;
     protected CambriaIdentityManager cambriaIdentityManager = null;
     private List<String> brokerServers;
-    protected ApiCredential credential;
+    private ApiCredential credential;
     protected Configuration configuration;
     private INotificationCallback callback;
     private IStatusCallback statusCallback;
     private String notificationTopic;
     private String statusTopic;
     private boolean isConsumerGroupGenerated = false;
+    private NotificationSender notificationSender;
 
-    private boolean isInitialized, isStarted, isTerminated;
+    private boolean isInitialized;
+    private boolean isStarted;
+    private boolean isTerminated;
 
     @Override
     public IConfiguration getConfiguration() {
@@ -292,7 +293,10 @@ public class DistributionClientImpl implements IDistributionClient {
         }
         // 1. get ueb server list from configuration
         if (errorWrapper.isEmpty()) {
-            initUebServerList(errorWrapper);
+            List<String> servers = initUebServerList(errorWrapper);
+            if (servers != null) {
+                this.brokerServers = servers;
+            }
         }
         // 2.validate artifact types against asdc server
         if (errorWrapper.isEmpty()) {
@@ -301,11 +305,19 @@ public class DistributionClientImpl implements IDistributionClient {
         // 3. create keys
         if (errorWrapper.isEmpty()) {
             this.callback = callback;
-            createUebKeys(errorWrapper);
+            ApiCredential apiCredential = createUebKeys(errorWrapper);
+            if (apiCredential != null) {
+                this.credential = apiCredential;
+            }
         }
         // 4. register for topics
         if (errorWrapper.isEmpty()) {
-            registerForTopics(errorWrapper);
+            TopicRegistrationResponse topics = registerForTopics(errorWrapper, this.credential);
+            if (topics != null) {
+                this.notificationTopic = topics.getDistrNotificationTopicName();
+                this.statusTopic = topics.getDistrStatusTopicName();
+                this.notificationSender = createNotificationSender();
+            }
         }
 
         IDistributionClientResult result;
@@ -323,7 +335,11 @@ public class DistributionClientImpl implements IDistributionClient {
         return new SdcConnectorClient(configuration, new HttpAsdcClient(configuration));
     }
 
-    private void registerForTopics(Wrapper<IDistributionClientResult> errorWrapper) {
+    private NotificationSender createNotificationSender() {
+        return new NotificationSender(brokerServers);
+    }
+
+    private TopicRegistrationResponse registerForTopics(Wrapper<IDistributionClientResult> errorWrapper, ApiCredential credential) {
         Either<TopicRegistrationResponse, DistributionClientResultImpl> registerAsdcTopics = asdcConnector.registerAsdcTopics(credential);
         if (registerAsdcTopics.isRight()) {
 
@@ -334,22 +350,25 @@ public class DistributionClientImpl implements IDistributionClient {
             }
             errorWrapper.setInnerElement(registerAsdcTopics.right().value());
         } else {
-            TopicRegistrationResponse topics = registerAsdcTopics.left().value();
-            notificationTopic = topics.getDistrNotificationTopicName();
-            statusTopic = topics.getDistrStatusTopicName();
+            return registerAsdcTopics.left().value();
         }
-
+        return null;
     }
 
-    private void createUebKeys(Wrapper<IDistributionClientResult> errorWrapper) {
+    private ApiCredential createUebKeys(Wrapper<IDistributionClientResult> errorWrapper) {
+        ApiCredential apiCredential = null;
+
         initCambriaClient(errorWrapper);
         if (errorWrapper.isEmpty()) {
             log.debug("create keys");
-            DistributionClientResultImpl createKeysResponse = createUebKeys();
+            Pair<DistributionClientResultImpl, ApiCredential> uebKeys = createUebKeys();
+            DistributionClientResultImpl createKeysResponse = uebKeys.getFirst();
+            apiCredential = uebKeys.getSecond();
             if (createKeysResponse.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) {
                 errorWrapper.setInnerElement(createKeysResponse);
             }
         }
+        return apiCredential;
     }
 
     private void validateArtifactTypesWithAsdcServer(IConfiguration conf, Wrapper<IDistributionClientResult> errorWrapper) {
@@ -376,17 +395,18 @@ public class DistributionClientImpl implements IDistributionClient {
         }
     }
 
-    private void initUebServerList(Wrapper<IDistributionClientResult> errorWrapper) {
+    private List<String> initUebServerList(Wrapper<IDistributionClientResult> errorWrapper) {
+        List<String> brokerServers = null;
         log.debug("get ueb cluster server list from component(configuration file)");
 
         Either<List<String>, IDistributionClientResult> serverListResponse = getUEBServerList();
         if (serverListResponse.isRight()) {
             errorWrapper.setInnerElement(serverListResponse.right().value());
         } else {
-
             brokerServers = serverListResponse.left().value();
         }
 
+        return brokerServers;
     }
 
     private void validateNotInitilized(Wrapper<IDistributionClientResult> errorWrapper) {
@@ -400,58 +420,28 @@ public class DistributionClientImpl implements IDistributionClient {
     @Override
     public IDistributionClientResult sendDownloadStatus(IDistributionStatusMessage statusMessage) {
         log.info("DistributionClient - sendDownloadStatus");
-        Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
-        validateRunReady(errorWrapper);
-        if (!errorWrapper.isEmpty()) {
-            return errorWrapper.getInnerElement();
-        }
-
-        return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
+        return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
     }
 
-    private IDistributionClientResult sendStatus(IDistributionStatusMessageJsonBuilder builder) {
-        DistributionClientResultImpl statusResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status");
-        log.info("DistributionClient - sendStatus");
-        Either<CambriaBatchingPublisher, IDistributionClientResult> eitherPublisher = getCambriaPublisher();
-        if (eitherPublisher.isRight()) {
-            return eitherPublisher.right().value();
-        }
-        CambriaBatchingPublisher pub = eitherPublisher.left().value();
-
-        log.debug("after create publisher server list " + brokerServers.toString());
-        String jsonRequest = builder.build();
-
-        log.debug("try to send status " + jsonRequest);
-
-        try {
-            pub.send("MyPartitionKey", jsonRequest);
-            Thread.sleep(SLEEPING_THREAD_TIME);
-        } catch (IOException e) {
-            log.debug("DistributionClient - sendDownloadStatus. Failed to send download status");
-        } catch (InterruptedException e) {
-            log.debug("DistributionClient - sendDownloadStatus. thread was interrupted");
-        } finally {
-
-            try {
-                List<message> stuck = pub.close(PUBLISHER_TIMEOUT, TimeUnit.SECONDS);
-
-                if (!stuck.isEmpty()) {
-                    log.debug("DistributionClient - sendDownloadStatus. " + stuck.size() + " messages unsent");
-                } else {
-                    statusResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "messages successfully sent");
-                }
-            } catch (IOException | InterruptedException e) {
-                log.debug("DistributionClient - sendDownloadStatus. failed to send messages and close publisher ");
-            }
+    private IDistributionClientResult sendStatus(IDistributionStatusMessageJsonBuilder statusBuilder) {
+        IDistributionClientResult distributionResult;
 
+        Either<CambriaBatchingPublisher, IDistributionClientResult> cambriaPublisher = getCambriaPublisher(statusTopic, configuration, brokerServers, credential);
+        if (cambriaPublisher.isRight()) {
+            distributionResult = cambriaPublisher.right().value();
+        } else {
+            String statusMessage = statusBuilder.build();
+            distributionResult = notificationSender.send(cambriaPublisher.left().value(), statusMessage);
         }
-        return statusResult;
+
+        return distributionResult;
     }
 
-    private Either<CambriaBatchingPublisher, IDistributionClientResult> getCambriaPublisher() {
+    private Either<CambriaBatchingPublisher, IDistributionClientResult> getCambriaPublisher(String statusTopic, Configuration configuration, List<String> brokerServers, ApiCredential credential) {
         CambriaBatchingPublisher cambriaPublisher = null;
         try {
-            cambriaPublisher = new PublisherBuilder().onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers).build();
+            cambriaPublisher = new PublisherBuilder().onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap())
+                    .usingHosts(brokerServers).build();
             cambriaPublisher.setApiCredentials(credential.getApiKey(), credential.getApiSecret());
         } catch (MalformedURLException | GeneralSecurityException e) {
             Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
@@ -464,12 +454,7 @@ public class DistributionClientImpl implements IDistributionClient {
     @Override
     public IDistributionClientResult sendDeploymentStatus(IDistributionStatusMessage statusMessage) {
         log.info("DistributionClient - sendDeploymentStatus");
-        Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
-        validateRunReady(errorWrapper);
-        if (!errorWrapper.isEmpty()) {
-            return errorWrapper.getInnerElement();
-        }
-        return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
+        return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
     }
 
     IDistributionClientResult sendNotificationStatus(long currentTimeMillis, String distributionId, ArtifactInfoImpl artifactInfo, boolean isNotified) {
@@ -479,13 +464,15 @@ public class DistributionClientImpl implements IDistributionClient {
         if (!errorWrapper.isEmpty()) {
             return errorWrapper.getInnerElement();
         }
-        return sendStatus(DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(getConfiguration().getConsumerID(), currentTimeMillis, distributionId, artifactInfo, isNotified));
+        IDistributionStatusMessageJsonBuilder builder = DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(getConfiguration().getConsumerID(), currentTimeMillis, distributionId, artifactInfo, isNotified);
+        return sendStatus(builder);
     }
 
     /* *************************** Private Methods *************************************************** */
 
-    protected DistributionClientResultImpl createUebKeys() {
+    protected Pair<DistributionClientResultImpl, ApiCredential> createUebKeys() {
         DistributionClientResultImpl response = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "keys created successfuly");
+        ApiCredential credential = null;
         try {
             String description = String.format(DistributionClientConstants.CLIENT_DESCRIPTION, configuration.getConsumerID());
             credential = cambriaIdentityManager.createApiKey(DistributionClientConstants.EMAIL, description);
@@ -495,7 +482,7 @@ public class DistributionClientImpl implements IDistributionClient {
             response = new DistributionClientResultImpl(DistributionActionResultEnum.UEB_KEYS_CREATION_FAILED, "failed to create keys: " + e.getMessage());
             log.error(response.toString());
         }
-        return response;
+        return new Pair<>(response, credential);
     }
 
     private IDistributionClientResult restartConsumer() {
@@ -619,7 +606,7 @@ public class DistributionClientImpl implements IDistributionClient {
 
     private void validateRunReady(Wrapper<IDistributionClientResult> errorWrapper) {
         if (errorWrapper.isEmpty()) {
-            validateInitilized(errorWrapper);
+            validateInitialized(errorWrapper);
         }
         if (errorWrapper.isEmpty()) {
             validateNotTerminated(errorWrapper);
@@ -627,7 +614,7 @@ public class DistributionClientImpl implements IDistributionClient {
 
     }
 
-    private void validateInitilized(Wrapper<IDistributionClientResult> errorWrapper) {
+    private void validateInitialized(Wrapper<IDistributionClientResult> errorWrapper) {
         if (!isInitialized) {
             log.debug("client was not initialized");
             IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED, "distribution client was not initialized");
@@ -694,37 +681,30 @@ public class DistributionClientImpl implements IDistributionClient {
     @Override
     public IDistributionClientResult sendDownloadStatus(IDistributionStatusMessage statusMessage, String errorReason) {
         log.info("DistributionClient - sendDownloadStatus with errorReason");
-        Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
-        validateRunReady(errorWrapper);
-        if (!errorWrapper.isEmpty()) {
-            return errorWrapper.getInnerElement();
-        }
-
-        return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
+        return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
 
     }
 
     @Override
     public IDistributionClientResult sendDeploymentStatus(IDistributionStatusMessage statusMessage, String errorReason) {
         log.info("DistributionClient - sendDeploymentStatus with errorReason");
+        return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
+
+    }
+
+    private IDistributionClientResult sendErrorStatus(IDistributionStatusMessageJsonBuilder builder) {
         Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
         validateRunReady(errorWrapper);
         if (!errorWrapper.isEmpty()) {
             return errorWrapper.getInnerElement();
         }
-        return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
-
+        return sendStatus(builder);
     }
 
     @Override
     public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage) {
         log.info("DistributionClient - sendComponentDone status");
-        Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
-        validateRunReady(errorWrapper);
-        if (!errorWrapper.isEmpty()) {
-            return errorWrapper.getInnerElement();
-        }
-        return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
+        return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
 
     }
 
@@ -732,12 +712,7 @@ public class DistributionClientImpl implements IDistributionClient {
     public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage,
                                                              String errorReason) {
         log.info("DistributionClient - sendComponentDone status with errorReason");
-        Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
-        validateRunReady(errorWrapper);
-        if (!errorWrapper.isEmpty()) {
-            return errorWrapper.getInnerElement();
-        }
-        return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
+        return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
     }
 
 
@@ -754,12 +729,7 @@ public class DistributionClientImpl implements IDistributionClient {
 
     public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage) {
         log.info("DistributionClient - sendFinalDistributionStatus status");
-        Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
-        validateRunReady(errorWrapper);
-        if (!errorWrapper.isEmpty()) {
-            return errorWrapper.getInnerElement();
-        }
-        return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
+        return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
 
     }
 
@@ -768,12 +738,7 @@ public class DistributionClientImpl implements IDistributionClient {
     public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage,
                                                           String errorReason) {
         log.info("DistributionClient - sendFinalDistributionStatus status with errorReason");
-        Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
-        validateRunReady(errorWrapper);
-        if (!errorWrapper.isEmpty()) {
-            return errorWrapper.getInnerElement();
-        }
-        return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
+        return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
 
 
     }
index 9792467..62be395 100644 (file)
@@ -77,19 +77,10 @@ public class DistributionStatusMessageJsonBuilderFactory {
     static IDistributionStatusMessageJsonBuilder prepareBuilderForNotificationStatus(final String consumerId, final long currentTimeMillis, final String distributionId,
                                                                                      final ArtifactInfoImpl artifactInfo, boolean isNotified) {
 
-        final DistributionStatusEnum fakeStatusToReplace = DistributionStatusEnum.DOWNLOAD_OK;
-        final String jsonRequest = buildDistributionStatusJson(consumerId, currentTimeMillis, distributionId, artifactInfo, fakeStatusToReplace);
-
-        DistributionStatusNotificationEnum notificationStatus = isNotified ? DistributionStatusNotificationEnum.NOTIFIED : DistributionStatusNotificationEnum.NOT_NOTIFIED;
-        final String changedRequest = jsonRequest.replace(fakeStatusToReplace.name(), notificationStatus.name());
-        IDistributionStatusMessageJsonBuilder builder = new IDistributionStatusMessageJsonBuilder() {
-            @Override
-            public String build() {
-                return changedRequest;
-            }
-        };
-        return builder;
+        final DistributionStatusEnum distributionStatus = isNotified ? DistributionStatusEnum.NOTIFIED : DistributionStatusEnum.NOT_NOTIFIED;
+        final String jsonRequest = buildDistributionStatusJson(consumerId, currentTimeMillis, distributionId, artifactInfo, distributionStatus);
 
+        return () -> jsonRequest;
     }
 
     private static String buildDistributionStatusJson(final String consumerId,
@@ -125,24 +116,10 @@ public class DistributionStatusMessageJsonBuilderFactory {
         };
 
         DistributionStatusMessageImpl message = new DistributionStatusMessageImpl(statusMessage);
-        final String jsonRequest = gson.toJson(message);
-        return jsonRequest;
+        return gson.toJson(message);
     }
 
     private static IDistributionStatusMessageJsonBuilder prepareBuilderFromImpl(DistributionStatusMessageImpl message) {
-        final String jsonRequest = gson.toJson(message);
-        IDistributionStatusMessageJsonBuilder builder = new IDistributionStatusMessageJsonBuilder() {
-            @Override
-            public String build() {
-                return jsonRequest;
-            }
-        };
-        return builder;
-    }
-
-    private enum DistributionStatusNotificationEnum {
-        NOTIFIED, NOT_NOTIFIED
+        return () -> gson.toJson(message);
     }
-
-
 }
index 3e7f061..d77ac60 100644 (file)
@@ -54,7 +54,6 @@ public enum DistributionStatusEnum {
      * ONAP component is requested to publish this status once component successfully complete downloading and storing all the data it needs from the service.
      */
     COMPONENT_DONE_OK,
-
     /**
      * ONAP component is requested to publish this status when component failed to download or failed to store one or more of the mandatory information it requires from the service model.
      * <p>
@@ -66,5 +65,10 @@ public enum DistributionStatusEnum {
      */
     DISTRIBUTION_COMPLETE_OK,
 
-    DISTRIBUTION_COMPLETE_ERROR
+    DISTRIBUTION_COMPLETE_ERROR,
+
+    NOTIFIED,
+
+    NOT_NOTIFIED
+
 }
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java
new file mode 100644 (file)
index 0000000..1fb71a6
--- /dev/null
@@ -0,0 +1,76 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * sdc-distribution-client
+ * ================================================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.sdc.utils;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaPublisher;
+import org.onap.sdc.api.results.IDistributionClientResult;
+import org.onap.sdc.impl.DistributionClientResultImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class NotificationSender {
+
+    private static final Logger log = LoggerFactory.getLogger(NotificationSender.class);
+    private static final long PUBLISHER_CLOSING_TIMEOUT = 10L;
+    private static final long SLEEP_TIME = 1;
+
+    private final List<String> brokerServers;
+
+    public NotificationSender(List<String> brokerServers) {
+        this.brokerServers = brokerServers;
+    }
+
+    public IDistributionClientResult send(CambriaBatchingPublisher publisher, String status) {
+        log.info("DistributionClient - sendStatus");
+        DistributionClientResultImpl distributionResult;
+        try {
+            log.debug("Publisher server list: {}", brokerServers);
+            log.debug("Trying to send status: {}", status);
+            publisher.send("MyPartitionKey", status);
+            TimeUnit.SECONDS.sleep(SLEEP_TIME);
+        } catch (IOException | InterruptedException e) {
+            log.error("DistributionClient - sendDownloadStatus. Failed to send download status", e);
+        } finally {
+            distributionResult = closePublisher(publisher);
+        }
+        return distributionResult;
+    }
+
+    private DistributionClientResultImpl closePublisher(CambriaBatchingPublisher publisher) {
+        DistributionClientResultImpl distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status");
+        try {
+            List<CambriaPublisher.message> notSentMessages = publisher.close(PUBLISHER_CLOSING_TIMEOUT, TimeUnit.SECONDS);
+            if (notSentMessages.isEmpty()) {
+                distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent");
+            } else {
+                log.debug("DistributionClient - sendDownloadStatus. {} messages were not sent", notSentMessages.size());
+            }
+        } catch (IOException | InterruptedException e) {
+            log.error("DistributionClient - sendDownloadStatus. Failed to send messages and close publisher.", e);
+        }
+        return distributionResult;
+    }
+}
diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java
new file mode 100644 (file)
index 0000000..0be7793
--- /dev/null
@@ -0,0 +1,132 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * sdc-distribution-client
+ * ================================================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.sdc.utils;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaPublisher;
+import fj.data.Either;
+import org.junit.Test;
+import org.onap.sdc.api.results.IDistributionClientResult;
+import org.onap.sdc.impl.DistributionClientResultImpl;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class NotificationSenderTest {
+
+    private final String status = "status";
+    private final CambriaPublisher.message message = new CambriaPublisher.message("sample-partition", "sample-message");
+    private final List<CambriaPublisher.message> notEmptySendingFailedMessages = Collections.singletonList(message);
+    private final DistributionClientResultImpl successResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent");
+    private final DistributionClientResultImpl generalErrorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status");
+
+    private final CambriaBatchingPublisher publisher = mock(CambriaBatchingPublisher.class);
+    private final List<String> emptyServers = Collections.emptyList();
+    private final NotificationSender validNotificationSender = new NotificationSender(emptyServers);;
+
+
+    @Test
+    public void whenPublisherIsValidAndNoExceptionsAreThrownShouldReturnSuccessStatus() throws IOException, InterruptedException {
+        //given
+        when(publisher.send(anyString(), anyString())).thenReturn(0);
+        when(publisher.close(anyLong(), any())).thenReturn(Collections.emptyList());
+
+        //when
+        IDistributionClientResult result = validNotificationSender.send(publisher, status);
+
+        //then
+        assertEquals(successResponse.getDistributionActionResult(), result.getDistributionActionResult());
+    }
+
+    @Test
+    public void whenPublisherCouldNotSendShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
+        //given
+        when(publisher.send(anyString(), anyString())).thenReturn(0);
+        when(publisher.close(anyLong(), any())).thenReturn(notEmptySendingFailedMessages);
+
+        //when
+        IDistributionClientResult result = validNotificationSender.send(publisher, status);
+
+        //then
+        assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
+    }
+
+    @Test
+    public void whenSendingThrowsIOExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
+        //given
+        when(publisher.send(anyString(), anyString())).thenThrow(new IOException());
+        when(publisher.close(anyLong(), any())).thenReturn(notEmptySendingFailedMessages);
+
+        //when
+        IDistributionClientResult result = validNotificationSender.send(publisher, status);
+
+        //then
+        assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
+    }
+
+    @Test
+    public void whenSendingThrowsInterruptedExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
+        //given
+        when(publisher.send(anyString(), anyString())).thenAnswer(invocationOnMock -> {throw new InterruptedException();});
+        when(publisher.close(anyLong(), any())).thenReturn(notEmptySendingFailedMessages);
+
+        //when
+        IDistributionClientResult result = validNotificationSender.send(publisher, status);
+
+        //then
+        assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
+    }
+
+    @Test
+    public void whenClosingThrowsIOExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
+        //given
+        when(publisher.send(anyString(), anyString())).thenReturn(0);
+        when(publisher.close(anyLong(), any())).thenThrow(new IOException());
+
+        //when
+        IDistributionClientResult result = validNotificationSender.send(publisher, status);
+
+        //then
+        assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
+    }
+
+    @Test
+    public void whenClosingThrowsInterruptedExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
+        //given
+        when(publisher.send(anyString(), anyString())).thenReturn(0);
+        when(publisher.close(anyLong(), any())).thenAnswer(invocationOnMock -> {throw new InterruptedException();});
+
+        //when
+        IDistributionClientResult result = validNotificationSender.send(publisher, status);
+
+        //then
+        assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
+    }
+}
\ No newline at end of file