Son-handler microservice seed code
[dcaegen2/services/son-handler.git] / src / main / java / com / wipro / www / sonhms / dmaap / DmaapClient.java
1 /*******************************************************************************
2  * ============LICENSE_START=======================================================
3  * pcims
4  *  ================================================================================
5  *  Copyright (C) 2018 Wipro Limited.
6  *  ==============================================================================
7  *   Licensed under the Apache License, Version 2.0 (the "License");
8  *   you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *        http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *   Unless required by applicable law or agreed to in writing, software
14  *   distributed under the License is distributed on an "AS IS" BASIS,
15  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *   See the License for the specific language governing permissions and
17  *   limitations under the License.
18  *   ============LICENSE_END=========================================================
19  ******************************************************************************/
20
21 package com.wipro.www.sonhms.dmaap;
22
23 import com.att.nsa.apiClient.http.HttpException;
24 import com.att.nsa.cambria.client.CambriaClient;
25 import com.att.nsa.cambria.client.CambriaClientBuilders;
26 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
28 import com.att.nsa.cambria.client.CambriaConsumer;
29 import com.att.nsa.cambria.client.CambriaTopicManager;
30 import com.wipro.www.sonhms.Configuration;
31 import com.wipro.www.sonhms.NewNotification;
32 import com.wipro.www.sonhms.Topic;
33 import com.wipro.www.sonhms.dao.DmaapNotificationsRepository;
34 import com.wipro.www.sonhms.entity.DmaapNotifications;
35
36
37 import java.io.IOException;
38 import java.net.MalformedURLException;
39 import java.security.GeneralSecurityException;
40 import java.util.HashSet;
41 import java.util.List;
42 import java.util.Set;
43 import java.util.concurrent.Executors;
44 import java.util.concurrent.ScheduledExecutorService;
45 import java.util.concurrent.TimeUnit;
46
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 import org.springframework.beans.factory.annotation.Autowired;
50 import org.springframework.stereotype.Component;
51
52 @Component
53 public class DmaapClient {
54
55     @Autowired
56     private DmaapNotificationsRepository dmaapNotificationsRepository;
57     private Configuration configuration;
58     private static Logger log = LoggerFactory.getLogger(DmaapClient.class);
59     private static final String CONSUMER = "CONSUMER";
60     private static final String PRODUCER = "PRODUCER";
61     private static final String DESCRIPTION = "api keys for OOF PCI use case";
62     private static final int PARTITION_COUNT = 1;
63     private static final int REPLICATION_COUNT = 1;
64     private NewNotification newNotification;
65     private CambriaTopicManager topicManager;
66
67     public class NotificationCallback {
68         DmaapClient dmaapClient;
69
70         public NotificationCallback(DmaapClient dmaapClient) {
71             this.dmaapClient = dmaapClient;
72         }
73
74         public void activateCallBack(String msg) {
75             handleNotification(msg);
76         }
77
78         private void handleNotification(String msg) {
79             DmaapNotifications dmaapNotification = new DmaapNotifications();
80             dmaapNotification.setNotification(msg);
81             if (log.isDebugEnabled()) {
82                 log.debug(dmaapNotification.toString());
83             }
84             dmaapNotificationsRepository.save(dmaapNotification);
85             newNotification.setNewNotif(true);
86         }
87     }
88
89     /**
90      * init dmaap client.
91      */
92     public void initClient(NewNotification newNotification) {
93         log.debug("initializing client");
94         configuration = Configuration.getInstance();
95         if (log.isDebugEnabled()) {
96             log.debug(configuration.toString());
97         }
98         this.newNotification = newNotification;
99
100         createAndConfigureTopics();
101         startClient();
102     }
103
104     /**
105      * create and configures topics.
106      */
107     private void createAndConfigureTopics() {
108
109         try {
110             topicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(configuration.getServers())
111                     .authenticatedBy(configuration.getManagerApiKey(), configuration.getManagerSecretKey()));
112         } catch (GeneralSecurityException | IOException e) {
113             log.debug("exception during creating topic", e);
114         }
115         List<Topic> topics = configuration.getTopics();
116
117         for (Topic topic : topics) {
118             Set<String> topicsInDmaap = getAllTopicsFromDmaap();
119
120             createTopic(topic, topicsInDmaap);
121             subscribeToTopic(topic.getName(), topic.getProducer(), PRODUCER);
122             subscribeToTopic(topic.getName(), topic.getConsumer(), CONSUMER);
123
124         }
125
126         topicManager.close();
127
128     }
129
130     /**
131      * create topic.
132      */
133     private void createTopic(Topic topic, Set<String> topicsInDmaap) {
134         if (topicsInDmaap.contains(topic.getName())) {
135             log.debug("topic exists in dmaap");
136         } else {
137             try {
138                 topicManager.createTopic(topic.getName(), DESCRIPTION, PARTITION_COUNT, REPLICATION_COUNT);
139             } catch (HttpException | IOException e) {
140                 log.debug("error while creating topic: {}", e);
141             }
142         }
143     }
144
145     /**
146      * get all topics from dmaap.
147      */
148     private Set<String> getAllTopicsFromDmaap() {
149         Set<String> topics = new HashSet<>();
150         try {
151             topics = topicManager.getTopics();
152         } catch (IOException e) {
153             log.debug("IOException while fetching topics");
154         }
155         return topics;
156
157     }
158
159     /**
160      * start dmaap client.
161      */
162     private synchronized void startClient() {
163
164         ScheduledExecutorService executorPool;
165         CambriaConsumer cambriaConsumer = null;
166
167         try {
168             cambriaConsumer = new ConsumerBuilder()
169                     .authenticatedBy(configuration.getPcimsApiKey(), configuration.getPcimsSecretKey())
170                     .knownAs(configuration.getCg(), configuration.getCid()).onTopic(configuration.getSdnrTopic())
171                     .usingHosts(configuration.getServers()).withSocketTimeout(configuration.getPollingTimeout() * 1000)
172                     .build();
173
174             // create notification consumers for SNDR and policy
175             NotificationConsumer notificationConsumer = new NotificationConsumer(cambriaConsumer,
176                     new NotificationCallback(this));
177
178             // start notification consumer threads
179             executorPool = Executors.newScheduledThreadPool(10);
180             executorPool.scheduleAtFixedRate(notificationConsumer, 0, configuration.getPollingInterval(),
181                     TimeUnit.SECONDS);
182         } catch (MalformedURLException | GeneralSecurityException e) {
183             log.debug("exception during starting client", e);
184         }
185
186     }
187
188     /**
189      * subscribe to topic.
190      */
191     private void subscribeToTopic(String topicName, String subscriberApiKey, String subscriberType) {
192         if (subscriberType.equals(PRODUCER)) {
193             try {
194                 topicManager.allowProducer(topicName, subscriberApiKey);
195             } catch (HttpException | IOException e) {
196                 log.debug("error while subscribing to a topic: {}", e);
197             }
198         } else if (subscriberType.equals(CONSUMER)) {
199             try {
200                 topicManager.allowConsumer(topicName, subscriberApiKey);
201             } catch (HttpException | IOException e) {
202                 log.debug("error while subscribing to a topic: {}", e);
203             }
204         }
205
206     }
207
208     @SuppressWarnings("unchecked")
209     private static <T extends CambriaClient> T buildCambriaClient(
210             CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client)
211             throws MalformedURLException, GeneralSecurityException {
212         return (T) client.build();
213     }
214
215 }