DMAAP-MR - Merge MR repos
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / metrics / publisher / impl / DMaaPCambriaConsumerImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.metrics.publisher.impl;
23
24 import com.att.nsa.apiClient.http.HttpException;
25 import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
26 import jline.internal.Log;
27 import org.json.JSONArray;
28 import org.json.JSONException;
29 import org.json.JSONObject;
30 import org.onap.dmaap.dmf.mr.metrics.publisher.CambriaPublisherUtility;
31
32 import java.io.IOException;
33 import java.io.UnsupportedEncodingException;
34 import java.net.MalformedURLException;
35 import java.net.URLEncoder;
36 import java.util.Collection;
37 import java.util.LinkedList;
38 import java.util.List;
39
40 /**
41  * 
42  * @author anowarul.islam
43  *
44  */
45 public class DMaaPCambriaConsumerImpl extends CambriaBaseClient
46                 implements org.onap.dmaap.dmf.mr.metrics.publisher.CambriaConsumer {
47         private final String fTopic;
48         private final String fGroup;
49         private final String fId;
50         private final int fTimeoutMs;
51         private final int fLimit;
52         private final String fFilter;
53
54         /**
55          * 
56          * @param hostPart
57          * @param topic
58          * @param consumerGroup
59          * @param consumerId
60          * @param timeoutMs
61          * @param limit
62          * @param filter
63          * @param apiKey
64          * @param apiSecret
65          */
66         public DMaaPCambriaConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
67                         final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) throws MalformedURLException {
68                 super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);
69
70                 fTopic = topic;
71                 fGroup = consumerGroup;
72                 fId = consumerId;
73                 fTimeoutMs = timeoutMs;
74                 fLimit = limit;
75                 fFilter = filter;
76
77                 setApiCredentials(apiKey, apiSecret);
78         }
79
80         /**
81          * method converts String to list
82          * 
83          * @param str
84          * @return
85          */
86         public static List<String> stringToList(String str) {
87                 final LinkedList<String> set = new LinkedList<String>();
88                 if (str != null) {
89                         final String[] parts = str.trim().split(",");
90                         for (String part : parts) {
91                                 final String trimmed = part.trim();
92                                 if (trimmed.length() > 0) {
93                                         set.add(trimmed);
94                                 }
95                         }
96                 }
97                 return set;
98         }
99
100         @Override
101         public Iterable<String> fetch() throws IOException {
102                 // fetch with the timeout and limit set in constructor
103                 return fetch(fTimeoutMs, fLimit);
104         }
105
106         @Override
107         public Iterable<String> fetch(int timeoutMs, int limit) throws IOException {
108                 final LinkedList<String> msgs = new LinkedList<String>();
109
110                 final String urlPath = createUrlPath(timeoutMs, limit);
111
112                 getLog().info("UEB GET " + urlPath);
113                 try {
114                         final JSONObject o = get(urlPath);
115
116                         if (o != null) {
117                                 final JSONArray a = o.getJSONArray("result");
118                                 if (a != null) {
119                                         for (int i = 0; i < a.length(); i++) {
120                                                 msgs.add(a.getString(i));
121                                         }
122                                 }
123                         }
124                 } catch (HttpObjectNotFoundException e) {
125                         // this can happen if the topic is not yet created. ignore.
126                         Log.error("Failed due to topic is not yet created" + e);
127                 } catch (JSONException e) {
128                         // unexpected response
129                         reportProblemWithResponse();
130                         Log.error("Failed due to jsonException", e);
131                 } catch (HttpException e) {
132                         throw new IOException(e);
133                 }
134
135                 return msgs;
136         }
137
138         public String createUrlPath(int timeoutMs, int limit) {
139                 final StringBuilder url = new StringBuilder(CambriaPublisherUtility.makeConsumerUrl(fTopic, fGroup, fId));
140                 final StringBuilder adds = new StringBuilder();
141                 if (timeoutMs > -1) {
142                         adds.append("timeout=").append(timeoutMs);
143                 }
144
145                 if (limit > -1) {
146                         if (adds.length() > 0) {
147                                 adds.append("&");
148                         }
149                         adds.append("limit=").append(limit);
150                 }
151                 if (fFilter != null && fFilter.length() > 0) {
152                         try {
153                                 if (adds.length() > 0) {
154                                         adds.append("&");
155                                 }
156                                 adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
157                         } catch (UnsupportedEncodingException e) {
158                                 Log.error("Failed due to UnsupportedEncodingException" + e);
159                         }
160                 }
161                 if (adds.length() > 0) {
162                         url.append("?").append(adds.toString());
163                 }
164                 return url.toString();
165         }
166
167 }