878ffaa1114a9c42f024682e3c53331d0c25e498
[appc.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * APPC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright (C) 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  */
22
23 package org.openecomp.appc.adapter.messaging.dmaap.http;
24
25 import java.net.URI;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.HashSet;
29 import java.util.List;
30 import java.util.Set;
31
32 import com.att.eelf.configuration.EELFLogger;
33 import com.att.eelf.configuration.EELFManager;
34
35 import org.apache.http.client.methods.CloseableHttpResponse;
36 import org.apache.http.client.methods.HttpPost;
37 import org.apache.http.entity.StringEntity;
38 import org.openecomp.appc.adapter.message.Producer;
39
40 public class HttpDmaapProducerImpl extends CommonHttpClient implements Producer {
41
42     private static final EELFLogger LOG = EELFManager.getInstance().getLogger(HttpDmaapProducerImpl.class);
43
44     private static final String CONTENT_TYPE = "application/cambria";
45     private static final String URL_TEMPLATE = "%s/events/%s";
46
47     private List<String> hosts;
48     private Set<String> topics;
49
50     private boolean useHttps = false;
51
52     public HttpDmaapProducerImpl(Collection<String> urls, String topicName) {
53         hosts = new ArrayList<String>();
54         topics = new HashSet<String>();
55         topics.add(topicName);
56
57         for (String host : urls) {
58             hosts.add(formatHostString(host));
59         }
60     }
61
62     public HttpDmaapProducerImpl(Collection<String> urls, Set<String> topicNames) {
63         hosts = new ArrayList<String>();
64         topics = topicNames;
65
66         for (String host : urls) {
67             hosts.add(formatHostString(host));
68         }
69     }
70
71     @Override
72     public void updateCredentials(String user, String pass) {
73         LOG.debug(String.format("Setting auth to %s for %s", user, this.toString()));
74         this.setBasicAuth(user, pass);
75     }
76
77     @Override
78     public boolean post(String partition, String data) {
79         int sent = 0;
80         try {
81             HttpPost request = postReq(null);
82             request.setHeader("Content-Type", CONTENT_TYPE);
83             request.setEntity(new StringEntity(bodyLine(partition, data)));
84
85             for (String topic : topics) {
86                 String uriStr = String.format(URL_TEMPLATE, hosts.get(0), topic);
87                 try {
88                     request.setURI(new URI(uriStr));
89                     CloseableHttpResponse response = getClient().execute(request);
90                     if (response.getStatusLine().getStatusCode() == 200) {
91                         sent++;
92                     }
93                     response.close();
94                 } catch (Exception sendEx) {
95                     LOG.error(String.format("Failed to send message to %s. Reason: %s", uriStr, sendEx.getMessage()),
96                         sendEx);
97                     if (hosts.size() > 1) {
98                         String failedUrl = hosts.remove(0);
99                         hosts.add(failedUrl);
100                         LOG.debug(String.format("Moving host %s to the end of the pool. New primary host is %s",
101                             failedUrl, hosts.get(0)));
102                     }
103                 }
104             }
105         } catch (Exception buildEx) {
106             LOG.error(
107                 String.format("Failed to build request with string [%s]. Message not sent to any topic. Reason: %s",
108                     data, buildEx.getMessage()),
109                 buildEx);
110         }
111         return sent == topics.size();
112     }
113
114     @Override
115     public void useHttps(boolean yes) {
116         useHttps = yes;
117     }
118
119     /**
120      * Format the body for the application/cambria content type with no partitioning.
121      *
122      * @param msg
123      *            The message body to format
124      * @return A string in the application/cambria content type
125      */
126     private String bodyLine(String partition, String msg) {
127         String p = (partition == null) ? "" : partition;
128         String m = (msg == null) ? "" : msg;
129         return String.format("%d.%d.%s%s", p.length(), m.length(), p, m);
130     }
131
132         @Override
133         public void close() {
134                 // Nothing to do                
135         }
136 }