2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017 AT&T Intellectual Property. All rights
 
   7  * ================================================================================
 
   8  * Licensed under the Apache License, Version 2.0 (the "License");
 
   9  * you may not use this file except in compliance with the License.
 
  10  * You may obtain a copy of the License at
 
  12  *      http://www.apache.org/licenses/LICENSE-2.0
 
  14  * Unless required by applicable law or agreed to in writing, software
 
  15  * distributed under the License is distributed on an "AS IS" BASIS,
 
  16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  17  * See the License for the specific language governing permissions and
 
  18  * limitations under the License.
 
  19  * ============LICENSE_END=========================================================
 
  22 package org.openecomp.appc.listener.impl;
 
  24 import java.util.ArrayList;
 
  25 import java.util.Collection;
 
  26 import java.util.HashSet;
 
  27 import java.util.List;
 
  30 import org.openecomp.appc.adapter.dmaap.Consumer;
 
  31 import org.openecomp.appc.adapter.dmaap.DmaapConsumer;
 
  32 import org.openecomp.appc.adapter.dmaap.DmaapProducer;
 
  33 import org.openecomp.appc.adapter.dmaap.Producer;
 
  34 import org.openecomp.appc.adapter.dmaap.DmaapConsumer;
 
  35 import org.openecomp.appc.adapter.dmaap.DmaapProducer;
 
  36 import org.openecomp.appc.listener.EventHandler;
 
  37 import org.openecomp.appc.listener.ListenerProperties;
 
  38 import org.openecomp.appc.listener.ListenerProperties.MessageService;
 
  39 import org.openecomp.appc.listener.util.Mapper;
 
  40 import org.openecomp.appc.logging.LoggingConstants;
 
  42 import com.att.eelf.configuration.EELFLogger;
 
  43 import com.att.eelf.configuration.EELFManager;
 
  47  * This class is a wrapper for the DMaaP client provided in appc-dmaap-adapter. Its aim is to ensure that only well formed
 
  48  * messages are sent and received on DMaaP.
 
  51 public class EventHandlerImpl implements EventHandler {
 
  53     private final EELFLogger LOG = EELFManager.getInstance().getLogger(EventHandlerImpl.class);
 
  56      * The amount of time in seconds to keep a connection to a topic open while waiting for data
 
  58     private int READ_TIMEOUT = 60;
 
  61      * The pool of hosts to query against
 
  63     private Collection<String> pool;
 
  66      * The topic to read messages from
 
  68     private String readTopic;
 
  71      * The topic to write messages to
 
  73     private Set<String> writeTopics;
 
  76      * The client (group) name to use for reading messages
 
  78     private String clientName;
 
  81      * The id of the client (group) that is reading messages
 
  83     private String clientId;
 
  86      * The api public key to use for authentication
 
  88     private String apiKey;
 
  91      * The api secret key to use for authentication
 
  93     private String apiSecret;
 
  96      * A json object containing filter arguments.
 
  98     private String filter_json;
 
 100     private MessageService messageService;
 
 102     private Consumer reader = null;
 
 103     private Producer producer = null;
 
 105     public EventHandlerImpl(ListenerProperties props) {
 
 106         pool = new HashSet<String>();
 
 107         writeTopics = new HashSet<String>();
 
 110             readTopic = props.getProperty(ListenerProperties.KEYS.TOPIC_READ);
 
 111             clientName = props.getProperty(ListenerProperties.KEYS.CLIENT_NAME, "APP-C");
 
 112             clientId = props.getProperty(ListenerProperties.KEYS.CLIENT_ID, "0");
 
 113             apiKey = props.getProperty(ListenerProperties.KEYS.AUTH_USER_KEY);
 
 114             apiSecret = props.getProperty(ListenerProperties.KEYS.AUTH_SECRET_KEY);
 
 116             filter_json = props.getProperty(ListenerProperties.KEYS.TOPIC_READ_FILTER);
 
 118             READ_TIMEOUT = Integer
 
 119                 .valueOf(props.getProperty(ListenerProperties.KEYS.TOPIC_READ_TIMEOUT, String.valueOf(READ_TIMEOUT)));
 
 121             String hostnames = props.getProperty(ListenerProperties.KEYS.HOSTS);
 
 122             if (hostnames != null && !hostnames.isEmpty()) {
 
 123                 for (String name : hostnames.split(",")) {
 
 128             String writeTopicStr = props.getProperty(ListenerProperties.KEYS.TOPIC_WRITE);
 
 129             if (writeTopicStr != null) {
 
 130                 for (String topic : writeTopicStr.split(",")) {
 
 131                     writeTopics.add(topic);
 
 135             messageService = MessageService.parse(props.getProperty(ListenerProperties.KEYS.MESSAGE_SERVICE));
 
 137             LOG.info(String.format(
 
 138                 "Configured to use %s client on host pool [%s]. Reading from [%s] filtered by %s. Wriring to [%s]. Authenticated using %s",
 
 139                 messageService, hostnames, readTopic, filter_json, writeTopics, apiKey));
 
 144     public List<String> getIncomingEvents() {
 
 145         return getIncomingEvents(1000);
 
 149     public List<String> getIncomingEvents(int limit) {
 
 150         List<String> out = new ArrayList<String>();
 
 151         LOG.info(String.format("Getting up to %d incoming events", limit));
 
 152         // reuse the consumer object instead of creating a new one every time
 
 153         if (reader == null) {
 
 154                 LOG.info("Getting Consumer...");
 
 155                 reader = getConsumer();
 
 157         for (String item : reader.fetch(READ_TIMEOUT * 1000, limit)) {
 
 160         LOG.info(String.format("Read %d messages from %s as %s/%s.", out.size(), readTopic, clientName, clientId));
 
 165     public <T> List<T> getIncomingEvents(Class<T> cls) {
 
 166         return getIncomingEvents(cls, 1000);
 
 170     public <T> List<T> getIncomingEvents(Class<T> cls, int limit) {
 
 171         List<String> incomingStrings = getIncomingEvents(limit);
 
 172         return Mapper.mapList(incomingStrings, cls);
 
 176     public void postStatus(String event) {
 
 177         postStatus(null, event);
 
 181     public void postStatus(String partition, String event) {
 
 182         LOG.debug(String.format("Posting Message [%s]", event));
 
 183         if (producer == null) {
 
 184                 LOG.info("Getting Producer...");
 
 185                 producer = getProducer();
 
 187         producer.post(partition, event);
 
 191      * Returns a consumer object for direct access to our Cambria consumer interface
 
 193      * @return An instance of the consumer interface
 
 195     protected Consumer getConsumer() {
 
 196         LOG.debug(String.format("Getting Consumer: %s  %s/%s/%s", pool, readTopic, clientName, clientId));
 
 197         if (filter_json == null && writeTopics.contains(readTopic)) {
 
 199                 "*****We will be writing and reading to the same topic without a filter. This will cause an infinite loop.*****");
 
 202         out = new DmaapConsumer(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret);
 
 203         for (String url : pool) {
 
 204             if (url.contains("3905") || url.contains("https")) {
 
 213      * Returns a consumer object for direct access to our Cambria producer interface
 
 215      * @return An instance of the producer interface
 
 217     protected Producer getProducer() {
 
 218         LOG.debug(String.format("Getting Producer: %s  %s", pool, readTopic));
 
 221         out = new DmaapProducer(pool,writeTopics);
 
 223         if (apiKey != null && apiSecret != null) {
 
 224             out.updateCredentials(apiKey, apiSecret);
 
 227         for (String url : pool) {
 
 228             if (url.contains("3905") || url.contains("https")) {
 
 237     public void closeClients() {
 
 238         LOG.debug("Closing Consumer and Producer DMaaP clients");
 
 239         switch (messageService) {
 
 241                 if (reader != null) {
 
 242                         ((DmaapConsumer) reader).close();
 
 244                 if (producer != null) {
 
 245                         ((DmaapProducer) producer).close();
 
 249                 // close DMaaP clients
 
 250                 if (reader != null) {
 
 251                         ((DmaapConsumer) reader).close();
 
 253                 if (producer != null) {
 
 254                         ((DmaapProducer) producer).close();
 
 260     public String getClientId() {
 
 265     public void setClientId(String clientId) {
 
 266         this.clientId = clientId;
 
 270     public String getClientName() {
 
 275     public void setClientName(String clientName) {
 
 276         this.clientName = clientName;
 
 277         MDC.put(LoggingConstants.MDCKeys.PARTNER_NAME, clientName);
 
 281     public void addToPool(String hostname) {
 
 286     public Collection<String> getPool() {
 
 291     public void removeFromPool(String hostname) {
 
 292         pool.remove(hostname);
 
 296     public String getReadTopic() {
 
 301     public void setReadTopic(String readTopic) {
 
 302         this.readTopic = readTopic;
 
 306     public Set<String> getWriteTopics() {
 
 311     public void setWriteTopics(Set<String> writeTopics) {
 
 312         this.writeTopics = writeTopics;
 
 316     public void clearCredentials() {
 
 322     public void setCredentials(String key, String secret) {