Release DMaaP client API 39/87239/4
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Wed, 8 May 2019 12:52:24 +0000 (14:52 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 9 May 2019 11:54:07 +0000 (13:54 +0200)
* remove @ExperimentalApi annotation
* deprecate old API
* extract http-client module + refactor
* change DmaapClientFactory so it's more configurable

Change-Id: I710d20558eece8cc3d7c0740e765d34737134b3a
Issue-ID: DCAEGEN2-1492
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
52 files changed:
rest-services/aai-client/pom.xml
rest-services/cbs-client/pom.xml
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/CbsRequest.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/providers/ReactiveCloudConfigurationProvider.java
rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/model/logging/RequestDiagnosticContext.java
rest-services/dmaap-client/pom.xml
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisher.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriber.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/config/DmaapConsumerConfiguration.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/config/DmaapCustomConfig.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/config/DmaapPublisherConfiguration.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterPublisherImpl.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/impl/MessageRouterSubscriberImpl.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapResponse.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishResponse.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterSubscribeRequest.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterSubscribeResponse.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapClientConfiguration.java [new file with mode: 0644]
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/MessageRouterPublisherConfig.java [new file with mode: 0644]
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/MessageRouterSubscriberConfig.java [new file with mode: 0644]
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/DMaaPAbstractReactiveHttpClient.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/DMaaPClientServiceUtils.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/ConsumerReactiveHttpClientFactory.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClient.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPReactiveWebClientFactory.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/producer/DMaaPPublisherReactiveHttpClient.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/producer/DmaaPRestTemplateFactory.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/producer/PublisherReactiveHttpClientFactory.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/utlis/SecurityKeysUtil.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java [new file with mode: 0644]
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java [new file with mode: 0644]
rest-services/dmaap-client/src/test/resources/logback-test.xml
rest-services/dmaap-client/src/test/resources/sample-mr-subscribe-response.json [new file with mode: 0644]
rest-services/http-client/pom.xml [new file with mode: 0644]
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/CloudHttpClient.java [moved from rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/CloudHttpClient.java with 98% similarity]
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpHeaders.java [moved from rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpHeaders.java with 100% similarity]
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpMethod.java [moved from rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpMethod.java with 100% similarity]
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java [moved from rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java with 100% similarity]
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpResponse.java [moved from rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpResponse.java with 100% similarity]
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/NettyHttpResponse.java [moved from rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/NettyHttpResponse.java with 100% similarity]
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java [moved from rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java with 96% similarity]
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java [moved from rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java with 91% similarity]
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java [new file with mode: 0644]
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/HttpException.java [moved from rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/HttpException.java with 100% similarity]
rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java [moved from rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java with 83% similarity]
rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/CloudHttpClientIT.java [moved from rest-services/common-dependency/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/CloudHttpClientIT.java with 95% similarity]
rest-services/http-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java [moved from rest-services/common-dependency/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java with 98% similarity]
rest-services/pom.xml

index b67c647..7c63f4b 100644 (file)
@@ -20,7 +20,7 @@
   <dependencies>
     <dependency>
       <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
-      <artifactId>common-dependency</artifactId>
+      <artifactId>http-client</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
index ca2cf88..7d044dc 100644 (file)
@@ -20,7 +20,7 @@
     <dependencies>
         <dependency>
             <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
-            <artifactId>common-dependency</artifactId>
+            <artifactId>http-client</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
index c11ed53..053c60c 100644 (file)
@@ -21,6 +21,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api;
 
 import org.jetbrains.annotations.NotNull;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClientFactory;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsClientImpl;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsLookup;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
@@ -53,7 +54,7 @@ public class CbsClientFactory {
      */
     public static @NotNull Mono<CbsClient> createCbsClient(EnvProperties env) {
         return Mono.defer(() -> {
-            final RxHttpClient httpClient = RxHttpClient.create();
+            final RxHttpClient httpClient = RxHttpClientFactory.create();
             final CbsLookup lookup = new CbsLookup(httpClient);
             return lookup.lookup(env)
                     .map(addr -> new CbsClientImpl(httpClient, env.appName(), addr));
index 0a31966..a32cb3b 100644 (file)
@@ -49,9 +49,6 @@ public interface CbsRequest {
      * Return a view on this CbsRequest with updated InvocationID.
      */
     default CbsRequest withNewInvocationId() {
-        final RequestDiagnosticContext newDiagnosticCtx = ImmutableRequestDiagnosticContext
-                .copyOf(diagnosticContext())
-                .withInvocationId(UUID.randomUUID());
-        return ImmutableCbsRequest.copyOf(this).withDiagnosticContext(newDiagnosticCtx);
+        return ImmutableCbsRequest.copyOf(this).withDiagnosticContext(diagnosticContext().withNewInvocationId());
     }
 }
index 27d3658..dbc9480 100644 (file)
@@ -27,6 +27,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClientFactory;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.uri.URI;
 import org.slf4j.Logger;
@@ -44,7 +45,7 @@ public final class ReactiveCloudConfigurationProvider implements CloudConfigurat
     private final RxHttpClient rxHttpClient;
 
     public ReactiveCloudConfigurationProvider() {
-        this(RxHttpClient.create());
+        this(RxHttpClientFactory.create());
     }
 
     ReactiveCloudConfigurationProvider(RxHttpClient rxHttpClient) {
index 9726906..0c4a4b1 100644 (file)
@@ -24,6 +24,7 @@ import io.vavr.collection.HashMap;
 import io.vavr.collection.Map;
 import java.util.UUID;
 import org.immutables.value.Value;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.MDC;
 
@@ -91,6 +92,11 @@ public interface RequestDiagnosticContext {
         }
     }
 
+    default @NotNull RequestDiagnosticContext withNewInvocationId() {
+        return ImmutableRequestDiagnosticContext.copyOf(this)
+                .withInvocationId(UUID.randomUUID());
+    }
+
     static ImmutableRequestDiagnosticContext create() {
         return ImmutableRequestDiagnosticContext.builder()
                 .requestId(UUID.randomUUID())
index 277cfda..799a8a4 100644 (file)
@@ -20,7 +20,7 @@
   <dependencies>
     <dependency>
       <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
-      <artifactId>common-dependency</artifactId>
+      <artifactId>http-client</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>ch.qos.logback</groupId>
+      <artifactId>logback-classic</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.junit.jupiter</groupId>
index 0ac2d0b..3c27da1 100644 (file)
  * limitations under the License.
  * ============LICENSE_END=====================================
  */
-
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
 
-import com.google.gson.Gson;
-import io.netty.handler.ssl.SslContext;
-import io.vavr.Lazy;
-import java.time.Duration;
 import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClientFactory;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterPublisherImpl;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.MessageRouterSubscriberImpl;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapClientConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
 
 /**
- * <b>WARNING</b>: This is a proof-of-concept. It is untested. API may change or be removed.  Use at your own risk.
- * You've been warned.
  *
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-@ExperimentalApi
 public final class DmaapClientFactory {
 
-    private static final Duration DEFAULT_MAX_BATCH_DURATION = Duration.ofSeconds(1);
-    private static final int DEFAULT_MAX_BATCH_SIZE = 512;
-
     private DmaapClientFactory() {
     }
 
-    public static @NotNull MessageRouterPublisher createMessageRouterPublisher() {
-        return new MessageRouterPublisherImpl(
-                RxHttpClient.create(),
-                DEFAULT_MAX_BATCH_SIZE,
-                DEFAULT_MAX_BATCH_DURATION);
-    }
+    public static @NotNull MessageRouterPublisher createMessageRouterPublisher(
+            @NotNull MessageRouterPublisherConfig clientConfiguration) {
 
-    public static @NotNull MessageRouterPublisher createMessageRouterPublisher(@NotNull SslContext sslContext) {
         return new MessageRouterPublisherImpl(
-                RxHttpClient.create(sslContext),
-                DEFAULT_MAX_BATCH_SIZE,
-                DEFAULT_MAX_BATCH_DURATION);
+                createHttpClient(clientConfiguration),
+                clientConfiguration.maxBatchSize(),
+                clientConfiguration.maxBatchDuration());
     }
 
-    public static @NotNull MessageRouterSubscriber createMessageRouterSubscriber() {
-        return new MessageRouterSubscriberImpl(RxHttpClient.create(), new Gson());
+    public static @NotNull MessageRouterSubscriber createMessageRouterSubscriber(
+            @NotNull MessageRouterSubscriberConfig clientConfiguration) {
+        return new MessageRouterSubscriberImpl(
+                createHttpClient(clientConfiguration),
+                clientConfiguration.gsonInstance());
     }
 
-    public static @NotNull MessageRouterSubscriber createMessageRouterSubscriber(@NotNull SslContext sslContext) {
-        return new MessageRouterSubscriberImpl(
-                RxHttpClient.create(sslContext),
-                new Gson());
+    private static @NotNull RxHttpClient createHttpClient(DmaapClientConfiguration config) {
+        return config.securityKeys() == null
+                ? RxHttpClientFactory.create()
+                : RxHttpClientFactory.create(config.securityKeys());
     }
 }
index c205f47..e37bdcd 100644 (file)
@@ -30,7 +30,6 @@ import reactor.core.publisher.Flux;
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-@ExperimentalApi
 public interface MessageRouterPublisher {
     Flux<MessageRouterPublishResponse> put(MessageRouterPublishRequest request, Flux<? extends JsonElement> items);
 }
index 75816ea..91e026a 100644 (file)
@@ -25,10 +25,12 @@ import org.immutables.value.Value;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">PrzemysÅ‚aw WÄ…sala</a> on 3/23/18
+ * @deprecated Use new API {@link org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory}
  */
 @Value.Immutable(prehash = true)
 @Value.Style(builder = "new")
 @Gson.TypeAdapters
+@Deprecated
 public abstract class DmaapConsumerConfiguration implements DmaapCustomConfig {
 
     private static final long serialVersionUID = 1L;
index 6cfb358..29c3a5e 100644 (file)
@@ -25,7 +25,9 @@ import org.immutables.value.Value;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">PrzemysÅ‚aw WÄ…sala</a> on 3/28/18
+ * @deprecated Use new API {@link org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory}
  */
+@Deprecated
 public interface DmaapCustomConfig extends Serializable {
 
     @Deprecated
index 3866f9b..df81370 100644 (file)
@@ -25,10 +25,12 @@ import org.immutables.value.Value;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">PrzemysÅ‚aw WÄ…sala</a> on 3/23/18
+ * @deprecated Use new API {@link org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory}
  */
 @Value.Immutable(prehash = true)
 @Value.Style(builder = "new")
 @Gson.TypeAdapters
+@Deprecated
 public abstract class DmaapPublisherConfiguration implements DmaapCustomConfig {
 
     private static final long serialVersionUID = 1L;
index f09c539..aa88b9e 100644 (file)
@@ -40,6 +40,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.Immutable
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -47,11 +49,11 @@ import reactor.core.publisher.Mono;
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since March 2019
  */
-// TODO: This is a PoC. It's untested.
 public class MessageRouterPublisherImpl implements MessageRouterPublisher {
     private final RxHttpClient httpClient;
     private final int maxBatchSize;
     private final Duration maxBatchDuration;
+    private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterPublisherImpl.class);
 
     public MessageRouterPublisherImpl(RxHttpClient httpClient, int maxBatchSize, Duration maxBatchDuration) {
         this.httpClient = httpClient;
@@ -70,6 +72,8 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
     private Publisher<? extends MessageRouterPublishResponse> pushBatchToMr(
             MessageRouterPublishRequest request,
             List<JsonElement> batch) {
+        LOGGER.debug("Sending a batch of {} items to DMaaP MR", batch.size());
+        LOGGER.trace("The items to be sent: {}", batch);
         return httpClient.call(buildHttpRequest(request, asJsonBody(batch)))
                 .map(httpResponse -> buildResponse(httpResponse, batch));
     }
@@ -84,7 +88,7 @@ public class MessageRouterPublisherImpl implements MessageRouterPublisher {
         return ImmutableHttpRequest.builder()
                 .method(HttpMethod.POST)
                 .url(request.sinkDefinition().topicUrl())
-                .diagnosticContext(request.diagnosticContext())
+                .diagnosticContext(request.diagnosticContext().withNewInvocationId())
                 .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType()))
                 .body(body)
                 .build();
index e91a77f..2f49ddf 100644 (file)
@@ -35,16 +35,18 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRout
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Mono;
 
 /**
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since March 2019
  */
-// TODO: This is a PoC. It's untested.
 public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
     private final RxHttpClient httpClient;
     private final Gson gson;
+    private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterSubscriberImpl.class);
 
     public MessageRouterSubscriberImpl(RxHttpClient httpClient, Gson gson) {
         this.httpClient = httpClient;
@@ -53,6 +55,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
 
     @Override
     public Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request) {
+        LOGGER.debug("Requesting new items from DMaaP MR: {}", request);
         return httpClient.call(buildGetHttpRequest(request)).map(this::buildGetResponse);
     }
 
@@ -70,7 +73,7 @@ public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
         return ImmutableHttpRequest.builder()
                 .method(HttpMethod.GET)
                 .url(buildSubscribeUrl(request))
-                .diagnosticContext(request.diagnosticContext())
+                .diagnosticContext(request.diagnosticContext().withNewInvocationId())
                 .build();
     }
 
index 2bed4c9..6b3e2c4 100644 (file)
@@ -28,7 +28,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnos
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-@ExperimentalApi
 public interface DmaapRequest {
     @Value.Default
     default RequestDiagnosticContext diagnosticContext() {
index 8b4d41e..c3e3726 100644 (file)
@@ -28,7 +28,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-@ExperimentalApi
 public interface DmaapResponse {
 
     @Nullable String failReason();
index f32fd0e..2328487 100644 (file)
@@ -29,7 +29,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi;
  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
  * @since 1.1.4
  */
-@ExperimentalApi
 @Value.Immutable
 public interface MessageRouterSubscribeRequest extends DmaapRequest {
 
@@ -39,6 +38,7 @@ public interface MessageRouterSubscribeRequest extends DmaapRequest {
 
     @Nullable Duration timeout();
 
+    @Value.Default
     default String consumerId() {
         return Constants.CLASS_LOADER_SCOPED_UNIQUE_ID;
     }
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapClientConfiguration.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/DmaapClientConfiguration.java
new file mode 100644 (file)
index 0000000..ac677f0
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config;
+
+import org.immutables.value.Value;
+import org.jetbrains.annotations.Nullable;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.2.0
+ */
+public interface DmaapClientConfiguration {
+    @Value.Default
+    default @Nullable SecurityKeys securityKeys() {
+        return null;
+    }
+
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/MessageRouterPublisherConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/MessageRouterPublisherConfig.java
new file mode 100644 (file)
index 0000000..dc75377
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config;
+
+import java.time.Duration;
+import org.immutables.value.Value;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.2.0
+ */
+@Value.Immutable
+public interface MessageRouterPublisherConfig extends DmaapClientConfiguration {
+
+    @Value.Default
+    default Duration maxBatchDuration() {
+        return Duration.ofSeconds(1);
+    }
+
+    @Value.Default
+    default int maxBatchSize() {
+        return 512;
+    }
+
+    static MessageRouterPublisherConfig createDefault() {
+        return ImmutableMessageRouterPublisherConfig.builder().build();
+    }
+}
diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/MessageRouterSubscriberConfig.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/config/MessageRouterSubscriberConfig.java
new file mode 100644 (file)
index 0000000..84d9969
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config;
+
+import com.google.gson.Gson;
+import org.immutables.value.Value;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since 1.2.0
+ */
+@Value.Immutable
+public interface MessageRouterSubscriberConfig extends DmaapClientConfiguration {
+    @Value.Default
+    default Gson gsonInstance() {
+        return new Gson();
+    }
+
+    static MessageRouterSubscriberConfig createDefault() {
+        return ImmutableMessageRouterSubscriberConfig.builder().build();
+    }
+}
index 55fa7fb..aee961e 100644 (file)
@@ -24,6 +24,11 @@ import java.util.UUID;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 
+/**
+ *
+ * @deprecated Use new API {@link org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory}
+ */
+@Deprecated
 public abstract class DMaaPAbstractReactiveHttpClient {
 
     protected final static String SLASH = "/";
index 3876b52..3b4f55a 100644 (file)
@@ -25,7 +25,9 @@ import java.util.Map;
 
 /**
  * @author <a href="mailto:marcin.wmigdal@nokia.com">Marcin Migdal</a> on 3/8/2019
+ * @deprecated Use new API {@link org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory}
  */
+@Deprecated
 public final class DMaaPClientServiceUtils {
 
     public final static String CONTENT_TYPE = "Content-Type";
index e92ad3f..5e1a064 100644 (file)
 
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer;
 
-import javax.net.ssl.SSLException;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">PrzemysÅ‚aw WÄ…sala</a> on 6/26/18
+ * @deprecated Use new API {@link org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory}
  */
+@Deprecated
 public class ConsumerReactiveHttpClientFactory {
 
     private final DMaaPReactiveWebClientFactory reactiveWebClientFactory;
index 81a62eb..83678d2 100644 (file)
@@ -37,7 +37,9 @@ import reactor.core.publisher.Mono;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">PrzemysÅ‚aw WÄ…sala</a> on 6/26/18
+ * @deprecated Use new API {@link org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory}
  */
+@Deprecated
 public class DMaaPConsumerReactiveHttpClient extends DMaaPAbstractReactiveHttpClient {
 
     private final DmaapConsumerConfiguration consumerConfiguration;
index 3d3c54a..65f0b60 100644 (file)
@@ -29,7 +29,9 @@ import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">PrzemysÅ‚aw WÄ…sala</a> on 7/4/18
+ * @deprecated Use new API {@link org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory}
  */
+@Deprecated
 public class DMaaPReactiveWebClientFactory {
 
     private final SslFactory sslFactory;
index 0d453e4..7173624 100644 (file)
@@ -39,7 +39,9 @@ import java.util.Optional;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">PrzemysÅ‚aw WÄ…sala</a> on 7/4/18
+ * @deprecated Use new API {@link org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory}
  */
+@Deprecated
 public class DMaaPPublisherReactiveHttpClient extends DMaaPAbstractReactiveHttpClient {
 
     private final DmaapPublisherConfiguration dmaapPublisherConfiguration;
index 2d71760..3007980 100644 (file)
@@ -27,6 +27,11 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.utlis.SecurityK
 import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
 import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory;
 
+/**
+ *
+ * @deprecated Use new API {@link org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory}
+ */
+@Deprecated
 public class DmaaPRestTemplateFactory {
 
     private SslFactory sslFactory;
index 953a331..9e6cce2 100644 (file)
@@ -23,6 +23,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.produc
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder;
 
+/**
+ * @deprecated Use new API {@link org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory}
+ */
+@Deprecated
 public class PublisherReactiveHttpClientFactory {
 
     private final DmaaPRestTemplateFactory restTemplateFactory;
index 7ee06e9..c688ab0 100644 (file)
@@ -30,14 +30,17 @@ import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore;
 import org.onap.dcaegen2.services.sdk.security.ssl.Passwords;
 import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
 
+/**
+ * @deprecated Use new API {@link org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory}
+ */
+@Deprecated
 public final class SecurityKeysUtil {
 
     private SecurityKeysUtil(){
 
     }
 
-    @NotNull
-    public static SecurityKeys fromDmappCustomConfig(DmaapCustomConfig configuration){
+    public static @NotNull SecurityKeys fromDmappCustomConfig(DmaapCustomConfig configuration){
         return ImmutableSecurityKeys.builder()
                 .keyStore(ImmutableSecurityKeysStore.of(resource(configuration.keyStorePath()).get()))
                 .keyStorePassword(Passwords.fromResource(configuration.keyStorePasswordPath()))
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherIT.java
new file mode 100644 (file)
index 0000000..8ed3eb3
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonPrimitive;
+import io.vavr.collection.List;
+import java.time.Duration;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since May 2019
+ */
+class MessageRouterPublisherIT {
+    private MessageRouterPublisher sut = DmaapClientFactory.createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+    private static DummyHttpServer server;
+    private static MessageRouterSink sinkDefinition;
+
+    @BeforeAll
+    static void setUp() {
+        server = DummyHttpServer.start(routes ->
+                routes.post("/events/TOPIC", (req, resp) -> sendString(resp, Mono.just("TODO")))
+        );
+        sinkDefinition = ImmutableMessageRouterSink.builder()
+                .name("the topic")
+                .topicUrl(String.format("http://%s:%d/events/TOPIC", server.host(), server.port()))
+                .build();
+    }
+
+    @Test
+    void testStub() {
+        final MessageRouterPublishRequest mrRequest = ImmutableMessageRouterPublishRequest.builder()
+                .sinkDefinition(sinkDefinition)
+                .build();
+
+        final Flux<MessageRouterPublishResponse> result = sut
+                .put(mrRequest, Flux.just("ala", "ma", "kota").map(JsonPrimitive::new));
+
+        final List<JsonElement> expectedItems = List.of("ala", "ma", "kota").map(JsonPrimitive::new);
+        StepVerifier.create(result)
+                .expectNext(ImmutableMessageRouterPublishResponse.builder().items(expectedItems).build())
+                .expectComplete()
+                .verify(Duration.ofSeconds(10));
+    }
+}
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
new file mode 100644 (file)
index 0000000..ab51bfe
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
+
+import com.google.gson.JsonElement;
+import java.time.Duration;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since May 2019
+ */
+class MessageRouterSubscriberIT {
+    private MessageRouterSubscriber sut = DmaapClientFactory.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+    private static DummyHttpServer server;
+    private static MessageRouterSource sourceDefinition;
+
+    @BeforeAll
+    static void setUp() {
+        server = DummyHttpServer.start(routes ->
+                routes.get("/events/TOPIC/group1/consumer8", (req, resp) -> sendResource(resp, "/sample-mr-subscribe-response.json"))
+        );
+        sourceDefinition = ImmutableMessageRouterSource.builder()
+                .name("the topic")
+                .topicUrl(String.format("http://%s:%d/events/TOPIC", server.host(), server.port()))
+                .build();
+    }
+
+    @Test
+    void testStub() {
+        final MessageRouterSubscribeRequest mrRequest = ImmutableMessageRouterSubscribeRequest.builder()
+                .sourceDefinition(sourceDefinition)
+                .consumerGroup("group1")
+                .consumerId("consumer8")
+                .build();
+
+        final Flux<String> result = sut
+                .getElements(mrRequest)
+                .map(JsonElement::getAsString);
+
+        StepVerifier.create(result)
+                .expectNext("I", "like", "pizza")
+                .expectComplete()
+                .verify(Duration.ofSeconds(10));
+    }
+}
index c1f0066..07a1aae 100644 (file)
@@ -1,21 +1,54 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-  ~ ===============================LICENSE_START======================================
-  ~    Copyright Â© 2017 AT&T Intellectual Property. All rights reserved.
+  ~ ============LICENSE_START=======================================================
+  ~ dcaegen2-collectors-veshv
   ~ ================================================================================
-  ~  Licensed under the Apache License, Version 2.0 (the "License");
-  ~  you may not use this file except in compliance with the License.
-  ~   You may obtain a copy of the License at
+  ~ Copyright (C) 2019 NOKIA
+  ~ ================================================================================
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
   ~
-  ~          http://www.apache.org/licenses/LICENSE-2.0
+  ~      http://www.apache.org/licenses/LICENSE-2.0
   ~
-  ~  Unless required by applicable law or agreed to in writing, software
-  ~  distributed under the License is distributed on an "AS IS" BASIS,
-  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~  See the License for the specific language governing permissions and
-  ~  limitations under the License.
-  ~  ============================LICENSE_END===========================================
-  -->
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~ ============LICENSE_END=========================================================
+-->
 <configuration>
-  <root level="OFF"/>
+  <property name="p_log" value="%logger"/>
+  <property name="p_lor" value="%50.50logger"/>
+  <property name="p_tim" value="%date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC}"/>
+  <property name="p_lvl" value="%level"/>
+  <property name="p_msg" value="%replace(%replace(%msg){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+  <property name="p_mdc" value="%replace(%replace(%mdc){'\t', '\\\\t'}){'\n', '\\\\n'}"/>
+  <property name="p_exc" value="%replace(%replace(%rootException){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+  <property name="p_mak" value="%replace(%replace(%marker){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+  <property name="p_thr" value="%thread"/>
+
+  <property name="READABLE_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_lor}\t
+| ${p_lvl}\t
+| %msg\t
+| ${p_mak}\t
+| %rootException\t
+| ${p_mdc}\t
+| ${p_thr}%n"/>
+
+  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>${READABLE_LOG_PATTERN}</pattern>
+    </encoder>
+  </appender>
+
+  <logger name="org.onap.dcaegen2.services.sdk" level="TRACE"/>
+
+  <root level="INFO">
+    <appender-ref ref="CONSOLE"/>
+  </root>
 </configuration>
diff --git a/rest-services/dmaap-client/src/test/resources/sample-mr-subscribe-response.json b/rest-services/dmaap-client/src/test/resources/sample-mr-subscribe-response.json
new file mode 100644 (file)
index 0000000..f3ba41a
--- /dev/null
@@ -0,0 +1,5 @@
+[
+    "I",
+    "like",
+    "pizza"
+]
diff --git a/rest-services/http-client/pom.xml b/rest-services/http-client/pom.xml
new file mode 100644 (file)
index 0000000..1d32105
--- /dev/null
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ ============LICENSE_START====================================
+  ~ DCAEGEN2-SERVICES-SDK
+  ~ =========================================================
+  ~ Copyright (C) 2019 Nokia. All rights reserved.
+  ~ =========================================================
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~ ============LICENSE_END=====================================
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.onap.dcaegen2.services.sdk</groupId>
+        <artifactId>dcaegen2-services-sdk-rest-services</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+
+    <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+    <artifactId>http-client</artifactId>
+
+    <name>dcaegen2-services-sdk-rest-services-http-client</name>
+    <description>HTTP adapter</description>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.onap.dcaegen2.services.sdk.security</groupId>
+            <artifactId>ssl</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+            <artifactId>common-dependency</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor.netty</groupId>
+            <artifactId>reactor-netty</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.vavr</groupId>
+            <artifactId>vavr</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains</groupId>
+            <artifactId>annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
@@ -48,11 +48,11 @@ public class CloudHttpClient {
     }
 
     public CloudHttpClient() {
-        this(RxHttpClient.create());
+        this(RxHttpClientFactory.create());
     }
 
     public CloudHttpClient(SslContext sslContext) {
-        this(RxHttpClient.create(sslContext));
+        this(RxHttpClientFactory.create(sslContext));
     }
 
     public <T> Mono<T> get(String url, Class<T> bodyClass) {
@@ -23,14 +23,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
 import com.google.gson.JsonElement;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.Unpooled;
-import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import org.immutables.value.Value;
 import org.jetbrains.annotations.Nullable;
 import org.reactivestreams.Publisher;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.netty.ByteBufFlux;
 
  */
 package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
 
-import io.netty.handler.ssl.SslContext;
 import io.vavr.collection.Stream;
 import java.util.stream.Collectors;
-import org.jetbrains.annotations.NotNull;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,16 +38,6 @@ public class RxHttpClient {
     private static final Logger LOGGER = LoggerFactory.getLogger(RxHttpClient.class);
     private final HttpClient httpClient;
 
-    public static RxHttpClient create() {
-        return new RxHttpClient(HttpClient.create());
-    }
-
-    // TODO: hide netty from public api (io.netty.handler.ssl.SslContext)
-    public static RxHttpClient create(@NotNull
-            SslContext sslContext) {
-        return new RxHttpClient(HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)));
-    }
-
     RxHttpClient(HttpClient httpClient) {
         this.httpClient = httpClient;
     }
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientFactory.java
new file mode 100644 (file)
index 0000000..cfa98f2
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START====================================
+ * DCAEGEN2-SERVICES-SDK
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
+
+import io.netty.handler.ssl.SslContext;
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
+import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory;
+import reactor.netty.http.client.HttpClient;
+
+/**
+ * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
+ * @since May 2019
+ */
+public final class RxHttpClientFactory {
+
+    private static final SslFactory SSL_FACTORY = new SslFactory();
+
+    private RxHttpClientFactory() {
+    }
+
+    public static RxHttpClient create() {
+        return new RxHttpClient(HttpClient.create());
+    }
+
+
+    public static RxHttpClient create(SecurityKeys securityKeys) {
+        final SslContext context = SSL_FACTORY.createSecureClientContext(securityKeys);
+        return create(context);
+    }
+
+    public static RxHttpClient createInsecure() {
+        final SslContext context = SSL_FACTORY.createInsecureClientContext();
+        return create(context);
+    }
+
+    // TODO: make it private after removing CloudHttpClient
+    static RxHttpClient create(@NotNull SslContext sslContext) {
+        return new RxHttpClient(HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)));
+    }
+}
@@ -24,8 +24,11 @@ import io.vavr.CheckedFunction0;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Mono;
 import reactor.netty.DisposableServer;
 import reactor.netty.http.server.HttpServer;
@@ -38,6 +41,7 @@ import reactor.netty.http.server.HttpServerRoutes;
  */
 public class DummyHttpServer {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(DummyHttpServer.class);
     private final DisposableServer server;
 
     private DummyHttpServer(DisposableServer server) {
@@ -45,11 +49,18 @@ public class DummyHttpServer {
     }
 
     public static DummyHttpServer start(Consumer<HttpServerRoutes> routes) {
-        return new DummyHttpServer(HttpServer.create()
+        LOGGER.info("Starting dummy server");
+        final DisposableServer server = HttpServer.create()
                 .host("127.0.0.1")
                 .route(routes)
                 .bind()
-                .block());
+                .block();
+        LOGGER.info("Server started");
+        return new DummyHttpServer(server);
+    }
+
+    public static Publisher<Void> sendInOrder(AtomicInteger state, Publisher<Void>... responses) {
+        return responses[state.getAndIncrement()];
     }
 
     public static Publisher<Void> sendResource(HttpServerResponse httpServerResponse, String resourcePath) {
@@ -1,21 +1,21 @@
 /*
- * ============LICENSE_START=======================================================
+ * ============LICENSE_START====================================
  * DCAEGEN2-SERVICES-SDK
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
+ * =========================================================
+ * Copyright (C) 2019 Nokia. All rights reserved.
+ * =========================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *       http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- * ============LICENSE_END=========================================================
+ * ============LICENSE_END=====================================
  */
 
 package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
@@ -39,7 +39,6 @@ import reactor.netty.http.client.HttpClient;
 import reactor.netty.http.server.HttpServer;
 import reactor.netty.resources.ConnectionProvider;
 import reactor.test.StepVerifier;
-import reactor.netty.http.client.HttpClientResponse;
 
 class CloudHttpClientIT {
 
index 57a08fc..9721f08 100644 (file)
@@ -20,9 +20,9 @@
   <modules>
     <module>model</module>
     <module>common-dependency</module>
+    <module>http-client</module>
     <module>aai-client</module>
     <module>cbs-client</module>
     <module>dmaap-client</module>
-
   </modules>
 </project>