-/*\r
- * ===============================LICENSE_START======================================\r
- * dcae-analytics\r
- * ================================================================================\r
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
- * ================================================================================\r
- * Licensed under the Apache License, Version 2.0 (the "License");\r
- * you may not use this file except in compliance with the License.\r
- * You may obtain a copy of the License at\r
- *\r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- * ============================LICENSE_END===========================================\r
- */\r
-\r
-package org.openecomp.dcae.apod.analytics.dmaap.service.subscriber;\r
-\r
-import com.google.common.base.Optional;\r
-import com.google.inject.Inject;\r
-import com.google.inject.assistedinject.Assisted;\r
-import org.apache.commons.lang3.tuple.Pair;\r
-import org.apache.http.HttpHeaders;\r
-import org.apache.http.client.HttpClient;\r
-import org.apache.http.client.methods.HttpGet;\r
-import org.apache.http.impl.client.CloseableHttpClient;\r
-import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
-import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;\r
-import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;\r
-import org.openecomp.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-import java.io.IOException;\r
-import java.net.URI;\r
-import java.util.Date;\r
-import java.util.LinkedList;\r
-import java.util.List;\r
-\r
-import static org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode;\r
-import static java.lang.String.format;\r
-\r
-/**\r
- * Concrete Implementation of {@link DMaaPMRSubscriber} which uses {@link HttpClient}\r
- *\r
- * @author Rajiv Singla . Creation Date: 10/13/2016.\r
- */\r
-public class DMaaPMRSubscriberImpl extends BaseDMaaPMRComponent implements DMaaPMRSubscriber {\r
-\r
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSubscriberImpl.class);\r
-\r
- private final DMaaPMRSubscriberConfig subscriberConfig;\r
- private final CloseableHttpClient closeableHttpClient;\r
- private final URI subscriberUri;\r
- private final Date subscriberCreationTime;\r
-\r
- @Inject\r
- public DMaaPMRSubscriberImpl(@Assisted DMaaPMRSubscriberConfig subscriberConfig,\r
- CloseableHttpClient closeableHttpClient) {\r
- this.subscriberConfig = subscriberConfig;\r
- this.closeableHttpClient = closeableHttpClient;\r
- this.subscriberUri = createSubscriberURI(subscriberConfig);\r
- this.subscriberCreationTime = new Date();\r
- }\r
-\r
- @Override\r
- public DMaaPMRSubscriberResponse fetchMessages() {\r
-\r
- final String userName = subscriberConfig.getUserName();\r
- final String userPassword = subscriberConfig.getUserPassword();\r
-\r
- final HttpGet getRequest = new HttpGet(subscriberUri);\r
-\r
- // add Authorization Header if username and password are present\r
- final Optional<String> authHeader = getAuthHeader(userName, userPassword);\r
- if (authHeader.isPresent()) {\r
- getRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());\r
- } else {\r
- LOG.debug("DMaaP MR Subscriber Authentication is disabled as username or password is not present.");\r
- }\r
-\r
- try {\r
-\r
- final Pair<Integer, String> responsePair = closeableHttpClient.execute(getRequest, responseHandler());\r
- final Integer responseCode = responsePair.getLeft();\r
- final String responseBody = responsePair.getRight();\r
-\r
- List<String> fetchedMessages = new LinkedList<>();\r
- String responseMessage = responseBody;\r
-\r
- // if messages were published successfully, return successful response\r
- if (isSuccessfulResponseCode(responseCode)) {\r
- if (responseBody != null) {\r
- fetchedMessages = convertJsonToStringMessages(responseBody);\r
- responseMessage = "Messages Fetched Successfully";\r
- } else {\r
- responseMessage = "DMaaP Response Body had no messages";\r
- }\r
- } else {\r
- LOG.error("Unable to fetch messages to DMaaP MR Topic. DMaaP MR unsuccessful Response Code: {}, " +\r
- "DMaaP Response Body: {}", responseCode, responseBody);\r
- }\r
-\r
- return createSubscriberResponse(responseCode, responseMessage, fetchedMessages);\r
-\r
- } catch (IOException e) {\r
-\r
- final String errorMessage =\r
- format("IO Exception while fetching messages from DMaaP Topic. Exception %s", e);\r
- throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
- }\r
-\r
-\r
- }\r
-\r
- @Override\r
- public Date getSubscriberCreationTime() {\r
- return new Date(subscriberCreationTime.getTime());\r
- }\r
-\r
- @Override\r
- public void close() throws Exception {\r
- closeableHttpClient.close();\r
- }\r
-}\r
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.dmaap.service.subscriber;
+
+import com.google.common.base.Optional;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
+import org.onap.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
+import org.onap.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.onap.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode;
+import static java.lang.String.format;
+
+/**
+ * Concrete Implementation of {@link DMaaPMRSubscriber} which uses {@link HttpClient}
+ *
+ * @author Rajiv Singla . Creation Date: 10/13/2016.
+ */
+public class DMaaPMRSubscriberImpl extends BaseDMaaPMRComponent implements DMaaPMRSubscriber {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSubscriberImpl.class);
+
+ private final DMaaPMRSubscriberConfig subscriberConfig;
+ private final CloseableHttpClient closeableHttpClient;
+ private final URI subscriberUri;
+ private final Date subscriberCreationTime;
+
+ @Inject
+ public DMaaPMRSubscriberImpl(@Assisted DMaaPMRSubscriberConfig subscriberConfig,
+ CloseableHttpClient closeableHttpClient) {
+ this.subscriberConfig = subscriberConfig;
+ this.closeableHttpClient = closeableHttpClient;
+ this.subscriberUri = createSubscriberURI(subscriberConfig);
+ this.subscriberCreationTime = new Date();
+ }
+
+ @Override
+ public DMaaPMRSubscriberResponse fetchMessages() {
+
+ final String userName = subscriberConfig.getUserName();
+ final String userPassword = subscriberConfig.getUserPassword();
+
+ final HttpGet getRequest = new HttpGet(subscriberUri);
+
+ // add Authorization Header if username and password are present
+ final Optional<String> authHeader = getAuthHeader(userName, userPassword);
+ if (authHeader.isPresent()) {
+ getRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
+ } else {
+ LOG.debug("DMaaP MR Subscriber Authentication is disabled as username or password is not present.");
+ }
+
+ try {
+
+ final Pair<Integer, String> responsePair = closeableHttpClient.execute(getRequest, responseHandler());
+ final Integer responseCode = responsePair.getLeft();
+ final String responseBody = responsePair.getRight();
+
+ List<String> fetchedMessages = new LinkedList<>();
+ String responseMessage = responseBody;
+
+ // if messages were published successfully, return successful response
+ if (isSuccessfulResponseCode(responseCode)) {
+ if (responseBody != null) {
+ fetchedMessages = convertJsonToStringMessages(responseBody);
+ responseMessage = "Messages Fetched Successfully";
+ } else {
+ responseMessage = "DMaaP Response Body had no messages";
+ }
+ } else {
+ LOG.error("Unable to fetch messages to DMaaP MR Topic. DMaaP MR unsuccessful Response Code: {}, " +
+ "DMaaP Response Body: {}", responseCode, responseBody);
+ }
+
+ return createSubscriberResponse(responseCode, responseMessage, fetchedMessages);
+
+ } catch (IOException e) {
+
+ final String errorMessage =
+ format("IO Exception while fetching messages from DMaaP Topic. Exception %s", e);
+ throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+ }
+
+
+ }
+
+ @Override
+ public Date getSubscriberCreationTime() {
+ return new Date(subscriberCreationTime.getTime());
+ }
+
+ @Override
+ public void close() throws Exception {
+ closeableHttpClient.close();
+ }
+}