eb7fd91cb7c7ce1a355426533d9a2da546fe10cd
[dmaap/messagerouter/dmaapclient.git] / src / main / java / com / att / nsa / mr / client / impl / MRConsumerImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
23
24 import java.io.File;
25 import java.io.FileReader;
26 import java.io.IOException;
27 import java.io.UnsupportedEncodingException;
28 import java.net.MalformedURLException;
29 import java.net.URI;
30 import java.net.URISyntaxException;
31 import java.net.URLEncoder;
32 import java.util.Collection;
33 import java.util.HashMap;
34 import java.util.LinkedList;
35 import java.util.List;
36 import java.util.Properties;
37
38 import org.apache.http.HttpException;
39 import org.apache.http.HttpStatus;
40 import org.json.JSONArray;
41 import org.json.JSONException;
42 import org.json.JSONObject;
43 import org.json.JSONTokener;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 import com.att.aft.dme2.api.DME2Client;
48 import com.att.aft.dme2.api.DME2Exception;
49 import com.att.nsa.mr.client.MRClientFactory;
50 import com.att.nsa.mr.client.MRConsumer;
51 import com.att.nsa.mr.client.response.MRConsumerResponse;
52 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
53
54 public class MRConsumerImpl extends MRBaseClient implements MRConsumer
55 {
56         
57         private static final String SUCCESS_MESSAGE = "Success";
58         
59         
60         private Logger log = LoggerFactory.getLogger ( this.getClass().getName () );
61         public static List<String> stringToList ( String str )
62         {
63                 final LinkedList<String> set = new LinkedList<String> ();
64                 if ( str != null )
65                 {
66                         final String[] parts = str.trim ().split ( "," );
67                         for ( String part : parts )
68                         {
69                                 final String trimmed = part.trim();
70                                 if ( trimmed.length () > 0 )
71                                 {
72                                         set.add ( trimmed );
73                                 }
74                         }
75                 }
76                 return set;
77         }
78         
79         public MRConsumerImpl ( Collection<String> hostPart, final String topic, final String consumerGroup,
80                         final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username, String apiSecret_password ) throws MalformedURLException
81                 {
82                         this( hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password, false );
83                 }
84         
85         public MRConsumerImpl ( Collection<String> hostPart, final String topic, final String consumerGroup,
86                 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret, boolean allowSelfSignedCerts ) throws MalformedURLException
87         {
88                 super ( hostPart, topic + "::" + consumerGroup + "::" + consumerId );
89
90                 fTopic = topic;
91                 fGroup = consumerGroup;
92                 fId = consumerId;
93                 fTimeoutMs = timeoutMs;
94                 fLimit = limit;
95                 fFilter = filter;
96
97                 //setApiCredentials ( apiKey, apiSecret );
98         }
99
100         @Override
101         public Iterable<String> fetch () throws IOException,Exception
102         {
103                 // fetch with the timeout and limit set in constructor
104                 return fetch ( fTimeoutMs, fLimit );
105         }
106
107         @Override
108         public Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException,Exception
109         {
110                 final LinkedList<String> msgs = new LinkedList<String> ();
111
112 // FIXME: the timeout on the socket needs to be at least as long as the long poll
113 //              // sanity check for long poll timeout vs. socket read timeout
114 //              final int maxReasonableTimeoutMs = CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10;
115 //              if ( timeoutMs > maxReasonableTimeoutMs )
116 //              {
117 //                      log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t. socket read timeout (" +
118 //                              CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll timeout to " + maxReasonableTimeoutMs + "." );
119 //                      timeoutMs = maxReasonableTimeoutMs;
120 //              }
121
122         //      final String urlPath = createUrlPath ( timeoutMs, limit );
123
124                 //getLog().info ( "UEB GET " + urlPath );
125                 try
126                 {
127                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
128                                 DMEConfigure(timeoutMs, limit);
129                         try 
130                         {
131                                 //getLog().info ( "Receiving msgs from: " + url+subContextPath );
132                                 String reply = sender.sendAndWait(timeoutMs+10000L);                            
133                         //      System.out.println("Message received = "+reply);
134                                 final JSONObject o =getResponseDataInJson(reply);
135                                 //msgs.add(reply);
136                                 if ( o != null )
137                                 {
138                                         final JSONArray a = o.getJSONArray ( "result" );
139                                 //      final int b = o.getInt("status" );
140                                         //if ( a != null && a.length()>0 )
141                                         if ( a != null)
142                                         {
143                                                 for ( int i=0; i<a.length (); i++ )
144                                                 {
145                                                         //msgs.add("DMAAP response status: "+Integer.toString(b));
146                                                         if (a.get(i) instanceof String)
147                                                                 msgs.add ( a.getString(i) );
148                                                         else
149                                                         msgs.add ( a.getJSONObject(i).toString() );
150                                                         
151                                                         
152                                                 }
153                                         }
154 //                                      else if(a != null && a.length()<1){
155 //                                              msgs.add ("[]");                
156 //                                              }
157                                 }
158                         }       
159                         catch ( JSONException e )
160                                 {
161                                         // unexpected response
162                                         reportProblemWithResponse ();
163                                     log.error("exception: ", e);
164                                 }
165                                 catch ( HttpException e )
166                                 {
167                                         throw new IOException ( e );
168                                 }       
169                         }
170                         
171                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
172                                 final String urlPath = createUrlPath (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, fId,props.getProperty("Protocol")), timeoutMs, limit );
173                         
174                                 
175                                 try
176                                 {
177                                         final JSONObject o = get ( urlPath, username, password, protocolFlag );
178
179                                         if ( o != null )
180                                         {
181                                                 final JSONArray a = o.getJSONArray ( "result" );
182                                                 final int b = o.getInt("status" );
183                                                 //if ( a != null && a.length()>0 )
184                                                 if ( a != null)
185                                                 {
186                                                         for ( int i=0; i<a.length (); i++ )
187                                                         {
188                                                                 msgs.add("DMAAP response status: "+Integer.toString(b));
189                                                                 if (a.get(i) instanceof String)
190                                                                         msgs.add ( a.getString(i) );
191                                                                 else
192                                                                         msgs.add ( a.getJSONObject(i).toString() );
193                                                                 
194                                                         }
195                                                 }
196 //                                              else if(a != null && a.length()<1)
197 //                                                      {
198 //                                                              msgs.add ("[]");                
199 //                                                      }
200                                         }
201                                 }
202                                 catch ( JSONException e )
203                                 {
204                                         // unexpected response
205                                         reportProblemWithResponse ();
206                                     log.error("exception: ", e);
207                                 }
208                                 catch ( HttpException e )
209                                 {
210                                         throw new IOException ( e );
211                                 }
212                         } 
213                         
214                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
215                                 final String urlPath = createUrlPath (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ,props.getProperty("Protocol")), timeoutMs, limit );
216                                 
217
218                         try 
219                         {
220                                 final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag );
221                                 if ( o != null )
222                                 {
223                                         final JSONArray a = o.getJSONArray ( "result" );
224                                         final int b = o.getInt("status" );
225                                         //if ( a != null && a.length()>0)
226                                         if ( a != null)
227                                         {
228                                                 for ( int i=0; i<a.length (); i++ )
229                                                 {
230                                                         msgs.add("DMAAP response status: "+Integer.toString(b));
231                                                         if (a.get(i) instanceof String)
232                                                                 msgs.add ( a.getString(i) );
233                                                         else
234                                                         msgs.add ( a.getJSONObject(i).toString() );
235                                                         
236                                                 }
237                                         }
238 //                                      else if(a != null && a.length()<1){
239 //                                              msgs.add ("[]");                
240 //                                              }
241                                 }
242                         }
243                         catch ( JSONException e )
244                         {
245                                 // unexpected response
246                                 reportProblemWithResponse ();
247                             log.error("exception: ", e);
248                         }
249                         catch ( HttpException e )
250                         {
251                                 throw new IOException ( e );
252                         }
253                                 
254                         }
255                         
256                 } catch ( JSONException e ) {
257                         // unexpected response
258                         reportProblemWithResponse ();
259                     log.error("exception: ", e);
260                 } catch (HttpException e) {
261                         throw new IOException(e);
262                 } catch (Exception e ) {
263                         throw e;
264                 }
265
266
267                 return msgs;
268         }
269
270         private JSONObject getResponseDataInJson(String response) {
271         try {
272                 
273                 
274                 //log.info("DMAAP response status: " + response.getStatus());
275
276                 //      final String responseData = response.readEntity(String.class);
277                 JSONTokener jsonTokener = new JSONTokener(response);
278                 JSONObject jsonObject = null;
279                 final char firstChar = jsonTokener.next();
280         jsonTokener.back();
281                 if ('[' == firstChar) {
282                         JSONArray jsonArray = new JSONArray(jsonTokener);
283                         jsonObject = new JSONObject();
284                         jsonObject.put("result", jsonArray);
285                 } else {
286                         jsonObject = new JSONObject(jsonTokener);
287                 }
288
289                 return jsonObject;
290         } catch (JSONException excp) {
291         //      log.error("DMAAP - Error reading response data.", excp);
292                 return null;
293         }
294         
295         
296         
297 }
298         
299         private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
300                         JSONTokener jsonTokener = new JSONTokener(response);
301                         JSONObject jsonObject = null;
302                         final char firstChar = jsonTokener.next();
303                 jsonTokener.back();
304                 if(null != response && response.length()==0){
305                         return null;
306                 }
307                 
308                         if ('[' == firstChar) {
309                                 JSONArray jsonArray = new JSONArray(jsonTokener);
310                                 jsonObject = new JSONObject();
311                                 jsonObject.put("result", jsonArray);
312                         } else if('{' == firstChar){
313                                 return null;
314                         } else if('<' == firstChar){
315                                 return null;
316                         }else{
317                                 jsonObject = new JSONObject(jsonTokener);
318                         }
319
320                         return jsonObject;
321                 
322         }
323         
324         private final String fTopic;
325         private final String fGroup;
326         private final String fId;
327         private final int fTimeoutMs;
328         private final int fLimit;
329         private String fFilter;
330         private String username;
331         private String password;
332         private String host;
333          private  String latitude;
334                 private  String longitude;
335                 private  String version;
336                 private  String serviceName;
337                 private  String env;
338                 private  String partner;
339                 private String routeOffer;
340                 private  String subContextPath;
341                 private  String protocol;
342                 private  String methodType;
343                 private  String url;
344                 private  String dmeuser;
345                 private  String dmepassword;
346                 private  String contenttype;
347             private DME2Client sender;
348                 public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
349                 public String consumerFilePath;
350                 private String authKey;
351                 private String authDate;
352         private Properties props;
353         private HashMap<String, String> DMETimeOuts;
354         private String handlers;
355         public static final String routerFilePath = null;
356         public static String getRouterFilePath() {
357                 return routerFilePath;
358         }
359
360         public static void setRouterFilePath(String routerFilePath) {
361                 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
362         }
363                 public String getConsumerFilePath() {
364                         return consumerFilePath;
365                 }
366
367                 public void setConsumerFilePath(String consumerFilePath) {
368                         this.consumerFilePath = consumerFilePath;
369                 }
370
371                 public String getProtocolFlag() {
372                         return protocolFlag;
373                 }
374
375                 public void setProtocolFlag(String protocolFlag) {
376                         this.protocolFlag = protocolFlag;
377                 }
378                 
379                 private void DMEConfigure(int timeoutMs, int limit)throws IOException,DME2Exception, URISyntaxException{ 
380                         latitude = props.getProperty("Latitude");
381                         longitude = props.getProperty("Longitude");
382                         version = props.getProperty("Version");
383                         serviceName = props.getProperty("ServiceName");
384                         env = props.getProperty("Environment");
385                         partner = props.getProperty("Partner");
386                         routeOffer = props.getProperty("routeOffer");
387                         
388                         subContextPath=props.getProperty("SubContextPath")+fTopic+"/"+fGroup+"/"+fId;
389                 //      subContextPath=createUrlPath (subContextPath, timeoutMs, limit);
390                         //if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath, timeoutMs);
391                         
392                         protocol = props.getProperty("Protocol"); 
393                         methodType = props.getProperty("MethodType");
394                         dmeuser = props.getProperty("username");
395                         dmepassword = props.getProperty("password");
396                         contenttype = props.getProperty("contenttype");
397                         handlers = props.getProperty("sessionstickinessrequired");
398                         //url =protocol+"://DME2SEARCH/"+ "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner;
399                 //      url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner;
400                 
401                         /**
402                          * Changes to DME2Client url to use Partner for auto failover between data centers
403                          * When Partner value is not provided use the routeOffer value for auto failover within a cluster 
404                          */
405                         
406                         String preferredRouteKey = readRoute("preferredRouteKey");
407                                                 
408                         if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) 
409                         { 
410                                 url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner+"&routeoffer="+preferredRouteKey; 
411                         }else  if (partner != null && !partner.isEmpty()) 
412                         { 
413                                 url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner; 
414                         }
415                         else if (routeOffer!=null && !routeOffer.isEmpty()) 
416                         { 
417                                 url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer;
418                         }
419                         
420                         //log.info("url :"+url);
421                                                 
422                         if(timeoutMs != -1 )url=url+"&timeout="+timeoutMs;
423                         if(limit != -1 )url=url+"&limit="+limit;
424
425                         DMETimeOuts = new HashMap<String, String>();
426                         DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
427                         DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
428                         DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
429                         DMETimeOuts.put("Content-Type", contenttype);
430                         System.setProperty("AFT_LATITUDE", latitude);
431                         System.setProperty("AFT_LONGITUDE", longitude);
432                         System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT"));
433                 //      System.setProperty("DME2.DEBUG", "true");
434                         
435                         //SSL changes
436                         System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
437                                         "SSLv3,TLSv1,TLSv1.1");
438                         System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
439                         System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
440                         //SSL changes
441             
442                         sender = new DME2Client(new URI(url), timeoutMs+10000L);
443                         sender.setAllowAllHttpReturnCodes(true);
444                         sender.setMethod(methodType);
445                         sender.setSubContext(subContextPath);   
446                         if(dmeuser != null && dmepassword != null){
447                         sender.setCredentials(dmeuser, dmepassword);
448                         //System.out.println(dmepassword);
449                         }
450                         sender.setHeaders(DMETimeOuts);
451                         sender.setPayload("");               
452                         
453                         if(handlers.equalsIgnoreCase("yes")){
454                                 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
455                                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
456                                 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
457                                 }else{
458                                         sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
459                                 }
460                 /*      HeaderReplyHandler headerhandler= new HeaderReplyHandler(); 
461                         sender.setReplyHandler(headerhandler);*/
462 //                      } catch (DME2Exception x) {
463 //                              getLog().warn(x.getMessage(), x);
464 //                              System.out.println("XXXXXXXXXXXX"+x);
465 //                      } catch (URISyntaxException x) {
466 //                              System.out.println(x);
467 //                              getLog().warn(x.getMessage(), x);
468 //                      } catch (Exception x) {
469 //                              System.out.println("XXXXXXXXXXXX"+x);
470 //                              getLog().warn(x.getMessage(), x);
471 //                      }
472                 }
473         public Properties getProps() {
474                         return props;
475                 }
476
477                 public void setProps(Properties props) {
478                         this.props = props;
479                 }
480
481         protected String createUrlPath (String url, int timeoutMs , int limit ) throws IOException
482         {
483                 final StringBuffer contexturl= new StringBuffer(url);
484         //      final StringBuffer url = new StringBuffer ( CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) );
485                 final StringBuffer adds = new StringBuffer ();
486                 if ( timeoutMs > -1 ) adds.append ( "timeout=" ).append ( timeoutMs ); 
487                 if ( limit > -1 )
488                 {
489                         if ( adds.length () > 0 )
490                         {
491                                 adds.append ( "&" );
492                         }
493                         adds.append ( "limit=" ).append ( limit );
494                 }
495                 if ( fFilter != null && fFilter.length () > 0 )
496                 {
497                         try {
498                                 if ( adds.length () > 0 )
499                                 {
500                                         adds.append ( "&" );
501                                 }
502                                 adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
503                         } catch (UnsupportedEncodingException e) {
504                                 throw new RuntimeException(e.getMessage() + "....say whaaaat?!");
505                         }
506                 }
507                 if ( adds.length () > 0 )
508                 {
509                         contexturl.append ( "?" ).append ( adds.toString () );
510                 }
511                 
512                 //sender.setSubContext(url.toString());
513                 return contexturl.toString ();
514         }
515
516         public String getUsername() {
517                 return username;
518         }
519
520         public void setUsername(String username) {
521                 this.username = username;
522         }
523
524         public String getPassword() {
525                 return password;
526         }
527
528         public void setPassword(String password) {
529                 this.password = password;
530         }
531
532         public String getHost() {
533                 return host;
534         }
535
536         public void setHost(String host) {
537                 this.host = host;
538         }
539
540         public String getAuthKey() {
541                 return authKey;
542         }
543
544         public void setAuthKey(String authKey) {
545                 this.authKey = authKey;
546         }
547
548         public String getAuthDate() {
549                 return authDate;
550         }
551
552         public void setAuthDate(String authDate) {
553                 this.authDate = authDate;
554         }
555
556         public String getfFilter() {
557                 return fFilter;
558         }
559
560         public void setfFilter(String fFilter) {
561                 this.fFilter = fFilter;
562         }
563         
564         private String readRoute(String routeKey) {
565
566                 try {
567                         
568                         MRClientFactory.prop.load(new FileReader(new File (MRClientFactory.routeFilePath)));
569
570                 } catch (Exception ex) {
571                         log.error("Reply Router Error " + ex.toString() );
572                 }
573                 String routeOffer = MRClientFactory.prop.getProperty(routeKey);         
574                 return routeOffer;
575         }
576         
577         @Override
578         public MRConsumerResponse fetchWithReturnConsumerResponse() {
579
580                 // fetch with the timeout and limit set in constructor
581                 return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
582         }
583
584         @Override
585         public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs,
586                         int limit) {
587                 final LinkedList<String> msgs = new LinkedList<String>();
588                 MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
589                 try {
590                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(
591                                         protocolFlag)) {
592                                 DMEConfigure(timeoutMs, limit);
593
594                                 String reply = sender.sendAndWait(timeoutMs + 10000L);
595
596                                 final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
597
598                                 if (o != null) {
599                                         final JSONArray a = o.getJSONArray("result");
600
601                                         if (a != null) {
602                                                 for (int i = 0; i < a.length(); i++) {                                                  
603                                                         if (a.get(i) instanceof String)
604                                                                 msgs.add(a.getString(i));
605                                                         else
606                                                                 msgs.add(a.getJSONObject(i).toString());
607
608                                                 }
609                                         }
610
611                                 }
612                                 createMRConsumerResponse(reply, mrConsumerResponse);
613                         }
614
615                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(
616                                         protocolFlag)) {
617                                 final String urlPath = createUrlPath(
618                                                 MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
619                                                                 props.getProperty("Protocol")), timeoutMs,
620                                                 limit);
621
622                                 String response = getResponse(urlPath, username, password,
623                                                 protocolFlag);
624
625                                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
626
627                                 if (o != null) {
628                                         final JSONArray a = o.getJSONArray("result");
629
630                                         if (a != null) {
631                                                 for (int i = 0; i < a.length(); i++) {                                                  
632                                                         if (a.get(i) instanceof String)
633                                                                 msgs.add(a.getString(i));
634                                                         else
635                                                                 msgs.add(a.getJSONObject(i).toString());
636
637                                                 }
638                                         }
639
640                                 }
641                                 createMRConsumerResponse(response, mrConsumerResponse);
642                         }
643
644                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(
645                                         protocolFlag)) {
646                                 final String urlPath = createUrlPath(
647                                                 MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
648                                                                 props.getProperty("Protocol")), timeoutMs,
649                                                 limit);
650
651                                 String response  = getAuthResponse(urlPath, authKey, authDate,
652                                                 username, password, protocolFlag);
653                                 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);                       
654                                 if (o != null) {
655                                         final JSONArray a = o.getJSONArray("result");
656
657                                         if (a != null) {
658                                                 for (int i = 0; i < a.length(); i++) {
659                                                         if (a.get(i) instanceof String)
660                                                                 msgs.add(a.getString(i));
661                                                         else
662                                                                 msgs.add(a.getJSONObject(i).toString());
663
664                                                 }
665                                         }
666
667                                 }
668                                 createMRConsumerResponse(response, mrConsumerResponse);
669                         }
670                         
671                         
672                         
673                 } catch (JSONException e) {     
674                         mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
675                         mrConsumerResponse.setResponseMessage(e.getMessage());
676                         log.error("json exception: ", e);
677                 } catch (HttpException e) {                     
678                         mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
679                         mrConsumerResponse.setResponseMessage(e.getMessage());
680                         log.error("http exception: ", e);
681                 }catch(DME2Exception e){                        
682                         mrConsumerResponse.setResponseCode(e.getErrorCode());
683                         mrConsumerResponse.setResponseMessage(e.getErrorMessage());
684                         log.error("DME2 exception: ", e);
685                 }catch (Exception e) {                  
686                         mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
687                         mrConsumerResponse.setResponseMessage(e.getMessage());
688                         log.error("exception: ", e);
689                 }
690                 mrConsumerResponse.setActualMessages(msgs);
691                 return mrConsumerResponse;
692         }
693
694         private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
695                 
696                 if(reply.startsWith("{")){
697                         JSONObject jObject = new JSONObject(reply);
698                         String message = jObject.getString("message");
699                         int status = jObject.getInt("status");
700                 
701                         mrConsumerResponse.setResponseCode(Integer.toString(status));
702                         
703                         if(null != message){
704                                 mrConsumerResponse.setResponseMessage(message); 
705                         }       
706                 }else if (reply.startsWith("<")){
707                         mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
708                         mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));                      
709                 }else{
710                         mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
711                         mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE); 
712                 }
713                 
714         }
715
716  
717 }