/**
* Creating AaiReactiveWebClient.
+ *
* @param configuration - configuration object
* @return AaiReactiveWebClient
*/
*/
public WebClient build() {
return WebClient.builder()
- .defaultHeaders(httpHeaders -> httpHeaders.setAll(aaiHeaders))
- .filter(basicAuthentication(aaiUserName, aaiUserPassword))
- .filter(logRequest())
- .filter(logResponse())
- .build();
+ .defaultHeaders(httpHeaders -> httpHeaders.setAll(aaiHeaders))
+ .filter(basicAuthentication(aaiUserName, aaiUserPassword))
+ .filter(logRequest())
+ .filter(logResponse())
+ .build();
}
private ExchangeFilterFunction logRequest() {
return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
clientRequest.headers()
- .forEach((name, values) -> values.forEach(value -> logger.info("{}={}",name, value)));
+ .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
return Mono.just(clientRequest);
});
}
import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
import org.onap.dcaegen2.services.prh.exceptions.AaiRequestException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
private final String aaiProtocol;
private final Integer aaiHostPortNumber;
private final String aaiBasePath;
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* @return status code of operation
*/
public Mono<Integer> getAaiProducerResponse(Mono<ConsumerDmaapModel> consumerDmaapModelMono) {
- return consumerDmaapModelMono.flatMap(this::patchAaiRequest);
+ return consumerDmaapModelMono
+ .doOnNext(consumerDmaapModel -> logger.info("Sending PNF model to AAI {}", consumerDmaapModel))
+ .flatMap(this::patchAaiRequest);
}
public AaiProducerReactiveHttpClient createAaiWebClient(WebClient webClient) {
.retrieve()
.onStatus(
HttpStatus::is4xxClientError,
- clientResponse -> Mono.error(new AaiRequestException("HTTP 400"))
+ clientResponse -> Mono
+ .error(new AaiRequestException("AaiProducer HTTP " + clientResponse.statusCode()))
)
.onStatus(HttpStatus::is5xxServerError,
- clientResponse -> Mono.error(new AaiRequestException("HTTP 500")))
+ clientResponse -> Mono
+ .error(new AaiRequestException("AaiProducer HTTP " + clientResponse.statusCode())))
.bodyToMono(Integer.class);
} catch (URISyntaxException e) {
return Mono.error(e);
"dmaapConsumerConfiguration": {
"dmaapHostName": "localhost",
"dmaapPortNumber": 2222,
- "dmaapTopicName": "/events/unauthenticated.SEC_OTHER_OUTPUT",
+ "dmaapTopicName": "/events/unauthenticated.VES_PNFREG_OUTPUT",
"dmaapProtocol": "http",
"dmaapUserName": "admin",
"dmaapUserPassword": "admin",
@EnableScheduling
public class SchedulerConfig extends CloudConfiguration {
- private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 2000;
- private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 1;
+ private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 5;
+ private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
private static volatile List<ScheduledFuture> scheduledPrhTaskFutureList = new ArrayList<>();
private final ConcurrentTaskScheduler taskScheduler;
.scheduleAtFixedRate(super::runTask, Instant.now(),
Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)));
scheduledPrhTaskFutureList.add(taskScheduler
- .scheduleWithFixedDelay(scheduledTask::scheduleMainPrhEventTask, SCHEDULING_DELAY_FOR_PRH_TASKS));
+ .scheduleWithFixedDelay(scheduledTask::scheduleMainPrhEventTask,
+ Duration.ofSeconds(SCHEDULING_DELAY_FOR_PRH_TASKS)));
return true;
} else {
return false;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
private static final String VENDOR_NAME = "vendorName";
private static final String SERIAL_NUMBER = "serialNumber";
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
/**
* Extract info from string and create @see {@link org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel}.
*
*/
public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
return monoMessage
+ .doOnNext(message -> logger.info("Consumed message from DmaaP: {}", message))
.flatMap(this::getJsonParserMessage)
.flatMap(this::createJsonConsumerModel);
}
@Override
Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
- logger.info("Sending PNF model to AAI {}", consumerDmaapModel);
+
return aaiProducerReactiveHttpClient.getAaiProducerResponse(consumerDmaapModel)
.flatMap(response -> {
if (HttpUtils.isSuccessfulResponseCode(response)) {
@Override
Mono<ConsumerDmaapModel> consume(Mono<String> message) {
- logger.info("Consumed model from DMaaP: {}", message);
return dmaapConsumerJsonParser.getJsonObject(message);
}
@Override
Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
- logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(),
- consumerDmaapModel);
- return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
+ return consumerDmaapModel.flatMap(dmaapModel -> {
+ logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(),
+ dmaapModel);
+ return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(dmaapModel);
+ });
}
@Override
//then
verify(dMaaPProducerReactiveHttpClient, times(1))
- .getDMaaPProducerResponse(any(Mono.class));
+ .getDMaaPProducerResponse(any());
verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
}
.expectNext(String.valueOf(HttpStatus.UNAUTHORIZED.value())).verifyComplete();
//then
- verify(dMaaPProducerReactiveHttpClient, times(1)).getDMaaPProducerResponse(any(Mono.class));
+ verify(dMaaPProducerReactiveHttpClient, times(1)).getDMaaPProducerResponse(any());
verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
}
private void prepareMocksForTests(Integer httpResponseCode) {
dMaaPProducerReactiveHttpClient = mock(DMaaPProducerReactiveHttpClient.class);
- when(dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(any(Mono.class)))
+ when(dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(any()))
.thenReturn(Mono.just(httpResponseCode.toString()));
dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));
when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
public WebClient build() {
return WebClient.builder()
.defaultHeader(HttpHeaders.CONTENT_TYPE, dmaaPContentType)
- .filter(basicAuthentication(dmaaPUserName, dmaaPUserPassword))
.filter(logRequest())
.filter(logResponse())
.build();
.uri(getUri())
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("HTTP 400"))
+ Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode()))
)
.onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new Exception("HTTP 500")))
+ Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode())))
.bodyToMono(String.class);
} catch (URISyntaxException e) {
logger.warn("Exception while evaluating URI ");
* @param consumerDmaapModelMono - object which will be sent to DMaaP
* @return status code of operation
*/
- public Mono<String> getDMaaPProducerResponse(Mono<ConsumerDmaapModel> consumerDmaapModelMono) {
+ public Mono<String> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) {
try {
return webClient
.post()
.body(BodyInserters.fromObject(consumerDmaapModelMono))
.retrieve()
.onStatus(HttpStatus::is4xxClientError, clientResponse ->
- Mono.error(new Exception("HTTP 400"))
+ Mono.error(new Exception("DmaapProducer HTTP" + clientResponse.statusCode()))
)
.onStatus(HttpStatus::is5xxServerError, clientResponse ->
- Mono.error(new Exception("HTTP 500")))
+ Mono.error(new Exception("DmaapProducer HTTP " + clientResponse.statusCode())))
.bodyToMono(String.class);
} catch (URISyntaxException e) {
logger.warn("Exception while evaluating URI");
mockWebClientDependantObject();
doReturn(expectedResult).when(responseSpec).bodyToMono(String.class);
dmaapProducerReactiveHttpClient.createDMaaPWebClient(webClient);
- Mono<String> response = dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(Mono.just(consumerDmaapModel));
+ Mono<String> response = dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
//then
Assertions.assertEquals(response.block(), expectedResult.block());