1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
25 import java.io.FileReader;
26 import java.io.IOException;
27 import java.io.UnsupportedEncodingException;
28 import java.net.MalformedURLException;
30 import java.net.URISyntaxException;
31 import java.net.URLEncoder;
32 import java.util.Collection;
33 import java.util.HashMap;
34 import java.util.LinkedList;
35 import java.util.List;
36 import java.util.Properties;
38 import org.apache.http.HttpException;
39 import org.apache.http.HttpStatus;
40 import org.json.JSONArray;
41 import org.json.JSONException;
42 import org.json.JSONObject;
43 import org.json.JSONTokener;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
47 import com.att.aft.dme2.api.DME2Client;
48 import com.att.aft.dme2.api.DME2Exception;
49 import com.att.nsa.mr.client.HostSelector;
50 import com.att.nsa.mr.client.MRClientFactory;
51 import com.att.nsa.mr.client.MRConsumer;
52 import com.att.nsa.mr.client.response.MRConsumerResponse;
53 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
/**
 * Message Router consumer client. Extends {@code MRBaseClient} and implements
 * {@code MRConsumer}; the transport used for each fetch is chosen by
 * {@code protocolFlag} (DME2, AAF_AUTH, AUTH_KEY or HTTPNOAUTH).
 */
55 public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
// Response message that createMRConsumerResponse pairs with HttpStatus.SC_OK.
57 private static final String SUCCESS_MESSAGE = "Success";
// Per-instance SLF4J logger, named after the runtime class of this object.
59 private Logger log = LoggerFactory.getLogger(this.getClass().getName());
/**
 * Breaks a comma-separated string into its individual parts.
 * <p>
 * The visible code trims the input, splits it on "," and trims each part, testing
 * that the trimmed part is non-empty.
 * <p>
 * NOTE(review): the statements that add to {@code set} and return it are not visible
 * in this view; presumably each non-empty part is appended and the list returned —
 * confirm against the full file.
 *
 * @param str comma-separated text
 * @return presumably the non-empty, trimmed parts in input order — TODO confirm
 */
61 public static List<String> stringToList(String str) {
62 final LinkedList<String> set = new LinkedList<String>();
64 final String[] parts = str.trim().split(",");
65 for (String part : parts) {
66 final String trimmed = part.trim();
67 if (trimmed.length() > 0) {
/**
 * Convenience constructor that forwards all of its arguments to the longer
 * constructor via {@code this(...)}.
 * <p>
 * NOTE(review): the end of the {@code this(...)} call is not visible in this view;
 * presumably it supplies a fixed value for {@code allowSelfSignedCerts} — confirm.
 *
 * @throws MalformedURLException declared by the delegated constructor
 */
75 public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
76 final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username,
77 String apiSecret_password) throws MalformedURLException {
78 this(hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password,
/**
 * Main constructor. Passes the host collection and a client signature of the form
 * {@code topic::consumerGroup::consumerId} to the superclass, stores the consumer
 * group and timeout, and creates a {@code HostSelector} over the given hosts.
 * <p>
 * NOTE(review): assignments for the other final fields (fTopic, fId, fLimit) are not
 * visible in this view; presumably they are set here as well — confirm. Nothing
 * visible uses apiKey, apiSecret or allowSelfSignedCerts.
 *
 * @throws MalformedURLException declared on the signature; the raising call is not visible here
 */
82 public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
83 final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret,
84 boolean allowSelfSignedCerts) throws MalformedURLException {
85 super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);
88 fGroup = consumerGroup;
90 fTimeoutMs = timeoutMs;
94 fHostSelector = new HostSelector(hostPart);
/**
 * Fetches messages using the timeout and limit stored on this instance.
 *
 * @return whatever {@code fetch(fTimeoutMs, fLimit)} returns
 * @throws IOException as thrown by {@code fetch(int, int)}
 * @throws Exception as thrown by {@code fetch(int, int)}
 */
98 public Iterable<String> fetch() throws IOException, Exception {
99 // fetch with the timeout and limit set in constructor
100 return fetch(fTimeoutMs, fLimit);
/**
 * Fetches messages using the transport selected by {@code protocolFlag}.
 * <p>
 * The visible code handles four protocol values (DME2, AAF_AUTH, AUTH_KEY and
 * HTTPNOAUTH). Each branch obtains a JSON object, reads its "result" array and copies
 * the elements into the message list: String elements via {@code getString}, the
 * others via {@code getJSONObject(i).toString()}. A JSONException is reported through
 * {@code reportProblemWithResponse()}; an HttpException is rethrown wrapped in an
 * IOException.
 * <p>
 * NOTE(review): several lines of this method (try/else keywords, closing braces and
 * the return statement) are not visible in this view; presumably the collected list
 * is returned — confirm against the full file.
 *
 * @param timeoutMs value sent as the request's timeout parameter; the DME2 branch also
 *                  waits up to {@code timeoutMs + 10000} ms for the reply
 * @param limit     value sent as the request's limit parameter
 * @throws IOException when an HttpException is caught
 */
104 public Iterable<String> fetch(int timeoutMs, int limit) throws IOException, Exception {
105 final LinkedList<String> msgs = new LinkedList<String>();
107 // FIXME: the timeout on the socket needs to be at least as long as the
109 // // sanity check for long poll timeout vs. socket read timeout
110 // final int maxReasonableTimeoutMs =
111 // CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10;
112 // if ( timeoutMs > maxReasonableTimeoutMs )
114 // log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t.
115 // socket read timeout (" +
116 // CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll
117 // timeout to " + maxReasonableTimeoutMs + "." );
118 // timeoutMs = maxReasonableTimeoutMs;
121 // final String urlPath = createUrlPath ( timeoutMs, limit );
123 // getLog().info ( "UEB GET " + urlPath );
// DME2 transport: (re)configure the DME2 client, then send and wait for the reply.
125 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
126 DMEConfigure(timeoutMs, limit);
128 // getLog().info ( "Receiving msgs from: " +
129 // url+subContextPath );
130 String reply = sender.sendAndWait(timeoutMs + 10000L);
131 final JSONObject o = getResponseDataInJson(reply);
134 final JSONArray a = o.getJSONArray("result");
135 // final int b = o.getInt("status" );
136 // if ( a != null && a.length()>0 )
138 for (int i = 0; i < a.length(); i++) {
139 // msgs.add("DMAAP response status:
140 // "+Integer.toString(b));
141 if (a.get(i) instanceof String)
142 msgs.add(a.getString(i));
144 msgs.add(a.getJSONObject(i).toString());
148 // else if(a != null && a.length()<1){
152 } catch (JSONException e) {
153 // unexpected response
154 reportProblemWithResponse();
155 log.error("exception: ", e);
156 } catch (HttpException e) {
157 throw new IOException(e);
// AAF auth transport: consumer URL is built on a host picked by fHostSelector.
161 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
162 // final String urlPath = createUrlPath
163 // (MRConstants.makeConsumerUrl ( host, fTopic, fGroup,
164 // fId,props.getProperty("Protocol")), timeoutMs, limit );
165 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
166 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
169 final JSONObject o = get(urlPath, username, password, protocolFlag);
172 final JSONArray a = o.getJSONArray("result");
// NOTE(review): b is only referenced from commented-out code below; the getInt call
// still throws JSONException when "status" is absent — confirm that is intended.
173 final int b = o.getInt("status");
174 // if ( a != null && a.length()>0 )
176 for (int i = 0; i < a.length(); i++) {
177 // msgs.add("DMAAP response status:
178 // "+Integer.toString(b));
179 if (a.get(i) instanceof String)
180 msgs.add(a.getString(i));
182 msgs.add(a.getJSONObject(i).toString());
186 // else if(a != null && a.length()<1)
191 } catch (JSONException e) {
192 // unexpected response
193 reportProblemWithResponse();
194 log.error("exception: ", e);
195 } catch (HttpException e) {
196 throw new IOException(e);
// Auth-key transport.
// NOTE(review): this branch builds the URL from `host`, whereas the AAF_AUTH and
// HTTPNOAUTH branches use fHostSelector.selectBaseHost() — confirm this is deliberate.
200 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
201 final String urlPath = createUrlPath(
202 MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
206 final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag);
208 final JSONArray a = o.getJSONArray("result");
209 final int b = o.getInt("status");
210 // if ( a != null && a.length()>0)
212 for (int i = 0; i < a.length(); i++) {
213 // msgs.add("DMAAP response status:
214 // "+Integer.toString(b));
215 if (a.get(i) instanceof String)
216 msgs.add(a.getString(i));
218 msgs.add(a.getJSONObject(i).toString());
222 // else if(a != null && a.length()<1){
226 } catch (JSONException e) {
227 // unexpected response
228 reportProblemWithResponse();
229 log.error("exception: ", e);
230 } catch (HttpException e) {
231 throw new IOException(e);
// Plain HTTP transport without authentication.
235 if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
236 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
237 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
240 final JSONObject o = getNoAuth(urlPath, username, password, protocolFlag);
242 final JSONArray a = o.getJSONArray("result");
243 final int b = o.getInt("status");
244 // if ( a != null && a.length()>0)
246 for (int i = 0; i < a.length(); i++) {
247 // msgs.add("DMAAP response status:
248 // "+Integer.toString(b));
249 if (a.get(i) instanceof String)
250 msgs.add(a.getString(i));
252 msgs.add(a.getJSONObject(i).toString());
// NOTE(review): unlike the other branches, this JSONException handler does not log.
258 } catch (JSONException e) {
259 // unexpected response
260 reportProblemWithResponse();
261 } catch (HttpException e) {
262 throw new IOException(e);
267 } catch (JSONException e) {
268 // unexpected response
269 reportProblemWithResponse();
270 log.error("exception: ", e);
271 } catch (HttpException e) {
272 throw new IOException(e);
273 } catch (Exception e) {
/**
 * Parses a raw response string into a JSONObject.
 * <p>
 * The first character decides the shape: when it is '[', the text is parsed as a
 * JSONArray and wrapped in a new object under the key "result"; the other visible
 * path parses the text directly as a JSONObject.
 * <p>
 * NOTE(review): the line between the {@code next()} call and the '[' test is missing
 * in this view; presumably the tokener is rewound so the parsers see the first
 * character again — confirm. What happens after the JSONException catch (and the
 * return statement) is also not visible.
 *
 * @param response raw response text
 * @return presumably the parsed object — TODO confirm
 */
280 private JSONObject getResponseDataInJson(String response) {
283 // log.info("DMAAP response status: " + response.getStatus());
285 // final String responseData = response.readEntity(String.class);
286 JSONTokener jsonTokener = new JSONTokener(response);
287 JSONObject jsonObject = null;
288 final char firstChar = jsonTokener.next();
290 if ('[' == firstChar) {
291 JSONArray jsonArray = new JSONArray(jsonTokener);
292 jsonObject = new JSONObject();
293 jsonObject.put("result", jsonArray);
295 jsonObject = new JSONObject(jsonTokener);
299 } catch (JSONException excp) {
300 // log.error("DMAAP - Error reading response data.", excp);
/**
 * Parses a raw response string into a JSONObject, branching on its first character
 * ('[' for a JSON array, which is wrapped under the key "result"; '{' and '<' have
 * their own branches).
 * <p>
 * NOTE(review): the bodies of the empty-response, '{' and '<' branches are not
 * visible in this view, so their behaviour cannot be stated here.
 * NOTE(review): the null/empty check on {@code response} comes after the tokener has
 * already been constructed and read from, so it cannot guard those calls — confirm
 * whether the check should come first.
 *
 * @param response raw response text
 * @return presumably the parsed object — TODO confirm
 */
306 private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
307 JSONTokener jsonTokener = new JSONTokener(response);
308 JSONObject jsonObject = null;
309 final char firstChar = jsonTokener.next();
311 if (null != response && response.length() == 0) {
315 if ('[' == firstChar) {
316 JSONArray jsonArray = new JSONArray(jsonTokener);
317 jsonObject = new JSONObject();
318 jsonObject.put("result", jsonArray);
319 } else if ('{' == firstChar) {
321 } else if ('<' == firstChar) {
324 jsonObject = new JSONObject(jsonTokener);
// Topic, consumer group and consumer id; used to build the consumer URL and the
// DME2 sub-context path.
331 private final String fTopic;
332 private final String fGroup;
333 private final String fId;
// Timeout and limit used by the no-argument fetch methods.
334 private final int fTimeoutMs;
335 private final int fLimit;
// Optional filter; when non-empty it is URL-encoded and sent as the "filter" parameter.
336 private String fFilter;
// Credentials passed to the HTTP-based request helpers.
337 private String username;
338 private String password;
// Supplies the base host for the AAF_AUTH and HTTPNOAUTH consumer URLs.
340 HostSelector fHostSelector = null;
// DME2 settings; DMEConfigure reloads them from props on every call.
341 private String latitude;
342 private String longitude;
343 private String version;
344 private String serviceName;
346 private String partner;
347 private String routeOffer;
348 private String subContextPath;
349 private String protocol;
350 private String methodType;
352 private String dmeuser;
353 private String dmepassword;
354 private String contenttype;
// DME2 client; a new instance is created by each DMEConfigure call.
355 private DME2Client sender;
// Selects the transport used by the fetch methods; defaults to DME2.
356 public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
357 public String consumerFilePath;
// Values passed to getAuth / getAuthResponse in the AUTH_KEY branches.
358 private String authKey;
359 private String authDate;
// Configuration source for the protocol name and all DME2 settings.
360 private Properties props;
// Header map handed to sender.setHeaders (DME2 timeouts plus Content-Type).
361 private HashMap<String, String> DMETimeOuts;
// Value of the "sessionstickinessrequired" property; compared against "yes".
362 private String handlers;
// NOTE(review): declared static final with value null, so it can never hold a path.
363 public static final String routerFilePath = null;
/**
 * Returns the static {@code routerFilePath} constant of this class.
 * NOTE(review): that constant is declared {@code static final} with value
 * {@code null}, so this method always returns null.
 */
365 public static String getRouterFilePath() {
366 return routerFilePath;
/**
 * Stores the given path in {@code MRSimplerBatchPublisher.routerFilePath}.
 * NOTE(review): this writes another class's static field, not this class's
 * {@code routerFilePath}, so {@code getRouterFilePath()} here is unaffected —
 * confirm this is intended.
 *
 * @param routerFilePath path to store
 */
369 public static void setRouterFilePath(String routerFilePath) {
370 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
/**
 * Returns the consumer file path stored on this instance.
 */
373 public String getConsumerFilePath() {
374 return consumerFilePath;
/**
 * Stores the consumer file path on this instance.
 *
 * @param consumerFilePath path to store
 */
377 public void setConsumerFilePath(String consumerFilePath) {
378 this.consumerFilePath = consumerFilePath;
/**
 * Accessor for the protocol flag.
 * NOTE(review): the method body is not visible in this view; presumably it returns
 * {@code protocolFlag} — confirm.
 */
381 public String getProtocolFlag() {
/**
 * Sets the protocol flag that the fetch methods compare against the
 * {@code ProtocolTypeConstants} values to pick a transport.
 *
 * @param protocolFlag protocol identifier to store
 */
385 public void setProtocolFlag(String protocolFlag) {
386 this.protocolFlag = protocolFlag;
/**
 * Prepares the DME2 client ({@code sender}) for one fetch.
 * <p>
 * Reads the DME2 settings from {@code props}, builds the DME2 URL (preferring partner
 * plus preferred route key, then partner alone, then routeOffer), appends the
 * timeout, limit and optional URL-encoded filter, sets several AFT_* system
 * properties, and creates a new {@code DME2Client} configured with method,
 * sub-context, optional credentials, headers and an empty payload.
 * <p>
 * NOTE(review): {@code System.setProperty} changes JVM-wide state and throws
 * NullPointerException for a null value, so a missing Latitude, Longitude or
 * AFT_ENVIRONMENT property would fail here — confirm these are always present.
 * NOTE(review): some lines (guards around the timeout/limit appends, else keywords,
 * closing braces) are not visible in this view.
 *
 * @param timeoutMs appended to the URL as "timeout"; also used, plus 10000 ms, as the
 *                  second argument of the {@code DME2Client} constructor
 * @param limit     appended to the URL as "limit"
 */
389 private void DMEConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
390 latitude = props.getProperty("Latitude");
391 longitude = props.getProperty("Longitude");
392 version = props.getProperty("Version");
393 serviceName = props.getProperty("ServiceName");
394 env = props.getProperty("Environment");
395 partner = props.getProperty("Partner");
396 routeOffer = props.getProperty("routeOffer");
398 subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
399 // subContextPath=createUrlPath (subContextPath, timeoutMs, limit);
400 // if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath,
403 protocol = props.getProperty("Protocol");
404 methodType = props.getProperty("MethodType");
405 dmeuser = props.getProperty("username");
406 dmepassword = props.getProperty("password");
407 contenttype = props.getProperty("contenttype");
408 handlers = props.getProperty("sessionstickinessrequired");
409 // url =protocol+"://DME2SEARCH/"+
410 // "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner;
412 // "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner;
415 * Changes to DME2Client url to use Partner for auto failover between
416 * data centers When Partner value is not provided use the routeOffer
417 * value for auto failover within a cluster
420 String preferredRouteKey = readRoute("preferredRouteKey");
422 if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
423 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner
424 + "&routeoffer=" + preferredRouteKey;
425 } else if (partner != null && !partner.isEmpty()) {
426 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner;
427 } else if (routeOffer != null && !routeOffer.isEmpty()) {
428 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
432 // log.info("url :"+url);
435 url = url + "&timeout=" + timeoutMs;
437 url = url + "&limit=" + limit;
439 // Add filter to DME2 Url
440 if (fFilter != null && fFilter.length() > 0)
441 url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8");
// Header map for the DME2 request: timeouts from props plus the content type.
443 DMETimeOuts = new HashMap<String, String>();
444 DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
445 DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
446 DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
447 DMETimeOuts.put("Content-Type", contenttype);
448 System.setProperty("AFT_LATITUDE", latitude);
449 System.setProperty("AFT_LONGITUDE", longitude);
450 System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
451 // System.setProperty("DME2.DEBUG", "true");
454 // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
455 // "SSLv3,TLSv1,TLSv1.1");
456 System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
457 System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
// NOTE(review): keystore password is hard-coded — confirm this is acceptable.
458 System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
461 sender = new DME2Client(new URI(url), timeoutMs + 10000L);
462 sender.setAllowAllHttpReturnCodes(true);
463 sender.setMethod(methodType);
464 sender.setSubContext(subContextPath);
465 if (dmeuser != null && dmepassword != null) {
466 sender.setCredentials(dmeuser, dmepassword);
468 sender.setHeaders(DMETimeOuts);
469 sender.setPayload("");
// NOTE(review): handlers is null when "sessionstickinessrequired" is not configured,
// which would make this call throw NullPointerException — confirm it is always set.
471 if (handlers.equalsIgnoreCase("yes")) {
472 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
473 props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
474 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
475 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
477 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
/**
 * Accessor for the configuration properties.
 * NOTE(review): the method body is not visible in this view; presumably it returns
 * {@code props} — confirm.
 */
481 public Properties getProps() {
/**
 * Mutator for the configuration properties.
 * NOTE(review): the method body is not visible in this view; presumably it assigns
 * {@code this.props} — confirm.
 *
 * @param props configuration properties
 */
485 public void setProps(Properties props) {
/**
 * Appends the consumer query parameters to a base URL.
 * <p>
 * Builds a query string from "timeout=", "limit=" and, when {@code fFilter} is
 * non-empty, a UTF-8 URL-encoded "filter=" value; a non-empty query string is
 * appended to the URL after a "?".
 * <p>
 * NOTE(review): the separator appends and any guards on timeoutMs/limit are on lines
 * not visible in this view.
 * NOTE(review): the RuntimeException below carries only the message of the
 * UnsupportedEncodingException, not the exception itself as its cause.
 *
 * @param url       base URL
 * @param timeoutMs value for the "timeout" parameter
 * @param limit     value for the "limit" parameter
 * @return the URL, with the query string appended when there is one
 * @throws IOException declared on the signature; no raising call is visible here
 */
489 protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
490 final StringBuffer contexturl = new StringBuffer(url);
491 // final StringBuffer url = new StringBuffer (
492 // CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) );
493 final StringBuffer adds = new StringBuffer();
495 adds.append("timeout=").append(timeoutMs);
497 if (adds.length() > 0) {
500 adds.append("limit=").append(limit);
502 if (fFilter != null && fFilter.length() > 0) {
504 if (adds.length() > 0) {
507 adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
508 } catch (UnsupportedEncodingException e) {
509 throw new RuntimeException(e.getMessage() + "....say whaaaat?!");
512 if (adds.length() > 0) {
513 contexturl.append("?").append(adds.toString());
516 // sender.setSubContext(url.toString());
517 return contexturl.toString();
/**
 * Accessor for the user name.
 * NOTE(review): the method body is not visible in this view; presumably it returns
 * {@code username} — confirm.
 */
520 public String getUsername() {
/**
 * Stores the user name passed to the HTTP-based request helpers.
 *
 * @param username user name to store
 */
524 public void setUsername(String username) {
525 this.username = username;
/**
 * Accessor for the password.
 * NOTE(review): the method body is not visible in this view; presumably it returns
 * {@code password} — confirm.
 */
528 public String getPassword() {
/**
 * Stores the password passed to the HTTP-based request helpers.
 *
 * @param password password to store
 */
532 public void setPassword(String password) {
533 this.password = password;
/**
 * Accessor for the host.
 * NOTE(review): the method body is not visible in this view, and no {@code host}
 * field is declared in the visible part of this class (presumably it is inherited) —
 * confirm.
 */
536 public String getHost() {
/**
 * Mutator for the host.
 * NOTE(review): the method body is not visible in this view; presumably it assigns
 * the {@code host} value used by the AUTH_KEY fetch branches — confirm.
 *
 * @param host host to store
 */
540 public void setHost(String host) {
/**
 * Accessor for the auth key.
 * NOTE(review): the method body is not visible in this view; presumably it returns
 * {@code authKey} — confirm.
 */
544 public String getAuthKey() {
/**
 * Stores the auth key used by the AUTH_KEY fetch branches.
 *
 * @param authKey key to store
 */
548 public void setAuthKey(String authKey) {
549 this.authKey = authKey;
/**
 * Accessor for the auth date.
 * NOTE(review): the method body is not visible in this view; presumably it returns
 * {@code authDate} — confirm.
 */
552 public String getAuthDate() {
/**
 * Stores the auth date used by the AUTH_KEY fetch branches.
 *
 * @param authDate date value to store
 */
556 public void setAuthDate(String authDate) {
557 this.authDate = authDate;
/**
 * Accessor for the message filter.
 * NOTE(review): the method body is not visible in this view; presumably it returns
 * {@code fFilter} — confirm.
 */
560 public String getfFilter() {
/**
 * Stores the filter that is URL-encoded and sent as the "filter" request parameter.
 *
 * @param fFilter filter expression to store
 */
564 public void setfFilter(String fFilter) {
565 this.fFilter = fFilter;
/**
 * Looks up a route value by key in {@code MRClientFactory.prop}, after loading those
 * properties from the file named by {@code MRClientFactory.routeFilePath}. A failure
 * while loading is logged and the lookup still runs.
 * <p>
 * NOTE(review): the FileReader passed to {@code load} is never closed.
 * NOTE(review): the return statement is not visible in this view; presumably the
 * looked-up value is returned — confirm.
 *
 * @param routeKey property name to look up
 */
568 private String readRoute(String routeKey) {
572 MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath)));
574 } catch (Exception ex) {
575 log.error("Reply Router Error " + ex.toString());
577 String routeOffer = MRClientFactory.prop.getProperty(routeKey);
/**
 * Fetches messages using the timeout and limit stored on this instance.
 *
 * @return the response produced by
 *         {@code fetchWithReturnConsumerResponse(fTimeoutMs, fLimit)}
 */
582 public MRConsumerResponse fetchWithReturnConsumerResponse() {
584 // fetch with the timeout and limit set in constructor
585 return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
/**
 * Fetches messages using the transport selected by {@code protocolFlag} and returns
 * them with a response code and message.
 * <p>
 * Each protocol branch obtains the raw reply, parses it, copies the elements of its
 * "result" array into the message list (String elements via {@code getString}, the
 * others via {@code getJSONObject(i).toString()}) and fills the response through
 * {@code createMRConsumerResponse}. Exceptions are logged and recorded on the
 * response instead of being thrown. The collected messages are always attached
 * before returning.
 * <p>
 * NOTE(review): in the JSONException, HttpException and generic Exception handlers
 * {@code setResponseMessage} is called twice, so the status-code string is
 * immediately overwritten by the exception message; the first call was presumably
 * meant to be {@code setResponseCode} (as in the DME2Exception handler) — confirm.
 * NOTE(review): try/else keywords and closing braces are not visible in this view.
 *
 * @param timeoutMs value sent as the request's timeout parameter; the DME2 branch also
 *                  waits up to {@code timeoutMs + 10000} ms for the reply
 * @param limit     value sent as the request's limit parameter
 * @return the populated consumer response
 */
589 public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
590 final LinkedList<String> msgs = new LinkedList<String>();
591 MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
// DME2 transport.
593 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
594 DMEConfigure(timeoutMs, limit);
596 String reply = sender.sendAndWait(timeoutMs + 10000L);
598 final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
601 final JSONArray a = o.getJSONArray("result");
604 for (int i = 0; i < a.length(); i++) {
605 if (a.get(i) instanceof String)
606 msgs.add(a.getString(i));
608 msgs.add(a.getJSONObject(i).toString());
614 createMRConsumerResponse(reply, mrConsumerResponse);
// AAF auth transport.
617 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
619 * final String urlPath = createUrlPath(
620 * MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
621 * props.getProperty("Protocol")), timeoutMs, limit);
624 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
625 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
626 String response = getResponse(urlPath, username, password, protocolFlag);
628 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
631 final JSONArray a = o.getJSONArray("result");
634 for (int i = 0; i < a.length(); i++) {
635 if (a.get(i) instanceof String)
636 msgs.add(a.getString(i));
638 msgs.add(a.getJSONObject(i).toString());
644 createMRConsumerResponse(response, mrConsumerResponse);
// Auth-key transport; builds the URL from `host` rather than the host selector.
647 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
648 final String urlPath = createUrlPath(
649 MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
652 String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
653 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
655 final JSONArray a = o.getJSONArray("result");
658 for (int i = 0; i < a.length(); i++) {
659 if (a.get(i) instanceof String)
660 msgs.add(a.getString(i));
662 msgs.add(a.getJSONObject(i).toString());
668 createMRConsumerResponse(response, mrConsumerResponse);
// Plain HTTP transport without authentication.
670 if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
671 // final String urlPath = createUrlPath(
672 // MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
673 // props.getProperty("Protocol")), timeoutMs,
675 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
676 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
678 String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
679 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
681 final JSONArray a = o.getJSONArray("result");
684 for (int i = 0; i < a.length(); i++) {
685 if (a.get(i) instanceof String)
686 msgs.add(a.getString(i));
688 msgs.add(a.getJSONObject(i).toString());
694 createMRConsumerResponse(response, mrConsumerResponse);
697 } catch (JSONException e) {
698 mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
699 mrConsumerResponse.setResponseMessage(e.getMessage());
700 log.error("json exception: ", e);
701 } catch (HttpException e) {
702 mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
703 mrConsumerResponse.setResponseMessage(e.getMessage());
704 log.error("http exception: ", e);
705 } catch (DME2Exception e) {
706 mrConsumerResponse.setResponseCode(e.getErrorCode());
707 mrConsumerResponse.setResponseMessage(e.getErrorMessage());
708 log.error("DME2 exception: ", e);
709 } catch (Exception e) {
710 mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
711 mrConsumerResponse.setResponseMessage(e.getMessage());
712 log.error("exception: ", e);
// Attach whatever was collected, including on the error paths above.
714 mrConsumerResponse.setActualMessages(msgs);
715 return mrConsumerResponse;
/**
 * Fills the response code and message on {@code mrConsumerResponse} from the raw reply.
 * <p>
 * A reply starting with "{" is parsed as a JSON object whose "status" and "message"
 * values become the code and message. A reply starting with "<" gets its code and
 * message from {@code getHTTPErrorResponseCode} / {@code getHTTPErrorResponseMessage}.
 * The remaining visible path sets HttpStatus.SC_OK and SUCCESS_MESSAGE.
 * <p>
 * NOTE(review): the else keyword before the final two statements and the closing
 * braces are not visible in this view; presumably those statements are the fallback
 * for any other reply — confirm.
 *
 * @param reply              raw reply text
 * @param mrConsumerResponse response object to populate
 */
718 private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
720 if (reply.startsWith("{")) {
721 JSONObject jObject = new JSONObject(reply);
722 String message = jObject.getString("message");
723 int status = jObject.getInt("status");
725 mrConsumerResponse.setResponseCode(Integer.toString(status));
727 if (null != message) {
728 mrConsumerResponse.setResponseMessage(message);
730 } else if (reply.startsWith("<")) {
731 mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
732 mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
734 mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
735 mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);