Change nexus values to properties
[appc.git] / appc-event-listener / appc-event-listener-bundle / src / main / java / org / openecomp / appc / listener / impl / EventHandlerImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * openECOMP : APP-C
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights
6  *                                              reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.openecomp.appc.listener.impl;
23
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Set;
29
30 import org.openecomp.appc.adapter.dmaap.Consumer;
31 import org.openecomp.appc.adapter.dmaap.DmaapConsumer;
32 import org.openecomp.appc.adapter.dmaap.DmaapProducer;
33 import org.openecomp.appc.adapter.dmaap.Producer;
34 import org.openecomp.appc.adapter.dmaap.DmaapConsumer;
35 import org.openecomp.appc.adapter.dmaap.DmaapProducer;
36 import org.openecomp.appc.listener.EventHandler;
37 import org.openecomp.appc.listener.ListenerProperties;
38 import org.openecomp.appc.listener.ListenerProperties.MessageService;
39 import org.openecomp.appc.listener.util.Mapper;
40 import org.openecomp.appc.logging.LoggingConstants;
41
42 import com.att.eelf.configuration.EELFLogger;
43 import com.att.eelf.configuration.EELFManager;
44 import org.slf4j.MDC;
45
46 /**
47  * This class is a wrapper for the DMaaP client provided in appc-dmaap-adapter. Its aim is to ensure that only well formed
48  * messages are sent and received on DMaaP.
49  * 
50  */
51 public class EventHandlerImpl implements EventHandler {
52
53     private final EELFLogger LOG = EELFManager.getInstance().getLogger(EventHandlerImpl.class);
54
55     /*
56      * The amount of time in seconds to keep a connection to a topic open while waiting for data
57      */
58     private int READ_TIMEOUT = 60;
59
60     /*
61      * The pool of hosts to query against
62      */
63     private Collection<String> pool;
64
65     /*
66      * The topic to read messages from
67      */
68     private String readTopic;
69
70     /*
71      * The topic to write messages to
72      */
73     private Set<String> writeTopics;
74
75     /*
76      * The client (group) name to use for reading messages
77      */
78     private String clientName;
79
80     /*
81      * The id of the client (group) that is reading messages
82      */
83     private String clientId;
84
85     /*
86      * The api public key to use for authentication
87      */
88     private String apiKey;
89
90     /*
91      * The api secret key to use for authentication
92      */
93     private String apiSecret;
94
95     /*
96      * A json object containing filter arguments.
97      */
98     private String filter_json;
99
100     private MessageService messageService;
101
102     private Consumer reader = null;
103     private Producer producer = null;
104     
105     public EventHandlerImpl(ListenerProperties props) {
106         pool = new HashSet<String>();
107         writeTopics = new HashSet<String>();
108
109         if (props != null) {
110             readTopic = props.getProperty(ListenerProperties.KEYS.TOPIC_READ);
111             clientName = props.getProperty(ListenerProperties.KEYS.CLIENT_NAME, "APP-C");
112             clientId = props.getProperty(ListenerProperties.KEYS.CLIENT_ID, "0");
113             apiKey = props.getProperty(ListenerProperties.KEYS.AUTH_USER_KEY);
114             apiSecret = props.getProperty(ListenerProperties.KEYS.AUTH_SECRET_KEY);
115
116             filter_json = props.getProperty(ListenerProperties.KEYS.TOPIC_READ_FILTER);
117
118             READ_TIMEOUT = Integer
119                 .valueOf(props.getProperty(ListenerProperties.KEYS.TOPIC_READ_TIMEOUT, String.valueOf(READ_TIMEOUT)));
120
121             String hostnames = props.getProperty(ListenerProperties.KEYS.HOSTS);
122             if (hostnames != null && !hostnames.isEmpty()) {
123                 for (String name : hostnames.split(",")) {
124                     pool.add(name);
125                 }
126             }
127
128             String writeTopicStr = props.getProperty(ListenerProperties.KEYS.TOPIC_WRITE);
129             if (writeTopicStr != null) {
130                 for (String topic : writeTopicStr.split(",")) {
131                     writeTopics.add(topic);
132                 }
133             }
134
135             messageService = MessageService.parse(props.getProperty(ListenerProperties.KEYS.MESSAGE_SERVICE));
136
137             LOG.info(String.format(
138                 "Configured to use %s client on host pool [%s]. Reading from [%s] filtered by %s. Wriring to [%s]. Authenticated using %s",
139                 messageService, hostnames, readTopic, filter_json, writeTopics, apiKey));
140         }
141     }
142
143     @Override
144     public List<String> getIncomingEvents() {
145         return getIncomingEvents(1000);
146     }
147
148     @Override
149     public List<String> getIncomingEvents(int limit) {
150         List<String> out = new ArrayList<String>();
151         LOG.info(String.format("Getting up to %d incoming events", limit));
152         // reuse the consumer object instead of creating a new one every time
153         if (reader == null) {
154                 LOG.info("Getting Consumer...");
155                 reader = getConsumer();
156         }
157         for (String item : reader.fetch(READ_TIMEOUT * 1000, limit)) {
158             out.add(item);
159         }
160         LOG.info(String.format("Read %d messages from %s as %s/%s.", out.size(), readTopic, clientName, clientId));
161         return out;
162     }
163
164     @Override
165     public <T> List<T> getIncomingEvents(Class<T> cls) {
166         return getIncomingEvents(cls, 1000);
167     }
168
169     @Override
170     public <T> List<T> getIncomingEvents(Class<T> cls, int limit) {
171         List<String> incomingStrings = getIncomingEvents(limit);
172         return Mapper.mapList(incomingStrings, cls);
173     }
174
175     @Override
176     public void postStatus(String event) {
177         postStatus(null, event);
178     }
179
180     @Override
181     public void postStatus(String partition, String event) {
182         LOG.debug(String.format("Posting Message [%s]", event));
183         if (producer == null) {
184                 LOG.info("Getting Producer...");
185                 producer = getProducer();
186         }
187         producer.post(partition, event);
188     }
189
190     /**
191      * Returns a consumer object for direct access to our Cambria consumer interface
192      * 
193      * @return An instance of the consumer interface
194      */
195     protected Consumer getConsumer() {
196         LOG.debug(String.format("Getting Consumer: %s  %s/%s/%s", pool, readTopic, clientName, clientId));
197         if (filter_json == null && writeTopics.contains(readTopic)) {
198             LOG.error(
199                 "*****We will be writing and reading to the same topic without a filter. This will cause an infinite loop.*****");
200         }
201         Consumer out;
202         out = new DmaapConsumer(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret);
203         for (String url : pool) {
204             if (url.contains("3905") || url.contains("https")) {
205                 out.useHttps(true);
206                 break;
207             }
208         }
209         return out;
210     }
211
212     /**
213      * Returns a consumer object for direct access to our Cambria producer interface
214      * 
215      * @return An instance of the producer interface
216      */
217     protected Producer getProducer() {
218         LOG.debug(String.format("Getting Producer: %s  %s", pool, readTopic));
219
220         Producer out;
221         out = new DmaapProducer(pool,writeTopics);
222
223         if (apiKey != null && apiSecret != null) {
224             out.updateCredentials(apiKey, apiSecret);
225         }
226
227         for (String url : pool) {
228             if (url.contains("3905") || url.contains("https")) {
229                 out.useHttps(true);
230                 break;
231             }
232         }
233         return out;
234     }
235
236     @Override
237     public void closeClients() {
238         LOG.debug("Closing Consumer and Producer DMaaP clients");
239         switch (messageService) {
240             case DMaaP:
241                 if (reader != null) {
242                         ((DmaapConsumer) reader).close();
243                 }
244                 if (producer != null) {
245                         ((DmaapProducer) producer).close();
246                 }
247                 break;
248             default:
249                 // close DMaaP clients
250                 if (reader != null) {
251                         ((DmaapConsumer) reader).close();
252                 }
253                 if (producer != null) {
254                         ((DmaapProducer) producer).close();
255                 }
256         }
257     }
258     
259     @Override
260     public String getClientId() {
261         return clientId;
262     }
263
264     @Override
265     public void setClientId(String clientId) {
266         this.clientId = clientId;
267     }
268
269     @Override
270     public String getClientName() {
271         return clientName;
272     }
273
274     @Override
275     public void setClientName(String clientName) {
276         this.clientName = clientName;
277         MDC.put(LoggingConstants.MDCKeys.PARTNER_NAME, clientName);
278     }
279
280     @Override
281     public void addToPool(String hostname) {
282         pool.add(hostname);
283     }
284
285     @Override
286     public Collection<String> getPool() {
287         return pool;
288     }
289
290     @Override
291     public void removeFromPool(String hostname) {
292         pool.remove(hostname);
293     }
294
295     @Override
296     public String getReadTopic() {
297         return readTopic;
298     }
299
300     @Override
301     public void setReadTopic(String readTopic) {
302         this.readTopic = readTopic;
303     }
304
305     @Override
306     public Set<String> getWriteTopics() {
307         return writeTopics;
308     }
309
310     @Override
311     public void setWriteTopics(Set<String> writeTopics) {
312         this.writeTopics = writeTopics;
313     }
314
315     @Override
316     public void clearCredentials() {
317         apiKey = null;
318         apiSecret = null;
319     }
320
321     @Override
322     public void setCredentials(String key, String secret) {
323         apiKey = key;
324         apiSecret = secret;
325     }
326 }