/*
 * ============LICENSE_START====================================
 * DCAEGEN2-SERVICES-SDK
 * =========================================================
 * Copyright (C) 2019 Nokia. All rights reserved.
 * =========================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=====================================
 */
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
23 import com.google.gson.Gson;
24 import com.google.gson.JsonArray;
25 import com.google.gson.JsonElement;
26 import io.netty.buffer.ByteBuf;
27 import io.vavr.collection.HashMap;
28 import java.nio.charset.StandardCharsets;
29 import java.time.Duration;
30 import org.jetbrains.annotations.NotNull;
31 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
32 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
33 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
34 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
35 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
36 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody;
37 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
45 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
46 import org.reactivestreams.Publisher;
47 import reactor.core.publisher.Flux;
48 import reactor.core.publisher.Mono;
/**
 * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
 */
// TODO: This is a PoC. It's untested.
55 public class MessageRouterClientImpl implements MessageRouterPublisher, MessageRouterSubscriber {
57 private static final Duration WINDOW_MAX_TIME = Duration.ofSeconds(1);
58 private static final int WINDOW_MAX_SIZE = 512;
59 private final RxHttpClient httpClient;
60 private final Gson gson;
/**
 * Creates a Message Router client backed by the given HTTP client.
 *
 * @param httpClient HTTP client used to issue publish and subscribe requests
 * @param gson       Gson instance used to parse subscribe response bodies
 */
public MessageRouterClientImpl(RxHttpClient httpClient, Gson gson) {
    this.httpClient = httpClient;
    // gson is a final field, so it has to be assigned here as well.
    this.gson = gson;
}
68 public Flux<MessageRouterPublishResponse> put(
69 MessageRouterPublishRequest request,
70 Flux<? extends JsonElement> items) {
71 return items.windowTimeout(WINDOW_MAX_SIZE, WINDOW_MAX_TIME).flatMap(subItems ->
72 subItems.collect(JsonArray::new, JsonArray::add)
73 .filter(arr -> arr.size() > 0)
74 .map(RequestBody::fromJson)
75 .flatMap(body -> httpClient.call(buildPostHttpRequest(request, body)))
76 .map(this::buildPutResponse));
80 public Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request) {
81 return httpClient.call(buildGetHttpRequest(request)).map(this::buildGetResponse);
84 private @NotNull MessageRouterPublishResponse buildPutResponse(HttpResponse httpResponse) {
85 final ImmutableMessageRouterPublishResponse.Builder builder =
86 ImmutableMessageRouterPublishResponse.builder();
87 return httpResponse.successful()
89 : builder.failReason(extractFailReason(httpResponse)).build();
92 private @NotNull MessageRouterSubscribeResponse buildGetResponse(HttpResponse httpResponse) {
93 final ImmutableMessageRouterSubscribeResponse.Builder builder =
94 ImmutableMessageRouterSubscribeResponse.builder();
95 return httpResponse.successful()
96 ? builder.items(httpResponse.bodyAsJson(StandardCharsets.UTF_8, gson, JsonArray.class)).build()
97 : builder.failReason(extractFailReason(httpResponse)).build();
100 private String extractFailReason(HttpResponse httpResponse) {
101 return String.format("%d %s%n%s", httpResponse.statusCode(), httpResponse.statusReason(),
102 httpResponse.bodyAsString());
105 private @NotNull HttpRequest buildPostHttpRequest(MessageRouterPublishRequest request, RequestBody body) {
106 return ImmutableHttpRequest.builder()
107 .method(HttpMethod.POST)
108 .url(request.sinkDefinition().topicUrl())
109 .diagnosticContext(request.diagnosticContext())
110 .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType()))
115 private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) {
116 return ImmutableHttpRequest.builder()
117 .method(HttpMethod.GET)
118 .url(buildSubscribeUrl(request))
119 .diagnosticContext(request.diagnosticContext())
123 private String buildSubscribeUrl(MessageRouterSubscribeRequest request) {
124 return String.format("%s/%s/%s", request.sourceDefinition().topicUrl(), request.consumerGroup(), request.consumerId());