/*
 * ============LICENSE_START====================================
 * DCAEGEN2-SERVICES-SDK
 * =========================================================
 * Copyright (C) 2019-2021 Nokia. All rights reserved.
 * =========================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=====================================
 */
 
  21 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl;
 
  23 import com.google.gson.Gson;
 
  24 import com.google.gson.JsonArray;
 
  25 import com.google.gson.JsonElement;
 
  26 import com.google.gson.JsonParser;
 
  27 import io.netty.handler.timeout.ReadTimeoutException;
 
  28 import io.vavr.collection.HashMap;
 
  29 import io.vavr.collection.List;
 
  30 import io.vavr.collection.Map;
 
  31 import io.vavr.control.Option;
 
  32 import org.jetbrains.annotations.NotNull;
 
  33 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
 
  34 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest;
 
  35 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
 
  36 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
 
  37 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
 
  38 import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.RetryableException;
 
  39 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
 
  40 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReason;
 
  41 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasonPresenter;
 
  42 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.error.ClientErrorReasons;
 
  43 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
 
  44 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
 
  45 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
 
  46 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.DmaapTimeoutConfig;
 
  47 import org.slf4j.Logger;
 
  48 import org.slf4j.LoggerFactory;
 
  49 import reactor.core.publisher.Mono;
 
  51 import java.net.ConnectException;
 
  52 import java.nio.charset.StandardCharsets;
 
  53 import java.time.Duration;
 
  55 import static org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.impl.Commons.extractFailReason;
 
/**
 * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a>
 */
 
  61 public class MessageRouterSubscriberImpl implements MessageRouterSubscriber {
 
  62     private final RxHttpClient httpClient;
 
  63     private final Gson gson;
 
  64     private final ClientErrorReasonPresenter clientErrorReasonPresenter;
 
  65     private static final Logger LOGGER = LoggerFactory.getLogger(MessageRouterSubscriberImpl.class);
 
  67     public MessageRouterSubscriberImpl(RxHttpClient httpClient, Gson gson,
 
  68                                        ClientErrorReasonPresenter clientErrorReasonPresenter) {
 
  69         this.httpClient = httpClient;
 
  71         this.clientErrorReasonPresenter = clientErrorReasonPresenter;
 
  75     public Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request) {
 
  76         LOGGER.debug("Requesting new items from DMaaP MR: {}", request);
 
  77         return httpClient.call(buildGetHttpRequest(request))
 
  78                 .map(this::buildGetResponse)
 
  79                 .doOnError(ReadTimeoutException.class,
 
  80                         e -> LOGGER.error("Timeout exception occurred when subscribe items from DMaaP MR", e))
 
  81                 .onErrorResume(ReadTimeoutException.class, e -> buildErrorResponse(ClientErrorReasons.TIMEOUT))
 
  82                 .doOnError(ConnectException.class, e -> LOGGER.error("DMaaP MR is unavailable, {}", e.getMessage()))
 
  83                 .onErrorResume(ConnectException.class, e -> buildErrorResponse(ClientErrorReasons.SERVICE_UNAVAILABLE))
 
  84                 .onErrorResume(RetryableException.class, e -> Mono.just(buildGetResponse(e.getResponse())));
 
  87     private @NotNull HttpRequest buildGetHttpRequest(MessageRouterSubscribeRequest request) {
 
  88         return ImmutableHttpRequest.builder()
 
  89                 .method(HttpMethod.GET)
 
  90                 .url(buildSubscribeUrl(request))
 
  91                 .diagnosticContext(request.diagnosticContext().withNewInvocationId())
 
  92                 .customHeaders(headers(request))
 
  93                 .timeout(timeout(request).getOrNull())
 
  97     private @NotNull MessageRouterSubscribeResponse buildGetResponse(HttpResponse httpResponse) {
 
  98         final ImmutableMessageRouterSubscribeResponse.Builder builder =
 
  99                 ImmutableMessageRouterSubscribeResponse.builder();
 
 100         return httpResponse.successful()
 
 101                 ? builder.items(getAsJsonElements(httpResponse)).build()
 
 102                 : builder.failReason(extractFailReason(httpResponse)).build();
 
 105     private List<JsonElement> getAsJsonElements(HttpResponse httpResponse) {
 
 106         JsonArray bodyAsJsonArray = httpResponse
 
 107                 .bodyAsJson(StandardCharsets.UTF_8, gson, JsonArray.class);
 
 109         return List.ofAll(bodyAsJsonArray).map(arrayElement -> JsonParser.parseString(arrayElement.getAsString()));
 
 112     private String buildSubscribeUrl(MessageRouterSubscribeRequest request) {
 
 113         return String.format("%s/%s/%s", request.sourceDefinition().topicUrl(), request.consumerGroup(),
 
 114                 request.consumerId());
 
 117     private Mono<MessageRouterSubscribeResponse> buildErrorResponse(ClientErrorReason clientErrorReason) {
 
 118         String failReason = clientErrorReasonPresenter.present(clientErrorReason);
 
 119         return Mono.just(ImmutableMessageRouterSubscribeResponse.builder()
 
 120                 .failReason(failReason)
 
 124     private Option<Duration> timeout(MessageRouterSubscribeRequest request) {
 
 125         return Option.of(request.timeoutConfig())
 
 126                 .map(DmaapTimeoutConfig::getTimeout);
 
 129     private Map<String, String> headers(MessageRouterSubscribeRequest request) {
 
 130         return Option.of(request.sourceDefinition().aafCredentials())
 
 131                 .map(Commons::basicAuthHeader)
 
 133                 .getOrElse(HashMap.empty());