2 * ============LICENSE_START======================================================================
3 * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
4 * ===============================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6 * in compliance with the License. You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software distributed under the License
11 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12 * or implied. See the License for the specific language governing permissions and limitations under
14 * ============LICENSE_END========================================================================
17 package org.onap.dcaegen2.collectors.datafile.service.producer;
19 import com.google.gson.JsonElement;
20 import com.google.gson.JsonParser;
25 import org.apache.http.HttpHeaders;
26 import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
27 import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
28 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.springframework.core.io.FileSystemResource;
32 import org.springframework.http.HttpStatus;
33 import org.springframework.web.reactive.function.BodyInserters;
34 import org.springframework.web.reactive.function.client.ClientResponse;
35 import org.springframework.web.reactive.function.client.WebClient;
36 import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
37 import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
38 import org.springframework.web.util.DefaultUriBuilderFactory;
40 import reactor.core.publisher.Flux;
41 import reactor.core.publisher.Mono;
44 * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
45 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
47 public class DmaapProducerReactiveHttpClient {
49 private static final String X_ATT_DR_META = "X-ATT-DR-META";
50 private static final String NAME_JSON_TAG = "name";
51 private static final String LOCATION_JSON_TAG = "location";
52 private static final String DEFAULT_FEED_ID = "1";
54 private final Logger logger = LoggerFactory.getLogger(this.getClass());
56 private WebClient webClient;
57 private final String dmaapHostName;
58 private final Integer dmaapPortNumber;
59 private final String dmaapTopicName;
60 private final String dmaapProtocol;
61 private final String dmaapContentType;
64 * Constructor DmaapProducerReactiveHttpClient.
66 * @param dmaapPublisherConfiguration - DMaaP producer configuration object
68 public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
70 this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
71 this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
72 this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
73 this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
74 this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
78 * Function for calling DMaaP HTTP producer - post request to DMaaP.
80 * @param consumerDmaapModel - object which will be sent to DMaaP
81 * @return status code of operation
83 public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
84 logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
86 RequestBodyUriSpec post = webClient.post();
88 prepareHead(consumerDmaapModel, post);
90 prepareBody(consumerDmaapModel, post);
92 ResponseSpec responseSpec = post.retrieve();
93 responseSpec.onStatus(HttpStatus::is4xxClientError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse));
94 responseSpec.onStatus(HttpStatus::is5xxServerError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse));
95 Flux<String> response = responseSpec.bodyToFlux(String.class);
97 logger.trace("Exiting getDmaapProducerResponse with {}", response);
101 public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
102 this.webClient = webClient;
106 private void prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
107 post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
109 JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
110 String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
111 metaData.getAsJsonObject().remove(LOCATION_JSON_TAG);
112 post.header(X_ATT_DR_META, metaData.toString());
114 post.uri(getUri(name));
117 private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) {
118 String fileLocation = model.getLocation();
119 File fileResource = new File(fileLocation);
120 FileSystemResource httpResource = new FileSystemResource(fileResource);
121 post.body(BodyInserters.fromResource(httpResource));
124 private URI getUri(String fileName) {
125 String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName;
126 return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
130 private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) {
131 String errorMessage = "Unable to post file to Data Router. " + model + "Reason: " + clientResponse.toString();
132 logger.error(errorMessage);
134 return Mono.error(new Exception(errorMessage));