Support for HTTPS certificates-based communication with A&AI and DMaaP 44/84844/1
authorStavros Kanarakis <stavros.kanarakis@nokia.com>
Wed, 10 Apr 2019 10:04:11 +0000 (13:04 +0300)
committerStavros Kanarakis <stavros.kanarakis@nokia.com>
Wed, 10 Apr 2019 10:04:11 +0000 (13:04 +0300)
Also, upgraded DCAE-SDK to the latest 1.1.4 version

Change-Id: Ica59ab3107d9c0bcbf4dbaacf5063d4ceb8ed4b9
Issue-ID: DCAEGEN2-1354
Signed-off-by: Stavros Kanarakis <stavros.kanarakis@nokia.com>
25 files changed:
components/bbs-event-processor/dpo/blueprints/bbs-event-processor-input.yaml
components/bbs-event-processor/dpo/blueprints/k8s-bbs-event-processor.yaml-template
components/bbs-event-processor/dpo/spec/bbs-event-processor-spec.json
components/bbs-event-processor/pom.xml
components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ApplicationConfiguration.java
components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/config/ConsulConfigurationGateway.java
components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/model/GeneratedAppConfigObject.java
components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipeline.java
components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipeline.java
components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImpl.java
components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTask.java
components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImpl.java
components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImpl.java
components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java
components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java
components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java
components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ApplicationConfigurationTest.java
components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/config/ConsulConfigurationGatewayTest.java
components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/CpeAuthenticationPipelineTest.java
components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/pipelines/ReRegistrationPipelineTest.java
components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapCpeAuthenticationConsumerTaskImplTest.java
components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapPublisherTaskImplTest.java
components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/tasks/DmaapReRegistrationConsumerTaskImplTest.java
components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParserTest.java
components/bbs-event-processor/src/test/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParserTest.java

index a7adbbb..f503eb8 100644 (file)
@@ -17,9 +17,9 @@
 
 
 tag_version: nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.components.bbs-event-processor:1.0.0-SNAPSHOT
-pnf_reregistration_url: http:message-router:3904/events/unauthenticated.PNF_UPDATE
-cpe_authentication_url: http:message-router:3904/events/unauthenticated.CPE_AUTHENTICATION
-close_loop_url: http:message-router:3904/events/unauthenticated.DCAE_CL_OUTPUT
+pnf_reregistration_url: https:message-router:3905/events/unauthenticated.PNF_UPDATE
+cpe_authentication_url: https:message-router:3905/events/unauthenticated.CPE_AUTHENTICATION
+close_loop_url: https:message-router:3905/events/unauthenticated.DCAE_CL_OUTPUT
 application_rereg_policy_scope: policyScopeReReg
 application_rereg_cl_control_name: clControlNameReReg
 application_cpeAuth_policy_scope: policyScopeCpeAuth
index 2d2c7a5..eaf6275 100644 (file)
@@ -26,13 +26,17 @@ imports:
 inputs:
   aai_enrichment_host:
     type: string
-    default: "aai"
+    default: "aai.onap"
   aai_enrichment_port:
     type: integer
     default: 8443
   aai_enrichment_protocol:
     type: string
     default: "https"
+  aai_secure_enable_cert:
+    type: boolean
+    description: enable certificates-based connection with AAI
+    default: true
   tag_version:
     type: string
   replicas:
@@ -90,10 +94,20 @@ inputs:
   application_logging_level:
     type: string
     default: "INFO"
+  dmaap_username:
+    type: string
+    default: admin
+  dmaap_password:
+    type: string
+    default: admin
   dmaap_consumer_id:
     type: string
   dmaap_consumer_group:
     type: string
+  dmaap_secure_enable_cert:
+    type: boolean
+    description: enable certificates-based connection with DMaaP
+    default: true
 node_templates:
   bbs-event-processor:
     type: dcae.nodes.ContainerizedPlatformComponent
@@ -102,18 +116,24 @@ node_templates:
         streams_subscribes:
           pnf_reregistration:
             type: message_router
+            aaf_username: { get_input: dmaap_username }
+            aaf_password: { get_input: dmaap_password }
             dmaap_info:
               topic_url: { get_input: pnf_reregistration_url }
           cpe_authentication:
             type: message_router
+            aaf_username: { get_input: dmaap_username }
+            aaf_password: { get_input: dmaap_password }
             dmaap_info:
               topic_url: { get_input: cpe_authentication_url }
         streams_publishes:
           close_loop:
             type: message_router
+            aaf_username: { get_input: dmaap_username }
+            aaf_password: { get_input: dmaap_password }
             dmaap_info:
               topic_url: { get_input: close_loop_url }
-        dmaap.protocol: "http"
+        dmaap.protocol: "https"
         dmaap.contentType: "application/json"
         dmaap.consumer.consumerId: { get_input: dmaap_consumer_id }
         dmaap.consumer.consumerGroup: { get_input: dmaap_consumer_group }
@@ -142,6 +162,12 @@ node_templates:
         application.cpeAuth.configKey: "cpe_authentication"
         application.closeLoop.configKey: "close_loop"
         application.loggingLevel: { get_input: application_logging_level }
+        application.ssl.keyStorePath: "/opt/app/bbs-event-processor/etc/cert/cert.jks"
+        application.ssl.keyStorePasswordPath: "/opt/app/bbs-event-processor/etc/cert/jks.pass"
+        application.ssl.trustStorePath: "/opt/app/bbs-event-processor/etc/cert/trust.jks"
+        application.ssl.trustStorePasswordPath: "/opt/app/bbs-event-processor/etc/cert/trust.pass"
+        application.ssl.enableAaiCertAuth: { get_input: aai_secure_enable_cert }
+        application.ssl.enableDmaapCertAuth: { get_input: dmaap_secure_enable_cert }
       host_port:
           { get_input: host_port }
       container_port:
@@ -160,5 +186,5 @@ node_templates:
       log_info:
         log_directory: "/opt/app/bbs-event-processor/logs"
       tls_info:
-        cert_directory: '/opt/app/bbs-event-processor/etc/cert/'
-        use_tls: false
\ No newline at end of file
+        cert_directory: '/opt/app/bbs-event-processor/etc/cert'
+        use_tls: true
\ No newline at end of file
index 49b0e8f..08fe458 100644 (file)
   },
   "artifacts": [
     {
-    "uri": "nexus3.onap.org:10003/onap/org.onap.dcaegen2.services.components.bbs-event-processor:1.0.0-SNAPSHOT",
+    "uri": "nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.components.bbs-event-processor:1.0.0-SNAPSHOT",
     "type": "docker image"
     }
   ]
index 7ebe18e..a4a0ffd 100644 (file)
@@ -33,9 +33,7 @@
         <slf4j.version>1.7.25</slf4j.version>
         <junit-platform.version>1.1.0</junit-platform.version>
         <jacoco.version>0.8.2</jacoco.version>
-        <sdk.version>1.1.2-SNAPSHOT</sdk.version>
-        <common.sdk.version>1.1.3</common.sdk.version>
-        <cbs.version>1.1.3</cbs.version>
+        <dcae.sdk.version>1.1.4</dcae.sdk.version>
         <wiremock.version>2.21.0</wiremock.version>
         <springfox-swagger.version>2.8.0</springfox-swagger.version>
         <maven.build.timestamp.format>yyyyMMdd'T'HHmmss</maven.build.timestamp.format>
             <dependency>
                 <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
                 <artifactId>cbs-client</artifactId>
-                <version>${cbs.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
-                <artifactId>aai-client</artifactId>
-                <version>${sdk.version}</version>
+                <version>${dcae.sdk.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
                 <artifactId>dmaap-client</artifactId>
-                <version>${sdk.version}</version>
+                <version>${dcae.sdk.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
                 <artifactId>common-dependency</artifactId>
-                <version>${common.sdk.version}</version>
+                <version>${dcae.sdk.version}</version>
             </dependency>
             <dependency>
                 <groupId>io.springfox</groupId>
             <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
             <artifactId>cbs-client</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
-            <artifactId>aai-client</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
             <artifactId>dmaap-client</artifactId>
             <groupId>io.projectreactor</groupId>
             <artifactId>reactor-core</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.tomcat.embed</groupId>
-            <artifactId>tomcat-embed-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.tomcat.embed</groupId>
-            <artifactId>tomcat-embed-el</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.tomcat.embed</groupId>
-            <artifactId>tomcat-embed-websocket</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.immutables</groupId>
             <artifactId>value</artifactId>
index 981d963..5022a69 100644 (file)
@@ -174,6 +174,13 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
         synchronized (this) {
             cbsPollingInterval = newConfiguration.cbsPollingIntervalSec();
 
+            securityProperties.setEnableAaiCertAuth(newConfiguration.enableAaiCertAuth());
+            securityProperties.setEnableDmaapCertAuth(newConfiguration.enableDmaapCertAuth());
+            securityProperties.setKeyStorePath(newConfiguration.keyStorePath());
+            securityProperties.setKeyStorePasswordPath(newConfiguration.keyStorePasswordPath());
+            securityProperties.setTrustStorePath(newConfiguration.trustStorePath());
+            securityProperties.setTrustStorePasswordPath(newConfiguration.trustStorePasswordPath());
+
             GeneratedAppConfigObject.StreamsObject reRegObject =
                     getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.reRegConfigKey(),
                             "PNF Re-Registration");
@@ -181,6 +188,8 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
             dmaapReRegistrationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost());
             dmaapReRegistrationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
             dmaapReRegistrationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
+            dmaapReRegistrationConsumerProperties.setDmaapUserName(reRegObject.aafUsername());
+            dmaapReRegistrationConsumerProperties.setDmaapUserPassword(reRegObject.aafPassword());
             dmaapReRegistrationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
             dmaapReRegistrationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
             dmaapReRegistrationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId());
@@ -196,6 +205,8 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
             dmaapCpeAuthenticationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost());
             dmaapCpeAuthenticationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
             dmaapCpeAuthenticationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
+            dmaapCpeAuthenticationConsumerProperties.setDmaapUserName(cpeAuthObject.aafUsername());
+            dmaapCpeAuthenticationConsumerProperties.setDmaapUserPassword(cpeAuthObject.aafPassword());
             dmaapCpeAuthenticationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
             dmaapCpeAuthenticationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
             dmaapCpeAuthenticationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId());
@@ -211,6 +222,8 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
             dmaapProducerProperties.setDmaapHostName(topicUrlInfo.getHost());
             dmaapProducerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
             dmaapProducerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
+            dmaapProducerProperties.setDmaapUserName(closeLoopObject.aafUsername());
+            dmaapProducerProperties.setDmaapUserPassword(closeLoopObject.aafPassword());
             dmaapProducerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
             dmaapProducerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
             constructDmaapProducerConfiguration();
@@ -361,7 +374,7 @@ public class ApplicationConfiguration implements ConfigurationChangeObservable {
             throw new ConfigurationParsingException("Wrong topic name structure");
         }
         topicUrlInfo.setPort(Integer.valueOf(tokensAfterHost[0]));
-        topicUrlInfo.setTopicName("/events/" + tokensAfterHost[1]);
+        topicUrlInfo.setTopicName("events/" + tokensAfterHost[1]);
 
         return topicUrlInfo;
     }
index 1d27fc0..607b3b3 100644 (file)
@@ -39,6 +39,8 @@ import org.onap.bbs.event.processor.model.ImmutableDmaapInfo;
 import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
 import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import org.slf4j.Logger;
@@ -123,12 +125,12 @@ public class ConsulConfigurationGateway {
 
         // Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
         EnvProperties env = EnvProperties.fromEnvironment();
-
+        CbsRequest cbsRequest = CbsRequests.getConfiguration(diagnosticContext);
         // Create the client and use it to get the configuration
         cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
                 .doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
                 .retry(e -> true)
-                .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period))
+                .flatMapMany(cbsClient -> cbsClient.updates(cbsRequest, initialDelay, period))
                 .subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
     }
 
@@ -178,6 +180,13 @@ public class ConsulConfigurationGateway {
 
         final String loggingLevel = configObject.get("application.loggingLevel").getAsString();
 
+        final String keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString();
+        final String keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString();
+        final String trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString();
+        final String trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString();
+        final boolean aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean();
+        final boolean dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean();
+
         final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");
         final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
 
@@ -211,6 +220,12 @@ public class ConsulConfigurationGateway {
                 .cpeAuthConfigKey(cpeAuthConfigKey)
                 .closeLoopConfigKey(closeLoopConfigKey)
                 .loggingLevel(loggingLevel)
+                .keyStorePath(keyStorePath)
+                .keyStorePasswordPath(keyStorePasswordPath)
+                .trustStorePath(trustStorePath)
+                .trustStorePasswordPath(trustStorePasswordPath)
+                .enableAaiCertAuth(aaiEnableCertAuth)
+                .enableDmaapCertAuth(dmaapEnableCertAuth)
                 .streamSubscribesMap(parseStreamsObjects(streamsSubscribes))
                 .streamPublishesMap(parseStreamsObjects(streamsPublishes))
                 .build();
index 4fdb81b..41a8a34 100644 (file)
@@ -123,6 +123,25 @@ public interface GeneratedAppConfigObject {
     @SerializedName(value = "application.loggingLevel", alternate = "application.loggingLevel")
     String loggingLevel();
 
+    @SerializedName(value = "application.ssl.trustStorePath", alternate = "application.ssl.trustStorePath")
+    String trustStorePath();
+
+    @SerializedName(value = "application.ssl.trustStorePasswordPath",
+            alternate = "application.ssl.trustStorePasswordPath")
+    String trustStorePasswordPath();
+
+    @SerializedName(value = "application.ssl.keyStorePath", alternate = "application.ssl.keyStorePath")
+    String keyStorePath();
+
+    @SerializedName(value = "application.ssl.keyStorePasswordPath", alternate = "application.ssl.keyStorePasswordPath")
+    String keyStorePasswordPath();
+
+    @SerializedName(value = "application.ssl.enableAaiCertAuth", alternate = "application.ssl.enableAaiCertAuth")
+    boolean enableAaiCertAuth();
+
+    @SerializedName(value = "application.ssl.enableDmaapCertAuth", alternate = "application.ssl.enableDmaapCertAuth")
+    boolean enableDmaapCertAuth();
+
     @SerializedName(value = "streams_subscribes", alternate = "streams_subscribes")
     Map<String, StreamsObject> streamSubscribesMap();
 
index 711ab18..a30903b 100644 (file)
@@ -49,11 +49,11 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
 import org.onap.bbs.event.processor.tasks.AaiClientTask;
 import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
 import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Component;
 import org.springframework.util.StringUtils;
 
@@ -99,7 +99,7 @@ public class CpeAuthenticationPipeline {
         LOGGER.trace("Reactive CPE Authentication pipeline subscribed - Execution started");
     }
 
-    Flux<ResponseEntity<String>> executePipeline() {
+    Flux<HttpResponse> executePipeline() {
         return
             // Consume CPE Authentication from DMaaP
             consumeCpeAuthenticationFromDmaap()
@@ -111,11 +111,11 @@ public class CpeAuthenticationPipeline {
             .flatMap(this::triggerPolicy);
     }
 
-    private void onSuccess(ResponseEntity<String> responseCode) {
-        MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString());
+    private void onSuccess(HttpResponse responseCode) {
+        MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode()));
         LOGGER.info("CPE Authentication event successfully handled. "
                         + "Publishing to DMaaP for Policy returned a status code of ({} {})",
-                responseCode.getStatusCode().value(), responseCode.getStatusCode().getReasonPhrase());
+                responseCode.statusCode(), responseCode.statusReason());
         MDC.remove(RESPONSE_CODE);
     }
 
@@ -171,8 +171,10 @@ public class CpeAuthenticationPipeline {
                 .doOnError(TimeoutException.class,
                         e -> LOGGER.warn("Timed out waiting for A&AI response")
                 )
-                .doOnError(e -> LOGGER.error("Error while retrieving PNF: {}",
-                        e.getMessage())
+                .doOnError(e -> {
+                            LOGGER.error("Error while retrieving PNF: {}", e.getMessage());
+                            LOGGER.debug("Error\n", e);
+                        }
                 )
                 .onErrorResume(
                     e -> e instanceof Exception,
@@ -214,8 +216,10 @@ public class CpeAuthenticationPipeline {
                 .doOnError(TimeoutException.class,
                         e -> LOGGER.warn("Timed out waiting for A&AI response")
                 )
-                .doOnError(e -> LOGGER.error("Error while retrieving HSI CFS Service instance: {}",
-                        e.getMessage())
+                .doOnError(e -> {
+                            LOGGER.error("Error while retrieving HSI CFS Service instance: {}", e.getMessage());
+                            LOGGER.debug("Error\n", e);
+                        }
                 )
                 .onErrorResume(
                     e -> e instanceof Exception,
@@ -226,7 +230,7 @@ public class CpeAuthenticationPipeline {
                 });
     }
 
-    private Mono<ResponseEntity<String>> triggerPolicy(PipelineState state) {
+    private Mono<HttpResponse> triggerPolicy(PipelineState state) {
 
         if (state == null || state.getHsiCfsServiceInstance() == null) {
             return Mono.empty();
index 9a42ed2..33a9aea 100644 (file)
@@ -48,11 +48,11 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
 import org.onap.bbs.event.processor.tasks.AaiClientTask;
 import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
 import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Component;
 import org.springframework.util.StringUtils;
 
@@ -98,7 +98,7 @@ public class ReRegistrationPipeline {
         LOGGER.trace("Reactive PNF Re-registration pipeline subscribed - Execution started");
     }
 
-    Flux<ResponseEntity<String>> executePipeline() {
+    Flux<HttpResponse> executePipeline() {
         return
             // Consume Re-Registration from DMaaP
             consumeReRegistrationsFromDmaap()
@@ -110,11 +110,11 @@ public class ReRegistrationPipeline {
             .flatMap(this::triggerPolicy);
     }
 
-    private void onSuccess(ResponseEntity<String> responseCode) {
-        MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString());
+    private void onSuccess(HttpResponse responseCode) {
+        MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode()));
         LOGGER.info("PNF Re-Registration event successfully handled. "
                         + "Publishing to DMaaP for Policy returned a status code of ({} {})",
-                responseCode.getStatusCode().value(), responseCode.getStatusCode().getReasonPhrase());
+                responseCode.statusCode(), responseCode.statusReason());
         MDC.remove(RESPONSE_CODE);
     }
 
@@ -170,8 +170,10 @@ public class ReRegistrationPipeline {
                 .doOnError(TimeoutException.class,
                         e -> LOGGER.warn("Timed out waiting for A&AI response")
                 )
-                .doOnError(e -> LOGGER.error("Error while retrieving PNF: {}",
-                        e.getMessage())
+                .doOnError(e -> {
+                            LOGGER.error("Error while retrieving PNF: {}", e.getMessage());
+                            LOGGER.debug("Error\n", e);
+                        }
                 )
                 .onErrorResume(
                     e -> e instanceof Exception,
@@ -219,8 +221,10 @@ public class ReRegistrationPipeline {
                 .doOnError(TimeoutException.class,
                         e -> LOGGER.warn("Timed out waiting for A&AI response")
                 )
-                .doOnError(e -> LOGGER.error("Error while retrieving HSI CFS Service instance: {}",
-                        e.getMessage())
+                .doOnError(e -> {
+                            LOGGER.error("Error while retrieving HSI CFS Service instance: {}", e.getMessage());
+                            LOGGER.debug("Error\n", e);
+                        }
                 )
                 .onErrorResume(
                     e -> e instanceof Exception,
@@ -259,7 +263,7 @@ public class ReRegistrationPipeline {
         return isNotRelocation;
     }
 
-    private Mono<ResponseEntity<String>> triggerPolicy(PipelineState state) {
+    private Mono<HttpResponse> triggerPolicy(PipelineState state) {
 
         if (state == null || state.getHsiCfsServiceInstance() == null) {
             return Mono.empty();
index e6bef52..da51028 100644 (file)
 
 package org.onap.bbs.event.processor.tasks;
 
+import com.google.gson.JsonElement;
+
+import java.util.Optional;
+
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.net.ssl.SSLException;
@@ -85,6 +89,8 @@ public class DmaapCpeAuthenticationConsumerTaskImpl
     public synchronized void updateConfiguration() {
         try {
             LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration");
+            LOGGER.info("Creating secure context with:\n {}",
+                    this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
             httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
         } catch (SSLException e) {
             LOGGER.error("SSL error while updating HTTP Client after a config update");
@@ -96,7 +102,7 @@ public class DmaapCpeAuthenticationConsumerTaskImpl
     public Flux<CpeAuthenticationConsumerDmaapModel> execute(String taskName) {
         LOGGER.debug("Executing task for CPE-Authentication with name \"{}\"", taskName);
         DMaaPConsumerReactiveHttpClient httpClient = getHttpClient();
-        Mono<String> response = httpClient.getDMaaPConsumerResponse();
+        Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty());
         return cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(response)
                 .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION))
                 .doOnError(e -> {
index bddd2ec..749c4e5 100644 (file)
 package org.onap.bbs.event.processor.tasks;
 
 import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
-import org.springframework.http.ResponseEntity;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
 
 import reactor.core.publisher.Mono;
 
 public interface DmaapPublisherTask {
 
-    Mono<ResponseEntity<String>> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel);
+    Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel);
 }
index 7b22721..283e5ef 100644 (file)
 
 package org.onap.bbs.event.processor.tasks;
 
+import java.util.Optional;
+
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
+import javax.net.ssl.SSLException;
 
 import org.onap.bbs.event.processor.config.ApplicationConfiguration;
 import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
 import org.onap.bbs.event.processor.exceptions.DmaapException;
 import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
 import org.onap.bbs.event.processor.utilities.ControlLoopJsonBodyBuilder;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Component;
 
 import reactor.core.publisher.Mono;
@@ -59,7 +62,12 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask, Configuration
         this.configuration = configuration;
         this.httpClientFactory = httpClientFactory;
 
-        httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
+        try {
+            httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
+        } catch (SSLException e) {
+            LOGGER.error("SSL error while creating HTTP Client: {}", e.getMessage());
+            LOGGER.debug("SSL exception\n", e);
+        }
     }
 
     @PostConstruct
@@ -75,17 +83,24 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask, Configuration
     @Override
     public synchronized void updateConfiguration() {
         LOGGER.info("DMaaP Publisher update due to new application configuration");
-        httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
+        try {
+            LOGGER.info("Creating secure context with:\n {}", this.configuration.getDmaapPublisherConfiguration());
+            httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
+        } catch (SSLException e) {
+            LOGGER.error("SSL error while updating HTTP Client after a config update: {}", e.getMessage());
+            LOGGER.debug("SSL exception\n", e);
+        }
     }
 
     @Override
-    public Mono<ResponseEntity<String>> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) {
+    public Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) {
         if (controlLoopPublisherDmaapModel == null) {
             throw new DmaapException("Cannot invoke a DMaaP Publish task with a null message");
         }
-        LOGGER.info("Executing task for publishing control loop message \n{}", controlLoopPublisherDmaapModel);
+        LOGGER.info("Executing task for publishing control loop message");
+        LOGGER.debug("CL message \n{}", controlLoopPublisherDmaapModel);
         DMaaPPublisherReactiveHttpClient httpClient = getHttpClient();
-        return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel);
+        return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
     }
 
     private synchronized DMaaPPublisherReactiveHttpClient getHttpClient() {
index 92f5a86..e40037b 100644 (file)
 
 package org.onap.bbs.event.processor.tasks;
 
+import com.google.gson.JsonElement;
+
+import java.util.Optional;
+
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.net.ssl.SSLException;
@@ -85,6 +89,8 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC
     public synchronized void updateConfiguration() {
         try {
             LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration");
+            LOGGER.info("Creating secure context with:\n {}",
+                    this.configuration.getDmaapReRegistrationConsumerConfiguration());
             httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration());
         } catch (SSLException e) {
             LOGGER.error("SSL error while updating HTTP Client after a config update");
@@ -96,7 +102,7 @@ public class DmaapReRegistrationConsumerTaskImpl implements DmaapReRegistrationC
     public Flux<ReRegistrationConsumerDmaapModel> execute(String taskName) {
         LOGGER.debug("Executing task for Re-Registration with name \"{}\"", taskName);
         DMaaPConsumerReactiveHttpClient httpClient = getHttpClient();
-        Mono<String> response = httpClient.getDMaaPConsumerResponse();
+        Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty());
         return reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(response)
                 .switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION))
                 .doOnError(e -> {
index 19b81a8..84fc9f7 100644 (file)
@@ -181,6 +181,7 @@ public class AaiReactiveClient implements ConfigurationChangeObserver {
 
     private SslContext createSslContext() throws SSLException {
         if (aaiClientConfiguration.enableAaiCertAuth()) {
+            LOGGER.info("Creating secure context with:\n {}", aaiClientConfiguration);
             return sslFactory.createSecureContext(
                     aaiClientConfiguration.keyStorePath(),
                     aaiClientConfiguration.keyStorePasswordPath(),
index 2bb5d98..3cff4e6 100644 (file)
@@ -37,7 +37,6 @@ import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
 
 import java.util.Optional;
 import java.util.stream.StreamSupport;
@@ -79,20 +78,9 @@ public class CpeAuthenticationDmaapConsumerJsonParser {
      * @param dmaapResponse Response from DMaaP
      * @return CPE Authentication Consumer DMaaP reactive model
      */
-    public Flux<CpeAuthenticationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) {
+    public Flux<CpeAuthenticationConsumerDmaapModel> extractModelFromDmaap(Mono<JsonElement> dmaapResponse) {
         return dmaapResponse
-                .flatMapMany(this::parseToMono)
-                .flatMap(this::createTargetFlux);
-    }
-
-    private Mono<JsonElement> parseToMono(String message) {
-        if (StringUtils.isEmpty(message)) {
-            LOGGER.warn("DMaaP response is empty");
-            return Mono.empty();
-        }
-        return Mono.fromCallable(() -> new JsonParser().parse(message))
-                .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring"))
-                .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty());
+                .flatMapMany(this::createTargetFlux);
     }
 
     private Flux<CpeAuthenticationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) {
index 947d7a7..9fe0c27 100644 (file)
@@ -32,7 +32,6 @@ import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
 
 import java.util.Optional;
 import java.util.stream.StreamSupport;
@@ -72,20 +71,9 @@ public class ReRegistrationDmaapConsumerJsonParser {
      * @param dmaapResponse Response from DMaaP
      * @return Re-Registration Consumer DMaaP reactive model
      */
-    public Flux<ReRegistrationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) {
+    public Flux<ReRegistrationConsumerDmaapModel> extractModelFromDmaap(Mono<JsonElement> dmaapResponse) {
         return dmaapResponse
-                .flatMapMany(this::parseToMono)
-                .flatMap(this::createTargetFlux);
-    }
-
-    private Mono<JsonElement> parseToMono(String message) {
-        if (StringUtils.isEmpty(message)) {
-            LOGGER.warn("DMaaP response is empty");
-            return Mono.empty();
-        }
-        return Mono.fromCallable(() -> new JsonParser().parse(message))
-                .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring"))
-                .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty());
+                .flatMapMany(this::createTargetFlux);
     }
 
     private Flux<ReRegistrationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) {
index 220466b..69fbb3f 100644 (file)
@@ -91,8 +91,8 @@ import org.springframework.test.context.TestPropertySource;
         "configs.security.trustStorePasswordPath=test trust store password path",
         "configs.security.keyStorePath=test key store path",
         "configs.security.keyStorePasswordPath=test key store password path",
-        "configs.security.enableDmaapCertAuth=true",
-        "configs.security.enableAaiCertAuth=true",
+        "configs.security.enableDmaapCertAuth=false",
+        "configs.security.enableAaiCertAuth=false",
         "configs.application.pipelinesPollingIntervalSec=30",
         "configs.application.pipelinesTimeoutSec=15",
         "configs.application.policyVersion=1.0.0",
@@ -208,6 +208,17 @@ class ApplicationConfigurationTest {
             () -> assertEquals("reRegControlName", configuration.getReRegistrationCloseLoopControlName()),
             () -> assertEquals("cpeAuthControlName", configuration.getCpeAuthenticationCloseLoopControlName())
         );
+
+        assertAll("Security Application Properties",
+            () -> assertFalse(aaiClientConfiguration.enableAaiCertAuth()),
+            () -> assertFalse(dmaapConsumerReRegistrationConfig.enableDmaapCertAuth()),
+            () -> assertEquals("test key store path", aaiClientConfiguration.keyStorePath()),
+            () -> assertEquals("test key store password path",
+                        aaiClientConfiguration.keyStorePasswordPath()),
+            () -> assertEquals("test trust store path", aaiClientConfiguration.trustStorePath()),
+            () -> assertEquals("test trust store password path",
+                        aaiClientConfiguration.trustStorePasswordPath())
+        );
     }
 
     @Test
@@ -287,6 +298,12 @@ class ApplicationConfigurationTest {
                 .cpeAuthConfigKey("config_key_2")
                 .closeLoopConfigKey("config_key_3")
                 .loggingLevel("TRACE")
+                .keyStorePath("test key store path - update")
+                .keyStorePasswordPath("test key store password path - update")
+                .trustStorePath("test trust store path - update")
+                .trustStorePasswordPath("test trust store password path - update")
+                .enableAaiCertAuth(true)
+                .enableDmaapCertAuth(true)
                 .streamSubscribesMap(subscribes)
                 .streamPublishesMap(Collections.singletonMap("config_key_3", streamsObject3))
                 .build();
@@ -315,11 +332,11 @@ class ApplicationConfigurationTest {
         assertAll("DMaaP Consumer Re-Registration Configuration Properties",
             () -> assertEquals("we-are-message-router1.us", dmaapConsumerReRegistrationConfig.dmaapHostName()),
             () -> assertEquals(Integer.valueOf(3901), dmaapConsumerReRegistrationConfig.dmaapPortNumber()),
-            () -> assertEquals("/events/unauthenticated.PNF_UPDATE",
+            () -> assertEquals("events/unauthenticated.PNF_UPDATE",
                     dmaapConsumerReRegistrationConfig.dmaapTopicName()),
             () -> assertEquals("https", dmaapConsumerReRegistrationConfig.dmaapProtocol()),
-            () -> assertEquals("", dmaapConsumerReRegistrationConfig.dmaapUserName()),
-            () -> assertEquals("", dmaapConsumerReRegistrationConfig.dmaapUserPassword()),
+            () -> assertEquals("some-user", dmaapConsumerReRegistrationConfig.dmaapUserName()),
+            () -> assertEquals("some-password", dmaapConsumerReRegistrationConfig.dmaapUserPassword()),
             () -> assertEquals("application/json", dmaapConsumerReRegistrationConfig.dmaapContentType()),
             () -> assertEquals("c13", dmaapConsumerReRegistrationConfig.consumerId()),
             () -> assertEquals("OpenDcae-c13", dmaapConsumerReRegistrationConfig.consumerGroup()),
@@ -332,11 +349,11 @@ class ApplicationConfigurationTest {
         assertAll("DMaaP Consumer CPE Authentication Configuration Properties",
             () -> assertEquals("we-are-message-router2.us", dmaapConsumerCpeAuthenticationConfig.dmaapHostName()),
             () -> assertEquals(Integer.valueOf(3902), dmaapConsumerCpeAuthenticationConfig.dmaapPortNumber()),
-            () -> assertEquals("/events/unauthenticated.CPE_AUTHENTICATION",
+            () -> assertEquals("events/unauthenticated.CPE_AUTHENTICATION",
                     dmaapConsumerCpeAuthenticationConfig.dmaapTopicName()),
             () -> assertEquals("https", dmaapConsumerCpeAuthenticationConfig.dmaapProtocol()),
-            () -> assertEquals("", dmaapConsumerCpeAuthenticationConfig.dmaapUserName()),
-            () -> assertEquals("", dmaapConsumerCpeAuthenticationConfig.dmaapUserPassword()),
+            () -> assertEquals("some-user", dmaapConsumerCpeAuthenticationConfig.dmaapUserName()),
+            () -> assertEquals("some-password", dmaapConsumerCpeAuthenticationConfig.dmaapUserPassword()),
             () -> assertEquals("application/json", dmaapConsumerCpeAuthenticationConfig.dmaapContentType()),
             () -> assertEquals("c13", dmaapConsumerCpeAuthenticationConfig.consumerId()),
             () -> assertEquals("OpenDcae-c13", dmaapConsumerCpeAuthenticationConfig.consumerGroup()),
@@ -348,11 +365,11 @@ class ApplicationConfigurationTest {
         assertAll("DMaaP Publisher Configuration Properties",
             () -> assertEquals("we-are-message-router3.us", dmaapPublisherConfiguration.dmaapHostName()),
             () -> assertEquals(Integer.valueOf(3903), dmaapPublisherConfiguration.dmaapPortNumber()),
-            () -> assertEquals("/events/unauthenticated.DCAE_CL_OUTPUT",
+            () -> assertEquals("events/unauthenticated.DCAE_CL_OUTPUT",
                     dmaapPublisherConfiguration.dmaapTopicName()),
             () -> assertEquals("https", dmaapPublisherConfiguration.dmaapProtocol()),
-            () -> assertEquals("", dmaapPublisherConfiguration.dmaapUserName()),
-            () -> assertEquals("", dmaapPublisherConfiguration.dmaapUserPassword()),
+            () -> assertEquals("some-user", dmaapPublisherConfiguration.dmaapUserName()),
+            () -> assertEquals("some-password", dmaapPublisherConfiguration.dmaapUserPassword()),
             () -> assertEquals("application/json", dmaapPublisherConfiguration.dmaapContentType())
         );
 
@@ -371,5 +388,16 @@ class ApplicationConfigurationTest {
             () -> assertEquals("controlName-update", configuration.getReRegistrationCloseLoopControlName()),
             () -> assertEquals("controlName-update", configuration.getCpeAuthenticationCloseLoopControlName())
         );
+
+        assertAll("Security Application Properties",
+            () -> assertTrue(aaiClientConfiguration.enableAaiCertAuth()),
+            () -> assertTrue(dmaapConsumerReRegistrationConfig.enableDmaapCertAuth()),
+            () -> assertEquals("test key store path - update", aaiClientConfiguration.keyStorePath()),
+            () -> assertEquals("test key store password path - update",
+                        aaiClientConfiguration.keyStorePasswordPath()),
+            () -> assertEquals("test trust store path - update", aaiClientConfiguration.trustStorePath()),
+            () -> assertEquals("test trust store password path - update",
+                        aaiClientConfiguration.trustStorePasswordPath())
+        );
     }
 }
\ No newline at end of file
index 9f5ce6d..1acf864 100644 (file)
@@ -84,6 +84,12 @@ class ConsulConfigurationGatewayTest {
                 + "\"application.cpeAuth.configKey\": \"config_key_1\","
                 + "\"application.closeLoop.configKey\": \"config_key_3\","
                 + "\"application.loggingLevel\": \"TRACE\","
+                + "\"application.ssl.keyStorePath\": \"/opt/app/bbs-event-processor/etc/cert/key.p12\","
+                + "\"application.ssl.keyStorePasswordPath\": \"/opt/app/bbs-event-processor/etc/cert/key.pass\","
+                + "\"application.ssl.trustStorePath\": \"/opt/app/bbs-event-processor/etc/cert/trust.jks\","
+                + "\"application.ssl.trustStorePasswordPath\": \"/opt/app/bbs-event-processor/etc/cert/trust.pass\","
+                + "\"application.ssl.enableAaiCertAuth\": true,"
+                + "\"application.ssl.enableDmaapCertAuth\": true,"
                 + "\"streams_subscribes\": {"
                 + "\"config_key_1\": {"
                 + "\"type\": \"message_router\","
@@ -203,6 +209,12 @@ class ConsulConfigurationGatewayTest {
                 .cpeAuthConfigKey("config_key_1")
                 .closeLoopConfigKey("config_key_3")
                 .loggingLevel("TRACE")
+                .keyStorePath("/opt/app/bbs-event-processor/etc/cert/key.p12")
+                .keyStorePasswordPath("/opt/app/bbs-event-processor/etc/cert/key.pass")
+                .trustStorePath("/opt/app/bbs-event-processor/etc/cert/trust.jks")
+                .trustStorePasswordPath("/opt/app/bbs-event-processor/etc/cert/trust.pass")
+                .enableAaiCertAuth(true)
+                .enableDmaapCertAuth(true)
                 .streamSubscribesMap(subscribes)
                 .streamPublishesMap(Collections.singletonMap("config_key_3", streamsObject3))
                 .build();
index 76d9659..c4bef9d 100644 (file)
@@ -64,15 +64,13 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
 import org.onap.bbs.event.processor.tasks.AaiClientTask;
 import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
 import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
 import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
-// We can safely suppress unchecked assignment warnings for the ResponseEntity mock
-@SuppressWarnings("unchecked")
 @DisplayName("CPE Authentication Pipeline Unit-Tests")
 class CpeAuthenticationPipelineTest {
 
@@ -82,12 +80,12 @@ class CpeAuthenticationPipelineTest {
     private DmaapPublisherTask publisherTask;
     private AaiClientTask aaiClientTask;
 
-    private ResponseEntity<String> responseEntity;
+    private HttpResponse httpResponse;
 
     @BeforeEach
     void setup() {
 
-        responseEntity = Mockito.mock(ResponseEntity.class);
+        httpResponse = Mockito.mock(HttpResponse.class);
 
         configuration = Mockito.mock(ApplicationConfiguration.class);
         consumerTask = Mockito.mock(DmaapCpeAuthenticationConsumerTask.class);
@@ -268,13 +266,13 @@ class CpeAuthenticationPipelineTest {
                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
-        when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
-        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+        when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
         // Execute the pipeline
         StepVerifier.create(pipeline.executePipeline())
                 .expectSubscription()
-                .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+                .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
                 .verifyComplete();
 
         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -341,14 +339,14 @@ class CpeAuthenticationPipelineTest {
                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
 
-        when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
-        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+        when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
         // Execute the pipeline
         StepVerifier.create(pipeline.executePipeline())
                 .expectSubscription()
-                .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
-                .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+                .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+                .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
                 .verifyComplete();
 
         verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -452,13 +450,13 @@ class CpeAuthenticationPipelineTest {
                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
 
-        when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
-        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+        when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
         // Execute the pipeline
         StepVerifier.create(pipeline.executePipeline())
                 .expectSubscription()
-                .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+                .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
                 .verifyComplete();
 
         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -512,13 +510,13 @@ class CpeAuthenticationPipelineTest {
                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
-        when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
-        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+        when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
         // Execute the pipeline
         StepVerifier.create(pipeline.executePipeline())
                 .expectSubscription()
-                .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+                .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
                 .verifyComplete();
 
         verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
@@ -574,13 +572,13 @@ class CpeAuthenticationPipelineTest {
                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
-        when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
-        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+        when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
         // Execute the pipeline
         StepVerifier.create(pipeline.executePipeline())
                 .expectSubscription()
-                .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+                .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
                 .verifyComplete();
 
         verify(aaiClientTask, times(2))
index a1b6b14..9453db3 100644 (file)
@@ -64,15 +64,13 @@ import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject;
 import org.onap.bbs.event.processor.tasks.AaiClientTask;
 import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
 import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
 import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
-// We can safely suppress unchecked assignment warnings for the ResponseEntity mock
-@SuppressWarnings("unchecked")
 @DisplayName("PNF Re-registration Pipeline Unit-Tests")
 class ReRegistrationPipelineTest {
 
@@ -82,12 +80,12 @@ class ReRegistrationPipelineTest {
     private DmaapPublisherTask publisherTask;
     private AaiClientTask aaiClientTask;
 
-    private ResponseEntity<String> responseEntity;
+    private HttpResponse httpResponse;
 
     @BeforeEach
     void setup() {
 
-        responseEntity = Mockito.mock(ResponseEntity.class);
+        httpResponse = Mockito.mock(HttpResponse.class);
 
         configuration = Mockito.mock(ApplicationConfiguration.class);
         consumerTask = Mockito.mock(DmaapReRegistrationConsumerTask.class);
@@ -262,8 +260,8 @@ class ReRegistrationPipelineTest {
                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
-        when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
-        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+        when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
         // Execute the pipeline
         StepVerifier.create(pipeline.executePipeline())
@@ -312,13 +310,13 @@ class ReRegistrationPipelineTest {
                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
-        when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
-        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+        when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
         // Execute the pipeline
         StepVerifier.create(pipeline.executePipeline())
                 .expectSubscription()
-                .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+                .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
                 .verifyComplete();
 
         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -384,14 +382,14 @@ class ReRegistrationPipelineTest {
                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
 
-        when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
-        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+        when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
         // Execute the pipeline
         StepVerifier.create(pipeline.executePipeline())
                 .expectSubscription()
-                .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
-                .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+                .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+                .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
                 .verifyComplete();
 
         verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -491,13 +489,13 @@ class ReRegistrationPipelineTest {
                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
                 .thenReturn(Mono.just(hsiCfsServiceInstance2));
 
-        when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
-        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+        when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
         // Execute the pipeline
         StepVerifier.create(pipeline.executePipeline())
                 .expectSubscription()
-                .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+                .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
                 .verifyComplete();
 
         verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
@@ -551,13 +549,13 @@ class ReRegistrationPipelineTest {
                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
-        when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
-        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+        when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
         // Execute the pipeline
         StepVerifier.create(pipeline.executePipeline())
                 .expectSubscription()
-                .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+                .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
                 .verifyComplete();
 
         verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
@@ -613,13 +611,13 @@ class ReRegistrationPipelineTest {
                 .executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
                 .thenReturn(Mono.just(hsiCfsServiceInstance));
 
-        when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
-        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+        when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+        when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
 
         // Execute the pipeline
         StepVerifier.create(pipeline.executePipeline())
                 .expectSubscription()
-                .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+                .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
                 .verifyComplete();
 
         verify(aaiClientTask, times(2))
index 538ff1d..40bcb65 100644 (file)
@@ -26,6 +26,11 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+
+import java.util.Optional;
+
 import javax.net.ssl.SSLException;
 
 import org.junit.Assert;
@@ -62,6 +67,7 @@ class DmaapCpeAuthenticationConsumerTaskImplTest {
     private static CpeAuthenticationConsumerDmaapModel cpeAuthenticationConsumerDmaapModel;
     private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient;
     private static String eventsArray;
+    private static Gson gson = new Gson();
 
     @BeforeAll
     static void setUp() throws SSLException {
@@ -108,22 +114,25 @@ class DmaapCpeAuthenticationConsumerTaskImplTest {
 
     @Test
     void passingEmptyMessage_NothingHappens() throws Exception {
-        when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(""));
+        JsonElement empty = gson.toJsonTree("");
+        when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())).thenReturn(Mono.just(empty));
 
         StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
                 .expectSubscription()
                 .expectError(EmptyDmaapResponseException.class);
-        verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse();
+        verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
     }
 
     @Test
     void passingNormalMessage_ResponseSucceeds() throws Exception {
-        when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(eventsArray));
+        JsonElement normalEventsArray = gson.toJsonTree(eventsArray);
+        when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty()))
+                .thenReturn(Mono.just(normalEventsArray));
 
         StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
                 .expectSubscription()
                 .consumeNextWith(e -> Assert.assertEquals(e, cpeAuthenticationConsumerDmaapModel));
-        verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse();
+        verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
     }
 
     private static DmaapConsumerConfiguration testVersionOfDmaapConsumerConfiguration() {
index 199a43e..436206d 100644 (file)
@@ -30,6 +30,9 @@ import static org.mockito.Mockito.when;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Optional;
+
+import javax.net.ssl.SSLException;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -39,12 +42,12 @@ import org.onap.bbs.event.processor.config.ApplicationConfiguration;
 import org.onap.bbs.event.processor.exceptions.DmaapException;
 import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
 import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
 import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
 
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
@@ -112,46 +115,46 @@ class DmaapPublisherTaskImplTest {
     }
 
     @Test
-    void passingNormalMessage_ReactiveClientProcessesIt() throws DmaapException {
-        ResponseEntity<String> responseEntity = setupMocks(HttpStatus.OK.value());
-        when(responseEntity.getStatusCode()).thenReturn(HttpStatus.OK);
+    void passingNormalMessage_ReactiveClientProcessesIt() throws DmaapException, SSLException {
+        HttpResponse response = setupMocks(HttpStatus.OK.value());
+
         StepVerifier.create(task.execute(controlLoopPublisherDmaapModel)).expectSubscription()
-                .expectNext(responseEntity).verifyComplete();
+                .expectNext(response).verifyComplete();
 
         verify(reactiveHttpClient, times(1))
-                .getDMaaPProducerResponse(controlLoopPublisherDmaapModel);
+                .getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
         verifyNoMoreInteractions(reactiveHttpClient);
     }
 
     @Test
-    void passingNormalMessage_IncorrectResponseIsHandled() throws DmaapException {
-        ResponseEntity<String> responseEntity = setupMocks(HttpStatus.UNAUTHORIZED.value());
-        when(responseEntity.getStatusCode()).thenReturn(HttpStatus.UNAUTHORIZED);
+    void passingNormalMessage_IncorrectResponseIsHandled() throws DmaapException, SSLException {
+        HttpResponse response = setupMocks(HttpStatus.UNAUTHORIZED.value());
+
         StepVerifier.create(task.execute(controlLoopPublisherDmaapModel)).expectSubscription()
-                .expectNext(responseEntity).verifyComplete();
+                .expectNext(response).verifyComplete();
 
         verify(reactiveHttpClient, times(1))
-                .getDMaaPProducerResponse(controlLoopPublisherDmaapModel);
+                .getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
         verifyNoMoreInteractions(reactiveHttpClient);
     }
 
     // We can safely suppress unchecked assignment warning here since it is a mock class
     @SuppressWarnings("unchecked")
-    private ResponseEntity<String> setupMocks(Integer httpResponseCode) {
+    private HttpResponse setupMocks(Integer httpResponseCode) throws SSLException {
 
-        ResponseEntity<String> responseEntity = mock(ResponseEntity.class);
-        when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(httpResponseCode));
+        HttpResponse response = mock(HttpResponse.class);
+        when(response.statusCode()).thenReturn(httpResponseCode);
 
         reactiveHttpClient = mock(DMaaPPublisherReactiveHttpClient.class);
-        when(reactiveHttpClient.getDMaaPProducerResponse(any()))
-                .thenReturn(Mono.just(responseEntity));
+        when(reactiveHttpClient.getDMaaPProducerResponse(any(), any(Optional.class)))
+                .thenReturn(Mono.just(response));
 
         PublisherReactiveHttpClientFactory httpClientFactory = mock(PublisherReactiveHttpClientFactory.class);
         doReturn(reactiveHttpClient).when(httpClientFactory).create(dmaapPublisherConfiguration);
 
         task = new DmaapPublisherTaskImpl(configuration, httpClientFactory);
 
-        return responseEntity;
+        return response;
     }
 
     private static DmaapPublisherConfiguration testVersionOfDmaapPublisherConfiguration() {
index c9a461d..72e2898 100644 (file)
@@ -26,6 +26,11 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+
+import java.util.Optional;
+
 import javax.net.ssl.SSLException;
 
 import org.junit.Assert;
@@ -59,7 +64,8 @@ class DmaapReRegistrationConsumerTaskImplTest {
     private static DmaapReRegistrationConsumerTaskImpl dmaapConsumerTask;
     private static ReRegistrationConsumerDmaapModel reRegistrationConsumerDmaapModel;
     private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient;
-    private static String message;
+    private static String eventsArray;
+    private static Gson gson = new Gson();
 
     @BeforeAll
     static void setUp() throws SSLException {
@@ -91,12 +97,10 @@ class DmaapReRegistrationConsumerTaskImplTest {
                 .sVlan(svlan)
                 .build();
 
-        message = String.format("[" + RE_REGISTRATION_EVENT_TEMPLATE + "]",
-                sourceName,
-                attachmentPoint,
-                remoteId,
-                cvlan,
-                svlan);
+        String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE, sourceName, attachmentPoint, remoteId,
+                cvlan, svlan);
+
+        eventsArray = "[" + event + "]";
     }
 
     @AfterEach
@@ -105,23 +109,26 @@ class DmaapReRegistrationConsumerTaskImplTest {
     }
 
     @Test
-    void passingEmptyMessage_NothingHappens() throws Exception {
-        when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(""));
+    void passingEmptyMessage_NothingHappens() {
+        JsonElement empty = gson.toJsonTree("");
+        when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())).thenReturn(Mono.just(empty));
 
         StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
                 .expectSubscription()
                 .expectError(EmptyDmaapResponseException.class);
-        verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse();
+        verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
     }
 
     @Test
-    void passingNormalMessage_ResponseSucceeds() throws Exception {
-        when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(message));
+    void passingNormalMessage_ResponseSucceeds() {
+        JsonElement normalEventsArray = gson.toJsonTree(eventsArray);
+        when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty()))
+                .thenReturn(Mono.just(normalEventsArray));
 
         StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
                 .expectSubscription()
                 .consumeNextWith(e -> Assert.assertEquals(e, reRegistrationConsumerDmaapModel));
-        verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse();
+        verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
     }
 
     private static DmaapConsumerConfiguration testVersionOfDmaapConsumerConfiguration() {
index 4ca61f5..c7ad793 100644 (file)
@@ -24,7 +24,9 @@ import static org.mockito.Mockito.spy;
 
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
+import com.google.gson.stream.JsonReader;
 
+import java.io.StringReader;
 import java.util.Optional;
 
 import org.junit.jupiter.api.BeforeAll;
@@ -100,14 +102,17 @@ class CpeAuthenticationDmaapConsumerJsonParserTest {
     }
 
     @Test
-    void passingNonJson_EmptyFluxIsReturned() {
+    void passingNonJson_getIllegalStateException() {
 
         CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
                 new CpeAuthenticationDmaapConsumerJsonParser();
+        JsonReader jsonReader = new JsonReader(new StringReader("not JSON"));
+        jsonReader.setLenient(true);
+        JsonElement notJson = jsonParser.parse(jsonReader).getAsJsonPrimitive();
 
-        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("not JSON")))
+        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(notJson)))
                 .expectSubscription()
-                .verifyComplete();
+                .verifyError(IllegalStateException.class);
     }
 
     @Test
@@ -116,7 +121,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest {
         CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
                 new CpeAuthenticationDmaapConsumerJsonParser();
 
-        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("[]")))
+        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse("[]"))))
                 .expectSubscription()
                 .verifyComplete();
     }
@@ -151,7 +156,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest {
                 .swVersion(swVersion)
                 .build();
 
-        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
                 .expectSubscription()
                 .expectNext(expectedEventObject);
     }
@@ -182,7 +187,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest {
         Mockito.doReturn(Optional.of(jsonElement2.getAsJsonObject()))
                 .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement2);
 
-        String eventsArray = "[" + firstEvent + secondEvent + "]";
+        String eventsArray = "[" + firstEvent + "," + secondEvent + "]";
 
         CpeAuthenticationConsumerDmaapModel expectedFirstEventObject =
                 ImmutableCpeAuthenticationConsumerDmaapModel.builder()
@@ -203,7 +208,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest {
                 .swVersion(swVersion)
                 .build();
 
-        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
                 .expectSubscription()
                 .expectNext(expectedFirstEventObject)
                 .expectNext(expectedSecondEventObject);
@@ -229,7 +234,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest {
 
         String eventsArray = "[" + event + "]";
 
-        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
                 .expectSubscription()
                 .verifyComplete();
     }
@@ -254,7 +259,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest {
 
         String eventsArray = "[" + event + "]";
 
-        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
                 .expectSubscription()
                 .verifyComplete();
     }
@@ -279,7 +284,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest {
 
         String eventsArray = "[" + event + "]";
 
-        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
                 .expectSubscription()
                 .verifyComplete();
     }
@@ -304,7 +309,7 @@ class CpeAuthenticationDmaapConsumerJsonParserTest {
 
         String eventsArray = "[" + event + "]";
 
-        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
                 .expectSubscription()
                 .verifyComplete();
     }
index ca448dd..cd238e2 100644 (file)
@@ -24,7 +24,9 @@ import static org.mockito.Mockito.spy;
 
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
+import com.google.gson.stream.JsonReader;
 
+import java.io.StringReader;
 import java.util.Optional;
 
 import org.junit.jupiter.api.BeforeAll;
@@ -89,21 +91,22 @@ class ReRegistrationDmaapConsumerJsonParserTest {
     }
 
     @Test
-    void passingNonJson_EmptyFluxIsReturned() {
+    void passingNonJson_getIllegalStateException() {
 
         ReRegistrationDmaapConsumerJsonParser consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser();
-
-        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("not JSON")))
+        JsonReader jsonReader = new JsonReader(new StringReader("not JSON"));
+        jsonReader.setLenient(true);
+        JsonElement notJson = jsonParser.parse(jsonReader).getAsJsonPrimitive();
+        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(notJson)))
                 .expectSubscription()
-                .verifyComplete();
+                .verifyError(IllegalStateException.class);
     }
 
     @Test
     void passingNoEvents_EmptyFluxIsReturned() {
 
         ReRegistrationDmaapConsumerJsonParser consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser();
-
-        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("[]")))
+        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse("[]"))))
                 .expectSubscription()
                 .verifyComplete();
     }
@@ -135,7 +138,7 @@ class ReRegistrationDmaapConsumerJsonParserTest {
                 .sVlan(svlan)
                 .build();
 
-        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
                 .expectSubscription()
                 .expectNext(expectedEventObject);
     }
@@ -165,7 +168,7 @@ class ReRegistrationDmaapConsumerJsonParserTest {
         Mockito.doReturn(Optional.of(jsonElement2.getAsJsonObject()))
                 .when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement2);
 
-        String eventsArray = "[" + firstEvent + secondEvent + "]";
+        String eventsArray = "[" + firstEvent + "," + secondEvent + "]";
 
         ReRegistrationConsumerDmaapModel expectedFirstEventObject = ImmutableReRegistrationConsumerDmaapModel.builder()
                 .correlationId(correlationId1)
@@ -182,7 +185,7 @@ class ReRegistrationDmaapConsumerJsonParserTest {
                 .sVlan(svlan)
                 .build();
 
-        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
                 .expectSubscription()
                 .expectNext(expectedFirstEventObject)
                 .expectNext(expectedSecondEventObject);
@@ -209,7 +212,7 @@ class ReRegistrationDmaapConsumerJsonParserTest {
 
         String eventsArray = "[" + event + "]";
 
-        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
                 .expectSubscription()
                 .verifyComplete();
     }
@@ -235,7 +238,7 @@ class ReRegistrationDmaapConsumerJsonParserTest {
 
         String eventsArray = "[" + event + "]";
 
-        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
                 .expectSubscription()
                 .verifyComplete();
     }
@@ -261,7 +264,7 @@ class ReRegistrationDmaapConsumerJsonParserTest {
 
         String eventsArray = "[" + event + "]";
 
-        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
                 .expectSubscription()
                 .verifyComplete();
     }
@@ -289,7 +292,7 @@ class ReRegistrationDmaapConsumerJsonParserTest {
 
         String eventsArray = "[" + event + "]";
 
-        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+        StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
                 .expectSubscription()
                 .verifyComplete();
     }