listener bundle change for multiple ansible
[appc.git] / appc-event-listener / appc-event-listener-bundle / src / main / java / org / onap / appc / listener / impl / EventHandlerImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.appc.listener.impl;
25
26 import com.att.eelf.configuration.EELFLogger;
27 import com.att.eelf.configuration.EELFManager;
28
29 import org.onap.appc.adapter.factory.DmaapMessageAdapterFactoryImpl;
30 import org.onap.appc.adapter.factory.MessageService;
31 import org.onap.appc.adapter.message.Consumer;
32 import org.onap.appc.adapter.message.MessageAdapterFactory;
33 import org.onap.appc.adapter.message.Producer;
34 import org.onap.appc.listener.EventHandler;
35 import org.onap.appc.listener.ListenerProperties;
36 import org.onap.appc.listener.util.Mapper;
37 import org.onap.appc.logging.LoggingConstants;
38 import org.osgi.framework.Bundle;
39 import org.osgi.framework.BundleContext;
40 import org.osgi.framework.FrameworkUtil;
41 import org.osgi.framework.ServiceReference;
42 import org.slf4j.MDC;
43
44 import java.util.ArrayList;
45 import java.util.Collection;
46 import java.util.HashSet;
47 import java.util.List;
48 import java.util.Set;
49
50 /**
51  * This class is a wrapper for the DMaaP client provided in appc-dmaap-adapter. Its aim is to ensure
52  * that only well formed messages are sent and received on DMaaP.
53  */
54 public class EventHandlerImpl implements EventHandler {
55
56     private final EELFLogger LOG = EELFManager.getInstance().getLogger(EventHandlerImpl.class);
57
58     /*
59      * The amount of time in seconds to keep a connection to a topic open while waiting for data
60      */
61     private int READ_TIMEOUT = 60;
62
63     /*
64      * The pool of hosts to query against
65      */
66     private Collection<String> pool;
67
68     /*
69      * The topic to read messages from
70      */
71     private String readTopic;
72
73     /*
74      * The topic to write messages to
75      */
76     private Set<String> writeTopics;
77
78     /*
79      * The client (group) name to use for reading messages
80      */
81     private String clientName;
82
83     /*
84      * The id of the client (group) that is reading messages
85      */
86     private String clientId;
87
88     /*
89      * The api public key to use for authentication
90      */
91     private String apiKey;
92
93     /*
94      * The api secret key to use for authentication
95      */
96     private String apiSecret;
97
98     /*
99      * A json object containing filter arguments.
100      */
101     private String filter_json;
102
103
104     /*
105      * Blacklist time for a server with response problem in seconds
106      */
107     private String responseProblemBlacklistTime;
108
109     /*
110      *  Blacklist time for a server with server problem in seconds
111      */
112     private String serverProblemBlacklistTime;
113
114     /*
115      * Blacklist time for a server with DNS problem in seconds
116      */
117     private String dnsIssueBlacklistTime;
118
119     /*
120      * Blacklist time for a server with IO Exception problem in seconds
121      */
122     private String ioExceptionBlacklistTime;
123
124     private MessageService messageService;
125
126     private Consumer reader = null;
127     private Producer producer = null;
128
129     public EventHandlerImpl(ListenerProperties props) {
130         pool = new HashSet<>();
131         writeTopics = new HashSet<>();
132
133         if (props != null) {
134             readTopic = props.getProperty(ListenerProperties.KEYS.TOPIC_READ);
135             clientName = props.getProperty(ListenerProperties.KEYS.CLIENT_NAME, "APP-C");
136             clientId = props.getProperty(ListenerProperties.KEYS.CLIENT_ID, "0");
137             apiKey = props.getProperty(ListenerProperties.KEYS.AUTH_USER_KEY);
138             apiSecret = props.getProperty(ListenerProperties.KEYS.AUTH_SECRET_KEY);
139             responseProblemBlacklistTime = props.getProperty(ListenerProperties.KEYS.PROBLEM_WITH_RESPONSE_BLACKLIST_TIME);
140             serverProblemBlacklistTime = props.getProperty(ListenerProperties.KEYS.PROBLEM_SERVERSIDE_ERROR_BLACKLIST_TIME);
141             dnsIssueBlacklistTime = props.getProperty(ListenerProperties.KEYS.PROBLEM_DNS_BLACKLIST_TIME);
142             ioExceptionBlacklistTime = props.getProperty(ListenerProperties.KEYS.PROBLEM_IO_EXCEPTION_BLACKLIST_TIME);
143
144             filter_json = props.getProperty(ListenerProperties.KEYS.TOPIC_READ_FILTER);
145
146             READ_TIMEOUT = Integer
147                     .valueOf(props.getProperty(ListenerProperties.KEYS.TOPIC_READ_TIMEOUT, String.valueOf(READ_TIMEOUT)));
148
149             String hostnames = props.getProperty(ListenerProperties.KEYS.HOSTS);
150             if (hostnames != null && !hostnames.isEmpty()) {
151                 for (String name : hostnames.split(",")) {
152                     pool.add(name);
153                 }
154             }
155
156             String writeTopicStr = props.getProperty(ListenerProperties.KEYS.TOPIC_WRITE);
157             if (writeTopicStr != null) {
158                 for (String topic : writeTopicStr.split(",")) {
159                     writeTopics.add(topic);
160                 }
161             }
162
163             messageService = MessageService.parse(props.getProperty(ListenerProperties.KEYS.MESSAGE_SERVICE));
164
165             LOG.info(String.format(
166                     "Configured to use %s client on host pool [%s]. Reading from [%s] filtered by %s. Writing to [%s]. Authenticated using %s",
167                     messageService, hostnames, readTopic, filter_json, writeTopics, apiKey));
168         }
169     }
170
171     @Override
172     public List<String> getIncomingEvents() {
173         return getIncomingEvents(1000);
174     }
175
176     @Override
177     public List<String> getIncomingEvents(int limit) {
178         List<String> out = new ArrayList<>();
179         LOG.info(String.format("Getting up to %d incoming events", limit));
180         // reuse the consumer object instead of creating a new one every time
181         if (reader == null) {
182             LOG.info("Getting Consumer...");
183             reader = getConsumer();
184         }
185         if (reader != null) {
186             List<String> items = reader.fetch(READ_TIMEOUT * 1000, limit);
187             for (String item : items) {
188                 out.add(item);
189             }
190         }
191         LOG.info(String.format("Read %d messages from %s as %s/%s.", out.size(), readTopic, clientName, clientId));
192         return out;
193     }
194
195     @Override
196     public <T> List<T> getIncomingEvents(Class<T> cls) {
197         return getIncomingEvents(cls, 1000);
198     }
199
200     @Override
201     public <T> List<T> getIncomingEvents(Class<T> cls, int limit) {
202         List<String> incomingStrings = getIncomingEvents(limit);
203         return Mapper.mapList(incomingStrings, cls);
204     }
205
206     @Override
207     public void postStatus(String event) {
208         postStatus(null, event);
209     }
210
211     @Override
212     public void postStatus(String partition, String event) {
213         LOG.debug(String.format("Posting Message [%s]", event));
214         if (producer == null) {
215             LOG.info("Getting Producer...");
216             producer = getProducer();
217         }
218         producer.post(partition, event);
219     }
220
221     /**
222      * Returns a consumer object for direct access to our Cambria consumer interface
223      *
224      * @return An instance of the consumer interface
225      */
226     protected Consumer getConsumer() {
227         LOG.debug(String.format("Getting Consumer: %s  %s/%s/%s", pool, readTopic, clientName, clientId));
228         if (filter_json == null && writeTopics.contains(readTopic)) {
229             LOG.error(
230                     "*****We will be writing and reading to the same topic without a filter. This will cause an infinite loop.*****");
231         }
232
233         Consumer out = null;
234         BundleContext ctx = null;
235         Bundle bundle = FrameworkUtil.getBundle(EventHandlerImpl.class);
236         if(bundle != null) {
237             ctx = bundle.getBundleContext();
238         }
239
240         if (ctx != null) {
241             ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
242             if (svcRef != null) {
243                 try {
244                     out = ((MessageAdapterFactory) ctx.getService(svcRef))
245                             .createConsumer(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret);
246
247                     if (out != null && responseProblemBlacklistTime != null && responseProblemBlacklistTime.length() > 0)
248                     {
249                         out.setResponseProblemBlacklistTime(responseProblemBlacklistTime);
250                     }
251
252                     if (out != null && serverProblemBlacklistTime != null && serverProblemBlacklistTime.length() > 0)
253                     {
254                         out.setServerProblemBlacklistTime(serverProblemBlacklistTime);
255                     }
256
257                     if (out != null && dnsIssueBlacklistTime != null && dnsIssueBlacklistTime.length() > 0)
258                     {
259                         out.setDnsIssueBlacklistTime(dnsIssueBlacklistTime);
260                     }
261
262                     if (out != null && ioExceptionBlacklistTime != null && ioExceptionBlacklistTime.length() > 0)
263                     {
264                         out.setIOExceptionBlacklistTime(ioExceptionBlacklistTime);
265                     }
266                 } catch (Exception e) {
267                     //TODO:create eelf message
268                     LOG.error("EvenHandlerImp.getConsumer calling MessageAdapterFactor.createConsumer", e);
269                 }
270                 if (out != null) {
271                     for (String url : pool) {
272                         if (url.contains("3905") || url.contains("https")) {
273                             out.useHttps(true);
274                             break;
275                         }
276                     }
277                 }
278             }
279         }
280         return out;
281     }
282
283     /**
284      * Returns a consumer object for direct access to our Cambria producer interface
285      *
286      * @return An instance of the producer interface
287      */
288     protected Producer getProducer() {
289         LOG.debug(String.format("Getting Producer: %s  %s", pool, readTopic));
290
291         Producer out = null;
292         BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext();
293         if (ctx != null) {
294             ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
295             if (svcRef != null) {
296                 out = ((MessageAdapterFactory) ctx.getService(svcRef))
297                         .createProducer(pool, writeTopics, apiKey, apiSecret);
298                 if (out != null && responseProblemBlacklistTime != null && responseProblemBlacklistTime.length() > 0)
299                 {
300                     out.setResponseProblemBlacklistTime(responseProblemBlacklistTime);
301                 }
302
303                 if (out != null && serverProblemBlacklistTime != null && serverProblemBlacklistTime.length() > 0)
304                 {
305                     out.setServerProblemBlacklistTime(serverProblemBlacklistTime);
306                 }
307
308                 if (out != null && dnsIssueBlacklistTime != null && dnsIssueBlacklistTime.length() > 0)
309                 {
310                     out.setDnsIssueBlacklistTime(dnsIssueBlacklistTime);
311                 }
312
313                 if (out != null && ioExceptionBlacklistTime != null && ioExceptionBlacklistTime.length() > 0)
314                 {
315                     out.setIOExceptionBlacklistTime(ioExceptionBlacklistTime);
316                 }
317                 if (out != null) {
318                     for (String url : pool) {
319                         if (url.contains("3905") || url.contains("https")) {
320                             out.useHttps(true);
321                             break;
322                         }
323                     }
324                 }
325             }
326         }
327         return out;
328     }
329
330     @Override
331     public void closeClients() {
332         LOG.debug("Closing Consumer and Producer DMaaP clients");
333         if (reader != null) {
334             reader.close();
335         }
336         if (producer != null) {
337             producer.close();
338         }
339     }
340
341     @Override
342     public String getClientId() {
343         return clientId;
344     }
345
346     @Override
347     public void setClientId(String clientId) {
348         this.clientId = clientId;
349     }
350
351     @Override
352     public String getClientName() {
353         return clientName;
354     }
355
356     @Override
357     public void setClientName(String clientName) {
358         this.clientName = clientName;
359         MDC.put(LoggingConstants.MDCKeys.PARTNER_NAME, clientName);
360     }
361
362     @Override
363     public void addToPool(String hostname) {
364         pool.add(hostname);
365     }
366
367     @Override
368     public Collection<String> getPool() {
369         return pool;
370     }
371
372     @Override
373     public void removeFromPool(String hostname) {
374         pool.remove(hostname);
375     }
376
377     @Override
378     public String getReadTopic() {
379         return readTopic;
380     }
381
382     @Override
383     public void setReadTopic(String readTopic) {
384         this.readTopic = readTopic;
385     }
386
387     @Override
388     public Set<String> getWriteTopics() {
389         return writeTopics;
390     }
391
392     @Override
393     public void setWriteTopics(Set<String> writeTopics) {
394         this.writeTopics = writeTopics;
395     }
396
397     @Override
398     public void setResponseProblemBlacklistTime(String duration){
399         this.responseProblemBlacklistTime = duration;
400     }
401
402     @Override
403     public void setServerProblemBlacklistTime(String duration){
404         this.serverProblemBlacklistTime = duration;
405     }
406
407     @Override
408     public void setDnsIssueBlacklistTime(String duration){
409         this.dnsIssueBlacklistTime = duration;
410     }
411
412     @Override
413     public void setIOExceptionBlacklistTime(String duration){
414         this.ioExceptionBlacklistTime = duration;
415     }
416
417     @Override
418     public void clearCredentials() {
419         apiKey = null;
420         apiSecret = null;
421     }
422
423     @Override
424     public void setCredentials(String key, String secret) {
425         apiKey = key;
426         apiSecret = secret;
427     }
428 }