bump the version
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / metrics / publisher / impl / DMaaPCambriaConsumerImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.dmf.mr.metrics.publisher.impl;
23
24 import java.io.IOException;
25 import java.io.UnsupportedEncodingException;
26 import java.net.MalformedURLException;
27 import java.net.URLEncoder;
28 import java.util.Collection;
29 import java.util.LinkedList;
30 import java.util.List;
31
32 import org.json.JSONArray;
33 import org.json.JSONException;
34 import org.json.JSONObject;
35
36 import com.att.dmf.mr.metrics.publisher.CambriaPublisherUtility;
37 import com.att.nsa.apiClient.http.HttpException;
38 import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
39
40 import jline.internal.Log;
41
42 /**
43  * 
44  * @author anowarul.islam
45  *
46  */
47 public class DMaaPCambriaConsumerImpl extends CambriaBaseClient
48                 implements com.att.dmf.mr.metrics.publisher.CambriaConsumer {
49         private final String fTopic;
50         private final String fGroup;
51         private final String fId;
52         private final int fTimeoutMs;
53         private final int fLimit;
54         private final String fFilter;
55
56         /**
57          * 
58          * @param hostPart
59          * @param topic
60          * @param consumerGroup
61          * @param consumerId
62          * @param timeoutMs
63          * @param limit
64          * @param filter
65          * @param apiKey
66          * @param apiSecret
67          */
68         public DMaaPCambriaConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
69                         final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) throws MalformedURLException {
70                 super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);
71
72                 fTopic = topic;
73                 fGroup = consumerGroup;
74                 fId = consumerId;
75                 fTimeoutMs = timeoutMs;
76                 fLimit = limit;
77                 fFilter = filter;
78
79                 setApiCredentials(apiKey, apiSecret);
80         }
81
82         /**
83          * method converts String to list
84          * 
85          * @param str
86          * @return
87          */
88         public static List<String> stringToList(String str) {
89                 final LinkedList<String> set = new LinkedList<String>();
90                 if (str != null) {
91                         final String[] parts = str.trim().split(",");
92                         for (String part : parts) {
93                                 final String trimmed = part.trim();
94                                 if (trimmed.length() > 0) {
95                                         set.add(trimmed);
96                                 }
97                         }
98                 }
99                 return set;
100         }
101
102         @Override
103         public Iterable<String> fetch() throws IOException {
104                 // fetch with the timeout and limit set in constructor
105                 return fetch(fTimeoutMs, fLimit);
106         }
107
108         @Override
109         public Iterable<String> fetch(int timeoutMs, int limit) throws IOException {
110                 final LinkedList<String> msgs = new LinkedList<String>();
111
112                 final String urlPath = createUrlPath(timeoutMs, limit);
113
114                 getLog().info("UEB GET " + urlPath);
115                 try {
116                         final JSONObject o = get(urlPath);
117
118                         if (o != null) {
119                                 final JSONArray a = o.getJSONArray("result");
120                                 if (a != null) {
121                                         for (int i = 0; i < a.length(); i++) {
122                                                 msgs.add(a.getString(i));
123                                         }
124                                 }
125                         }
126                 } catch (HttpObjectNotFoundException e) {
127                         // this can happen if the topic is not yet created. ignore.
128                         Log.error("Failed due to topic is not yet created" + e);
129                 } catch (JSONException e) {
130                         // unexpected response
131                         reportProblemWithResponse();
132                         Log.error("Failed due to jsonException", e);
133                 } catch (HttpException e) {
134                         throw new IOException(e);
135                 }
136
137                 return msgs;
138         }
139
140         public String createUrlPath(int timeoutMs, int limit) {
141                 final StringBuilder url = new StringBuilder(CambriaPublisherUtility.makeConsumerUrl(fTopic, fGroup, fId));
142                 final StringBuilder adds = new StringBuilder();
143                 if (timeoutMs > -1) {
144                         adds.append("timeout=").append(timeoutMs);
145                 }
146
147                 if (limit > -1) {
148                         if (adds.length() > 0) {
149                                 adds.append("&");
150                         }
151                         adds.append("limit=").append(limit);
152                 }
153                 if (fFilter != null && fFilter.length() > 0) {
154                         try {
155                                 if (adds.length() > 0) {
156                                         adds.append("&");
157                                 }
158                                 adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
159                         } catch (UnsupportedEncodingException e) {
160                                 Log.error("Failed due to UnsupportedEncodingException" + e);
161                         }
162                 }
163                 if (adds.length() > 0) {
164                         url.append("?").append(adds.toString());
165                 }
166                 return url.toString();
167         }
168
169 }