import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.prh.config.ImmutableAaiClientConfiguration;
import org.onap.dcaegen2.services.prh.model.EnvProperties;
import org.onap.dcaegen2.services.prh.service.PrhConfigurationProvider;
import org.slf4j.Logger;
@Override
public AaiClientConfiguration getAaiClientConfiguration() {
- return Optional.ofNullable(aaiClientCloudConfiguration).orElse(super.getAaiClientConfiguration());
+ return Optional.ofNullable(ImmutableAaiClientConfiguration.copyOf(aaiClientCloudConfiguration)
+ .withAaiHeaders(aaiClientConfiguration.aaiHeaders()))
+ .orElse(ImmutableAaiClientConfiguration.copyOf(super.getAaiClientConfiguration()));
}
@Override
LOGGER.info("Retrieving Config Binding Service endpoint from Consul");
try {
return httpGetClient.callHttpGet(getConsulUrl(envProperties), JsonArray.class)
- .flatMap(jsonArray -> this.createConfigBindingserviceurl(jsonArray, envProperties.appName()));
+ .flatMap(jsonArray -> this.createConfigBindingServiceUrl(jsonArray, envProperties.appName()));
} catch (URISyntaxException e) {
LOGGER.warn("Malformed Consul uri", e);
return Mono.error(e);
}
- private Mono<String> createConfigBindingserviceurl(JsonArray jsonArray, String appName) {
+ private Mono<String> createConfigBindingServiceUrl(JsonArray jsonArray, String appName) {
return getConfigBindingObject(jsonArray)
- .flatMap(jsonObject -> buildConfigBindingserviceurl(jsonObject, appName));
+ .flatMap(jsonObject -> buildConfigBindingServiceUrl(jsonObject, appName));
}
- private Mono<String> buildConfigBindingserviceurl(JsonObject jsonObject, String appName) {
+ private Mono<String> buildConfigBindingServiceUrl(JsonObject jsonObject, String appName) {
try {
return Mono.just(getUri(jsonObject.get("ServiceAddress").getAsString(),
jsonObject.get("ServicePort").getAsInt(), "/service_component", appName));
protected abstract Mono<ConsumerDmaapModel> execute(String object);
WebClient buildWebClient() {
- return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
+ return new DMaaPReactiveWebClient().build();
}
}
package org.onap.dcaegen2.services.prh.tasks;
-
import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.INSTANCE_UUID;
import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE_CODE;
package org.onap.dcaegen2.services.prh.service;
-import org.onap.dcaegen2.services.prh.config.DmaapCustomConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private String dmaaPUserName;
- private String dmaaPUserPassword;
-
- /**
- * Creating DMaaPReactiveWebClient passing to them basic DMaaPConfig.
- *
- * @param dmaapCustomConfig - configuration object
- * @return DMaaPReactiveWebClient
- */
- public DMaaPReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
- this.dmaaPUserName = dmaapCustomConfig.dmaapUserName();
- this.dmaaPUserPassword = dmaapCustomConfig.dmaapUserPassword();
- return this;
- }
-
/**
* Construct Reactive WebClient with appropriate settings.
*
import java.net.URI;
import java.net.URISyntaxException;
import java.util.UUID;
+import java.util.function.Consumer;
import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.slf4j.MDC;
return webClient
.get()
.uri(getUri())
- .header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID))
- .header(X_INVOCATION_ID, UUID.randomUUID().toString())
- .header(HttpHeaders.CONTENT_TYPE, contentType)
+ .headers(getHeaders())
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse ->
Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode()))
}
}
+ private Consumer<HttpHeaders> getHeaders() {
+ return httpHeaders -> {
+ httpHeaders.set(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID));
+ httpHeaders.set(X_INVOCATION_ID, UUID.randomUUID().toString());
+ httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType);
+ };
+ }
+
private String createRequestPath() {
return dmaapTopicName + "/" + consumerGroup + "/" + consumerId;
}
import org.apache.http.client.utils.URIBuilder;
import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
*/
public class DMaaPProducerReactiveHttpClient {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
private final String dmaapHostName;
private final Integer dmaapPortNumber;
private final String dmaapProtocol;
private final String dmaapTopicName;
private final String dmaapContentType;
+
private RestTemplate restTemplate;
/**
HttpEntity<String> request = new HttpEntity<>(createJsonBody(consumerDmaapModelMono), getAllHeaders());
return Mono.just(restTemplate.exchange(getUri(), HttpMethod.POST, request, String.class));
} catch (URISyntaxException e) {
- logger.warn("Exception while evaluating URI");
return Mono.error(e);
}
});
when(dmaapConsumerConfiguration.dmaapUserName()).thenReturn(dmaaPUserName);
when(dmaapConsumerConfiguration.dmaapUserPassword()).thenReturn(dmaaPUserPassword);
WebClient dmaapreactiveWebClient = new DMaaPReactiveWebClient()
- .fromConfiguration(dmaapConsumerConfiguration)
.build();
//then
import java.net.URI;
import java.net.URISyntaxException;
-
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
private void mockDependantObjects() {
when(webClient.get()).thenReturn(requestHeadersSpec);
when(requestHeadersSpec.uri((URI) any())).thenReturn(requestHeadersSpec);
- when(requestHeadersSpec.header(any(), any())).thenReturn(requestHeadersSpec);
+ when(requestHeadersSpec.headers(any())).thenReturn(requestHeadersSpec);
when(requestHeadersSpec.retrieve()).thenReturn(responseSpec);
doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
}