KPI-MS Switch from Cambria library to dmaap-sdk 28/130628/9
authorAhila <ahila.pandaram@wipro.com>
Thu, 1 Sep 2022 12:01:38 +0000 (17:31 +0530)
committerAhila <ahila.pandaram@wipro.com>
Thu, 1 Sep 2022 21:03:17 +0000 (02:33 +0530)
Issue-ID: DCAEGEN2-3180

Signed-off-by: Ahila <ahila.pandaram@wipro.com>
Change-Id: Ib4c483f56588a345096278510d8deb16a6a6fb7f

12 files changed:
components/kpi-computation-ms/Changelog.md
components/kpi-computation-ms/pom.xml
components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/DmaapClient.java
components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClient.java
components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumer.java
components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducer.java
components/kpi-computation-ms/src/main/java/org/onap/dcaegen2/kpi/utils/DmaapUtils.java
components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/DmaapClientTest.java
components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/KpiDmaapClientTest.java
components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationConsumerTest.java
components/kpi-computation-ms/src/test/java/org/onap/dcaegen2/kpi/dmaap/NotificationProducerTest.java
components/kpi-computation-ms/version.properties

index de62dec..6ac726c 100644 (file)
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](http://keepachangelog.com/)
 and this project adheres to [Semantic Versioning](http://semver.org/).
 
+## [1.0.8]
+### Changed
+* KPI MS - Switch from Cambria library to dmaap-client library (dcaegen2/sdk) (DCAEGEN2-3180)
+
 ## [1.0.7]
 ### Changed
 * Append SNSSAI with MeasType string and handle multiple operands (DCAEGEN2-3243)
index 89f83aa..8bde484 100644 (file)
@@ -29,7 +29,7 @@
 
     <groupId>org.onap.dcaegen2.services.components</groupId>
     <artifactId>kpi-ms</artifactId>
-    <version>1.0.7-SNAPSHOT</version>
+    <version>1.0.8-SNAPSHOT</version>
     <name>dcaegen2-services-kpi-computation-ms</name>
     <description>Kpi ms</description>
     <packaging>jar</packaging>
             <type>pom</type>
             <scope>import</scope>
         </dependency>
-        <dependency>
-            <groupId>com.att.nsa</groupId>
-            <artifactId>cambriaClient</artifactId>
-            <version>0.0.1</version>
-        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
             <artifactId>logback-core</artifactId>
             <version>1.2.10</version>
         </dependency>
+        <dependency>
+            <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+            <artifactId>dmaap-client</artifactId>
+            <version>${sdk.version}</version>
+       </dependency>
     </dependencies>
 
     <build>
index d6e17d6..c5dea5e 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2021 China Mobile.
+ *  Copyright (C) 2022 Wipro Limited.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,8 +21,6 @@
 
 package org.onap.dcaegen2.kpi.dmaap;
 
-import com.att.nsa.cambria.client.CambriaConsumer;
-
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -31,9 +30,10 @@ import javax.annotation.PostConstruct;
 
 import org.onap.dcaegen2.kpi.models.Configuration;
 import org.onap.dcaegen2.kpi.utils.DmaapUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.springframework.stereotype.Component;
 
 /**
@@ -77,12 +77,13 @@ public class DmaapClient {
         String pmTopic = pmTopicSplit[pmTopicSplit.length - 1];
         log.debug("pm topic : {}", pmTopic);
 
-        CambriaConsumer pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic);
+        MessageRouterSubscriber pmNotifSubscriber = dmaapUtils.buildSubscriber();
+        MessageRouterSubscribeRequest subscriberRequest = dmaapUtils.buildSubscriberRequest(configuration, pmTopic);
 
         ScheduledExecutorService executorPool;
 
         // create notification consumers for PM
-        NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifCambriaConsumer,
+        NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifSubscriber, subscriberRequest,
                 new KpiComputationCallBack());
         // start pm notification consumer threads
         executorPool = Executors.newScheduledThreadPool(10);
index 5e8733e..eeeb725 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2021 China Mobile.
+ *  Copyright (C) 2022 Wipro Limited.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,11 +26,11 @@ import java.util.Map;
 
 import org.onap.dcaegen2.kpi.models.Configuration;
 import org.onap.dcaegen2.kpi.utils.DmaapUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-
 /**
  * Client class to handle kpi interactions.
  */
@@ -61,12 +62,12 @@ public class KpiDmaapClient {
         logger.info("Kpi publish topic url: {}", topicUrl);
         String[] topicSplit = topicUrl.split("\\/");
         String topic = topicSplit[topicSplit.length - 1];
-        CambriaBatchingPublisher cambriaBatchingPublisher;
+        MessageRouterPublisher messageRouterPublisher;
+        MessageRouterPublishRequest messageRouterPublishRequest;
         try {
-
-            cambriaBatchingPublisher = dmaapUtils.buildPublisher(configuration, topic);
-
-            NotificationProducer notificationProducer = new NotificationProducer(cambriaBatchingPublisher);
+            messageRouterPublisher = dmaapUtils.buildPublisher();
+            messageRouterPublishRequest = dmaapUtils.buildPublisherRequest(configuration, topic);
+            NotificationProducer notificationProducer = new NotificationProducer(messageRouterPublisher, messageRouterPublishRequest);
             notificationProducer.sendNotification(msg);
         } catch (IOException e) {
             return false;
index d7e3376..eba6019 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2021 China Mobile.
+ *  Copyright (C) 2022 Wipro Limited.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.dcaegen2.kpi.dmaap;
 
-import com.att.nsa.cambria.client.CambriaConsumer;
+import java.time.Duration;
 
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.gson.JsonElement;
+
 /**
  * Consume Notifications from DMAAP events.
  */
 public class NotificationConsumer implements Runnable {
-
     private static Logger log = LoggerFactory.getLogger(NotificationConsumer.class);
-    private CambriaConsumer cambriaConsumer;
+    private MessageRouterSubscriber messageSubscriber;
+    private MessageRouterSubscribeRequest subscriberRequest;
     private NotificationCallback notificationCallback;
 
     /**
      * Parameterized Constructor.
      */
-    public NotificationConsumer(CambriaConsumer cambriaConsumer, NotificationCallback notificationCallback) {
+    public NotificationConsumer(MessageRouterSubscriber messageSubscriber, MessageRouterSubscribeRequest subscriberRequest, NotificationCallback notificationCallback) {
         super();
-        this.cambriaConsumer = cambriaConsumer;
+        this.messageSubscriber = messageSubscriber;
+        this.subscriberRequest = subscriberRequest;
         this.notificationCallback = notificationCallback;
     }
 
@@ -48,15 +54,14 @@ public class NotificationConsumer implements Runnable {
      */
     @Override
     public void run() {
-        try {
-            Iterable<String> msgs = cambriaConsumer.fetch();
-            for (String msg : msgs) {
+        messageSubscriber.subscribeForElements(subscriberRequest, Duration.ofMinutes(1))
+            .map(JsonElement::getAsString)
+            .subscribe(msg -> {
                 log.info(msg);
                 notificationCallback.activateCallBack(msg);
-            }
-        } catch (Exception e) {
-            log.debug("exception when fetching msgs from dmaap", e);
-        }
-
+             },
+                    ex -> {
+                        log.warn("An unexpected error while receiving messages from DMaaP", ex);
+                     });
     }
 }
index 628f3d0..c5be6cc 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2021 China Mobile.
+ *  Copyright (C) 2022 Wipro Limited.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.dcaegen2.kpi.dmaap;
 
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-
 import java.io.IOException;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.gson.JsonPrimitive;
+import reactor.core.publisher.Flux;
 
 /**
  * Produces Notification on DMAAP events.
  */
 public class NotificationProducer {
-
-    private CambriaBatchingPublisher cambriaBatchingPublisher;
-
+    private static Logger logger = LoggerFactory.getLogger(NotificationProducer.class);
+    private MessageRouterPublisher messageRouterPublisher;
+    private MessageRouterPublishRequest messageRouterPublishRequest;
+    
     /**
      * Parameterized constructor.
      */
-    public NotificationProducer(CambriaBatchingPublisher cambriaBatchingPublisher) {
+    public NotificationProducer(MessageRouterPublisher messageRouterPublisher, MessageRouterPublishRequest messageRouterPublishRequest) {
         super();
-        this.cambriaBatchingPublisher = cambriaBatchingPublisher;
+        this.messageRouterPublisher = messageRouterPublisher;
+        this.messageRouterPublishRequest = messageRouterPublishRequest;
     }
 
     /**
      * sends notification to dmaap.
      */
-    public int sendNotification(String msg) throws IOException {
-
-        return cambriaBatchingPublisher.send("", msg);
-
+    public void sendNotification(String msg) throws IOException {
+        Flux.just(1, 2, 3)
+           .map(JsonPrimitive::new)
+           .transform(input -> messageRouterPublisher.put(messageRouterPublishRequest, input))
+           .subscribe(resp -> {
+               if (resp.successful()) {
+                   logger.debug("Sent a batch of messages to the MR");
+                       } else {
+                           logger.warn("Message sending has failed: {}", resp.failReason());
+                     }
+                },
+                ex -> {
+                    logger.warn("An unexpected error while sending messages to DMaaP", ex);
+                 });
     }
-
 }
index bf25252..38c1ca8 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2021 China Mobile.
+ *  Copyright (c) 2021-2022 Wipro Limited.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.dcaegen2.kpi.utils;
 
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
-
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-
 import org.onap.dcaegen2.kpi.models.Configuration;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
 
 /**
  * Utility class to perform actions related to Dmaap.
@@ -39,65 +45,35 @@ import org.onap.dcaegen2.kpi.models.Configuration;
  */
 public class DmaapUtils {
 
-    /**
-     * Build publisher.
-     */
-    public CambriaBatchingPublisher buildPublisher(Configuration config, String topic) {
-        try {
-            return builder(config, topic).build();
-        } catch (MalformedURLException | GeneralSecurityException e) {
-            return null;
-
-        }
-    }
-
-    /**
-     * Build consumer.
-     */
-    public CambriaConsumer buildConsumer(Configuration config, String topic) {
-
-        try {
-            return builderConsumer(config, topic).build();
-        } catch (MalformedURLException | GeneralSecurityException e) {
-            return null;
-        }
-
+    public MessageRouterPublisher buildPublisher() {
+        final MessageRouterPublisher publisher = DmaapClientFactory
+             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+        return publisher;
     }
 
-    private static PublisherBuilder builder(Configuration config, String topic) {
-        if (config.isSecured()) {
-            return authenticatedBuilder(config, topic);
-        } else {
-            return unAuthenticatedBuilder(config, topic);
-        }
+    public MessageRouterPublishRequest buildPublisherRequest(Configuration config, String topicUrl) {
+        MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder().topicUrl(topicUrl)
+             .aafCredentials(ImmutableAafCredentials.builder().username(config.getAafUsername())
+             .password(config.getAafPassword()).build())
+             .build();
+        MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder()
+             .sinkDefinition(sinkDefinition).build();
+        return request;
     }
 
-    private static PublisherBuilder authenticatedBuilder(Configuration config, String topic) {
-        return unAuthenticatedBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
-                config.getAafPassword());
+    public MessageRouterSubscriber buildSubscriber() {
+        MessageRouterSubscriber subscriber = DmaapClientFactory
+            .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+        return subscriber;
     }
 
-    private static PublisherBuilder unAuthenticatedBuilder(Configuration config, String topic) {
-        return new CambriaClientBuilders.PublisherBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
-                .logSendFailuresAfter(5);
+    public MessageRouterSubscribeRequest buildSubscriberRequest(Configuration config, String topicUrl) {
+        MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder().topicUrl(topicUrl)
+             .aafCredentials(ImmutableAafCredentials.builder().username(config.getAafUsername())
+             .password(config.getAafPassword()).build())
+             .build();
+        MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder()
+             .consumerGroup(config.getCg()).consumerId(config.getCid()).sourceDefinition(sourceDefinition).build();
+        return request;
     }
-
-    private static ConsumerBuilder builderConsumer(Configuration config, String topic) {
-        if (config.isSecured()) {
-            return authenticatedConsumerBuilder(config, topic);
-        } else {
-            return unAuthenticatedConsumerBuilder(config, topic);
-        }
-    }
-
-    private static ConsumerBuilder unAuthenticatedConsumerBuilder(Configuration config, String topic) {
-        return new CambriaClientBuilders.ConsumerBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
-                .knownAs(config.getCg(), config.getCid()).withSocketTimeout(config.getPollingTimeout() * 1000);
-    }
-
-    private static ConsumerBuilder authenticatedConsumerBuilder(Configuration config, String topic) {
-        return unAuthenticatedConsumerBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
-                config.getAafPassword());
-    }
-
 }
index 8020f93..cc23bab 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2021 China Mobile.
+ *  Copyright (C) 2022 Wipro Limited.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.dcaegen2.kpi.dmaap;
 
-import static org.mockito.Mockito.when;
-
-import com.att.nsa.cambria.client.CambriaTopicManager;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-
 import java.io.BufferedReader;
 import java.io.FileReader;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -38,20 +32,20 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
-import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.onap.dcaegen2.kpi.models.Configuration;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
 
+//import com.att.nsa.cambria.client.CambriaTopicManager;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = DmaapClientTest.class)
 public class DmaapClientTest {
 
-    @Mock
-    private CambriaTopicManager topicManager;
-
     @InjectMocks
     DmaapClient client;
 
@@ -73,18 +67,9 @@ public class DmaapClientTest {
         configuration.setCid("cid");
         configuration.setPollingInterval(30);
         configuration.setPollingTimeout(100);
-
-        try {
-            when(topicManager.getTopics()).thenReturn(topics);
-
-            client = Mockito.mock(DmaapClient.class);
-            client.initClient();
-            Mockito.verify(client).initClient();
-            // Mockito.verifycreateAndConfigureTopics();
-
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
+        client = Mockito.mock(DmaapClient.class);
+        client.initClient();
+        Mockito.verify(client).initClient();
     }
 
     @Test
@@ -100,9 +85,6 @@ public class DmaapClientTest {
             configuration.updateConfigurationFromJsonObject(config);
             DmaapClient client = new DmaapClient();
             client.initClient();
-            // Mockito.verify(client).startClient();
-            // Mockito.verifycreateAndConfigureTopics();
-
         } catch (Exception e) {
             e.printStackTrace();
         }
index e8fd992..81699c2 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2021 China Mobile.
+ *  Copyright (C) 2022 Wipro Limited.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,6 +22,8 @@
 package org.onap.dcaegen2.kpi.dmaap;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -35,10 +38,15 @@ import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.onap.dcaegen2.kpi.models.Configuration;
 import org.onap.dcaegen2.kpi.utils.DmaapUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 import org.springframework.boot.test.context.SpringBootTest;
 
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaConsumer;
+import com.google.gson.JsonPrimitive;
+
+import reactor.core.publisher.Flux;
 
 @RunWith(MockitoJUnitRunner.class)
 @SpringBootTest(classes = KpiDmaapClient.class)
@@ -48,42 +56,38 @@ public class KpiDmaapClientTest {
     Configuration configurationMock;
 
     @Mock
-    DmaapUtils dmaapUtilsMock;
-
-    @InjectMocks
-    KpiDmaapClient kpiDmaapClient;
+    DmaapUtils dmaapUtilsMock;    
+    
+    @Mock
+    MessageRouterPublisher messageRouterPublisher;
 
     @Mock
-    CambriaConsumer kpiResponseCambriaConsumerMock;
+    MessageRouterPublishRequest messageRouterPublishRequest;
 
     @Mock
-    CambriaBatchingPublisher cambriaBatchingPublisherMock;
+    KpiDmaapClient kpiDmaapClient;
 
     @Mock
     NotificationProducer notificationProducerMock;
 
-    @Before
-    public void setup() {
-        kpiDmaapClient = new KpiDmaapClient(dmaapUtilsMock, configurationMock);
-    }
-
     @Test
-    public void sendNotificationToPolicyTest() {
+    public void sendNotificationToPolicyTest() throws IOException {
         Map<String, Object> streamsPublishes = new HashMap<>();
         Map<String, String> topics = new HashMap<>();
         Map<String, Object> dmaapInfo = new HashMap<>();
         topics.put("topic_url", "https://message-router.onap.svc.cluster.local:3905/events/DCAE_KPI_OUTPUT");
         dmaapInfo.put("dmaap_info", topics);
         streamsPublishes.put("kpi_topic", dmaapInfo);
-        Mockito.when(configurationMock.getStreamsPublishes()).thenReturn(streamsPublishes);
-        Mockito.when(dmaapUtilsMock.buildPublisher(configurationMock, "DCAE_KPI_OUTPUT"))
-                .thenReturn(cambriaBatchingPublisherMock);
-        try {
-            Mockito.when(cambriaBatchingPublisherMock.send("", "hello")).thenReturn(0);
-        } catch (IOException e) {
-            e.printStackTrace();
+        Mockito.when(configurationMock.getStreamsPublishes()).thenReturn(streamsPublishes);    
+        Mockito.doNothing().when(notificationProducerMock).sendNotification(Mockito.anyString());
+        io.vavr.collection.List<String> expectedItems = io.vavr.collection.List.of("kpi-1", "kpi-2", "kpi-3");
+        MessageRouterPublishResponse expectedResponse = ImmutableMessageRouterPublishResponse
+             .builder().items(expectedItems.map(JsonPrimitive::new))
+             .build();         
+        Flux<MessageRouterPublishResponse> responses = Flux.just(expectedResponse);
+        when(messageRouterPublisher.put(Mockito.any(), Mockito.any())).thenReturn(responses);
+        when(kpiDmaapClient.sendNotificationToDmaap(Mockito.anyString())).thenReturn(Boolean.TRUE);
+        Boolean response = kpiDmaapClient.sendNotificationToDmaap(Mockito.anyString());
+        assertEquals(Boolean.TRUE, response);
         }
-        assertTrue(kpiDmaapClient.sendNotificationToDmaap("hello"));
-
-    }
 }
index 1d04a62..8d72d3c 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2021 China Mobile.
+ *  Copyright (C) 2022 Wipro Limited.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.dcaegen2.kpi.dmaap;
 
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.when;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.time.Duration;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
 
-import com.att.nsa.cambria.client.CambriaConsumer;
+import com.google.gson.JsonElement;
+
+import reactor.core.CoreSubscriber;
+import reactor.core.publisher.Flux;
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = NotificationConsumerTest.class)
 public class NotificationConsumerTest {
 
-    @Mock
-    CambriaConsumer cambriaConsumer;
-
     @Mock
     NotificationCallback notificationCallback;
 
     @InjectMocks
     NotificationConsumer notificationConsumer;
+    
+    @Mock
+    MessageRouterSubscriber messageSubscriber;
+    
+    @Mock
+    MessageRouterSubscribeRequest subscriberRequest;
 
     @Test
     public void testNotificationConsumer() {
         try {
-            List<String> notifications = new ArrayList<>();
-            notifications.add("notification1");
-            when(cambriaConsumer.fetch()).thenReturn(notifications);
+            Flux<JsonElement> json = new Flux<JsonElement>() {
+                @Override
+                public void subscribe(CoreSubscriber<? super JsonElement> actual) {
+                }
+            };
             Mockito.doNothing().when(notificationCallback).activateCallBack(Mockito.anyString());
-            notificationConsumer.run();
-
+            when(messageSubscriber.subscribeForElements(subscriberRequest, Duration.ofMinutes(1))).thenReturn(json);
+            assertNotNull(messageSubscriber.subscribeForElements(subscriberRequest, Duration.ofMinutes(1)));
         } catch (Exception e) {
             e.printStackTrace();
         }
index c835d49..f880ec7 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2021 China Mobile.
+ *  Copyright (C) 2022 Wipro Limited.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.dcaegen2.kpi.dmaap;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-
 import java.io.IOException;
 
 import org.junit.Test;
@@ -37,10 +32,19 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.onap.dcaegen2.kpi.computation.FileUtils;
 import org.onap.dcaegen2.kpi.models.Configuration;
-import org.powermock.api.mockito.PowerMockito;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
 
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+
+import reactor.core.publisher.Flux;
+
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = NotificationProducerTest.class)
 public class NotificationProducerTest {
@@ -49,24 +53,25 @@ public class NotificationProducerTest {
     private static final String CBS_CONFIG_FILE = "kpi/cbs_config2.json";
 
     @Mock
-    CambriaBatchingPublisher cambriaBatchingPublisher;
-
+    MessageRouterPublisher messageRouterPublisher;                                                                                                                               
+    
+    @Mock
+    MessageRouterPublishRequest messageRouterPublishRequest;
+    
     @InjectMocks
     NotificationProducer notificationProducer;
 
     @Test
-    public void notificationProducerTest() {
-
-        try {
-            when(cambriaBatchingPublisher.send(Mockito.anyString(), Mockito.anyString())).thenReturn(0);
-            int result = notificationProducer.sendNotification("msg");
-            assertEquals(0, result);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-
+    public void notificationProducerTest() throws IOException {
+        io.vavr.collection.List<String> expectedItems = io.vavr.collection.List.of("kpi-1", "kpi-2", "kpi-3");
+        MessageRouterPublishResponse expectedResponse = ImmutableMessageRouterPublishResponse
+                .builder().items(expectedItems.map(JsonPrimitive::new))
+                .build();
+        Flux<MessageRouterPublishResponse> responses = Flux.just(expectedResponse);
+        when(messageRouterPublisher.put(Mockito.any(), Mockito.any())).thenReturn(responses);
+        notificationProducer.sendNotification("msg");
     }
-
+    
     @Test
     public void kpiResultWithoutConfigTest() {
 
index a112fe0..f35e655 100644 (file)
@@ -21,7 +21,7 @@
 ###############################################################################
 major=1
 minor=0
-patch=7
+patch=8
 base_version=${major}.${minor}.${patch}
 release_version=${base_version}
 snapshot_version=${base_version}-SNAPSHOT