2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017 AT&T Intellectual Property. All rights
 
   7  * ================================================================================
 
   8  * Licensed under the Apache License, Version 2.0 (the "License");
 
   9  * you may not use this file except in compliance with the License.
 
  10  * You may obtain a copy of the License at
 
  12  *      http://www.apache.org/licenses/LICENSE-2.0
 
  14  * Unless required by applicable law or agreed to in writing, software
 
  15  * distributed under the License is distributed on an "AS IS" BASIS,
 
  16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  17  * See the License for the specific language governing permissions and
 
  18  * limitations under the License.
 
  19  * ============LICENSE_END=========================================================
 
  22 package org.openecomp.appc.adapter.dmaap;
 
  25 import java.util.ArrayList;
 
  26 import java.util.Collection;
 
  27 import java.util.HashSet;
 
  28 import java.util.List;
 
  31 import com.att.eelf.configuration.EELFLogger;
 
  32 import com.att.eelf.configuration.EELFManager;
 
  34 import org.apache.http.client.methods.CloseableHttpResponse;
 
  35 import org.apache.http.client.methods.HttpPost;
 
  36 import org.apache.http.entity.StringEntity;
 
  38 public class DmaapProducer extends CommonHttpClient implements Producer {
 
  40     private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapProducer.class);
 
  42     private static final String CONTENT_TYPE = "application/cambria";
 
  43     private static final String URL_TEMPLATE = "%s/events/%s";
 
  45     private List<String> hosts;
 
  46     private Set<String> topics;
 
  48     private boolean useHttps = false;
 
  50     public DmaapProducer(Collection<String> urls, String topicName) {
 
  51         hosts = new ArrayList<String>();
 
  52         topics = new HashSet<String>();
 
  53         topics.add(topicName);
 
  55         for (String host : urls) {
 
  56             hosts.add(formatHostString(host));
 
  60     public DmaapProducer(Collection<String> urls, Set<String> topicNames) {
 
  61         hosts = new ArrayList<String>();
 
  64         for (String host : urls) {
 
  65             hosts.add(formatHostString(host));
 
  70     public void updateCredentials(String user, String pass) {
 
  71         LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
 
  72         this.setBasicAuth(user, pass);
 
  76     public boolean post(String partition, String data) {
 
  79             HttpPost request = postReq(null);
 
  80             request.setHeader("Content-Type", CONTENT_TYPE);
 
  81             request.setEntity(new StringEntity(bodyLine(partition, data)));
 
  83             for (String topic : topics) {
 
  84                 String uriStr = String.format(URL_TEMPLATE, hosts.get(0), topic);
 
  86                     request.setURI(new URI(uriStr));
 
  87                     CloseableHttpResponse response = getClient().execute(request);
 
  88                     if (response.getStatusLine().getStatusCode() == 200) {
 
  92                 } catch (Exception sendEx) {
 
  93                     LOG.error(String.format("Failed to send message to %s. Reason: %s", uriStr, sendEx.getMessage()),
 
  95                     if (hosts.size() > 1) {
 
  96                         String failedUrl = hosts.remove(0);
 
  98                         LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s",
 
  99                             failedUrl, hosts.get(0)));
 
 103         } catch (Exception buildEx) {
 
 105                 String.format("Failed to build request with string [%s]. Message not sent to any topic. Reason: %s",
 
 106                     data, buildEx.getMessage()),
 
 109         return sent == topics.size();
 
 113     public void useHttps(boolean yes) {
 
 118      * Format the body for the application/cambria content type with no partitioning.
 
 121      *            The message body to format
 
 122      * @return A string in the application/cambria content type
 
 124     private String bodyLine(String partition, String msg) {
 
 125         String p = (partition == null) ? "" : partition;
 
 126         String m = (msg == null) ? "" : msg;
 
 127         return String.format("%d.%d.%s%s", p.length(), m.length(), p, m);