/*-
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 * Copyright (C) 2017 Amdocs
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 */
 
  23 package org.openecomp.appc.adapter.messaging.dmaap.impl;
 
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import org.openecomp.sdnc.sli.SvcLogicContext;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import org.openecomp.appc.adapter.message.EventSender;
import org.openecomp.appc.adapter.message.MessageDestination;
import org.openecomp.appc.adapter.message.Producer;
import org.openecomp.appc.adapter.message.event.EventHeader;
import org.openecomp.appc.adapter.message.event.EventMessage;
import org.openecomp.appc.adapter.message.event.EventStatus;
import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl;
import org.openecomp.appc.configuration.Configuration;
import org.openecomp.appc.configuration.ConfigurationFactory;
import org.openecomp.appc.exceptions.APPCException;
 
  43 public class EventSenderDmaapImpl implements EventSender
 
  45     private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderDmaapImpl.class);
 
  46     public static final String EVENT_TOPIC_WRITE = "dmaap.event.topic.write";
 
  47     public static final String DMAAP_USERNAME = "dmaap.appc.username";
 
  48     public static final String DMAAP_PASSWORD = "dmaap.appc.password";
 
  49     public static final String EVENT_POOL_MEMBERS = "dmaap.event.pool.members";
 
  51     private static Configuration configuration = ConfigurationFactory.getConfiguration();
 
  53     private Map<String,Producer> producerMap = new ConcurrentHashMap<>();
 
  55     public Map<String, Producer> getProducerMap() {
 
  59     public void setProducerMap(Map<String, Producer> producerMap) {
 
  60         this.producerMap = producerMap;
 
  63     public EventSenderDmaapImpl(){
 
  67     public void initialize(){
 
  68         Properties properties = configuration.getProperties();
 
  72         final List<String> pool = new ArrayList<>();
 
  74         for(MessageDestination destination: MessageDestination.values()){
 
  75             writeTopic = properties.getProperty(destination + "." +  EVENT_TOPIC_WRITE);
 
  76             username = properties.getProperty(destination + "." + DMAAP_USERNAME);
 
  77             password = properties.getProperty(destination + "." + DMAAP_PASSWORD);
 
  78             String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS);
 
  80             if (hostNames != null && !hostNames.isEmpty()) {
 
  81                 LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS));
 
  82                 Collections.addAll(pool, hostNames.split(","));
 
  85             LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS));
 
  86             LOG.debug(String.format("writeTopic = %s, taken from property: %s", writeTopic, destination + "." + EVENT_TOPIC_WRITE));
 
  87             LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME));
 
  88             Producer producer = new DmaapProducerImpl(pool, writeTopic,username, password);
 
  90             for (String url : pool) {
 
  91                 if (url.contains("3905") || url.contains("https")) {
 
  92                     LOG.debug("Producer should use HTTPS");
 
  93                     producer.useHttps(true);
 
  97             producerMap.put(destination.toString(),producer);
 
 103     public boolean sendEvent(MessageDestination destination, EventMessage msg) {
 
 104         String jsonStr = msg.toJson();
 
 105         String id = msg.getEventHeader().getEventId();
 
 106         LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr));
 
 107         Producer producer = producerMap.get(destination.toString());
 
 108         return producer.post(id, jsonStr);
 
 112     public boolean sendEvent(MessageDestination destination, EventMessage msg, String eventTopicName) {
 
 113         String jsonStr = msg.toJson();
 
 114         String id = msg.getEventHeader().getEventId();
 
 115         LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr));
 
 116         Producer producer = createProducer(destination, eventTopicName);
 
 117         return producer.post(id, jsonStr);
 
 120     private Producer createProducer(MessageDestination destination, String eventTopicName) {
 
 121         Properties properties = configuration.getProperties();
 
 122         final List<String> pool = new ArrayList<>();
 
 123         String username = properties.getProperty(destination + "." + DMAAP_USERNAME);
 
 124         String password = properties.getProperty(destination + "." + DMAAP_PASSWORD);
 
 125         String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS);
 
 127         if (hostNames != null && !hostNames.isEmpty()) {
 
 128             LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS));
 
 129             Collections.addAll(pool, hostNames.split(","));
 
 132         LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS));
 
 133         LOG.debug(String.format("writeTopic = %s, taken from property: %s", eventTopicName, destination + "." + EVENT_TOPIC_WRITE));
 
 134         LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME));
 
 135         Producer producer = new DmaapProducerImpl(pool, eventTopicName,username, password);
 
 137         for (String url : pool) {
 
 138             if (url.contains("3905") || url.contains("https")) {
 
 139                 LOG.debug("Producer should use HTTPS");
 
 140                 producer.useHttps(true);
 
 148     public boolean sendEvent(MessageDestination destination, Map<String, String> params, SvcLogicContext ctx) throws APPCException {
 
 150         if (params == null) {
 
 151             String message = "Parameters map is empty (null)";
 
 153             throw new APPCException(message);
 
 155         String eventTime = new Date(System.currentTimeMillis()).toString();
 
 156         String apiVer = params.get("apiVer");
 
 157         String eventId = params.get("eventId");
 
 158         String reason = params.get("reason");
 
 159         String entityId=params.get("entityId");
 
 161             reason=reason+"("+entityId+")";
 
 163         Integer code = Integer.getInteger(params.get("code"), 500);
 
 165         if (eventTime == null || apiVer == null || eventId == null || reason == null) {
 
 166             String message = String.format("Missing input parameters: %s", params);
 
 168             throw new APPCException(message);
 
 170         EventMessage eventMessage = new EventMessage(
 
 171                         new EventHeader(eventTime, apiVer, eventId),
 
 172                         new EventStatus(code, reason));
 
 174         return sendEvent(destination,eventMessage);