b0eae94930fd0d769544dce869c31620c90cc453
[dcaegen2/services/prh.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * PNF-REGISTRATION-HANDLER
4  * ================================================================================
5  * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.dcaegen2.services.prh.tasks.commit;
22
23 import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
24
25 import java.util.Map;
26 import java.util.concurrent.CountDownLatch;
27 import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
28 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
29 import org.onap.dcaegen2.services.prh.tasks.AaiProducerTask;
30 import org.onap.dcaegen2.services.prh.tasks.AaiQueryTask;
31 import org.onap.dcaegen2.services.prh.tasks.BbsActionsTask;
32 import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask;
33 import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
34 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
35 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
36 import org.slf4j.Marker;
37 import org.slf4j.MarkerFactory;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import org.slf4j.MDC;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.beans.factory.annotation.Qualifier;
43 import org.springframework.boot.configurationprocessor.json.JSONException;
44 import org.springframework.context.annotation.Profile;
45 import org.springframework.http.HttpStatus;
46 import org.springframework.stereotype.Component;
47 import reactor.core.publisher.Flux;
48 import reactor.core.publisher.Mono;
49
50 /**
51  * @author <a href="mailto:sangeeta.bellara@t-systems.com">Sangeeta Bellara</a> on 3/13/23
52  */
53 @Profile("autoCommitDisabled")
54 @Component
55 public class ScheduledTasksWithCommit {
56
57     private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasksWithCommit.class);
58     private static Boolean pnfFound = true;
59     private KafkaConsumerTask kafkaConsumerTask;
60     private DmaapPublisherTask dmaapReadyProducerTask;
61     private DmaapPublisherTask dmaapUpdateProducerTask;
62     private AaiQueryTask aaiQueryTask;
63     private AaiProducerTask aaiProducerTask;
64     private BbsActionsTask bbsActionsTask;
65     private Map<String, String> mdcContextMap;
66
67     /**
68      * Constructor for tasks registration in PRHWorkflow.
69      *
70      * @param kafkaConsumerTask        - fist task
71      * @param dmaapReadyPublisherTask  - third task
72      * @param dmaapUpdatePublisherTask - fourth task
73      * @param aaiPublisherTask         - second task
74      */
75     @Autowired
76     public ScheduledTasksWithCommit(
77         final KafkaConsumerTask kafkaConsumerTask,
78         @Qualifier("ReadyPublisherTask") final DmaapPublisherTask dmaapReadyPublisherTask,
79         @Qualifier("UpdatePublisherTask") final DmaapPublisherTask dmaapUpdatePublisherTask,
80         final AaiQueryTask aaiQueryTask,
81         final AaiProducerTask aaiPublisherTask,
82         final BbsActionsTask bbsActionsTask,
83         final Map<String, String> mdcContextMap) {
84         this.dmaapReadyProducerTask = dmaapReadyPublisherTask;
85         this.dmaapUpdateProducerTask = dmaapUpdatePublisherTask;
86         this.kafkaConsumerTask=kafkaConsumerTask;
87         this.aaiQueryTask = aaiQueryTask;
88         this.aaiProducerTask = aaiPublisherTask;
89         this.bbsActionsTask = bbsActionsTask;
90         this.mdcContextMap = mdcContextMap;
91     }
92
93     static class State {
94         public ConsumerDmaapModel dmaapModel;
95         public  Boolean activationStatus;
96
97         public State(ConsumerDmaapModel dmaapModel, final Boolean activationStatus) {
98             this.dmaapModel = dmaapModel;
99             this.activationStatus = activationStatus;
100         }
101     }
102
103     public void scheduleKafkaPrhEventTask() {
104         MdcVariables.setMdcContextMap(mdcContextMap);
105         try {
106             LOGGER.info("Execution of tasks was registered with commit");
107             CountDownLatch mainCountDownLatch = new CountDownLatch(1);
108             consumeFromKafkaMessage()
109                     .flatMap(model->queryAaiForPnf(model)
110                 .doOnError(e -> { LOGGER.info("PNF Not Found in AAI --> {}" + e);
111                    disableCommit();
112                 })
113                 .onErrorResume(e -> Mono.empty())
114                 
115                 )
116                     .flatMap(this::queryAaiForConfiguration)
117                 .flatMap(this::publishToAaiConfiguration)
118                 .flatMap(this::processAdditionalFields)
119                 .flatMap(this::publishToDmaapConfiguration)
120                     .onErrorResume(e -> Mono.empty())
121                 
122                 .doOnTerminate(mainCountDownLatch::countDown)
123                 .subscribe(this::onSuccess, this::onError, this::onCompleteKafka);
124             mainCountDownLatch.await();
125         } catch (InterruptedException | JSONException e ) {
126             LOGGER.warn("Interruption problem on countDownLatch {}", e);
127             Thread.currentThread().interrupt();
128         }
129     }
130
131     private static void disableCommit()
132     {
133         pnfFound=false;
134     }
135
136     private void onCompleteKafka() {
137         LOGGER.info("PRH tasks have been completed");
138         if(pnfFound){
139             kafkaConsumerTask.commitOffset();
140             LOGGER.info("Committed the Offset");
141         }
142         else
143         {
144             LOGGER.info("Offset not Committed");
145             pnfFound=true;
146         }
147     }
148
149
150     private void onSuccess(MessageRouterPublishResponse response) {
151         if (response.successful()) {
152             String statusCodeOk = HttpStatus.OK.name();
153             MDC.put(RESPONSE_CODE, statusCodeOk);
154             LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", statusCodeOk);
155             MDC.remove(RESPONSE_CODE);
156         }
157     }
158
159     private void onError(Throwable throwable) {
160         if (!(throwable instanceof DmaapEmptyResponseException)) {
161             LOGGER.warn("Chain of tasks have been aborted due to errors in PRH workflow {}", throwable);
162         }
163     }
164
165     private Flux<ConsumerDmaapModel> consumeFromKafkaMessage() throws JSONException {
166         return kafkaConsumerTask.execute();
167     }
168
169     private Mono<State> queryAaiForConfiguration(final ConsumerDmaapModel monoDMaaPModel) {
170         return   aaiQueryTask
171             .execute(monoDMaaPModel)
172             .map(x -> new State(monoDMaaPModel, x));
173     }
174
175     private Mono<ConsumerDmaapModel> queryAaiForPnf(final ConsumerDmaapModel monoDMaaPModel) {
176
177         LOGGER.info("Find PNF --> "+monoDMaaPModel.getCorrelationId());
178         return aaiQueryTask.findPnfinAAI(monoDMaaPModel);
179     }
180
181
182     private Mono<State> publishToAaiConfiguration(final State state) {
183         try {
184             return aaiProducerTask
185                 .execute(state.dmaapModel)
186                 .map(x -> state);
187         } catch (PrhTaskException e) {
188             LOGGER.warn("AAIProducerTask exception has been registered: {}", e);
189             return Mono.error(e);
190         }
191     }
192
193     private Mono<State> processAdditionalFields(final State state) {
194         if (state.activationStatus) {
195             LOGGER.debug("Re-registration - Logical links won't be updated.");
196             return Mono.just(state);
197         }
198         return bbsActionsTask.execute(state.dmaapModel).map(x -> state);
199     }
200
201     private Flux<MessageRouterPublishResponse> publishToDmaapConfiguration(final State state) {
202         try {
203             if (state.activationStatus) {
204                 LOGGER.debug("Re-registration - Using PNF_UPDATE DMaaP topic.");
205                 return dmaapUpdateProducerTask.execute(state.dmaapModel);
206             }
207             return dmaapReadyProducerTask.execute(state.dmaapModel);
208         } catch (PrhTaskException e) {
209             LOGGER.warn("DMaaPProducerTask exception has been registered: ", e);
210             return Flux.error(e);
211         }
212     }
213 }