1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
25 import java.io.FileReader;
26 import java.io.IOException;
27 import java.io.UnsupportedEncodingException;
28 import java.net.MalformedURLException;
30 import java.net.URISyntaxException;
31 import java.net.URLEncoder;
32 import java.util.Collection;
33 import java.util.HashMap;
34 import java.util.LinkedList;
35 import java.util.List;
36 import java.util.Properties;
38 import org.apache.http.HttpException;
39 import org.apache.http.HttpStatus;
40 import org.json.JSONArray;
41 import org.json.JSONException;
42 import org.json.JSONObject;
43 import org.json.JSONTokener;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
47 import com.att.aft.dme2.api.DME2Client;
48 import com.att.aft.dme2.api.DME2Exception;
49 import com.att.nsa.mr.client.MRClientFactory;
50 import com.att.nsa.mr.client.MRConsumer;
51 import com.att.nsa.mr.client.response.MRConsumerResponse;
52 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
54 public class MRConsumerImpl extends MRBaseClient implements MRConsumer
57 private static final String SUCCESS_MESSAGE = "Success";
60 private Logger log = LoggerFactory.getLogger ( this.getClass().getName () );
61 public static List<String> stringToList ( String str )
63 final LinkedList<String> set = new LinkedList<String> ();
66 final String[] parts = str.trim ().split ( "," );
67 for ( String part : parts )
69 final String trimmed = part.trim();
70 if ( trimmed.length () > 0 )
79 public MRConsumerImpl ( Collection<String> hostPart, final String topic, final String consumerGroup,
80 final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username, String apiSecret_password ) throws MalformedURLException
82 this( hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password, false );
85 public MRConsumerImpl ( Collection<String> hostPart, final String topic, final String consumerGroup,
86 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret, boolean allowSelfSignedCerts ) throws MalformedURLException
88 super ( hostPart, topic + "::" + consumerGroup + "::" + consumerId );
91 fGroup = consumerGroup;
93 fTimeoutMs = timeoutMs;
97 //setApiCredentials ( apiKey, apiSecret );
101 public Iterable<String> fetch () throws IOException,Exception
103 // fetch with the timeout and limit set in constructor
104 return fetch ( fTimeoutMs, fLimit );
108 public Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException,Exception
110 final LinkedList<String> msgs = new LinkedList<String> ();
112 // FIXME: the timeout on the socket needs to be at least as long as the long poll
113 // // sanity check for long poll timeout vs. socket read timeout
114 // final int maxReasonableTimeoutMs = CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10;
115 // if ( timeoutMs > maxReasonableTimeoutMs )
117 // log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t. socket read timeout (" +
118 // CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll timeout to " + maxReasonableTimeoutMs + "." );
119 // timeoutMs = maxReasonableTimeoutMs;
122 // final String urlPath = createUrlPath ( timeoutMs, limit );
124 //getLog().info ( "UEB GET " + urlPath );
127 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
128 DMEConfigure(timeoutMs, limit);
131 //getLog().info ( "Receiving msgs from: " + url+subContextPath );
132 String reply = sender.sendAndWait(timeoutMs+10000L);
133 // System.out.println("Message received = "+reply);
134 final JSONObject o =getResponseDataInJson(reply);
138 final JSONArray a = o.getJSONArray ( "result" );
139 // final int b = o.getInt("status" );
140 //if ( a != null && a.length()>0 )
143 for ( int i=0; i<a.length (); i++ )
145 //msgs.add("DMAAP response status: "+Integer.toString(b));
146 if (a.get(i) instanceof String)
147 msgs.add ( a.getString(i) );
149 msgs.add ( a.getJSONObject(i).toString() );
154 // else if(a != null && a.length()<1){
159 catch ( JSONException e )
161 // unexpected response
162 reportProblemWithResponse ();
163 log.error("exception: ", e);
165 catch ( HttpException e )
167 throw new IOException ( e );
171 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
172 final String urlPath = createUrlPath (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, fId,props.getProperty("Protocol")), timeoutMs, limit );
177 final JSONObject o = get ( urlPath, username, password, protocolFlag );
181 final JSONArray a = o.getJSONArray ( "result" );
182 final int b = o.getInt("status" );
183 //if ( a != null && a.length()>0 )
186 for ( int i=0; i<a.length (); i++ )
188 msgs.add("DMAAP response status: "+Integer.toString(b));
189 if (a.get(i) instanceof String)
190 msgs.add ( a.getString(i) );
192 msgs.add ( a.getJSONObject(i).toString() );
196 // else if(a != null && a.length()<1)
202 catch ( JSONException e )
204 // unexpected response
205 reportProblemWithResponse ();
206 log.error("exception: ", e);
208 catch ( HttpException e )
210 throw new IOException ( e );
214 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
215 final String urlPath = createUrlPath (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ,props.getProperty("Protocol")), timeoutMs, limit );
220 final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag );
223 final JSONArray a = o.getJSONArray ( "result" );
224 final int b = o.getInt("status" );
225 //if ( a != null && a.length()>0)
228 for ( int i=0; i<a.length (); i++ )
230 msgs.add("DMAAP response status: "+Integer.toString(b));
231 if (a.get(i) instanceof String)
232 msgs.add ( a.getString(i) );
234 msgs.add ( a.getJSONObject(i).toString() );
238 // else if(a != null && a.length()<1){
243 catch ( JSONException e )
245 // unexpected response
246 reportProblemWithResponse ();
247 log.error("exception: ", e);
249 catch ( HttpException e )
251 throw new IOException ( e );
256 } catch ( JSONException e ) {
257 // unexpected response
258 reportProblemWithResponse ();
259 log.error("exception: ", e);
260 } catch (HttpException e) {
261 throw new IOException(e);
262 } catch (Exception e ) {
270 private JSONObject getResponseDataInJson(String response) {
274 //log.info("DMAAP response status: " + response.getStatus());
276 // final String responseData = response.readEntity(String.class);
277 JSONTokener jsonTokener = new JSONTokener(response);
278 JSONObject jsonObject = null;
279 final char firstChar = jsonTokener.next();
281 if ('[' == firstChar) {
282 JSONArray jsonArray = new JSONArray(jsonTokener);
283 jsonObject = new JSONObject();
284 jsonObject.put("result", jsonArray);
286 jsonObject = new JSONObject(jsonTokener);
290 } catch (JSONException excp) {
291 // log.error("DMAAP - Error reading response data.", excp);
299 private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
300 JSONTokener jsonTokener = new JSONTokener(response);
301 JSONObject jsonObject = null;
302 final char firstChar = jsonTokener.next();
304 if(null != response && response.length()==0){
308 if ('[' == firstChar) {
309 JSONArray jsonArray = new JSONArray(jsonTokener);
310 jsonObject = new JSONObject();
311 jsonObject.put("result", jsonArray);
312 } else if('{' == firstChar){
314 } else if('<' == firstChar){
317 jsonObject = new JSONObject(jsonTokener);
324 private final String fTopic;
325 private final String fGroup;
326 private final String fId;
327 private final int fTimeoutMs;
328 private final int fLimit;
329 private String fFilter;
330 private String username;
331 private String password;
333 private String latitude;
334 private String longitude;
335 private String version;
336 private String serviceName;
338 private String partner;
339 private String routeOffer;
340 private String subContextPath;
341 private String protocol;
342 private String methodType;
344 private String dmeuser;
345 private String dmepassword;
346 private String contenttype;
347 private DME2Client sender;
348 public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
349 public String consumerFilePath;
350 private String authKey;
351 private String authDate;
352 private Properties props;
353 private HashMap<String, String> DMETimeOuts;
354 private String handlers;
355 public static final String routerFilePath = null;
356 public static String getRouterFilePath() {
357 return routerFilePath;
360 public static void setRouterFilePath(String routerFilePath) {
361 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
363 public String getConsumerFilePath() {
364 return consumerFilePath;
367 public void setConsumerFilePath(String consumerFilePath) {
368 this.consumerFilePath = consumerFilePath;
371 public String getProtocolFlag() {
375 public void setProtocolFlag(String protocolFlag) {
376 this.protocolFlag = protocolFlag;
379 private void DMEConfigure(int timeoutMs, int limit)throws IOException,DME2Exception, URISyntaxException{
380 latitude = props.getProperty("Latitude");
381 longitude = props.getProperty("Longitude");
382 version = props.getProperty("Version");
383 serviceName = props.getProperty("ServiceName");
384 env = props.getProperty("Environment");
385 partner = props.getProperty("Partner");
386 routeOffer = props.getProperty("routeOffer");
388 subContextPath=props.getProperty("SubContextPath")+fTopic+"/"+fGroup+"/"+fId;
389 // subContextPath=createUrlPath (subContextPath, timeoutMs, limit);
390 //if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath, timeoutMs);
392 protocol = props.getProperty("Protocol");
393 methodType = props.getProperty("MethodType");
394 dmeuser = props.getProperty("username");
395 dmepassword = props.getProperty("password");
396 contenttype = props.getProperty("contenttype");
397 handlers = props.getProperty("sessionstickinessrequired");
398 //url =protocol+"://DME2SEARCH/"+ "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner;
399 // url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner;
402 * Changes to DME2Client url to use Partner for auto failover between data centers
403 * When Partner value is not provided use the routeOffer value for auto failover within a cluster
406 String preferredRouteKey = readRoute("preferredRouteKey");
408 if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty())
410 url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner+"&routeoffer="+preferredRouteKey;
411 }else if (partner != null && !partner.isEmpty())
413 url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner;
415 else if (routeOffer!=null && !routeOffer.isEmpty())
417 url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer;
420 //log.info("url :"+url);
422 if(timeoutMs != -1 )url=url+"&timeout="+timeoutMs;
423 if(limit != -1 )url=url+"&limit="+limit;
425 DMETimeOuts = new HashMap<String, String>();
426 DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
427 DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
428 DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
429 DMETimeOuts.put("Content-Type", contenttype);
430 System.setProperty("AFT_LATITUDE", latitude);
431 System.setProperty("AFT_LONGITUDE", longitude);
432 System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT"));
433 // System.setProperty("DME2.DEBUG", "true");
436 System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
437 "SSLv3,TLSv1,TLSv1.1");
438 System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
439 System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
442 sender = new DME2Client(new URI(url), timeoutMs+10000L);
443 sender.setAllowAllHttpReturnCodes(true);
444 sender.setMethod(methodType);
445 sender.setSubContext(subContextPath);
446 if(dmeuser != null && dmepassword != null){
447 sender.setCredentials(dmeuser, dmepassword);
448 //System.out.println(dmepassword);
450 sender.setHeaders(DMETimeOuts);
451 sender.setPayload("");
453 if(handlers.equalsIgnoreCase("yes")){
454 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
455 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
456 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
458 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
460 /* HeaderReplyHandler headerhandler= new HeaderReplyHandler();
461 sender.setReplyHandler(headerhandler);*/
462 // } catch (DME2Exception x) {
463 // getLog().warn(x.getMessage(), x);
464 // System.out.println("XXXXXXXXXXXX"+x);
465 // } catch (URISyntaxException x) {
466 // System.out.println(x);
467 // getLog().warn(x.getMessage(), x);
468 // } catch (Exception x) {
469 // System.out.println("XXXXXXXXXXXX"+x);
470 // getLog().warn(x.getMessage(), x);
473 public Properties getProps() {
477 public void setProps(Properties props) {
481 protected String createUrlPath (String url, int timeoutMs , int limit ) throws IOException
483 final StringBuffer contexturl= new StringBuffer(url);
484 // final StringBuffer url = new StringBuffer ( CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) );
485 final StringBuffer adds = new StringBuffer ();
486 if ( timeoutMs > -1 ) adds.append ( "timeout=" ).append ( timeoutMs );
489 if ( adds.length () > 0 )
493 adds.append ( "limit=" ).append ( limit );
495 if ( fFilter != null && fFilter.length () > 0 )
498 if ( adds.length () > 0 )
502 adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
503 } catch (UnsupportedEncodingException e) {
504 throw new RuntimeException(e.getMessage() + "....say whaaaat?!");
507 if ( adds.length () > 0 )
509 contexturl.append ( "?" ).append ( adds.toString () );
512 //sender.setSubContext(url.toString());
513 return contexturl.toString ();
516 public String getUsername() {
520 public void setUsername(String username) {
521 this.username = username;
524 public String getPassword() {
528 public void setPassword(String password) {
529 this.password = password;
532 public String getHost() {
536 public void setHost(String host) {
540 public String getAuthKey() {
544 public void setAuthKey(String authKey) {
545 this.authKey = authKey;
548 public String getAuthDate() {
552 public void setAuthDate(String authDate) {
553 this.authDate = authDate;
556 public String getfFilter() {
560 public void setfFilter(String fFilter) {
561 this.fFilter = fFilter;
564 private String readRoute(String routeKey) {
568 MRClientFactory.prop.load(new FileReader(new File (MRClientFactory.routeFilePath)));
570 } catch (Exception ex) {
571 log.error("Reply Router Error " + ex.toString() );
573 String routeOffer = MRClientFactory.prop.getProperty(routeKey);
578 public MRConsumerResponse fetchWithReturnConsumerResponse() {
580 // fetch with the timeout and limit set in constructor
581 return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
585 public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs,
587 final LinkedList<String> msgs = new LinkedList<String>();
588 MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
590 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(
592 DMEConfigure(timeoutMs, limit);
594 String reply = sender.sendAndWait(timeoutMs + 10000L);
596 final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
599 final JSONArray a = o.getJSONArray("result");
602 for (int i = 0; i < a.length(); i++) {
603 if (a.get(i) instanceof String)
604 msgs.add(a.getString(i));
606 msgs.add(a.getJSONObject(i).toString());
612 createMRConsumerResponse(reply, mrConsumerResponse);
615 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(
617 final String urlPath = createUrlPath(
618 MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
619 props.getProperty("Protocol")), timeoutMs,
622 String response = getResponse(urlPath, username, password,
625 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
628 final JSONArray a = o.getJSONArray("result");
631 for (int i = 0; i < a.length(); i++) {
632 if (a.get(i) instanceof String)
633 msgs.add(a.getString(i));
635 msgs.add(a.getJSONObject(i).toString());
641 createMRConsumerResponse(response, mrConsumerResponse);
644 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(
646 final String urlPath = createUrlPath(
647 MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
648 props.getProperty("Protocol")), timeoutMs,
651 String response = getAuthResponse(urlPath, authKey, authDate,
652 username, password, protocolFlag);
653 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
655 final JSONArray a = o.getJSONArray("result");
658 for (int i = 0; i < a.length(); i++) {
659 if (a.get(i) instanceof String)
660 msgs.add(a.getString(i));
662 msgs.add(a.getJSONObject(i).toString());
668 createMRConsumerResponse(response, mrConsumerResponse);
673 } catch (JSONException e) {
674 mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
675 mrConsumerResponse.setResponseMessage(e.getMessage());
676 log.error("json exception: ", e);
677 } catch (HttpException e) {
678 mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
679 mrConsumerResponse.setResponseMessage(e.getMessage());
680 log.error("http exception: ", e);
681 }catch(DME2Exception e){
682 mrConsumerResponse.setResponseCode(e.getErrorCode());
683 mrConsumerResponse.setResponseMessage(e.getErrorMessage());
684 log.error("DME2 exception: ", e);
685 }catch (Exception e) {
686 mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
687 mrConsumerResponse.setResponseMessage(e.getMessage());
688 log.error("exception: ", e);
690 mrConsumerResponse.setActualMessages(msgs);
691 return mrConsumerResponse;
694 private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
696 if(reply.startsWith("{")){
697 JSONObject jObject = new JSONObject(reply);
698 String message = jObject.getString("message");
699 int status = jObject.getInt("status");
701 mrConsumerResponse.setResponseCode(Integer.toString(status));
704 mrConsumerResponse.setResponseMessage(message);
706 }else if (reply.startsWith("<")){
707 mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
708 mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
710 mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
711 mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);