1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.mr.client.impl;
24 import com.att.aft.dme2.api.DME2Client;
25 import com.att.aft.dme2.api.DME2Exception;
26 import org.onap.dmaap.mr.client.HostSelector;
27 import org.onap.dmaap.mr.client.MRClientFactory;
28 import org.onap.dmaap.mr.client.MRConsumer;
29 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
30 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
32 import java.io.FileReader;
33 import java.io.IOException;
34 import java.io.UnsupportedEncodingException;
35 import java.net.MalformedURLException;
37 import java.net.URISyntaxException;
38 import java.net.URLEncoder;
39 import java.util.Collection;
40 import java.util.HashMap;
41 import java.util.LinkedList;
42 import java.util.List;
43 import java.util.Properties;
44 import java.util.concurrent.TimeUnit;
45 import org.apache.http.HttpException;
46 import org.apache.http.HttpStatus;
47 import org.json.JSONArray;
48 import org.json.JSONException;
49 import org.json.JSONObject;
50 import org.json.JSONTokener;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
54 public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
56 private Logger log = LoggerFactory.getLogger(this.getClass().getName());
58 public static final String routerFilePath = null;
60 public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
61 public String consumerFilePath;
63 private static final String SUCCESS_MESSAGE = "Success";
64 private static final long DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS = 10000L;
65 private static final long DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS = 10000L;
67 private final String fTopic;
68 private final String fGroup;
69 private final String fId;
70 private final int fTimeoutMs;
71 private final int fLimit;
72 private String fFilter;
73 private String username;
74 private String password;
76 private HostSelector fHostSelector = null;
78 private DME2Client sender;
79 private String authKey;
80 private String authDate;
81 private Properties props;
82 private HashMap<String, String> DMETimeOuts;
83 private long dme2ReplyHandlerTimeoutMs;
84 private long longPollingMs;
/**
 * Creates a consumer by delegating to the ten-argument constructor.
 *
 * @param hostPart hosts passed on to the delegate constructor
 * @param topic topic name
 * @param consumerGroup consumer group name
 * @param consumerId consumer id within the group
 * @param timeoutMs default timeout used by the no-argument fetch methods
 * @param limit default limit used by the no-argument fetch methods
 * @param filter optional filter expression
 * @param apiKey_username passed through as the delegate's apiKey
 * @param apiSecret_password passed through as the delegate's apiSecret
 * @throws MalformedURLException propagated from the delegate constructor
 */
public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
        final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username,
        String apiSecret_password) throws MalformedURLException {
    // NOTE(review): the final argument (allowSelfSignedCerts) was cut off in the source; false is the
    // conservative value - confirm.
    this(hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password,
            false);
}
/**
 * Creates a consumer for the given topic, consumer group and consumer id.
 *
 * <p>The super constructor receives the host collection and a signature of the form
 * {@code topic::consumerGroup::consumerId}. {@code apiKey}, {@code apiSecret} and
 * {@code allowSelfSignedCerts} are accepted but not used by this constructor.
 *
 * @param hostPart hosts handed to the super constructor and to the {@link HostSelector}
 * @param topic topic name
 * @param consumerGroup consumer group name
 * @param consumerId consumer id within the group
 * @param timeoutMs default timeout used by the no-argument fetch methods
 * @param limit default limit used by the no-argument fetch methods
 * @param filter optional filter expression appended to request URLs when non-empty
 * @param apiKey unused here
 * @param apiSecret unused here
 * @param allowSelfSignedCerts unused here
 * @throws MalformedURLException declared by this constructor (presumably raised by the super constructor)
 */
public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
        final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret,
        boolean allowSelfSignedCerts) throws MalformedURLException {
    super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);

    // Every final field must be assigned exactly once here.
    fTopic = topic;
    fGroup = consumerGroup;
    fId = consumerId;
    fTimeoutMs = timeoutMs;
    fLimit = limit;
    fFilter = filter;

    fHostSelector = new HostSelector(hostPart);
}
109 public Iterable<String> fetch() throws IOException, Exception {
110 // fetch with the timeout and limit set in constructor
111 return fetch(fTimeoutMs, fLimit);
115 public Iterable<String> fetch(int timeoutMs, int limit) throws Exception {
116 final LinkedList<String> msgs = new LinkedList<>();
119 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
120 dmeConfigure(timeoutMs, limit);
122 String reply = sender.sendAndWait(timeoutMs + 10000L);
123 final JSONObject o = getResponseDataInJson(reply);
125 final JSONArray a = o.getJSONArray("result");
127 for (int i = 0; i < a.length(); i++) {
128 if (a.get(i) instanceof String)
129 msgs.add(a.getString(i));
131 msgs.add(a.getJSONObject(i).toString());
135 } catch (JSONException e) {
136 // unexpected response
137 reportProblemWithResponse();
138 log.error("exception: ", e);
139 } catch (HttpException e) {
140 throw new IOException(e);
144 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
145 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
146 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
149 final JSONObject o = get(urlPath, username, password, protocolFlag);
152 final JSONArray a = o.getJSONArray("result");
154 for (int i = 0; i < a.length(); i++) {
155 if (a.get(i) instanceof String)
156 msgs.add(a.getString(i));
158 msgs.add(a.getJSONObject(i).toString());
162 } catch (JSONException e) {
163 // unexpected response
164 reportProblemWithResponse();
165 log.error("exception: ", e);
166 } catch (HttpException e) {
167 throw new IOException(e);
171 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
172 final String urlPath = createUrlPath(
173 MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
177 final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag);
179 final JSONArray a = o.getJSONArray("result");
181 for (int i = 0; i < a.length(); i++) {
182 if (a.get(i) instanceof String)
183 msgs.add(a.getString(i));
185 msgs.add(a.getJSONObject(i).toString());
189 } catch (JSONException e) {
190 // unexpected response
191 reportProblemWithResponse();
192 log.error("exception: ", e);
193 } catch (HttpException e) {
194 throw new IOException(e);
198 if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
199 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
200 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
203 final JSONObject o = getNoAuth(urlPath);
205 final JSONArray a = o.getJSONArray("result");
207 for (int i = 0; i < a.length(); i++) {
208 if (a.get(i) instanceof String)
209 msgs.add(a.getString(i));
211 msgs.add(a.getJSONObject(i).toString());
215 } catch (JSONException e) {
216 // unexpected response
217 reportProblemWithResponse();
218 log.error("exception: ", e);
219 } catch (HttpException e) {
220 throw new IOException(e);
223 } catch (JSONException e) {
224 // unexpected response
225 reportProblemWithResponse();
226 log.error("exception: ", e);
227 } catch (HttpException e) {
228 throw new IOException(e);
229 } catch (Exception e) {
237 public MRConsumerResponse fetchWithReturnConsumerResponse() {
238 // fetch with the timeout and limit set in constructor
239 return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
243 public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
244 final LinkedList<String> msgs = new LinkedList<String>();
245 MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
247 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
248 dmeConfigure(timeoutMs, limit);
250 long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs
251 : (timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS);
252 String reply = sender.sendAndWait(timeout);
254 final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
257 final JSONArray a = o.getJSONArray("result");
260 for (int i = 0; i < a.length(); i++) {
261 if (a.get(i) instanceof String)
262 msgs.add(a.getString(i));
264 msgs.add(a.getJSONObject(i).toString());
268 createMRConsumerResponse(reply, mrConsumerResponse);
271 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
272 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
273 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
275 String response = getResponse(urlPath, username, password, protocolFlag);
276 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
278 final JSONArray a = o.getJSONArray("result");
281 for (int i = 0; i < a.length(); i++) {
282 if (a.get(i) instanceof String)
283 msgs.add(a.getString(i));
285 msgs.add(a.getJSONObject(i).toString());
289 createMRConsumerResponse(response, mrConsumerResponse);
292 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
293 final String urlPath = createUrlPath(
294 MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
297 String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
298 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
300 final JSONArray a = o.getJSONArray("result");
303 for (int i = 0; i < a.length(); i++) {
304 if (a.get(i) instanceof String)
305 msgs.add(a.getString(i));
307 msgs.add(a.getJSONObject(i).toString());
311 createMRConsumerResponse(response, mrConsumerResponse);
314 if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
315 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
316 fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
318 String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
319 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
321 final JSONArray a = o.getJSONArray("result");
324 for (int i = 0; i < a.length(); i++) {
325 if (a.get(i) instanceof String)
326 msgs.add(a.getString(i));
328 msgs.add(a.getJSONObject(i).toString());
332 createMRConsumerResponse(response, mrConsumerResponse);
335 } catch (JSONException e) {
336 mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
337 mrConsumerResponse.setResponseMessage(e.getMessage());
338 log.error("json exception: ", e);
339 } catch (HttpException e) {
340 mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
341 mrConsumerResponse.setResponseMessage(e.getMessage());
342 log.error("http exception: ", e);
343 } catch (DME2Exception e) {
344 mrConsumerResponse.setResponseCode(e.getErrorCode());
345 mrConsumerResponse.setResponseMessage(e.getErrorMessage());
346 log.error("DME2 exception: ", e);
347 } catch (Exception e) {
348 mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
349 mrConsumerResponse.setResponseMessage(e.getMessage());
350 log.error("exception: ", e);
352 mrConsumerResponse.setActualMessages(msgs);
353 return mrConsumerResponse;
357 protected void reportProblemWithResponse() {
358 log.warn("There was a problem with the server response. Blacklisting for 3 minutes.");
359 super.reportProblemWithResponse();
360 fHostSelector.reportReachabilityProblem(3, TimeUnit.MINUTES);
363 private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
364 if (reply.startsWith("{")) {
365 JSONObject jObject = new JSONObject(reply);
366 String message = jObject.getString("message");
367 int status = jObject.getInt("status");
369 mrConsumerResponse.setResponseCode(Integer.toString(status));
371 if (null != message) {
372 mrConsumerResponse.setResponseMessage(message);
374 } else if (reply.startsWith("<")) {
375 mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
376 mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
378 mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
379 mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
383 private JSONObject getResponseDataInJson(String response) {
385 JSONTokener jsonTokener = new JSONTokener(response);
386 JSONObject jsonObject = null;
387 final char firstChar = jsonTokener.next();
389 if ('[' == firstChar) {
390 JSONArray jsonArray = new JSONArray(jsonTokener);
391 jsonObject = new JSONObject();
392 jsonObject.put("result", jsonArray);
394 jsonObject = new JSONObject(jsonTokener);
398 } catch (JSONException excp) {
399 log.error("DMAAP - Error reading response data.", excp);
404 private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
405 JSONTokener jsonTokener = new JSONTokener(response);
406 JSONObject jsonObject = null;
407 final char firstChar = jsonTokener.next();
409 if (null != response && response.length() == 0) {
413 if ('[' == firstChar) {
414 JSONArray jsonArray = new JSONArray(jsonTokener);
415 jsonObject = new JSONObject();
416 jsonObject.put("result", jsonArray);
417 } else if ('{' == firstChar) {
419 } else if ('<' == firstChar) {
422 jsonObject = new JSONObject(jsonTokener);
428 private void dmeConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
429 this.longPollingMs = timeoutMs;
430 String latitude = props.getProperty("Latitude");
431 String longitude = props.getProperty("Longitude");
432 String version = props.getProperty("Version");
433 String serviceName = props.getProperty("ServiceName");
434 String env = props.getProperty("Environment");
435 String partner = props.getProperty("Partner");
436 String routeOffer = props.getProperty("routeOffer");
437 String subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
438 String protocol = props.getProperty("Protocol");
439 String methodType = props.getProperty("MethodType");
440 String dmeuser = props.getProperty("username");
441 String dmepassword = props.getProperty("password");
442 String contenttype = props.getProperty("contenttype");
443 String handlers = props.getProperty("sessionstickinessrequired");
446 * Changes to DME2Client url to use Partner for auto failover between data centers When Partner value is not
447 * provided use the routeOffer value for auto failover within a cluster
450 String preferredRouteKey = readRoute("preferredRouteKey");
452 if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
453 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner
454 + "&routeoffer=" + preferredRouteKey;
455 } else if (partner != null && !partner.isEmpty()) {
456 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner;
457 } else if (routeOffer != null && !routeOffer.isEmpty()) {
458 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
463 url = url + "&timeout=" + timeoutMs;
465 url = url + "&limit=" + limit;
467 // Add filter to DME2 Url
468 if (fFilter != null && fFilter.length() > 0)
469 url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8");
471 DMETimeOuts = new HashMap<>();
472 DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
473 DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
474 DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
475 DMETimeOuts.put("Content-Type", contenttype);
476 System.setProperty("AFT_LATITUDE", latitude);
477 System.setProperty("AFT_LONGITUDE", longitude);
478 System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
481 System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
482 System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
483 System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
486 long dme2PerEndPointTimeoutMs;
488 dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty("DME2_PER_HANDLER_TIMEOUT_MS"));
489 // backward compatibility
490 if (dme2PerEndPointTimeoutMs <= 0) {
491 dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
493 } catch (NumberFormatException nfe) {
494 // backward compatibility
495 dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
497 "DME2_PER_HANDLER_TIMEOUT_MS not set and using default " + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS);
501 dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty("DME2_REPLY_HANDLER_TIMEOUT_MS"));
502 } catch (NumberFormatException nfe) {
504 long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
505 long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
506 dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs;
508 "DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT "
509 + dme2ReplyHandlerTimeoutMs);
510 } catch (NumberFormatException e) {
511 // backward compatibility
512 dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
513 getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs);
516 // backward compatibility
517 if (dme2ReplyHandlerTimeoutMs <= 0) {
518 dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
521 sender = new DME2Client(new URI(url), dme2PerEndPointTimeoutMs);
522 sender.setAllowAllHttpReturnCodes(true);
523 sender.setMethod(methodType);
524 sender.setSubContext(subContextPath);
525 if (dmeuser != null && dmepassword != null) {
526 sender.setCredentials(dmeuser, dmepassword);
528 sender.setHeaders(DMETimeOuts);
529 sender.setPayload("");
530 if (handlers != null && handlers.equalsIgnoreCase("yes")) {
531 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
532 props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
533 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
534 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
536 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
540 protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
541 final StringBuilder contexturl = new StringBuilder(url);
542 final StringBuilder adds = new StringBuilder();
544 if (timeoutMs > -1) {
545 adds.append("timeout=").append(timeoutMs);
549 if (adds.length() > 0) {
552 adds.append("limit=").append(limit);
555 if (fFilter != null && fFilter.length() > 0) {
557 if (adds.length() > 0) {
560 adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
561 } catch (UnsupportedEncodingException e) {
562 log.error("exception at createUrlPath () : ", e);
566 if (adds.length() > 0) {
567 contexturl.append("?").append(adds.toString());
570 return contexturl.toString();
573 private String readRoute(String routeKey) {
575 MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath)));
576 } catch (Exception ex) {
577 log.error("Reply Router Error " + ex);
579 return MRClientFactory.prop.getProperty(routeKey);
582 public static List<String> stringToList(String str) {
583 final LinkedList<String> set = new LinkedList<>();
585 final String[] parts = str.trim().split(",");
586 for (String part : parts) {
587 final String trimmed = part.trim();
588 if (trimmed.length() > 0) {
596 public static String getRouterFilePath() {
597 return routerFilePath;
600 public static void setRouterFilePath(String routerFilePath) {
601 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
604 public String getConsumerFilePath() {
605 return consumerFilePath;
608 public void setConsumerFilePath(String consumerFilePath) {
609 this.consumerFilePath = consumerFilePath;
612 public String getProtocolFlag() {
616 public void setProtocolFlag(String protocolFlag) {
617 this.protocolFlag = protocolFlag;
620 public Properties getProps() {
624 public void setProps(Properties props) {
628 public String getUsername() {
632 public void setUsername(String username) {
633 this.username = username;
636 public String getPassword() {
640 public void setPassword(String password) {
641 this.password = password;
644 public String getHost() {
648 public void setHost(String host) {
652 public String getAuthKey() {
656 public void setAuthKey(String authKey) {
657 this.authKey = authKey;
660 public String getAuthDate() {
664 public void setAuthDate(String authDate) {
665 this.authDate = authDate;
668 public String getfFilter() {
672 public void setfFilter(String fFilter) {
673 this.fFilter = fFilter;