Dmaap micro service jar
[appc.git] / services / appc-dmaap-service / appc-event-listener-bundle / src / main / java / org / onap / appc / listener / impl / EventHandlerImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * ================================================================================
9  * Modifications Copyright (C) 2019 Ericsson
10  * =============================================================================
11  * Licensed under the Apache License, Version 2.0 (the "License");
12  * you may not use this file except in compliance with the License.
13  * You may obtain a copy of the License at
14  *
15  *      http://www.apache.org/licenses/LICENSE-2.0
16  *
17  * Unless required by applicable law or agreed to in writing, software
18  * distributed under the License is distributed on an "AS IS" BASIS,
19  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20  * See the License for the specific language governing permissions and
21  * limitations under the License.
22  *
23  * ============LICENSE_END=========================================================
24  */
25
26 package org.onap.appc.listener.impl;
27
28 import com.att.eelf.configuration.EELFLogger;
29 import com.att.eelf.configuration.EELFManager;
30
31 import org.onap.appc.adapter.factory.MessageService;
32 import org.onap.appc.adapter.message.Consumer;
33 import org.onap.appc.adapter.message.Producer;
34 import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapConsumerImpl;
35 import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapProducerImpl;
36 import org.onap.appc.listener.EventHandler;
37 import org.onap.appc.listener.ListenerProperties;
38 import org.onap.appc.listener.util.Mapper;
39 import org.onap.appc.logging.LoggingConstants;
40 import org.slf4j.MDC;
41
42 import java.util.ArrayList;
43 import java.util.Collection;
44 import java.util.HashSet;
45 import java.util.List;
46
47 /**
48  * This class is a wrapper for the DMaaP client provided in appc-dmaap-adapter. Its aim is to ensure
49  * that only well formed messages are sent and received on DMaaP.
50  */
51 public class EventHandlerImpl implements EventHandler {
52
53     private final EELFLogger LOG = EELFManager.getInstance().getLogger(EventHandlerImpl.class);
54
55     /*
56      * The amount of time in seconds to keep a connection to a topic open while waiting for data
57      */
58     private int READ_TIMEOUT = 60;
59
60     /*
61      * The pool of hosts to query against
62      */
63     private Collection<String> pool;
64
65     /*
66      * The topic to read messages from
67      */
68     private String readTopic;
69
70     /*
71      * The topic to write messages to
72      */
73     private String writeTopic;
74
75     /*
76      * The client (group) name to use for reading messages
77      */
78     private String clientName;
79
80     /*
81      * The id of the client (group) that is reading messages
82      */
83     private String clientId;
84
85     /*
86      * The api public key to use for authentication
87      */
88     private String apiKey;
89
90     /*
91      * The api secret key to use for authentication
92      */
93     private String apiSecret;
94
95     /*
96      * A json object containing filter arguments.
97      */
98     private String filter_json;
99
100
101     /*
102      * Blacklist time for a server with response problem in seconds
103      */
104     private String responseProblemBlacklistTime;
105
106     /*
107      *  Blacklist time for a server with server problem in seconds
108      */
109     private String serverProblemBlacklistTime;
110
111     /*
112      * Blacklist time for a server with DNS problem in seconds
113      */
114     private String dnsIssueBlacklistTime;
115
116     /*
117      * Blacklist time for a server with IO Exception problem in seconds
118      */
119     private String ioExceptionBlacklistTime;
120
121     private MessageService messageService;
122
123     private Consumer reader = null;
124     private Producer producer = null;
125
126     public EventHandlerImpl(ListenerProperties props) {
127         pool = new HashSet<>();
128
129         if (props != null) {
130             readTopic = props.getProperty(ListenerProperties.KEYS.TOPIC_READ);
131             clientName = props.getProperty(ListenerProperties.KEYS.CLIENT_NAME, "APP-C");
132             clientId = props.getProperty(ListenerProperties.KEYS.CLIENT_ID, "0");
133             apiKey = props.getProperty(ListenerProperties.KEYS.AUTH_USER_KEY);
134             apiSecret = props.getProperty(ListenerProperties.KEYS.AUTH_SECRET_KEY);
135             responseProblemBlacklistTime = props.getProperty(ListenerProperties.KEYS.PROBLEM_WITH_RESPONSE_BLACKLIST_TIME);
136             serverProblemBlacklistTime = props.getProperty(ListenerProperties.KEYS.PROBLEM_SERVERSIDE_ERROR_BLACKLIST_TIME);
137             dnsIssueBlacklistTime = props.getProperty(ListenerProperties.KEYS.PROBLEM_DNS_BLACKLIST_TIME);
138             ioExceptionBlacklistTime = props.getProperty(ListenerProperties.KEYS.PROBLEM_IO_EXCEPTION_BLACKLIST_TIME);
139
140             filter_json = props.getProperty(ListenerProperties.KEYS.TOPIC_READ_FILTER);
141
142             READ_TIMEOUT = Integer
143                     .valueOf(props.getProperty(ListenerProperties.KEYS.TOPIC_READ_TIMEOUT, String.valueOf(READ_TIMEOUT)));
144
145             String hostnames = props.getProperty(ListenerProperties.KEYS.HOSTS);
146             if (hostnames != null && !hostnames.isEmpty()) {
147                 for (String name : hostnames.split(",")) {
148                     pool.add(name);
149                 }
150             }
151
152             String writeTopicStr = props.getProperty(ListenerProperties.KEYS.TOPIC_WRITE);
153             if (writeTopicStr != null) {
154                 for (String topic : writeTopicStr.split(",")) {
155                     writeTopic = topic;
156                 }
157             }
158
159             messageService = MessageService.parse(props.getProperty(ListenerProperties.KEYS.MESSAGE_SERVICE));
160
161             LOG.info(String.format(
162                     "Configured to use %s client on host pool [%s]. Reading from [%s] filtered by %s. Writing to [%s]. Authenticated using %s",
163                     messageService, hostnames, readTopic, filter_json, writeTopic, apiKey));
164         }
165     }
166
167     @Override
168     public List<String> getIncomingEvents() {
169         return getIncomingEvents(1000);
170     }
171
172     @Override
173     public List<String> getIncomingEvents(int limit) {
174         List<String> out = new ArrayList<>();
175         LOG.info(String.format("Getting up to %d incoming events", limit));
176         // reuse the consumer object instead of creating a new one every time
177         if (reader == null) {
178             LOG.info("Getting Consumer...");
179             reader = getConsumer();
180         }
181         if (reader != null) {
182             List<String> items = reader.fetch(READ_TIMEOUT * 1000, limit);
183             for (String item : items) {
184                 out.add(item);
185             }
186         }
187         LOG.info(String.format("Read %d messages from %s as %s/%s.", out.size(), readTopic, clientName, clientId));
188         return out;
189     }
190
191     @Override
192     public <T> List<T> getIncomingEvents(Class<T> cls) {
193         return getIncomingEvents(cls, 1000);
194     }
195
196     @Override
197     public <T> List<T> getIncomingEvents(Class<T> cls, int limit) {
198         List<String> incomingStrings = getIncomingEvents(limit);
199         return Mapper.mapList(incomingStrings, cls);
200     }
201
202     @Override
203     public void postStatus(String event) {
204         postStatus(null, event);
205     }
206
207     @Override
208     public void postStatus(String partition, String event) {
209         LOG.debug(String.format("Posting Message [%s]", event));
210         if (producer == null) {
211             LOG.info("Getting Producer...");
212             producer = getProducer();
213         }
214         producer.post(partition, event);
215     }
216
217     /**
218      * Returns a consumer object for direct access to our Cambria consumer interface
219      *
220      * @return An instance of the consumer interface
221      */
222     protected Consumer getConsumer() {
223         LOG.debug(String.format("Getting Consumer: %s  %s/%s/%s", pool, readTopic, clientName, clientId));
224         if (filter_json == null && writeTopic.equals(readTopic)) {
225             LOG.error(
226                     "*****We will be writing and reading to the same topic without a filter. This will cause an infinite loop.*****");
227         }
228
229         Consumer out = null;
230         out = new HttpDmaapConsumerImpl(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret);
231
232                     if (out != null && responseProblemBlacklistTime != null && responseProblemBlacklistTime.length() > 0)
233                     {
234                         out.setResponseProblemBlacklistTime(responseProblemBlacklistTime);
235                     }
236
237                     if (out != null && serverProblemBlacklistTime != null && serverProblemBlacklistTime.length() > 0)
238                     {
239                         out.setServerProblemBlacklistTime(serverProblemBlacklistTime);
240                     }
241
242                     if (out != null && dnsIssueBlacklistTime != null && dnsIssueBlacklistTime.length() > 0)
243                     {
244                         out.setDnsIssueBlacklistTime(dnsIssueBlacklistTime);
245                     }
246
247                     if (out != null && ioExceptionBlacklistTime != null && ioExceptionBlacklistTime.length() > 0)
248                     {
249                         out.setIOExceptionBlacklistTime(ioExceptionBlacklistTime);
250                     }
251                 if (out != null) {
252                     for (String url : pool) {
253                         if (url.contains("3905") || url.contains("https")) {
254                             out.useHttps(true);
255                             break;
256                         }
257                     }
258                 }
259         return out;
260     }
261
262     /**
263      * Returns a consumer object for direct access to our Cambria producer interface
264      *
265      * @return An instance of the producer interface
266      */
267     protected Producer getProducer() {
268         LOG.debug(String.format("Getting Producer: %s  %s", pool, readTopic));
269
270         Producer out = null;
271         out = new HttpDmaapProducerImpl(pool, writeTopic, apiKey, apiSecret);
272                 if (out != null && responseProblemBlacklistTime != null && responseProblemBlacklistTime.length() > 0)
273                 {
274                     out.setResponseProblemBlacklistTime(responseProblemBlacklistTime);
275                 }
276
277                 if (out != null && serverProblemBlacklistTime != null && serverProblemBlacklistTime.length() > 0)
278                 {
279                     out.setServerProblemBlacklistTime(serverProblemBlacklistTime);
280                 }
281
282                 if (out != null && dnsIssueBlacklistTime != null && dnsIssueBlacklistTime.length() > 0)
283                 {
284                     out.setDnsIssueBlacklistTime(dnsIssueBlacklistTime);
285                 }
286
287                 if (out != null && ioExceptionBlacklistTime != null && ioExceptionBlacklistTime.length() > 0)
288                 {
289                     out.setIOExceptionBlacklistTime(ioExceptionBlacklistTime);
290                 }
291                 if (out != null) {
292                     for (String url : pool) {
293                         if (url.contains("3905") || url.contains("https")) {
294                             out.useHttps(true);
295                             break;
296                         }
297                     }
298                 }
299         return out;
300     }
301
302     @Override
303     public void closeClients() {
304         LOG.debug("Closing Consumer and Producer DMaaP clients");
305         if (reader != null) {
306             reader.close();
307         }
308         if (producer != null) {
309             producer.close();
310         }
311     }
312
313     @Override
314     public String getClientId() {
315         return clientId;
316     }
317
318     @Override
319     public void setClientId(String clientId) {
320         this.clientId = clientId;
321     }
322
323     @Override
324     public String getClientName() {
325         return clientName;
326     }
327
328     @Override
329     public void setClientName(String clientName) {
330         this.clientName = clientName;
331         MDC.put(LoggingConstants.MDCKeys.PARTNER_NAME, clientName);
332     }
333
334     @Override
335     public void addToPool(String hostname) {
336         pool.add(hostname);
337     }
338
339     @Override
340     public Collection<String> getPool() {
341         return pool;
342     }
343
344     @Override
345     public void removeFromPool(String hostname) {
346         pool.remove(hostname);
347     }
348
349     @Override
350     public String getReadTopic() {
351         return readTopic;
352     }
353
354     @Override
355     public void setReadTopic(String readTopic) {
356         this.readTopic = readTopic;
357     }
358
359     @Override
360     public String getWriteTopic() {
361         return writeTopic;
362     }
363
364     @Override
365     public void setWriteTopic(String writeTopic) {
366         this.writeTopic = writeTopic;
367     }
368
369     @Override
370     public void setResponseProblemBlacklistTime(String duration){
371         this.responseProblemBlacklistTime = duration;
372     }
373
374     @Override
375     public void setServerProblemBlacklistTime(String duration){
376         this.serverProblemBlacklistTime = duration;
377     }
378
379     @Override
380     public void setDnsIssueBlacklistTime(String duration){
381         this.dnsIssueBlacklistTime = duration;
382     }
383
384     @Override
385     public void setIOExceptionBlacklistTime(String duration){
386         this.ioExceptionBlacklistTime = duration;
387     }
388
389     @Override
390     public void clearCredentials() {
391         apiKey = null;
392         apiSecret = null;
393     }
394
395     @Override
396     public void setCredentials(String key, String secret) {
397         apiKey = key;
398         apiSecret = secret;
399     }
400 }