8010bdc1cfc12dde6ecaadb6b598bed08f524757
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.service.producer;
18
19 import com.google.gson.JsonElement;
20 import com.google.gson.JsonParser;
21
22 import java.io.File;
23 import java.net.URI;
24 import java.net.URISyntaxException;
25 import java.util.List;
26
27 import org.apache.http.HttpHeaders;
28 import org.apache.http.client.utils.URIBuilder;
29 import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
30 import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
31 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import org.springframework.core.io.FileSystemResource;
35 import org.springframework.http.HttpStatus;
36 import org.springframework.web.reactive.function.BodyInserters;
37 import org.springframework.web.reactive.function.client.ClientResponse;
38 import org.springframework.web.reactive.function.client.WebClient;
39 import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
40 import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
41
42 import reactor.core.publisher.Mono;
43
44 /**
45  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
46  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
47  */
48 public class DmaapProducerReactiveHttpClient {
49
50     private static final String X_ATT_DR_META = "X-ATT-DR-META";
51     private static final String LOCATION = "location";
52
53     private final Logger logger = LoggerFactory.getLogger(this.getClass());
54
55     private WebClient webClient;
56     private final String dmaapHostName;
57     private final Integer dmaapPortNumber;
58     private final String dmaapProtocol;
59     private final String dmaapTopicName;
60     private final String dmaapContentType;
61
62     /**
63      * Constructor DmaapProducerReactiveHttpClient.
64      *
65      * @param dmaapPublisherConfiguration - DMaaP producer configuration object
66      */
67     public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
68
69         this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
70         this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
71         this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
72         this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
73         this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
74     }
75
76     /**
77      * Function for calling DMaaP HTTP producer - post request to DMaaP.
78      *
79      * @param consumerDmaapModelMono - object which will be sent to DMaaP
80      * @return status code of operation
81      */
82     public Mono<String> getDmaapProducerResponse(Mono<List<ConsumerDmaapModel>> consumerDmaapModelMono) {
83         consumerDmaapModelMono.subscribe(models -> postFilesAndData(models));
84         return Mono.just(HttpStatus.OK.toString());
85     }
86
87     public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
88         this.webClient = webClient;
89         return this;
90     }
91
92     private void postFilesAndData(List<ConsumerDmaapModel> models) {
93         for (ConsumerDmaapModel consumerDmaapModel : models) {
94             postFileAndData(consumerDmaapModel);
95         }
96     }
97
98     private void postFileAndData(ConsumerDmaapModel model) {
99         RequestBodyUriSpec post = webClient.post();
100
101         boolean headPrepared = prepareHead(model, post);
102
103         if (headPrepared) {
104             prepareBody(model, post);
105
106             ResponseSpec responseSpec = post.retrieve();
107             responseSpec.onStatus(HttpStatus::is4xxClientError,
108                     clientResponse -> handlePostErrors(model, clientResponse));
109             responseSpec.onStatus(HttpStatus::is5xxServerError,
110                     clientResponse -> handlePostErrors(model, clientResponse));
111             String bodyToMono = responseSpec.bodyToMono(String.class).block();
112         }
113     }
114
115     private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
116         boolean result = true;
117         try {
118             post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
119
120             JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
121             metaData.getAsJsonObject().remove(LOCATION);
122             post.header(X_ATT_DR_META, metaData.toString());
123
124             post.uri(getUri());
125         } catch (Exception e) {
126             logger.error("Unable to post file to Data Router. " + model, e);
127             result = false;
128         }
129
130         return result;
131     }
132
133     private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) {
134         String fileLocation = model.getLocation();
135         File fileResource = new File(fileLocation);
136         FileSystemResource httpResource = new FileSystemResource(fileResource);
137         post.body(BodyInserters.fromResource(httpResource));
138     }
139
140     private URI getUri() throws URISyntaxException {
141         return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
142                 .setPath(dmaapTopicName).build();
143     }
144
145     private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) {
146         String errorMessage = "Unable to post file to Data Router. " + model + "Reason: " + clientResponse.toString();
147         logger.error(errorMessage);
148
149         return Mono.error(new Exception(errorMessage));
150     }
151 }