4b8ce08f6caca0a959149901e1d7b74809add2a3
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.service.producer;
18
19 import com.google.gson.JsonElement;
20 import com.google.gson.JsonParser;
21
22 import java.io.File;
23 import java.net.URI;
24 import java.util.List;
25
26 import org.apache.http.HttpHeaders;
27 import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
28 import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
29 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import org.springframework.core.io.FileSystemResource;
33 import org.springframework.http.HttpStatus;
34 import org.springframework.web.reactive.function.BodyInserters;
35 import org.springframework.web.reactive.function.client.ClientResponse;
36 import org.springframework.web.reactive.function.client.WebClient;
37 import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
38 import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
39 import org.springframework.web.util.DefaultUriBuilderFactory;
40
41 import reactor.core.publisher.Mono;
42
43 /**
44  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
45  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
46  */
47 public class DmaapProducerReactiveHttpClient {
48
49     private static final String X_ATT_DR_META = "X-ATT-DR-META";
50     private static final String LOCATION = "location";
51     private static final String DEFAULT_FEED_ID = "1";
52
53     private final Logger logger = LoggerFactory.getLogger(this.getClass());
54
55     private WebClient webClient;
56     private final String dmaapHostName;
57     private final Integer dmaapPortNumber;
58     private final String dmaapTopicName;
59     private final String dmaapProtocol;
60     private final String dmaapContentType;
61
62     /**
63      * Constructor DmaapProducerReactiveHttpClient.
64      *
65      * @param dmaapPublisherConfiguration - DMaaP producer configuration object
66      */
67     public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
68
69         this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
70         this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
71         this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
72         this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
73         this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
74     }
75
76     /**
77      * Function for calling DMaaP HTTP producer - post request to DMaaP.
78      *
79      * @param consumerDmaapModelMono - object which will be sent to DMaaP
80      * @return status code of operation
81      */
82     public Mono<String> getDmaapProducerResponse(Mono<List<ConsumerDmaapModel>> consumerDmaapModelMono) {
83         consumerDmaapModelMono.subscribe(models -> postFilesAndData(models));
84         return Mono.just(HttpStatus.OK.toString());
85     }
86
87     public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
88         this.webClient = webClient;
89         return this;
90     }
91
92     private void postFilesAndData(List<ConsumerDmaapModel> models) {
93         for (ConsumerDmaapModel consumerDmaapModel : models) {
94             postFileAndData(consumerDmaapModel);
95         }
96     }
97
98     private void postFileAndData(ConsumerDmaapModel model) {
99         RequestBodyUriSpec post = webClient.post();
100
101         boolean headPrepared = prepareHead(model, post);
102
103         if (headPrepared) {
104             prepareBody(model, post);
105
106             ResponseSpec responseSpec = post.retrieve();
107             responseSpec.onStatus(HttpStatus::is4xxClientError,
108                     clientResponse -> handlePostErrors(model, clientResponse));
109             responseSpec.onStatus(HttpStatus::is5xxServerError,
110                     clientResponse -> handlePostErrors(model, clientResponse));
111             String bodyToMono = responseSpec.bodyToMono(String.class).block();
112             logger.debug("File info sent to DR with response: " + bodyToMono);
113         }
114     }
115
116     private boolean prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
117         boolean result = true;
118         post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
119
120         JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
121         String location = metaData.getAsJsonObject().remove(LOCATION).getAsString();
122         post.header(X_ATT_DR_META, metaData.toString());
123
124         post.uri(getUri(location));
125
126         return result;
127     }
128
129     private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) {
130         String fileLocation = model.getLocation();
131         File fileResource = new File(fileLocation);
132         FileSystemResource httpResource = new FileSystemResource(fileResource);
133         post.body(BodyInserters.fromResource(httpResource));
134     }
135
136     private URI getUri(String location) {
137         String fileName = location.substring(location.indexOf("/"), location.length());
138         String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" +  fileName;
139         URI uri = new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName)
140                 .port(dmaapPortNumber).path(path).build();
141         return uri;
142     }
143
144     private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) {
145         String errorMessage = "Unable to post file to Data Router. " + model + "Reason: " + clientResponse.toString();
146         logger.error(errorMessage);
147
148         return Mono.error(new Exception(errorMessage));
149     }
150 }