Fix sonar issues in UniversalVesAdapter
[dcaegen2/services/mapper.git] / UniversalVesAdapter / src / main / java / org / onap / universalvesadapter / service / VesService.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : DCAE
4  * ================================================================================
5  * Copyright 2018-2019 TechMahindra
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.universalvesadapter.service;
22
23 import java.util.ArrayList;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29
30 import org.onap.universalvesadapter.adapter.UniversalEventAdapter;
31 import org.onap.universalvesadapter.dmaap.Creator;
32 import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
33 import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
34 import org.onap.universalvesadapter.exception.DMaapException;
35 import org.onap.universalvesadapter.exception.MapperConfigException;
36 import org.onap.universalvesadapter.exception.VesException;
37 import org.onap.universalvesadapter.utils.CollectorConfigPropertyRetrieval;
38 import org.onap.universalvesadapter.utils.DmaapConfig;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.beans.factory.annotation.Value;
43 import org.springframework.stereotype.Component;
44
45 /**
46  * Service that starts the universal ves adapter module to listen for events
47  *
48  * @author kmalbari
49  *
50  */
51 /**
52  * @author PM00501616
53  *
54  */
55
56 @Component
57 public class VesService {
58
59     private static final Logger metricsLogger = LoggerFactory.getLogger("metricsLogger");
60     private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
61     private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
62
63     private boolean isRunning = true;
64     @Value("${defaultConfigFilelocation}")
65     private String defaultConfigFilelocation;
66     @Autowired
67     private Creator creator;
68     @Autowired
69     private UniversalEventAdapter eventAdapter;
70     @Autowired
71     private DmaapConfig dmaapConfig;
72     @Autowired
73     private CollectorConfigPropertyRetrieval collectorConfigPropertyRetrival;
74     private static List<String> list = new LinkedList<String>();
75
76
77     /**
78      * method triggers universal VES adapter module.
79      */
80     public void start() throws MapperConfigException {
81         debugLogger.info("Creating Subcriber and Publisher with creator.............");
82         String topicName = null;
83         String publisherTopic = null;
84         // Hashmap of subscriber and publisher details in correspondence to the respective
85         // collectors in kv file
86         Map<String, String> dmaapTopics = collectorConfigPropertyRetrival
87                 .getDmaapTopics("stream_subscriber", "stream_publisher", defaultConfigFilelocation);
88
89         ExecutorService executorService = Executors.newFixedThreadPool(dmaapTopics.size());
90         for (Map.Entry<String, String> entry : dmaapTopics.entrySet()) {
91             String threadName = entry.getKey();
92             // subcriber and corresponding publisher topics in a Map
93             Map<String, String> subpubTopics = collectorConfigPropertyRetrival
94                     .getTopics(entry.getKey(), entry.getValue(), defaultConfigFilelocation);
95             for (Map.Entry<String, String> entry2 : subpubTopics.entrySet()) {
96                 topicName = entry2.getKey();
97                 publisherTopic = entry2.getValue();
98             }
99
100
101             // Publisher and subcriber as per each collector
102             DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber(topicName);
103
104             DMaaPMRPublisher publisher = creator.getDMaaPMRPublisher(publisherTopic);
105             debugLogger.info(
106                     "Created scriber topic:" + topicName + "publisher topic:" + publisherTopic);
107
108             executorService.submit(new Runnable() {
109
110                 @Override
111                 public void run() {
112
113                     Thread.currentThread().setName(threadName);
114                     metricsLogger.info("fetch and publish from and to Dmaap started:"
115                             + Thread.currentThread().getName());
116                     int pollingInternalInt = dmaapConfig.getPollingInterval();
117                     debugLogger.info(
118                             "The Polling Interval in Milli Second is :{}" + pollingInternalInt);
119                     debugLogger.info("starting subscriber & publisher thread:{}",
120                             Thread.currentThread().getName());
121                     while (true) {
122                         synchronized (this) {
123                             for (String incomingJsonString : subcriber.fetchMessages()
124                                     .getFetchedMessages()) {
125                                 list.add(incomingJsonString);
126
127                             }
128
129                             if (list.isEmpty()) {
130                                 try {
131                                     Thread.sleep(pollingInternalInt);
132                                 } catch (InterruptedException e) {
133                                     e.printStackTrace();
134                                 }
135                             }
136                             debugLogger.debug("number of messages to be converted :{}",
137                                     list.size());
138
139                             if (!list.isEmpty()) {
140                                 String val = ((LinkedList<String>) list).removeFirst();
141                                 List<String> messages = new ArrayList<>();
142                                 String vesEvent = processReceivedJson(val);
143                                 if (vesEvent != null
144                                         && (!(vesEvent.isEmpty() || vesEvent.equals("")))) {
145                                     messages.add(vesEvent);
146                                     publisher.publish(messages);
147
148                                     metricsLogger
149                                     .info("Message successfully published to DMaaP Topic-\n"
150                                             + vesEvent);
151                                 }
152                             }
153                         }
154                     }
155                 }
156             });
157         }
158
159
160
161     }
162
163     /**
164      * method stops universal ves adapter module
165      */
166     public void stop() {
167         isRunning = false;
168     }
169
170
171     /**
172      * method for processing the incoming json to ves
173      *
174      * @param incomingJsonString
175      * @return ves
176      */
177     private String processReceivedJson(String incomingJsonString) {
178         String outgoingJsonString = null;
179         if (!"".equals(incomingJsonString)) {
180
181             try {
182
183                 outgoingJsonString = eventAdapter.transform(incomingJsonString);
184
185             } catch (VesException exception) {
186                 errorLogger.error("Received exception : {},{}" + exception.getMessage(), exception);
187                 debugLogger.warn("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED.");
188             } catch (DMaapException e) {
189                 errorLogger.error("Received exception : {}", e.getMessage());
190             }
191         }
192         return outgoingJsonString;
193     }
194 }