ee9f30ea712fe4e424f2be761760744335424d7e
[appc.git] / appc-event-listener / appc-event-listener-bundle / src / main / java / org / openecomp / appc / listener / impl / EventHandlerImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * openECOMP : APP-C
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights
6  *                                              reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.openecomp.appc.listener.impl;
23 import com.att.eelf.configuration.EELFLogger;
24 import com.att.eelf.configuration.EELFManager;
25
26 import org.openecomp.appc.adapter.factory.DmaapMessageAdapterFactoryImpl;
27 import org.openecomp.appc.adapter.factory.MessageService;
28 import org.openecomp.appc.adapter.message.Consumer;
29 import org.openecomp.appc.adapter.message.MessageAdapterFactory;
30 import org.openecomp.appc.adapter.message.Producer;
31 import org.openecomp.appc.listener.EventHandler;
32 import org.openecomp.appc.listener.ListenerProperties;
33 import org.openecomp.appc.listener.util.Mapper;
34 import org.openecomp.appc.logging.LoggingConstants;
35 import org.osgi.framework.BundleContext;
36 import org.osgi.framework.FrameworkUtil;
37 import org.osgi.framework.ServiceReference;
38 import org.slf4j.MDC;
39
40 import java.util.*;
41
42 /**
43  * This class is a wrapper for the DMaaP client provided in appc-dmaap-adapter. Its aim is to ensure that only well formed
44  * messages are sent and received on DMaaP.
45  * 
46  */
47 public class EventHandlerImpl implements EventHandler {
48
49     private final EELFLogger LOG = EELFManager.getInstance().getLogger(EventHandlerImpl.class);
50
51     /*
52      * The amount of time in seconds to keep a connection to a topic open while waiting for data
53      */
54     private int READ_TIMEOUT = 60;
55
56     /*
57      * The pool of hosts to query against
58      */
59     private Collection<String> pool;
60
61     /*
62      * The topic to read messages from
63      */
64     private String readTopic;
65
66     /*
67      * The topic to write messages to
68      */
69     private Set<String> writeTopics;
70
71     /*
72      * The client (group) name to use for reading messages
73      */
74     private String clientName;
75
76     /*
77      * The id of the client (group) that is reading messages
78      */
79     private String clientId;
80
81     /*
82      * The api public key to use for authentication
83      */
84     private String apiKey;
85
86     /*
87      * The api secret key to use for authentication
88      */
89     private String apiSecret;
90
91     /*
92      * A json object containing filter arguments.
93      */
94     private String filter_json;
95
96     private MessageService messageService;
97
98     private Consumer reader = null;
99     private Producer producer = null;
100     
101     public EventHandlerImpl(ListenerProperties props) {
102         pool = new HashSet<String>();
103         writeTopics = new HashSet<String>();
104
105         if (props != null) {
106             readTopic = props.getProperty(ListenerProperties.KEYS.TOPIC_READ);
107             clientName = props.getProperty(ListenerProperties.KEYS.CLIENT_NAME, "APP-C");
108             clientId = props.getProperty(ListenerProperties.KEYS.CLIENT_ID, "0");
109             apiKey = props.getProperty(ListenerProperties.KEYS.AUTH_USER_KEY);
110             apiSecret = props.getProperty(ListenerProperties.KEYS.AUTH_SECRET_KEY);
111
112             filter_json = props.getProperty(ListenerProperties.KEYS.TOPIC_READ_FILTER);
113
114             READ_TIMEOUT = Integer
115                 .valueOf(props.getProperty(ListenerProperties.KEYS.TOPIC_READ_TIMEOUT, String.valueOf(READ_TIMEOUT)));
116
117             String hostnames = props.getProperty(ListenerProperties.KEYS.HOSTS);
118             if (hostnames != null && !hostnames.isEmpty()) {
119                 for (String name : hostnames.split(",")) {
120                     pool.add(name);
121                 }
122             }
123
124             String writeTopicStr = props.getProperty(ListenerProperties.KEYS.TOPIC_WRITE);
125             if (writeTopicStr != null) {
126                 for (String topic : writeTopicStr.split(",")) {
127                     writeTopics.add(topic);
128                 }
129             }
130
131             messageService = MessageService.parse(props.getProperty(ListenerProperties.KEYS.MESSAGE_SERVICE));
132
133             LOG.info(String.format(
134                 "Configured to use %s client on host pool [%s]. Reading from [%s] filtered by %s. Wriring to [%s]. Authenticated using %s",
135                 messageService, hostnames, readTopic, filter_json, writeTopics, apiKey));
136         }
137     }
138
139     @Override
140     public List<String> getIncomingEvents() {
141         return getIncomingEvents(1000);
142     }
143
144     @Override
145     public List<String> getIncomingEvents(int limit) {
146         List<String> out = new ArrayList<String>();
147         LOG.info(String.format("Getting up to %d incoming events", limit));
148         // reuse the consumer object instead of creating a new one every time
149         if (reader == null) {
150                 LOG.info("Getting Consumer...");
151                 reader = getConsumer();
152         }
153         
154         List<String> items = null;
155         try{
156             items = reader.fetch(READ_TIMEOUT * 1000, limit);
157         }catch(Error r){
158             LOG.error("EvenHandlerImpl.getIncomingEvents",r);
159         }
160         
161         
162         for (String item : items) {
163             out.add(item);
164         }
165         LOG.info(String.format("Read %d messages from %s as %s/%s.", out.size(), readTopic, clientName, clientId));
166         return out;
167     }
168
169     @Override
170     public <T> List<T> getIncomingEvents(Class<T> cls) {
171         return getIncomingEvents(cls, 1000);
172     }
173
174     @Override
175     public <T> List<T> getIncomingEvents(Class<T> cls, int limit) {
176         List<String> incomingStrings = getIncomingEvents(limit);
177         return Mapper.mapList(incomingStrings, cls);
178     }
179
180     @Override
181     public void postStatus(String event) {
182         postStatus(null, event);
183     }
184
185     @Override
186     public void postStatus(String partition, String event) {
187         LOG.debug(String.format("Posting Message [%s]", event));
188         if (producer == null) {
189                 LOG.info("Getting Producer...");
190                 producer = getProducer();
191         }
192         producer.post(partition, event);
193     }
194
195     /**
196      * Returns a consumer object for direct access to our Cambria consumer interface
197      * 
198      * @return An instance of the consumer interface
199      */
200     protected Consumer getConsumer() {
201         LOG.debug(String.format("Getting Consumer: %s  %s/%s/%s", pool, readTopic, clientName, clientId));
202         if (filter_json == null && writeTopics.contains(readTopic)) {
203             LOG.error(
204                 "*****We will be writing and reading to the same topic without a filter. This will cause an infinite loop.*****");
205         }
206         
207         Consumer out=null;
208         BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext();
209         if (ctx != null) {
210                 ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
211                 if (svcRef != null) {
212                         try{
213                             out = ((MessageAdapterFactory) ctx.getService(svcRef)).createConsumer(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret);
214                         }catch(Error e){
215                             //TODO:create eelf message
216                             LOG.error("EvenHandlerImp.getConsumer calling MessageAdapterFactor.createConsumer",e);
217                         }
218                         for (String url : pool) {
219                             if (url.contains("3905") || url.contains("https")) {
220                                 out.useHttps(true);
221                                 break;
222                             }
223                         }
224                 }
225         }
226         return out;
227     }
228
229     /**
230      * Returns a consumer object for direct access to our Cambria producer interface
231      * 
232      * @return An instance of the producer interface
233      */
234     protected Producer getProducer() {
235         LOG.debug(String.format("Getting Producer: %s  %s", pool, readTopic));
236
237         Producer out = null;
238         BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext();
239         if (ctx != null) {
240                 ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
241                 if (svcRef != null) {
242                         out = ((MessageAdapterFactory) ctx.getService(svcRef)).createProducer(pool, writeTopics,apiKey, apiSecret);
243                         for (String url : pool) {
244                             if (url.contains("3905") || url.contains("https")) {
245                                 out.useHttps(true);
246                                 break;
247                             }
248                         }
249                 }
250         }
251         return out;
252     }
253
254     @Override
255     public void closeClients() {
256         LOG.debug("Closing Consumer and Producer DMaaP clients");
257         if (reader != null) {
258                 reader.close();
259         }
260         if (producer != null) {
261                 producer.close();
262         }
263     }
264     
265     @Override
266     public String getClientId() {
267         return clientId;
268     }
269
270     @Override
271     public void setClientId(String clientId) {
272         this.clientId = clientId;
273     }
274
275     @Override
276     public String getClientName() {
277         return clientName;
278     }
279
280     @Override
281     public void setClientName(String clientName) {
282         this.clientName = clientName;
283         MDC.put(LoggingConstants.MDCKeys.PARTNER_NAME, clientName);
284     }
285
286     @Override
287     public void addToPool(String hostname) {
288         pool.add(hostname);
289     }
290
291     @Override
292     public Collection<String> getPool() {
293         return pool;
294     }
295
296     @Override
297     public void removeFromPool(String hostname) {
298         pool.remove(hostname);
299     }
300
301     @Override
302     public String getReadTopic() {
303         return readTopic;
304     }
305
306     @Override
307     public void setReadTopic(String readTopic) {
308         this.readTopic = readTopic;
309     }
310
311     @Override
312     public Set<String> getWriteTopics() {
313         return writeTopics;
314     }
315
316     @Override
317     public void setWriteTopics(Set<String> writeTopics) {
318         this.writeTopics = writeTopics;
319     }
320
321     @Override
322     public void clearCredentials() {
323         apiKey = null;
324         apiSecret = null;
325     }
326
327     @Override
328     public void setCredentials(String key, String secret) {
329         apiKey = key;
330         apiSecret = secret;
331     }
332 }