2 * ============LICENSE_START=======================================================
3 * PNF-REGISTRATION-HANDLER
4 * ================================================================================
5 * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.dcaegen2.services.prh.tasks.commit;
23 import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
26 import java.util.concurrent.CountDownLatch;
27 import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
28 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
29 import org.onap.dcaegen2.services.prh.tasks.AaiProducerTask;
30 import org.onap.dcaegen2.services.prh.tasks.AaiQueryTask;
31 import org.onap.dcaegen2.services.prh.tasks.BbsActionsTask;
32 import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask;
33 import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
35 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
36 import org.slf4j.Marker;
37 import org.slf4j.MarkerFactory;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.beans.factory.annotation.Qualifier;
43 import org.springframework.boot.configurationprocessor.json.JSONException;
44 import org.springframework.context.annotation.Profile;
45 import org.springframework.http.HttpStatus;
46 import org.springframework.stereotype.Component;
47 import reactor.core.publisher.Flux;
48 import reactor.core.publisher.Mono;
51 * @author <a href="mailto:sangeeta.bellara@t-systems.com">Sangeeta Bellara</a> on 3/13/23
53 @Profile("autoCommitDisabled")
55 public class ScheduledTasksWithCommit {
57 private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasksWithCommit.class);
58 private static Boolean pnfFound = true;
59 private KafkaConsumerTask kafkaConsumerTask;
60 private DmaapPublisherTask dmaapReadyProducerTask;
61 private DmaapPublisherTask dmaapUpdateProducerTask;
62 private AaiQueryTask aaiQueryTask;
63 private AaiProducerTask aaiProducerTask;
64 private BbsActionsTask bbsActionsTask;
65 private Map<String, String> mdcContextMap;
68 * Constructor for tasks registration in PRHWorkflow.
70 * @param kafkaConsumerTask - fist task
71 * @param dmaapReadyPublisherTask - third task
72 * @param dmaapUpdatePublisherTask - fourth task
73 * @param aaiPublisherTask - second task
76 public ScheduledTasksWithCommit(
77 final KafkaConsumerTask kafkaConsumerTask,
78 @Qualifier("ReadyPublisherTask") final DmaapPublisherTask dmaapReadyPublisherTask,
79 @Qualifier("UpdatePublisherTask") final DmaapPublisherTask dmaapUpdatePublisherTask,
80 final AaiQueryTask aaiQueryTask,
81 final AaiProducerTask aaiPublisherTask,
82 final BbsActionsTask bbsActionsTask,
83 final Map<String, String> mdcContextMap) {
84 this.dmaapReadyProducerTask = dmaapReadyPublisherTask;
85 this.dmaapUpdateProducerTask = dmaapUpdatePublisherTask;
86 this.kafkaConsumerTask=kafkaConsumerTask;
87 this.aaiQueryTask = aaiQueryTask;
88 this.aaiProducerTask = aaiPublisherTask;
89 this.bbsActionsTask = bbsActionsTask;
90 this.mdcContextMap = mdcContextMap;
94 public ConsumerDmaapModel dmaapModel;
95 public Boolean activationStatus;
97 public State(ConsumerDmaapModel dmaapModel, final Boolean activationStatus) {
98 this.dmaapModel = dmaapModel;
99 this.activationStatus = activationStatus;
103 public void scheduleKafkaPrhEventTask() {
104 MdcVariables.setMdcContextMap(mdcContextMap);
106 LOGGER.info("Execution of tasks was registered with commit");
107 CountDownLatch mainCountDownLatch = new CountDownLatch(1);
108 consumeFromKafkaMessage()
109 .flatMap(model->queryAaiForPnf(model)
110 .doOnError(e -> { LOGGER.info("PNF Not Found in AAI --> {}" + e);
113 .onErrorResume(e -> Mono.empty())
116 .flatMap(this::queryAaiForConfiguration)
117 .flatMap(this::publishToAaiConfiguration)
118 .flatMap(this::processAdditionalFields)
119 .flatMap(this::publishToDmaapConfiguration)
120 .onErrorResume(e -> Mono.empty())
122 .doOnTerminate(mainCountDownLatch::countDown)
123 .subscribe(this::onSuccess, this::onError, this::onCompleteKafka);
124 mainCountDownLatch.await();
125 } catch (InterruptedException | JSONException e ) {
126 LOGGER.warn("Interruption problem on countDownLatch {}", e);
127 Thread.currentThread().interrupt();
131 private static void disableCommit()
136 private void onCompleteKafka() {
137 LOGGER.info("PRH tasks have been completed");
139 kafkaConsumerTask.commitOffset();
140 LOGGER.info("Committed the Offset");
144 LOGGER.info("Offset not Committed");
150 private void onSuccess(MessageRouterPublishResponse response) {
151 if (response.successful()) {
152 String statusCodeOk = HttpStatus.OK.name();
153 MDC.put(RESPONSE_CODE, statusCodeOk);
154 LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", statusCodeOk);
155 MDC.remove(RESPONSE_CODE);
159 private void onError(Throwable throwable) {
160 if (!(throwable instanceof DmaapEmptyResponseException)) {
161 LOGGER.warn("Chain of tasks have been aborted due to errors in PRH workflow {}", throwable);
165 private Flux<ConsumerDmaapModel> consumeFromKafkaMessage() throws JSONException {
166 return kafkaConsumerTask.execute();
169 private Mono<State> queryAaiForConfiguration(final ConsumerDmaapModel monoDMaaPModel) {
171 .execute(monoDMaaPModel)
172 .map(x -> new State(monoDMaaPModel, x));
175 private Mono<ConsumerDmaapModel> queryAaiForPnf(final ConsumerDmaapModel monoDMaaPModel) {
177 LOGGER.info("Find PNF --> "+monoDMaaPModel.getCorrelationId());
178 return aaiQueryTask.findPnfinAAI(monoDMaaPModel);
182 private Mono<State> publishToAaiConfiguration(final State state) {
184 return aaiProducerTask
185 .execute(state.dmaapModel)
187 } catch (PrhTaskException e) {
188 LOGGER.warn("AAIProducerTask exception has been registered: {}", e);
189 return Mono.error(e);
193 private Mono<State> processAdditionalFields(final State state) {
194 if (state.activationStatus) {
195 LOGGER.debug("Re-registration - Logical links won't be updated.");
196 return Mono.just(state);
198 return bbsActionsTask.execute(state.dmaapModel).map(x -> state);
201 private Flux<MessageRouterPublishResponse> publishToDmaapConfiguration(final State state) {
203 if (state.activationStatus) {
204 LOGGER.debug("Re-registration - Using PNF_UPDATE DMaaP topic.");
205 return dmaapUpdateProducerTask.execute(state.dmaapModel);
207 return dmaapReadyProducerTask.execute(state.dmaapModel);
208 } catch (PrhTaskException e) {
209 LOGGER.warn("DMaaPProducerTask exception has been registered: ", e);
210 return Flux.error(e);