1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 Wipro Limited.
6 * ==============================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 ******************************************************************************/
21 package com.wipro.www.sonhms.dmaap;
23 import com.att.nsa.apiClient.http.HttpException;
24 import com.att.nsa.cambria.client.CambriaClient;
25 import com.att.nsa.cambria.client.CambriaClientBuilders;
26 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
28 import com.att.nsa.cambria.client.CambriaConsumer;
29 import com.att.nsa.cambria.client.CambriaTopicManager;
30 import com.wipro.www.sonhms.Configuration;
31 import com.wipro.www.sonhms.NewNotification;
32 import com.wipro.www.sonhms.Topic;
33 import com.wipro.www.sonhms.dao.DmaapNotificationsRepository;
34 import com.wipro.www.sonhms.entity.DmaapNotifications;
37 import java.io.IOException;
38 import java.net.MalformedURLException;
39 import java.security.GeneralSecurityException;
40 import java.util.HashSet;
41 import java.util.List;
43 import java.util.concurrent.Executors;
44 import java.util.concurrent.ScheduledExecutorService;
45 import java.util.concurrent.TimeUnit;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 import org.springframework.beans.factory.annotation.Autowired;
50 import org.springframework.stereotype.Component;
53 public class DmaapClient {
56 private DmaapNotificationsRepository dmaapNotificationsRepository;
57 private Configuration configuration;
58 private static Logger log = LoggerFactory.getLogger(DmaapClient.class);
59 private static final String CONSUMER = "CONSUMER";
60 private static final String PRODUCER = "PRODUCER";
61 private static final String DESCRIPTION = "api keys for OOF PCI use case";
62 private static final int PARTITION_COUNT = 1;
63 private static final int REPLICATION_COUNT = 1;
64 private NewNotification newNotification;
65 private CambriaTopicManager topicManager;
67 public class NotificationCallback {
68 DmaapClient dmaapClient;
70 public NotificationCallback(DmaapClient dmaapClient) {
71 this.dmaapClient = dmaapClient;
74 public void activateCallBack(String msg) {
75 handleNotification(msg);
78 private void handleNotification(String msg) {
79 DmaapNotifications dmaapNotification = new DmaapNotifications();
80 dmaapNotification.setNotification(msg);
81 if (log.isDebugEnabled()) {
82 log.debug(dmaapNotification.toString());
84 dmaapNotificationsRepository.save(dmaapNotification);
85 newNotification.setNewNotif(true);
92 public void initClient(NewNotification newNotification) {
93 log.debug("initializing client");
94 configuration = Configuration.getInstance();
95 if (log.isDebugEnabled()) {
96 log.debug(configuration.toString());
98 this.newNotification = newNotification;
100 createAndConfigureTopics();
105 * create and configures topics.
107 private void createAndConfigureTopics() {
110 topicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(configuration.getServers())
111 .authenticatedBy(configuration.getManagerApiKey(), configuration.getManagerSecretKey()));
112 } catch (GeneralSecurityException | IOException e) {
113 log.debug("exception during creating topic", e);
115 List<Topic> topics = configuration.getTopics();
117 for (Topic topic : topics) {
118 Set<String> topicsInDmaap = getAllTopicsFromDmaap();
120 createTopic(topic, topicsInDmaap);
121 subscribeToTopic(topic.getName(), topic.getProducer(), PRODUCER);
122 subscribeToTopic(topic.getName(), topic.getConsumer(), CONSUMER);
126 topicManager.close();
133 private void createTopic(Topic topic, Set<String> topicsInDmaap) {
134 if (topicsInDmaap.contains(topic.getName())) {
135 log.debug("topic exists in dmaap");
138 topicManager.createTopic(topic.getName(), DESCRIPTION, PARTITION_COUNT, REPLICATION_COUNT);
139 } catch (HttpException | IOException e) {
140 log.debug("error while creating topic: {}", e);
146 * get all topics from dmaap.
148 private Set<String> getAllTopicsFromDmaap() {
149 Set<String> topics = new HashSet<>();
151 topics = topicManager.getTopics();
152 } catch (IOException e) {
153 log.debug("IOException while fetching topics");
160 * start dmaap client.
// Builds a Cambria consumer from the configured API key/secret, consumer
// group and id, SDNR topic and hosts, wraps it in a NotificationConsumer with
// a NotificationCallback bound to this client, and schedules that consumer at
// a fixed rate on a scheduled thread pool.
// NOTE(review): the call that completes the ConsumerBuilder chain and the last
// argument of scheduleAtFixedRate are not visible in this view - confirm both
// before changing this method.
162 private synchronized void startClient() {
164 ScheduledExecutorService executorPool;
165 CambriaConsumer cambriaConsumer = null;
168 cambriaConsumer = new ConsumerBuilder()
169 .authenticatedBy(configuration.getPcimsApiKey(), configuration.getPcimsSecretKey())
170 .knownAs(configuration.getCg(), configuration.getCid()).onTopic(configuration.getSdnrTopic())
// The socket timeout is the configured polling timeout multiplied by 1000
// (presumably seconds converted to milliseconds - confirm against Configuration).
171 .usingHosts(configuration.getServers()).withSocketTimeout(configuration.getPollingTimeout() * 1000)
174 // create the notification consumer for the SDNR topic
175 NotificationConsumer notificationConsumer = new NotificationConsumer(cambriaConsumer,
176 new NotificationCallback(this));
178 // start notification consumer threads
// NOTE(review): the 10-thread pool is held only in a local variable and is not
// shut down anywhere in this method.
179 executorPool = Executors.newScheduledThreadPool(10);
// First run is immediate (initial delay 0); the period is the configured polling interval.
180 executorPool.scheduleAtFixedRate(notificationConsumer, 0, configuration.getPollingInterval(),
182 } catch (MalformedURLException | GeneralSecurityException e) {
// Failures are only logged at debug level; nothing is rethrown to the caller.
183 log.debug("exception during starting client", e);
189 * subscribe to topic.
191 private void subscribeToTopic(String topicName, String subscriberApiKey, String subscriberType) {
192 if (subscriberType.equals(PRODUCER)) {
194 topicManager.allowProducer(topicName, subscriberApiKey);
195 } catch (HttpException | IOException e) {
196 log.debug("error while subscribing to a topic: {}", e);
198 } else if (subscriberType.equals(CONSUMER)) {
200 topicManager.allowConsumer(topicName, subscriberApiKey);
201 } catch (HttpException | IOException e) {
202 log.debug("error while subscribing to a topic: {}", e);
208 @SuppressWarnings("unchecked")
209 private static <T extends CambriaClient> T buildCambriaClient(
210 CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client)
211 throws MalformedURLException, GeneralSecurityException {
212 return (T) client.build();