Revert "KPI-MS Switch from Cambria library to dmaap-sdk" 42/131442/3 1.0.10-kpi-computation-ms
authormalar <malarvizhi.44@wipro.com>
Wed, 12 Oct 2022 07:22:02 +0000 (07:22 +0000)
committermalar <malarvizhi.44@wipro.com>
Wed, 12 Oct 2022 08:01:49 +0000 (08:01 +0000)
This reverts commit 8db39359abef9f94c8cbec10189cd295cf1d814f.

Issue-ID: DCAEGEN2-3180
Change-Id: I41c3fc52ec93aac5473869f2098fede89b340d34
Signed-off-by: malar <malarvizhi.44@wipro.com>
12 files changed:
components/kpi-computation-ms/Changelog.md
components/kpi-computation-ms/pom.xml
components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/DmaapClient.java
components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClient.java
components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumer.java
components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java
components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java
components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/DmaapClientTest.java
components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClientTest.java
components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumerTest.java
components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducerTest.java
components/kpi-computation-ms/version.properties

index 65ecd17..e96d788 100644 (file)
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](http://keepachangelog.com/)
 and this project adheres to [Semantic Versioning](http://semver.org/).
 
+## [1.0.10]
+### Changed
+* Revert commit - KPI MS - Switch from Cambria library to dmaap-client library (dcaegen2/sdk) (DCAEGEN2-3180)
+
 ## [1.0.9]
 ### Changed
 * KPI MS - Vulnerability updates (DCAEGEN2-3216)
index 6625181..8ee8280 100644 (file)
@@ -29,7 +29,7 @@
 
     <groupId>org.onap.dcaegen2.services.components</groupId>
     <artifactId>kpi-ms</artifactId>
-    <version>1.0.9-SNAPSHOT</version>
+    <version>1.0.10-SNAPSHOT</version>
     <name>dcaegen2-services-kpi-computation-ms</name>
     <description>Kpi ms</description>
     <packaging>jar</packaging>
             <type>pom</type>
             <scope>import</scope>
         </dependency>
+        <dependency>
+            <groupId>com.att.nsa</groupId>
+            <artifactId>cambriaClient</artifactId>
+            <version>0.0.1</version>
+        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
             <artifactId>logback-core</artifactId>
             <version>1.2.11</version>
         </dependency>
-        <dependency>
-            <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
-            <artifactId>dmaap-client</artifactId>
-            <version>${sdk.version}</version>
-       </dependency>
     </dependencies>
 
     <build>
index c5dea5e..44b6535 100644 (file)
@@ -21,6 +21,8 @@
 
 package org.onap.dcaegen2.kpi.dmaap;
 
+import com.att.nsa.cambria.client.CambriaConsumer;
+
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -30,10 +32,9 @@ import javax.annotation.PostConstruct;
 
 import org.onap.dcaegen2.kpi.models.Configuration;
 import org.onap.dcaegen2.kpi.utils.DmaapUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.springframework.stereotype.Component;
 
 /**
@@ -77,13 +78,12 @@ public class DmaapClient {
         String pmTopic = pmTopicSplit[pmTopicSplit.length - 1];
         log.debug("pm topic : {}", pmTopic);
 
-        MessageRouterSubscriber pmNotifSubscriber = dmaapUtils.buildSubscriber();
-        MessageRouterSubscribeRequest subscriberRequest = dmaapUtils.buildSubscriberRequest(configuration, pmTopic);
+        CambriaConsumer pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic);
 
         ScheduledExecutorService executorPool;
 
         // create notification consumers for PM
-        NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifSubscriber, subscriberRequest,
+        NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifCambriaConsumer,
                 new KpiComputationCallBack());
         // start pm notification consumer threads
         executorPool = Executors.newScheduledThreadPool(10);
index eeeb725..fb96787 100644 (file)
@@ -26,11 +26,11 @@ import java.util.Map;
 
 import org.onap.dcaegen2.kpi.models.Configuration;
 import org.onap.dcaegen2.kpi.utils.DmaapUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+
 /**
  * Client class to handle kpi interactions.
  */
@@ -62,12 +62,12 @@ public class KpiDmaapClient {
         logger.info("Kpi publish topic url: {}", topicUrl);
         String[] topicSplit = topicUrl.split("\\/");
         String topic = topicSplit[topicSplit.length - 1];
-        MessageRouterPublisher messageRouterPublisher;
-        MessageRouterPublishRequest messageRouterPublishRequest;
+        CambriaBatchingPublisher cambriaBatchingPublisher;
         try {
-            messageRouterPublisher = dmaapUtils.buildPublisher();
-            messageRouterPublishRequest = dmaapUtils.buildPublisherRequest(configuration, topic);
-            NotificationProducer notificationProducer = new NotificationProducer(messageRouterPublisher, messageRouterPublishRequest);
+
+            cambriaBatchingPublisher = dmaapUtils.buildPublisher(configuration, topic);
+
+            NotificationProducer notificationProducer = new NotificationProducer(cambriaBatchingPublisher);
             notificationProducer.sendNotification(msg);
         } catch (IOException e) {
             return false;
index eba6019..fbf8bc7 100644 (file)
 
 package org.onap.dcaegen2.kpi.dmaap;
 
-import java.time.Duration;
+import com.att.nsa.cambria.client.CambriaConsumer;
 
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.gson.JsonElement;
-
 /**
  * Consume Notifications from DMAAP events.
  */
 public class NotificationConsumer implements Runnable {
+
     private static Logger log = LoggerFactory.getLogger(NotificationConsumer.class);
-    private MessageRouterSubscriber messageSubscriber;
-    private MessageRouterSubscribeRequest subscriberRequest;
+    private CambriaConsumer cambriaConsumer;
     private NotificationCallback notificationCallback;
 
     /**
      * Parameterized Constructor.
      */
-    public NotificationConsumer(MessageRouterSubscriber messageSubscriber, MessageRouterSubscribeRequest subscriberRequest, NotificationCallback notificationCallback) {
+    public NotificationConsumer(CambriaConsumer cambriaConsumer, NotificationCallback notificationCallback) {
         super();
-        this.messageSubscriber = messageSubscriber;
-        this.subscriberRequest = subscriberRequest;
+        this.cambriaConsumer = cambriaConsumer;
         this.notificationCallback = notificationCallback;
     }
 
@@ -54,14 +49,15 @@ public class NotificationConsumer implements Runnable {
      */
     @Override
     public void run() {
-        messageSubscriber.subscribeForElements(subscriberRequest, Duration.ofMinutes(1))
-            .map(JsonElement::getAsString)
-            .subscribe(msg -> {
+        try {
+            Iterable<String> msgs = cambriaConsumer.fetch();
+            for (String msg : msgs) {
                 log.info(msg);
                 notificationCallback.activateCallBack(msg);
-             },
-                    ex -> {
-                        log.warn("An unexpected error while receiving messages from DMaaP", ex);
-                     });
+            }
+        } catch (Exception e) {
+            log.debug("exception when fetching msgs from dmaap", e);
+        }
+
     }
 }
index c5be6cc..3425543 100644 (file)
 
 package org.onap.dcaegen2.kpi.dmaap;
 
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+
 import java.io.IOException;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.gson.JsonPrimitive;
-import reactor.core.publisher.Flux;
 
 /**
  * Produces Notification on DMAAP events.
  */
 public class NotificationProducer {
-    private static Logger logger = LoggerFactory.getLogger(NotificationProducer.class);
-    private MessageRouterPublisher messageRouterPublisher;
-    private MessageRouterPublishRequest messageRouterPublishRequest;
-    
+
+    private CambriaBatchingPublisher cambriaBatchingPublisher;
+
     /**
      * Parameterized constructor.
      */
-    public NotificationProducer(MessageRouterPublisher messageRouterPublisher, MessageRouterPublishRequest messageRouterPublishRequest) {
+    public NotificationProducer(CambriaBatchingPublisher cambriaBatchingPublisher) {
         super();
-        this.messageRouterPublisher = messageRouterPublisher;
-        this.messageRouterPublishRequest = messageRouterPublishRequest;
+        this.cambriaBatchingPublisher = cambriaBatchingPublisher;
     }
 
     /**
      * sends notification to dmaap.
      */
-    public void sendNotification(String msg) throws IOException {
-        Flux.just(1, 2, 3)
-           .map(JsonPrimitive::new)
-           .transform(input -> messageRouterPublisher.put(messageRouterPublishRequest, input))
-           .subscribe(resp -> {
-               if (resp.successful()) {
-                   logger.debug("Sent a batch of messages to the MR");
-                       } else {
-                           logger.warn("Message sending has failed: {}", resp.failReason());
-                     }
-                },
-                ex -> {
-                    logger.warn("An unexpected error while sending messages to DMaaP", ex);
-                 });
+    public int sendNotification(String msg) throws IOException {
+
+        return cambriaBatchingPublisher.send("", msg);
+
     }
+
 }
index 38c1ca8..2f1e19c 100644 (file)
@@ -1,7 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2021 China Mobile.
- *  Copyright (c) 2021-2022 Wipro Limited.
+ *  Copyright (C) 2022 Wipro Limited.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.dcaegen2.kpi.utils;
 
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
+import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
+import com.att.nsa.cambria.client.CambriaConsumer;
+
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+
 import org.onap.dcaegen2.kpi.models.Configuration;
-import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
 
 /**
  * Utility class to perform actions related to Dmaap.
@@ -45,35 +40,65 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me
  */
 public class DmaapUtils {
 
-    public MessageRouterPublisher buildPublisher() {
-        final MessageRouterPublisher publisher = DmaapClientFactory
-             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
-        return publisher;
+    /**
+     * Build publisher.
+     */
+    public CambriaBatchingPublisher buildPublisher(Configuration config, String topic) {
+        try {
+            return builder(config, topic).build();
+        } catch (MalformedURLException | GeneralSecurityException e) {
+            return null;
+
+        }
+    }
+
+    /**
+     * Build consumer.
+     */
+    public CambriaConsumer buildConsumer(Configuration config, String topic) {
+
+        try {
+            return builderConsumer(config, topic).build();
+        } catch (MalformedURLException | GeneralSecurityException e) {
+            return null;
+        }
+
     }
 
-    public MessageRouterPublishRequest buildPublisherRequest(Configuration config, String topicUrl) {
-        MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder().topicUrl(topicUrl)
-             .aafCredentials(ImmutableAafCredentials.builder().username(config.getAafUsername())
-             .password(config.getAafPassword()).build())
-             .build();
-        MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder()
-             .sinkDefinition(sinkDefinition).build();
-        return request;
+    private static PublisherBuilder builder(Configuration config, String topic) {
+        if (config.isSecured()) {
+            return authenticatedBuilder(config, topic);
+        } else {
+            return unAuthenticatedBuilder(config, topic);
+        }
     }
 
-    public MessageRouterSubscriber buildSubscriber() {
-        MessageRouterSubscriber subscriber = DmaapClientFactory
-            .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
-        return subscriber;
+    private static PublisherBuilder authenticatedBuilder(Configuration config, String topic) {
+        return unAuthenticatedBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
+                config.getAafPassword());
     }
 
-    public MessageRouterSubscribeRequest buildSubscriberRequest(Configuration config, String topicUrl) {
-        MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder().topicUrl(topicUrl)
-             .aafCredentials(ImmutableAafCredentials.builder().username(config.getAafUsername())
-             .password(config.getAafPassword()).build())
-             .build();
-        MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder()
-             .consumerGroup(config.getCg()).consumerId(config.getCid()).sourceDefinition(sourceDefinition).build();
-        return request;
+    private static PublisherBuilder unAuthenticatedBuilder(Configuration config, String topic) {
+        return new CambriaClientBuilders.PublisherBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
+                .logSendFailuresAfter(5);
     }
+
+    private static ConsumerBuilder builderConsumer(Configuration config, String topic) {
+        if (config.isSecured()) {
+            return authenticatedConsumerBuilder(config, topic);
+        } else {
+            return unAuthenticatedConsumerBuilder(config, topic);
+        }
+    }
+
+    private static ConsumerBuilder unAuthenticatedConsumerBuilder(Configuration config, String topic) {
+        return new CambriaClientBuilders.ConsumerBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
+                .knownAs(config.getCg(), config.getCid()).withSocketTimeout(config.getPollingTimeout() * 1000);
+    }
+
+    private static ConsumerBuilder authenticatedConsumerBuilder(Configuration config, String topic) {
+        return unAuthenticatedConsumerBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
+                config.getAafPassword());
+    }
+
 }
index cc23bab..f51bf24 100644 (file)
 
 package org.onap.dcaegen2.kpi.dmaap;
 
+import static org.mockito.Mockito.when;
+
+import com.att.nsa.cambria.client.CambriaTopicManager;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
 import java.io.BufferedReader;
 import java.io.FileReader;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -32,20 +39,20 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
+import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.onap.dcaegen2.kpi.models.Configuration;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
 
-//import com.att.nsa.cambria.client.CambriaTopicManager;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = DmaapClientTest.class)
 public class DmaapClientTest {
 
+    @Mock
+    private CambriaTopicManager topicManager;
+
     @InjectMocks
     DmaapClient client;
 
@@ -67,9 +74,18 @@ public class DmaapClientTest {
         configuration.setCid("cid");
         configuration.setPollingInterval(30);
         configuration.setPollingTimeout(100);
-        client = Mockito.mock(DmaapClient.class);
-        client.initClient();
-        Mockito.verify(client).initClient();
+
+        try {
+            when(topicManager.getTopics()).thenReturn(topics);
+
+            client = Mockito.mock(DmaapClient.class);
+            client.initClient();
+            Mockito.verify(client).initClient();
+            // Mockito.verifycreateAndConfigureTopics();
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
     }
 
     @Test
@@ -85,6 +101,9 @@ public class DmaapClientTest {
             configuration.updateConfigurationFromJsonObject(config);
             DmaapClient client = new DmaapClient();
             client.initClient();
+            // Mockito.verify(client).startClient();
+            // Mockito.verifycreateAndConfigureTopics();
+
         } catch (Exception e) {
             e.printStackTrace();
         }
index 81699c2..bf5b625 100644 (file)
@@ -22,8 +22,6 @@
 package org.onap.dcaegen2.kpi.dmaap;
 
 import static org.junit.Assert.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -38,15 +36,10 @@ import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.onap.dcaegen2.kpi.models.Configuration;
 import org.onap.dcaegen2.kpi.utils.DmaapUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 import org.springframework.boot.test.context.SpringBootTest;
 
-import com.google.gson.JsonPrimitive;
-
-import reactor.core.publisher.Flux;
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaConsumer;
 
 @RunWith(MockitoJUnitRunner.class)
 @SpringBootTest(classes = KpiDmaapClient.class)
@@ -56,38 +49,42 @@ public class KpiDmaapClientTest {
     Configuration configurationMock;
 
     @Mock
-    DmaapUtils dmaapUtilsMock;    
-    
-    @Mock
-    MessageRouterPublisher messageRouterPublisher;
+    DmaapUtils dmaapUtilsMock;
+
+    @InjectMocks
+    KpiDmaapClient kpiDmaapClient;
 
     @Mock
-    MessageRouterPublishRequest messageRouterPublishRequest;
+    CambriaConsumer kpiResponseCambriaConsumerMock;
 
     @Mock
-    KpiDmaapClient kpiDmaapClient;
+    CambriaBatchingPublisher cambriaBatchingPublisherMock;
 
     @Mock
     NotificationProducer notificationProducerMock;
 
+    @Before
+    public void setup() {
+        kpiDmaapClient = new KpiDmaapClient(dmaapUtilsMock, configurationMock);
+    }
+
     @Test
-    public void sendNotificationToPolicyTest() throws IOException {
+    public void sendNotificationToPolicyTest() {
         Map<String, Object> streamsPublishes = new HashMap<>();
         Map<String, String> topics = new HashMap<>();
         Map<String, Object> dmaapInfo = new HashMap<>();
         topics.put("topic_url", "https://message-router.onap.svc.cluster.local:3905/events/DCAE_KPI_OUTPUT");
         dmaapInfo.put("dmaap_info", topics);
         streamsPublishes.put("kpi_topic", dmaapInfo);
-        Mockito.when(configurationMock.getStreamsPublishes()).thenReturn(streamsPublishes);    
-        Mockito.doNothing().when(notificationProducerMock).sendNotification(Mockito.anyString());
-        io.vavr.collection.List<String> expectedItems = io.vavr.collection.List.of("kpi-1", "kpi-2", "kpi-3");
-        MessageRouterPublishResponse expectedResponse = ImmutableMessageRouterPublishResponse
-             .builder().items(expectedItems.map(JsonPrimitive::new))
-             .build();         
-        Flux<MessageRouterPublishResponse> responses = Flux.just(expectedResponse);
-        when(messageRouterPublisher.put(Mockito.any(), Mockito.any())).thenReturn(responses);
-        when(kpiDmaapClient.sendNotificationToDmaap(Mockito.anyString())).thenReturn(Boolean.TRUE);
-        Boolean response = kpiDmaapClient.sendNotificationToDmaap(Mockito.anyString());
-        assertEquals(Boolean.TRUE, response);
+        Mockito.when(configurationMock.getStreamsPublishes()).thenReturn(streamsPublishes);
+        Mockito.when(dmaapUtilsMock.buildPublisher(configurationMock, "DCAE_KPI_OUTPUT"))
+                .thenReturn(cambriaBatchingPublisherMock);
+        try {
+            Mockito.when(cambriaBatchingPublisherMock.send("", "hello")).thenReturn(0);
+        } catch (IOException e) {
+            e.printStackTrace();
         }
+        assertTrue(kpiDmaapClient.sendNotificationToDmaap("hello"));
+
+    }
 }
index 8d72d3c..69d0daa 100644 (file)
 
 package org.onap.dcaegen2.kpi.dmaap;
 
-import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.when;
 
-import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
 
-import com.google.gson.JsonElement;
-
-import reactor.core.CoreSubscriber;
-import reactor.core.publisher.Flux;
+import com.att.nsa.cambria.client.CambriaConsumer;
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = NotificationConsumerTest.class)
 public class NotificationConsumerTest {
 
+    @Mock
+    CambriaConsumer cambriaConsumer;
+
     @Mock
     NotificationCallback notificationCallback;
 
     @InjectMocks
     NotificationConsumer notificationConsumer;
-    
-    @Mock
-    MessageRouterSubscriber messageSubscriber;
-    
-    @Mock
-    MessageRouterSubscribeRequest subscriberRequest;
 
     @Test
     public void testNotificationConsumer() {
         try {
-            Flux<JsonElement> json = new Flux<JsonElement>() {
-                @Override
-                public void subscribe(CoreSubscriber<? super JsonElement> actual) {
-                }
-            };
+            List<String> notifications = new ArrayList<>();
+            notifications.add("notification1");
+            when(cambriaConsumer.fetch()).thenReturn(notifications);
             Mockito.doNothing().when(notificationCallback).activateCallBack(Mockito.anyString());
-            when(messageSubscriber.subscribeForElements(subscriberRequest, Duration.ofMinutes(1))).thenReturn(json);
-            assertNotNull(messageSubscriber.subscribeForElements(subscriberRequest, Duration.ofMinutes(1)));
+            notificationConsumer.run();
+
         } catch (Exception e) {
             e.printStackTrace();
         }
index f880ec7..7ad5786 100644 (file)
 
 package org.onap.dcaegen2.kpi.dmaap;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
 import java.io.IOException;
 
 import org.junit.Test;
@@ -32,19 +38,10 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.onap.dcaegen2.kpi.computation.FileUtils;
 import org.onap.dcaegen2.kpi.models.Configuration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.powermock.api.mockito.PowerMockito;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
 
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonPrimitive;
-
-import reactor.core.publisher.Flux;
-
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = NotificationProducerTest.class)
 public class NotificationProducerTest {
@@ -53,25 +50,24 @@ public class NotificationProducerTest {
     private static final String CBS_CONFIG_FILE = "kpi/cbs_config2.json";
 
     @Mock
-    MessageRouterPublisher messageRouterPublisher;                                                                                                                               
-    
-    @Mock
-    MessageRouterPublishRequest messageRouterPublishRequest;
-    
+    CambriaBatchingPublisher cambriaBatchingPublisher;
+
     @InjectMocks
     NotificationProducer notificationProducer;
 
     @Test
-    public void notificationProducerTest() throws IOException {
-        io.vavr.collection.List<String> expectedItems = io.vavr.collection.List.of("kpi-1", "kpi-2", "kpi-3");
-        MessageRouterPublishResponse expectedResponse = ImmutableMessageRouterPublishResponse
-                .builder().items(expectedItems.map(JsonPrimitive::new))
-                .build();
-        Flux<MessageRouterPublishResponse> responses = Flux.just(expectedResponse);
-        when(messageRouterPublisher.put(Mockito.any(), Mockito.any())).thenReturn(responses);
-        notificationProducer.sendNotification("msg");
+    public void notificationProducerTest() {
+
+        try {
+            when(cambriaBatchingPublisher.send(Mockito.anyString(), Mockito.anyString())).thenReturn(0);
+            int result = notificationProducer.sendNotification("msg");
+            assertEquals(0, result);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
     }
-    
+
     @Test
     public void kpiResultWithoutConfigTest() {
 
index f57e7a9..db099b7 100644 (file)
@@ -21,7 +21,7 @@
 ###############################################################################
 major=1
 minor=0
-patch=9
+patch=10
 base_version=${major}.${minor}.${patch}
 release_version=${base_version}
 snapshot_version=${base_version}-SNAPSHOT