664dbf838a2d62009f1713f6dc5b1aa594ce312a
[dcaegen2/services/son-handler.git] / src / main / java / org / onap / dcaegen2 / services / sonhms / dmaap / DmaapClient.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  son-handler
4  *  ================================================================================
5  *   Copyright (C) 2019 Wipro Limited.
6  *   ==============================================================================
7  *     Licensed under the Apache License, Version 2.0 (the "License");
8  *     you may not use this file except in compliance with the License.
9  *     You may obtain a copy of the License at
10  *  
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *  
13  *     Unless required by applicable law or agreed to in writing, software
14  *     distributed under the License is distributed on an "AS IS" BASIS,
15  *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *     See the License for the specific language governing permissions and
17  *     limitations under the License.
18  *     ============LICENSE_END=========================================================
19  *  
20  *******************************************************************************/
21
22 package org.onap.dcaegen2.services.sonhms.dmaap;
23
24 import com.att.nsa.cambria.client.CambriaConsumer;
25
26 import java.util.Map;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.TimeUnit;
30
31 import javax.annotation.PostConstruct;
32
33 import org.onap.dcaegen2.services.sonhms.Configuration;
34 import org.onap.dcaegen2.services.sonhms.utils.DmaapUtils;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37 import org.springframework.stereotype.Component;
38
39 @Component
40 public class DmaapClient {
41
42     private Configuration configuration;
43     private static Logger log = LoggerFactory.getLogger(DmaapClient.class);
44
45     private DmaapUtils dmaapUtils;
46
47     /**
48      * init dmaap client.
49      */
50     @PostConstruct
51     public void initClient() {
52         log.debug("initializing client");
53         dmaapUtils = new DmaapUtils();
54         configuration = Configuration.getInstance();
55         if (log.isDebugEnabled()) {
56             log.debug(configuration.toString());
57         }
58
59         startClient();
60     }
61
62     /**
63      * start dmaap client.
64      */
65     @SuppressWarnings("unchecked")
66     private synchronized void startClient() {
67
68         Map<String, Object> streamSubscribes = Configuration.getInstance().getStreamsSubscribes();
69         String sdnrTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
70                 .get("nbr_list_change_topic")).get("dmaap_info")).get("topic_url");
71         String[] sdnrTopicSplit = sdnrTopicUrl.split("\\/");
72         String sdnrTopic = sdnrTopicSplit[sdnrTopicSplit.length - 1];
73         log.debug("sdnr topic : {}", sdnrTopic);
74         String fmTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
75                 .get("fault_management_topic")).get("dmaap_info")).get("topic_url");
76         String[] fmTopicSplit = fmTopicUrl.split("\\/");
77         String fmTopic = fmTopicSplit[sdnrTopicSplit.length - 1];
78         log.debug("fm topic : {}", fmTopic);
79         String pmTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
80                 .get("performance_management_topic")).get("dmaap_info")).get("topic_url");
81         String[] pmTopicSplit = pmTopicUrl.split("\\/");
82         String pmTopic = pmTopicSplit[sdnrTopicSplit.length - 1];
83         log.debug("pm topic : {}", pmTopic);
84         CambriaConsumer sdnrNotifCambriaConsumer = null;
85         CambriaConsumer fmNotifCambriaConsumer = null;
86         CambriaConsumer pmNotifCambriaConsumer = null;
87
88         sdnrNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, sdnrTopic);
89         fmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, fmTopic);
90         pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic);
91
92         // create notification consumers for SNDR and policy
93         NotificationConsumer sdnrNotificationConsumer = new NotificationConsumer(sdnrNotifCambriaConsumer,
94                 new SdnrNotificationCallback());
95         // start sdnr notification consumer threads
96         ScheduledExecutorService executorPool;
97         executorPool = Executors.newScheduledThreadPool(10);
98         executorPool.scheduleAtFixedRate(sdnrNotificationConsumer, 0, configuration.getPollingInterval(),
99                 TimeUnit.SECONDS);
100
101         // create notification consumers for FM
102         NotificationConsumer fmNotificationConsumer = new NotificationConsumer(fmNotifCambriaConsumer,
103                 new FmNotificationCallback());
104         // start fm notification consumer threads
105         executorPool = Executors.newScheduledThreadPool(10);
106         executorPool.scheduleAtFixedRate(fmNotificationConsumer, 0, configuration.getPollingInterval(),
107                 TimeUnit.SECONDS);
108
109         // create notification consumers for PM
110         NotificationConsumer pmNotificationConsumer = new NotificationConsumer(pmNotifCambriaConsumer,
111                 new PmNotificationCallback());
112         // start pm notification consumer threads
113         executorPool = Executors.newScheduledThreadPool(10);
114         executorPool.scheduleAtFixedRate(pmNotificationConsumer, 0, configuration.getPollingInterval(),
115                 TimeUnit.SECONDS);
116
117     }
118
119 }