fd3c0c842c10932824b4032c80236aaf4b71af2f
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.service.producer;
18
19 import com.google.gson.JsonElement;
20 import com.google.gson.JsonParser;
21
22 import java.io.File;
23 import java.net.URI;
24 import java.net.URISyntaxException;
25 import java.util.List;
26
27 import org.apache.http.HttpHeaders;
28 import org.apache.http.client.utils.URIBuilder;
29 import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
30 import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
31 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import org.springframework.core.io.FileSystemResource;
35 import org.springframework.http.HttpStatus;
36 import org.springframework.web.reactive.function.BodyInserters;
37 import org.springframework.web.reactive.function.client.ClientResponse;
38 import org.springframework.web.reactive.function.client.WebClient;
39 import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
40 import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
41
42 import reactor.core.publisher.Mono;
43
44 /**
45  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
46  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
47  */
48 public class DmaapProducerReactiveHttpClient {
49
50     private static final String X_ATT_DR_META = "X-ATT-DR-META";
51     private static final String LOCATION = "location";
52
53     private final Logger logger = LoggerFactory.getLogger(this.getClass());
54
55     private WebClient webClient;
56     private final String dmaapHostName;
57     private final Integer dmaapPortNumber;
58     private final String dmaapProtocol;
59     private final String dmaapTopicName;
60     private final String dmaapContentType;
61
62     /**
63      * Constructor DmaapProducerReactiveHttpClient.
64      *
65      * @param dmaapPublisherConfiguration - DMaaP producer configuration object
66      */
67     public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
68
69         this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
70         this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
71         this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
72         this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
73         this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
74     }
75
76     /**
77      * Function for calling DMaaP HTTP producer - post request to DMaaP.
78      *
79      * @param consumerDmaapModelMono - object which will be sent to DMaaP
80      * @return status code of operation
81      */
82     public Mono<String> getDmaapProducerResponse(Mono<List<ConsumerDmaapModel>> consumerDmaapModelMono) {
83         consumerDmaapModelMono.subscribe(models -> postFilesAndData(models));
84         return Mono.just(HttpStatus.OK.toString());
85     }
86
87     public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
88         this.webClient = webClient;
89         return this;
90     }
91
92     private void postFilesAndData(List<ConsumerDmaapModel> models) {
93         for (ConsumerDmaapModel consumerDmaapModel : models) {
94             postFileAndData(consumerDmaapModel);
95         }
96     }
97
98     private void postFileAndData(ConsumerDmaapModel model) {
99         RequestBodyUriSpec post = webClient.post();
100
101         boolean headPrepared = prepareHead(model, post);
102
103         if (headPrepared) {
104             prepareBody(model, post);
105
106             ResponseSpec responseSpec = post.retrieve();
107             responseSpec.onStatus(HttpStatus::is4xxClientError,
108                     clientResponse -> handlePostErrors(model, clientResponse));
109             responseSpec.onStatus(HttpStatus::is5xxServerError,
110                     clientResponse -> handlePostErrors(model, clientResponse));
111             String bodyToMono = responseSpec.bodyToMono(String.class).block();
112             logger.debug("File info sent to DR with response: " + bodyToMono);
113         }
114     }
115
116     private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
117         boolean result = true;
118         try {
119             post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
120
121             JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
122             metaData.getAsJsonObject().remove(LOCATION);
123             post.header(X_ATT_DR_META, metaData.toString());
124
125             post.uri(getUri());
126         } catch (Exception e) {
127             logger.error("Unable to post file to Data Router. " + model, e);
128             result = false;
129         }
130
131         return result;
132     }
133
134     private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) {
135         String fileLocation = model.getLocation();
136         File fileResource = new File(fileLocation);
137         FileSystemResource httpResource = new FileSystemResource(fileResource);
138         post.body(BodyInserters.fromResource(httpResource));
139     }
140
141     private URI getUri() throws URISyntaxException {
142         return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
143                 .setPath(dmaapTopicName).build();
144     }
145
146     private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) {
147         String errorMessage = "Unable to post file to Data Router. " + model + "Reason: " + clientResponse.toString();
148         logger.error(errorMessage);
149
150         return Mono.error(new Exception(errorMessage));
151     }
152 }