1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.mr.client.impl;
24 import com.att.aft.dme2.api.DME2Client;
25 import com.att.aft.dme2.api.DME2Exception;
26 import org.onap.dmaap.mr.client.HostSelector;
27 import org.onap.dmaap.mr.client.MRClientFactory;
28 import org.onap.dmaap.mr.client.MRConsumer;
29 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
30 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
33 import java.net.MalformedURLException;
35 import java.net.URISyntaxException;
36 import java.net.URLEncoder;
38 import java.util.concurrent.TimeUnit;
39 import org.apache.http.HttpException;
40 import org.apache.http.HttpStatus;
41 import org.json.JSONArray;
42 import org.json.JSONException;
43 import org.json.JSONObject;
44 import org.json.JSONTokener;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
48 public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
50 private Logger log = LoggerFactory.getLogger(this.getClass().getName());
52 public static final String ROUTER_FILE_PATH = null;
54 public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
55 public String consumerFilePath;
57 private static final String JSON_RESULT = "result";
58 private static final String PROPS_PROTOCOL = "Protocol";
60 private static final String EXECPTION_MESSAGE = "exception: ";
61 private static final String SUCCESS_MESSAGE = "Success";
62 private static final long DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS = 10000L;
63 private static final long DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS = 10000L;
65 private static final String URL_PARAM_ROUTE_OFFER = "routeoffer";
66 private static final String URL_PARAM_PARTNER = "partner";
67 private static final String URL_PARAM_ENV_CONTEXT = "envContext";
68 private static final String URL_PARAM_VERSION = "version";
69 private static final String URL_PARAM_FILTER = "filter";
70 private static final String URL_PARAM_LIMIT = "limit";
71 private static final String URL_PARAM_TIMEOUT = "timeout";
73 private final String fTopic;
74 private final String fGroup;
75 private final String fId;
76 private final int fTimeoutMs;
77 private final int fLimit;
78 private String fFilter;
79 private String username;
80 private String password;
82 private HostSelector fHostSelector = null;
84 private DME2Client sender;
85 private String authKey;
86 private String authDate;
87 private Properties props;
88 private HashMap<String, String> DMETimeOuts;
89 private long dme2ReplyHandlerTimeoutMs;
90 private long longPollingMs;
92 public MRConsumerImpl(MRConsumerImplBuilder builder) throws MalformedURLException {
93 super(builder.hostPart,
94 builder.topic + "::" + builder.consumerGroup + "::" + builder.consumerId);
96 fTopic = builder.topic;
97 fGroup = builder.consumerGroup;
98 fId = builder.consumerId;
99 fTimeoutMs = builder.timeoutMs;
100 fLimit = builder.limit;
101 fFilter = builder.filter;
103 fHostSelector = new HostSelector(builder.hostPart);
106 public static class MRConsumerImplBuilder {
107 private Collection<String> hostPart;
108 private String topic;
109 private String consumerGroup;
110 private String consumerId;
111 private int timeoutMs;
113 private String filter;
114 private String apiKey_username;
115 private String apiSecret_password;
116 private String apiKey;
117 private String apiSecret;
118 private boolean allowSelfSignedCerts = false;
120 public MRConsumerImplBuilder setHostPart(Collection<String> hostPart) {
121 this.hostPart = hostPart;
125 public MRConsumerImplBuilder setTopic(String topic) {
130 public MRConsumerImplBuilder setConsumerGroup(String consumerGroup) {
131 this.consumerGroup = consumerGroup;
135 public MRConsumerImplBuilder setConsumerId(String consumerId) {
136 this.consumerId = consumerId;
140 public MRConsumerImplBuilder setTimeoutMs(int timeoutMs) {
141 this.timeoutMs = timeoutMs;
145 public MRConsumerImplBuilder setLimit(int limit) {
150 public MRConsumerImplBuilder setFilter(String filter) {
151 this.filter = filter;
155 public MRConsumerImplBuilder setApiKey_username(String apiKey_username) {
156 this.apiKey_username = apiKey_username;
160 public MRConsumerImplBuilder setApiSecret_password(String apiSecret_password) {
161 this.apiSecret_password = apiSecret_password;
165 public MRConsumerImplBuilder setApiKey(String apiKey) {
166 this.apiKey = apiKey;
170 public MRConsumerImplBuilder setApiSecret(String apiSecret) {
171 this.apiSecret = apiSecret;
175 public MRConsumerImplBuilder setAllowSelfSignedCerts(boolean allowSelfSignedCerts) {
176 this.allowSelfSignedCerts = allowSelfSignedCerts;
180 public MRConsumerImpl createMRConsumerImpl() throws MalformedURLException {
181 return new MRConsumerImpl(this);
186 public Iterable<String> fetch() throws IOException, Exception {
187 // fetch with the timeout and limit set in constructor
188 return fetch(fTimeoutMs, fLimit);
192 public Iterable<String> fetch(int timeoutMs, int limit) throws Exception {
193 final LinkedList<String> msgs = new LinkedList<>();
195 ProtocolTypeConstants protocolFlagEnum = null;
196 for(ProtocolTypeConstants type : ProtocolTypeConstants.values()) {
197 if (type.getValue().equalsIgnoreCase(protocolFlag)) {
198 protocolFlagEnum = type;
201 if (protocolFlagEnum == null) {
206 switch (protocolFlagEnum) {
208 dmeConfigure(timeoutMs, limit);
209 String reply = sender.sendAndWait(timeoutMs + 10000L);
210 readJsonData(msgs, getResponseDataInJson(reply));
213 String urlAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
214 fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
215 final JSONObject o = get(urlAuthPath, username, password, protocolFlag);
216 readJsonData(msgs, o);
219 final String urlKeyPath = createUrlPath(
220 MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROPS_PROTOCOL)),
222 final JSONObject authObject = getAuth(urlKeyPath, authKey, authDate, username, password, protocolFlag);
223 readJsonData(msgs, authObject);
226 final String urlNoAuthPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
227 fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
228 readJsonData(msgs, getNoAuth(urlNoAuthPath));
231 } catch (JSONException e) {
232 // unexpected response
233 reportProblemWithResponse();
234 log.error(EXECPTION_MESSAGE, e);
235 } catch (HttpException e) {
236 throw new IOException(e);
242 private void readJsonData(LinkedList<String> msgs, JSONObject o) {
244 final JSONArray a = o.getJSONArray(JSON_RESULT);
246 for (int i = 0; i < a.length(); i++) {
247 if (a.get(i) instanceof String)
248 msgs.add(a.getString(i));
250 msgs.add(a.getJSONObject(i).toString());
257 public MRConsumerResponse fetchWithReturnConsumerResponse() {
258 // fetch with the timeout and limit set in constructor
259 return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
263 public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
264 final LinkedList<String> msgs = new LinkedList<>();
265 MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
267 if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
268 dmeConfigure(timeoutMs, limit);
270 long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs
271 : (timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS);
272 String reply = sender.sendAndWait(timeout);
274 final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
276 readJsonData(msgs, o);
277 createMRConsumerResponse(reply, mrConsumerResponse);
280 if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
281 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
282 fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
284 String response = getResponse(urlPath, username, password, protocolFlag);
285 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
286 readJsonData(msgs, o);
287 createMRConsumerResponse(response, mrConsumerResponse);
290 if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
291 final String urlPath = createUrlPath(
292 MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty(PROPS_PROTOCOL)),
295 String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
296 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
297 readJsonData(msgs, o);
298 createMRConsumerResponse(response, mrConsumerResponse);
301 if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
302 final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
303 fGroup, fId, props.getProperty(PROPS_PROTOCOL)), timeoutMs, limit);
305 String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
306 final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
307 readJsonData(msgs, o);
308 createMRConsumerResponse(response, mrConsumerResponse);
311 } catch (JSONException e) {
312 mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
313 mrConsumerResponse.setResponseMessage(e.getMessage());
314 log.error("json exception: ", e);
315 } catch (HttpException e) {
316 mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
317 mrConsumerResponse.setResponseMessage(e.getMessage());
318 log.error("http exception: ", e);
319 } catch (DME2Exception e) {
320 mrConsumerResponse.setResponseCode(e.getErrorCode());
321 mrConsumerResponse.setResponseMessage(e.getErrorMessage());
322 log.error("DME2 exception: ", e);
323 } catch (Exception e) {
324 mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
325 mrConsumerResponse.setResponseMessage(e.getMessage());
326 log.error(EXECPTION_MESSAGE, e);
328 mrConsumerResponse.setActualMessages(msgs);
329 return mrConsumerResponse;
333 protected void reportProblemWithResponse() {
334 log.warn("There was a problem with the server response. Blacklisting for 3 minutes.");
335 super.reportProblemWithResponse();
336 fHostSelector.reportReachabilityProblem(3, TimeUnit.MINUTES);
339 private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
340 if (reply.startsWith("{")) {
341 JSONObject jObject = new JSONObject(reply);
342 String message = jObject.getString("message");
343 int status = jObject.getInt("status");
345 mrConsumerResponse.setResponseCode(Integer.toString(status));
347 if (null != message) {
348 mrConsumerResponse.setResponseMessage(message);
350 } else if (reply.startsWith("<")) {
351 mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
352 mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
354 mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
355 mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
359 private JSONObject getResponseDataInJson(String response) {
361 JSONTokener jsonTokener = new JSONTokener(response);
362 JSONObject jsonObject = null;
363 final char firstChar = jsonTokener.next();
365 if ('[' == firstChar) {
366 JSONArray jsonArray = new JSONArray(jsonTokener);
367 jsonObject = new JSONObject();
368 jsonObject.put(JSON_RESULT, jsonArray);
370 jsonObject = new JSONObject(jsonTokener);
374 } catch (JSONException excp) {
375 log.error("DMAAP - Error reading response data.", excp);
380 private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
381 JSONTokener jsonTokener = new JSONTokener(response);
382 JSONObject jsonObject = null;
383 final char firstChar = jsonTokener.next();
385 if (null != response && response.length() == 0) {
389 if ('[' == firstChar) {
390 JSONArray jsonArray = new JSONArray(jsonTokener);
391 jsonObject = new JSONObject();
392 jsonObject.put(JSON_RESULT, jsonArray);
393 } else if ('{' == firstChar) {
395 } else if ('<' == firstChar) {
398 jsonObject = new JSONObject(jsonTokener);
404 private void dmeConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
405 this.longPollingMs = timeoutMs;
406 String latitude = props.getProperty("Latitude");
407 String longitude = props.getProperty("Longitude");
408 String version = props.getProperty("Version");
409 String serviceName = props.getProperty("ServiceName");
410 String env = props.getProperty("Environment");
411 String partner = props.getProperty("Partner");
412 String routeOffer = props.getProperty("routeOffer");
413 String subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
414 String protocol = props.getProperty(PROPS_PROTOCOL);
415 String methodType = props.getProperty("MethodType");
416 String dmeuser = props.getProperty("username");
417 String dmepassword = props.getProperty("password");
418 String contenttype = props.getProperty("contenttype");
419 String handlers = props.getProperty("sessionstickinessrequired");
422 * Changes to DME2Client url to use Partner for auto failover between data centers When Partner value is not
423 * provided use the routeOffer value for auto failover within a cluster
426 String preferredRouteKey = readRoute("preferredRouteKey");
427 StringBuilder contextUrl = new StringBuilder();
428 if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
429 contextUrl.append(protocol).append("://").append(serviceName).append("?")
430 .append(URL_PARAM_VERSION).append("=").append(version).append("&")
431 .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
432 .append(URL_PARAM_PARTNER).append("=").append(partner).append("&")
433 .append(URL_PARAM_ROUTE_OFFER).append("=").append(preferredRouteKey);
434 } else if (partner != null && !partner.isEmpty()) {
435 contextUrl.append(protocol).append("://").append(serviceName).append("?")
436 .append(URL_PARAM_VERSION).append("=").append(version).append("&")
437 .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
438 .append(URL_PARAM_PARTNER).append("=").append(partner);
439 } else if (routeOffer != null && !routeOffer.isEmpty()) {
440 contextUrl.append(protocol).append("://").append(serviceName).append("?")
441 .append(URL_PARAM_VERSION).append("=").append(version).append("&")
442 .append(URL_PARAM_ENV_CONTEXT).append("=").append(env).append("&")
443 .append(URL_PARAM_ROUTE_OFFER).append("=").append(routeOffer);
446 if (timeoutMs != -1) {
447 contextUrl.append("&").append(URL_PARAM_TIMEOUT).append("=").append(timeoutMs);
450 contextUrl.append("&").append(URL_PARAM_LIMIT).append("=").append(limit);
453 // Add filter to DME2 Url
454 if (fFilter != null && fFilter.length() > 0) {
455 contextUrl.append("&").append(URL_PARAM_FILTER).append("=").append(URLEncoder.encode(fFilter, "UTF-8"));
458 url = contextUrl.toString();
460 DMETimeOuts = new HashMap<>();
461 DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
462 DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
463 DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
464 DMETimeOuts.put("Content-Type", contenttype);
465 System.setProperty("AFT_LATITUDE", latitude);
466 System.setProperty("AFT_LONGITUDE", longitude);
467 System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
470 System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
471 System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
472 System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
475 long dme2PerEndPointTimeoutMs;
477 dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty("DME2_PER_HANDLER_TIMEOUT_MS"));
478 // backward compatibility
479 if (dme2PerEndPointTimeoutMs <= 0) {
480 dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
482 } catch (NumberFormatException nfe) {
483 // backward compatibility
484 dme2PerEndPointTimeoutMs = timeoutMs + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS;
486 "DME2_PER_HANDLER_TIMEOUT_MS not set and using default " + DEFAULT_DME2_PER_ENDPOINT_TIMEOUT_MS);
490 dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty("DME2_REPLY_HANDLER_TIMEOUT_MS"));
491 } catch (NumberFormatException nfe) {
493 long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
494 long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
495 dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs;
497 "DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT "
498 + dme2ReplyHandlerTimeoutMs);
499 } catch (NumberFormatException e) {
500 // backward compatibility
501 dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
502 getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs);
505 // backward compatibility
506 if (dme2ReplyHandlerTimeoutMs <= 0) {
507 dme2ReplyHandlerTimeoutMs = timeoutMs + DEFAULT_DME2_REPLY_HANDLER_TIMEOUT_MS;
510 sender = new DME2Client(new URI(url), dme2PerEndPointTimeoutMs);
511 sender.setAllowAllHttpReturnCodes(true);
512 sender.setMethod(methodType);
513 sender.setSubContext(subContextPath);
514 if (dmeuser != null && dmepassword != null) {
515 sender.setCredentials(dmeuser, dmepassword);
517 sender.setHeaders(DMETimeOuts);
518 sender.setPayload("");
519 if (handlers != null && handlers.equalsIgnoreCase("yes")) {
520 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
521 props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
522 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
523 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
525 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
529 protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
530 final StringBuilder contexturl = new StringBuilder(url);
531 final StringBuilder adds = new StringBuilder();
533 if (timeoutMs > -1) {
534 adds.append("timeout=").append(timeoutMs);
538 if (adds.length() > 0) {
541 adds.append("limit=").append(limit);
544 if (fFilter != null && fFilter.length() > 0) {
546 if (adds.length() > 0) {
549 adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
550 } catch (UnsupportedEncodingException e) {
551 log.error("exception at createUrlPath () : ", e);
555 if (adds.length() > 0) {
556 contexturl.append("?").append(adds.toString());
559 return contexturl.toString();
562 private String readRoute(String routeKey) {
563 try(InputStream input = new FileInputStream(MRClientFactory.routeFilePath)) {
564 MRClientFactory.prop.load(input);
565 } catch (Exception ex) {
566 log.error("Reply Router Error " + ex);
568 return MRClientFactory.prop.getProperty(routeKey);
571 public static List<String> stringToList(String str) {
572 final LinkedList<String> set = new LinkedList<>();
574 final String[] parts = str.trim().split(",");
575 for (String part : parts) {
576 final String trimmed = part.trim();
577 if (trimmed.length() > 0) {
585 public static String getRouterFilePath() {
586 return ROUTER_FILE_PATH;
589 public static void setRouterFilePath(String routerFilePath) {
590 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
593 public String getConsumerFilePath() {
594 return consumerFilePath;
597 public void setConsumerFilePath(String consumerFilePath) {
598 this.consumerFilePath = consumerFilePath;
601 public String getProtocolFlag() {
605 public void setProtocolFlag(String protocolFlag) {
606 this.protocolFlag = protocolFlag;
609 public Properties getProps() {
613 public void setProps(Properties props) {
615 setClientConfig(DmaapClientUtil.getClientConfig(props));
618 public String getUsername() {
622 public void setUsername(String username) {
623 this.username = username;
626 public String getPassword() {
630 public void setPassword(String password) {
631 this.password = password;
634 public String getHost() {
638 public void setHost(String host) {
642 public String getAuthKey() {
646 public void setAuthKey(String authKey) {
647 this.authKey = authKey;
650 public String getAuthDate() {
654 public void setAuthDate(String authDate) {
655 this.authDate = authDate;
658 public String getfFilter() {
662 public void setfFilter(String fFilter) {
663 this.fFilter = fFilter;