Changes to the DMaap Client
[dmaap/messagerouter/dmaapclient.git] / src / main / java / com / att / nsa / mr / client / impl / MRConsumerImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
23
24 import java.io.File;
25 import java.io.FileReader;
26 import java.io.IOException;
27 import java.io.UnsupportedEncodingException;
28 import java.net.MalformedURLException;
29 import java.net.URI;
30 import java.net.URISyntaxException;
31 import java.net.URLEncoder;
32 import java.util.Collection;
33 import java.util.HashMap;
34 import java.util.LinkedList;
35 import java.util.List;
36 import java.util.Properties;
37
38 import org.apache.http.HttpException;
39 import org.apache.http.HttpStatus;
40 import org.json.JSONArray;
41 import org.json.JSONException;
42 import org.json.JSONObject;
43 import org.json.JSONTokener;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 import com.att.aft.dme2.api.DME2Client;
48 import com.att.aft.dme2.api.DME2Exception;
49 import com.att.nsa.mr.client.HostSelector;
50 import com.att.nsa.mr.client.MRClientFactory;
51 import com.att.nsa.mr.client.MRConsumer;
52 import com.att.nsa.mr.client.response.MRConsumerResponse;
53 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
54
55 public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
56
57         private static final String SUCCESS_MESSAGE = "Success";
58
59         private Logger log = LoggerFactory.getLogger(this.getClass().getName());
60
61         public static List<String> stringToList(String str) {
62                 final LinkedList<String> set = new LinkedList<String>();
63                 if (str != null) {
64                         final String[] parts = str.trim().split(",");
65                         for (String part : parts) {
66                                 final String trimmed = part.trim();
67                                 if (trimmed.length() > 0) {
68                                         set.add(trimmed);
69                                 }
70                         }
71                 }
72                 return set;
73         }
74
75         public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
76                         final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username,
77                         String apiSecret_password) throws MalformedURLException {
78                 this(hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password,
79                                 false);
80         }
81
82         public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
83                         final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret,
84                         boolean allowSelfSignedCerts) throws MalformedURLException {
85                 super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);
86
87                 fTopic = topic;
88                 fGroup = consumerGroup;
89                 fId = consumerId;
90                 fTimeoutMs = timeoutMs;
91                 fLimit = limit;
92                 fFilter = filter;
93
94                 fHostSelector = new HostSelector(hostPart);
95         }
96
97         @Override
98         public Iterable<String> fetch() throws IOException, Exception {
99                 // fetch with the timeout and limit set in constructor
100                 return fetch(fTimeoutMs, fLimit);
101         }
102
103         @Override
104         public Iterable<String> fetch(int timeoutMs, int limit) throws IOException, Exception {
105                 final LinkedList<String> msgs = new LinkedList<String>();
106
107                 // FIXME: the timeout on the socket needs to be at least as long as the
108                 // long poll
109                 // // sanity check for long poll timeout vs. socket read timeout
110                 // final int maxReasonableTimeoutMs =
111                 // CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10;
112                 // if ( timeoutMs > maxReasonableTimeoutMs )
113                 // {
114                 // log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t.
115                 // socket read timeout (" +
116                 // CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll
117                 // timeout to " + maxReasonableTimeoutMs + "." );
118                 // timeoutMs = maxReasonableTimeoutMs;
119                 // }
120
121                 // final String urlPath = createUrlPath ( timeoutMs, limit );
122
123                 // getLog().info ( "UEB GET " + urlPath );
124                 try {
125                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
126                                 DMEConfigure(timeoutMs, limit);
127                                 try {
128                                         // getLog().info ( "Receiving msgs from: " +
129                                         // url+subContextPath );
130                                         String reply = sender.sendAndWait(timeoutMs + 10000L);
131                                         final JSONObject o = getResponseDataInJson(reply);
132                                         // msgs.add(reply);
133                                         if (o != null) {
134                                                 final JSONArray a = o.getJSONArray("result");
135                                                 // final int b = o.getInt("status" );
136                                                 // if ( a != null && a.length()>0 )
137                                                 if (a != null) {
138                                                         for (int i = 0; i < a.length(); i++) {
139                                                                 // msgs.add("DMAAP response status:
140                                                                 // "+Integer.toString(b));
141                                                                 if (a.get(i) instanceof String)
142                                                                         msgs.add(a.getString(i));
143                                                                 else
144                                                                         msgs.add(a.getJSONObject(i).toString());
145
146                                                         }
147                                                 }
148                                                 // else if(a != null && a.length()<1){
149                                                 // msgs.add ("[]");
150                                                 // }
151                                         }
152                                 } catch (JSONException e) {
153                                         // unexpected response
154                                         reportProblemWithResponse();
155                                         log.error("exception: ", e);
156                                 } catch (HttpException e) {
157                                         throw new IOException(e);
158                                 }
159                         }
160
161                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
162                                 // final String urlPath = createUrlPath
163                                 // (MRConstants.makeConsumerUrl ( host, fTopic, fGroup,
164                                 // fId,props.getProperty("Protocol")), timeoutMs, limit );
165                                 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
166                                                 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
167
168                                 try {
169                                         final JSONObject o = get(urlPath, username, password, protocolFlag);
170
171                                         if (o != null) {
172                                                 final JSONArray a = o.getJSONArray("result");
173                                                 final int b = o.getInt("status");
174                                                 // if ( a != null && a.length()>0 )
175                                                 if (a != null) {
176                                                         for (int i = 0; i < a.length(); i++) {
177                                                                 // msgs.add("DMAAP response status:
178                                                                 // "+Integer.toString(b));
179                                                                 if (a.get(i) instanceof String)
180                                                                         msgs.add(a.getString(i));
181                                                                 else
182                                                                         msgs.add(a.getJSONObject(i).toString());
183
184                                                         }
185                                                 }
186                                                 // else if(a != null && a.length()<1)
187                                                 // {
188                                                 // msgs.add ("[]");
189                                                 // }
190                                         }
191                                 } catch (JSONException e) {
192                                         // unexpected response
193                                         reportProblemWithResponse();
194                                         log.error("exception: ", e);
195                                 } catch (HttpException e) {
196                                         throw new IOException(e);
197                                 }
198                         }
199
200                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
201                                 final String urlPath = createUrlPath(
202                                                 MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
203                                                 timeoutMs, limit);
204
205                                 try {
206                                         final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag);
207                                         if (o != null) {
208                                                 final JSONArray a = o.getJSONArray("result");
209                                                 final int b = o.getInt("status");
210                                                 // if ( a != null && a.length()>0)
211                                                 if (a != null) {
212                                                         for (int i = 0; i < a.length(); i++) {
213                                                                 // msgs.add("DMAAP response status:
214                                                                 // "+Integer.toString(b));
215                                                                 if (a.get(i) instanceof String)
216                                                                         msgs.add(a.getString(i));
217                                                                 else
218                                                                         msgs.add(a.getJSONObject(i).toString());
219
220                                                         }
221                                                 }
222                                                 // else if(a != null && a.length()<1){
223                                                 // msgs.add ("[]");
224                                                 // }
225                                         }
226                                 } catch (JSONException e) {
227                                         // unexpected response
228                                         reportProblemWithResponse();
229                                         log.error("exception: ", e);
230                                 } catch (HttpException e) {
231                                         throw new IOException(e);
232                                 }
233
234                         }
235                         if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
236                                 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
237                                                 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
238
239                                 try {
240                                         final JSONObject o = getNoAuth(urlPath, username, password, protocolFlag);
241                                         if (o != null) {
242                                                 final JSONArray a = o.getJSONArray("result");
243                                                 final int b = o.getInt("status");
244                                                 // if ( a != null && a.length()>0)
245                                                 if (a != null) {
246                                                         for (int i = 0; i < a.length(); i++) {
247                                                                 // msgs.add("DMAAP response status:
248                                                                 // "+Integer.toString(b));
249                                                                 if (a.get(i) instanceof String)
250                                                                         msgs.add(a.getString(i));
251                                                                 else
252                                                                         msgs.add(a.getJSONObject(i).toString());
253
254                                                         }
255                                                 }
256
257                                         }
258                                 } catch (JSONException e) {
259                                         // unexpected response
260                                         reportProblemWithResponse();
261                                 } catch (HttpException e) {
262                                         throw new IOException(e);
263                                 }
264
265                         }
266
267                 } catch (JSONException e) {
268                         // unexpected response
269                         reportProblemWithResponse();
270                         log.error("exception: ", e);
271                 } catch (HttpException e) {
272                         throw new IOException(e);
273                 } catch (Exception e) {
274                         throw e;
275                 }
276
277                 return msgs;
278         }
279
280         private JSONObject getResponseDataInJson(String response) {
281                 try {
282
283                         // log.info("DMAAP response status: " + response.getStatus());
284
285                         // final String responseData = response.readEntity(String.class);
286                         JSONTokener jsonTokener = new JSONTokener(response);
287                         JSONObject jsonObject = null;
288                         final char firstChar = jsonTokener.next();
289                         jsonTokener.back();
290                         if ('[' == firstChar) {
291                                 JSONArray jsonArray = new JSONArray(jsonTokener);
292                                 jsonObject = new JSONObject();
293                                 jsonObject.put("result", jsonArray);
294                         } else {
295                                 jsonObject = new JSONObject(jsonTokener);
296                         }
297
298                         return jsonObject;
299                 } catch (JSONException excp) {
300                         // log.error("DMAAP - Error reading response data.", excp);
301                         return null;
302                 }
303
304         }
305
306         private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
307                 JSONTokener jsonTokener = new JSONTokener(response);
308                 JSONObject jsonObject = null;
309                 final char firstChar = jsonTokener.next();
310                 jsonTokener.back();
311                 if (null != response && response.length() == 0) {
312                         return null;
313                 }
314
315                 if ('[' == firstChar) {
316                         JSONArray jsonArray = new JSONArray(jsonTokener);
317                         jsonObject = new JSONObject();
318                         jsonObject.put("result", jsonArray);
319                 } else if ('{' == firstChar) {
320                         return null;
321                 } else if ('<' == firstChar) {
322                         return null;
323                 } else {
324                         jsonObject = new JSONObject(jsonTokener);
325                 }
326
327                 return jsonObject;
328
329         }
330
331         private final String fTopic;
332         private final String fGroup;
333         private final String fId;
334         private final int fTimeoutMs;
335         private final int fLimit;
336         private String fFilter;
337         private String username;
338         private String password;
339         private String host;
340         HostSelector fHostSelector = null;
341         private String latitude;
342         private String longitude;
343         private String version;
344         private String serviceName;
345         private String env;
346         private String partner;
347         private String routeOffer;
348         private String subContextPath;
349         private String protocol;
350         private String methodType;
351         private String url;
352         private String dmeuser;
353         private String dmepassword;
354         private String contenttype;
355         private DME2Client sender;
356         public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
357         public String consumerFilePath;
358         private String authKey;
359         private String authDate;
360         private Properties props;
361         private HashMap<String, String> DMETimeOuts;
362         private String handlers;
363         public static final String routerFilePath = null;
364
365         public static String getRouterFilePath() {
366                 return routerFilePath;
367         }
368
369         public static void setRouterFilePath(String routerFilePath) {
370                 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
371         }
372
373         public String getConsumerFilePath() {
374                 return consumerFilePath;
375         }
376
377         public void setConsumerFilePath(String consumerFilePath) {
378                 this.consumerFilePath = consumerFilePath;
379         }
380
381         public String getProtocolFlag() {
382                 return protocolFlag;
383         }
384
385         public void setProtocolFlag(String protocolFlag) {
386                 this.protocolFlag = protocolFlag;
387         }
388
389         private void DMEConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
390                 latitude = props.getProperty("Latitude");
391                 longitude = props.getProperty("Longitude");
392                 version = props.getProperty("Version");
393                 serviceName = props.getProperty("ServiceName");
394                 env = props.getProperty("Environment");
395                 partner = props.getProperty("Partner");
396                 routeOffer = props.getProperty("routeOffer");
397
398                 subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
399                 // subContextPath=createUrlPath (subContextPath, timeoutMs, limit);
400                 // if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath,
401                 // timeoutMs);
402
403                 protocol = props.getProperty("Protocol");
404                 methodType = props.getProperty("MethodType");
405                 dmeuser = props.getProperty("username");
406                 dmepassword = props.getProperty("password");
407                 contenttype = props.getProperty("contenttype");
408                 handlers = props.getProperty("sessionstickinessrequired");
409                 // url =protocol+"://DME2SEARCH/"+
410                 // "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner;
411                 // url = protocol +
412                 // "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner;
413
414                 /**
415                  * Changes to DME2Client url to use Partner for auto failover between
416                  * data centers When Partner value is not provided use the routeOffer
417                  * value for auto failover within a cluster
418                  */
419
420                 String preferredRouteKey = readRoute("preferredRouteKey");
421
422                 if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
423                         url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner
424                                         + "&routeoffer=" + preferredRouteKey;
425                 } else if (partner != null && !partner.isEmpty()) {
426                         url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner;
427                 } else if (routeOffer != null && !routeOffer.isEmpty()) {
428                         url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
429                                         + routeOffer;
430                 }
431
432                 // log.info("url :"+url);
433
434                 if (timeoutMs != -1)
435                         url = url + "&timeout=" + timeoutMs;
436                 if (limit != -1)
437                         url = url + "&limit=" + limit;
438
439                 // Add filter to DME2 Url
440                 if (fFilter != null && fFilter.length() > 0)
441                         url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8");
442
443                 DMETimeOuts = new HashMap<String, String>();
444                 DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
445                 DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
446                 DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
447                 DMETimeOuts.put("Content-Type", contenttype);
448                 System.setProperty("AFT_LATITUDE", latitude);
449                 System.setProperty("AFT_LONGITUDE", longitude);
450                 System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
451                 // System.setProperty("DME2.DEBUG", "true");
452
453                 // SSL changes
454                 // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
455                 // "SSLv3,TLSv1,TLSv1.1");
456                 System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
457                 System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
458                 System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
459                 // SSL changes
460
461                 sender = new DME2Client(new URI(url), timeoutMs + 10000L);
462                 sender.setAllowAllHttpReturnCodes(true);
463                 sender.setMethod(methodType);
464                 sender.setSubContext(subContextPath);
465                 if (dmeuser != null && dmepassword != null) {
466                         sender.setCredentials(dmeuser, dmepassword);
467                 }
468                 sender.setHeaders(DMETimeOuts);
469                 sender.setPayload("");
470
471                 if (handlers.equalsIgnoreCase("yes")) {
472                         sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
473                                         props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
474                         sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
475                         sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
476                 } else {
477                         sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
478                 }
479         }
480
481         public Properties getProps() {
482                 return props;
483         }
484
485         public void setProps(Properties props) {
486                 this.props = props;
487         }
488
489         protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
490                 final StringBuffer contexturl = new StringBuffer(url);
491                 // final StringBuffer url = new StringBuffer (
492                 // CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) );
493                 final StringBuffer adds = new StringBuffer();
494                 if (timeoutMs > -1)
495                         adds.append("timeout=").append(timeoutMs);
496                 if (limit > -1) {
497                         if (adds.length() > 0) {
498                                 adds.append("&");
499                         }
500                         adds.append("limit=").append(limit);
501                 }
502                 if (fFilter != null && fFilter.length() > 0) {
503                         try {
504                                 if (adds.length() > 0) {
505                                         adds.append("&");
506                                 }
507                                 adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
508                         } catch (UnsupportedEncodingException e) {
509                                 throw new RuntimeException(e.getMessage() + "....say whaaaat?!");
510                         }
511                 }
512                 if (adds.length() > 0) {
513                         contexturl.append("?").append(adds.toString());
514                 }
515
516                 // sender.setSubContext(url.toString());
517                 return contexturl.toString();
518         }
519
520         public String getUsername() {
521                 return username;
522         }
523
524         public void setUsername(String username) {
525                 this.username = username;
526         }
527
528         public String getPassword() {
529                 return password;
530         }
531
532         public void setPassword(String password) {
533                 this.password = password;
534         }
535
536         public String getHost() {
537                 return host;
538         }
539
540         public void setHost(String host) {
541                 this.host = host;
542         }
543
544         public String getAuthKey() {
545                 return authKey;
546         }
547
548         public void setAuthKey(String authKey) {
549                 this.authKey = authKey;
550         }
551
552         public String getAuthDate() {
553                 return authDate;
554         }
555
556         public void setAuthDate(String authDate) {
557                 this.authDate = authDate;
558         }
559
560         public String getfFilter() {
561                 return fFilter;
562         }
563
564         public void setfFilter(String fFilter) {
565                 this.fFilter = fFilter;
566         }
567
568         private String readRoute(String routeKey) {
569
570                 try {
571
572                         MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath)));
573
574                 } catch (Exception ex) {
575                         log.error("Reply Router Error " + ex.toString());
576                 }
577                 String routeOffer = MRClientFactory.prop.getProperty(routeKey);
578                 return routeOffer;
579         }
580
581         @Override
582         public MRConsumerResponse fetchWithReturnConsumerResponse() {
583
584                 // fetch with the timeout and limit set in constructor
585                 return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
586         }
587
588         @Override
589         public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
590                 final LinkedList<String> msgs = new LinkedList<String>();
591                 MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
592                 try {
593                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
594                                 DMEConfigure(timeoutMs, limit);
595
596                                 String reply = sender.sendAndWait(timeoutMs + 10000L);
597
598                                 final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
599
600                                 if (o != null) {
601                                         final JSONArray a = o.getJSONArray("result");
602
603                                         if (a != null) {
604                                                 for (int i = 0; i < a.length(); i++) {
605                                                         if (a.get(i) instanceof String)
606                                                                 msgs.add(a.getString(i));
607                                                         else
608                                                                 msgs.add(a.getJSONObject(i).toString());
609
610                                                 }
611                                         }
612
613                                 }
614                                 createMRConsumerResponse(reply, mrConsumerResponse);
615                         }
616
617                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
618                                 /*
619                                  * final String urlPath = createUrlPath(
620                                  * MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
621                                  * props.getProperty("Protocol")), timeoutMs, limit);
622                                  */
623
624                                 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
625                                                 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
626                                 String response = getResponse(urlPath, username, password, protocolFlag);
627
628                                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
629
630                                 if (o != null) {
631                                         final JSONArray a = o.getJSONArray("result");
632
633                                         if (a != null) {
634                                                 for (int i = 0; i < a.length(); i++) {
635                                                         if (a.get(i) instanceof String)
636                                                                 msgs.add(a.getString(i));
637                                                         else
638                                                                 msgs.add(a.getJSONObject(i).toString());
639
640                                                 }
641                                         }
642
643                                 }
644                                 createMRConsumerResponse(response, mrConsumerResponse);
645                         }
646
647                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
648                                 final String urlPath = createUrlPath(
649                                                 MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
650                                                 timeoutMs, limit);
651
652                                 String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
653                                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
654                                 if (o != null) {
655                                         final JSONArray a = o.getJSONArray("result");
656
657                                         if (a != null) {
658                                                 for (int i = 0; i < a.length(); i++) {
659                                                         if (a.get(i) instanceof String)
660                                                                 msgs.add(a.getString(i));
661                                                         else
662                                                                 msgs.add(a.getJSONObject(i).toString());
663
664                                                 }
665                                         }
666
667                                 }
668                                 createMRConsumerResponse(response, mrConsumerResponse);
669                         }
670                         if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
671                                 // final String urlPath = createUrlPath(
672                                 // MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
673                                 // props.getProperty("Protocol")), timeoutMs,
674                                 // limit);
675                                 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
676                                                 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
677
678                                 String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
679                                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
680                                 if (o != null) {
681                                         final JSONArray a = o.getJSONArray("result");
682
683                                         if (a != null) {
684                                                 for (int i = 0; i < a.length(); i++) {
685                                                         if (a.get(i) instanceof String)
686                                                                 msgs.add(a.getString(i));
687                                                         else
688                                                                 msgs.add(a.getJSONObject(i).toString());
689
690                                                 }
691                                         }
692
693                                 }
694                                 createMRConsumerResponse(response, mrConsumerResponse);
695                         }
696
697                 } catch (JSONException e) {
698                         mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
699                         mrConsumerResponse.setResponseMessage(e.getMessage());
700                         log.error("json exception: ", e);
701                 } catch (HttpException e) {
702                         mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
703                         mrConsumerResponse.setResponseMessage(e.getMessage());
704                         log.error("http exception: ", e);
705                 } catch (DME2Exception e) {
706                         mrConsumerResponse.setResponseCode(e.getErrorCode());
707                         mrConsumerResponse.setResponseMessage(e.getErrorMessage());
708                         log.error("DME2 exception: ", e);
709                 } catch (Exception e) {
710                         mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
711                         mrConsumerResponse.setResponseMessage(e.getMessage());
712                         log.error("exception: ", e);
713                 }
714                 mrConsumerResponse.setActualMessages(msgs);
715                 return mrConsumerResponse;
716         }
717
718         private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
719
720                 if (reply.startsWith("{")) {
721                         JSONObject jObject = new JSONObject(reply);
722                         String message = jObject.getString("message");
723                         int status = jObject.getInt("status");
724
725                         mrConsumerResponse.setResponseCode(Integer.toString(status));
726
727                         if (null != message) {
728                                 mrConsumerResponse.setResponseMessage(message);
729                         }
730                 } else if (reply.startsWith("<")) {
731                         mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
732                         mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
733                 } else {
734                         mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
735                         mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
736                 }
737
738         }
739
740 }