+ /**
+ * Populated App Preferences DMaaP Information from Application Config String
+ *
+ * @param appConfigString CDAP Application config String
+ * @param tcaAppPreferences TCA App Preferences
+ */
+ private static void populateDMaaPInfoFromAppConfiguration(final String appConfigString,
+ final TCAAppPreferences tcaAppPreferences) {
+
+ if (null != tcaAppPreferences.getSubscriberHostName() || null != tcaAppPreferences.getPublisherHostName()) {
+ LOG.info("DMaaP Information is set from runtime preferences. Skipping getting DMaaP info from App Config");
+ }
+
+ LOG.info("Fetching DMaaP information from App Configuration String: {}", appConfigString);
+
+ try {
+ final TCAControllerAppConfig tcaControllerAppConfig =
+ readValue(appConfigString, TCAControllerAppConfig.class);
+
+ // Parse Subscriber DMaaP information from App Config String
+ if (tcaControllerAppConfig.getStreamsSubscribes() != null &&
+ tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn() != null &&
+ tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn().getDmaapInfo() != null) {
+
+ final DMAAPInfo subscriberDmaapInfo =
+ tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn().getDmaapInfo();
+ LOG.debug("App Config Subscriber Host URL: {}", subscriberDmaapInfo.getTopicUrl());
+ final URL subscriberUrl = parseURL(subscriberDmaapInfo.getTopicUrl());
+ tcaAppPreferences.setSubscriberProtocol(subscriberUrl.getProtocol());
+ tcaAppPreferences.setSubscriberHostName(subscriberUrl.getHost());
+ final int subscriberUrlPort = subscriberUrl.getPort() != -1 ?
+ subscriberUrl.getPort() : getDefaultDMaaPPort(subscriberUrl.getProtocol());
+ tcaAppPreferences.setSubscriberHostPort(subscriberUrlPort);
+ tcaAppPreferences.setSubscriberTopicName(subscriberUrl.getPath().substring(8));
+
+ final TCAHandleIn tcaHandleIn = tcaControllerAppConfig.getStreamsSubscribes().getTcaHandleIn();
+ tcaAppPreferences.setSubscriberUserName(tcaHandleIn.getAafUserName());
+ tcaAppPreferences.setSubscriberUserPassword(tcaHandleIn.getAafPassword());
+ } else {
+ LOG.warn("Unable to populate Subscriber DMaaP Information from App Config String: {}", appConfigString);
+ }
+
+
+ // Parse Publisher DMaaP information from App Config String
+ if (tcaControllerAppConfig.getStreamsPublishes() != null &&
+ tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut() != null &&
+ tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut().getDmaapInfo() != null) {
+
+ final DMAAPInfo publisherDmaapInfo =
+ tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut().getDmaapInfo();
+ LOG.debug("App Config Publisher Host URL: {}", publisherDmaapInfo.getTopicUrl());
+ final URL publisherUrl = parseURL(publisherDmaapInfo.getTopicUrl());
+ tcaAppPreferences.setPublisherProtocol(publisherUrl.getProtocol());
+ tcaAppPreferences.setPublisherHostName(publisherUrl.getHost());
+ final int publisherUrlPort = publisherUrl.getPort() != -1 ?
+ publisherUrl.getPort() : getDefaultDMaaPPort(publisherUrl.getProtocol());
+ tcaAppPreferences.setPublisherHostPort(publisherUrlPort);
+ tcaAppPreferences.setPublisherTopicName(publisherUrl.getPath().substring(8));
+
+ final TCAHandleOut tcaHandleOut = tcaControllerAppConfig.getStreamsPublishes().getTcaHandleOut();
+ tcaAppPreferences.setPublisherUserName(tcaHandleOut.getAafUserName());
+ tcaAppPreferences.setPublisherUserPassword(tcaHandleOut.getAafPassword());
+ } else {
+ LOG.warn("Unable to populate Publisher DMaaP Information from App Config String: {}", appConfigString);
+ }
+
+
+ } catch (IOException e) {
+ throw new CDAPSettingsException(
+ "Unable to parse App Config to Json Object.Invalid App Config String: " + appConfigString, LOG, e);
+ }
+ }
+
+ /**
+ * Parses provided DMaaP MR URL string to {@link URL} object
+ *
+ * @param urlString url string
+ *
+ * @return url object
+ */
+ private static URL parseURL(final String urlString) {
+ try {
+ return new URL(urlString);
+ } catch (MalformedURLException e) {
+ final String errorMessage = String.format("Invalid URL format: %s", urlString);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+ }
+
+ /**
+ * Sets up default DMaaP Port if not provided with DMaaP URL
+ *
+ * @param protocol protocol e.g. http or https
+ *
+ * @return default DMaaP MR port number
+ */
+ private static int getDefaultDMaaPPort(final String protocol) {
+ if ("http".equals(protocol)) {
+ return 3904;
+ } else if ("https".equals(protocol)) {
+ return 3905;
+ } else {
+ return 80;
+ }
+ }
+