Align PRH to El Alto SDK 45/87645/8
authorpwielebs <piotr.wielebski@nokia.com>
Thu, 16 May 2019 15:44:45 +0000 (17:44 +0200)
committerpwielebs <piotr.wielebski@nokia.com>
Wed, 22 May 2019 12:01:54 +0000 (14:01 +0200)
Change-Id: I65c445d76092e11084fb60c68740e1321b35708c
Issue-ID: DCAEGEN2-1501
Signed-off-by: pwielebs <piotr.wielebski@nokia.com>
23 files changed:
pom.xml
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java [new file with mode: 0644]
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java [new file with mode: 0644]
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/TestAppConfiguration.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/ConsulConfigurationParserTest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java [deleted file]
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java

diff --git a/pom.xml b/pom.xml
index 676b097..f320360 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -50,7 +50,7 @@
     <spring-boot.version>2.1.2.RELEASE</spring-boot.version>
     <springfox.version>2.9.2</springfox.version>
     <immutables.version>2.7.5</immutables.version>
-    <sdk.version>1.1.5-SNAPSHOT</sdk.version>
+    <sdk.version>1.2.0-SNAPSHOT</sdk.version>
   </properties>
 
   <modules>
index 3ff81e1..889dae2 100644 (file)
@@ -25,8 +25,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientC
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,9 +50,9 @@ import java.util.Optional;
 public class CbsConfiguration extends PrhAppConfig {
     private static final Logger LOGGER = LoggerFactory.getLogger(CbsConfiguration.class);
     private AaiClientConfiguration aaiClientCBSConfiguration;
-    private DmaapPublisherConfiguration dmaapPublisherCBSConfiguration;
-    private DmaapConsumerConfiguration dmaapConsumerCBSConfiguration;
-    private DmaapPublisherConfiguration dmaapUpdatePublisherCBSConfiguration;
+    private MessageRouterPublishRequest messageRouterCBSPublishRequest;
+    private MessageRouterSubscribeRequest messageRouterCBSSubscribeRequest;
+    private MessageRouterPublishRequest messageRouterCBSUpdatePublishRequest;
 
     @Autowired
     private ConsulConfigFileReader consulConfigFileReader;
@@ -82,10 +82,10 @@ public class CbsConfiguration extends PrhAppConfig {
     private void parseCBSConfig(JsonObject jsonObject) {
         LOGGER.info("Received application configuration: {}", jsonObject);
         CbsContentParser consulConfigurationParser = new CbsContentParser(jsonObject);
-        dmaapPublisherCBSConfiguration = consulConfigurationParser.getDmaapPublisherConfig();
-        dmaapUpdatePublisherCBSConfiguration = consulConfigurationParser.getDmaapUpdatePublisherConfig();
+        messageRouterCBSPublishRequest = consulConfigurationParser.getMessageRouterPublishRequest();
+        messageRouterCBSUpdatePublishRequest = consulConfigurationParser.getMessageRouterUpdatePublishRequest();
         aaiClientCBSConfiguration = consulConfigurationParser.getAaiClientConfig();
-        dmaapConsumerCBSConfiguration = consulConfigurationParser.getDmaapConsumerConfig();
+        messageRouterCBSSubscribeRequest = consulConfigurationParser.getMessageRouterSubscribeRequest();
     }
 
     private void parsingConfigError(Throwable throwable) {
@@ -97,13 +97,13 @@ public class CbsConfiguration extends PrhAppConfig {
     }
 
     @Override
-    public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
-        return Optional.ofNullable(dmaapPublisherCBSConfiguration).orElse(super.getDmaapPublisherConfiguration());
+    public MessageRouterPublishRequest getMessageRouterPublishRequest() {
+        return Optional.ofNullable(messageRouterCBSPublishRequest).orElse(super.getMessageRouterPublishRequest());
     }
 
     @Override
-    public DmaapPublisherConfiguration getDmaapUpdatePublisherConfiguration() {
-        return Optional.ofNullable(dmaapUpdatePublisherCBSConfiguration).orElse(super.getDmaapPublisherConfiguration());
+    public MessageRouterPublishRequest getMessageRouterUpdatePublishRequest() {
+        return Optional.ofNullable(messageRouterCBSUpdatePublishRequest).orElse(super.getMessageRouterUpdatePublishRequest());
     }
 
     @Override
@@ -112,7 +112,7 @@ public class CbsConfiguration extends PrhAppConfig {
     }
 
     @Override
-    public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
-        return Optional.ofNullable(dmaapConsumerCBSConfiguration).orElse(super.getDmaapConsumerConfiguration());
+    public MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() {
+        return Optional.ofNullable(messageRouterCBSSubscribeRequest).orElse(super.getMessageRouterSubscribeRequest());
     }
 }
index f19eb3e..51d8639 100644 (file)
@@ -30,11 +30,12 @@ import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientC
 import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 
+import java.time.Duration;
 import java.util.Map;
 
 import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamWithName;
@@ -47,7 +48,6 @@ class CbsContentParser {
     private static final String SECURITY_TRUST_STORE_PASS_PATH = "security.trustStorePasswordPath";
     private static final String SECURITY_KEY_STORE_PATH = "security.keyStorePath";
     private static final String SECURITY_KEY_STORE_PASS_PATH = "security.keyStorePasswordPath";
-    private static final String SECURITY_ENABLE_DMAAP_CERT_AUTH = "security.enableDmaapCertAuth";
     private static final String CONFIG = "config";
     private static final String PNF_UPDATE = "pnf-update";
     private static final String PNF_READY = "pnf-ready";
@@ -59,48 +59,24 @@ class CbsContentParser {
         this.jsonObject = jsonObject.getAsJsonObject(CONFIG);
     }
 
-    DmaapPublisherConfiguration getDmaapPublisherConfig() {
+    MessageRouterPublishRequest getMessageRouterPublishRequest() {
         RawDataStream<JsonObject> sink = DataStreams.namedSinks(jsonObject).find(streamWithName(PNF_READY)).get();
         MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
 
-        return new ImmutableDmaapPublisherConfiguration.Builder()
-            .endpointUrl(parsedSink.topicUrl())
-            .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString())
-            .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString())
-            .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt())
-            .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString())
-            .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
-            .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString())
-            .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString())
-            .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString())
-            .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString())
-            .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString())
-            .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString())
-            .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString())
-            .enableDmaapCertAuth(jsonObject.get(SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean())
-            .build();
+        return ImmutableMessageRouterPublishRequest.builder()
+                .contentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
+                .sinkDefinition(parsedSink)
+                .build();
     }
 
-    DmaapPublisherConfiguration getDmaapUpdatePublisherConfig() {
+    MessageRouterPublishRequest getMessageRouterUpdatePublishRequest() {
         RawDataStream<JsonObject> sink = DataStreams.namedSinks(jsonObject).find(streamWithName(PNF_UPDATE)).get();
         MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
 
-        return new ImmutableDmaapPublisherConfiguration.Builder()
-            .endpointUrl(parsedSink.topicUrl())
-            .dmaapTopicName(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapTopicName").getAsString())
-            .dmaapUserPassword(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapUserPassword").getAsString())
-            .dmaapPortNumber(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapPortNumber").getAsInt())
-            .dmaapProtocol(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapProtocol").getAsString())
-            .dmaapContentType(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapContentType").getAsString())
-            .dmaapHostName(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapHostName").getAsString())
-            .dmaapUserName(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapUserName").getAsString())
-            .dmaapUserPassword(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapUserPassword").getAsString())
-            .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString())
-            .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString())
-            .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString())
-            .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString())
-            .enableDmaapCertAuth(jsonObject.get(SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean())
-            .build();
+        return ImmutableMessageRouterPublishRequest.builder()
+                .contentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
+                .sinkDefinition(parsedSink)
+                .build();
     }
 
     AaiClientConfiguration getAaiClientConfig() {
@@ -126,28 +102,15 @@ class CbsContentParser {
             .build();
     }
 
-    DmaapConsumerConfiguration getDmaapConsumerConfig() {
+    MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() {
         RawDataStream<JsonObject> source = DataStreams.namedSources(jsonObject).find(streamWithName(VES_REG_OUTPUT)).get();
         MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source);
 
-        return new ImmutableDmaapConsumerConfiguration.Builder()
-            .endpointUrl(parsedSource.topicUrl())
-            .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsInt())
-            .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString())
-            .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString())
-            .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString())
-            .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString())
-            .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt())
-            .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString())
-            .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt())
-            .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString())
-            .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
-            .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
-            .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString())
-            .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString())
-            .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString())
-            .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString())
-            .enableDmaapCertAuth(jsonObject.get(SECURITY_ENABLE_DMAAP_CERT_AUTH).getAsBoolean())
-            .build();
+        return ImmutableMessageRouterSubscribeRequest.builder()
+                .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
+                .sourceDefinition(parsedSource)
+                .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
+                .timeout(Duration.ofMillis(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsLong()))
+                .build();
     }
 }
\ No newline at end of file
index 6363356..7b87415 100644 (file)
@@ -23,6 +23,8 @@ package org.onap.dcaegen2.services.prh.configuration;
 import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.springframework.core.io.Resource;
 
 /**
@@ -32,12 +34,12 @@ public interface Config {
 
     Resource getGitInfo();
 
-    DmaapConsumerConfiguration getDmaapConsumerConfiguration();
+    MessageRouterSubscribeRequest getMessageRouterSubscribeRequest();
 
     AaiClientConfiguration getAaiClientConfiguration();
 
-    DmaapPublisherConfiguration getDmaapPublisherConfiguration();
+    MessageRouterPublishRequest getMessageRouterPublishRequest();
 
-    DmaapPublisherConfiguration getDmaapUpdatePublisherConfiguration();
+    MessageRouterPublishRequest getMessageRouterUpdatePublishRequest();
 
 }
index 7355cf4..f18f1d9 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.dcaegen2.services.prh.configuration;
 
 import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask;
 import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTaskImpl;
+import org.onap.dcaegen2.services.prh.tasks.MessageRouterPublisherResolver;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -31,12 +32,12 @@ public class DmaapPublisherTaskConfig {
     @Bean(name = "ReadyPublisherTask")
     @Autowired
     public DmaapPublisherTask getReadyPublisherTask(final Config config) {
-        return new DmaapPublisherTaskImpl(config::getDmaapPublisherConfiguration);
+        return new DmaapPublisherTaskImpl(config::getMessageRouterPublishRequest, new MessageRouterPublisherResolver());
     }
 
     @Bean(name = "UpdatePublisherTask")
     @Autowired
     public DmaapPublisherTask getUpdatePublisherTask(final Config config) {
-        return new DmaapPublisherTaskImpl(config::getDmaapUpdatePublisherConfiguration);
+        return new DmaapPublisherTaskImpl(config::getMessageRouterUpdatePublishRequest, new MessageRouterPublisherResolver());
     }
 }
index 4b48fa3..01ef206 100644 (file)
@@ -21,8 +21,8 @@
 package org.onap.dcaegen2.services.prh.configuration;
 
 import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
@@ -47,11 +47,11 @@ public abstract class PrhAppConfig implements Config {
 
     AaiClientConfiguration aaiClientConfiguration;
 
-    DmaapConsumerConfiguration dmaapConsumerConfiguration;
+    MessageRouterSubscribeRequest messageRouterSubscribeRequest;
 
-    DmaapPublisherConfiguration dmaapPublisherConfiguration;
+    MessageRouterPublishRequest messageRouterPublishRequest;
 
-    DmaapPublisherConfiguration dmaapUpdatePublisherConfiguration;
+    MessageRouterPublishRequest messageRouterUpdatePublishRequest;
 
     @Value("classpath:git_info.json")
     private Resource gitInfo;
@@ -67,8 +67,8 @@ public abstract class PrhAppConfig implements Config {
     }
 
     @Override
-    public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
-        return dmaapConsumerConfiguration;
+    public MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() {
+        return messageRouterSubscribeRequest;
     }
 
     @Override
@@ -77,12 +77,12 @@ public abstract class PrhAppConfig implements Config {
     }
 
     @Override
-    public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
-        return dmaapPublisherConfiguration;
+    public MessageRouterPublishRequest getMessageRouterPublishRequest() {
+        return messageRouterPublishRequest;
     }
 
     @Override
-    public DmaapPublisherConfiguration getDmaapUpdatePublisherConfiguration() {
-        return dmaapUpdatePublisherConfiguration;
+    public MessageRouterPublishRequest getMessageRouterUpdatePublishRequest() {
+        return messageRouterUpdatePublishRequest;
     }
 }
index 43d6922..b3d8456 100644 (file)
 
 package org.onap.dcaegen2.services.prh.service;
 
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
+import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Optional;
+import java.util.stream.StreamSupport;
+
 import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS;
 import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_EVENT_HEADER;
 import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_JSON_OBJECT;
@@ -37,21 +54,6 @@ import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SERIA
 import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SOURCE_NAME;
 import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VERSION;
 
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import java.util.Optional;
-import java.util.stream.StreamSupport;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.StringUtils;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
@@ -77,34 +79,21 @@ public class DmaapConsumerJsonParser {
      * @param monoMessage - results from DMaaP
      * @return reactive DMaaPModel
      */
-    public Flux<ConsumerDmaapModel> getJsonObject(Mono<JsonElement> monoMessage) {
-        return monoMessage
-            .flatMapMany(this::getConsumerDmaapModelFromJsonArray);
+    public Flux<ConsumerDmaapModel> getJsonObject(Mono<MessageRouterSubscribeResponse> monoMessage) {
+        return monoMessage.flatMapMany(msgRouterResponse -> getConsumerDmaapModelFromJsonArray(msgRouterResponse.items()));
     }
 
-    private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) {
-        LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", jsonElement);
-
-        if (jsonElement instanceof JsonObject) {
-            LOGGER.debug("Element is JsonObject");
-            return create(Flux.just((JsonObject) jsonElement));
-        }
+    private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonArray items) {
+        LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", items);
 
-        if (jsonElement instanceof JsonArray) {
-            LOGGER.debug("Element is JsonArray");
-            JsonArray jsonArray = (JsonArray) jsonElement;
-            if (jsonArray.size() == 0) {
-                LOGGER.debug("Nothing to consume from DMaaP");
-                return Flux.empty();
-            }
-            return create(
-                Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonArray.spliterator(), false)
-                    .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
-                        .orElseGet(JsonObject::new)))));
+        if (items.size() == 0) {
+            LOGGER.debug("Nothing to consume from DMaaP");
+            return Flux.empty();
         }
-
-        LOGGER.warn("Element is neither JSON Object or Array");
-        return Flux.empty();
+        return create(
+                Flux.defer(() -> Flux.fromStream(StreamSupport.stream(items.spliterator(), false)
+                        .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+                                .orElseGet(JsonObject::new)))));
     }
 
     Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
index 0269144..0b26890 100644 (file)
@@ -32,21 +32,14 @@ import org.onap.dcaegen2.services.prh.model.bbs.ImmutableRelationshipWrapper;
 import org.onap.dcaegen2.services.prh.model.bbs.RelationshipWrapper;
 import org.onap.dcaegen2.services.prh.model.utils.GsonSerializer;
 import org.onap.dcaegen2.services.prh.model.utils.HttpUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.*;
 import org.onap.dcaegen2.services.sdk.rest.services.uri.URI.URIBuilder;
-import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
 
-
-
 import java.util.Arrays;
 import java.util.List;
 import java.util.function.Function;
@@ -66,7 +59,7 @@ public class BbsActionsTaskImpl implements BbsActionsTask {
 
     @Autowired
     BbsActionsTaskImpl(Config config) {
-        this(config, RxHttpClient.create(new SslFactory().createInsecureClientContext()));
+        this(config, RxHttpClientFactory.createInsecure());
     }
 
     BbsActionsTaskImpl(Config config, RxHttpClient httpClient) {
index 3a630a4..5fc41d9 100644 (file)
 
 package org.onap.dcaegen2.services.prh.tasks;
 
-import javax.net.ssl.SSLException;
-
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-
 import reactor.core.publisher.Flux;
 
+import javax.net.ssl.SSLException;
+
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
  */
 interface DmaapConsumerTask {
-
     Flux<ConsumerDmaapModel> execute(String object) throws SSLException;
-
-    DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException;
 }
index d3086cb..f46e2cc 100644 (file)
 
 package org.onap.dcaegen2.services.prh.tasks;
 
-import com.google.gson.JsonElement;
-import java.util.Optional;
-import javax.net.ssl.SSLException;
 import org.onap.dcaegen2.services.prh.configuration.Config;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -46,33 +42,26 @@ public class DmaapConsumerTaskImpl implements DmaapConsumerTask {
     private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
     private final Config config;
     private final DmaapConsumerJsonParser dmaapConsumerJsonParser;
-    private final ConsumerReactiveHttpClientFactory httpClientFactory;
+
 
     @Autowired
     public DmaapConsumerTaskImpl(Config config) {
-        this(config, new DmaapConsumerJsonParser(),
-            new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
+        this(config, new DmaapConsumerJsonParser());
     }
 
-    DmaapConsumerTaskImpl(Config prhAppConfig,
-        DmaapConsumerJsonParser dmaapConsumerJsonParser,
-        ConsumerReactiveHttpClientFactory httpClientFactory) {
+    DmaapConsumerTaskImpl(Config prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser) {
         this.config = prhAppConfig;
         this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
-        this.httpClientFactory = httpClientFactory;
     }
 
     @Override
-    public Flux<ConsumerDmaapModel> execute(String object) throws SSLException {
-        DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
+    public Flux<ConsumerDmaapModel> execute(String object) {
+        MessageRouterSubscriber messageRouterSubscriberClient =
+                new MessageRouterSubscriberResolver().resolveClient();
         LOGGER.debug("Method called with arg {}", object);
-        Mono<JsonElement> response = dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(
-            Optional.empty());
+        Mono<MessageRouterSubscribeResponse> response = messageRouterSubscriberClient
+                .get(config.getMessageRouterSubscribeRequest());
         return dmaapConsumerJsonParser.getJsonObject(response);
     }
 
-    @Override
-    public DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException {
-        return httpClientFactory.create(config.getDmaapConsumerConfiguration());
-    }
 }
index 7fc596c..f63f4d7 100644 (file)
 
 package org.onap.dcaegen2.services.prh.tasks;
 
-import javax.net.ssl.SSLException;
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
@@ -31,16 +31,8 @@ import reactor.core.publisher.Mono;
  */
 public interface DmaapPublisherTask {
 
-    /**
-     *
-     * Does not work reactive version with DMaaP MR  - to be investigated why in future
-     * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
-     * */
-    @Deprecated
-    Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse>
-        execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException, SSLException;
 
-    Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
+    Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
 
-    DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException;;
+    Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
 }
index 55a8bb5..3a72488 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.onap.dcaegen2.services.prh.tasks;
 
+import com.google.gson.JsonPrimitive;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.StringEntity;
@@ -27,17 +28,14 @@ import org.apache.http.impl.client.DefaultHttpClient;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilderImpl;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.uri.URI;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import javax.net.ssl.SSLException;
-import java.util.Optional;
 import java.util.function.Supplier;
 
 /**
@@ -46,70 +44,52 @@ import java.util.function.Supplier;
 public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
-    private final Supplier<DmaapPublisherConfiguration> config;
+
+    private final Supplier<MessageRouterPublishRequest> config;
+    private final MessageRouterPublisherResolver messageRouterPublisherClientResolver;
     private final PnfReadyJsonBodyBuilderImpl pnfReadyJsonBodyBuilder = new PnfReadyJsonBodyBuilderImpl();
-    private final PublisherReactiveHttpClientFactory httpClientFactory;
 
-    public DmaapPublisherTaskImpl(final Supplier<DmaapPublisherConfiguration> config) {
-        this(config, new PublisherReactiveHttpClientFactory(
-                new DmaaPRestTemplateFactory(),
-                new PnfReadyJsonBodyBuilderImpl()));
-    }
 
-    DmaapPublisherTaskImpl(
-            Supplier<DmaapPublisherConfiguration> config,
-            PublisherReactiveHttpClientFactory httpClientFactory) {
+    public DmaapPublisherTaskImpl(Supplier<MessageRouterPublishRequest> config, MessageRouterPublisherResolver messageRouterPublisherClientResolver) {
         this.config = config;
-        this.httpClientFactory = httpClientFactory;
+        this.messageRouterPublisherClientResolver = messageRouterPublisherClientResolver;
     }
 
     @Override
-    public Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse>
-        execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException, SSLException {
+    public Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
         if (consumerDmaapModel == null) {
             throw new DmaapNotFoundException("Invoked null object to DMaaP task");
         }
-        DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient();
+        MessageRouterPublisher messageRouterPublisher = messageRouterPublisherClientResolver.resolveClient();
         LOGGER.info("Method called with arg {}", consumerDmaapModel);
-        return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel, Optional.empty());
-    }
-
-    @Override
-    public DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException {
-        return httpClientFactory.create(config.get());
-
+        String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel);
+        return messageRouterPublisher.put(
+                config.get(),
+                Flux.just(json).map(JsonPrimitive::new));
     }
 
     /**
      *
      * Does not work reactive version with DMaaP MR  - to be investigated why in future
-     * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
+     * As WA please use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
      * */
     @Override
     public Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel) {
         String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel);
-        DefaultHttpClient httpClient = new DefaultHttpClient();
-        HttpPost postRequest = new HttpPost(getUrl());
-        try {
-            StringEntity input = new StringEntity(json);
-            input.setContentType(config.get().dmaapContentType());
-            postRequest.setEntity(input);
-            HttpResponse response = httpClient.execute(postRequest);
-            return Mono.just(response);
-        } catch (Exception e) {
-            LOGGER.warn("Publishing to DMaaP MR failed: {}", e);
-            return Mono.error(e);
+        try (DefaultHttpClient httpClient = new DefaultHttpClient()) {
+            HttpPost postRequest = new HttpPost(config.get().sinkDefinition().topicUrl());
+            try {
+                StringEntity input = new StringEntity(json);
+                input.setContentType(config.get().contentType());
+                postRequest.setEntity(input);
+                HttpResponse response = httpClient.execute(postRequest);
+                return Mono.just(response);
+            } catch (Exception e) {
+                LOGGER.warn("Publishing to DMaaP MR failed: {}", e);
+                return Mono.error(e);
+            }
         }
     }
 
-    private String getUrl() {
-        return (new URI.URIBuilder()).scheme(config.get().dmaapProtocol())
-            .host(config.get().dmaapHostName())
-            .port(config.get().dmaapPortNumber()).path(this.createRequestPath()).build()
-            .toString();
-    }
 
-    private String createRequestPath() {
-        return "/" + config.get().dmaapTopicName();
-    }
 }
\ No newline at end of file
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterPublisherResolver.java
new file mode 100644 (file)
index 0000000..2f4e386
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.tasks;
+
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MessageRouterPublisherResolver {
+
+    public MessageRouterPublisher resolveClient() {
+        return DmaapClientFactory.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+    }
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/MessageRouterSubscriberResolver.java
new file mode 100644 (file)
index 0000000..63930ef
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.tasks;
+
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MessageRouterSubscriberResolver {
+
+    public MessageRouterSubscriber resolveClient() {
+        return DmaapClientFactory.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+    }
+}
index 72ec0ca..4b3436e 100644 (file)
@@ -25,6 +25,7 @@ import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.logging.MdcVariables;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 import org.slf4j.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -50,6 +51,7 @@ public class ScheduledTasks {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasks.class);
     private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+
     private final DmaapConsumerTask dmaapConsumerTask;
     private final DmaapPublisherTask dmaapReadyProducerTask;
     private final DmaapPublisherTask dmaapUpdateProducerTask;
@@ -208,7 +210,7 @@ public class ScheduledTasks {
      * Marked as deprecated due to problems with DMaaP MR, to be fixed in future
      */
     @Deprecated
-    private Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse>
+    private Flux<MessageRouterPublishResponse>
     publishToDmaapConfiguration(final State state) {
         try {
             if (state.ActivationStatus) {
@@ -217,8 +219,8 @@ public class ScheduledTasks {
             }
 
             return dmaapReadyProducerTask.execute(state.DmaapModel);
-        } catch (PrhTaskException | SSLException e) {
-            return Mono.error(e);
+        } catch (PrhTaskException e) {
+            return Flux.error(e);
         }
     }
 
index 9dca398..fb0b1b4 100644 (file)
 
 package org.onap.dcaegen2.services.prh;
 
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
 import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+
+import java.time.Duration;
 
 
 public class TestAppConfiguration {
-    public static ImmutableDmaapConsumerConfiguration createDefaultDmaapConsumerConfiguration() {
-        return new ImmutableDmaapConsumerConfiguration.Builder()
-                .endpointUrl("http://dmaap-mr:2222/events/unauthenticated.VES_PNFREG_OUTPUT")
+    public static ImmutableMessageRouterSubscribeRequest createDefaultMessageRouterSubscribeRequest() {
+        return ImmutableMessageRouterSubscribeRequest.builder()
                 .consumerGroup("OpenDCAE-c12")
+                .sourceDefinition(ImmutableMessageRouterSource.builder()
+                        .name("the topic")
+                        .topicUrl(String.format("http://%s:%d/events/TOPIC", "www", 1234))
+                        .build())
                 .consumerId("c12")
-                .dmaapContentType("application/json")
-                .dmaapHostName("message-router.onap.svc.cluster.local")
-                .dmaapPortNumber(3904)
-                .dmaapProtocol("http")
-                .dmaapUserName("admin")
-                .dmaapUserPassword("admin")
-                .trustStorePath("/opt/app/prh/local/org.onap.prh.trust.jks")
-                .trustStorePasswordPath("change_it")
-                .keyStorePath("/opt/app/prh/local/org.onap.prh.p12")
-                .keyStorePasswordPath("change_it")
-                .enableDmaapCertAuth(false)
-                .dmaapTopicName("/events/unauthenticated.SEC_OTHER_OUTPUT")
-                .timeoutMs(-1)
-                .messageLimit(-1)
+                .timeout(Duration.ofMillis(1))
                 .build();
     }
 
-    public static ImmutableDmaapPublisherConfiguration createDefaultDmaapPublisherConfiguration() {
-        return new ImmutableDmaapPublisherConfiguration.Builder()
-                .endpointUrl("http://dmaap-mr:2222/events/unauthenticated.PNF_READY")
-                .dmaapContentType("application/json")
-                .dmaapHostName("message-router.onap.svc.cluster.local")
-                .dmaapPortNumber(3904)
-                .dmaapProtocol("http")
-                .dmaapUserName("admin")
-                .dmaapUserPassword("admin")
-                .trustStorePath("/opt/app/prh/local/org.onap.prh.trust.jks")
-                .trustStorePasswordPath("change_it")
-                .keyStorePath("/opt/app/prh/local/org.onap.prh.p12")
-                .keyStorePasswordPath("change_it")
-                .enableDmaapCertAuth(false)
-                .dmaapTopicName("/events/unauthenticated.PNF_READY")
+    public static ImmutableMessageRouterPublishRequest createDefaultMessageRouterPublishRequest() {
+        return ImmutableMessageRouterPublishRequest.builder()
+                .contentType("application/json")
+                .sinkDefinition(ImmutableMessageRouterSink.builder()
+                        .name("the topic")
+                        .topicUrl(String.format("http://%s:%d/events/TOPIC", "www", 1234))
+                        .build())
                 .build();
-    }
+
+  }
 
     public static ImmutableAaiClientConfiguration createDefaultAaiClientConfiguration() {
         return new ImmutableAaiClientConfiguration.Builder()
index 8a2a498..350cee6 100644 (file)
 
 package org.onap.dcaegen2.services.prh.configuration;
 
-import static java.lang.ClassLoader.getSystemResource;
-import static org.assertj.core.api.Assertions.assertThat;
-
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.prh.TestAppConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.time.Duration;
+
+import static java.lang.ClassLoader.getSystemResource;
+import static org.assertj.core.api.Assertions.assertThat;
 
 
 class ConsulConfigurationParserTest {
@@ -43,10 +44,8 @@ class ConsulConfigurationParserTest {
             new String(Files.readAllBytes(Paths.get(getSystemResource("flattened_configuration.json").toURI())));
     private final ImmutableAaiClientConfiguration correctAaiClientConfig =
             TestAppConfiguration.createDefaultAaiClientConfiguration();
-    private final ImmutableDmaapConsumerConfiguration correctDmaapConsumerConfig =
-            TestAppConfiguration.createDefaultDmaapConsumerConfiguration();
-    private final ImmutableDmaapPublisherConfiguration correctDmaapPublisherConfig =
-            TestAppConfiguration.createDefaultDmaapPublisherConfiguration();
+    private final ImmutableMessageRouterPublishRequest correctDmaapPublisherConfig =
+            TestAppConfiguration.createDefaultMessageRouterPublishRequest();
     private final CbsContentParser consulConfigurationParser = new CbsContentParser(
             new Gson().fromJson(correctJson, JsonObject.class));
 
@@ -63,25 +62,25 @@ class ConsulConfigurationParserTest {
         assertThat(aaiClientConfig).isEqualToComparingFieldByField(correctAaiClientConfig);
     }
 
-
     @Test
-    void shouldCreateDmaapConsumerConfigurationCorrectly() {
-        // when
-        DmaapConsumerConfiguration dmaapConsumerConfig = consulConfigurationParser.getDmaapConsumerConfig();
+    void shouldCreateMessageRouterSubscribeRequestCorrectly() {
+        // given
+        MessageRouterSubscribeRequest messageRouterSubscribeRequest = consulConfigurationParser.getMessageRouterSubscribeRequest();
 
         // then
-        assertThat(dmaapConsumerConfig).isNotNull();
-        assertThat(dmaapConsumerConfig).isEqualToComparingFieldByField(correctDmaapConsumerConfig);
+        assertThat(messageRouterSubscribeRequest.sourceDefinition().topicUrl()).isEqualTo("http://dmaap-mr:2222/events/unauthenticated.VES_PNFREG_OUTPUT");
+        assertThat(messageRouterSubscribeRequest.consumerGroup()).isEqualTo("OpenDCAE-c12");
+        assertThat(messageRouterSubscribeRequest.consumerId()).isEqualTo("c12");
+        assertThat(messageRouterSubscribeRequest.timeout()).isEqualTo(Duration.ofMillis(-1));
     }
 
-
     @Test
-    void shouldCreateDmaapPublisherConfigurationCorrectly() {
+    void shouldCreateMessageRouterPublishConfigurationCorrectly() {
         // when
-        DmaapPublisherConfiguration dmaapPublisherConfig = consulConfigurationParser.getDmaapPublisherConfig();
+        MessageRouterPublishRequest messageRouterPublishRequest = consulConfigurationParser.getMessageRouterPublishRequest();
 
         // then
-        assertThat(dmaapPublisherConfig).isNotNull();
-        assertThat(dmaapPublisherConfig).isEqualToComparingFieldByField(correctDmaapPublisherConfig);
+        assertThat(messageRouterPublishRequest.contentType()).isEqualTo("application/json");
+        assertThat(messageRouterPublishRequest.sinkDefinition().topicUrl()).isEqualTo("http://dmaap-mr:2222/events/unauthenticated.PNF_READY");
     }
 }
\ No newline at end of file
index cdcef07..98b7314 100644 (file)
 
 package org.onap.dcaegen2.services.prh.service;
 
-import static org.mockito.Mockito.spy;
-
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
-import java.util.Optional;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
+import java.util.Optional;
+
+import static org.mockito.Mockito.spy;
+
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
  */
@@ -101,6 +104,7 @@ class DmaapConsumerJsonParserTest {
             .build();
 
         JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message);
+        MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build();
 
         //when
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
@@ -109,7 +113,7 @@ class DmaapConsumerJsonParserTest {
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
 
         ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
-            .getJsonObject(Mono.just((messageAsJsonArray))).blockFirst();
+            .getJsonObject(Mono.just((response))).blockFirst();
         //then
         Assertions.assertNotNull(consumerDmaapModel);
         Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -163,15 +167,15 @@ class DmaapConsumerJsonParserTest {
             .nfRole("gNB")
             .swVersion("v4.5.0.1")
             .build();
-        JsonArray mesageAsJsonArray = (JsonArray) jsonParser.parse(message);
-
+        JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message);
+        MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build();
         //when
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
         ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
-            .getJsonObject(Mono.just((mesageAsJsonArray))).blockFirst();
+            .getJsonObject(Mono.just((response))).blockFirst();
         //then
         Assertions.assertNotNull(consumerDmaapModel);
         Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -230,6 +234,7 @@ class DmaapConsumerJsonParserTest {
             .build();
 
         JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message);
+        MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build();
 
         //when
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
@@ -238,7 +243,7 @@ class DmaapConsumerJsonParserTest {
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
 
         ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
-            .getJsonObject(Mono.just((messageAsJsonArray))).blockFirst();
+            .getJsonObject(Mono.just((response))).blockFirst();
         //then
         Assertions.assertNotNull(consumerDmaapModel);
         Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -293,6 +298,7 @@ class DmaapConsumerJsonParserTest {
             .build();
 
         JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message);
+        MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build();
 
         //when
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
@@ -301,7 +307,7 @@ class DmaapConsumerJsonParserTest {
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
 
         ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
-            .getJsonObject(Mono.just((messageAsJsonArray))).blockFirst();
+            .getJsonObject(Mono.just((response))).blockFirst();
         //then
         Assertions.assertNotNull(consumerDmaapModel);
         Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -349,8 +355,9 @@ class DmaapConsumerJsonParserTest {
             + "}}}]";
 
         JsonArray incorrectMessageAsJsonArray = (JsonArray) jsonParser.parse(incorrectMessage);
+        MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(incorrectMessageAsJsonArray).build();
 
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessageAsJsonArray)))
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(response)))
             .expectSubscription().thenRequest(1).verifyComplete();
     }
 
@@ -394,8 +401,9 @@ class DmaapConsumerJsonParserTest {
                 + "}}}]";
 
         JsonArray jsonWithoutSourceNameAsJsonArray = (JsonArray) jsonParser.parse(jsonWithoutSourceName);
+        MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(jsonWithoutSourceNameAsJsonArray).build();
         StepVerifier
-            .create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutSourceNameAsJsonArray)))
+            .create(dmaapConsumerJsonParser.getJsonObject(Mono.just(response)))
             .expectSubscription().thenRequest(1)
             .verifyComplete();
 
@@ -444,8 +452,9 @@ class DmaapConsumerJsonParserTest {
                 + "}}}]";
 
         JsonArray jsonWithoutIpInformationAsJsonArray = (JsonArray) jsonParser.parse(jsonWithoutIpInformation);
+        MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(jsonWithoutIpInformationAsJsonArray).build();
 
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutIpInformationAsJsonArray)))
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(response)))
             .expectSubscription().thenRequest(1).verifyComplete();
     }
 
@@ -485,15 +494,15 @@ class DmaapConsumerJsonParserTest {
             + "}}}";
 
         JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message);
-
+        MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build();
         //when
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
         JsonElement jsonElement = jsonParser.parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
 
-        dmaapConsumerJsonParser.getJsonObject(Mono.just((messageAsJsonArray)));
-        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageAsJsonArray)))
+        dmaapConsumerJsonParser.getJsonObject(Mono.just((response)));
+        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(response))
             .blockFirst();
         //then
         ConsumerDmaapModel expectedObject = ImmutableConsumerDmaapModel.builder()
@@ -560,13 +569,14 @@ class DmaapConsumerJsonParserTest {
             .build();
 
         JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message);
+        MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build();
 
         //when
         DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
         JsonElement jsonElement = jsonParser.parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just((messageAsJsonArray)))
+        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(response))
             .blockFirst();
 
         //then
@@ -625,12 +635,12 @@ class DmaapConsumerJsonParserTest {
             .build();
 
         JsonArray messageAsJsonArray = (JsonArray) jsonParser.parse(message);
+        MessageRouterSubscribeResponse response = ImmutableMessageRouterSubscribeResponse.builder().items(messageAsJsonArray).build();
 
         //when
         DmaapConsumerJsonParser dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
 
         //then
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(messageAsJsonArray)))
-            .expectSubscription().expectNext(expectedObject).expectNext(expectedObject).verifyComplete();
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(response))).expectSubscription().expectNext(expectedObject).expectNext(expectedObject).verifyComplete();
     }
 }
index 18e1a27..04388fb 100644 (file)
 
 package org.onap.dcaegen2.services.prh.tasks;
 
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-
-import javax.net.ssl.SSLException;
-
 import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
@@ -34,6 +28,10 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Primary;
 
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
 
 
 /**
@@ -49,7 +47,7 @@ public class AaiPublisherTaskSpy {
      */
     @Bean
     @Primary
-    public AaiProducerTask registerSimpleAaiPublisherTask() throws SSLException {
+    public AaiProducerTask registerSimpleAaiPublisherTask() {
         CbsConfiguration cbsConfiguration = spy(CbsConfiguration.class);
         ConsumerDmaapModel consumerDmaapModel = spy(ConsumerDmaapModel.class);
         doReturn(mock(AaiClientConfiguration.class)).when(cbsConfiguration).getAaiClientConfiguration();
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java
deleted file mode 100644 (file)
index 9afa767..0000000
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * PNF-REGISTRATION-HANDLER
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.prh.tasks;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.onap.dcaegen2.services.prh.TestAppConfiguration.createDefaultDmaapConsumerConfiguration;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import java.util.Optional;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
-import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
- */
-class DmaapConsumerTaskImplTest {
-
-    private static ConsumerDmaapModel consumerDmaapModel;
-    private static DmaapConsumerTaskImpl dmaapConsumerTask;
-    private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient;
-    private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
-    private static String message;
-    private static String messageContentEmpty;
-    private static JsonArray jsonArray;
-    private static JsonArray jsonArrayWrongContent;
-
-    private static CbsConfiguration cbsConfiguration;
-
-    @BeforeAll
-    static void setUp() {
-        dmaapConsumerConfiguration = createDefaultDmaapConsumerConfiguration();
-
-        JsonObject jsonObject = new JsonParser().parse("{\n"
-            + "        \"attachmentPoint\": \"bla-bla-30-3\",\n"
-            + "        \"cvlan\": \"678\",\n"
-            + "        \"svlan\": \"1005\"\n"
-            + "      }").getAsJsonObject();
-
-        consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
-            .ipv4("10.16.123.234")
-            .ipv6("0:0:0:0:0:FFFF:0A10:7BEA")
-            .correlationId("NOKQTFCOC540002E")
-            .serialNumber("QTFCOC540002E")
-            .equipVendor("nokia")
-            .equipModel("3310")
-            .equipType("type")
-            .nfRole("gNB")
-            .swVersion("v4.5.0.1")
-            .additionalFields(jsonObject)
-            .build();
-        cbsConfiguration = mock(CbsConfiguration.class);
-
-        message = "[{\"event\": {"
-            + "\"commonEventHeader\": { "
-            + " \"sourceName\":\"NOKQTFCOC540002E\","
-            + " \"nfNamingCode\":\"gNB\" "
-            + "},"
-            + "\"pnfRegistrationFields\": {"
-            + " \"vendorName\": \"nokia\","
-            + " \"serialNumber\": \"QTFCOC540002E\","
-            + " \"pnfRegistrationFieldsVersion\": \"2.0\","
-            + " \"modelNumber\": \"3310\","
-            + " \"unitType\": \"type\",\n"
-            + " \"unitFamily\": \"BBU\","
-            + " \"oamV4IpAddress\": \"10.16.123.234\","
-            + " \"softwareVersion\": \"v4.5.0.1\","
-            + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\","
-            + " \"additionalFields\": {\"attachmentPoint\": \"bla-bla-30-3\",\"cvlan\": \"678\",\"svlan\": \"1005\"}"
-            + "}}}]";
-
-        messageContentEmpty = "[]";
-        JsonParser jsonParser = new JsonParser();
-        jsonArray = (JsonArray) jsonParser.parse(message);
-        jsonArrayWrongContent = (JsonArray) jsonParser.parse(messageContentEmpty);
-
-    }
-
-    @Test
-    void whenPassedObjectDoesNotFit_DoesNotThrowPrhTaskException() throws Exception {
-        //given
-        prepareMocksForDmaapConsumer(Optional.of(jsonArrayWrongContent));
-
-        //when
-        Flux<ConsumerDmaapModel> response = dmaapConsumerTask.execute("Sample input");
-
-        //then
-        verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
-        assertNull(response.blockFirst());
-    }
-
-    @Test
-    void whenPassedObjectFits_ReturnsCorrectResponse() throws Exception {
-        //given
-        prepareMocksForDmaapConsumer(Optional.of(jsonArray));
-
-        //when
-        Flux<ConsumerDmaapModel> response = dmaapConsumerTask.execute("Sample input");
-
-        //then
-        verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
-        assertEquals(consumerDmaapModel, response.blockFirst());
-    }
-
-
-
-    private void prepareMocksForDmaapConsumer(Optional<JsonArray> message) throws Exception {
-        dMaaPConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
-        when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty()))
-            .thenReturn(Mono.just(message.get()));
-        when(cbsConfiguration.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
-        ConsumerReactiveHttpClientFactory httpClientFactory = mock(ConsumerReactiveHttpClientFactory.class);
-        doReturn(dMaaPConsumerReactiveHttpClient).when(httpClientFactory).create(dmaapConsumerConfiguration);
-        dmaapConsumerTask = new DmaapConsumerTaskImpl(cbsConfiguration, new DmaapConsumerJsonParser(), httpClientFactory);
-    }
-}
\ No newline at end of file
index 594575e..4c95c71 100644 (file)
 
 package org.onap.dcaegen2.services.prh.tasks;
 
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-
-import javax.net.ssl.SSLException;
-
 import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Primary;
 
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/27/18
@@ -47,13 +44,10 @@ public class DmaapConsumerTaskSpy {
      */
     @Bean
     @Primary
-    public DmaapConsumerTask registerSimpleDmaapConsumerTask() throws SSLException {
+    public DmaapConsumerTask registerSimpleDmaapConsumerTask() {
         CbsConfiguration cbsConfiguration = spy(CbsConfiguration.class);
-        doReturn(mock(DmaapConsumerConfiguration.class)).when(cbsConfiguration).getDmaapConsumerConfiguration();
+        doReturn(mock(MessageRouterPublishRequest.class)).when(cbsConfiguration).getMessageRouterPublishRequest();
         DmaapConsumerTaskImpl dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(cbsConfiguration));
-        DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient = mock(
-            DMaaPConsumerReactiveHttpClient.class);
-        doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
         return dmaapConsumerTask;
     }
 }
index 77028a3..7a68bc8 100644 (file)
 
 package org.onap.dcaegen2.services.prh.tasks;
 
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-
-import javax.net.ssl.SSLException;
 import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Primary;
 
 import java.util.function.Supplier;
 
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
  */
@@ -47,14 +45,10 @@ public class DmaapProducerTaskSpy {
      */
     @Bean
     @Primary
-    public DmaapPublisherTask registerSimpleDmaapPublisherTask() throws SSLException {
-        final CbsConfiguration cbsConfiguration = spy(CbsConfiguration.class);
-        final Supplier<DmaapPublisherConfiguration> configSupplier = () -> cbsConfiguration.getDmaapPublisherConfiguration();
-        doReturn(mock(DmaapPublisherConfiguration.class)).when(cbsConfiguration).getDmaapPublisherConfiguration();
-        final DmaapPublisherTaskImpl dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(configSupplier));
-        final DMaaPPublisherReactiveHttpClient extendedDmaapProducerHttpClient = mock(
-            DMaaPPublisherReactiveHttpClient.class);
-        doReturn(extendedDmaapProducerHttpClient).when(dmaapPublisherTask).resolveClient();
-        return dmaapPublisherTask;
+    public DmaapPublisherTask registerSimpleDmaapPublisherTask() {
+        final CbsConfiguration cbsConfiguration = mock(CbsConfiguration.class);
+        final Supplier<MessageRouterPublishRequest> configSupplier = cbsConfiguration::getMessageRouterPublishRequest;
+        doReturn(mock(MessageRouterPublishRequest.class)).when(cbsConfiguration).getMessageRouterPublishRequest();
+        return spy(new DmaapPublisherTaskImpl(configSupplier, new MessageRouterPublisherResolver()));
     }
 }
index fb4a50e..6347ad3 100644 (file)
 
 package org.onap.dcaegen2.services.prh.tasks;
 
-import io.netty.handler.codec.http.HttpResponseStatus;
+import com.google.gson.JsonPrimitive;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.function.Executable;
-import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
-import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.model.DmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
-import reactor.core.publisher.Mono;
-import reactor.test.StepVerifier;
-
-import javax.net.ssl.SSLException;
-import java.util.Optional;
+import org.onap.dcaegen2.services.prh.integration.junit5.mockito.MockitoExtension;
+import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import reactor.core.publisher.Flux;
+
 import java.util.function.Supplier;
 
-import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.*;
-import static org.onap.dcaegen2.services.prh.TestAppConfiguration.createDefaultDmaapPublisherConfiguration;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
  */
+@ExtendWith(MockitoExtension.class)
 class DmaapPublisherTaskImplTest {
 
-    private static ConsumerDmaapModel consumerDmaapModel;
     private static DmaapPublisherTaskImpl dmaapPublisherTask;
-    private static DMaaPPublisherReactiveHttpClient dMaaPPublisherReactiveHttpClient;
-    private static CbsConfiguration cbsConfiguration;
-    private static DmaapPublisherConfiguration dmaapPublisherConfiguration;
-    private Optional<RequestDiagnosticContext> requestDiagnosticContextOptionalMock;
-    private DmaapModel dmaapModel;
-    private PublisherReactiveHttpClientFactory publisherReactiveHttpClientFactory;
-    private Supplier<DmaapPublisherConfiguration> configSupplier;
+
+    @Mock
+    private static MessageRouterPublisherResolver messageRouterPublisherClientResolver;
+    @Mock
+    private static MessageRouterPublisher messageRouterPublisher;
+
+    private Supplier<MessageRouterPublishRequest> configSupplier;
+
+
+    @Captor
+    private ArgumentCaptor<Flux<JsonPrimitive>> fluxCaptor;
 
     @BeforeEach
-    public void beforeEach() throws SSLException {
-        dmaapPublisherConfiguration = createDefaultDmaapPublisherConfiguration();
-        consumerDmaapModel = mock(ConsumerDmaapModel.class);
-        cbsConfiguration = mock(CbsConfiguration.class);
-        requestDiagnosticContextOptionalMock = Optional.empty();
-        dmaapModel = mock(DmaapModel.class);
-        dMaaPPublisherReactiveHttpClient = mock(DMaaPPublisherReactiveHttpClient.class);
-        publisherReactiveHttpClientFactory = mock(PublisherReactiveHttpClientFactory.class);
-        when(cbsConfiguration.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
-        when(publisherReactiveHttpClientFactory.create(dmaapPublisherConfiguration))
-            .thenReturn(dMaaPPublisherReactiveHttpClient);
-        configSupplier = () -> cbsConfiguration.getDmaapPublisherConfiguration();
+    void beforeEach() {
+        when(messageRouterPublisherClientResolver.resolveClient()).thenReturn(messageRouterPublisher);
+        MessageRouterPublishRequest mrRequest = createMRRequest();
+        configSupplier = () -> mrRequest;
     }
 
     @Test
     void execute_whenPassedObjectDoesntFit_ThrowsPrhTaskException() {
         //given
-        dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier);
+        dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier, messageRouterPublisherClientResolver);
         //when
         Executable executableFunction = () -> dmaapPublisherTask.execute(null);
         //then
         assertThrows(PrhTaskException.class, executableFunction, "The specified parameter is incorrect");
     }
 
-
     @Test
-    void execute_whenPassedObjectFits_ReturnsCorrectStatus() throws PrhTaskException, SSLException {
+    void execute_whenPassedObjectFits_ReturnsCorrectStatus() throws DmaapNotFoundException {
         //given
-        HttpResponseStatus httpResponseStatus = HttpResponseStatus.OK;
-        HttpResponse httpClientReponse = prepareMocksForTests(httpResponseStatus);
-        dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier, publisherReactiveHttpClientFactory);
-
+        dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier, messageRouterPublisherClientResolver);
         //when
-        StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectSubscription()
-            .expectNext(httpClientReponse);
-
+        dmaapPublisherTask.execute(createConsumerDmaapModel());
         //then
-        verify(dMaaPPublisherReactiveHttpClient, times(1))
-            .getDMaaPProducerResponse(consumerDmaapModel, requestDiagnosticContextOptionalMock);
-
-        verifyNoMoreInteractions(dMaaPPublisherReactiveHttpClient);
+        verify(messageRouterPublisher).put(eq(configSupplier.get()), fluxCaptor.capture());
+        assertEquals(new JsonPrimitive("{\"correlationId\":\"NOKQTFCOC540002E\"}"), fluxCaptor.getValue().blockFirst());
     }
 
-    @Test
-    void execute_whenPassedObjectFits_butIncorrectResponseReturns() throws DmaapNotFoundException, SSLException {
-        //given
-        HttpResponseStatus httpResponseStatus = HttpResponseStatus.UNAUTHORIZED;
-        HttpResponse httpClientReponse = prepareMocksForTests(httpResponseStatus);
-        dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier, publisherReactiveHttpClientFactory);
 
-        //when
-        StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectSubscription()
-            .expectNext(httpClientReponse);
-
-        //then
-        verify(dMaaPPublisherReactiveHttpClient, times(1))
-            .getDMaaPProducerResponse(consumerDmaapModel, requestDiagnosticContextOptionalMock);
-        verifyNoMoreInteractions(dMaaPPublisherReactiveHttpClient);
+    private ImmutableConsumerDmaapModel createConsumerDmaapModel() {
+        return ImmutableConsumerDmaapModel.builder()
+                .ipv4("10.16.123.234")
+                .ipv6("0:0:0:0:0:FFFF:0A10:7BEA")
+                .correlationId("NOKQTFCOC540002E")
+                .serialNumber("QTFCOC540002E")
+                .equipVendor("nokia")
+                .equipModel("3310")
+                .equipType("type")
+                .nfRole("gNB")
+                .swVersion("v4.5.0.1")
+                .additionalFields(null)
+                .build();
     }
 
-    @Test()
-    void execute_whenConsumerDmaapModelIsNull() {
-        //given
-        HttpResponseStatus httpResponseStatus = HttpResponseStatus.UNAUTHORIZED;
-        HttpResponse httpClientReponse = prepareMocksForTests(httpResponseStatus);
-        dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier, publisherReactiveHttpClientFactory);
-        assertThrows(DmaapNotFoundException.class, () -> {
-            dmaapPublisherTask.execute(null);
-        });
-    }
+    private MessageRouterPublishRequest createMRRequest() {
+        final MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
+                .name("the topic")
+                .topicUrl("http://dmaap-mr:2222/events/unauthenticated.PNF_READY")
+                .build();
 
-    @Test
-    public void resolveClient() throws SSLException {
-        //given
-        dmaapPublisherTask = new DmaapPublisherTaskImpl(configSupplier, publisherReactiveHttpClientFactory);
-        //when
-        DMaaPPublisherReactiveHttpClient dMaaPPublisherReactiveHttpClientResolved = dmaapPublisherTask.resolveClient();
-        //then
-        assertSame(dMaaPPublisherReactiveHttpClientResolved, dMaaPPublisherReactiveHttpClient);
+        return ImmutableMessageRouterPublishRequest.builder()
+                .sinkDefinition(sinkDefinition)
+                .contentType("application/json")
+                .build();
     }
-
-    private HttpResponse prepareMocksForTests(HttpResponseStatus httpResponseStatus) {
-        HttpResponse httpClientResponse = mock(HttpResponse.class);
-        when(httpClientResponse.statusCode()).thenReturn(httpResponseStatus.code());
-        when(
-            dMaaPPublisherReactiveHttpClient.getDMaaPProducerResponse(dmaapModel, requestDiagnosticContextOptionalMock))
-            .thenReturn(Mono.just(httpClientResponse));
-        return httpClientResponse;
-    }
-
 }
\ No newline at end of file