package org.onap.dcaegen2.services.prh.tasks;
+import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
import reactor.core.publisher.Flux;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
void initConfigs();
- Flux<ConsumerDmaapModel> execute(String object);
+ Flux<ConsumerDmaapModel> execute(String object) throws SSLException;
- DMaaPConsumerReactiveHttpClient resolveClient();
+ DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException;
}
package org.onap.dcaegen2.services.prh.tasks;
+import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
import org.onap.dcaegen2.services.prh.service.consumer.ConsumerReactiveHttpClientFactory;
import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.onap.dcaegen2.services.prh.service.consumer.DMaaPReactiveWebClient;
+import org.onap.dcaegen2.services.prh.service.consumer.DMaaPReactiveWebClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
*/
@Autowired
public DmaapConsumerTaskImpl(Config config) {
this(config, new DmaapConsumerJsonParser(),
- new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClient()));
+ new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
}
DmaapConsumerTaskImpl(Config prhAppConfig,
}
@Override
- public Flux<ConsumerDmaapModel> execute(String object) {
+ public Flux<ConsumerDmaapModel> execute(String object) throws SSLException {
DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
LOGGER.debug("Method called with arg {}", object);
return dmaapConsumerJsonParser.getJsonObject(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse());
}
@Override
- public DMaaPConsumerReactiveHttpClient resolveClient() {
+ public DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException {
return httpClientFactory.create(config.getDmaapConsumerConfiguration());
}
}
MDC.put(INSTANCE_UUID, UUID.randomUUID().toString());
logger.info(INVOKE, "Init configs");
dmaapConsumerTask.initConfigs();
- return dmaapConsumerTask.execute("");
+ return consumeFromDMaaP();
});
}
+ private Flux<ConsumerDmaapModel> consumeFromDMaaP() {
+ try {
+ return dmaapConsumerTask.execute("");
+ } catch (SSLException e) {
+ return Flux.error(e);
+ }
+ }
+
private Mono<ConsumerDmaapModel> publishToAaiConfiguration(ConsumerDmaapModel monoDMaaPModel) {
try {
return aaiProducerTask.execute(monoDMaaPModel);
}
@Test
- void whenPassedObjectDoesntFit_DoesNotThrowPrhTaskException() {
+ void whenPassedObjectDoesntFit_DoesNotThrowPrhTaskException() throws Exception {
//given
prepareMocksForDmaapConsumer(Optional.empty());
}
@Test
- void whenPassedObjectFits_ReturnsCorrectResponse() {
+ void whenPassedObjectFits_ReturnsCorrectResponse() throws Exception {
//given
prepareMocksForDmaapConsumer(Optional.of(message));
verify(appConfig).initFileStreamReader();
}
- private void prepareMocksForDmaapConsumer(Optional<String> message) {
+ private void prepareMocksForDmaapConsumer(Optional<String> message) throws Exception {
dMaaPConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(message.orElse("")));
when(appConfig.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/27/18
*/
*/
@Bean
@Primary
- public DmaapConsumerTask registerSimpleDmaapConsumerTask() {
+ public DmaapConsumerTask registerSimpleDmaapConsumerTask() throws SSLException {
AppConfig appConfig = spy(AppConfig.class);
doReturn(mock(DmaapConsumerConfiguration.class)).when(appConfig).getDmaapConsumerConfiguration();
DmaapConsumerTaskImpl dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(appConfig));
package org.onap.dcaegen2.services.prh.service.consumer;
+import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
public class ConsumerReactiveHttpClientFactory {
- private final DMaaPReactiveWebClient reactiveWebClient;
+ private final DMaaPReactiveWebClientFactory reactiveWebClient;
- public ConsumerReactiveHttpClientFactory(DMaaPReactiveWebClient reactiveWebClient) {
+ public ConsumerReactiveHttpClientFactory(DMaaPReactiveWebClientFactory reactiveWebClient) {
this.reactiveWebClient = reactiveWebClient;
}
- public DMaaPConsumerReactiveHttpClient create(DmaapConsumerConfiguration consumerConfiguration) {
- return new DMaaPConsumerReactiveHttpClient(consumerConfiguration, reactiveWebClient.build());
+ public DMaaPConsumerReactiveHttpClient create(DmaapConsumerConfiguration consumerConfiguration)
+ throws SSLException {
+ return new DMaaPConsumerReactiveHttpClient(consumerConfiguration,
+ reactiveWebClient.build(consumerConfiguration));
}
}
import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE_CODE;
import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.SERVICE_NAME;
+import io.netty.handler.ssl.SslContext;
+import javax.net.ssl.SSLException;
+import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.prh.ssl.SslFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+import org.springframework.http.client.reactive.ClientHttpConnector;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
*/
-public class DMaaPReactiveWebClient {
+public class DMaaPReactiveWebClientFactory {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private final SslFactory sslFactory;
+
+ public DMaaPReactiveWebClientFactory() {
+ this(new SslFactory());
+ }
+
+ DMaaPReactiveWebClientFactory(SslFactory sslFactory) {
+ this.sslFactory = sslFactory;
+ }
+
/**
* Construct Reactive WebClient with appropriate settings.
*
* @return WebClient
*/
- public WebClient build() {
+ public WebClient build(DmaapConsumerConfiguration consumerConfiguration) throws SSLException {
+ SslContext sslContext = createSslContext(consumerConfiguration);
+ ClientHttpConnector reactorClientHttpConnector = new ReactorClientHttpConnector(
+ HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)));
return WebClient.builder()
- .filter(logRequest())
- .filter(logResponse())
- .build();
+ .clientConnector(reactorClientHttpConnector)
+ .filter(logRequest())
+ .filter(logResponse())
+ .build();
+ }
+
+ private SslContext createSslContext(DmaapConsumerConfiguration consumerConfiguration) throws SSLException {
+ if (consumerConfiguration.enableDmaapCertAuth()) {
+ return sslFactory.createSecureContext(
+ consumerConfiguration.keyStore(), consumerConfiguration.keyStorePassword(),
+ consumerConfiguration.trustStore(), consumerConfiguration.trustStorePassword()
+ );
+ }
+ return sslFactory.createInsecureContext();
}
private ExchangeFilterFunction logResponse() {
import java.util.UUID;
import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
*/
public class DMaaPPublisherReactiveHttpClient {
+ private final Logger logger = LoggerFactory.getLogger(DMaaPPublisherReactiveHttpClient.class);
private final String dmaapHostName;
private final Integer dmaapPortNumber;
private final String dmaapProtocol;
public Mono<ResponseEntity<String>> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) {
return Mono.defer(() -> {
HttpEntity<String> request = new HttpEntity<>(createJsonBody(consumerDmaapModelMono), getAllHeaders());
+ logger.info("Request: {} {}", getUri(), request);
return Mono.just(restTemplate.exchange(getUri(), HttpMethod.POST, request, String.class));
});
class ConsumerReactiveHttpClientFactoryTest {
private DmaapConsumerConfiguration dmaapConsumerConfiguration = mock(DmaapConsumerConfiguration.class);
- private DMaaPReactiveWebClient reactiveWebClient = mock(DMaaPReactiveWebClient.class);
+ private DMaaPReactiveWebClientFactory reactiveWebClientFactory = mock(DMaaPReactiveWebClientFactory.class);
private ConsumerReactiveHttpClientFactory httpClientFactory =
- new ConsumerReactiveHttpClientFactory(reactiveWebClient);
+ new ConsumerReactiveHttpClientFactory(reactiveWebClientFactory);
@Test
- void create_shouldReturnNotNullFactoryInstance() {
+ void create_shouldReturnNotNullFactoryInstance() throws Exception {
Assertions.assertNotNull(httpClientFactory.create(dmaapConsumerConfiguration));
- verify(reactiveWebClient).build();
+ verify(reactiveWebClientFactory).build(dmaapConsumerConfiguration);
}
}
\ No newline at end of file
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.service.consumer;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.prh.ssl.SslFactory;
+import org.springframework.web.reactive.function.client.WebClient;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/5/18
+ */
+class DMaaPReactiveWebClientFactoryTest {
+
+ private static final String KEY_STORE = "keyStore";
+ private static final String KEY_STORE_PASS = "keyStorePass";
+ private static final String TRUST_STORE = "trustStore";
+ private static final String TRUST_STORE_PASS = "trustStorePass";
+ private SslFactory sslFactory = mock(SslFactory.class);
+ private DMaaPReactiveWebClientFactory webClientFactory = new DMaaPReactiveWebClientFactory(sslFactory);
+
+ @Test
+ void builder_shouldBuildDMaaPReactiveWebClientwithInsecureSslContext() throws Exception {
+ //given
+ DmaapConsumerConfiguration dmaapConsumerConfiguration = givenDmaapConfigurationWithSslDisabled();
+
+ //when
+ WebClient dmaapReactiveWebClient = webClientFactory.build(dmaapConsumerConfiguration);
+
+ //then
+ Assertions.assertNotNull(dmaapReactiveWebClient);
+ verify(sslFactory).createInsecureContext();
+ }
+
+ @Test
+ void builder_shouldBuildDMaaPReactiveWebClientwithSecureSslContext() throws Exception {
+ //given
+ DmaapConsumerConfiguration dmaapConsumerConfiguration = givenDmaapConfigurationWithSslEnabled();
+
+ //when
+ WebClient dmaapReactiveWebClient = webClientFactory.build(dmaapConsumerConfiguration);
+
+ //then
+ Assertions.assertNotNull(dmaapReactiveWebClient);
+ verify(sslFactory).createSecureContext(KEY_STORE, KEY_STORE_PASS, TRUST_STORE, TRUST_STORE_PASS);
+ }
+
+ private DmaapConsumerConfiguration givenDmaapConfigurationWithSslDisabled() {
+ DmaapConsumerConfiguration dmaapConsumerConfiguration = mock(DmaapConsumerConfiguration.class);
+ when(dmaapConsumerConfiguration.enableDmaapCertAuth()).thenReturn(false);
+ return dmaapConsumerConfiguration;
+ }
+
+ private DmaapConsumerConfiguration givenDmaapConfigurationWithSslEnabled() {
+ DmaapConsumerConfiguration dmaapConsumerConfiguration = mock(DmaapConsumerConfiguration.class);
+ when(dmaapConsumerConfiguration.enableDmaapCertAuth()).thenReturn(true);
+ when(dmaapConsumerConfiguration.keyStore()).thenReturn(KEY_STORE);
+ when(dmaapConsumerConfiguration.keyStorePassword()).thenReturn(KEY_STORE_PASS);
+ when(dmaapConsumerConfiguration.trustStore()).thenReturn(TRUST_STORE);
+ when(dmaapConsumerConfiguration.trustStorePassword()).thenReturn(TRUST_STORE_PASS);
+ return dmaapConsumerConfiguration;
+ }
+}
\ No newline at end of file
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * PNF-REGISTRATION-HANDLER
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.prh.service.consumer;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
-import org.springframework.web.reactive.function.client.WebClient;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/5/18
- */
-class DMaaPReactiveWebClientTest {
-
-
- @Test
- void builder_shouldBuildDMaaPReactiveWebClient() {
- //given
- DmaapConsumerConfiguration dmaapConsumerConfiguration = mock(DmaapConsumerConfiguration.class);
- String dmaaPContentType = "*/*";
- String dmaaPUserName = "DMaaP";
- String dmaaPUserPassword = "DMaaP";
-
- //when
- when(dmaapConsumerConfiguration.dmaapContentType()).thenReturn(dmaaPContentType);
- when(dmaapConsumerConfiguration.dmaapUserName()).thenReturn(dmaaPUserName);
- when(dmaapConsumerConfiguration.dmaapUserPassword()).thenReturn(dmaaPUserPassword);
- WebClient dmaapreactiveWebClient = new DMaaPReactiveWebClient()
- .build();
-
- //then
- Assertions.assertNotNull(dmaapreactiveWebClient);
-
- }
-}
\ No newline at end of file