Code synch up for appc-event-listener bundle
[appc.git] / appc-event-listener / appc-event-listener-bundle / src / main / java / org / onap / appc / listener / impl / EventHandlerImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  * ============LICENSE_END=========================================================
23  */
24
25 package org.onap.appc.listener.impl;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29
30 import org.onap.appc.adapter.factory.DmaapMessageAdapterFactoryImpl;
31 import org.onap.appc.adapter.factory.MessageService;
32 import org.onap.appc.adapter.message.Consumer;
33 import org.onap.appc.adapter.message.MessageAdapterFactory;
34 import org.onap.appc.adapter.message.Producer;
35 import org.onap.appc.listener.EventHandler;
36 import org.onap.appc.listener.ListenerProperties;
37 import org.onap.appc.listener.util.Mapper;
38 import org.onap.appc.logging.LoggingConstants;
39 import org.osgi.framework.BundleContext;
40 import org.osgi.framework.FrameworkUtil;
41 import org.osgi.framework.ServiceReference;
42 import org.slf4j.MDC;
43
44 import java.util.ArrayList;
45 import java.util.Collection;
46 import java.util.HashSet;
47 import java.util.List;
48 import java.util.Set;
49
50 /**
51  * This class is a wrapper for the DMaaP client provided in appc-dmaap-adapter. Its aim is to ensure
52  * that only well formed messages are sent and received on DMaaP.
53  */
54 public class EventHandlerImpl implements EventHandler {
55
56     private final EELFLogger LOG = EELFManager.getInstance().getLogger(EventHandlerImpl.class);
57
58     /*
59      * The amount of time in seconds to keep a connection to a topic open while waiting for data
60      */
61     private int READ_TIMEOUT = 60;
62
63     /*
64      * The pool of hosts to query against
65      */
66     private Collection<String> pool;
67
68     /*
69      * The topic to read messages from
70      */
71     private String readTopic;
72
73     /*
74      * The topic to write messages to
75      */
76     private Set<String> writeTopics;
77
78     /*
79      * The client (group) name to use for reading messages
80      */
81     private String clientName;
82
83     /*
84      * The id of the client (group) that is reading messages
85      */
86     private String clientId;
87
88     /*
89      * The api public key to use for authentication
90      */
91     private String apiKey;
92
93     /*
94      * The api secret key to use for authentication
95      */
96     private String apiSecret;
97
98     /*
99      * A json object containing filter arguments.
100      */
101     private String filter_json;
102
103     private MessageService messageService;
104
105     private Consumer reader = null;
106     private Producer producer = null;
107
108     public EventHandlerImpl(ListenerProperties props) {
109         pool = new HashSet<>();
110         writeTopics = new HashSet<>();
111
112         if (props != null) {
113             readTopic = props.getProperty(ListenerProperties.KEYS.TOPIC_READ);
114             clientName = props.getProperty(ListenerProperties.KEYS.CLIENT_NAME, "APP-C");
115             clientId = props.getProperty(ListenerProperties.KEYS.CLIENT_ID, "0");
116             apiKey = props.getProperty(ListenerProperties.KEYS.AUTH_USER_KEY);
117             apiSecret = props.getProperty(ListenerProperties.KEYS.AUTH_SECRET_KEY);
118
119             filter_json = props.getProperty(ListenerProperties.KEYS.TOPIC_READ_FILTER);
120
121             READ_TIMEOUT = Integer
122                     .valueOf(props.getProperty(ListenerProperties.KEYS.TOPIC_READ_TIMEOUT, String.valueOf(READ_TIMEOUT)));
123
124             String hostnames = props.getProperty(ListenerProperties.KEYS.HOSTS);
125             if (hostnames != null && !hostnames.isEmpty()) {
126                 for (String name : hostnames.split(",")) {
127                     pool.add(name);
128                 }
129             }
130
131             String writeTopicStr = props.getProperty(ListenerProperties.KEYS.TOPIC_WRITE);
132             if (writeTopicStr != null) {
133                 for (String topic : writeTopicStr.split(",")) {
134                     writeTopics.add(topic);
135                 }
136             }
137
138             messageService = MessageService.parse(props.getProperty(ListenerProperties.KEYS.MESSAGE_SERVICE));
139
140             LOG.info(String.format(
141                     "Configured to use %s client on host pool [%s]. Reading from [%s] filtered by %s. Wriring to [%s]. Authenticated using %s",
142                     messageService, hostnames, readTopic, filter_json, writeTopics, apiKey));
143         }
144     }
145
146     @Override
147     public List<String> getIncomingEvents() {
148         return getIncomingEvents(1000);
149     }
150
151     @Override
152     public List<String> getIncomingEvents(int limit) {
153         List<String> out = new ArrayList<>();
154         LOG.info(String.format("Getting up to %d incoming events", limit));
155         // reuse the consumer object instead of creating a new one every time
156         if (reader == null) {
157             LOG.info("Getting Consumer...");
158             reader = getConsumer();
159         }
160         if (reader != null) {
161             List<String> items = reader.fetch(READ_TIMEOUT * 1000, limit);
162             for (String item : items) {
163                 out.add(item);
164             }
165         }
166         LOG.info(String.format("Read %d messages from %s as %s/%s.", out.size(), readTopic, clientName, clientId));
167         return out;
168     }
169
170     @Override
171     public <T> List<T> getIncomingEvents(Class<T> cls) {
172         return getIncomingEvents(cls, 1000);
173     }
174
175     @Override
176     public <T> List<T> getIncomingEvents(Class<T> cls, int limit) {
177         List<String> incomingStrings = getIncomingEvents(limit);
178         return Mapper.mapList(incomingStrings, cls);
179     }
180
181     @Override
182     public void postStatus(String event) {
183         postStatus(null, event);
184     }
185
186     @Override
187     public void postStatus(String partition, String event) {
188         LOG.debug(String.format("Posting Message [%s]", event));
189         if (producer == null) {
190             LOG.info("Getting Producer...");
191             producer = getProducer();
192         }
193         producer.post(partition, event);
194     }
195
196     /**
197      * Returns a consumer object for direct access to our Cambria consumer interface
198      *
199      * @return An instance of the consumer interface
200      */
201     protected Consumer getConsumer() {
202         LOG.debug(String.format("Getting Consumer: %s  %s/%s/%s", pool, readTopic, clientName, clientId));
203         if (filter_json == null && writeTopics.contains(readTopic)) {
204             LOG.error(
205                     "*****We will be writing and reading to the same topic without a filter. This will cause an infinite loop.*****");
206         }
207
208         Consumer out = null;
209         BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext();
210         if (ctx != null) {
211             ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
212             if (svcRef != null) {
213                 try {
214                     out = ((MessageAdapterFactory) ctx.getService(svcRef))
215                             .createConsumer(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret);
216                 } catch (Exception e) {
217                     //TODO:create eelf message
218                     LOG.error("EvenHandlerImp.getConsumer calling MessageAdapterFactor.createConsumer", e);
219                 }
220                 if (out != null) {
221                     for (String url : pool) {
222                         if (url.contains("3905") || url.contains("https")) {
223                             out.useHttps(true);
224                             break;
225                         }
226                     }
227                 }
228             }
229         }
230         return out;
231     }
232
233     /**
234      * Returns a consumer object for direct access to our Cambria producer interface
235      *
236      * @return An instance of the producer interface
237      */
238     protected Producer getProducer() {
239         LOG.debug(String.format("Getting Producer: %s  %s", pool, readTopic));
240
241         Producer out = null;
242         BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext();
243         if (ctx != null) {
244             ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
245             if (svcRef != null) {
246                 out = ((MessageAdapterFactory) ctx.getService(svcRef))
247                         .createProducer(pool, writeTopics, apiKey, apiSecret);
248                 if (out != null) {
249                     for (String url : pool) {
250                         if (url.contains("3905") || url.contains("https")) {
251                             out.useHttps(true);
252                             break;
253                         }
254                     }
255                 }
256             }
257         }
258         return out;
259     }
260
261     @Override
262     public void closeClients() {
263         LOG.debug("Closing Consumer and Producer DMaaP clients");
264         if (reader != null) {
265             reader.close();
266         }
267         if (producer != null) {
268             producer.close();
269         }
270     }
271
272     @Override
273     public String getClientId() {
274         return clientId;
275     }
276
277     @Override
278     public void setClientId(String clientId) {
279         this.clientId = clientId;
280     }
281
282     @Override
283     public String getClientName() {
284         return clientName;
285     }
286
287     @Override
288     public void setClientName(String clientName) {
289         this.clientName = clientName;
290         MDC.put(LoggingConstants.MDCKeys.PARTNER_NAME, clientName);
291     }
292
293     @Override
294     public void addToPool(String hostname) {
295         pool.add(hostname);
296     }
297
298     @Override
299     public Collection<String> getPool() {
300         return pool;
301     }
302
303     @Override
304     public void removeFromPool(String hostname) {
305         pool.remove(hostname);
306     }
307
308     @Override
309     public String getReadTopic() {
310         return readTopic;
311     }
312
313     @Override
314     public void setReadTopic(String readTopic) {
315         this.readTopic = readTopic;
316     }
317
318     @Override
319     public Set<String> getWriteTopics() {
320         return writeTopics;
321     }
322
323     @Override
324     public void setWriteTopics(Set<String> writeTopics) {
325         this.writeTopics = writeTopics;
326     }
327
328     @Override
329     public void clearCredentials() {
330         apiKey = null;
331         apiSecret = null;
332     }
333
334     @Override
335     public void setCredentials(String key, String secret) {
336         apiKey = key;
337         apiSecret = secret;
338     }
339 }