Add Initial Code Import
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / nsa / cambria / metrics / publisher / impl / DMaaPCambriaConsumerImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.cambria.metrics.publisher.impl;
23
24 import java.io.IOException;
25 import java.io.UnsupportedEncodingException;
26 import java.net.MalformedURLException;
27 import java.net.URLEncoder;
28 import java.util.Collection;
29 import java.util.LinkedList;
30 import java.util.List;
31
32 import jline.internal.Log;
33
34 import org.json.JSONArray;
35 import org.json.JSONException;
36 import org.json.JSONObject;
37
38 import com.att.nsa.apiClient.http.HttpException;
39 import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
40 import com.att.nsa.cambria.metrics.publisher.CambriaPublisherUtility;
41
42 /**
43  * 
44  * @author author
45  *
46  */
47 public class DMaaPCambriaConsumerImpl extends CambriaBaseClient
48                 implements com.att.nsa.cambria.metrics.publisher.CambriaConsumer {
49         private final String fTopic;
50         private final String fGroup;
51         private final String fId;
52         private final int fTimeoutMs;
53         private final int fLimit;
54         private final String fFilter;
55
56         /**
57          * 
58          * @param hostPart
59          * @param topic
60          * @param consumerGroup
61          * @param consumerId
62          * @param timeoutMs
63          * @param limit
64          * @param filter
65          * @param apiKey
66          * @param apiSecret
67          * @throws MalformedURLException 
68          */
69         public DMaaPCambriaConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
70                         final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) throws MalformedURLException {
71                 super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);
72
73                 fTopic = topic;
74                 fGroup = consumerGroup;
75                 fId = consumerId;
76                 fTimeoutMs = timeoutMs;
77                 fLimit = limit;
78                 fFilter = filter;
79
80                 setApiCredentials(apiKey, apiSecret);
81         }
82
83         /**
84          * method converts String to list
85          * 
86          * @param str
87          * @return
88          */
89         public static List<String> stringToList(String str) {
90                 final LinkedList<String> set = new LinkedList<String>();
91                 if (str != null) {
92                         final String[] parts = str.trim().split(",");
93                         for (String part : parts) {
94                                 final String trimmed = part.trim();
95                                 if (trimmed.length() > 0) {
96                                         set.add(trimmed);
97                                 }
98                         }
99                 }
100                 return set;
101         }
102
103         @Override
104         public Iterable<String> fetch() throws IOException {
105                 // fetch with the timeout and limit set in constructor
106                 return fetch(fTimeoutMs, fLimit);
107         }
108
109         @Override
110         public Iterable<String> fetch(int timeoutMs, int limit) throws IOException {
111                 final LinkedList<String> msgs = new LinkedList<String>();
112
113                 final String urlPath = createUrlPath(timeoutMs, limit);
114
115                 getLog().info("UEB GET " + urlPath);
116                 try {
117                         final JSONObject o = get(urlPath);
118
119                         if (o != null) {
120                                 final JSONArray a = o.getJSONArray("result");
121                                 if (a != null) {
122                                         for (int i = 0; i < a.length(); i++) {
123                                                 msgs.add(a.getString(i));
124                                         }
125                                 }
126                         }
127                 } catch (HttpObjectNotFoundException e) {
128                         // this can happen if the topic is not yet created. ignore.
129                         Log.error("Failed due to topic is not yet created" + e);
130                 } catch (JSONException e) {
131                         // unexpected response
132                         reportProblemWithResponse();
133                         Log.error("Failed due to jsonException", e);
134                 } catch (HttpException e) {
135                         throw new IOException(e);
136                 }
137
138                 return msgs;
139         }
140
141         protected String createUrlPath(int timeoutMs, int limit) {
142                 final StringBuilder url = new StringBuilder(CambriaPublisherUtility.makeConsumerUrl(fTopic, fGroup, fId));
143                 final StringBuilder adds = new StringBuilder();
144                 if (timeoutMs > -1) {
145                         adds.append("timeout=").append(timeoutMs);
146                 }
147
148                 if (limit > -1) {
149                         if (adds.length() > 0) {
150                                 adds.append("&");
151                         }
152                         adds.append("limit=").append(limit);
153                 }
154                 if (fFilter != null && fFilter.length() > 0) {
155                         try {
156                                 if (adds.length() > 0) {
157                                         adds.append("&");
158                                 }
159                                 adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
160                         } catch (UnsupportedEncodingException e) {
161                                 Log.error("Failed due to UnsupportedEncodingException" + e);
162                         }
163                 }
164                 if (adds.length() > 0) {
165                         url.append("?").append(adds.toString());
166                 }
167                 return url.toString();
168         }
169
170 }