)
.flatMap(this::queryAaiForConfiguration)
.flatMap(this::publishToAaiConfiguration)
- .doOnError(exception ->
- LOGGER.warn("AAIProducerTask exception has been registered: ", exception))
- .onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
.flatMap(this::processAdditionalFields)
- .doOnError(exception ->
- LOGGER.warn("BBSActionsTask exception has been registered: ", exception))
.flatMap(this::publishToDmaapConfiguration)
- .doOnError(exception ->
- LOGGER.warn("DMaaPProducerTask exception has been registered: ", exception))
.onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
.doOnTerminate(mainCountDownLatch::countDown)
.subscribe(this::onSuccess, this::onError, this::onComplete);
.execute(state.dmaapModel)
.map(x -> state);
} catch (PrhTaskException | SSLException e) {
+ LOGGER.warn("AAIProducerTask exception has been registered: ", e);
return Mono.error(e);
}
}
private Mono<State> processAdditionalFields(final State state) {
if (state.activationStatus) {
LOGGER.debug("Re-registration - Logical links won't be updated.");
-
return Mono.just(state);
}
-
return bbsActionsTask.execute(state.dmaapModel).map(x -> state);
}
- private Flux<MessageRouterPublishResponse>
- publishToDmaapConfiguration(final State state) {
+ private Flux<MessageRouterPublishResponse> publishToDmaapConfiguration(final State state) {
try {
if (state.activationStatus) {
LOGGER.debug("Re-registration - Using PNF_UPDATE DMaaP topic.");
return dmaapUpdateProducerTask.execute(state.dmaapModel);
}
-
return dmaapReadyProducerTask.execute(state.dmaapModel);
} catch (PrhTaskException e) {
+ LOGGER.warn("DMaaPProducerTask exception has been registered: ", e);
return Flux.error(e);
}
}