2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  19  * ============LICENSE_END=========================================================
 
  22 package org.onap.appc.services.dmaapService;
 
  24 import java.util.HashMap;
 
  25 import java.util.HashSet;
 
  27 import java.util.Properties;
 
  29 import org.onap.appc.adapter.factory.DmaapMessageAdapterFactoryImpl;
 
  30 import org.onap.appc.configuration.Configuration;
 
  31 import org.onap.appc.configuration.ConfigurationFactory;
 
  32 import org.springframework.stereotype.Service;
 
  34 import com.att.eelf.configuration.EELFLogger;
 
  35 import com.att.eelf.configuration.EELFManager;
 
  37 import org.onap.appc.adapter.message.MessageAdapterFactory;
 
  38 import org.onap.appc.adapter.message.Producer;
 
  39 import org.onap.appc.adapter.messaging.dmaap.http.HttpDmaapProducerImpl;
 
  42 public class PublishService {
 
  44     private static final EELFLogger LOG = EELFManager.getInstance().getLogger(PublishService.class);
 
  46     private Map<String,Producer> producers;
 
  47     private MessageAdapterFactory factory;
 
  48     Configuration configuration;
 
  50     public PublishService() {
 
  51         this.factory = new DmaapMessageAdapterFactoryImpl();
 
  52         producers = new HashMap<>();
 
  55     public PublishService(MessageAdapterFactory factory) {
 
  56         this.factory = factory;
 
  57         producers = new HashMap<>();
 
  60     public String publishMessage(String key, String partition, String topic, String message) {
 
  61         Producer producer = getProducer(key, topic);
 
  62         if(producer == null) {
 
  63             return "Could not find producer with property prefix: " + key;
 
  65         boolean success = producer.post(partition, message);
 
  69         return "Failed. See dmaap service jar log.";
 
  72     private Producer getProducer(String key, String topic) {
 
  73         String searchKey = key;
 
  77         Producer producer = producers.get(searchKey);
 
  78         if(producer != null) {
 
  81         producer =  newProducer(key, topic);
 
  82         producers.put(searchKey,producer);
 
  86     private Producer newProducer(String key, String topic) {
 
  87         Configuration configuration;
 
  88         if(this.configuration != null) {
 
  89             configuration = this.configuration;
 
  91             configuration = ConfigurationFactory.getConfiguration();
 
  93         Properties props = configuration.getProperties();
 
  94         HashSet<String> pool = new HashSet<>();
 
  98                 writeTopic = props.getProperty(key + ".topic.write");
 
 102             String apiKey = props.getProperty(key + ".client.key");
 
 103             String apiSecret = props.getProperty(key + ".client.secret");
 
 104             String hostnames = props.getProperty(key + ".poolMembers");
 
 105             if (hostnames != null && !hostnames.isEmpty()) {
 
 106                 for (String name : hostnames.split(",")) {
 
 111                 LOG.error("There are no dmaap server pools. Check the property " + key + ".poolMembers");
 
 114             if(writeTopic == null || writeTopic.isEmpty()) {
 
 115                 LOG.error("There is no write topic defined in the message request or in the properties file");
 
 118             return factory.createProducer(pool, writeTopic, apiKey, apiSecret);