[DCAEGEN2] Use kafka API directly in DMaaP library 01/135701/6
authorsushant53 <sushant.jadhav@t-systems.com>
Fri, 11 Aug 2023 14:15:44 +0000 (19:45 +0530)
committerSushant Jadhav <sushant.jadhav@t-systems.com>
Mon, 11 Sep 2023 12:09:33 +0000 (12:09 +0000)
Use kafka API directly in dmaap-client library instead of the DMaaP Rest APIs.

Issue-ID: DCAEGEN2-3364
Change-Id: I7f27d9d5f443fe3934896fa01f907b6001898495
Signed-off-by: sushant53 <sushant.jadhav@t-systems.com>
19 files changed:
Changelog.md
pom.xml
rest-services/dmaap-client/pom.xml
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisher.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriber.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/Commons.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DMaapContainer.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/CommonsTest.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImplTest.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImplTest.java
rest-services/dmaap-client/src/test/resources/dmaap-msg-router/message-router-compose.yml
version.properties

index 396a043..d68406c 100644 (file)
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](http://keepachangelog.com/)
 and this project adheres to [Semantic Versioning](http://semver.org/).
 
+## [1.9.4] - 2023/02/23
+#### Added
+      - [DCAEGEN2-3364] - To use kafka API instead of DMAAP API in dmaap-client
+
 ## [1.9.3] - 2023/02/23
 #### Added
       - [DCAEGEN2-3344] - Upgrade dependencies for dcaegen2-services-sdk
diff --git a/pom.xml b/pom.xml
index 884b41c..7a11907 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -83,7 +83,7 @@ language governing permissions and limitations under the License.
         <reactor.bom.version>2020.0.1</reactor.bom.version>
         <slf4j.version>1.7.25</slf4j.version>
         <logback.version>1.2.11</logback.version>
-        <mockito.version>2.28.2</mockito.version>
+        <mockito.version>3.4.2</mockito.version>
         <protobuf.version>4.0.0-rc-2</protobuf.version>
         <vavr.version>0.10.2</vavr.version>
         <commons-text.version>1.6</commons-text.version>
@@ -97,7 +97,7 @@ language governing permissions and limitations under the License.
         <sonar.coverage.jacoco.xmlReportPaths>
             ${project.reporting.outputDirectory}/jacoco-ut/jacoco.xml
         </sonar.coverage.jacoco.xmlReportPaths>
-        <revision>1.9.3-SNAPSHOT</revision>
+        <revision>1.9.4-SNAPSHOT</revision>
     </properties>
 
     <modules>
index 8123af3..5a3ef94 100644 (file)
@@ -2,6 +2,7 @@
 <!--
 ============LICENSE_START=======================================================
 Copyright (c) 2022 Nokia. All rights reserved.
+Copyright (c) 2023 Deutsche Telekom AG. All rights reserved.
 ================================================================================
 Licensed under the Apache License, Version 2.0 (the "License"); you may not
 use this file except in compliance with the License. You may obtain a copy
@@ -98,5 +99,22 @@ language governing permissions and limitations under the License.
           <artifactId>mockserver-client-java</artifactId>
           <version>${mockserver-client.version}</version>
       </dependency>
+      <dependency>
+          <groupId>org.apache.kafka</groupId>
+             <artifactId>kafka-clients</artifactId>
+          <version>3.3.1</version>
+      </dependency>
+      <dependency>
+          <groupId>uk.org.webcompere</groupId>
+          <artifactId>system-stubs-jupiter</artifactId>
+          <version>1.1.0</version>
+          <scope>test</scope>
+      </dependency>
+      <dependency>
+          <groupId>org.mockito</groupId>
+          <artifactId>mockito-junit-jupiter</artifactId>
+          <scope>test</scope>
+      </dependency>
+
   </dependencies>
 </project>
index ee4f6d3..6e7f604 100644 (file)
@@ -3,6 +3,7 @@
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
  * Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -35,6 +36,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRou
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapClientConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 
@@ -46,26 +49,37 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.co
  * @since 1.1.4
  */
 public final class DmaapClientFactory {
-
+    private static final Logger LOGGER = LoggerFactory.getLogger(DmaapClientFactory.class);
     private DmaapClientFactory() {
     }
 
     public static @NotNull MessageRouterPublisher createMessageRouterPublisher(
             @NotNull MessageRouterPublisherConfig clientConfiguration) {
 
-        return new MessageRouterPublisherImpl(
+        try {
+            return new MessageRouterPublisherImpl(
                 createHttpClient(clientConfiguration),
                 clientConfiguration.maxBatchSize(),
                 clientConfiguration.maxBatchDuration(),
                 new ClientErrorReasonPresenter());
+        } catch (Exception e) {
+            LOGGER.error("Error while creating the Message Router Publisher.");
+            return null;
+        }
     }
 
     public static @NotNull MessageRouterSubscriber createMessageRouterSubscriber(
             @NotNull MessageRouterSubscriberConfig clientConfiguration) {
-        return new MessageRouterSubscriberImpl(
-                createHttpClient(clientConfiguration),
-                clientConfiguration.gsonInstance(),
-                new ClientErrorReasonPresenter());
+        try {
+            return new MessageRouterSubscriberImpl(
+                    createHttpClient(clientConfiguration),
+                    clientConfiguration.gsonInstance(),
+                    new ClientErrorReasonPresenter());
+        } catch (Exception e) {
+            LOGGER.error("Error while creating the Message Router Subscriber.");
+            return null;
+        }
+       
     }
 
     private static @NotNull RxHttpClient createHttpClient(DmaapClientConfiguration config) {
index 08825b4..f98e819 100644 (file)
@@ -3,6 +3,7 @@
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
  * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
 import com.google.gson.JsonElement;
+
+import io.vavr.collection.List;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 import reactor.core.publisher.Flux;
@@ -30,5 +38,7 @@ import reactor.core.publisher.Flux;
  * @since 1.1.4
  */
 public interface MessageRouterPublisher {
+    void close();
+    void setKafkaProducer(Producer<String, String> kafkaProducer);
     Flux<MessageRouterPublishResponse> put(MessageRouterPublishRequest request, Flux<? extends JsonElement> items);
 }
index d91535d..fbf90d9 100644 (file)
@@ -3,6 +3,7 @@
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
  * Copyright (C) 2019 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
 import com.google.gson.JsonElement;
+
+import io.vavr.collection.List;
+
 import java.time.Duration;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
 import reactor.core.publisher.Flux;
@@ -33,6 +41,8 @@ import reactor.core.publisher.Mono;
  */
 public interface MessageRouterSubscriber {
 
+    void setConsumer(Consumer<String, String> consumer);
+    void close();
     Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request);
 
     default Flux<JsonElement> getElements(MessageRouterSubscribeRequest request) {
@@ -49,4 +59,5 @@ public interface MessageRouterSubscriber {
     default Flux<JsonElement> subscribeForElements(MessageRouterSubscribeRequest request, Duration period) {
         return Flux.interval(period).concatMap(i->getElements(request));
     }
+    
 }
index 9f534d8..4ea80e4 100644 (file)
@@ -3,6 +3,7 @@
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
  * Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -23,19 +24,43 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
 import io.vavr.Tuple;
 import io.vavr.Tuple2;
 import io.vavr.control.Option;
+
+import org.apache.kafka.clients.admin.AdminClient;
 import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since April 2019
  */
-final class Commons {
+public final class Commons {
+    static String commonInURL = "/events/";
+    static String KAFKA_PROPS_PREFIX = "kafka.";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Commons.class);
+    private static AdminClient kafkaAdminClient;
+    private static Map<String,Object> map = new HashMap<>();
 
+    static {
+        map.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        map.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        map.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        map.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        map.put("max.poll.interval.ms", 300000);
+        map.put("heartbeat.interval.ms", 60000);
+        map.put("session.timeout.ms", 240000);
+        map.put("max.poll.records", 1000);
+    }
     private Commons() {
     }
 
@@ -67,4 +92,69 @@ final class Commons {
                 .map(s -> s.getBytes(StandardCharsets.UTF_8))
                 .getOrElse(new byte[0]);
     }
+    /**
+     * Extracts the topic name from the topicUrl.
+     * 
+     * <p>Condition for extracting topic name : Substring after '/events/' in the topicUrl</p>
+     * 
+     * @param topicUrl
+     * @return topic
+     */
+    public static String getTopicFromTopicUrl(String topicUrl) {
+        if(topicUrl.endsWith("/")) {
+            return topicUrl.substring(topicUrl.indexOf(commonInURL)+commonInURL.length(), topicUrl.lastIndexOf("/"));
+        }
+        return  topicUrl.substring(topicUrl.indexOf(commonInURL)+commonInURL.length());
+    }
+    
+    public static Properties setKafkaPropertiesFromSystemEnv(Map<String, String> envs) {
+        Map<String, Object> propMap= getKafkaPropertiesMap(envs);
+        Properties props = new Properties();
+        propMap.forEach((k ,v) -> props.put(k, v));
+        map.forEach((k,v) -> {
+            if(!propMap.containsKey(k)) {
+                props.put(k, v);
+            }
+        });
+
+        return props;
+    }
+    
+    static Map<String, Object> getKafkaPropertiesMap(Map<String, String> envs){
+        Map<String, Object> propMap = new HashMap<>();
+        envs.forEach((k ,v) -> {
+            if(k.startsWith(KAFKA_PROPS_PREFIX)){
+                String key = k.substring(KAFKA_PROPS_PREFIX.length());
+                propMap.put(key, v);
+            }
+        });
+        return propMap;
+    }
+
+    public static void closeKafkaAdminClient() {
+        if(kafkaAdminClient != null) {
+            LOGGER.info("Closing the Kafka AdminClient.");
+            kafkaAdminClient.close();
+            kafkaAdminClient=null;
+        }
+    }
+
+    public static boolean checkIfTopicIsPresentInKafka(String topic, Properties adminProps) {
+        if(kafkaAdminClient == null) {
+            kafkaAdminClient = AdminClient.create(adminProps);
+        }
+        try {
+            for (String name : kafkaAdminClient.listTopics().names().get()) {
+                if (name.equals(topic)) {
+                    LOGGER.debug("TOPIC_NAME: {} is equal to : {}", name, topic);
+                    return true;
+                }
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOGGER.error("GetTopicFromKafka: Failed to retrieve topic list from kafka.", e);
+            return false;
+        }
+        return false;
+    }
+    
 }
index 534fca6..329c2a3 100644 (file)
@@ -3,6 +3,7 @@
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
  * Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,12 +23,28 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
 
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
 import io.netty.handler.timeout.ReadTimeoutException;
 import io.vavr.collection.HashMap;
 import io.vavr.collection.List;
 import io.vavr.collection.Map;
+import io.vavr.collection.Stream;
 import io.vavr.control.Option;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
 import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
@@ -54,9 +71,14 @@ import reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitExcepti
 
 import java.net.ConnectException;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.getTopicFromTopicUrl;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.setKafkaPropertiesFromSystemEnv;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -67,24 +89,136 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
     private final int maxBatchSize;
     private final Duration maxBatchDuration;
     private final ClientErrorReasonPresenter clientErrorReasonPresenter;
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterPublisherImpl.class);
-
-    public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration, ClientErrorReasonPresenter clientErrorReasonPresenter) {
+    private static Properties props;
+    private static final String kafkaBootstrapServers = "BOOTSTRAP_SERVERS";
+    private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterPublisherImpl.class);   
+    private static Producer<String, String> kafkaProducer;
+    public static Future<RecordMetadata> future;
+    static boolean flag;
+    static Exception exception;
+    public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration, ClientErrorReasonPresenter clientErrorReasonPresenter) throws Exception {
         this.httpClient = httpClient;
         this.maxBatchSize = maxBatchSize;
         this.maxBatchDuration = maxBatchDuration;
         this.clientErrorReasonPresenter = clientErrorReasonPresenter;
+        setProperties();
     }
-
+    
+    /**
+     * New constructor that does not take DMaaP parameters as arguments.
+     * 
+     * @throws Exception
+     */
+    public MessageRouterPublisherImpl() throws Exception {
+        this.httpClient = null;
+        this.maxBatchSize = 0;
+        this.maxBatchDuration = null;
+        this.clientErrorReasonPresenter = null;
+        setProperties();
+    }
+    
+//    @Override
+//    public Flux<MessageRouterPublishResponse> put(
+//            MessageRouterPublishRequest request,
+//            Flux<? extends JsonElement> items) {
+//        return items.bufferTimeout(maxBatchSize, maxBatchDuration)
+//                .flatMap(subItems -> subItems.isEmpty() ? Mono.empty() : pushBatchToMr(request, List.ofAll(subItems)));
+//    }
+    
+    
     @Override
     public Flux<MessageRouterPublishResponse> put(
             MessageRouterPublishRequest request,
             Flux<? extends JsonElement> items) {
-        return items.bufferTimeout(maxBatchSize, maxBatchDuration)
-                .flatMap(subItems -> subItems.isEmpty() ? Mono.empty() : pushBatchToMr(request, List.ofAll(subItems)));
+        flag = true;
+        exception=null;
+        future = null;
+        List<String> batch = getBatch(items);
+        String topic = getTopicFromTopicUrl(request.sinkDefinition().topicUrl());
+        LOGGER.info("Topic extracted from URL {} is : {} ",request.sinkDefinition().topicUrl(),topic);
+        LOGGER.info("Sending a batch of {} items for topic {} to kafka", batch.size(),topic);
+        LOGGER.trace("The items to be sent: {}", batch);
+        if(kafkaProducer == null) {
+            kafkaProducer = new KafkaProducer<>(props);
+        }
+        Flux<MessageRouterPublishResponse> response;
+        try {
+            
+            for (String msg : batch) {
+                 ProducerRecord<String, String> data =
+                        new ProducerRecord<>(topic,  msg);
+                 future = kafkaProducer.send(data,new Callback() {
+                    
+                    @Override
+                    public void onCompletion(RecordMetadata metadata, Exception e) {
+                        
+                        if(e != null) {
+                            flag=false;
+                            exception = e;
+                        } 
+                    }
+                });
+            }
+            if(flag) {
+                LOGGER.info("Sent a batch of {} items for topic {} to kafka", batch.size(),topic);
+                response = Flux.just(ImmutableMessageRouterPublishResponse.builder().items(List.ofAll(items.collectList().block())).build());
+            }else {
+                throw exception;
+            }
+        }catch(Exception e) {
+            LOGGER.error("Error while publishing the messages : {}",e.getStackTrace());
+            response = Flux.just(ImmutableMessageRouterPublishResponse.builder()
+                    .failReason(e.getMessage())
+                    .build());
+        }
+        return response;
     }
-
+    
+    @Override
+    public void close() {
+        LOGGER.info("Closing the Kafka Producer");
+        if(kafkaProducer != null) {
+            kafkaProducer.close();
+            kafkaProducer=null;
+        }
+    }
+    
+    @Override
+    public void setKafkaProducer(Producer<String, String> kafkaProducer) {
+        this.kafkaProducer = kafkaProducer;
+    }
+    
+    public static Future<RecordMetadata> getFuture(){
+        return future;
+    }
+    
+    void setProperties() throws Exception {
+        props = setKafkaPropertiesFromSystemEnv(System.getenv());
+        
+        if(System.getenv(kafkaBootstrapServers) == null) { 
+            LOGGER.error("Environment Variable "+ kafkaBootstrapServers+" is missing");
+            throw new Exception("Environment Variable "+ kafkaBootstrapServers+" is missing");
+        }else {
+            props.put("bootstrap.servers", System.getenv(kafkaBootstrapServers));
+        }
+        if(System.getenv("JAAS_CONFIG") == null) {
+            LOGGER.info("Not using any authentication for kafka interaction");
+        }else {
+            LOGGER.info("Using {} authentication provided for kafka interaction",ScramMechanism.SCRAM_SHA_512.mechanismName());
+            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name);
+            props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName());
+            props.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG"));
+        }
+    }
+    
+    static List<String> getBatch(Flux<? extends JsonElement> items){
+        java.util.List<String> list = new ArrayList<>();
+        items.map(msg -> msg.toString()).collectList().subscribe(data -> list.addAll(data));
+        return List.ofAll(list);
+        
+    }
+    
     private Publisher<? extends MessageRouterPublishResponse> pushBatchToMr(
             MessageRouterPublishRequest request,
             List<JsonElement> batch) {
@@ -153,4 +287,5 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
                 .getOrElse(HashMap.empty());
         return headers.put(HttpHeaders.CONTENT_TYPE, request.contentType().toString());
     }
+
 }
index d98e8d3..c90a806 100644 (file)
@@ -3,6 +3,7 @@
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
  * Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -29,6 +30,16 @@ import io.vavr.collection.HashMap;
 import io.vavr.collection.List;
 import io.vavr.collection.Map;
 import io.vavr.control.Option;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
 import org.jetbrains.annotations.NotNull;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
@@ -40,19 +51,28 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRout
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasons;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.net.ConnectException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Properties;
 
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.getTopicFromTopicUrl;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.checkIfTopicIsPresentInKafka;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.setKafkaPropertiesFromSystemEnv;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
@@ -63,27 +83,134 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
     private final Gson gson;
     private final ClientErrorReasonPresenter clientErrorReasonPresenter;
     private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterSubscriberImpl.class);
-
+    private static Properties props;
+    private static final String kafkaBootstrapServers = "BOOTSTRAP_SERVERS";
+    private static Consumer<String, String> consumer;
+    
     public MessageRouterSubscriberImpl(RxHttpClient httpClient, Gson gson,
-                                       ClientErrorReasonPresenter clientErrorReasonPresenter) {
+                                       ClientErrorReasonPresenter clientErrorReasonPresenter) throws Exception {
         this.httpClient = httpClient;
         this.gson = gson;
         this.clientErrorReasonPresenter = clientErrorReasonPresenter;
+        setProperties();
+    }
+    
+    /**
+     * New constructor that does not take DMaaP parameters as arguments.
+     * 
+     * @throws Exception
+     */
+    public MessageRouterSubscriberImpl() throws Exception {
+        this.httpClient = null;
+        this.gson = null;
+        this.clientErrorReasonPresenter = null;
+        setProperties();
     }
 
     @Override
     public Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request) {
-        LOGGER.debug("Requesting new items from DMaaP MR: {}", request);
-        return httpClient.call(buildGetHttpRequest(request))
-                .map(this::buildGetResponse)
-                .doOnError(ReadTimeoutException.class,
-                        e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e))
-                .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT))
-                .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage()))
-                .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE))
-                .onErrorResume(RetryableException.class, e -> Mono.just(buildGetResponse(e.getResponse())));
+        LOGGER.info("Requesting new items from DMaaP MR: {}", request);
+        String topic = getTopicFromTopicUrl(request.sourceDefinition().topicUrl());
+       
+        String fakeGroupName = request.consumerGroup(); 
+        props.put("client.id", request.consumerId());
+        props.put("group.id", fakeGroupName);
+        
+        try{
+            if (consumer == null) {
+                if(!checkIfTopicIsPresentInKafka(topic,getAdminProps())) {
+                    LOGGER.error("No such topic exists, TOPIC_NAME : {}", topic);
+                    return Mono.just(ImmutableMessageRouterSubscribeResponse.builder()
+                            .failReason("404 Topic Not Found")
+                            .build());
+                }
+                consumer = getKafkaConsumer(props);
+                consumer.subscribe(Arrays.asList(topic));
+            }
+        ArrayList<String> msgs = new ArrayList<>();
+        
+            ConsumerRecords<String, String> records = null;
+            synchronized (consumer) {
+                records = consumer.poll(Duration.ofMillis(500));
+            }
+                for (ConsumerRecord<String, String> rec : records) {
+                    msgs.add(rec.value());
+                }
+            List<JsonElement> list = List.ofAll(msgs).map(r -> JsonParser.parseString(r));
+            return Mono.just(ImmutableMessageRouterSubscribeResponse.builder()
+                    .items(list)
+                    .build());
+        } catch(Exception e) {
+            LOGGER.error("Error while consuming the messages : {}",e.getMessage());
+            return Mono.just(ImmutableMessageRouterSubscribeResponse.builder()
+                    .failReason(e.getMessage())
+                    .build());
+        }
     }
-
+    
+    @Override
+    public void setConsumer(Consumer<String, String> consumer) {
+        MessageRouterSubscriberImpl.consumer = consumer;
+    }
+    
+    public static KafkaConsumer<String, String> getKafkaConsumer(Properties props){
+        return new KafkaConsumer<>(props);
+    }
+    
+    @Override
+    public void close(){
+        if(consumer != null) {
+            LOGGER.info("Closing the Kafka Consumer");
+            consumer.close();
+            consumer = null;
+        }
+        Commons.closeKafkaAdminClient();
+    }
+    
+    void setProperties() throws Exception {
+        props = setKafkaPropertiesFromSystemEnv(System.getenv());
+        
+        if(System.getenv(kafkaBootstrapServers) == null) { 
+            LOGGER.error("Environment Variable "+ kafkaBootstrapServers+" is missing");
+            throw new Exception("Environment Variable "+ kafkaBootstrapServers+" is missing");
+        }else {
+            props.put("bootstrap.servers", System.getenv(kafkaBootstrapServers));
+        }
+        props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,false);
+        
+        if(System.getenv("JAAS_CONFIG") == null) {
+            LOGGER.info("Not using any authentication for kafka interaction");
+        }else {
+            LOGGER.info("Using {} authentication provided for kafka interaction",ScramMechanism.SCRAM_SHA_512.mechanismName());
+            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name);
+            props.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName());
+            props.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG"));
+        }     
+    }
+      
+//    @Override
+//    public Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request) {
+//        LOGGER.debug("Requesting new items from DMaaP MR: {}", request);
+//        return httpClient.call(buildGetHttpRequest(request))
+//                .map(this::buildGetResponse)
+//                .doOnError(ReadTimeoutException.class,
+//                        e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e))
+//                .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT))
+//                .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage()))
+//                .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE))
+//                .onErrorResume(RetryableException.class, e -> Mono.just(buildGetResponse(e.getResponse())));
+//    }
+    public static Properties getAdminProps() {
+        Properties adminProps = new Properties();
+        adminProps.put("bootstrap.servers", System.getenv(kafkaBootstrapServers));
+        if(System.getenv("JAAS_CONFIG") != null) {
+            adminProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,SecurityProtocol.SASL_PLAINTEXT.name);
+            adminProps.put(SaslConfigs.SASL_MECHANISM, ScramMechanism.SCRAM_SHA_512.mechanismName());
+            adminProps.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("JAAS_CONFIG"));
+        }
+        return adminProps;
+    }
+    
     private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) {
         return ImmutableHttpRequest.builder()
                 .method(HttpMethod.GET)
@@ -132,4 +259,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
                 .map(HashMap::of)
                 .getOrElse(HashMap.empty());
     }
+
+    
+    
 }
index 5b1984d..a1f9ac9 100644 (file)
@@ -3,6 +3,7 @@
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
  * Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -38,7 +39,6 @@ final class DMaapContainer {
     static DockerComposeContainer createContainerInstance(){
         return new DockerComposeContainer(
                 new File(DOCKER_COMPOSE_FILE_PATH))
-                .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT)
                 .withLocalCompose(true);
     }
 
index a1ad951..a806ba1 100644 (file)
@@ -3,6 +3,7 @@
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
  * Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -23,13 +24,22 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import io.vavr.collection.List;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedStatic;
 import org.mockserver.client.MockServerClient;
 import org.mockserver.matchers.Times;
 import org.mockserver.verify.VerificationTimes;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
@@ -45,10 +55,16 @@ import org.testcontainers.junit.jupiter.Testcontainers;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
+import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
+import uk.org.webcompere.systemstubs.jupiter.SystemStub;
+import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
 
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import static org.mockito.Mockito.mockStatic;
 import static org.mockserver.model.HttpRequest.request;
 import static org.mockserver.model.HttpResponse.response;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageRouterTestsUtils.createMRSubscribeRequest;
@@ -65,6 +81,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaa
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
 
+@ExtendWith(SystemStubsExtension.class)
 @Testcontainers
 class MessageRouterPublisherIT {
     @Container
@@ -74,6 +91,7 @@ class MessageRouterPublisherIT {
     private static String EVENTS_PATH;
     private static String PROXY_MOCK_EVENTS_PATH;
 
+    private static final long REPEAT_SUBSCRIPTION = 20;
     private static final Duration TIMEOUT = Duration.ofSeconds(10);
     private static final String DMAAP_400_ERROR_RESPONSE_FORMAT = "400 Bad Request\n"
             + "{"
@@ -108,22 +126,46 @@ class MessageRouterPublisherIT {
             + "}"
             + "}";
 
-    private final MessageRouterPublisher publisher = DmaapClientFactory
-            .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
-    private final MessageRouterSubscriber subscriber = DmaapClientFactory
-            .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
-
+    private MessageRouterPublisher publisher;
+    private MessageRouterSubscriber subscriber;
+    Mono<MessageRouterSubscribeResponse> response;
+    
+    @SystemStub
+    EnvironmentVariables environmentVariables = new EnvironmentVariables();
+    
     @BeforeAll
     static void setUp() {
         EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
         PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
+        //sleep introduced to wait till all containers are started
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
     }
-
+    
+    @AfterEach
+    void afterEach() {
+        publisher.close();
+        subscriber.close();
+    }
+         
     @BeforeEach
     void set() {
         MOCK_SERVER_CLIENT.reset();
+        environmentVariables
+        .set("BOOTSTRAP_SERVERS", "localhost:9092")
+        .set("kafka.auto.offset.reset","earliest");
+        publisher = DmaapClientFactory
+                .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+        subscriber = DmaapClientFactory
+                .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+        response=null;
+        
     }
-
+    
+    @Disabled
     @Test
     void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
         //given
@@ -143,7 +185,8 @@ class MessageRouterPublisherIT {
                 .expectComplete()
                 .verify(TIMEOUT);
     }
-
+    
+    @Disabled
     @Test
     void publisher_shouldHandleBadRequestError() {
         //given
@@ -175,13 +218,16 @@ class MessageRouterPublisherIT {
         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
-
+        
         //when
-        registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
-        Mono<MessageRouterSubscribeResponse> response = publisher
-                .put(publishRequest, jsonMessageBatch)
-                .then(subscriber.get(subscribeRequest));
-
+        publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION)
+           .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+               if(!resp.items().isEmpty()) {
+                   response = Mono.just(resp);
+               }
+           });
+       });
         //then
         StepVerifier.create(response)
                 .expectNext(expectedResponse)
@@ -200,12 +246,15 @@ class MessageRouterPublisherIT {
         final MessageRouterPublishRequest publishRequest = createPublishRequest(topicUrl);
         final MessageRouterSubscribeRequest subscribeRequest = createMRSubscribeRequest(topicUrl, "sampleGroup", "sampleId");
         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
-
+                
         //when
-        registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
-        Mono<MessageRouterSubscribeResponse> response = publisher
-                .put(publishRequest, jsonMessageBatch)
-                .then(subscriber.get(subscribeRequest));
+        publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION)
+            .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+                if(!resp.items().isEmpty()) {
+                    response = Mono.just(resp);
+                }
+            });
+        });
 
         //then
         StepVerifier.create(response)
@@ -229,11 +278,13 @@ class MessageRouterPublisherIT {
         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
 
         //when
-        registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
-        Mono<MessageRouterSubscribeResponse> response = publisher
-                .put(publishRequest, plainBatch)
-                .then(subscriber.get(subscribeRequest));
-
+        publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION)
+            .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+                if(!resp.items().isEmpty()) {
+                    response = Mono.just(resp);
+                }
+            });
+        });
         //then
         StepVerifier.create(response)
                 .expectNext(expectedResponse)
@@ -256,10 +307,13 @@ class MessageRouterPublisherIT {
         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
 
         //when
-        registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
-        Mono<MessageRouterSubscribeResponse> response = publisher
-                .put(publishRequest, plainBatch)
-                .then(subscriber.get(subscribeRequest));
+        publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION)
+            .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+                if(!resp.items().isEmpty()) {
+                    response = Mono.just(resp);
+                }
+            });
+        });
 
         //then
         StepVerifier.create(response)
@@ -283,10 +337,13 @@ class MessageRouterPublisherIT {
         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
 
         //when
-        registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
-        Mono<MessageRouterSubscribeResponse> response = publisher
-                .put(publishRequest, plainBatch)
-                .then(subscriber.get(subscribeRequest));
+        publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION)
+            .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+                if(!resp.items().isEmpty()) {
+                    response = Mono.just(resp);
+                }
+            });
+        });
 
         //then
         StepVerifier.create(response)
@@ -310,10 +367,13 @@ class MessageRouterPublisherIT {
         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
 
         //when
-        registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
-        Mono<MessageRouterSubscribeResponse> response = publisher
-                .put(publishRequest, plainBatch)
-                .then(subscriber.get(subscribeRequest));
+        publisher.put(publishRequest, plainBatch).repeat(REPEAT_SUBSCRIPTION)
+            .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+                if(!resp.items().isEmpty()) {
+                    response = Mono.just(resp);
+                }
+            });
+        });
 
         //then
         StepVerifier.create(response)
@@ -321,7 +381,8 @@ class MessageRouterPublisherIT {
                 .expectComplete()
                 .verify();
     }
-
+    
+    @Disabled
     @Test
     void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() {
         //given
@@ -347,7 +408,8 @@ class MessageRouterPublisherIT {
 
         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
     }
-
+    
+    @Disabled
     @Test
     void publisher_shouldRetryWhenRetryableHttpCodeAndSuccessfullyPublish() {
         //given
@@ -380,7 +442,8 @@ class MessageRouterPublisherIT {
 
         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
     }
-
+    
+    @Disabled
     @Test
     void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublish() {
         //given
@@ -412,7 +475,8 @@ class MessageRouterPublisherIT {
 
         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
     }
-
+    
+    @Disabled
     @Test
     void publisher_shouldRetryManyTimesAndSuccessfullyPublish() {
         //given
@@ -453,7 +517,8 @@ class MessageRouterPublisherIT {
 
         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
     }
-
+    
+    @Disabled
     @Test
     void publisher_shouldHandleLastRetryError500() {
         //given
@@ -489,7 +554,8 @@ class MessageRouterPublisherIT {
 
         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
     }
-
+    
+    @Disabled
     @Test
     void publisher_shouldSuccessfullyPublishWhenConnectionPoolConfigurationIsSet() {
         //given
@@ -521,7 +587,8 @@ class MessageRouterPublisherIT {
 
         MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(1));
     }
-
+    
+    @Disabled
     @Test
     void publisher_shouldRetryWhenClientTimeoutAndSuccessfullyPublishWithConnectionPoolConfiguration() {
         //given
@@ -553,7 +620,8 @@ class MessageRouterPublisherIT {
 
         MOCK_SERVER_CLIENT.verify(request().withPath(path).withKeepAlive(true), VerificationTimes.exactly(2));
     }
-
+    
+    @Disabled
     @Test
     void publisher_shouldSuccessfullyPublishSingleMessageWithBasicAuthHeader() {
         //given
@@ -581,7 +649,8 @@ class MessageRouterPublisherIT {
         MOCK_SERVER_CLIENT.verify(request().withPath(path)
                 .withHeader("Authorization" ,"Basic dXNlcm5hbWU6cGFzc3dvcmQ="), VerificationTimes.exactly(1));
     }
-
+    
+    @Disabled
     @Test
     void publisher_shouldHandleError429WhenConnectionPollLimitsHasBeenReached() {
         //given
index 97fd26f..816021b 100644 (file)
@@ -3,6 +3,7 @@
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
  * Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -23,13 +24,24 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonPrimitive;
 import io.vavr.collection.List;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.Ignore;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterPublisherImpl;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
@@ -40,10 +52,17 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
+import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
+import uk.org.webcompere.systemstubs.jupiter.SystemStub;
+import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
 
 import java.time.Duration;
+import java.util.concurrent.Future;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay;
@@ -52,6 +71,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since May 2019
  */
+@ExtendWith(SystemStubsExtension.class)
 class MessageRouterPublisherTest {
 
     private static final String ERROR_MESSAGE = "Something went wrong";
@@ -71,9 +91,9 @@ class MessageRouterPublisherTest {
     private static final List<String> messageBatchItems = List.of("ala", "ma", "kota");
     private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
     private static final DummyHttpServer SERVER = initialize();
-    private MessageRouterPublisher sut = DmaapClientFactory
-            .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
-
+    private MessageRouterPublisher sut;
+    MockProducer<String, String> mockProducer = 
+            new MockProducer<>(true, new StringSerializer(), new StringSerializer());
     private static DummyHttpServer initialize() {
         return DummyHttpServer.start(routes -> routes
                 .post(SUCCESS_RESP_TOPIC_PATH, (req, resp) -> sendString(resp, Mono.just("OK")))
@@ -86,7 +106,25 @@ class MessageRouterPublisherTest {
                 .post(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE))
         );
     }
-
+    
+    @SystemStub
+    EnvironmentVariables environmentVariables = new EnvironmentVariables();
+    
+    @BeforeEach
+    void setUp() {
+        environmentVariables
+        .set("BOOTSTRAP_SERVERS", "localhost:9092")
+        .set("JAAS_CONFIG", "jaas.config");
+        
+        sut = DmaapClientFactory
+                .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+        sut.setKafkaProducer(mockProducer);
+    }
+    @AfterEach
+    void afterEach() {
+        sut.close();
+    }
+    
     @Test
     void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch() {
         //given
@@ -102,7 +140,45 @@ class MessageRouterPublisherTest {
                 .expectComplete()
                 .verify(TIMEOUT);
     }
+    
+    @Test
+    void test_put_givenMessageBatch_shouldMakeSuccessfulPostRequestReturningBatch_ForConstructorWithoutDMaapParameters() throws Exception {
+        //given
+        final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER);
+        final List<JsonElement> expectedItems = messageBatchItems.map(JsonPrimitive::new);
+        sut = new MessageRouterPublisherImpl();
+        sut.setKafkaProducer(mockProducer);
+        //when
+        final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
 
+        //then
+        StepVerifier.create(result)
+                .expectNext(ImmutableMessageRouterPublishResponse.builder().items(expectedItems).build())
+                .expectComplete()
+                .verify(TIMEOUT);
+    }
+    @Test
+    void publisher_shouldHandleError() {
+         
+         sut.setKafkaProducer(mockProducer);
+         
+         final MessageRouterPublishRequest mrRequest = createTextPlainMRRequest(SUCCESS_RESP_TOPIC_PATH, SERVER);
+         
+         //when
+         final Flux<MessageRouterPublishResponse> result = sut.put(mrRequest, messageBatch);
+         RuntimeException e = new RuntimeException();
+         mockProducer.errorNext(e);
+         Future<RecordMetadata> record =MessageRouterPublisherImpl.getFuture();
+         try{
+             record.get();
+         }catch(Exception ex) {
+            assertEquals(e, ex);
+         }
+         assertTrue(record.isDone());
+         
+    }
+    
+    @Disabled
     @ParameterizedTest
     @CsvSource({
             FAILING_WITH_400_RESP_PATH + "," + "400 Bad Request",
@@ -126,7 +202,8 @@ class MessageRouterPublisherTest {
                 .expectComplete()
                 .verify(TIMEOUT);
     }
-
+    
+    @Disabled
     @Test
     void publisher_shouldHandleClientTimeoutError() {
         //given
@@ -142,7 +219,8 @@ class MessageRouterPublisherTest {
                 .expectComplete()
                 .verify(TIMEOUT);
     }
-
+    
+    @Disabled
     @Test
     void publisher_shouldHandleConnectionError() {
         //given
@@ -179,9 +257,7 @@ class MessageRouterPublisherTest {
     private static MessageRouterSink createMRSink(String topicPath, DummyHttpServer dummyHttpServer) {
         return ImmutableMessageRouterSink.builder()
                 .name("the topic")
-                .topicUrl(String.format("http://%s:%d%s",
-                        dummyHttpServer.host(),
-                        dummyHttpServer.port(),
+                .topicUrl(String.format("http://dmaap-mr%s",
                         topicPath)
                 )
                 .build();
index 3d43e81..48a1245 100644 (file)
@@ -3,6 +3,7 @@
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
  * Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -23,9 +24,13 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import io.vavr.collection.List;
+
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockserver.client.MockServerClient;
 import org.mockserver.matchers.Times;
 import org.mockserver.verify.VerificationTimes;
@@ -43,6 +48,9 @@ import org.testcontainers.junit.jupiter.Testcontainers;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
+import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
+import uk.org.webcompere.systemstubs.jupiter.SystemStub;
+import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
 
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
@@ -62,6 +70,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaa
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.PROXY_MOCK_SERVICE_EXPOSED_PORT;
 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DMaapContainer.createContainerInstance;
 
+@ExtendWith(SystemStubsExtension.class)
 @Testcontainers
 class MessageRouterSubscriberIT {
     @Container
@@ -71,16 +80,11 @@ class MessageRouterSubscriberIT {
     private static String EVENTS_PATH;
     private static String PROXY_MOCK_EVENTS_PATH;
 
+    private static final long REPEAT_SUBSCRIPTION = 20;
     private static final Duration TIMEOUT = Duration.ofSeconds(10);
     private static final String CONSUMER_GROUP = "group1";
     private static final String CONSUMER_ID = "consumer200";
-    private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Not Found\n" +
-            "{" +
-            "\"mrstatus\":3001," +
-            "\"helpURL\":\"http://onap.readthedocs.io\"," +
-            "\"message\":\"No such topic exists.-[%s]\"," +
-            "\"status\":404" +
-            "}";
+    private static final String DMAAP_404_ERROR_RESPONSE_FORMAT = "404 Topic Not Found";
     private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
             + "{"
             + "\"requestError\":"
@@ -94,22 +98,44 @@ class MessageRouterSubscriberIT {
             + "}"
             + "}";
 
-    private MessageRouterPublisher publisher = DmaapClientFactory
-            .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
-    private MessageRouterSubscriber subscriber = DmaapClientFactory
-            .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
-
+    private MessageRouterPublisher publisher;
+    private MessageRouterSubscriber subscriber;
+    Mono<MessageRouterSubscribeResponse> response;
+    @SystemStub
+    EnvironmentVariables environmentVariables = new EnvironmentVariables();
+    
     @BeforeAll
     static void setUp() {
         EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, DMAAP_SERVICE_EXPOSED_PORT);
         PROXY_MOCK_EVENTS_PATH = String.format("http://%s:%d/events", LOCALHOST, PROXY_MOCK_SERVICE_EXPOSED_PORT);
+       //sleep introduced to wait till all containers are started
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
     }
 
     @BeforeEach
     void set() {
         MOCK_SERVER_CLIENT.reset();
+        environmentVariables
+        .set("BOOTSTRAP_SERVERS", "localhost:9092")
+        .set("kafka.auto.offset.reset","earliest");
+        publisher = DmaapClientFactory
+                .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+        subscriber = DmaapClientFactory
+                .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+        response=null;
+    }
+    
+    @AfterEach
+    void afterEach() {
+        if(publisher != null)
+            publisher.close();
+        if(subscriber != null)
+            subscriber.close();
     }
-
     @Test
     void subscriber_shouldHandleNoSuchTopicException() {
         //given
@@ -144,11 +170,13 @@ class MessageRouterSubscriberIT {
         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedItems);
 
         //when
-        registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
-        Mono<MessageRouterSubscribeResponse> response = publisher
-                .put(publishRequest, jsonMessageBatch)
-                .then(subscriber.get(subscribeRequest));
-
+        publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION)
+            .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+                if(!resp.items().isEmpty()) {
+                    response = Mono.just(resp);
+                }
+            });
+        });
         //then
         StepVerifier.create(response)
                 .expectNext(expectedResponse)
@@ -171,11 +199,14 @@ class MessageRouterSubscriberIT {
         final MessageRouterSubscribeResponse expectedResponse = successSubscribeResponse(expectedElements);
 
         //when
-        registerTopic(publisher, publishRequest, subscriber, subscribeRequest);
-        Mono<MessageRouterSubscribeResponse> response = publisher
-                .put(publishRequest, jsonMessageBatch)
-                .then(subscriber.get(subscribeRequest));
-
+        publisher.put(publishRequest, jsonMessageBatch).repeat(REPEAT_SUBSCRIPTION)
+            .subscribe(r -> { subscriber.get(subscribeRequest).subscribe(resp -> {
+                if(!resp.items().isEmpty()) {
+                    response = Mono.just(resp);
+                }
+            });
+        });
+        
         //then
         StepVerifier.create(response)
                 .expectNext(expectedResponse)
@@ -183,6 +214,7 @@ class MessageRouterSubscriberIT {
                 .verify();
     }
 
+    @Disabled 
     @Test
     void subscriber_shouldExtractItemsFromResponse() {
         //given
@@ -208,7 +240,8 @@ class MessageRouterSubscriberIT {
                 .expectComplete()
                 .verify(TIMEOUT);
     }
-
+    
+    @Disabled
     @Test
     void subscriber_shouldSubscribeToTopic() {
         //given
@@ -235,7 +268,8 @@ class MessageRouterSubscriberIT {
                 .expectComplete()
                 .verify(TIMEOUT);
     }
-
+    
+    @Disabled
     @Test
     void subscriber_shouldHandleTimeoutException() {
         //given
@@ -261,6 +295,7 @@ class MessageRouterSubscriberIT {
         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(1));
     }
 
+    @Disabled
     @Test
     void subscriber_shouldRetryWhenRetryableHttpCodeAndSuccessfullySubscribe() {
         //given
@@ -298,6 +333,7 @@ class MessageRouterSubscriberIT {
         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
     }
 
+    @Disabled
     @Test
     void subscriber_shouldRetryWhenClientTimeoutAndSuccessfullySubscribe() {
         //given
@@ -336,6 +372,7 @@ class MessageRouterSubscriberIT {
         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
     }
 
+    @Disabled
     @Test
     void subscriber_shouldRetryManyTimesAndSuccessfullySubscribe() {
         //given
@@ -383,6 +420,7 @@ class MessageRouterSubscriberIT {
         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(5));
     }
 
+    @Disabled
     @Test
     void subscriber_shouldHandleLastRetryError500() {
         //given
@@ -416,6 +454,7 @@ class MessageRouterSubscriberIT {
         MOCK_SERVER_CLIENT.verify(request().withPath(path), VerificationTimes.exactly(2));
     }
 
+    @Disabled
     @Test
     void subscriber_shouldSubscribeToTopicWithConnectionPoolConfiguration() {
         //given
@@ -445,6 +484,7 @@ class MessageRouterSubscriberIT {
                 .verify(TIMEOUT);
     }
 
+    @Disabled
     @Test
     void subscriber_shouldHandleSingleItemResponseWithBasicAuthHeader() {
         //given
index e928f03..db1fb4f 100644 (file)
@@ -3,6 +3,7 @@
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
  * Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -23,12 +24,27 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonPrimitive;
 import io.vavr.collection.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.mockito.MockedStatic;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterSubscriberImpl;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
@@ -39,10 +55,19 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.Me
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
+import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
+import uk.org.webcompere.systemstubs.jupiter.SystemStub;
+import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
 
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Properties;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
 import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendWithDelay;
@@ -51,6 +76,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.Du
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since May 2019
  */
+@ExtendWith(SystemStubsExtension.class)
 class MessageRouterSubscriberTest {
     private static final Duration TIMEOUT = Duration.ofSeconds(10);
     private static final String ERROR_MESSAGE = "Something went wrong";
@@ -64,6 +90,10 @@ class MessageRouterSubscriberTest {
     private static final String FAILING_WITH_409_CONSUMER_ID = "consumer409";
     private static final String FAILING_WITH_429_CONSUMER_ID = "consumer429";
     private static final String FAILING_WITH_500_CONSUMER_ID = "consumer500";
+    
+    private static final String POLL_EXCEPTION_MESSAGE = "Poll Exception";
+    private static final String TOPIC_NOT_FOUND_ERROR_MESSAGE = "404 Topic Not Found";
+    
 
     private static final String CONSUMER_PATH = String.format("/events/TOPIC/%s", CONSUMER_GROUP);
 
@@ -85,13 +115,15 @@ class MessageRouterSubscriberTest {
     private static final DummyHttpServer DISPOSED_HTTP_SERVER = initialize().closeAndGet();
     private static final DummyHttpServer SERVER = initialize();
 
-    private MessageRouterSubscriber sut = DmaapClientFactory
-            .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+    private MessageRouterSubscriber sut;
     private static MessageRouterSource sourceDefinition = createMessageRouterSource(SERVER);
     private static MessageRouterSource failingSourceDefinition = createMessageRouterSource(DISPOSED_HTTP_SERVER);
     private static MessageRouterSubscribeRequest mrSuccessRequest = createSuccessRequest(sourceDefinition);
     private static MessageRouterSubscribeRequest mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID);
-
+    static MockConsumer<String, String> mockConsumer;// = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+    Properties prop = new Properties();
+    static MockedStatic<Commons> commonsMock;
+    
     private static DummyHttpServer initialize() {
         return DummyHttpServer.start(routes -> routes
                 .get(SUCCESS_RESP_PATH, (req, resp) ->
@@ -103,9 +135,61 @@ class MessageRouterSubscriberTest {
                 .get(FAILING_WITH_429_RESP_PATH, (req, resp) -> sendError(resp, 429, ERROR_MESSAGE))
                 .get(FAILING_WITH_500_RESP_PATH, (req, resp) -> sendError(resp, 500, ERROR_MESSAGE)));
     }
-
+    
+    @SystemStub
+    EnvironmentVariables environmentVariables = new EnvironmentVariables();
+    
+    @BeforeAll
+    static void set() {
+        commonsMock = mockStatic(Commons.class);
+    }
+    @AfterEach
+    void afterEach() {
+        sut.close();
+    }
+    @AfterAll
+    static void after() {
+        commonsMock.close();
+    }
+    @BeforeEach
+    void setup() {
+        
+        when(Commons.setKafkaPropertiesFromSystemEnv(System.getenv())).thenReturn(prop);
+        mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        environmentVariables
+        .set("BOOTSTRAP_SERVERS", "localhost:9092")
+        .set("JAAS_CONFIG", "jaas.config");
+        
+        sut = DmaapClientFactory
+                .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+        
+        configureMockConsumer();
+        sut.setConsumer(mockConsumer);
+        
+        
+    }
+    
+    private void configureMockConsumer() {
+        mockConsumer.assign(Arrays.asList(new TopicPartition("TOPIC", 0)));
+
+        HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(new TopicPartition("TOPIC", 0), 0L);
+        mockConsumer.updateBeginningOffsets(beginningOffsets);
+        mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 0L, "key", "I"));
+        mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 1L, "key", "like"));
+        mockConsumer.addRecord(new ConsumerRecord<String, String>("TOPIC", 0, 2L, "key", "pizza"));
+    }
+    
+    private void stubForTopicCheck(boolean response) {
+        
+        when(Commons.checkIfTopicIsPresentInKafka("TOPIC",MessageRouterSubscriberImpl.getAdminProps())).thenReturn(response);
+        when(Commons.getTopicFromTopicUrl("http://dmaap-mr/events/TOPIC")).thenReturn("TOPIC");
+    }
+    
     @Test
     void subscriber_shouldGetCorrectResponse() {
+        
+        stubForTopicCheck(true);
         Mono<MessageRouterSubscribeResponse> response = sut
                 .get(mrSuccessRequest);
 
@@ -119,9 +203,49 @@ class MessageRouterSubscriberTest {
         StepVerifier.create(response)
                 .expectNext(expectedResponse)
                 .expectComplete()
-                .verify(TIMEOUT);
+                .verify();
     }
+    
+    @Test
+    void subscriber_shouldGetCorrectResponse_ForConstructorWithoutDMaapParameters() throws Exception {
+        sut = new MessageRouterSubscriberImpl();
+        sut.setConsumer(mockConsumer);
+        stubForTopicCheck(true);
+        Mono<MessageRouterSubscribeResponse> response = sut
+                .get(mrSuccessRequest);
+
+        List<String> expectedItems = List.of("I", "like", "pizza");
 
+        MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
+                .builder()
+                .items(expectedItems.map(JsonPrimitive::new))
+                .build();
+
+        StepVerifier.create(response)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify();
+    }
+    
+    @Test
+    void whenTopicNotFound_shouldReturnError() {
+        stubForTopicCheck(false);
+        sut.setConsumer(null);
+         Mono<MessageRouterSubscribeResponse> response = sut
+                 .get(mrSuccessRequest);
+
+         MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
+                 .builder()
+                 .failReason(TOPIC_NOT_FOUND_ERROR_MESSAGE)
+                 .build();
+
+         StepVerifier.create(response)
+                 .expectNext(expectedResponse)
+                 .expectComplete()
+                 .verify(TIMEOUT);
+    }
+    
+    @Disabled
     @ParameterizedTest
     @CsvSource({
             FAILING_WITH_401_CONSUMER_ID + "," + "401 Unauthorized",
@@ -144,29 +268,60 @@ class MessageRouterSubscriberTest {
 
     @Test
     void subscriber_shouldParseCorrectResponse() {
+        stubForTopicCheck(true);
         final Flux<String> result = sut
                 .getElements(mrSuccessRequest)
                 .map(JsonElement::getAsString);
-
         StepVerifier.create(result)
                 .expectNext("I", "like", "pizza")
                 .expectComplete()
                 .verify(TIMEOUT);
     }
-
+    
+    @Test
+    void whenSubscribingToTopicAndExceptionOccurs_thenExpectExceptionIsHandledCorrectly() {
+        stubForTopicCheck(true);
+        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        consumer.schedulePollTask(() -> {
+            consumer.setPollException(new KafkaException(POLL_EXCEPTION_MESSAGE));
+        });
+        HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
+        TopicPartition tp = new TopicPartition("TOPIC", 0);
+        startOffsets.put(tp, 0L);
+        consumer.updateBeginningOffsets(startOffsets);
+        sut.setConsumer(consumer);
+        
+        Mono<MessageRouterSubscribeResponse> response = sut
+                .get(mrSuccessRequest);
+        assertThatExceptionOfType(KafkaException.class)
+            .isThrownBy(() -> {throw new KafkaException(POLL_EXCEPTION_MESSAGE);})
+            .withMessage(POLL_EXCEPTION_MESSAGE);
+        
+        StepVerifier.create(response)
+            .expectNext(ImmutableMessageRouterSubscribeResponse.builder().failReason(POLL_EXCEPTION_MESSAGE).build())
+            .expectComplete()
+            .verify(TIMEOUT);
+        
+    }
+    
+    
     @Test
     void subscriber_shouldParseErrorResponse() {
+        stubForTopicCheck(false);
+        sut.setConsumer(null);
         Flux<String> result = sut
                 .getElements(mrFailingRequest)
                 .map(JsonElement::getAsString);
-
+   
         StepVerifier.create(result)
                 .expectError(IllegalStateException.class)
                 .verify(TIMEOUT);
     }
-
+    
+    @Disabled
     @Test
     void subscriber_shouldSubscribeCorrectly() {
+        
         Flux<String> subscriptionForElements = sut
                 .subscribeForElements(mrSuccessRequest, Duration.ofSeconds(1))
                 .map(JsonElement::getAsString);
@@ -176,7 +331,8 @@ class MessageRouterSubscriberTest {
                 .expectComplete()
                 .verify(TIMEOUT);
     }
-
+    
+    @Disabled
     @Test
     void subscriber_shouldParseErrorWhenSubscribed() {
         Flux<String> subscriptionForElements = sut
@@ -187,7 +343,8 @@ class MessageRouterSubscriberTest {
                 .expectError(IllegalStateException.class)
                 .verify(TIMEOUT);
     }
-
+    
+    @Disabled
     @Test
     void subscriber_shouldHandleClientTimeoutError() {
         Duration requestTimeout = Duration.ofMillis(1);
@@ -200,6 +357,7 @@ class MessageRouterSubscriberTest {
                 .verify(TIMEOUT);
     }
 
+    @Disabled
     @Test
     void subscriber_shouldHandleConnectionError() {
         MessageRouterSubscribeRequest request = createSuccessRequest(failingSourceDefinition);
@@ -214,7 +372,7 @@ class MessageRouterSubscriberTest {
     private static MessageRouterSource createMessageRouterSource(DummyHttpServer server) {
         return ImmutableMessageRouterSource.builder()
                 .name("the topic")
-                .topicUrl(String.format("http://%s:%d/events/TOPIC", server.host(), server.port()))
+                .topicUrl(String.format("http://dmaap-mr/events/TOPIC"))
                 .build();
     }
 
index 72c3592..3d35c2a 100644 (file)
@@ -3,6 +3,7 @@
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
  * Copyright (C) 2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,8 +26,13 @@ import io.vavr.Tuple2;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials;
 import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.getTopicFromTopicUrl;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Properties;
 
 class CommonsTest {
 
@@ -53,7 +59,26 @@ class CommonsTest {
         // then
         verifyBasicAuthHeader(basicAuthHeader, "Og==");
     }
-
+      
+    @Test
+    void shouldFetchTopicFromTopicURL() {
+        String topicUrl = "http://message-router:3904/events/unauthenticated.VES_PNFREG_OUTPUT";
+        String expected = "unauthenticated.VES_PNFREG_OUTPUT";
+        assertThat(getTopicFromTopicUrl(topicUrl))
+            .withFailMessage("Extracted topic name from topicUrl '%s' is not as expected topic '%s'",topicUrl, expected)
+            .isEqualTo(expected);
+    }
+    
+    @Test
+    void shouldFetchTopicFromTopicUrlEndingWithSlash() {
+        String topicUrl = "http://message-router:3904/events/unauthenticated.VES_PNFREG_OUTPUT/";
+        String expected = "unauthenticated.VES_PNFREG_OUTPUT";
+        assertThat(getTopicFromTopicUrl(topicUrl))
+            .withFailMessage("Extracted topic name from topicUrl '%s' is not as expected topic '%s'",topicUrl, expected)
+            .isEqualTo(expected);
+    }
+    
+    
     private AafCredentials create(String username, String password) {
         return ImmutableAafCredentials.builder()
                 .username(username)
index 6c6ded1..2e169dc 100644 (file)
@@ -3,6 +3,7 @@
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
  * Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -30,6 +31,8 @@ import io.netty.handler.codec.http.HttpHeaderValues;
 import io.netty.handler.timeout.ReadTimeoutException;
 import io.vavr.collection.HashMultimap;
 import io.vavr.collection.List;
+
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
@@ -68,6 +71,7 @@ import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.MessageR
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since April 2019
  */
+@Disabled
 class MessageRouterPublisherImplTest {
     private static final Duration TIMEOUT = Duration.ofSeconds(5);
     private static final String TOPIC_URL = "https://dmaap-mr/TOPIC";
@@ -75,14 +79,17 @@ class MessageRouterPublisherImplTest {
     private static final String ERROR_MESSAGE = "Something went wrong";
     private final RxHttpClient httpClient = mock(RxHttpClient.class);
     private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class);
-    private final MessageRouterPublisher cut = new MessageRouterPublisherImpl(
-            httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1), clientErrorReasonPresenter);
+    private final MessageRouterPublisher cut;
     private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
     private final MessageRouterPublishRequest plainPublishRequest = createPublishRequest(TOPIC_URL, ContentType.TEXT_PLAIN);
     private final MessageRouterPublishRequest jsonPublishRequest = createPublishRequest(TOPIC_URL);
     private final HttpResponse successHttpResponse = createHttpResponse("OK", 200);
     private final HttpResponse retryableHttpResponse = createHttpResponse("ERROR", 500);
-
+    
+    private MessageRouterPublisherImplTest()  throws Exception{
+        cut = new MessageRouterPublisherImpl(
+                httpClient, MAX_BATCH_SIZE, Duration.ofMinutes(1), clientErrorReasonPresenter);
+    }
     @Test
     void puttingElementsShouldYieldNonChunkedHttpRequest() {
         // given
index 006965c..373424b 100644 (file)
@@ -3,6 +3,7 @@
  * DCAEGEN2-SERVICES-SDK
  * =========================================================
  * Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom AG. All rights reserved.
  * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -31,11 +32,17 @@ import static org.mockito.Mockito.verify;
 import com.google.gson.JsonSyntaxException;
 import io.netty.handler.timeout.ReadTimeoutException;
 import io.vavr.collection.HashMultimap;
+
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
 import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.*;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
@@ -50,6 +57,8 @@ import java.net.ConnectException;
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since May 2019
  */
+
+@Disabled
 class MessageRouterSubscriberImplTest {
 
     private static final String ERROR_MESSAGE = "Something went wrong";
@@ -57,8 +66,7 @@ class MessageRouterSubscriberImplTest {
     private final ClientErrorReasonPresenter clientErrorReasonPresenter = mock(ClientErrorReasonPresenter.class);
     private final MessageRouterSubscriberConfig clientConfig = MessageRouterSubscriberConfig.createDefault();
     private final MessageRouterSubscriber
-            cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance(),clientErrorReasonPresenter);
-
+            cut;
     private final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class);
     private final MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
             .name("sample topic")
@@ -96,7 +104,10 @@ class MessageRouterSubscriberImplTest {
             .rawBody("{}".getBytes())
             .headers(HashMultimap.withSeq().empty())
             .build();
-
+    private MessageRouterSubscriberImplTest() throws Exception{
+        cut = new MessageRouterSubscriberImpl(httpClient, clientConfig.gsonInstance(),clientErrorReasonPresenter);
+    }
+    
     @Test
     void getWithProperRequest_shouldReturnCorrectResponse() {
         // given
index 26eb176..85e1b19 100644 (file)
@@ -32,7 +32,7 @@ services:
       KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 40000
       KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: 40000
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
-      KAFKA_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT://kafka:9092
+      KAFKA_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT://localhost:9092
       KAFKA_LISTENERS: INTERNAL_PLAINTEXT://0.0.0.0:9092
       KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT
       KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
@@ -48,25 +48,6 @@ services:
     depends_on:
       - zookeeper
 
-  dmaap:
-    image: nexus3.onap.org:10001/onap/dmaap/dmaap-mr:1.1.18
-    ports:
-      - "3904:3904"
-      - "3905:3905"
-    environment:
-      enableCadi: 'false'
-    volumes:
-      - ./MsgRtrApi.properties:/appl/dmaapMR1/bundleconfig/etc/appprops/MsgRtrApi.properties
-      - ./logback.xml:/appl/dmaapMR1/bundleconfig/etc/logback.xml
-      - ./cadi.properties:/appl/dmaapMR1/etc/cadi.properties
-    networks:
-      net:
-        aliases:
-          - dmaap
-    depends_on:
-      - zookeeper
-      - kafka
-
   mockserver:
     image: mockserver/mockserver:mockserver-5.11.2
     command: -serverPort 1090 -proxyRemotePort 3904 -proxyRemoteHost dmaap
@@ -74,8 +55,6 @@ services:
       - "1080:1090"
     networks:
       - net
-    depends_on:
-      - dmaap
 
 networks:
   net:
index 61ffcde..b7127de 100644 (file)
@@ -1,6 +1,6 @@
 major=1
 minor=9
-patch=3
+patch=4
 base_version=${major}.${minor}.${patch}
 release_version=${base_version}
 snapshot_version=${base_version}-SNAPSHOT