1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
25 import java.io.FileReader;
26 import java.io.IOException;
27 import java.io.UnsupportedEncodingException;
28 import java.net.MalformedURLException;
30 import java.net.URISyntaxException;
31 import java.net.URLEncoder;
32 import java.util.Collection;
33 import java.util.HashMap;
34 import java.util.LinkedList;
35 import java.util.List;
36 import java.util.Properties;
38 import org.apache.http.HttpException;
39 import org.apache.http.HttpStatus;
40 import org.json.JSONArray;
41 import org.json.JSONException;
42 import org.json.JSONObject;
43 import org.json.JSONTokener;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
47 import com.att.aft.dme2.api.DME2Client;
48 import com.att.aft.dme2.api.DME2Exception;
49 import com.att.nsa.mr.client.HostSelector;
50 import com.att.nsa.mr.client.MRClientFactory;
51 import com.att.nsa.mr.client.MRConsumer;
52 import com.att.nsa.mr.client.response.MRConsumerResponse;
53 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
55 public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
// Response message used together with HTTP 200 by createMRConsumerResponse when it reports success.
57 private static final String SUCCESS_MESSAGE = "Success";
// Per-instance SLF4J logger named after the runtime class of this object.
59 private Logger log = LoggerFactory.getLogger(this.getClass().getName());
61 public static List<String> stringToList(String str) {
62 final LinkedList<String> set = new LinkedList<String>();
64 final String[] parts = str.trim().split(",");
65 for (String part : parts) {
66 final String trimmed = part.trim();
67 if (trimmed.length() > 0) {
/**
 * Convenience constructor that forwards its nine arguments to the
 * ten-argument constructor of this class.
 *
 * NOTE(review): the trailing argument of the delegating call (the value for
 * allowSelfSignedCerts) is not visible in this view - confirm it.
 *
 * @throws MalformedURLException declared by the constructor this one delegates to
 */
75 public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
76 final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username,
77 String apiSecret_password) throws MalformedURLException {
78 this(hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password,
/**
 * Primary constructor. Passes the host collection and a
 * "topic::consumerGroup::consumerId" label to the superclass, stores the
 * consumer group and the default timeout, and builds a HostSelector over
 * the given hosts.
 *
 * NOTE(review): apiKey, apiSecret and allowSelfSignedCerts are not used in
 * the lines visible here, and the assignments of the other final fields are
 * not visible in this view - confirm against the full source.
 *
 * @throws MalformedURLException declared by this constructor; the throwing
 *         call is not identifiable from the visible lines
 */
82 public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
83 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret,
84 boolean allowSelfSignedCerts) throws MalformedURLException {
85 super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);
88 fGroup = consumerGroup;
90 fTimeoutMs = timeoutMs;
// Selector used by the AAF_AUTH and HTTPNOAUTH paths to pick a base host per request.
94 fHostSelector = new HostSelector(hostPart);
/**
 * Fetches messages using this instance's default timeout (fTimeoutMs) and
 * limit (fLimit) by delegating to fetch(int, int).
 *
 * @return whatever fetch(int, int) returns for the default timeout and limit
 * @throws IOException propagated from fetch(int, int)
 * @throws Exception propagated from fetch(int, int)
 */
98 public Iterable<String> fetch() throws IOException, Exception {
99 // fetch with the timeout and limit set in constructor
100 return fetch(fTimeoutMs, fLimit);
/**
 * Fetches messages over the transport selected by protocolFlag, which is
 * compared case-insensitively against the DME2, AAF_AUTH, AUTH_KEY and
 * HTTPNOAUTH protocol constants in turn.
 *
 * For each matching protocol the "result" array of the JSON reply is walked:
 * String elements are added to the message list as-is, other elements are
 * added via getJSONObject(i).toString().
 *
 * NOTE(review): several structural lines (try/if/else/closing braces and the
 * final return) are not visible in this view; presumably the collected msgs
 * list is returned - confirm against the full source.
 *
 * @param timeoutMs timeout handed to DMEConfigure / createUrlPath; the DME2
 *        call additionally waits timeoutMs + 10000 ms for the reply
 * @param limit limit handed to DMEConfigure / createUrlPath
 * @throws IOException wrapping any HttpException raised while fetching
 * @throws Exception declared by the signature; the handler body for the
 *         generic catch is not visible in this view
 */
104 public Iterable<String> fetch(int timeoutMs, int limit) throws IOException, Exception {
105 final LinkedList<String> msgs = new LinkedList<String>();
107 // FIXME: the timeout on the socket needs to be at least as long as the
109 // // sanity check for long poll timeout vs. socket read timeout
110 // final int maxReasonableTimeoutMs =
111 // CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10;
112 // if ( timeoutMs > maxReasonableTimeoutMs )
114 // log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t.
115 // socket read timeout (" +
116 // CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll
117 // timeout to " + maxReasonableTimeoutMs + "." );
118 // timeoutMs = maxReasonableTimeoutMs;
121 // final String urlPath = createUrlPath ( timeoutMs, limit );
123 // getLog().info ( "UEB GET " + urlPath );
// --- DME2 transport: (re)configure the DME2 client, then send and wait for the reply.
125 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
126 DMEConfigure(timeoutMs, limit);
128 // getLog().info ( "Receiving msgs from: " +
129 // url+subContextPath );
// Wait 10 seconds longer than the requested timeout before giving up on the reply.
130 String reply = sender.sendAndWait(timeoutMs + 10000L);
131 final JSONObject o = getResponseDataInJson(reply);
134 final JSONArray a = o.getJSONArray("result");
135 // final int b = o.getInt("status" );
136 // if ( a != null && a.length()>0 )
138 for (int i = 0; i < a.length(); i++) {
139 // msgs.add("DMAAP response status:
140 // "+Integer.toString(b));
141 if (a.get(i) instanceof String)
142 msgs.add(a.getString(i));
144 msgs.add(a.getJSONObject(i).toString());
148 // else if(a != null && a.length()<1){
152 } catch (JSONException e) {
153 // unexpected response
154 reportProblemWithResponse();
155 log.error("exception: ", e);
156 } catch (HttpException e) {
157 throw new IOException(e);
// --- AAF_AUTH transport: HTTP GET with username/password against a host chosen by fHostSelector.
161 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
162 // final String urlPath = createUrlPath
163 // (MRConstants.makeConsumerUrl ( host, fTopic, fGroup,
164 // fId,props.getProperty("Protocol")), timeoutMs, limit );
165 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
166 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
169 final JSONObject o = get(urlPath, username, password, protocolFlag);
172 final JSONArray a = o.getJSONArray("result");
// NOTE(review): b is not used in the visible code, but getInt throws JSONException when
// "status" is absent, which would abandon this branch before any message is collected - confirm intent.
173 final int b = o.getInt("status");
174 // if ( a != null && a.length()>0 )
176 for (int i = 0; i < a.length(); i++) {
177 // msgs.add("DMAAP response status:
178 // "+Integer.toString(b));
179 if (a.get(i) instanceof String)
180 msgs.add(a.getString(i));
182 msgs.add(a.getJSONObject(i).toString());
186 // else if(a != null && a.length()<1)
191 } catch (JSONException e) {
192 // unexpected response
193 reportProblemWithResponse();
194 log.error("exception: ", e);
195 } catch (HttpException e) {
196 throw new IOException(e);
// --- AUTH_KEY transport: HTTP GET signed with authKey/authDate.
// NOTE(review): this branch builds the URL from the 'host' field, whereas the AAF_AUTH and
// HTTPNOAUTH branches use fHostSelector.selectBaseHost() - confirm the difference is intended.
200 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
201 final String urlPath = createUrlPath(
202 MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
206 final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag);
208 final JSONArray a = o.getJSONArray("result");
// NOTE(review): unused in the visible code; throws JSONException if "status" is missing.
209 final int b = o.getInt("status");
210 // if ( a != null && a.length()>0)
212 for (int i = 0; i < a.length(); i++) {
213 // msgs.add("DMAAP response status:
214 // "+Integer.toString(b));
215 if (a.get(i) instanceof String)
216 msgs.add(a.getString(i));
218 msgs.add(a.getJSONObject(i).toString());
222 // else if(a != null && a.length()<1){
226 } catch (JSONException e) {
227 // unexpected response
228 reportProblemWithResponse();
229 log.error("exception: ", e);
230 } catch (HttpException e) {
231 throw new IOException(e);
// --- HTTPNOAUTH transport: HTTP GET via getNoAuth against a host chosen by fHostSelector.
235 if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
236 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
237 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
240 final JSONObject o = getNoAuth(urlPath, username, password, protocolFlag);
242 final JSONArray a = o.getJSONArray("result");
// NOTE(review): unused in the visible code; throws JSONException if "status" is missing.
243 final int b = o.getInt("status");
244 // if ( a != null && a.length()>0)
246 for (int i = 0; i < a.length(); i++) {
247 // msgs.add("DMAAP response status:
248 // "+Integer.toString(b));
249 if (a.get(i) instanceof String)
250 msgs.add(a.getString(i));
252 msgs.add(a.getJSONObject(i).toString());
258 } catch (JSONException e) {
259 // unexpected response
260 reportProblemWithResponse();
261 log.error("exception: ", e);
262 } catch (HttpException e) {
263 throw new IOException(e);
// Outer handlers: same policy as the per-protocol ones (JSON problems are reported and logged,
// HTTP problems are rethrown as IOException).
268 } catch (JSONException e) {
269 // unexpected response
270 reportProblemWithResponse();
271 log.error("exception: ", e);
272 } catch (HttpException e) {
273 throw new IOException(e);
274 } catch (Exception e) {
/**
 * Parses a raw reply string into a JSONObject. The first character of the
 * reply decides the shape: a reply starting with '[' is parsed as a
 * JSONArray and wrapped in a new object under the key "result"; the other
 * visible branch parses the reply as a JSONObject directly.
 * A JSONException during parsing is logged as an error.
 *
 * NOTE(review): the lines between reading the first character and the
 * branch, and the return statements, are not visible in this view - confirm
 * that the tokener is rewound after next() and what is returned on failure.
 *
 * @param response raw reply text
 */
281 private JSONObject getResponseDataInJson(String response) {
284 // log.info("DMAAP response status: " + response.getStatus());
286 // final String responseData = response.readEntity(String.class);
287 JSONTokener jsonTokener = new JSONTokener(response);
288 JSONObject jsonObject = null;
// Consume the first character to decide between array and object parsing.
289 final char firstChar = jsonTokener.next();
291 if ('[' == firstChar) {
292 JSONArray jsonArray = new JSONArray(jsonTokener);
293 jsonObject = new JSONObject();
294 jsonObject.put("result", jsonArray);
296 jsonObject = new JSONObject(jsonTokener);
300 } catch (JSONException excp) {
301 log.error("DMAAP - Error reading response data.", excp);
/**
 * Parses a raw reply string into a JSONObject, branching on the reply's
 * first character: '[' is parsed as a JSONArray and wrapped under the key
 * "result"; '{' and '<' have dedicated branches whose bodies are not visible
 * in this view; the last visible branch parses the reply as a JSONObject.
 *
 * NOTE(review): the null/empty check on response runs only after a
 * JSONTokener has already been constructed from it and one character has
 * been read - confirm whether a null or empty reply can reach this method.
 * NOTE(review): unlike getResponseDataInJson, no JSONException handler is
 * visible here, so parse errors presumably propagate to the caller.
 *
 * @param response raw reply text
 */
307 private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
308 JSONTokener jsonTokener = new JSONTokener(response);
309 JSONObject jsonObject = null;
310 final char firstChar = jsonTokener.next();
312 if (null != response && response.length() == 0) {
316 if ('[' == firstChar) {
317 JSONArray jsonArray = new JSONArray(jsonTokener);
318 jsonObject = new JSONObject();
319 jsonObject.put("result", jsonArray);
320 } else if ('{' == firstChar) {
322 } else if ('<' == firstChar) {
325 jsonObject = new JSONObject(jsonTokener);
// Topic, consumer group and consumer id used to build the consumer URL / DME2 sub-context path.
332 private final String fTopic;
333 private final String fGroup;
334 private final String fId;
// Defaults used by the no-argument fetch() and fetchWithReturnConsumerResponse().
335 private final int fTimeoutMs;
336 private final int fLimit;
// Optional filter; when non-empty it is URL-encoded and appended as the "filter" query parameter.
337 private String fFilter;
// Credentials passed to the HTTP get*/get*Response helpers.
338 private String username;
339 private String password;
// Chooses the base host for the AAF_AUTH and HTTPNOAUTH request URLs.
341 HostSelector fHostSelector = null;
// The fields below are populated from props inside DMEConfigure.
342 private String latitude;
343 private String longitude;
344 private String version;
345 private String serviceName;
347 private String partner;
348 private String routeOffer;
349 private String subContextPath;
350 private String protocol;
351 private String methodType;
353 private String dmeuser;
354 private String dmepassword;
355 private String contenttype;
// DME2 client built by DMEConfigure and used by the DME2 fetch paths.
356 private DME2Client sender;
// Selects the transport used by the fetch methods; defaults to the DME2 protocol value.
357 public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
358 public String consumerFilePath;
// Passed to getAuth / getAuthResponse on the AUTH_KEY path.
359 private String authKey;
360 private String authDate;
// Configuration source for DMEConfigure and for the "Protocol" value used in consumer URLs.
361 private Properties props;
// Header map handed to the DME2 client (timeout settings plus Content-Type).
362 private HashMap<String, String> DMETimeOuts;
// Value of the "sessionstickinessrequired" property, read in DMEConfigure.
363 private String handlers;
// NOTE(review): declared final and null, so getRouterFilePath() always returns null and
// setRouterFilePath() cannot (and does not) assign it - confirm this is intended.
364 public static final String routerFilePath = null;
/**
 * Returns this class's routerFilePath constant, which is declared as a
 * final null in this class.
 */
366 public static String getRouterFilePath() {
367 return routerFilePath;
/**
 * Assigns the given path to MRSimplerBatchPublisher.routerFilePath.
 * It does not change this class's own routerFilePath constant, so
 * getRouterFilePath() on this class is unaffected by this call.
 *
 * @param routerFilePath path to store on MRSimplerBatchPublisher
 */
370 public static void setRouterFilePath(String routerFilePath) {
371 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
/** Returns the current value of consumerFilePath. */
374 public String getConsumerFilePath() {
375 return consumerFilePath;
/**
 * Stores the given value in consumerFilePath.
 *
 * @param consumerFilePath new value
 */
378 public void setConsumerFilePath(String consumerFilePath) {
379 this.consumerFilePath = consumerFilePath;
/**
 * Accessor for protocolFlag, the value the fetch methods compare against
 * the protocol constants. NOTE(review): the body is not visible in this view.
 */
382 public String getProtocolFlag() {
/**
 * Sets protocolFlag, which the fetch methods compare (case-insensitively)
 * against the DME2, AAF_AUTH, AUTH_KEY and HTTPNOAUTH protocol values.
 *
 * @param protocolFlag new protocol selector
 */
386 public void setProtocolFlag(String protocolFlag) {
387 this.protocolFlag = protocolFlag;
/**
 * Builds and configures the DME2 client ('sender') for one fetch: reads
 * connection settings from props, assembles the DME2 URL with timeout,
 * limit and optional filter query parameters, sets several AFT_* JVM system
 * properties, and creates a DME2Client with credentials, headers, an empty
 * payload and the sub-context path "SubContextPath + topic/group/id".
 *
 * NOTE(review): several structural lines (conditions, else branches,
 * closing braces) are not visible in this view; the conditions guarding the
 * timeout/limit parameters in particular should be confirmed.
 *
 * @param timeoutMs value appended as the "timeout" query parameter; the
 *        client itself is created with timeoutMs + 10000
 * @param limit value appended as the "limit" query parameter
 * @throws IOException declared; URLEncoder.encode can raise its subclass UnsupportedEncodingException
 * @throws DME2Exception declared; presumably raised by the DME2Client calls
 * @throws URISyntaxException if the assembled url is not a valid URI
 */
390 private void DMEConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
391 latitude = props.getProperty("Latitude");
392 longitude = props.getProperty("Longitude");
393 version = props.getProperty("Version");
394 serviceName = props.getProperty("ServiceName");
395 env = props.getProperty("Environment");
396 partner = props.getProperty("Partner");
397 routeOffer = props.getProperty("routeOffer");
399 subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
400 // subContextPath=createUrlPath (subContextPath, timeoutMs, limit);
401 // if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath,
404 protocol = props.getProperty("Protocol");
405 methodType = props.getProperty("MethodType");
406 dmeuser = props.getProperty("username");
407 dmepassword = props.getProperty("password");
408 contenttype = props.getProperty("contenttype");
409 handlers = props.getProperty("sessionstickinessrequired");
410 // url =protocol+"://DME2SEARCH/"+
411 // "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner;
413 // "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner;
416 * Changes to DME2Client url to use Partner for auto failover between
417 * data centers When Partner value is not provided use the routeOffer
418 * value for auto failover within a cluster
// preferredRouteKey comes from the route file reloaded by readRoute on every call.
421 String preferredRouteKey = readRoute("preferredRouteKey");
// URL selection: partner + preferred route, else partner alone, else the configured routeOffer.
423 if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
424 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner
425 + "&routeoffer=" + preferredRouteKey;
426 } else if (partner != null && !partner.isEmpty()) {
427 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner;
428 } else if (routeOffer != null && !routeOffer.isEmpty()) {
429 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
433 // log.info("url :"+url);
436 url = url + "&timeout=" + timeoutMs;
438 url = url + "&limit=" + limit;
440 // Add filter to DME2 Url
441 if (fFilter != null && fFilter.length() > 0)
442 url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8");
// Despite its name, this map carries all headers set on the client: three timeouts plus Content-Type.
444 DMETimeOuts = new HashMap<String, String>();
445 DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
446 DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
447 DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
448 DMETimeOuts.put("Content-Type", contenttype);
// NOTE(review): System.setProperty throws NullPointerException for a null value, so a missing
// "Latitude", "Longitude" or "AFT_ENVIRONMENT" property fails here. These are JVM-wide settings.
449 System.setProperty("AFT_LATITUDE", latitude);
450 System.setProperty("AFT_LONGITUDE", longitude);
451 System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
452 // System.setProperty("DME2.DEBUG", "true");
455 // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
456 // "SSLv3,TLSv1,TLSv1.1");
457 System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
458 System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
// NOTE(review): keystore password is hard-coded; consider sourcing it from configuration.
459 System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
// Client timeout is 10 seconds longer than the requested timeout, matching sendAndWait in the fetch methods.
462 sender = new DME2Client(new URI(url), timeoutMs + 10000L);
463 sender.setAllowAllHttpReturnCodes(true);
464 sender.setMethod(methodType);
465 sender.setSubContext(subContextPath);
466 if (dmeuser != null && dmepassword != null) {
467 sender.setCredentials(dmeuser, dmepassword);
469 sender.setHeaders(DMETimeOuts);
470 sender.setPayload("");
// NOTE(review): handlers is null when "sessionstickinessrequired" is absent from props, which
// makes this call throw NullPointerException; "yes".equalsIgnoreCase(handlers) would be null-safe.
472 if (handlers.equalsIgnoreCase("yes")) {
473 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
474 props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
475 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
476 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
478 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
/**
 * Accessor for props, the Properties object read by DMEConfigure and the
 * fetch methods. NOTE(review): the body is not visible in this view.
 */
482 public Properties getProps() {
/**
 * Mutator for props, the Properties object read by DMEConfigure and the
 * fetch methods. NOTE(review): the body is not visible in this view.
 *
 * @param props configuration properties
 */
486 public void setProps(Properties props) {
/**
 * Appends a query string to the given URL. The query is assembled from
 * "timeout=", "limit=" and, when fFilter is non-empty, a UTF-8 URL-encoded
 * "filter=" parameter; it is attached with '?' only when at least one
 * parameter was added.
 *
 * NOTE(review): the conditions guarding the timeout/limit parameters and the
 * separator appended between parameters are not visible in this view -
 * confirm against the full source.
 *
 * @param url base URL to extend
 * @param timeoutMs value for the "timeout" parameter
 * @param limit value for the "limit" parameter
 * @return the URL followed by the assembled query string, or the URL unchanged when no parameter applies
 * @throws IOException declared by the signature; no throwing call is visible apart from the handled encoding error
 */
490 protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
491 final StringBuffer contexturl = new StringBuffer(url);
492 // final StringBuffer url = new StringBuffer (
493 // CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) );
494 final StringBuffer adds = new StringBuffer();
496 adds.append("timeout=").append(timeoutMs);
498 if (adds.length() > 0) {
501 adds.append("limit=").append(limit);
503 if (fFilter != null && fFilter.length() > 0) {
505 if (adds.length() > 0) {
508 adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
509 } catch (UnsupportedEncodingException e) {
// NOTE(review): only the message is kept; passing 'e' as the cause would preserve the stack trace.
510 throw new RuntimeException(e.getMessage() + "....say whaaaat?!");
513 if (adds.length() > 0) {
514 contexturl.append("?").append(adds.toString());
517 // sender.setSubContext(url.toString());
518 return contexturl.toString();
/** Accessor for username. NOTE(review): the body is not visible in this view. */
521 public String getUsername() {
/**
 * Sets the username passed to the HTTP request helpers by the fetch methods.
 *
 * @param username new username
 */
525 public void setUsername(String username) {
526 this.username = username;
/** Accessor for password. NOTE(review): the body is not visible in this view. */
529 public String getPassword() {
/**
 * Sets the password passed to the HTTP request helpers by the fetch methods.
 *
 * @param password new password
 */
533 public void setPassword(String password) {
534 this.password = password;
/**
 * Accessor for host. NOTE(review): the body is not visible in this view, and
 * 'host' is not declared among this class's visible fields - presumably it
 * is inherited from MRBaseClient; confirm.
 */
537 public String getHost() {
/**
 * Mutator for host, which the AUTH_KEY fetch path uses to build the consumer
 * URL. NOTE(review): the body is not visible in this view, and 'host' is not
 * declared among this class's visible fields - presumably inherited; confirm.
 *
 * @param host host value
 */
541 public void setHost(String host) {
/** Accessor for authKey. NOTE(review): the body is not visible in this view. */
545 public String getAuthKey() {
/**
 * Sets the auth key passed to getAuth / getAuthResponse on the AUTH_KEY path.
 *
 * @param authKey new auth key
 */
549 public void setAuthKey(String authKey) {
550 this.authKey = authKey;
/** Accessor for authDate. NOTE(review): the body is not visible in this view. */
553 public String getAuthDate() {
/**
 * Sets the auth date passed to getAuth / getAuthResponse on the AUTH_KEY path.
 *
 * @param authDate new auth date
 */
557 public void setAuthDate(String authDate) {
558 this.authDate = authDate;
/** Accessor for fFilter. NOTE(review): the body is not visible in this view. */
561 public String getfFilter() {
/**
 * Sets the filter that, when non-empty, is URL-encoded and appended as the
 * "filter" query parameter by createUrlPath and DMEConfigure.
 *
 * @param fFilter new filter expression; null or empty disables the parameter
 */
565 public void setfFilter(String fFilter) {
566 this.fFilter = fFilter;
569 private String readRoute(String routeKey) {
573 MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath)));
575 } catch (Exception ex) {
576 log.error("Reply Router Error " + ex.toString());
578 String routeOffer = MRClientFactory.prop.getProperty(routeKey);
/**
 * Fetches messages using this instance's default timeout (fTimeoutMs) and
 * limit (fLimit) by delegating to fetchWithReturnConsumerResponse(int, int).
 *
 * @return the MRConsumerResponse produced by the two-argument overload
 */
583 public MRConsumerResponse fetchWithReturnConsumerResponse() {
585 // fetch with the timeout and limit set in constructor
586 return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
590 public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
591 final LinkedList<String> msgs = new LinkedList<String>();
592 MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
594 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
595 DMEConfigure(timeoutMs, limit);
597 String reply = sender.sendAndWait(timeoutMs + 10000L);
599 final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
602 final JSONArray a = o.getJSONArray("result");
605 for (int i = 0; i < a.length(); i++) {
606 if (a.get(i) instanceof String)
607 msgs.add(a.getString(i));
609 msgs.add(a.getJSONObject(i).toString());
615 createMRConsumerResponse(reply, mrConsumerResponse);
618 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
620 * final String urlPath = createUrlPath(
621 * MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
622 * props.getProperty("Protocol")), timeoutMs, limit);
625 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
626 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
627 String response = getResponse(urlPath, username, password, protocolFlag);
629 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
632 final JSONArray a = o.getJSONArray("result");
635 for (int i = 0; i < a.length(); i++) {
636 if (a.get(i) instanceof String)
637 msgs.add(a.getString(i));
639 msgs.add(a.getJSONObject(i).toString());
645 createMRConsumerResponse(response, mrConsumerResponse);
648 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
649 final String urlPath = createUrlPath(
650 MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
653 String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
654 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
656 final JSONArray a = o.getJSONArray("result");
659 for (int i = 0; i < a.length(); i++) {
660 if (a.get(i) instanceof String)
661 msgs.add(a.getString(i));
663 msgs.add(a.getJSONObject(i).toString());
669 createMRConsumerResponse(response, mrConsumerResponse);
671 if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
672 // final String urlPath = createUrlPath(
673 // MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
674 // props.getProperty("Protocol")), timeoutMs,
676 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
677 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
679 String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
680 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
682 final JSONArray a = o.getJSONArray("result");
685 for (int i = 0; i < a.length(); i++) {
686 if (a.get(i) instanceof String)
687 msgs.add(a.getString(i));
689 msgs.add(a.getJSONObject(i).toString());
695 createMRConsumerResponse(response, mrConsumerResponse);
698 } catch (JSONException e) {
699 mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
700 mrConsumerResponse.setResponseMessage(e.getMessage());
701 log.error("json exception: ", e);
702 } catch (HttpException e) {
703 mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
704 mrConsumerResponse.setResponseMessage(e.getMessage());
705 log.error("http exception: ", e);
706 } catch (DME2Exception e) {
707 mrConsumerResponse.setResponseCode(e.getErrorCode());
708 mrConsumerResponse.setResponseMessage(e.getErrorMessage());
709 log.error("DME2 exception: ", e);
710 } catch (Exception e) {
711 mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
712 mrConsumerResponse.setResponseMessage(e.getMessage());
713 log.error("exception: ", e);
715 mrConsumerResponse.setActualMessages(msgs);
716 return mrConsumerResponse;
719 private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
721 if (reply.startsWith("{")) {
722 JSONObject jObject = new JSONObject(reply);
723 String message = jObject.getString("message");
724 int status = jObject.getInt("status");
726 mrConsumerResponse.setResponseCode(Integer.toString(status));
728 if (null != message) {
729 mrConsumerResponse.setResponseMessage(message);
731 } else if (reply.startsWith("<")) {
732 mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
733 mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
735 mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
736 mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);