c4bf1611a4e2df084d91b57e2a32dd74cfb9206c
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.service.consumer;
18
19 import java.net.URI;
20 import java.util.function.Consumer;
21
22 import org.onap.dcaegen2.collectors.datafile.config.DmaapConsumerConfiguration;
23 import org.springframework.http.HttpHeaders;
24 import org.springframework.http.HttpStatus;
25 import org.springframework.web.reactive.function.client.WebClient;
26 import org.springframework.web.util.DefaultUriBuilderFactory;
27
28 import reactor.core.publisher.Mono;
29
30 /**
31  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/26/18
32  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
33  */
34 public class DmaapConsumerReactiveHttpClient {
35
36     private WebClient webClient;
37     private final String dmaapHostName;
38     private final String dmaapProtocol;
39     private final Integer dmaapPortNumber;
40     private final String dmaapTopicName;
41     private final String consumerGroup;
42     private final String consumerId;
43     private final String contentType;
44
45     /**
46      * Constructor of DmaapConsumerReactiveHttpClient.
47      *
48      * @param consumerConfiguration - DMaaP consumer configuration object
49      */
50     public DmaapConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) {
51         this.dmaapHostName = consumerConfiguration.dmaapHostName();
52         this.dmaapProtocol = consumerConfiguration.dmaapProtocol();
53         this.dmaapPortNumber = consumerConfiguration.dmaapPortNumber();
54         this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
55         this.consumerGroup = consumerConfiguration.consumerGroup();
56         this.consumerId = consumerConfiguration.consumerId();
57         this.contentType = consumerConfiguration.dmaapContentType();
58     }
59
60     /**
61      * Function for calling DMaaP HTTP consumer - consuming messages from Kafka/DMaaP from topic.
62      *
63      * @return reactive response from DMaaP in string format
64      */
65     public Mono<String> getDmaapConsumerResponse() {
66         return webClient.get().uri(getUri()).headers(getHeaders()).retrieve()
67                 .onStatus(HttpStatus::is4xxClientError, clientResponse -> Mono.error(new Exception("HTTP 400")))
68                 .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception("HTTP 500")))
69                 .bodyToMono(String.class);
70     }
71
72     private Consumer<HttpHeaders> getHeaders() {
73         return httpHeaders -> httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType);
74     }
75
76     private String createRequestPath() {
77         return dmaapTopicName + "/" + consumerGroup + "/" + consumerId;
78     }
79
80     public DmaapConsumerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
81         this.webClient = webClient;
82         return this;
83     }
84
85     URI getUri() {
86         return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
87                 .path(createRequestPath()).build();
88     }
89 }