2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright 2018-2019 TechMahindra
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.universalvesadapter.service;
23 import java.util.ArrayList;
24 import java.util.LinkedList;
25 import java.util.List;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
30 import org.onap.universalvesadapter.adapter.UniversalEventAdapter;
31 import org.onap.universalvesadapter.dmaap.Creator;
32 import org.onap.universalvesadapter.dmaap.MRPublisher.DMaaPMRPublisher;
33 import org.onap.universalvesadapter.dmaap.MRSubcriber.DMaaPMRSubscriber;
34 import org.onap.universalvesadapter.exception.DMaapException;
35 import org.onap.universalvesadapter.exception.MapperConfigException;
36 import org.onap.universalvesadapter.exception.VesException;
37 import org.onap.universalvesadapter.utils.CollectorConfigPropertyRetrieval;
38 import org.onap.universalvesadapter.utils.DmaapConfig;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.beans.factory.annotation.Value;
43 import org.springframework.stereotype.Component;
46 * Service that starts the Universal VES Adapter module to listen for events.
57 public class VesService {
59 private static final Logger metricsLogger = LoggerFactory.getLogger("metricsLogger");
60 private static final Logger debugLogger = LoggerFactory.getLogger("debugLogger");
61 private static final Logger errorLogger = LoggerFactory.getLogger("errorLogger");
63 private boolean isRunning = true;
64 @Value("${defaultConfigFilelocation}")
65 private String defaultConfigFilelocation;
67 private Creator creator;
69 private UniversalEventAdapter eventAdapter;
71 private DmaapConfig dmaapConfig;
73 private CollectorConfigPropertyRetrieval collectorConfigPropertyRetrival;
74 private static List<String> list = new LinkedList<String>();
78 * Triggers the Universal VES Adapter module.
80 public void start() throws MapperConfigException {
81 debugLogger.info("Creating Subcriber and Publisher with creator.............");
82 String topicName = null;
83 String publisherTopic = null;
84 // Map of subscriber and publisher details corresponding to the respective
85 // collectors in the KV file
86 Map<String, String> dmaapTopics = collectorConfigPropertyRetrival
87 .getDmaapTopics("stream_subscriber", "stream_publisher", defaultConfigFilelocation);
89 ExecutorService executorService = Executors.newFixedThreadPool(dmaapTopics.size());
90 for (Map.Entry<String, String> entry : dmaapTopics.entrySet()) {
91 String threadName = entry.getKey();
92 // subscriber and corresponding publisher topics in a Map
93 Map<String, String> subpubTopics = collectorConfigPropertyRetrival
94 .getTopics(entry.getKey(), entry.getValue(), defaultConfigFilelocation);
95 for (Map.Entry<String, String> entry2 : subpubTopics.entrySet()) {
96 topicName = entry2.getKey();
97 publisherTopic = entry2.getValue();
101 // Publisher and subscriber as per each collector
102 DMaaPMRSubscriber subcriber = creator.getDMaaPMRSubscriber(topicName);
104 DMaaPMRPublisher publisher = creator.getDMaaPMRPublisher(publisherTopic);
106 "Created scriber topic:" + topicName + "publisher topic:" + publisherTopic);
108 executorService.submit(new Runnable() {
113 Thread.currentThread().setName(threadName);
114 metricsLogger.info("fetch and publish from and to Dmaap started:"
115 + Thread.currentThread().getName());
116 int pollingInternalInt = dmaapConfig.getPollingInterval();
118 "The Polling Interval in Milli Second is :{}" + pollingInternalInt);
119 debugLogger.info("starting subscriber & publisher thread:{}",
120 Thread.currentThread().getName());
122 synchronized (this) {
123 for (String incomingJsonString : subcriber.fetchMessages()
124 .getFetchedMessages()) {
125 list.add(incomingJsonString);
129 if (list.isEmpty()) {
131 Thread.sleep(pollingInternalInt);
132 } catch (InterruptedException e) {
136 debugLogger.debug("number of messages to be converted :{}",
139 if (!list.isEmpty()) {
140 String val = ((LinkedList<String>) list).removeFirst();
141 List<String> messages = new ArrayList<>();
142 String vesEvent = processReceivedJson(val);
144 && (!(vesEvent.isEmpty() || vesEvent.equals("")))) {
145 messages.add(vesEvent);
146 publisher.publish(messages);
149 .info("Message successfully published to DMaaP Topic-\n"
164 * Stops the Universal VES Adapter module.
172 * Processes the incoming JSON string and transforms it into a VES event.
174 * @param incomingJsonString the JSON message to transform; an empty string is not transformed
177 private String processReceivedJson(String incomingJsonString) {
178 String outgoingJsonString = null;
179 if (!"".equals(incomingJsonString)) {
183 outgoingJsonString = eventAdapter.transform(incomingJsonString);
185 } catch (VesException exception) {
186 errorLogger.error("Received exception : {},{}" + exception.getMessage(), exception);
187 debugLogger.warn("APPLICATION WILL BE SHUTDOWN UNTIL ABOVE ISSUE IS RESOLVED.");
188 } catch (DMaapException e) {
189 errorLogger.error("Received exception : {}", e.getMessage());
192 return outgoingJsonString;