0ef062010c94e68c375c62fb7d9f0faeda3574c4
[dcaegen2/services/sdk.git] /
1 /*
2  * ============LICENSE_START====================================
3  * DCAEGEN2-SERVICES-SDK
4  * =========================================================
5  * Copyright (C) 2019 Nokia. All rights reserved.
6  * =========================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *       http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=====================================
19  */
20
21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
22
23 import com.google.gson.Gson;
24 import com.google.gson.JsonArray;
25 import com.google.gson.JsonElement;
26 import io.netty.buffer.ByteBuf;
27 import io.vavr.collection.HashMap;
28 import java.nio.charset.StandardCharsets;
29 import java.time.Duration;
30 import org.jetbrains.annotations.NotNull;
31 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpHeaders;
32 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
33 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
34 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
35 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
36 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody;
37 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
38 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
45 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
46 import org.reactivestreams.Publisher;
47 import reactor.core.publisher.Flux;
48 import reactor.core.publisher.Mono;
49
50 /**
51  * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
52  * @since March 2019
53  */
54 // TODO: This is a PoC. It's untested.
55 public class MessageRouterClientImpl implements MessageRouterPublisher, MessageRouterSubscriber {
56
57     private static final Duration WINDOW_MAX_TIME = Duration.ofSeconds(1);
58     private static final int WINDOW_MAX_SIZE = 512;
59     private final RxHttpClient httpClient;
60     private final Gson gson;
61
62     public MessageRouterClientImpl(RxHttpClient httpClient, Gson gson) {
63         this.httpClient = httpClient;
64         this.gson = gson;
65     }
66
67     @Override
68     public Flux<MessageRouterPublishResponse> put(
69             MessageRouterPublishRequest request,
70             Flux<? extends JsonElement> items) {
71         return items.windowTimeout(WINDOW_MAX_SIZE, WINDOW_MAX_TIME).flatMap(subItems ->
72                 subItems.collect(JsonArray::new, JsonArray::add)
73                         .filter(arr -> arr.size() > 0)
74                         .map(RequestBody::fromJson)
75                         .flatMap(body -> httpClient.call(buildPostHttpRequest(request, body)))
76                         .map(this::buildPutResponse));
77     }
78
79     @Override
80     public Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request) {
81         return httpClient.call(buildGetHttpRequest(request)).map(this::buildGetResponse);
82     }
83
84     private @NotNull MessageRouterPublishResponse buildPutResponse(HttpResponse httpResponse) {
85         final ImmutableMessageRouterPublishResponse.Builder builder =
86                 ImmutableMessageRouterPublishResponse.builder();
87         return httpResponse.successful()
88                 ? builder.build()
89                 : builder.failReason(extractFailReason(httpResponse)).build();
90     }
91
92     private @NotNull MessageRouterSubscribeResponse buildGetResponse(HttpResponse httpResponse) {
93         final ImmutableMessageRouterSubscribeResponse.Builder builder =
94                 ImmutableMessageRouterSubscribeResponse.builder();
95         return httpResponse.successful()
96                 ? builder.items(httpResponse.bodyAsJson(StandardCharsets.UTF_8, gson, JsonArray.class)).build()
97                 : builder.failReason(extractFailReason(httpResponse)).build();
98     }
99
100     private String extractFailReason(HttpResponse httpResponse) {
101         return String.format("%d %s%n%s", httpResponse.statusCode(), httpResponse.statusReason(),
102                 httpResponse.bodyAsString());
103     }
104
105     private @NotNull HttpRequest buildPostHttpRequest(MessageRouterPublishRequest request, RequestBody body) {
106         return ImmutableHttpRequest.builder()
107                 .method(HttpMethod.POST)
108                 .url(request.sinkDefinition().topicUrl())
109                 .diagnosticContext(request.diagnosticContext())
110                 .customHeaders(HashMap.of(HttpHeaders.CONTENT_TYPE, request.contentType()))
111                 .body(body)
112                 .build();
113     }
114
115     private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) {
116         return ImmutableHttpRequest.builder()
117                 .method(HttpMethod.GET)
118                 .url(buildSubscribeUrl(request))
119                 .diagnosticContext(request.diagnosticContext())
120                 .build();
121     }
122
123     private String buildSubscribeUrl(MessageRouterSubscribeRequest request) {
124         return String.format("%s/%s/%s", request.sourceDefinition().topicUrl(), request.consumerGroup(), request.consumerId());
125     }
126 }