36050ff76d4cb7eebd63ff7ce7a098a2534f2918
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.service.producer;
18
19 import com.google.gson.JsonElement;
20 import com.google.gson.JsonParser;
21
22 import java.io.File;
23 import java.net.URI;
24
25 import org.apache.http.HttpHeaders;
26 import org.onap.dcaegen2.collectors.datafile.config.DmaapPublisherConfiguration;
27 import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
28 import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.springframework.core.io.FileSystemResource;
32 import org.springframework.http.HttpStatus;
33 import org.springframework.web.reactive.function.BodyInserters;
34 import org.springframework.web.reactive.function.client.ClientResponse;
35 import org.springframework.web.reactive.function.client.WebClient;
36 import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
37 import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
38 import org.springframework.web.util.DefaultUriBuilderFactory;
39
40 import reactor.core.publisher.Flux;
41 import reactor.core.publisher.Mono;
42
43 /**
44  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
45  * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
46  */
47 public class DmaapProducerReactiveHttpClient {
48
49     private static final String X_ATT_DR_META = "X-ATT-DR-META";
50     private static final String NAME_JSON_TAG = "name";
51     private static final String LOCATION_JSON_TAG = "location";
52     private static final String DEFAULT_FEED_ID = "1";
53
54     private final Logger logger = LoggerFactory.getLogger(this.getClass());
55
56     private WebClient webClient;
57     private final String dmaapHostName;
58     private final Integer dmaapPortNumber;
59     private final String dmaapTopicName;
60     private final String dmaapProtocol;
61     private final String dmaapContentType;
62
63     /**
64      * Constructor DmaapProducerReactiveHttpClient.
65      *
66      * @param dmaapPublisherConfiguration - DMaaP producer configuration object
67      */
68     public DmaapProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
69
70         this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
71         this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
72         this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
73         this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
74         this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
75     }
76
77     /**
78      * Function for calling DMaaP HTTP producer - post request to DMaaP.
79      *
80      * @param consumerDmaapModel - object which will be sent to DMaaP
81      * @return status code of operation
82      */
83     public Flux<String> getDmaapProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
84         logger.trace("Entering getDmaapProducerResponse with {}", consumerDmaapModel);
85
86         RequestBodyUriSpec post = webClient.post();
87
88         prepareHead(consumerDmaapModel, post);
89
90         prepareBody(consumerDmaapModel, post);
91
92         ResponseSpec responseSpec = post.retrieve();
93         responseSpec.onStatus(HttpStatus::is4xxClientError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse));
94         responseSpec.onStatus(HttpStatus::is5xxServerError, clientResponse -> handlePostErrors(consumerDmaapModel, clientResponse));
95         Flux<String> response = responseSpec.bodyToFlux(String.class);
96
97         logger.trace("Exiting getDmaapProducerResponse with {}", response);
98         return response;
99     }
100
101     public DmaapProducerReactiveHttpClient createDmaapWebClient(WebClient webClient) {
102         this.webClient = webClient;
103         return this;
104     }
105
106     private void prepareHead(ConsumerDmaapModel model, RequestBodyUriSpec post) {
107         post.header(HttpHeaders.CONTENT_TYPE, dmaapContentType);
108
109         JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
110         String name = metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
111         metaData.getAsJsonObject().remove(LOCATION_JSON_TAG);
112         post.header(X_ATT_DR_META, metaData.toString());
113
114         post.uri(getUri(name));
115     }
116
117     private void prepareBody(ConsumerDmaapModel model, RequestBodyUriSpec post) {
118         String fileLocation = model.getLocation();
119         File fileResource = new File(fileLocation);
120         FileSystemResource httpResource = new FileSystemResource(fileResource);
121         post.body(BodyInserters.fromResource(httpResource));
122     }
123
124     private URI getUri(String fileName) {
125         String path = dmaapTopicName + "/" + DEFAULT_FEED_ID + "/" + fileName;
126         return new DefaultUriBuilderFactory().builder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber)
127                 .path(path).build();
128     }
129
130     private Mono<Exception> handlePostErrors(ConsumerDmaapModel model, ClientResponse clientResponse) {
131         String errorMessage = "Unable to post file to Data Router. " + model + "Reason: " + clientResponse.toString();
132         logger.error(errorMessage);
133
134         return Mono.error(new Exception(errorMessage));
135     }
136 }