506593ed1689125e506d3b1ead49860ee78d8dcc
[appc.git] / appc-event-listener / appc-event-listener-bundle / src / main / java / org / openecomp / appc / listener / impl / EventHandlerImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  * ============LICENSE_END=========================================================
23  */
24
25 package org.openecomp.appc.listener.impl;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29 import org.openecomp.appc.adapter.factory.MessageService;
30 import org.openecomp.appc.adapter.message.Consumer;
31 import org.openecomp.appc.adapter.message.MessageAdapterFactory;
32 import org.openecomp.appc.adapter.message.Producer;
33 import org.openecomp.appc.listener.EventHandler;
34 import org.openecomp.appc.listener.ListenerProperties;
35 import org.openecomp.appc.listener.util.Mapper;
36 import org.openecomp.appc.logging.LoggingConstants;
37 import org.osgi.framework.BundleContext;
38 import org.osgi.framework.FrameworkUtil;
39 import org.osgi.framework.ServiceReference;
40 import org.slf4j.MDC;
41
42 import java.util.ArrayList;
43 import java.util.Collection;
44 import java.util.HashSet;
45 import java.util.List;
46 import java.util.Set;
47
48 /**
49  * This class is a wrapper for the DMaaP client provided in appc-dmaap-adapter. Its aim is to ensure
50  * that only well formed messages are sent and received on DMaaP.
51  */
52 public class EventHandlerImpl implements EventHandler {
53
54     private final EELFLogger LOG = EELFManager.getInstance().getLogger(EventHandlerImpl.class);
55
56     /*
57      * The amount of time in seconds to keep a connection to a topic open while waiting for data
58      */
59     private int READ_TIMEOUT = 60;
60
61     /*
62      * The pool of hosts to query against
63      */
64     private Collection<String> pool;
65
66     /*
67      * The topic to read messages from
68      */
69     private String readTopic;
70
71     /*
72      * The topic to write messages to
73      */
74     private Set<String> writeTopics;
75
76     /*
77      * The client (group) name to use for reading messages
78      */
79     private String clientName;
80
81     /*
82      * The id of the client (group) that is reading messages
83      */
84     private String clientId;
85
86     /*
87      * The api public key to use for authentication
88      */
89     private String apiKey;
90
91     /*
92      * The api secret key to use for authentication
93      */
94     private String apiSecret;
95
96     /*
97      * A json object containing filter arguments.
98      */
99     private String filter_json;
100
101     private MessageService messageService;
102
103     private Consumer reader = null;
104     private Producer producer = null;
105
106     public EventHandlerImpl(ListenerProperties props) {
107         pool = new HashSet<>();
108         writeTopics = new HashSet<>();
109
110         if (props != null) {
111             readTopic = props.getProperty(ListenerProperties.KEYS.TOPIC_READ);
112             clientName = props.getProperty(ListenerProperties.KEYS.CLIENT_NAME, "APP-C");
113             clientId = props.getProperty(ListenerProperties.KEYS.CLIENT_ID, "0");
114             apiKey = props.getProperty(ListenerProperties.KEYS.AUTH_USER_KEY);
115             apiSecret = props.getProperty(ListenerProperties.KEYS.AUTH_SECRET_KEY);
116
117             filter_json = props.getProperty(ListenerProperties.KEYS.TOPIC_READ_FILTER);
118
119             READ_TIMEOUT = Integer
120                     .valueOf(props.getProperty(ListenerProperties.KEYS.TOPIC_READ_TIMEOUT, String.valueOf(READ_TIMEOUT)));
121
122             String hostnames = props.getProperty(ListenerProperties.KEYS.HOSTS);
123             if (hostnames != null && !hostnames.isEmpty()) {
124                 for (String name : hostnames.split(",")) {
125                     pool.add(name);
126                 }
127             }
128
129             String writeTopicStr = props.getProperty(ListenerProperties.KEYS.TOPIC_WRITE);
130             if (writeTopicStr != null) {
131                 for (String topic : writeTopicStr.split(",")) {
132                     writeTopics.add(topic);
133                 }
134             }
135
136             messageService = MessageService.parse(props.getProperty(ListenerProperties.KEYS.MESSAGE_SERVICE));
137
138             LOG.info(String.format(
139                     "Configured to use %s client on host pool [%s]. Reading from [%s] filtered by %s. Wriring to [%s]. Authenticated using %s",
140                     messageService, hostnames, readTopic, filter_json, writeTopics, apiKey));
141         }
142     }
143
144     @Override
145     public List<String> getIncomingEvents() {
146         return getIncomingEvents(1000);
147     }
148
149     @Override
150     public List<String> getIncomingEvents(int limit) {
151         List<String> out = new ArrayList<>();
152         LOG.info(String.format("Getting up to %d incoming events", limit));
153         // reuse the consumer object instead of creating a new one every time
154         if (reader == null) {
155             LOG.info("Getting Consumer...");
156             reader = getConsumer();
157         }
158         if (reader != null) {
159             List<String> items = reader.fetch(READ_TIMEOUT * 1000, limit);
160             for (String item : items) {
161                 out.add(item);
162             }
163         }
164         LOG.info(String.format("Read %d messages from %s as %s/%s.", out.size(), readTopic, clientName, clientId));
165         return out;
166     }
167
168     @Override
169     public <T> List<T> getIncomingEvents(Class<T> cls) {
170         return getIncomingEvents(cls, 1000);
171     }
172
173     @Override
174     public <T> List<T> getIncomingEvents(Class<T> cls, int limit) {
175         List<String> incomingStrings = getIncomingEvents(limit);
176         return Mapper.mapList(incomingStrings, cls);
177     }
178
179     @Override
180     public void postStatus(String event) {
181         postStatus(null, event);
182     }
183
184     @Override
185     public void postStatus(String partition, String event) {
186         LOG.debug(String.format("Posting Message [%s]", event));
187         if (producer == null) {
188             LOG.info("Getting Producer...");
189             producer = getProducer();
190         }
191         producer.post(partition, event);
192     }
193
194     /**
195      * Returns a consumer object for direct access to our Cambria consumer interface
196      *
197      * @return An instance of the consumer interface
198      */
199     protected Consumer getConsumer() {
200         LOG.debug(String.format("Getting Consumer: %s  %s/%s/%s", pool, readTopic, clientName, clientId));
201         if (filter_json == null && writeTopics.contains(readTopic)) {
202             LOG.error(
203                     "*****We will be writing and reading to the same topic without a filter. This will cause an infinite loop.*****");
204         }
205
206         Consumer out = null;
207         BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext();
208         if (ctx != null) {
209             ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
210             if (svcRef != null) {
211                 try {
212                     out = ((MessageAdapterFactory) ctx.getService(svcRef))
213                             .createConsumer(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret);
214                 } catch (Exception e) {
215                     //TODO:create eelf message
216                     LOG.error("EvenHandlerImp.getConsumer calling MessageAdapterFactor.createConsumer", e);
217                 }
218                 if (out != null) {
219                     for (String url : pool) {
220                         if (url.contains("3905") || url.contains("https")) {
221                             out.useHttps(true);
222                             break;
223                         }
224                     }
225                 }
226             }
227         }
228         return out;
229     }
230
231     /**
232      * Returns a consumer object for direct access to our Cambria producer interface
233      *
234      * @return An instance of the producer interface
235      */
236     protected Producer getProducer() {
237         LOG.debug(String.format("Getting Producer: %s  %s", pool, readTopic));
238
239         Producer out = null;
240         BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext();
241         if (ctx != null) {
242             ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
243             if (svcRef != null) {
244                 out = ((MessageAdapterFactory) ctx.getService(svcRef))
245                         .createProducer(pool, writeTopics, apiKey, apiSecret);
246                 if (out != null) {
247                     for (String url : pool) {
248                         if (url.contains("3905") || url.contains("https")) {
249                             out.useHttps(true);
250                             break;
251                         }
252                     }
253                 }
254             }
255         }
256         return out;
257     }
258
259     @Override
260     public void closeClients() {
261         LOG.debug("Closing Consumer and Producer DMaaP clients");
262         if (reader != null) {
263             reader.close();
264         }
265         if (producer != null) {
266             producer.close();
267         }
268     }
269
270     @Override
271     public String getClientId() {
272         return clientId;
273     }
274
275     @Override
276     public void setClientId(String clientId) {
277         this.clientId = clientId;
278     }
279
280     @Override
281     public String getClientName() {
282         return clientName;
283     }
284
285     @Override
286     public void setClientName(String clientName) {
287         this.clientName = clientName;
288         MDC.put(LoggingConstants.MDCKeys.PARTNER_NAME, clientName);
289     }
290
291     @Override
292     public void addToPool(String hostname) {
293         pool.add(hostname);
294     }
295
296     @Override
297     public Collection<String> getPool() {
298         return pool;
299     }
300
301     @Override
302     public void removeFromPool(String hostname) {
303         pool.remove(hostname);
304     }
305
306     @Override
307     public String getReadTopic() {
308         return readTopic;
309     }
310
311     @Override
312     public void setReadTopic(String readTopic) {
313         this.readTopic = readTopic;
314     }
315
316     @Override
317     public Set<String> getWriteTopics() {
318         return writeTopics;
319     }
320
321     @Override
322     public void setWriteTopics(Set<String> writeTopics) {
323         this.writeTopics = writeTopics;
324     }
325
326     @Override
327     public void clearCredentials() {
328         apiKey = null;
329         apiSecret = null;
330     }
331
332     @Override
333     public void setCredentials(String key, String secret) {
334         apiKey = key;
335         apiSecret = secret;
336     }
337 }