1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 Wipro Limited.
6 * ==============================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
20 *******************************************************************************/
22 package org.onap.dcaegen2.services.sonhms.dmaap;
24 import com.att.nsa.cambria.client.CambriaConsumer;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.TimeUnit;
31 import javax.annotation.PostConstruct;
33 import org.onap.dcaegen2.services.sonhms.Configuration;
34 import org.onap.dcaegen2.services.sonhms.utils.DmaapUtils;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37 import org.springframework.stereotype.Component;
40 public class DmaapClient {
42 private Configuration configuration;
43 private static Logger log = LoggerFactory.getLogger(DmaapClient.class);
45 private DmaapUtils dmaapUtils;
51 public void initClient() {
52 log.debug("initializing client");
53 dmaapUtils = new DmaapUtils();
54 configuration = Configuration.getInstance();
55 if (log.isDebugEnabled()) {
56 log.debug(configuration.toString());
65 @SuppressWarnings("unchecked")
66 private synchronized void startClient() {
68 Map<String, Object> streamSubscribes = Configuration.getInstance().getStreamsSubscribes();
69 String sdnrTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
70 .get("nbr_list_change_topic")).get("dmaap_info")).get("topic_url");
71 String[] sdnrTopicSplit = sdnrTopicUrl.split("\\/");
72 String sdnrTopic = sdnrTopicSplit[sdnrTopicSplit.length - 1];
73 log.debug("sdnr topic : {}", sdnrTopic);
74 String fmTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
75 .get("fault_management_topic")).get("dmaap_info")).get("topic_url");
76 String[] fmTopicSplit = fmTopicUrl.split("\\/");
77 String fmTopic = fmTopicSplit[sdnrTopicSplit.length - 1];
78 log.debug("fm topic : {}", fmTopic);
79 String pmTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
80 .get("performance_management_topic")).get("dmaap_info")).get("topic_url");
81 String[] pmTopicSplit = pmTopicUrl.split("\\/");
82 String pmTopic = pmTopicSplit[sdnrTopicSplit.length - 1];
83 log.debug("pm topic : {}", pmTopic);
84 CambriaConsumer sdnrNotifCambriaConsumer = null;
85 CambriaConsumer fmNotifCambriaConsumer = null;
86 CambriaConsumer pmNotifCambriaConsumer = null;
88 sdnrNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, sdnrTopic);
89 fmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, fmTopic);
90 pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic);
92 // create notification consumers for SDNR and policy
93 NotificationConsumer sdnrNotificationConsumer = new NotificationConsumer(sdnrNotifCambriaConsumer,
94 new SdnrNotificationCallback());
95 // start sdnr notification consumer threads
96 ScheduledExecutorService executorPool;
97 executorPool = Executors.newScheduledThreadPool(10);
98 executorPool.scheduleAtFixedRate(sdnrNotificationConsumer, 0, configuration.getPollingInterval(),
101 // create notification consumers for FM
102 NotificationConsumer fmNotificationConsumer = new NotificationConsumer(fmNotifCambriaConsumer,
103 new FmNotificationCallback());
104 // start fm notification consumer threads
105 executorPool = Executors.newScheduledThreadPool(10);
106 executorPool.scheduleAtFixedRate(fmNotificationConsumer, 0, configuration.getPollingInterval(),
109 // create notification consumers for PM
110 NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifCambriaConsumer,
111 new PmNotificationCallback());
112 // start pm notification consumer threads
113 executorPool = Executors.newScheduledThreadPool(10);
114 executorPool.scheduleAtFixedRate(pmNotificationConsumer, 0, configuration.getPollingInterval(),