/*
 * ============LICENSE_START======================================================================
 * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
 * ===============================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 * ============LICENSE_END========================================================================
 */
17 package org.onap.dcaegen2.collectors.datafile.service.consumer;
20 import java.util.function.Consumer;
22 import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
23 import org.springframework.http.HttpHeaders;
24 import org.springframework.http.HttpStatus;
25 import org.springframework.web.reactive.function.client.WebClient;
26 import org.springframework.web.util.DefaultUriBuilderFactory;
28 import reactor.core.publisher.Mono;
31 * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/26/18
32 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
34 public class DmaapConsumerReactiveHttpClient {
36 private WebClient webClient;
37 private final String dmaapHostName;
38 private final String dmaapProtocol;
39 private final Integer dmaapPortNumber;
40 private final String dmaapTopicName;
41 private final String consumerGroup;
42 private final String consumerId;
43 private final String contentType;
46 * Constructor of DmaapConsumerReactiveHttpClient.
48 * @param consumerConfiguration - DMaaP consumer configuration object
50 public DmaapConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) {
51 this.dmaapHostName = consumerConfiguration.dmaapHostName();
52 this.dmaapProtocol = consumerConfiguration.dmaapProtocol();
53 this.dmaapPortNumber = consumerConfiguration.dmaapPortNumber();
54 this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
55 this.consumerGroup = consumerConfiguration.consumerGroup();
56 this.consumerId = consumerConfiguration.consumerId();
57 this.contentType = consumerConfiguration.dmaapContentType();
61 * Function for calling DMaaP HTTP consumer - consuming messages from Kafka/DMaaP from topic.
63 * @return reactive response from DMaaP in string format
65 public Mono<String> getDmaapConsumerResponse() {
66 return webClient.get().uri(getUri()).headers(getHeaders()).retrieve()
67 .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new Exception("HTTP 400")))
68 .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception("HTTP 500")))
69 .bodyToMono(String.class);
72 private Consumer<HttpHeaders> getHeaders() {
73 return httpHeaders -> httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType);
76 private String createRequestPath() {
77 return dmaapTopicName + "/" + consumerGroup + "/" + consumerId;
80 public DmaapConsumerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
81 this.webClient = webClient;
86 return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
87 .path(createRequestPath()).build();