[DCAEGEN2] Remove DMaaP dependency in PRH 91/136191/1 1.10.1
authorsushant53 <sushant.jadhav@t-systems.com>
Fri, 27 Oct 2023 11:14:30 +0000 (16:44 +0530)
committersushant53 <sushant.jadhav@t-systems.com>
Fri, 27 Oct 2023 11:15:02 +0000 (16:45 +0530)
Removed DMaaP dependency in PRH by using new sdk library, which
uses Kafka API directly.

Issue-ID: DCAEGEN2-3402
Change-Id: I5456ce432a9fd4a58826275a17c603379b0c18ee
Signed-off-by: sushant53 <sushant.jadhav@t-systems.com>
14 files changed:
Changelog.md
pom.xml
prh-app-server/pom.xml
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationForAutoCommitDisabledModeTest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationForAutoCommitDisabledTest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java
prh-commons/pom.xml
version.properties

index 2cb689e..6b93a73 100644 (file)
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](http://keepachangelog.com/)
 and this project adheres to [Semantic Versioning](http://semver.org/).
 
+## [1.10.1] - 2023/10/27
+### Changed
+- [DCAEGEN2-3402] Remove DMaaP dependency in PRH
+
 ## [1.10.0] - 2023/09/01
 ### Changed
 - [DCAEGEN2-3365] Code changed so that the autoCommitDisabled mode of PRH use CBSContentParser for environment variables.
diff --git a/pom.xml b/pom.xml
index e5eacdd..431494d 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
 
     <groupId>org.onap.dcaegen2.services</groupId>
     <artifactId>prh</artifactId>
-    <version>1.10.0-SNAPSHOT</version>
+    <version>1.10.1-SNAPSHOT</version>
 
     <name>dcaegen2-services-prh</name>
     <description>PNF Registration Handler</description>
@@ -59,7 +59,7 @@
         <spring-cloud.version>2021.0.3</spring-cloud.version>
         <springfox.version>3.0.0</springfox.version>
         <immutables.version>2.7.5</immutables.version>
-        <sdk.version>1.9.3</sdk.version>
+        <sdk.version>1.9.4</sdk.version>
         <guava.version>29.0-jre</guava.version>
         <sonar.coverage.jacoco.xmlReportPaths>
             ${project.reporting.outputDirectory}/jacoco-ut/jacoco.xml
index 189a37c..3924971 100644 (file)
@@ -28,7 +28,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.services</groupId>
         <artifactId>prh</artifactId>
-        <version>1.10.0-SNAPSHOT</version>
+        <version>1.10.1-SNAPSHOT</version>
     </parent>
 
     <groupId>org.onap.dcaegen2.services.prh</groupId>
index 64fff9a..22763e8 100644 (file)
@@ -58,7 +58,14 @@ public class CbsConfiguration implements Config {
 
         messageRouterSubscriber = DmaapClientFactory
                 .createMessageRouterSubscriber(consulConfigurationParser.getMessageRouterSubscriberConfig());
+        String prevTopicUrl = null;
+        if(messageRouterCBSSubscribeRequest != null) {
+            prevTopicUrl = messageRouterCBSSubscribeRequest.sourceDefinition().topicUrl();
+        }
         messageRouterCBSSubscribeRequest = consulConfigurationParser.getMessageRouterSubscribeRequest();
+        if(!messageRouterCBSSubscribeRequest.sourceDefinition().topicUrl().equals(prevTopicUrl)) {
+            messageRouterSubscriber.close();
+        }
      }
 
     @Override
@@ -95,5 +102,4 @@ public class CbsConfiguration implements Config {
                 .orElseThrow(() -> new RuntimeException(CBS_CONFIG_MISSING));
     }
     
-
 }
index fcbd10a..aafcd81 100644 (file)
@@ -65,6 +65,7 @@ public class ScheduleController {
     public Mono<ResponseEntity<String>> stopTask() {
         LOGGER.trace("Receiving stop scheduling worker request");
         return Mono.defer(() -> {
+            scheduledTasksRunner.closeKafkaPublisherSubscriber();
             scheduledTasksRunner.cancelTasks();
             return Mono.just(new ResponseEntity<>("PRH Service has been stopped!", HttpStatus.OK));
         });
index e90b027..5a5eb07 100644 (file)
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ScheduledFuture;
 import javax.annotation.PreDestroy;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
 import org.onap.dcaegen2.services.prh.configuration.PrhProperties;
 import org.springframework.boot.context.event.ApplicationStartedEvent;
 import org.springframework.context.annotation.Configuration;
@@ -46,11 +47,13 @@ public class ScheduledTasksRunner {
     private final TaskScheduler taskScheduler;
     private final ScheduledTasks scheduledTask;
     private final PrhProperties prhProperties;
+    private final CbsConfiguration cbsConfiguration;
     public ScheduledTasksRunner(TaskScheduler taskScheduler, ScheduledTasks scheduledTask,
-        PrhProperties prhProperties) {
+        PrhProperties prhProperties, CbsConfiguration cbsConfiguration) {
         this.taskScheduler = taskScheduler;
         this.scheduledTask = scheduledTask;
         this.prhProperties = prhProperties;
+        this.cbsConfiguration = cbsConfiguration;
     }
     
     @EventListener
@@ -82,4 +85,12 @@ public class ScheduledTasksRunner {
             return false;
         }
     }
+    
+    /**
+     * Function for cleaning resources for kafka subscriber and publisher.
+     */
+    public synchronized void closeKafkaPublisherSubscriber() {
+        cbsConfiguration.getMessageRouterSubscriber().close();
+        cbsConfiguration.getMessageRouterPublisher().close();
+    }
 }
index 89f1a04..80a0007 100644 (file)
@@ -43,7 +43,9 @@ public class CbsConfigurationForAutoCommitDisabledModeTest {
 
     @Test
     void beforecbsConfigurationForAutoCommitDisabledMode() throws Exception {
-        withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+        withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+        .and("BOOTSTRAP_SERVERS", "localhost:9092")
+        .execute(() -> {
             this.cbsConfigurationForAutoCommitDisabledMode();
         });
     }
index 4f3cd86..8cd7d5e 100644 (file)
@@ -26,6 +26,8 @@ import com.google.gson.JsonObject;
 import org.junit.jupiter.api.Test;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+
+import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
 import static java.lang.ClassLoader.getSystemResource;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -52,19 +54,25 @@ class CbsConfigurationTest {
 
     @Test
     void cbsConfigurationShouldExposeDataReceivedAsJsonFromCbs() throws Exception {
-        JsonObject cbsConfigJson = new Gson().fromJson(
-                new String(Files.readAllBytes(Paths.get(getSystemResource("configurationFromCbs.json").toURI()))),
-                JsonObject.class);
-        CbsConfiguration cbsConfiguration = new CbsConfiguration();
+        
+        withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+        .and("BOOTSTRAP_SERVERS", "localhost:9092")
+        .execute(() -> {
+            JsonObject cbsConfigJson = new Gson().fromJson(
+                    new String(Files.readAllBytes(Paths.get(getSystemResource("configurationFromCbs.json").toURI()))),
+                    JsonObject.class);
+            CbsConfiguration cbsConfiguration = new CbsConfiguration();
 
-        cbsConfiguration.parseCBSConfig(cbsConfigJson);
+            cbsConfiguration.parseCBSConfig(cbsConfigJson);
 
-        assertThat(cbsConfiguration.getAaiClientConfiguration()).isNotNull();
-        assertThat(cbsConfiguration.getMessageRouterPublisher()).isNotNull();
-        assertThat(cbsConfiguration.getMessageRouterSubscriber()).isNotNull();
-        assertThat(cbsConfiguration.getMessageRouterPublishRequest()).isNotNull();
-        assertThat(cbsConfiguration.getMessageRouterSubscribeRequest()).isNotNull();
-        assertThat(cbsConfiguration.getMessageRouterUpdatePublishRequest()).isNotNull();
+            assertThat(cbsConfiguration.getAaiClientConfiguration()).isNotNull();
+            assertThat(cbsConfiguration.getMessageRouterPublisher()).isNotNull();
+            assertThat(cbsConfiguration.getMessageRouterSubscriber()).isNotNull();
+            assertThat(cbsConfiguration.getMessageRouterPublishRequest()).isNotNull();
+            assertThat(cbsConfiguration.getMessageRouterSubscribeRequest()).isNotNull();
+            assertThat(cbsConfiguration.getMessageRouterUpdatePublishRequest()).isNotNull();
+        });
+       
 
     }
 
index bd7d777..b9a05a9 100644 (file)
@@ -57,7 +57,9 @@ public class KafkaConfigTest {
 
     @Test
     void beforecbsConfigurationForAutoCommitDisabledMode() throws Exception {
-        withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+        withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+        .and("BOOTSTRAP_SERVERS", "localhost:9092")
+        .execute(() -> {
             this.consumerFactoryTest();
         });
     }
@@ -120,7 +122,9 @@ public class KafkaConfigTest {
 
     @Test
     void beforeKafkaListenerContainerFactoryTest() throws Exception {
-        withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+        withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+        .and("BOOTSTRAP_SERVERS", "localhost:9092")
+        .execute(() -> {
             this.kafkaListenerContainerFactoryTest();
         });
     }
index b10c1ad..21f9d09 100644 (file)
@@ -25,24 +25,35 @@ import com.fasterxml.jackson.databind.JsonMappingException;
 import com.github.tomakehurst.wiremock.client.WireMock;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
 import com.jayway.jsonpath.JsonPath;
+
+import io.vavr.collection.List;
 import reactor.core.publisher.Flux;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
 import org.onap.dcaegen2.services.prh.MainApp;
 import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.adapter.kafka.ImmutableKafkaConfiguration;
 import org.onap.dcaegen2.services.prh.adapter.kafka.KafkaConfiguration;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
 import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
 import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
 import org.onap.dcaegen2.services.prh.tasks.commit.KafkaConsumerTaskImpl;
 import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksRunnerWithCommit;
 import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksWithCommit;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.configurationprocessor.json.JSONException;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.boot.test.mock.mockito.SpyBean;
 import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -65,6 +76,9 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import static java.lang.ClassLoader.getSystemResource;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -91,6 +105,12 @@ class PrhWorkflowIntegrationForAutoCommitDisabledTest {
 
     @Autowired
     private DmaapConsumerJsonParser dmaapConsumerJsonParser;
+    
+    @SpyBean
+    CbsConfiguration cbsConfiguration;
+    
+    @Mock
+    MessageRouterPublisher publisher;
 
     @Configuration
     @Import(MainApp.class)
@@ -112,7 +132,12 @@ class PrhWorkflowIntegrationForAutoCommitDisabledTest {
             CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode = new CbsConfigurationForAutoCommitDisabledMode();
 
             try {
-                cbsConfigurationForAutoCommitDisabledMode.parseCBSConfig(cbsConfigJson);
+                withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+                .and("BOOTSTRAP_SERVERS", "localhost:9092")
+                .execute(() -> {
+                    cbsConfigurationForAutoCommitDisabledMode.parseCBSConfig(cbsConfigJson);
+                });
+                
             } catch (Exception e) {
                //Exception is expected as environment variable for JAAS_CONFIG is not available
                 if (e.getMessage() == "kafkaJaasConfig") {
@@ -136,7 +161,8 @@ class PrhWorkflowIntegrationForAutoCommitDisabledTest {
 
     @Test
     void beforeCbsConfigurationForAutoCommitDisabledMode() throws Exception {
-        withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+        withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+        .execute(() -> {
             this.whenThereAreNoEventsInDmaap_WorkflowShouldFinish();
         });
     }
@@ -153,7 +179,8 @@ class PrhWorkflowIntegrationForAutoCommitDisabledTest {
 
     @Test
     void beforeWhenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification() throws Exception {
-        withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+        withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+        .execute(() -> {
             this.whenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification();
         });
     }
@@ -175,11 +202,16 @@ class PrhWorkflowIntegrationForAutoCommitDisabledTest {
         stubFor(post(urlEqualTo("/events/unauthenticated.PNF_READY")));
 
         when(kafkaConsumerTaskImpl.execute()).thenReturn(fluxList);
-
+        
+        List<String> expectedItems = List.of(event);
+        Flux<MessageRouterPublishResponse> pubresp = Flux.just(ImmutableMessageRouterPublishResponse
+                .builder()
+                .items(expectedItems.map(JsonPrimitive::new))
+                .build());
+        when(cbsConfiguration.getMessageRouterPublisher()).thenReturn(publisher);
+        when(publisher.put(any(MessageRouterPublishRequest.class),any())).thenReturn(pubresp);
         scheduledTasksWithCommit.scheduleKafkaPrhEventTask();
-
-        verify(1, postRequestedFor(urlEqualTo("/events/unauthenticated.PNF_READY"))
-                .withRequestBody(matchingJsonPath("$[0].correlationId", equalTo(pnfName))));
+        verify(publisher,times(1)).put(any(MessageRouterPublishRequest.class),any());
 
     }
 
index f5033ca..a77fcd7 100644 (file)
@@ -24,23 +24,40 @@ package org.onap.dcaegen2.services.prh.integration;
 import com.github.tomakehurst.wiremock.client.WireMock;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
 import com.jayway.jsonpath.JsonPath;
+
+import io.vavr.collection.List;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
 import org.onap.dcaegen2.services.prh.MainApp;
 import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
 import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
 import org.onap.dcaegen2.services.prh.tasks.ScheduledTasksRunner;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.boot.test.mock.mockito.SpyBean;
 import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Import;
 import org.springframework.test.context.ActiveProfiles;
 import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
 import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
 import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
 import static com.github.tomakehurst.wiremock.client.WireMock.ok;
@@ -57,6 +74,10 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import static java.lang.ClassLoader.getSystemResource;
 import static java.util.Collections.singletonList;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 
 @SpringBootTest
@@ -66,27 +87,40 @@ class PrhWorkflowIntegrationTest {
 
     @Autowired
     private ScheduledTasks scheduledTasks;
-
+    
+    @SpyBean
+    CbsConfiguration cbsConfiguration;
+    
     @MockBean
     private ScheduledTasksRunner scheduledTasksRunner;  // just to disable scheduling - some configurability in ScheduledTaskRunner not to start tasks at app startup would be welcome
-
-
+    
+    @Mock
+    MessageRouterSubscriber subscriber;
+    
+    @Mock
+    MessageRouterPublisher publisher;
+    
     @Configuration
     @Import(MainApp.class)
     static class CbsConfigTestConfig {
 
         @Value("http://localhost:${wiremock.server.port}")
         private String wiremockServerAddress;
-
+                
         @Bean
-        public CbsConfiguration cbsConfiguration() {
+        public CbsConfiguration cbsConfiguration() throws Exception {
             JsonObject cbsConfigJson = new Gson().fromJson(getResourceContent("configurationFromCbs.json")
                             .replaceAll("https?://dmaap-mr[\\w.]*:\\d+", wiremockServerAddress)
                             .replaceAll("https?://aai[\\w.]*:\\d+", wiremockServerAddress),
                     JsonObject.class);
-
+            
             CbsConfiguration cbsConfiguration = new CbsConfiguration();
-            cbsConfiguration.parseCBSConfig(cbsConfigJson);
+            withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+            .and("BOOTSTRAP_SERVERS", "localhost:9092")
+            .execute(() -> {
+                cbsConfiguration.parseCBSConfig(cbsConfigJson);
+            });
+            
             return cbsConfiguration;
         }
         
@@ -100,7 +134,7 @@ class PrhWorkflowIntegrationTest {
 
 
     @Test
-    void whenThereAreNoEventsInDmaap_WorkflowShouldFinish() {
+    void whenThereAreNoEventsInDmaap_WorkflowShouldFinish() {    
         stubFor(get(urlEqualTo("/events/unauthenticated.VES_PNFREG_OUTPUT/OpenDCAE-c12/c12"))
                 .willReturn(aResponse().withBody("[]")));
 
@@ -115,17 +149,27 @@ class PrhWorkflowIntegrationTest {
     void whenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification() {
         String event = getResourceContent("integration/event.json");
         String pnfName = JsonPath.read(event, "$.event.commonEventHeader.sourceName");
-
-        stubFor(get(urlEqualTo("/events/unauthenticated.VES_PNFREG_OUTPUT/OpenDCAE-c12/c12"))
-                .willReturn(ok().withBody(new Gson().toJson(singletonList(event)))));
         stubFor(get(urlEqualTo("/aai/v23/network/pnfs/pnf/" + pnfName)).willReturn(ok().withBody("{}")));
         stubFor(patch(urlEqualTo("/aai/v23/network/pnfs/pnf/" + pnfName)));
-        stubFor(post(urlEqualTo("/events/unauthenticated.PNF_READY")));
-
+        
+        List<String> expectedItems = List.of(event);
+        Mono<MessageRouterSubscribeResponse> resp = Mono.just(ImmutableMessageRouterSubscribeResponse
+                .builder()
+                .items(expectedItems.map(JsonPrimitive::new))
+                .build());
+        Flux<MessageRouterPublishResponse> pubresp = Flux.just(ImmutableMessageRouterPublishResponse
+                .builder()
+                .items(expectedItems.map(JsonPrimitive::new))
+                .build());
+        
+        when(cbsConfiguration.getMessageRouterSubscriber()).thenReturn(subscriber);
+        when(cbsConfiguration.getMessageRouterPublisher()).thenReturn(publisher);
+        when(subscriber.get(any(MessageRouterSubscribeRequest.class))).thenReturn(resp);
+        when(publisher.put(any(MessageRouterPublishRequest.class),any())).thenReturn(pubresp);
+        
         scheduledTasks.scheduleMainPrhEventTask();
-
-        verify(1, postRequestedFor(urlEqualTo("/events/unauthenticated.PNF_READY"))
-                .withRequestBody(matchingJsonPath("$[0].correlationId", equalTo(pnfName))));
+        verify(subscriber,times(1)).get(any(MessageRouterSubscribeRequest.class));
+        verify(publisher,times(1)).put(any(MessageRouterPublishRequest.class),any());
     }
 
 
index f33ff43..42a2e7f 100644 (file)
@@ -78,7 +78,9 @@ public class KafkaConsumerTaskImplTest {
 
     @Test
     void beforeOnMessageTest() throws Exception {
-        withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+        withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+        .and("BOOTSTRAP_SERVERS", "localhost:9092")
+        .execute(() -> {
             cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
             kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
                     dmaapConsumerJsonParser, epochDateTimeConversion);
@@ -100,7 +102,9 @@ public class KafkaConsumerTaskImplTest {
 
     @Test
     void beforeCommitOffsetTest() throws Exception {
-        withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+        withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+        .and("BOOTSTRAP_SERVERS", "localhost:9092")
+        .execute(() -> {
             cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
             kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
                     dmaapConsumerJsonParser, epochDateTimeConversion);
@@ -110,7 +114,9 @@ public class KafkaConsumerTaskImplTest {
 
     @Test
     void beforeExecuteTest() throws Exception {
-        withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+        withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+        .and("BOOTSTRAP_SERVERS", "localhost:9092")
+        .execute(() -> {
             cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
             kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
                     dmaapConsumerJsonParser, epochDateTimeConversion);
index 074ef92..d4a1c3c 100644 (file)
@@ -28,7 +28,7 @@
   <parent>
     <groupId>org.onap.dcaegen2.services</groupId>
     <artifactId>prh</artifactId>
-    <version>1.10.0-SNAPSHOT</version>
+    <version>1.10.1-SNAPSHOT</version>
   </parent>
 
   <groupId>org.onap.dcaegen2.services.prh</groupId>
index e9e5596..81b72c0 100644 (file)
@@ -1,6 +1,6 @@
 major=1
 minor=10
-patch=0
+patch=1
 base_version=${major}.${minor}.${patch}
 release_version=${base_version}
 snapshot_version=${base_version}-SNAPSHOT