21266fbcba4ae14bc8882cfba91674b7970892ff
[dcaegen2/collectors/datafile.git] /
1 /*
2  * ============LICENSE_START======================================================================
3  * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
4  * ===============================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
6  * in compliance with the License. You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software distributed under the License
11  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12  * or implied. See the License for the specific language governing permissions and limitations under
13  * the License.
14  * ============LICENSE_END========================================================================
15  */
16
17 package org.onap.dcaegen2.collectors.datafile.service;
18
19 import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.RESPONSE_CODE;
20 import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.SERVICE_NAME;
21 import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
22 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapCustomConfig;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import org.slf4j.MDC;
26 import org.springframework.http.HttpHeaders;
27 import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
28 import org.springframework.web.reactive.function.client.WebClient;
29 import org.springframework.web.reactive.function.client.WebClient.Builder;
30 import reactor.core.publisher.Mono;
31
32 /**
33  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
34  */
35 public class DmaapReactiveWebClient {
36
37     private final Logger logger = LoggerFactory.getLogger(this.getClass());
38
39     private String dmaaPContentType;
40     private String dmaaPUserName;
41     private String dmaaPUserPassword;
42
43     /**
44      * Creating DmaapReactiveWebClient passing to them basic DmaapConfig.
45      *
46      * @param dmaapCustomConfig - configuration object
47      * @return DmaapReactiveWebClient
48      */
49     public DmaapReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
50         this.dmaaPContentType = dmaapCustomConfig.dmaapContentType();
51         return this;
52     }
53
54     /**
55      * Construct Reactive WebClient with appropriate settings.
56      *
57      * @return WebClient
58      */
59     public WebClient build() {
60         Builder webClientBuilder = WebClient.builder().defaultHeader(HttpHeaders.CONTENT_TYPE, dmaaPContentType)
61                 .filter(logRequest()).filter(logResponse());
62         if (dmaaPUserName != null && !dmaaPUserName.isEmpty() && dmaaPUserPassword != null
63                 && !dmaaPUserPassword.isEmpty()) {
64             webClientBuilder.filter(basicAuthentication(dmaaPUserName, dmaaPUserPassword));
65
66         }
67         return webClientBuilder.build();
68     }
69
70     private ExchangeFilterFunction logResponse() {
71         return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
72             MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
73             logger.trace("Response Status {}", clientResponse.statusCode());
74             MDC.remove(RESPONSE_CODE);
75             return Mono.just(clientResponse);
76         });
77     }
78
79     private ExchangeFilterFunction logRequest() {
80         return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
81             MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
82             logger.trace("Request: {} {}", clientRequest.method(), clientRequest.url());
83             clientRequest.headers()
84                     .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
85             logger.trace("HTTP request headers: {}", clientRequest.headers());
86             MDC.remove(SERVICE_NAME);
87             return Mono.just(clientRequest);
88         });
89     }
90
91 }