/*
 * ============LICENSE_START======================================================================
 * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
 * ===============================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 * ============LICENSE_END========================================================================
 */
17 package org.onap.dcaegen2.collectors.datafile.service.producer;
19 import com.google.gson.JsonElement;
20 import com.google.gson.JsonParser;
24 import java.util.List;
26 import org.apache.http.HttpHeaders;
27 import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
28 import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
29 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import org.springframework.core.io.FileSystemResource;
33 import org.springframework.http.HttpStatus;
34 import org.springframework.web.reactive.function.BodyInserters;
35 import org.springframework.web.reactive.function.client.ClientResponse;
36 import org.springframework.web.reactive.function.client.WebClient;
37 import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
38 import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
39 import org.springframework.web.util.DefaultUriBuilderFactory;
41 import reactor.core.publisher.Mono;
44 * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
45 * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
47 public class DmaapProducerReactiveHttpClient {
49 private static final String X_ATT_DR_META = "X-ATT-DR-META";
50 private static final String LOCATION = "location";
51 private static final String DEFAULT_FEED_ID = "1";
53 private final Logger logger = LoggerFactory.getLogger(this.getClass());
55 private WebClient webClient;
56 private final String dmaapHostName;
57 private final Integer dmaapPortNumber;
58 private final String dmaapTopicName;
59 private final String dmaapProtocol;
60 private final String dmaapContentType;
63 * Constructor DmaapProducerReactiveHttpClient.
65 * @param dmaapPublisherConfiguration - DMaaP producer configuration object
67 public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
69 this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
70 this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
71 this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
72 this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
73 this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
77 * Function for calling DMaaP HTTP producer - post request to DMaaP.
79 * @param consumerDmaapModelMono - object which will be sent to DMaaP
80 * @return status code of operation
82 public Mono<String> getDmaapProducerResponse(Mono<List<ConsumerDmaapModel>> consumerDmaapModelMono) {
83 consumerDmaapModelMono.subscribe(models -> postFilesAndData(models));
84 return Mono.just(HttpStatus.OK.toString());
87 public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
88 this.webClient = webClient;
92 private void postFilesAndData(List<ConsumerDmaapModel> models) {
93 for (ConsumerDmaapModel consumerDmaapModel : models) {
94 postFileAndData(consumerDmaapModel);
98 private void postFileAndData(ConsumerDmaapModel model) {
99 RequestBodyUriSpec post = webClient.post();
101 boolean headPrepared = prepareHead(model, post);
104 prepareBody(model, post);
106 ResponseSpec responseSpec = post.retrieve();
107 responseSpec.onStatus(HttpStatus::is4xxClientError,
108 clientResponse -> handlePostErrors(model, clientResponse));
109 responseSpec.onStatus(HttpStatus::is5xxServerError,
110 clientResponse -> handlePostErrors(model, clientResponse));
111 String bodyToMono = responseSpec.bodyToMono(String.class).block();
112 logger.debug("File info sent to DR with response: " + bodyToMono);
116 private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
117 boolean result = true;
118 post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
120 JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
121 String location = metaData.getAsJsonObject().remove(LOCATION).getAsString();
122 post.header(X_ATT_DR_META, metaData.toString());
124 post.uri(getUri(location));
129 private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) {
130 String fileLocation = model.getLocation();
131 File fileResource = new File(fileLocation);
132 FileSystemResource httpResource = new FileSystemResource(fileResource);
133 post.body(BodyInserters.fromResource(httpResource));
136 private URI getUri(String location) {
137 String fileName = location.substring(location.indexOf("/"), location.length());
138 String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName;
139 URI uri = new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName)
140 .port(dmaapPortNumber).path(path).build();
144 private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) {
145 String errorMessage = "Unable to post file to Data Router. " + model + "Reason: " + clientResponse.toString();
146 logger.error(errorMessage);
148 return Mono.error(new Exception(errorMessage));