2 * ============LICENSE_START======================================================================
3 * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
4 * ===============================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6 * in compliance with the License. You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software distributed under the License
11 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12 * or implied. See the License for the specific language governing permissions and limitations under
13 * the License.
14 * ============LICENSE_END========================================================================
17 package org.onap.dcaegen2.collectors.datafile.service.producer;
19 import com.google.gson.JsonElement;
20 import com.google.gson.JsonParser;
24 import java.net.URISyntaxException;
25 import java.util.List;
27 import org.apache.http.HttpHeaders;
28 import org.apache.http.client.utils.URIBuilder;
29 import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
30 import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
31 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import org.springframework.core.io.FileSystemResource;
35 import org.springframework.http.HttpStatus;
36 import org.springframework.web.reactive.function.BodyInserters;
37 import org.springframework.web.reactive.function.client.ClientResponse;
38 import org.springframework.web.reactive.function.client.WebClient;
39 import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
40 import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
42 import reactor.core.publisher.Mono;
45 * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
46 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
48 public class DmaapProducerReactiveHttpClient {
50 private static final String X_ATT_DR_META = "X-ATT-DR-META";
51 private static final String LOCATION = "location";
53 private final Logger logger = LoggerFactory.getLogger(this.getClass());
55 private WebClient webClient;
56 private final String dmaapHostName;
57 private final Integer dmaapPortNumber;
58 private final String dmaapProtocol;
59 private final String dmaapTopicName;
60 private final String dmaapContentType;
/**
 * Creates a client whose protocol, host, port, topic and content type are read from the given configuration.
 *
 * @param dmaapPublisherConfiguration - DMaaP producer configuration object
 */
public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
    dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
    dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
    dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
    dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
    dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
}
77 * Function for calling DMaaP HTTP producer - post request to DMaaP.
79 * @param consumerDmaapModelMono - object which will be sent to DMaaP
80 * @return status code of operation
82 public Mono<String> getDmaapProducerResponse(Mono<List<ConsumerDmaapModel>> consumerDmaapModelMono) {
83 consumerDmaapModelMono.subscribe(models -> postFilesAndData(models));
84 return Mono.just(HttpStatus.OK.toString());
// Stores the given WebClient in this instance; postFileAndData obtains its POST request spec from it.
// NOTE(review): the method's return statement and closing brace are not visible in this view —
// presumably it returns this instance so calls can be chained; confirm against the full file.
87 public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
88 this.webClient = webClient;
92 private void postFilesAndData(List<ConsumerDmaapModel> models) {
93 for (ConsumerDmaapModel consumerDmaapModel : models) {
94 postFileAndData(consumerDmaapModel);
// Sends one model: obtains a POST request spec from webClient, prepares it with prepareHead and
// prepareBody, retrieves the response and logs the response body at debug level.
// NOTE(review): the embedded numbering skips lines between the headPrepared assignment and the
// prepareBody call, and headPrepared is not read in any visible line — presumably a guard on it
// (and the closing braces) is missing from this view; confirm against the full file.
98 private void postFileAndData(ConsumerDmaapModel model) {
99 RequestBodyUriSpec post = webClient.post();
101 boolean headPrepared = prepareHead(model, post);
104 prepareBody(model, post);
106 ResponseSpec responseSpec = post.retrieve();
// 4xx and 5xx statuses are both mapped to handlePostErrors.
// NOTE(review): the value returned by onStatus is discarded, so this relies on onStatus
// registering the handler on responseSpec itself — TODO confirm for the WebClient version in use.
107 responseSpec.onStatus(HttpStatus::is4xxClientError,
108 clientResponse -> handlePostErrors(model, clientResponse));
109 responseSpec.onStatus(HttpStatus::is5xxServerError,
110 clientResponse -> handlePostErrors(model, clientResponse));
// block() makes this call wait for the response body before continuing.
111 String bodyToMono = responseSpec.bodyToMono(String.class).block();
112 logger.debug("File info sent to DR with response: " + bodyToMono);
// Sets headers on the given POST spec: the configured content type, and an X-ATT-DR-META header
// built from the model.
// NOTE(review): the `try {` matching the catch below, whatever statements sit between the header
// setup and the catch, the rest of the catch body and the return statement are not visible in this
// view. getUri() has no visible caller, so the request URI is presumably set here — confirm
// against the full file.
116 private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
117 boolean result = true;
119 post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
// Parse the string produced by CommonFunctions.createJsonBody(model) as JSON, drop its
// "location" member, and send what remains as the metadata header.
121 JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
122 metaData.getAsJsonObject().remove(LOCATION);
123 post.header(X_ATT_DR_META, metaData.toString());
// Any exception raised in the (not fully visible) try block is logged together with the model.
126 } catch (Exception e) {
127 logger.error("Unable to post file to Data Router. " + model, e);
134 private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) {
135 String fileLocation = model.getLocation();
136 File fileResource = new File(fileLocation);
137 FileSystemResource httpResource = new FileSystemResource(fileResource);
138 post.body(BodyInserters.fromResource(httpResource));
141 private URI getUri() throws URISyntaxException {
142 return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
143 .setPath(dmaapTopicName).build();
146 private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) {
147 String errorMessage = "Unable to post file to Data Router. " + model + "Reason: " + clientResponse.toString();
148 logger.error(errorMessage);
150 return Mono.error(new Exception(errorMessage));