code coverage for M3
[dcaegen2/services/son-handler.git] / src / main / java / org / onap / dcaegen2 / services / sonhms / dmaap / DmaapClient.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  son-handler
4  *  ================================================================================
5  *   Copyright (C) 2019 Wipro Limited.
6  *   ==============================================================================
7  *     Licensed under the Apache License, Version 2.0 (the "License");
8  *     you may not use this file except in compliance with the License.
9  *     You may obtain a copy of the License at
10  *  
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *  
13  *     Unless required by applicable law or agreed to in writing, software
14  *     distributed under the License is distributed on an "AS IS" BASIS,
15  *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *     See the License for the specific language governing permissions and
17  *     limitations under the License.
18  *     ============LICENSE_END=========================================================
19  *  
20  *******************************************************************************/
21
22 package org.onap.dcaegen2.services.sonhms.dmaap;
23
24 import com.att.nsa.apiClient.http.HttpException;
25 import com.att.nsa.cambria.client.CambriaClient;
26 import com.att.nsa.cambria.client.CambriaClientBuilders;
27 import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
28 import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
29 import com.att.nsa.cambria.client.CambriaConsumer;
30 import com.att.nsa.cambria.client.CambriaTopicManager;
31 import org.onap.dcaegen2.services.sonhms.Configuration;
32 import org.onap.dcaegen2.services.sonhms.NewNotification;
33 import org.onap.dcaegen2.services.sonhms.Topic;
34 import org.onap.dcaegen2.services.sonhms.dao.DmaapNotificationsRepository;
35 import org.onap.dcaegen2.services.sonhms.entity.DmaapNotifications;
36
37 import java.io.IOException;
38 import java.net.MalformedURLException;
39 import java.security.GeneralSecurityException;
40 import java.util.HashSet;
41 import java.util.List;
42 import java.util.Set;
43 import java.util.concurrent.Executors;
44 import java.util.concurrent.ScheduledExecutorService;
45 import java.util.concurrent.TimeUnit;
46
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 import org.springframework.beans.factory.annotation.Autowired;
50 import org.springframework.stereotype.Component;
51
52 @Component
53 public class DmaapClient {
54
55     @Autowired
56     private DmaapNotificationsRepository dmaapNotificationsRepository;
57     private Configuration configuration;
58     private static Logger log = LoggerFactory.getLogger(DmaapClient.class);
59     private static final String CONSUMER = "CONSUMER";
60     private static final String PRODUCER = "PRODUCER";
61     private static final String DESCRIPTION = "api keys for OOF PCI use case";
62     private static final int PARTITION_COUNT = 1;
63     private static final int REPLICATION_COUNT = 1;
64     private NewNotification newNotification;
65     private CambriaTopicManager topicManager;
66
67     public class NotificationCallback {
68         DmaapClient dmaapClient;
69
70         public NotificationCallback(DmaapClient dmaapClient) {
71             this.dmaapClient = dmaapClient;
72         }
73
74         public void activateCallBack(String msg) {
75             handleNotification(msg);
76         }
77
78         private void handleNotification(String msg) {
79             DmaapNotifications dmaapNotification = new DmaapNotifications();
80             dmaapNotification.setNotification(msg);
81             if (log.isDebugEnabled()) {
82                 log.debug(dmaapNotification.toString());
83             }
84             dmaapNotificationsRepository.save(dmaapNotification);
85             newNotification.setNewNotif(true);
86         }
87     }
88
89     /**
90      * init dmaap client.
91      */
92     public void initClient(NewNotification newNotification) {
93         log.debug("initializing client");
94         configuration = Configuration.getInstance();
95         if (log.isDebugEnabled()) {
96             log.debug(configuration.toString());
97         }
98         this.newNotification = newNotification;
99
100         createAndConfigureTopics();
101         startClient();
102     }
103
104     /**
105      * create and configures topics.
106      */
107     private void createAndConfigureTopics() {
108
109         try {
110             topicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(configuration.getServers())
111                     .authenticatedBy(configuration.getManagerApiKey(), configuration.getManagerSecretKey()));
112         } catch (GeneralSecurityException | IOException e) {
113             log.debug("exception during creating topic", e);
114         }
115         List<Topic> topics = configuration.getTopics();
116
117         for (Topic topic : topics) {
118             Set<String> topicsInDmaap = getAllTopicsFromDmaap();
119
120             createTopic(topic, topicsInDmaap);
121             subscribeToTopic(topic.getName(), topic.getProducer(), PRODUCER);
122             subscribeToTopic(topic.getName(), topic.getConsumer(), CONSUMER);
123
124         }
125
126         topicManager.close();
127
128     }
129
130     /**
131      * create topic.
132      */
133     private void createTopic(Topic topic, Set<String> topicsInDmaap) {
134         if (topicsInDmaap.contains(topic.getName())) {
135             log.debug("topic exists in dmaap");
136         } else {
137             try {
138                 topicManager.createTopic(topic.getName(), DESCRIPTION, PARTITION_COUNT, REPLICATION_COUNT);
139             } catch (HttpException | IOException e) {
140                 log.debug("error while creating topic: {}", e);
141             }
142         }
143     }
144
145     /**
146      * get all topics from dmaap.
147      */
148     private Set<String> getAllTopicsFromDmaap() {
149         Set<String> topics = new HashSet<>();
150         try {
151             topics = topicManager.getTopics();
152         } catch (IOException e) {
153             log.debug("IOException while fetching topics");
154         }
155         return topics;
156
157     }
158
159     /**
160      * start dmaap client.
161      */
162     private synchronized void startClient() {
163
164         ScheduledExecutorService executorPool;
165         CambriaConsumer cambriaConsumer = null;
166
167         try {
168             cambriaConsumer = new ConsumerBuilder()
169                     .authenticatedBy(configuration.getPcimsApiKey(), configuration.getPcimsSecretKey())
170                     .knownAs(configuration.getCg(), configuration.getCid()).onTopic(configuration.getSdnrTopic())
171                     .usingHosts(configuration.getServers()).withSocketTimeout(configuration.getPollingTimeout() * 1000)
172                     .build();
173
174             // create notification consumers for SNDR and policy
175             NotificationConsumer notificationConsumer = new NotificationConsumer(cambriaConsumer,
176                     new NotificationCallback(this));
177
178             // start notification consumer threads
179             executorPool = Executors.newScheduledThreadPool(10);
180             executorPool.scheduleAtFixedRate(notificationConsumer, 0, configuration.getPollingInterval(),
181                     TimeUnit.SECONDS);
182         } catch (MalformedURLException | GeneralSecurityException e) {
183             log.debug("exception during starting client", e);
184         }
185
186     }
187
188     /**
189      * subscribe to topic.
190      */
191     private void subscribeToTopic(String topicName, String subscriberApiKey, String subscriberType) {
192         if (subscriberType.equals(PRODUCER)) {
193             try {
194                 topicManager.allowProducer(topicName, subscriberApiKey);
195             } catch (HttpException | IOException e) {
196                 log.debug("error while subscribing to a topic: {}", e);
197             }
198         } else if (subscriberType.equals(CONSUMER)) {
199             try {
200                 topicManager.allowConsumer(topicName, subscriberApiKey);
201             } catch (HttpException | IOException e) {
202                 log.debug("error while subscribing to a topic: {}", e);
203             }
204         }
205
206     }
207
208     @SuppressWarnings("unchecked")
209     private static <T extends CambriaClient> T buildCambriaClient(
210             CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client)
211             throws MalformedURLException, GeneralSecurityException {
212         return (T) client.build();
213     }
214
215 }