Add junit coverage to TimedOutException class
[appc.git] / appc-event-listener / appc-event-listener-bundle / src / main / java / org / onap / appc / listener / impl / EventHandlerImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  * ============LICENSE_END=========================================================
23  */
24
25 package org.onap.appc.listener.impl;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29
30 import org.onap.appc.adapter.factory.DmaapMessageAdapterFactoryImpl;
31 import org.onap.appc.adapter.factory.MessageService;
32 import org.onap.appc.adapter.message.Consumer;
33 import org.onap.appc.adapter.message.MessageAdapterFactory;
34 import org.onap.appc.adapter.message.Producer;
35 import org.onap.appc.listener.EventHandler;
36 import org.onap.appc.listener.ListenerProperties;
37 import org.onap.appc.listener.util.Mapper;
38 import org.onap.appc.logging.LoggingConstants;
39 import org.osgi.framework.Bundle;
40 import org.osgi.framework.BundleContext;
41 import org.osgi.framework.FrameworkUtil;
42 import org.osgi.framework.ServiceReference;
43 import org.slf4j.MDC;
44
45 import java.util.ArrayList;
46 import java.util.Collection;
47 import java.util.HashSet;
48 import java.util.List;
49 import java.util.Set;
50
51 /**
52  * This class is a wrapper for the DMaaP client provided in appc-dmaap-adapter. Its aim is to ensure
53  * that only well formed messages are sent and received on DMaaP.
54  */
55 public class EventHandlerImpl implements EventHandler {
56
57     private final EELFLogger LOG = EELFManager.getInstance().getLogger(EventHandlerImpl.class);
58
59     /*
60      * The amount of time in seconds to keep a connection to a topic open while waiting for data
61      */
62     private int READ_TIMEOUT = 60;
63
64     /*
65      * The pool of hosts to query against
66      */
67     private Collection<String> pool;
68
69     /*
70      * The topic to read messages from
71      */
72     private String readTopic;
73
74     /*
75      * The topic to write messages to
76      */
77     private Set<String> writeTopics;
78
79     /*
80      * The client (group) name to use for reading messages
81      */
82     private String clientName;
83
84     /*
85      * The id of the client (group) that is reading messages
86      */
87     private String clientId;
88
89     /*
90      * The api public key to use for authentication
91      */
92     private String apiKey;
93
94     /*
95      * The api secret key to use for authentication
96      */
97     private String apiSecret;
98
99     /*
100      * A json object containing filter arguments.
101      */
102     private String filter_json;
103
104     private MessageService messageService;
105
106     private Consumer reader = null;
107     private Producer producer = null;
108
109     public EventHandlerImpl(ListenerProperties props) {
110         pool = new HashSet<>();
111         writeTopics = new HashSet<>();
112
113         if (props != null) {
114             readTopic = props.getProperty(ListenerProperties.KEYS.TOPIC_READ);
115             clientName = props.getProperty(ListenerProperties.KEYS.CLIENT_NAME, "APP-C");
116             clientId = props.getProperty(ListenerProperties.KEYS.CLIENT_ID, "0");
117             apiKey = props.getProperty(ListenerProperties.KEYS.AUTH_USER_KEY);
118             apiSecret = props.getProperty(ListenerProperties.KEYS.AUTH_SECRET_KEY);
119
120             filter_json = props.getProperty(ListenerProperties.KEYS.TOPIC_READ_FILTER);
121
122             READ_TIMEOUT = Integer
123                     .valueOf(props.getProperty(ListenerProperties.KEYS.TOPIC_READ_TIMEOUT, String.valueOf(READ_TIMEOUT)));
124
125             String hostnames = props.getProperty(ListenerProperties.KEYS.HOSTS);
126             if (hostnames != null && !hostnames.isEmpty()) {
127                 for (String name : hostnames.split(",")) {
128                     pool.add(name);
129                 }
130             }
131
132             String writeTopicStr = props.getProperty(ListenerProperties.KEYS.TOPIC_WRITE);
133             if (writeTopicStr != null) {
134                 for (String topic : writeTopicStr.split(",")) {
135                     writeTopics.add(topic);
136                 }
137             }
138
139             messageService = MessageService.parse(props.getProperty(ListenerProperties.KEYS.MESSAGE_SERVICE));
140
141             LOG.info(String.format(
142                     "Configured to use %s client on host pool [%s]. Reading from [%s] filtered by %s. Wriring to [%s]. Authenticated using %s",
143                     messageService, hostnames, readTopic, filter_json, writeTopics, apiKey));
144         }
145     }
146
147     @Override
148     public List<String> getIncomingEvents() {
149         return getIncomingEvents(1000);
150     }
151
152     @Override
153     public List<String> getIncomingEvents(int limit) {
154         List<String> out = new ArrayList<>();
155         LOG.info(String.format("Getting up to %d incoming events", limit));
156         // reuse the consumer object instead of creating a new one every time
157         if (reader == null) {
158             LOG.info("Getting Consumer...");
159             reader = getConsumer();
160         }
161         if (reader != null) {
162             List<String> items = reader.fetch(READ_TIMEOUT * 1000, limit);
163             for (String item : items) {
164                 out.add(item);
165             }
166         }
167         LOG.info(String.format("Read %d messages from %s as %s/%s.", out.size(), readTopic, clientName, clientId));
168         return out;
169     }
170
171     @Override
172     public <T> List<T> getIncomingEvents(Class<T> cls) {
173         return getIncomingEvents(cls, 1000);
174     }
175
176     @Override
177     public <T> List<T> getIncomingEvents(Class<T> cls, int limit) {
178         List<String> incomingStrings = getIncomingEvents(limit);
179         return Mapper.mapList(incomingStrings, cls);
180     }
181
182     @Override
183     public void postStatus(String event) {
184         postStatus(null, event);
185     }
186
187     @Override
188     public void postStatus(String partition, String event) {
189         LOG.debug(String.format("Posting Message [%s]", event));
190         if (producer == null) {
191             LOG.info("Getting Producer...");
192             producer = getProducer();
193         }
194         producer.post(partition, event);
195     }
196
197     /**
198      * Returns a consumer object for direct access to our Cambria consumer interface
199      *
200      * @return An instance of the consumer interface
201      */
202     protected Consumer getConsumer() {
203         LOG.debug(String.format("Getting Consumer: %s  %s/%s/%s", pool, readTopic, clientName, clientId));
204         if (filter_json == null && writeTopics.contains(readTopic)) {
205             LOG.error(
206                     "*****We will be writing and reading to the same topic without a filter. This will cause an infinite loop.*****");
207         }
208
209         Consumer out = null;
210         BundleContext ctx = null;
211         Bundle bundle = FrameworkUtil.getBundle(EventHandlerImpl.class);
212         if(bundle != null) {
213             ctx = bundle.getBundleContext();
214         }
215
216         if (ctx != null) {
217             ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
218             if (svcRef != null) {
219                 try {
220                     out = ((MessageAdapterFactory) ctx.getService(svcRef))
221                             .createConsumer(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret);
222                 } catch (Exception e) {
223                     //TODO:create eelf message
224                     LOG.error("EvenHandlerImp.getConsumer calling MessageAdapterFactor.createConsumer", e);
225                 }
226                 if (out != null) {
227                     for (String url : pool) {
228                         if (url.contains("3905") || url.contains("https")) {
229                             out.useHttps(true);
230                             break;
231                         }
232                     }
233                 }
234             }
235         }
236         return out;
237     }
238
239     /**
240      * Returns a consumer object for direct access to our Cambria producer interface
241      *
242      * @return An instance of the producer interface
243      */
244     protected Producer getProducer() {
245         LOG.debug(String.format("Getting Producer: %s  %s", pool, readTopic));
246
247         Producer out = null;
248         BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext();
249         if (ctx != null) {
250             ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
251             if (svcRef != null) {
252                 out = ((MessageAdapterFactory) ctx.getService(svcRef))
253                         .createProducer(pool, writeTopics, apiKey, apiSecret);
254                 if (out != null) {
255                     for (String url : pool) {
256                         if (url.contains("3905") || url.contains("https")) {
257                             out.useHttps(true);
258                             break;
259                         }
260                     }
261                 }
262             }
263         }
264         return out;
265     }
266
267     @Override
268     public void closeClients() {
269         LOG.debug("Closing Consumer and Producer DMaaP clients");
270         if (reader != null) {
271             reader.close();
272         }
273         if (producer != null) {
274             producer.close();
275         }
276     }
277
278     @Override
279     public String getClientId() {
280         return clientId;
281     }
282
283     @Override
284     public void setClientId(String clientId) {
285         this.clientId = clientId;
286     }
287
288     @Override
289     public String getClientName() {
290         return clientName;
291     }
292
293     @Override
294     public void setClientName(String clientName) {
295         this.clientName = clientName;
296         MDC.put(LoggingConstants.MDCKeys.PARTNER_NAME, clientName);
297     }
298
299     @Override
300     public void addToPool(String hostname) {
301         pool.add(hostname);
302     }
303
304     @Override
305     public Collection<String> getPool() {
306         return pool;
307     }
308
309     @Override
310     public void removeFromPool(String hostname) {
311         pool.remove(hostname);
312     }
313
314     @Override
315     public String getReadTopic() {
316         return readTopic;
317     }
318
319     @Override
320     public void setReadTopic(String readTopic) {
321         this.readTopic = readTopic;
322     }
323
324     @Override
325     public Set<String> getWriteTopics() {
326         return writeTopics;
327     }
328
329     @Override
330     public void setWriteTopics(Set<String> writeTopics) {
331         this.writeTopics = writeTopics;
332     }
333
334     @Override
335     public void clearCredentials() {
336         apiKey = null;
337         apiSecret = null;
338     }
339
340     @Override
341     public void setCredentials(String key, String secret) {
342         apiKey = key;
343         apiSecret = secret;
344     }
345 }