package org.onap.dcaegen2.services.prh.tasks;
-import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
-abstract class DmaapConsumerTask {
+interface DmaapConsumerTask {
- abstract Flux<ConsumerDmaapModel> consume(Mono<String> message);
+ void initConfigs();
- abstract DMaaPConsumerReactiveHttpClient resolveClient();
+ Flux<ConsumerDmaapModel> execute(String object);
- abstract void initConfigs();
+ Flux<ConsumerDmaapModel> consume(Mono<String> message);
- protected abstract DmaapConsumerConfiguration resolveConfiguration();
-
- protected abstract Flux<ConsumerDmaapModel> execute(String object);
-
- WebClient buildWebClient() {
- return new DMaaPReactiveWebClient().build();
- }
+ DMaaPConsumerReactiveHttpClient resolveClient();
}
package org.onap.dcaegen2.services.prh.tasks;
-import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
import org.slf4j.Logger;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
*/
@Component
-public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
+public class DmaapConsumerTaskImpl implements DmaapConsumerTask {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
private final Config config;
- private DmaapConsumerJsonParser dmaapConsumerJsonParser;
+ private final DmaapConsumerJsonParser dmaapConsumerJsonParser;
+ private final DMaaPReactiveWebClient dmaapReactiveWebClient;
@Autowired
public DmaapConsumerTaskImpl(Config config) {
- this.config = config;
- this.dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+ this(config, new DmaapConsumerJsonParser(), new DMaaPReactiveWebClient());
}
- DmaapConsumerTaskImpl(AppConfig prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser) {
+ DmaapConsumerTaskImpl(Config prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser,
+ DMaaPReactiveWebClient dmaapReactiveWebClient) {
this.config = prhAppConfig;
this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
+ this.dmaapReactiveWebClient = dmaapReactiveWebClient;
}
@Override
- Flux<ConsumerDmaapModel> consume(Mono<String> message) {
- return dmaapConsumerJsonParser.getJsonObject(message);
+ public void initConfigs() {
+ config.initFileStreamReader();
}
@Override
}
@Override
- void initConfigs() {
- config.initFileStreamReader();
- }
-
- @Override
- protected DmaapConsumerConfiguration resolveConfiguration() {
- return config.getDmaapConsumerConfiguration();
+ public Flux<ConsumerDmaapModel> consume(Mono<String> message) {
+ return dmaapConsumerJsonParser.getJsonObject(message);
}
@Override
- DMaaPConsumerReactiveHttpClient resolveClient() {
- return new DMaaPConsumerReactiveHttpClient(resolveConfiguration()).createDMaaPWebClient(buildWebClient());
+ public DMaaPConsumerReactiveHttpClient resolveClient() {
+ return new DMaaPConsumerReactiveHttpClient(
+ config.getDmaapConsumerConfiguration()).createDMaaPWebClient(dmaapReactiveWebClient.build());
}
}
package org.onap.dcaegen2.services.prh.tasks;
-import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.prh.service.producer.DMaaPPublisherReactiveHttpClient;
import org.springframework.http.ResponseEntity;
-import org.springframework.web.client.RestTemplate;
import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
*/
-abstract class DmaapPublisherTask {
+interface DmaapPublisherTask {
- abstract Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
+ Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
- abstract DMaaPProducerReactiveHttpClient resolveClient();
+ Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
- protected abstract DmaapPublisherConfiguration resolveConfiguration();
-
- protected abstract Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel)
- throws PrhTaskException;
-
- abstract RestTemplate buildWebClient();
+ DMaaPPublisherReactiveHttpClient resolveClient();
}
package org.onap.dcaegen2.services.prh.tasks;
-import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.prh.service.producer.DMaaPPublisherReactiveHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
@Component
-public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
+public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
private final Config config;
- private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
+ private DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient;
@Autowired
public DmaapPublisherTaskImpl(Config config) {
this.config = config;
}
- @Override
- Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) {
- return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
- }
-
@Override
public Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
if (consumerDmaapModel == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
- dmaapProducerReactiveHttpClient = resolveClient();
+ dmaapPublisherReactiveHttpClient = resolveClient();
LOGGER.info("Method called with arg {}", consumerDmaapModel);
return publish(consumerDmaapModel);
}
@Override
- RestTemplate buildWebClient() {
- return new RestTemplate();
- }
-
- @Override
- protected DmaapPublisherConfiguration resolveConfiguration() {
- return config.getDmaapPublisherConfiguration();
+ public Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) {
+ return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
}
@Override
- DMaaPProducerReactiveHttpClient resolveClient() {
- return new DMaaPProducerReactiveHttpClient(resolveConfiguration()).createDMaaPWebClient(buildWebClient());
+ public DMaaPPublisherReactiveHttpClient resolveClient() {
+ return new DMaaPPublisherReactiveHttpClient(config.getDmaapPublisherConfiguration())
+ .createDMaaPWebClient(new RestTemplate());
}
}
\ No newline at end of file
import static org.mockito.Mockito.when;
import static org.onap.dcaegen2.services.prh.TestAppConfiguration.createDefaultDmaapConsumerConfiguration;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonParser;
import java.util.Optional;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
import reactor.core.publisher.Flux;
private static AppConfig appConfig;
private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
private static String message;
- private static String parsed;
@BeforeAll
static void setUp() {
+ " \"softwareVersion\": \"v4.5.0.1\","
+ " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
+ "}}}]";
-
- parsed = "{\"event\": {"
- + "\"commonEventHeader\": { \"sourceName\":\"NOKQTFCOC540002E\"},"
- + "\"pnfRegistrationFields\": {"
- + " \"unitType\": \"AirScale\","
- + " \"serialNumber\": \"QTFCOC540002E\","
- + " \"pnfRegistrationFieldsVersion\": \"2.0\","
- + " \"manufactureDate\": \"1535014037024\","
- + " \"modelNumber\": \"7BEA\",\n"
- + " \"lastServiceDate\": \"1535014037024\","
- + " \"unitFamily\": \"BBU\","
- + " \"vendorName\": \"Nokia\","
- + " \"oamV4IpAddress\": \"10.16.123.234\","
- + " \"softwareVersion\": \"v4.5.0.1\","
- + " \"oamV6IpAddress\": \"0:0:0:0:0:FFFF:0A10:7BEA\""
- + "}}}";
}
@Test
assertEquals(consumerDmaapModel, response.blockFirst());
}
+ @Test
+ void whenInitConfigs_initStreamReader() {
+ //when
+ dmaapConsumerTask.initConfigs();
+
+ //then
+ verify(appConfig).initFileStreamReader();
+ }
+
private void prepareMocksForDmaapConsumer(Optional<String> message) {
- DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
- JsonElement jsonElement = new JsonParser().parse(parsed);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
- .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
dMaaPConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(message.orElse("")));
when(appConfig.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
- dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerJsonParser));
- when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
+ DMaaPReactiveWebClient dmaapReactiveWebClient = mock(DMaaPReactiveWebClient.class);
+ dmaapConsumerTask =
+ spy(new DmaapConsumerTaskImpl(appConfig, new DmaapConsumerJsonParser(), dmaapReactiveWebClient));
doReturn(dMaaPConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
}
}
\ No newline at end of file
DmaapConsumerTaskImpl dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(appConfig));
DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient = mock(
DMaaPConsumerReactiveHttpClient.class);
- doReturn(mock(DmaapConsumerConfiguration.class)).when(dmaapConsumerTask).resolveConfiguration();
doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
return dmaapConsumerTask;
}
import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
-import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.prh.service.producer.DMaaPPublisherReactiveHttpClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
AppConfig appConfig = spy(AppConfig.class);
doReturn(mock(DmaapPublisherConfiguration.class)).when(appConfig).getDmaapPublisherConfiguration();
DmaapPublisherTaskImpl dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));
- DMaaPProducerReactiveHttpClient extendedDmaapProducerHttpClient = mock(
- DMaaPProducerReactiveHttpClient.class);
- doReturn(mock(DmaapPublisherConfiguration.class)).when(dmaapPublisherTask).resolveConfiguration();
+ DMaaPPublisherReactiveHttpClient extendedDmaapProducerHttpClient = mock(
+ DMaaPPublisherReactiveHttpClient.class);
doReturn(extendedDmaapProducerHttpClient).when(dmaapPublisherTask).resolveClient();
return dmaapPublisherTask;
}
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient;
+import org.onap.dcaegen2.services.prh.service.producer.DMaaPPublisherReactiveHttpClient;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Mono;
private static ConsumerDmaapModel consumerDmaapModel;
private static DmaapPublisherTaskImpl dmaapPublisherTask;
- private static DMaaPProducerReactiveHttpClient dMaaPProducerReactiveHttpClient;
+ private static DMaaPPublisherReactiveHttpClient dMaaPPublisherReactiveHttpClient;
private static AppConfig appConfig;
private static DmaapPublisherConfiguration dmaapPublisherConfiguration;
.expectNext(responseEntity).verifyComplete();
//then
- verify(dMaaPProducerReactiveHttpClient, times(1))
+ verify(dMaaPPublisherReactiveHttpClient, times(1))
.getDMaaPProducerResponse(consumerDmaapModel);
- verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ verifyNoMoreInteractions(dMaaPPublisherReactiveHttpClient);
}
.expectNext(responseEntity).verifyComplete();
//then
- verify(dMaaPProducerReactiveHttpClient, times(1))
+ verify(dMaaPPublisherReactiveHttpClient, times(1))
.getDMaaPProducerResponse(consumerDmaapModel);
- verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
+ verifyNoMoreInteractions(dMaaPPublisherReactiveHttpClient);
}
private ResponseEntity<String> prepareMocksForTests(Integer httpResponseCode) {
ResponseEntity<String> responseEntity = mock(ResponseEntity.class);
- //when
when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(httpResponseCode));
- dMaaPProducerReactiveHttpClient = mock(DMaaPProducerReactiveHttpClient.class);
- when(dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(any()))
+ dMaaPPublisherReactiveHttpClient = mock(DMaaPPublisherReactiveHttpClient.class);
+ when(dMaaPPublisherReactiveHttpClient.getDMaaPProducerResponse(any()))
.thenReturn(Mono.just(responseEntity));
dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));
- when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
- doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
+ doReturn(dMaaPPublisherReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
return responseEntity;
}
}
\ No newline at end of file
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
*/
-public class DMaaPProducerReactiveHttpClient {
+public class DMaaPPublisherReactiveHttpClient {
private final String dmaapHostName;
private final Integer dmaapPortNumber;
private RestTemplate restTemplate;
/**
- * Constructor DMaaPProducerReactiveHttpClient.
+ * Constructor DMaaPPublisherReactiveHttpClient.
*
* @param dmaapPublisherConfiguration - DMaaP producer configuration object
*/
- public DMaaPProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
+ public DMaaPPublisherReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
}
- public DMaaPProducerReactiveHttpClient createDMaaPWebClient(RestTemplate restTemplate) {
+ public DMaaPPublisherReactiveHttpClient createDMaaPWebClient(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
return this;
}
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
*/
-class DMaaPProducerReactiveHttpClientTest {
+class DMaaPPublisherReactiveHttpClientTest {
- private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
+ private DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient;
private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(
DmaapPublisherConfiguration.class);
when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("PRH");
when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn("application/json");
when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn("unauthenticated.PNF_READY");
- dmaapProducerReactiveHttpClient = new DMaaPProducerReactiveHttpClient(dmaapPublisherConfigurationMock);
+ dmaapPublisherReactiveHttpClient = new DMaaPPublisherReactiveHttpClient(dmaapPublisherConfigurationMock);
}
when(mockedResponseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(responseSuccess));
doReturn(mockedResponseEntity).when(restTemplate)
.exchange(any(URI.class), any(HttpMethod.class), any(HttpEntity.class), (Class<Object>) any());
- dmaapProducerReactiveHttpClient.createDMaaPWebClient(restTemplate);
+ dmaapPublisherReactiveHttpClient.createDMaaPWebClient(restTemplate);
//then
- StepVerifier.create(dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel))
+ StepVerifier.create(dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel))
.expectSubscription().expectNext(mockedResponseEntity).verifyComplete();
}
@Test
void getAppropriateUri_whenPassingCorrectedPathForPnf() {
- Assertions.assertEquals(dmaapProducerReactiveHttpClient.getUri(),
+ Assertions.assertEquals(dmaapPublisherReactiveHttpClient.getUri(),
URI.create("https://54.45.33.2:1234/unauthenticated.PNF_READY"));
}
}
\ No newline at end of file