Merge "Sonar critical issues"
[dmaap/messagerouter/dmaapclient.git] / src / main / java / com / att / nsa / mr / client / impl / MRConsumerImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
23
24 import java.io.File;
25 import java.io.FileReader;
26 import java.io.IOException;
27 import java.io.UnsupportedEncodingException;
28 import java.net.MalformedURLException;
29 import java.net.URI;
30 import java.net.URISyntaxException;
31 import java.net.URLEncoder;
32 import java.util.Collection;
33 import java.util.HashMap;
34 import java.util.LinkedList;
35 import java.util.List;
36 import java.util.Properties;
37
38 import org.apache.http.HttpException;
39 import org.apache.http.HttpStatus;
40 import org.json.JSONArray;
41 import org.json.JSONException;
42 import org.json.JSONObject;
43 import org.json.JSONTokener;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 import com.att.aft.dme2.api.DME2Client;
48 import com.att.aft.dme2.api.DME2Exception;
49 import com.att.nsa.mr.client.HostSelector;
50 import com.att.nsa.mr.client.MRClientFactory;
51 import com.att.nsa.mr.client.MRConsumer;
52 import com.att.nsa.mr.client.response.MRConsumerResponse;
53 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
54
55 public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
56
57         private static final String SUCCESS_MESSAGE = "Success";
58
59         private Logger log = LoggerFactory.getLogger(this.getClass().getName());
60
61         public static List<String> stringToList(String str) {
62                 final LinkedList<String> set = new LinkedList<String>();
63                 if (str != null) {
64                         final String[] parts = str.trim().split(",");
65                         for (String part : parts) {
66                                 final String trimmed = part.trim();
67                                 if (trimmed.length() > 0) {
68                                         set.add(trimmed);
69                                 }
70                         }
71                 }
72                 return set;
73         }
74
75         public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
76                         final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username,
77                         String apiSecret_password) throws MalformedURLException {
78                 this(hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password,
79                                 false);
80         }
81
82         public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
83                         final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret,
84                         boolean allowSelfSignedCerts) throws MalformedURLException {
85                 super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);
86
87                 fTopic = topic;
88                 fGroup = consumerGroup;
89                 fId = consumerId;
90                 fTimeoutMs = timeoutMs;
91                 fLimit = limit;
92                 fFilter = filter;
93
94                 fHostSelector = new HostSelector(hostPart);
95         }
96
97         @Override
98         public Iterable<String> fetch() throws IOException, Exception {
99                 // fetch with the timeout and limit set in constructor
100                 return fetch(fTimeoutMs, fLimit);
101         }
102
103         @Override
104         public Iterable<String> fetch(int timeoutMs, int limit) throws IOException, Exception {
105                 final LinkedList<String> msgs = new LinkedList<String>();
106
107                 // FIXME: the timeout on the socket needs to be at least as long as the
108                 // long poll
109                 // // sanity check for long poll timeout vs. socket read timeout
110                 // final int maxReasonableTimeoutMs =
111                 // CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10;
112                 // if ( timeoutMs > maxReasonableTimeoutMs )
113                 // {
114                 // log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t.
115                 // socket read timeout (" +
116                 // CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll
117                 // timeout to " + maxReasonableTimeoutMs + "." );
118                 // timeoutMs = maxReasonableTimeoutMs;
119                 // }
120
121                 // final String urlPath = createUrlPath ( timeoutMs, limit );
122
123                 // getLog().info ( "UEB GET " + urlPath );
124                 try {
125                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
126                                 DMEConfigure(timeoutMs, limit);
127                                 try {
128                                         // getLog().info ( "Receiving msgs from: " +
129                                         // url+subContextPath );
130                                         String reply = sender.sendAndWait(timeoutMs + 10000L);
131                                         final JSONObject o = getResponseDataInJson(reply);
132                                         // msgs.add(reply);
133                                         if (o != null) {
134                                                 final JSONArray a = o.getJSONArray("result");
135                                                 // final int b = o.getInt("status" );
136                                                 // if ( a != null && a.length()>0 )
137                                                 if (a != null) {
138                                                         for (int i = 0; i < a.length(); i++) {
139                                                                 // msgs.add("DMAAP response status:
140                                                                 // "+Integer.toString(b));
141                                                                 if (a.get(i) instanceof String)
142                                                                         msgs.add(a.getString(i));
143                                                                 else
144                                                                         msgs.add(a.getJSONObject(i).toString());
145
146                                                         }
147                                                 }
148                                                 // else if(a != null && a.length()<1){
149                                                 // msgs.add ("[]");
150                                                 // }
151                                         }
152                                 } catch (JSONException e) {
153                                         // unexpected response
154                                         reportProblemWithResponse();
155                                         log.error("exception: ", e);
156                                 } catch (HttpException e) {
157                                         throw new IOException(e);
158                                 }
159                         }
160
161                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
162                                 // final String urlPath = createUrlPath
163                                 // (MRConstants.makeConsumerUrl ( host, fTopic, fGroup,
164                                 // fId,props.getProperty("Protocol")), timeoutMs, limit );
165                                 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
166                                                 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
167
168                                 try {
169                                         final JSONObject o = get(urlPath, username, password, protocolFlag);
170
171                                         if (o != null) {
172                                                 final JSONArray a = o.getJSONArray("result");
173                                                 final int b = o.getInt("status");
174                                                 // if ( a != null && a.length()>0 )
175                                                 if (a != null) {
176                                                         for (int i = 0; i < a.length(); i++) {
177                                                                 // msgs.add("DMAAP response status:
178                                                                 // "+Integer.toString(b));
179                                                                 if (a.get(i) instanceof String)
180                                                                         msgs.add(a.getString(i));
181                                                                 else
182                                                                         msgs.add(a.getJSONObject(i).toString());
183
184                                                         }
185                                                 }
186                                                 // else if(a != null && a.length()<1)
187                                                 // {
188                                                 // msgs.add ("[]");
189                                                 // }
190                                         }
191                                 } catch (JSONException e) {
192                                         // unexpected response
193                                         reportProblemWithResponse();
194                                         log.error("exception: ", e);
195                                 } catch (HttpException e) {
196                                         throw new IOException(e);
197                                 }
198                         }
199
200                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
201                                 final String urlPath = createUrlPath(
202                                                 MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
203                                                 timeoutMs, limit);
204
205                                 try {
206                                         final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag);
207                                         if (o != null) {
208                                                 final JSONArray a = o.getJSONArray("result");
209                                                 final int b = o.getInt("status");
210                                                 // if ( a != null && a.length()>0)
211                                                 if (a != null) {
212                                                         for (int i = 0; i < a.length(); i++) {
213                                                                 // msgs.add("DMAAP response status:
214                                                                 // "+Integer.toString(b));
215                                                                 if (a.get(i) instanceof String)
216                                                                         msgs.add(a.getString(i));
217                                                                 else
218                                                                         msgs.add(a.getJSONObject(i).toString());
219
220                                                         }
221                                                 }
222                                                 // else if(a != null && a.length()<1){
223                                                 // msgs.add ("[]");
224                                                 // }
225                                         }
226                                 } catch (JSONException e) {
227                                         // unexpected response
228                                         reportProblemWithResponse();
229                                         log.error("exception: ", e);
230                                 } catch (HttpException e) {
231                                         throw new IOException(e);
232                                 }
233
234                         }
235                         if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
236                                 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
237                                                 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
238
239                                 try {
240                                         final JSONObject o = getNoAuth(urlPath, username, password, protocolFlag);
241                                         if (o != null) {
242                                                 final JSONArray a = o.getJSONArray("result");
243                                                 final int b = o.getInt("status");
244                                                 // if ( a != null && a.length()>0)
245                                                 if (a != null) {
246                                                         for (int i = 0; i < a.length(); i++) {
247                                                                 // msgs.add("DMAAP response status:
248                                                                 // "+Integer.toString(b));
249                                                                 if (a.get(i) instanceof String)
250                                                                         msgs.add(a.getString(i));
251                                                                 else
252                                                                         msgs.add(a.getJSONObject(i).toString());
253
254                                                         }
255                                                 }
256
257                                         }
258                                 } catch (JSONException e) {
259                                         // unexpected response
260                                         reportProblemWithResponse();
261                                         log.error("exception: ", e);
262                                 } catch (HttpException e) {
263                                         throw new IOException(e);
264                                 }
265
266                         }
267
268                 } catch (JSONException e) {
269                         // unexpected response
270                         reportProblemWithResponse();
271                         log.error("exception: ", e);
272                 } catch (HttpException e) {
273                         throw new IOException(e);
274                 } catch (Exception e) {
275                         throw e;
276                 }
277
278                 return msgs;
279         }
280
281         private JSONObject getResponseDataInJson(String response) {
282                 try {
283
284                         // log.info("DMAAP response status: " + response.getStatus());
285
286                         // final String responseData = response.readEntity(String.class);
287                         JSONTokener jsonTokener = new JSONTokener(response);
288                         JSONObject jsonObject = null;
289                         final char firstChar = jsonTokener.next();
290                         jsonTokener.back();
291                         if ('[' == firstChar) {
292                                 JSONArray jsonArray = new JSONArray(jsonTokener);
293                                 jsonObject = new JSONObject();
294                                 jsonObject.put("result", jsonArray);
295                         } else {
296                                 jsonObject = new JSONObject(jsonTokener);
297                         }
298
299                         return jsonObject;
300                 } catch (JSONException excp) {
301                          log.error("DMAAP - Error reading response data.", excp);
302                         return null;
303                 }
304
305         }
306
307         private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
308                 JSONTokener jsonTokener = new JSONTokener(response);
309                 JSONObject jsonObject = null;
310                 final char firstChar = jsonTokener.next();
311                 jsonTokener.back();
312                 if (null != response && response.length() == 0) {
313                         return null;
314                 }
315
316                 if ('[' == firstChar) {
317                         JSONArray jsonArray = new JSONArray(jsonTokener);
318                         jsonObject = new JSONObject();
319                         jsonObject.put("result", jsonArray);
320                 } else if ('{' == firstChar) {
321                         return null;
322                 } else if ('<' == firstChar) {
323                         return null;
324                 } else {
325                         jsonObject = new JSONObject(jsonTokener);
326                 }
327
328                 return jsonObject;
329
330         }
331
332         private final String fTopic;
333         private final String fGroup;
334         private final String fId;
335         private final int fTimeoutMs;
336         private final int fLimit;
337         private String fFilter;
338         private String username;
339         private String password;
340         private String host;
341         HostSelector fHostSelector = null;
342         private String latitude;
343         private String longitude;
344         private String version;
345         private String serviceName;
346         private String env;
347         private String partner;
348         private String routeOffer;
349         private String subContextPath;
350         private String protocol;
351         private String methodType;
352         private String url;
353         private String dmeuser;
354         private String dmepassword;
355         private String contenttype;
356         private DME2Client sender;
357         public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
358         public String consumerFilePath;
359         private String authKey;
360         private String authDate;
361         private Properties props;
362         private HashMap<String, String> DMETimeOuts;
363         private String handlers;
364         public static final String routerFilePath = null;
365
366         public static String getRouterFilePath() {
367                 return routerFilePath;
368         }
369
370         public static void setRouterFilePath(String routerFilePath) {
371                 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
372         }
373
374         public String getConsumerFilePath() {
375                 return consumerFilePath;
376         }
377
378         public void setConsumerFilePath(String consumerFilePath) {
379                 this.consumerFilePath = consumerFilePath;
380         }
381
382         public String getProtocolFlag() {
383                 return protocolFlag;
384         }
385
386         public void setProtocolFlag(String protocolFlag) {
387                 this.protocolFlag = protocolFlag;
388         }
389
390         private void DMEConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
391                 latitude = props.getProperty("Latitude");
392                 longitude = props.getProperty("Longitude");
393                 version = props.getProperty("Version");
394                 serviceName = props.getProperty("ServiceName");
395                 env = props.getProperty("Environment");
396                 partner = props.getProperty("Partner");
397                 routeOffer = props.getProperty("routeOffer");
398
399                 subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
400                 // subContextPath=createUrlPath (subContextPath, timeoutMs, limit);
401                 // if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath,
402                 // timeoutMs);
403
404                 protocol = props.getProperty("Protocol");
405                 methodType = props.getProperty("MethodType");
406                 dmeuser = props.getProperty("username");
407                 dmepassword = props.getProperty("password");
408                 contenttype = props.getProperty("contenttype");
409                 handlers = props.getProperty("sessionstickinessrequired");
410                 // url =protocol+"://DME2SEARCH/"+
411                 // "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner;
412                 // url = protocol +
413                 // "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner;
414
415                 /**
416                  * Changes to DME2Client url to use Partner for auto failover between
417                  * data centers When Partner value is not provided use the routeOffer
418                  * value for auto failover within a cluster
419                  */
420
421                 String preferredRouteKey = readRoute("preferredRouteKey");
422
423                 if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
424                         url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner
425                                         + "&routeoffer=" + preferredRouteKey;
426                 } else if (partner != null && !partner.isEmpty()) {
427                         url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner;
428                 } else if (routeOffer != null && !routeOffer.isEmpty()) {
429                         url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
430                                         + routeOffer;
431                 }
432
433                 // log.info("url :"+url);
434
435                 if (timeoutMs != -1)
436                         url = url + "&timeout=" + timeoutMs;
437                 if (limit != -1)
438                         url = url + "&limit=" + limit;
439
440                 // Add filter to DME2 Url
441                 if (fFilter != null && fFilter.length() > 0)
442                         url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8");
443
444                 DMETimeOuts = new HashMap<String, String>();
445                 DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
446                 DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
447                 DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
448                 DMETimeOuts.put("Content-Type", contenttype);
449                 System.setProperty("AFT_LATITUDE", latitude);
450                 System.setProperty("AFT_LONGITUDE", longitude);
451                 System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
452                 // System.setProperty("DME2.DEBUG", "true");
453
454                 // SSL changes
455                 // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
456                 // "SSLv3,TLSv1,TLSv1.1");
457                 System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
458                 System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
459                 System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
460                 // SSL changes
461
462                 sender = new DME2Client(new URI(url), timeoutMs + 10000L);
463                 sender.setAllowAllHttpReturnCodes(true);
464                 sender.setMethod(methodType);
465                 sender.setSubContext(subContextPath);
466                 if (dmeuser != null && dmepassword != null) {
467                         sender.setCredentials(dmeuser, dmepassword);
468                 }
469                 sender.setHeaders(DMETimeOuts);
470                 sender.setPayload("");
471
472                 if (handlers.equalsIgnoreCase("yes")) {
473                         sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
474                                         props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
475                         sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
476                         sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
477                 } else {
478                         sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
479                 }
480         }
481
482         public Properties getProps() {
483                 return props;
484         }
485
486         public void setProps(Properties props) {
487                 this.props = props;
488         }
489
490         protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
491                 final StringBuffer contexturl = new StringBuffer(url);
492                 // final StringBuffer url = new StringBuffer (
493                 // CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) );
494                 final StringBuffer adds = new StringBuffer();
495                 if (timeoutMs > -1)
496                         adds.append("timeout=").append(timeoutMs);
497                 if (limit > -1) {
498                         if (adds.length() > 0) {
499                                 adds.append("&");
500                         }
501                         adds.append("limit=").append(limit);
502                 }
503                 if (fFilter != null && fFilter.length() > 0) {
504                         try {
505                                 if (adds.length() > 0) {
506                                         adds.append("&");
507                                 }
508                                 adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
509                         } catch (UnsupportedEncodingException e) {
510                                 throw new RuntimeException(e.getMessage() + "....say whaaaat?!");
511                         }
512                 }
513                 if (adds.length() > 0) {
514                         contexturl.append("?").append(adds.toString());
515                 }
516
517                 // sender.setSubContext(url.toString());
518                 return contexturl.toString();
519         }
520
521         public String getUsername() {
522                 return username;
523         }
524
525         public void setUsername(String username) {
526                 this.username = username;
527         }
528
529         public String getPassword() {
530                 return password;
531         }
532
533         public void setPassword(String password) {
534                 this.password = password;
535         }
536
537         public String getHost() {
538                 return host;
539         }
540
541         public void setHost(String host) {
542                 this.host = host;
543         }
544
545         public String getAuthKey() {
546                 return authKey;
547         }
548
549         public void setAuthKey(String authKey) {
550                 this.authKey = authKey;
551         }
552
553         public String getAuthDate() {
554                 return authDate;
555         }
556
557         public void setAuthDate(String authDate) {
558                 this.authDate = authDate;
559         }
560
561         public String getfFilter() {
562                 return fFilter;
563         }
564
565         public void setfFilter(String fFilter) {
566                 this.fFilter = fFilter;
567         }
568
569         private String readRoute(String routeKey) {
570
571                 try {
572
573                         MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath)));
574
575                 } catch (Exception ex) {
576                         log.error("Reply Router Error " + ex.toString());
577                 }
578                 String routeOffer = MRClientFactory.prop.getProperty(routeKey);
579                 return routeOffer;
580         }
581
582         @Override
583         public MRConsumerResponse fetchWithReturnConsumerResponse() {
584
585                 // fetch with the timeout and limit set in constructor
586                 return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
587         }
588
589         @Override
590         public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
591                 final LinkedList<String> msgs = new LinkedList<String>();
592                 MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
593                 try {
594                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
595                                 DMEConfigure(timeoutMs, limit);
596
597                                 String reply = sender.sendAndWait(timeoutMs + 10000L);
598
599                                 final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
600
601                                 if (o != null) {
602                                         final JSONArray a = o.getJSONArray("result");
603
604                                         if (a != null) {
605                                                 for (int i = 0; i < a.length(); i++) {
606                                                         if (a.get(i) instanceof String)
607                                                                 msgs.add(a.getString(i));
608                                                         else
609                                                                 msgs.add(a.getJSONObject(i).toString());
610
611                                                 }
612                                         }
613
614                                 }
615                                 createMRConsumerResponse(reply, mrConsumerResponse);
616                         }
617
618                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
619                                 /*
620                                  * final String urlPath = createUrlPath(
621                                  * MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
622                                  * props.getProperty("Protocol")), timeoutMs, limit);
623                                  */
624
625                                 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
626                                                 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
627                                 String response = getResponse(urlPath, username, password, protocolFlag);
628
629                                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
630
631                                 if (o != null) {
632                                         final JSONArray a = o.getJSONArray("result");
633
634                                         if (a != null) {
635                                                 for (int i = 0; i < a.length(); i++) {
636                                                         if (a.get(i) instanceof String)
637                                                                 msgs.add(a.getString(i));
638                                                         else
639                                                                 msgs.add(a.getJSONObject(i).toString());
640
641                                                 }
642                                         }
643
644                                 }
645                                 createMRConsumerResponse(response, mrConsumerResponse);
646                         }
647
648                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
649                                 final String urlPath = createUrlPath(
650                                                 MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
651                                                 timeoutMs, limit);
652
653                                 String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
654                                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
655                                 if (o != null) {
656                                         final JSONArray a = o.getJSONArray("result");
657
658                                         if (a != null) {
659                                                 for (int i = 0; i < a.length(); i++) {
660                                                         if (a.get(i) instanceof String)
661                                                                 msgs.add(a.getString(i));
662                                                         else
663                                                                 msgs.add(a.getJSONObject(i).toString());
664
665                                                 }
666                                         }
667
668                                 }
669                                 createMRConsumerResponse(response, mrConsumerResponse);
670                         }
671                         if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
672                                 // final String urlPath = createUrlPath(
673                                 // MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
674                                 // props.getProperty("Protocol")), timeoutMs,
675                                 // limit);
676                                 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
677                                                 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
678
679                                 String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
680                                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
681                                 if (o != null) {
682                                         final JSONArray a = o.getJSONArray("result");
683
684                                         if (a != null) {
685                                                 for (int i = 0; i < a.length(); i++) {
686                                                         if (a.get(i) instanceof String)
687                                                                 msgs.add(a.getString(i));
688                                                         else
689                                                                 msgs.add(a.getJSONObject(i).toString());
690
691                                                 }
692                                         }
693
694                                 }
695                                 createMRConsumerResponse(response, mrConsumerResponse);
696                         }
697
698                 } catch (JSONException e) {
699                         mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
700                         mrConsumerResponse.setResponseMessage(e.getMessage());
701                         log.error("json exception: ", e);
702                 } catch (HttpException e) {
703                         mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
704                         mrConsumerResponse.setResponseMessage(e.getMessage());
705                         log.error("http exception: ", e);
706                 } catch (DME2Exception e) {
707                         mrConsumerResponse.setResponseCode(e.getErrorCode());
708                         mrConsumerResponse.setResponseMessage(e.getErrorMessage());
709                         log.error("DME2 exception: ", e);
710                 } catch (Exception e) {
711                         mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
712                         mrConsumerResponse.setResponseMessage(e.getMessage());
713                         log.error("exception: ", e);
714                 }
715                 mrConsumerResponse.setActualMessages(msgs);
716                 return mrConsumerResponse;
717         }
718
719         private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
720
721                 if (reply.startsWith("{")) {
722                         JSONObject jObject = new JSONObject(reply);
723                         String message = jObject.getString("message");
724                         int status = jObject.getInt("status");
725
726                         mrConsumerResponse.setResponseCode(Integer.toString(status));
727
728                         if (null != message) {
729                                 mrConsumerResponse.setResponseMessage(message);
730                         }
731                 } else if (reply.startsWith("<")) {
732                         mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
733                         mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
734                 } else {
735                         mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
736                         mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
737                 }
738
739         }
740
741 }