public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
- return monoMessage.flatMap(message ->
- {
- if (!StringUtils.isEmpty(message)) {
- JsonElement jsonElement = new JsonParser().parse(message);
- ConsumerDmaapModel consumerDmaapModel;
- try {
- if (jsonElement.isJsonObject()) {
- consumerDmaapModel = create(jsonElement.getAsJsonObject());
- } else {
- consumerDmaapModel = create(
- StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
- .flatMap(this::getJsonObjectFromAnArray)
- .orElseThrow(DmaapEmptyResponseException::new));
- }
- logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel);
- return Mono.just(consumerDmaapModel);
- } catch (DmaapNotFoundException | DmaapEmptyResponseException e) {
- return Mono.error(e);
- }
- }
- return Mono.error(new DmaapEmptyResponseException());
- });
+ return monoMessage
+ .flatMap(message -> (StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
+ : convertJsonToConsumerDmaapModel(message)));
+ }
+
+ private Mono<? extends ConsumerDmaapModel> convertJsonToConsumerDmaapModel(String message) {
+ try {
+ JsonElement jsonElement = new JsonParser().parse(message);
+ ConsumerDmaapModel consumerDmaapModel = jsonElement.isJsonObject() ?
+ create(jsonElement.getAsJsonObject()) :
+ getConsumerDmaapModelFromJsonArray(jsonElement);
+ logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel);
+ return Mono.just(consumerDmaapModel);
+ } catch (DmaapNotFoundException | DmaapEmptyResponseException e) {
+ return Mono.error(e);
+ }
+ }
+
+ private ConsumerDmaapModel getConsumerDmaapModelFromJsonArray(JsonElement jsonElement)
+ throws DmaapNotFoundException, DmaapEmptyResponseException {
+ return create(
+ StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
+ .flatMap(this::getJsonObjectFromAnArray)
+ .orElseThrow(DmaapEmptyResponseException::new));
}
public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.model.utils.HttpUtils;
import org.onap.dcaegen2.services.prh.service.AAIProducerClient;
-import org.onap.dcaegen2.services.prh.service.HttpUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException;
WebClient buildWebClient() {
- DmaapConsumerConfiguration dmaapConsumerConfiguration = resolveConfiguration();
- return new DMaaPReactiveWebClient.WebClientBuilder()
- .dmaapContentType(dmaapConsumerConfiguration.dmaapContentType())
- .dmaapUserName(dmaapConsumerConfiguration.dmaapUserName())
- .dmaapUserPassword(dmaapConsumerConfiguration.dmaapUserPassword()).build();
+ return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
}
}
*/
package org.onap.dcaegen2.services.prh.tasks;
-import java.util.Optional;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
@Override
DMaaPConsumerReactiveHttpClient resolveClient() {
- return Optional.ofNullable(dMaaPConsumerReactiveHttpClient)
- .orElseGet(() -> new DMaaPConsumerReactiveHttpClient(resolveConfiguration()).createDMaaPWebClient(
- buildWebClient()));
+ return dMaaPConsumerReactiveHttpClient == null
+ ? new DMaaPConsumerReactiveHttpClient(resolveConfiguration()).createDMaaPWebClient(buildWebClient())
+ : dMaaPConsumerReactiveHttpClient;
}
}
*/
abstract class DmaapPublisherTask {
- abstract Mono<Integer> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
+ abstract Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
abstract DMaaPProducerReactiveHttpClient resolveClient();
protected abstract DmaapPublisherConfiguration resolveConfiguration();
- protected abstract Mono<Integer> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
+ protected abstract Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
WebClient buildWebClient() {
- DmaapPublisherConfiguration dmaapPublisherConfiguration = resolveConfiguration();
- return new DMaaPReactiveWebClient.WebClientBuilder()
- .dmaapContentType(dmaapPublisherConfiguration.dmaapContentType())
- .dmaapUserName(dmaapPublisherConfiguration.dmaapUserName())
- .dmaapUserPassword(dmaapPublisherConfiguration.dmaapUserPassword()).build();
+ return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
}
}
}
@Override
- Mono<Integer> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
+ Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(),
consumerDmaapModel);
- return dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel).map(Integer::parseInt);
+ return dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
}
@Override
- public Mono<Integer> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException {
+ public Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException {
consumerDmaapModel = Optional.ofNullable(consumerDmaapModel)
.orElseThrow(() -> new DmaapNotFoundException("Invoked null object to DMaaP task"));
dMaaPProducerReactiveHttpClient = resolveClient();
@Override
DMaaPProducerReactiveHttpClient resolveClient() {
- return Optional.ofNullable(dMaaPProducerReactiveHttpClient)
- .orElseGet(() -> new DMaaPProducerReactiveHttpClient(resolveConfiguration())
- .createDMaaPWebClient(buildWebClient()));
+ return dMaaPProducerReactiveHttpClient == null
+ ? new DMaaPProducerReactiveHttpClient(resolveConfiguration()).createDMaaPWebClient(buildWebClient())
+ : dMaaPProducerReactiveHttpClient;
}
}
\ No newline at end of file
public void scheduleMainPrhEventTask() {
logger.trace("Execution of tasks was registered");
- Mono<Integer> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage())
+ Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage())
.doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP"))
.map(this::publishToAAIConfiguration)
.flatMap(this::publishToDMaaPConfiguration)
logger.info("PRH tasks have been completed");
}
- private void onSuccess(Integer responseCode) {
+ private void onSuccess(String responseCode) {
logger.info("Prh consumed tasks. HTTP Response code {}", responseCode);
}
try {
return Mono.just(aaiProducerTask.execute(dmaapModel));
} catch (PrhTaskException e) {
- logger.warn("Exception in A&AIProducer task ", e);
return Mono.error(e);
}
});
}
- private Mono<Integer> publishToDMaaPConfiguration(Mono<ConsumerDmaapModel> monoAAIModel) {
+ private Mono<String> publishToDMaaPConfiguration(Mono<ConsumerDmaapModel> monoAAIModel) {
try {
return dmaapProducerTask.execute(monoAAIModel);
} catch (PrhTaskException e) {
- logger.warn("Exception in DMaaPProducer task ", e);
return Mono.error(e);
}
}
Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(message)))
- .expectSubscription().expectError(DmaapNotFoundException.class);
+ .expectSubscription().expectError(DmaapNotFoundException.class).verify();
}
+ ":\"Normal\",\"reportingEntityName\":\"5GRAN_DU\",\"sequence\":0,\"sourceId\":\"<<SerialNumber>>\","
+ "\"sourceName\":\"5GRAN_DU\",\"startEpochMicrosec\":1519837825682,\"version\":3}}}]";
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessage)))
- .expectSubscription().expectError(DmaapNotFoundException.class);
+ .expectSubscription().expectError(DmaapNotFoundException.class).verify();
}
@Test
+ "\"pnfType\":\"AirScale\"}}}]";
StepVerifier
.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutPnfVendorAndSerialNumber)))
- .expectSubscription().expectError(DmaapNotFoundException.class);
+ .expectSubscription().expectError(DmaapNotFoundException.class).verify();
}
@Test
+ "\"AJ02\",\"pnfSerialNumber\":\"QTFCOC540002E\",\"pnfSoftwareVersion\":\"v4.5.0.1\",\"pnfType\":\"AirScale\","
+ "\"pnfVendorName\":\"Nokia\"}}}]";
StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutIPInformation)))
- .expectSubscription().expectError(DmaapNotFoundException.class);
+ .expectSubscription().expectError(DmaapNotFoundException.class).verify();
}
}
*/
package org.onap.dcaegen2.services.prh.tasks;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
//then
StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription()
- .expectError(DmaapEmptyResponseException.class);
+ .expectError(DmaapEmptyResponseException.class).verify();
verify(dMaaPConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
}
//then
verify(dMaaPConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
- Assertions.assertEquals(consumerDmaapModel, response.block());
+ assertEquals(consumerDmaapModel, response.block());
}
*/
package org.onap.dcaegen2.services.prh.tasks;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
Executable executableFunction = () -> dmaapPublisherTask.execute(null);
//then
- Assertions
- .assertThrows(PrhTaskException.class, executableFunction, "The specified parameter is incorrect");
+ assertThrows(PrhTaskException.class, executableFunction, "The specified parameter is incorrect");
}
@Test
//when
StepVerifier.create(dmaapPublisherTask.execute(Mono.just(consumerDmaapModel))).expectSubscription()
- .expectNext(HttpStatus.OK.value());
+ .expectNext(HttpStatus.OK.toString()).verifyComplete();
//then
verify(dMaaPProducerReactiveHttpClient, times(1))
verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
}
+
@Test
public void whenPassedObjectFits_butIncorrectResponseReturns() throws DmaapNotFoundException {
//given
//when
StepVerifier.create(dmaapPublisherTask.execute(Mono.just(consumerDmaapModel))).expectSubscription()
- .expectError(PrhTaskException.class);
+ .expectNext(String.valueOf(HttpStatus.UNAUTHORIZED.value())).verifyComplete();
//then
verify(dMaaPProducerReactiveHttpClient, times(1)).getDMaaPProducerResponse(any(Mono.class));
private void prepareMocksForTests(Integer httpResponseCode) {
dMaaPProducerReactiveHttpClient = mock(DMaaPProducerReactiveHttpClient.class);
when(dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(any(Mono.class)))
- .thenReturn(Mono.just(httpResponseCode));
+ .thenReturn(Mono.just(httpResponseCode.toString()));
dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));
when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
<artifactId>spring-boot-starter-reactor-netty</artifactId>
<version>2.0.4.RELEASE</version>
</dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- </dependency>
<dependency>
<groupId>org.onap.dcaegen2.services.prh</groupId>
<artifactId>prh-commons</artifactId>
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
+import org.onap.dcaegen2.services.prh.config.DmaapCustomConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
private final Logger logger = LoggerFactory.getLogger(this.getClass());
- private DMaaPReactiveWebClient() {
+ private String dMaaPContentType;
+ private String dMaaPUserName;
+ private String dMaaPUserPassword;
+
+ public DMaaPReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
+ this.dMaaPUserName = dmaapCustomConfig.dmaapUserName();
+ this.dMaaPUserPassword = dmaapCustomConfig.dmaapUserPassword();
+ this.dMaaPContentType = dmaapCustomConfig.dmaapContentType();
+ return this;
}
- private WebClient create(WebClientBuilder webClientBuilder) {
+ public WebClient build() {
return WebClient.builder()
- .defaultHeader(HttpHeaders.CONTENT_TYPE, webClientBuilder.dMaaPContentType)
- .filter(basicAuthentication(webClientBuilder.dMaaPUserName, webClientBuilder.dMaaPUserPassword))
+ .defaultHeader(HttpHeaders.CONTENT_TYPE, dMaaPContentType)
+ .filter(basicAuthentication(dMaaPUserName, dMaaPUserPassword))
.filter(logRequest())
.filter(logResponse())
.build();
}
- ExchangeFilterFunction logResponse() {
+ private ExchangeFilterFunction logResponse() {
return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
logger.info("Response Status {}", clientResponse.statusCode());
return Mono.just(clientResponse);
});
}
- ExchangeFilterFunction logRequest() {
+ private ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
clientRequest.headers()
});
}
- public static class WebClientBuilder {
-
- private String dMaaPContentType;
- private String dMaaPUserName;
- private String dMaaPUserPassword;
-
- public WebClientBuilder() {
- }
-
- public WebClientBuilder dmaapContentType(String dmaapContentType) {
- this.dMaaPContentType = dmaapContentType;
- return this;
- }
-
- public WebClientBuilder dmaapUserName(String dmaapUserName) {
- this.dMaaPUserName = dmaapUserName;
- return this;
- }
-
- public WebClientBuilder dmaapUserPassword(String dmaapUserPassword) {
- this.dMaaPUserPassword = dmaapUserPassword;
- return this;
- }
-
- public WebClient build() {
- return new DMaaPReactiveWebClient().create(this);
- }
- }
}
+++ /dev/null
-/*-
- * ============LICENSE_START=======================================================
- * PNF-REGISTRATION-HANDLER
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcaegen2.services.prh.service;
-
-import org.apache.http.HttpStatus;
-
-public final class HttpUtils implements HttpStatus {
-
- private HttpUtils() {}
-
- public static boolean isSuccessfulResponseCode(Integer statusCode) {
- return statusCode >= 200 && statusCode < 300;
- }
-}
*/
package org.onap.dcaegen2.services.prh.service;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.springframework.web.reactive.function.client.WebClient;
/**
*/
public class DMaaPReactiveWebClientTest {
+
@Test
public void builder_shouldBuildDMaaPReactiveWebClient() {
//given
+ DmaapConsumerConfiguration dmaapConsumerConfiguration = mock(DmaapConsumerConfiguration.class);
WebClient dMaaPReactiveWebClient;
String dMaaPContentType = "*/*";
String dMaaPUserName = "DMaaP";
String dMaaPUserPassword = "DMaaP";
//when
- dMaaPReactiveWebClient = new DMaaPReactiveWebClient.WebClientBuilder()
- .dmaapContentType(dMaaPContentType)
- .dmaapUserName(dMaaPUserName)
- .dmaapUserPassword(dMaaPUserPassword).build();
+ when(dmaapConsumerConfiguration.dmaapContentType()).thenReturn(dMaaPContentType);
+ when(dmaapConsumerConfiguration.dmaapUserName()).thenReturn(dMaaPUserName);
+ when(dmaapConsumerConfiguration.dmaapUserPassword()).thenReturn(dMaaPUserPassword);
+ dMaaPReactiveWebClient = new DMaaPReactiveWebClient()
+ .fromConfiguration(dmaapConsumerConfiguration)
+ .build();
//then
Assertions.assertNotNull(dMaaPReactiveWebClient);
}).verifyComplete();
}
-
- @Test
- public void getHttpResponse_HttpResponse4xxClientError() {
-
- //when
- mockDependantObjects();
- doAnswer(invocationOnMock -> Mono.error(new Exception("400")))
- .when(responseSpec).onStatus(HttpStatus::is4xxClientError, e -> Mono.error(new Exception("400")));
- DMaaPConsumerReactiveHttpClient.createDMaaPWebClient(webClient);
-
- //then
- StepVerifier.create(DMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).expectSubscription()
- .expectError(Exception.class);
-
- }
-
- @Test
- public void getHttpResponse_HttpResponse5xxClientError() {
-
- //when
- mockDependantObjects();
- doAnswer(invocationOnMock -> Mono.error(new Exception("500")))
- .when(responseSpec).onStatus(HttpStatus::is4xxClientError, e -> Mono.error(new Exception("500")));
- DMaaPConsumerReactiveHttpClient.createDMaaPWebClient(webClient);
-
- //then
- StepVerifier.create(DMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).expectSubscription()
- .expectError(Exception.class);
- }
-
@Test
public void getHttpResponse_whenURISyntaxExceptionHasBeenThrown() throws URISyntaxException {
//given
//then
StepVerifier.create(DMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).expectSubscription()
- .expectError(Exception.class);
+ .expectError(Exception.class).verify();
}
private void mockDependantObjects() {
package org.onap.dcaegen2.services.prh.service.producer;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModelForUnitTest;
import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
Assertions.assertEquals(response.block(), expectedResult.block());
}
- @Test
- public void getHttpResponse_HttpResponse4xxClientError() {
- //when
- mockWebClientDependantObject();
-
- doAnswer(invocationOnMock -> Mono.error(new Exception("400")))
- .when(responseSpec).onStatus(HttpStatus::is4xxClientError, e -> Mono.error(new Exception("400")));
- dMaaPProducerReactiveHttpClient.createDMaaPWebClient(webClient);
-
- //then
- StepVerifier.create(dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(Mono.just(consumerDmaapModel)))
- .expectSubscription()
- .expectError(Exception.class);
-
- }
-
- @Test
- public void getHttpResponse_HttpResponse5xxClientError() {
-
- //when
- mockWebClientDependantObject();
- doAnswer(invocationOnMock -> Mono.error(new Exception("500")))
- .when(responseSpec).onStatus(HttpStatus::is4xxClientError, e -> Mono.error(new Exception("500")));
- dMaaPProducerReactiveHttpClient.createDMaaPWebClient(webClient);
-
- //then
- StepVerifier.create(dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(Mono.just(consumerDmaapModel)))
- .expectSubscription()
- .expectError(Exception.class);
- }
-
@Test
public void getHttpResponse_whenURISyntaxExceptionHasBeenThrown() throws URISyntaxException {
//given
//then
StepVerifier.create(dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(any())).expectSubscription()
- .expectError(Exception.class);
+ .expectError(Exception.class).verify();
}
private void mockWebClientDependantObject() {