2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.openecomp.appc.listener.impl;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.HashSet;
27 import java.util.List;
30 import org.openecomp.appc.adapter.dmaap.Consumer;
31 import org.openecomp.appc.adapter.dmaap.DmaapConsumer;
32 import org.openecomp.appc.adapter.dmaap.DmaapProducer;
33 import org.openecomp.appc.adapter.dmaap.Producer;
34 import org.openecomp.appc.adapter.dmaap.DmaapConsumer;
35 import org.openecomp.appc.adapter.dmaap.DmaapProducer;
36 import org.openecomp.appc.listener.EventHandler;
37 import org.openecomp.appc.listener.ListenerProperties;
38 import org.openecomp.appc.listener.ListenerProperties.MessageService;
39 import org.openecomp.appc.listener.util.Mapper;
40 import org.openecomp.appc.logging.LoggingConstants;
42 import com.att.eelf.configuration.EELFLogger;
43 import com.att.eelf.configuration.EELFManager;
47 * This class is a wrapper for the DMaaP client provided in appc-dmaap-adapter. Its aim is to ensure that only well formed
48 * messages are sent and received on DMaaP.
51 public class EventHandlerImpl implements EventHandler {
53 private final EELFLogger LOG = EELFManager.getInstance().getLogger(EventHandlerImpl.class);
56 * The amount of time in seconds to keep a connection to a topic open while waiting for data
58 private int READ_TIMEOUT = 60;
61 * The pool of hosts to query against
63 private Collection<String> pool;
66 * The topic to read messages from
68 private String readTopic;
71 * The topic to write messages to
73 private Set<String> writeTopics;
76 * The client (group) name to use for reading messages
78 private String clientName;
81 * The id of the client (group) that is reading messages
83 private String clientId;
86 * The api public key to use for authentication
88 private String apiKey;
91 * The api secret key to use for authentication
93 private String apiSecret;
96 * A json object containing filter arguments.
98 private String filter_json;
100 private MessageService messageService;
102 private Consumer reader = null;
103 private Producer producer = null;
105 public EventHandlerImpl(ListenerProperties props) {
106 pool = new HashSet<String>();
107 writeTopics = new HashSet<String>();
110 readTopic = props.getProperty(ListenerProperties.KEYS.TOPIC_READ);
111 clientName = props.getProperty(ListenerProperties.KEYS.CLIENT_NAME, "APP-C");
112 clientId = props.getProperty(ListenerProperties.KEYS.CLIENT_ID, "0");
113 apiKey = props.getProperty(ListenerProperties.KEYS.AUTH_USER_KEY);
114 apiSecret = props.getProperty(ListenerProperties.KEYS.AUTH_SECRET_KEY);
116 filter_json = props.getProperty(ListenerProperties.KEYS.TOPIC_READ_FILTER);
118 READ_TIMEOUT = Integer
119 .valueOf(props.getProperty(ListenerProperties.KEYS.TOPIC_READ_TIMEOUT, String.valueOf(READ_TIMEOUT)));
121 String hostnames = props.getProperty(ListenerProperties.KEYS.HOSTS);
122 if (hostnames != null && !hostnames.isEmpty()) {
123 for (String name : hostnames.split(",")) {
128 String writeTopicStr = props.getProperty(ListenerProperties.KEYS.TOPIC_WRITE);
129 if (writeTopicStr != null) {
130 for (String topic : writeTopicStr.split(",")) {
131 writeTopics.add(topic);
135 messageService = MessageService.parse(props.getProperty(ListenerProperties.KEYS.MESSAGE_SERVICE));
137 LOG.info(String.format(
138 "Configured to use %s client on host pool [%s]. Reading from [%s] filtered by %s. Wriring to [%s]. Authenticated using %s",
139 messageService, hostnames, readTopic, filter_json, writeTopics, apiKey));
144 public List<String> getIncomingEvents() {
145 return getIncomingEvents(1000);
149 public List<String> getIncomingEvents(int limit) {
150 List<String> out = new ArrayList<String>();
151 LOG.info(String.format("Getting up to %d incoming events", limit));
152 // reuse the consumer object instead of creating a new one every time
153 if (reader == null) {
154 LOG.info("Getting Consumer...");
155 reader = getConsumer();
157 for (String item : reader.fetch(READ_TIMEOUT * 1000, limit)) {
160 LOG.info(String.format("Read %d messages from %s as %s/%s.", out.size(), readTopic, clientName, clientId));
165 public <T> List<T> getIncomingEvents(Class<T> cls) {
166 return getIncomingEvents(cls, 1000);
170 public <T> List<T> getIncomingEvents(Class<T> cls, int limit) {
171 List<String> incomingStrings = getIncomingEvents(limit);
172 return Mapper.mapList(incomingStrings, cls);
176 public void postStatus(String event) {
177 postStatus(null, event);
181 public void postStatus(String partition, String event) {
182 LOG.debug(String.format("Posting Message [%s]", event));
183 if (producer == null) {
184 LOG.info("Getting Producer...");
185 producer = getProducer();
187 producer.post(partition, event);
191 * Returns a consumer object for direct access to our Cambria consumer interface
193 * @return An instance of the consumer interface
195 protected Consumer getConsumer() {
196 LOG.debug(String.format("Getting Consumer: %s %s/%s/%s", pool, readTopic, clientName, clientId));
197 if (filter_json == null && writeTopics.contains(readTopic)) {
199 "*****We will be writing and reading to the same topic without a filter. This will cause an infinite loop.*****");
202 out = new DmaapConsumer(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret);
203 for (String url : pool) {
204 if (url.contains("3905") || url.contains("https")) {
213 * Returns a consumer object for direct access to our Cambria producer interface
215 * @return An instance of the producer interface
217 protected Producer getProducer() {
218 LOG.debug(String.format("Getting Producer: %s %s", pool, readTopic));
221 out = new DmaapProducer(pool,writeTopics);
223 if (apiKey != null && apiSecret != null) {
224 out.updateCredentials(apiKey, apiSecret);
227 for (String url : pool) {
228 if (url.contains("3905") || url.contains("https")) {
237 public void closeClients() {
238 LOG.debug("Closing Consumer and Producer DMaaP clients");
239 switch (messageService) {
241 if (reader != null) {
242 ((DmaapConsumer) reader).close();
244 if (producer != null) {
245 ((DmaapProducer) producer).close();
249 // close DMaaP clients
250 if (reader != null) {
251 ((DmaapConsumer) reader).close();
253 if (producer != null) {
254 ((DmaapProducer) producer).close();
260 public String getClientId() {
265 public void setClientId(String clientId) {
266 this.clientId = clientId;
270 public String getClientName() {
275 public void setClientName(String clientName) {
276 this.clientName = clientName;
277 MDC.put(LoggingConstants.MDCKeys.PARTNER_NAME, clientName);
281 public void addToPool(String hostname) {
286 public Collection<String> getPool() {
291 public void removeFromPool(String hostname) {
292 pool.remove(hostname);
296 public String getReadTopic() {
301 public void setReadTopic(String readTopic) {
302 this.readTopic = readTopic;
306 public Set<String> getWriteTopics() {
311 public void setWriteTopics(Set<String> writeTopics) {
312 this.writeTopics = writeTopics;
316 public void clearCredentials() {
322 public void setCredentials(String key, String secret) {