2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.openecomp.appc.listener.impl;
23 import com.att.eelf.configuration.EELFLogger;
24 import com.att.eelf.configuration.EELFManager;
26 import org.openecomp.appc.adapter.factory.DmaapMessageAdapterFactoryImpl;
27 import org.openecomp.appc.adapter.factory.MessageService;
28 import org.openecomp.appc.adapter.message.Consumer;
29 import org.openecomp.appc.adapter.message.MessageAdapterFactory;
30 import org.openecomp.appc.adapter.message.Producer;
31 import org.openecomp.appc.listener.EventHandler;
32 import org.openecomp.appc.listener.ListenerProperties;
33 import org.openecomp.appc.listener.util.Mapper;
34 import org.openecomp.appc.logging.LoggingConstants;
35 import org.osgi.framework.BundleContext;
36 import org.osgi.framework.FrameworkUtil;
37 import org.osgi.framework.ServiceReference;
43 * This class is a wrapper for the DMaaP client provided in appc-dmaap-adapter. Its aim is to ensure that only well formed
44 * messages are sent and received on DMaaP.
47 public class EventHandlerImpl implements EventHandler {
49 private final EELFLogger LOG = EELFManager.getInstance().getLogger(EventHandlerImpl.class);
52 * The amount of time in seconds to keep a connection to a topic open while waiting for data
54 private int READ_TIMEOUT = 60;
57 * The pool of hosts to query against
59 private Collection<String> pool;
62 * The topic to read messages from
64 private String readTopic;
67 * The topic to write messages to
69 private Set<String> writeTopics;
72 * The client (group) name to use for reading messages
74 private String clientName;
77 * The id of the client (group) that is reading messages
79 private String clientId;
82 * The api public key to use for authentication
84 private String apiKey;
87 * The api secret key to use for authentication
89 private String apiSecret;
92 * A json object containing filter arguments.
94 private String filter_json;
96 private MessageService messageService;
98 private Consumer reader = null;
99 private Producer producer = null;
// Constructor: creates empty host-pool and write-topic sets, then copies topic, client, credential, filter and
// timeout settings out of the supplied ListenerProperties and logs the resulting configuration.
// NOTE(review): the embedded line numbers skip (103 -> 106, 119 -> 124, 127 -> 131, ...), so some statements and
// every closing brace are not visible here. Whether props is null-checked cannot be told from this view.
101 public EventHandlerImpl(ListenerProperties props) {
102 pool = new HashSet<String>();
103 writeTopics = new HashSet<String>();
106 readTopic = props.getProperty(ListenerProperties.KEYS.TOPIC_READ);
// presumably the second argument is the fallback used when the property is absent - confirm in ListenerProperties
107 clientName = props.getProperty(ListenerProperties.KEYS.CLIENT_NAME, "APP-C");
108 clientId = props.getProperty(ListenerProperties.KEYS.CLIENT_ID, "0");
109 apiKey = props.getProperty(ListenerProperties.KEYS.AUTH_USER_KEY);
110 apiSecret = props.getProperty(ListenerProperties.KEYS.AUTH_SECRET_KEY);
112 filter_json = props.getProperty(ListenerProperties.KEYS.TOPIC_READ_FILTER);
// The current READ_TIMEOUT is passed in as a string and the looked-up value is parsed back to an int;
// Integer.valueOf throws NumberFormatException for a non-numeric property value.
114 READ_TIMEOUT = Integer
115 .valueOf(props.getProperty(ListenerProperties.KEYS.TOPIC_READ_TIMEOUT, String.valueOf(READ_TIMEOUT)));
// Hosts arrive as one comma-separated string; split(",") does not trim whitespace around entries.
117 String hostnames = props.getProperty(ListenerProperties.KEYS.HOSTS);
118 if (hostnames != null && !hostnames.isEmpty()) {
// NOTE(review): the body of this loop is not visible here - presumably each name is added to pool; confirm.
119 for (String name : hostnames.split(",")) {
// Write topics are likewise comma-separated; every piece is added to writeTopics untrimmed.
124 String writeTopicStr = props.getProperty(ListenerProperties.KEYS.TOPIC_WRITE);
125 if (writeTopicStr != null) {
126 for (String topic : writeTopicStr.split(",")) {
127 writeTopics.add(topic);
131 messageService = MessageService.parse(props.getProperty(ListenerProperties.KEYS.MESSAGE_SERVICE));
// NOTE(review): "Wriring" in the message below is a typo for "Writing"; the message also includes apiKey.
133 LOG.info(String.format(
134 "Configured to use %s client on host pool [%s]. Reading from [%s] filtered by %s. Wriring to [%s]. Authenticated using %s",
135 messageService, hostnames, readTopic, filter_json, writeTopics, apiKey));
140 public List<String> getIncomingEvents() {
141 return getIncomingEvents(1000);
// Fetches up to `limit` messages through the shared reader, creating the reader on first use.
// NOTE(review): the embedded line numbers skip (151 -> 154 -> 156 -> 158 -> 162 -> 165), so the declaration of `r`,
// the loop body, the return statement and all closing braces are not visible here.
145 public List<String> getIncomingEvents(int limit) {
146 List<String> out = new ArrayList<String>();
147 LOG.info(String.format("Getting up to %d incoming events", limit));
148 // reuse the consumer object instead of creating a new one every time
149 if (reader == null) {
150 LOG.info("Getting Consumer...");
151 reader = getConsumer();
154 List<String> items = null;
// READ_TIMEOUT is documented in seconds, so the * 1000 presumably converts to milliseconds - confirm against
// Consumer.fetch.
156 items = reader.fetch(READ_TIMEOUT * 1000, limit);
// `r` is presumably the variable of a catch clause that is not visible here.
// NOTE(review): the logged name "EvenHandlerImpl" looks like a typo for "EventHandlerImpl".
158 LOG.error("EvenHandlerImpl.getIncomingEvents",r);
// NOTE(review): items starts as null; if a failed fetch is only logged above, this loop would throw a
// NullPointerException - confirm against the full source.
162 for (String item : items) {
165 LOG.info(String.format("Read %d messages from %s as %s/%s.", out.size(), readTopic, clientName, clientId));
170 public <T> List<T> getIncomingEvents(Class<T> cls) {
171 return getIncomingEvents(cls, 1000);
175 public <T> List<T> getIncomingEvents(Class<T> cls, int limit) {
176 List<String> incomingStrings = getIncomingEvents(limit);
177 return Mapper.mapList(incomingStrings, cls);
181 public void postStatus(String event) {
182 postStatus(null, event);
186 public void postStatus(String partition, String event) {
187 LOG.debug(String.format("Posting Message [%s]", event));
188 if (producer == null) {
189 LOG.info("Getting Producer...");
190 producer = getProducer();
192 producer.post(partition, event);
196 * Returns a consumer object for direct access to our Cambria consumer interface
198 * @return An instance of the consumer interface
// Obtains a Consumer from the MessageAdapterFactory service registered in this bundle's OSGi context.
// NOTE(review): the embedded line numbers skip (202 -> 204 -> 208 -> 210 -> 213 -> 215 -> 218 -> 219), so the
// declaration of `out`, the declaration of `e`, the body of the URL loop, the return statement and all closing
// braces are not visible here.
200 protected Consumer getConsumer() {
201 LOG.debug(String.format("Getting Consumer: %s %s/%s/%s", pool, readTopic, clientName, clientId));
// Reading from a topic that is also written to, with no filter set, is flagged with the message below.
202 if (filter_json == null && writeTopics.contains(readTopic)) {
// The call that this string literal is an argument of is not visible here (presumably a LOG call).
204 "*****We will be writing and reading to the same topic without a filter. This will cause an infinite loop.*****");
208 BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext();
// NOTE(review): no null check on ctx is visible before this use - confirm against the full source.
210 ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
211 if (svcRef != null) {
213 out = ((MessageAdapterFactory) ctx.getService(svcRef)).createConsumer(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret);
215 //TODO:create eelf message
// `e` is presumably the variable of a catch clause that is not visible here.
// NOTE(review): "EvenHandlerImp" and "MessageAdapterFactor" look like truncated names in the message.
216 LOG.error("EvenHandlerImp.getConsumer calling MessageAdapterFactor.createConsumer",e);
// Looks for a pool entry containing "3905" or "https"; what is done on a match is not visible here
// (presumably switching the consumer to HTTPS - confirm).
218 for (String url : pool) {
219 if (url.contains("3905") || url.contains("https")) {
230 * Returns a producer object for direct access to our Cambria producer interface
232 * @return An instance of the producer interface
// Obtains a Producer from the MessageAdapterFactory service registered in this bundle's OSGi context.
// NOTE(review): the embedded line numbers skip (235 -> 238 -> 240, 244 -> 255), so the declaration of `out`,
// the body of the URL loop, the return statement and all closing braces are not visible here.
234 protected Producer getProducer() {
// NOTE(review): this logs readTopic, but the producer below is created with writeTopics - possibly a
// copy/paste slip in the log message.
235 LOG.debug(String.format("Getting Producer: %s %s", pool, readTopic));
238 BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext();
// NOTE(review): no null check on ctx is visible before this use - confirm against the full source.
240 ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
241 if (svcRef != null) {
242 out = ((MessageAdapterFactory) ctx.getService(svcRef)).createProducer(pool, writeTopics,apiKey, apiSecret);
// Looks for a pool entry containing "3905" or "https"; what is done on a match is not visible here
// (presumably switching the producer to HTTPS - confirm).
243 for (String url : pool) {
244 if (url.contains("3905") || url.contains("https")) {
// Logs that the DMaaP clients are being closed, then handles reader and producer only when each is non-null.
// NOTE(review): the bodies of both `if` statements and all closing braces are not visible here - presumably each
// client is closed; confirm against the full source.
255 public void closeClients() {
256 LOG.debug("Closing Consumer and Producer DMaaP clients");
257 if (reader != null) {
260 if (producer != null) {
// Accessor declared to return a String.
// NOTE(review): the body is not visible here - presumably it returns the clientId field; confirm.
266 public String getClientId() {
271 public void setClientId(String clientId) {
272 this.clientId = clientId;
// Accessor declared to return a String.
// NOTE(review): the body is not visible here - presumably it returns the clientName field; confirm.
276 public String getClientName() {
281 public void setClientName(String clientName) {
282 this.clientName = clientName;
283 MDC.put(LoggingConstants.MDCKeys.PARTNER_NAME, clientName);
// Takes a hostname argument.
// NOTE(review): the body is not visible here - presumably it adds hostname to pool; confirm.
287 public void addToPool(String hostname) {
// Accessor declared to return a Collection<String>.
// NOTE(review): the body is not visible here - presumably it returns the pool field; confirm whether the
// collection is returned directly or as a copy.
292 public Collection<String> getPool() {
297 public void removeFromPool(String hostname) {
298 pool.remove(hostname);
// Accessor declared to return a String.
// NOTE(review): the body is not visible here - presumably it returns the readTopic field; confirm.
302 public String getReadTopic() {
307 public void setReadTopic(String readTopic) {
308 this.readTopic = readTopic;
// Accessor declared to return a Set<String>.
// NOTE(review): the body is not visible here - presumably it returns the writeTopics field; confirm.
312 public Set<String> getWriteTopics() {
317 public void setWriteTopics(Set<String> writeTopics) {
318 this.writeTopics = writeTopics;
// Takes no arguments and returns nothing.
// NOTE(review): the body is not visible here - presumably it resets apiKey and apiSecret; confirm.
322 public void clearCredentials() {
328 public void setCredentials(String key, String secret) {