2 * ===============================LICENSE_START======================================
\r
4 * ================================================================================
\r
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * ================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
11 * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * Unless required by applicable law or agreed to in writing, software
\r
14 * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * See the License for the specific language governing permissions and
\r
17 * limitations under the License.
\r
18 * ============================LICENSE_END===========================================
\r
21 package org.openecomp.dcae.apod.analytics.dmaap.service.subscriber;
\r
23 import com.google.common.base.Optional;
\r
24 import com.google.inject.Inject;
\r
25 import com.google.inject.assistedinject.Assisted;
\r
26 import org.apache.commons.lang3.tuple.Pair;
\r
27 import org.apache.http.HttpHeaders;
\r
28 import org.apache.http.client.HttpClient;
\r
29 import org.apache.http.client.methods.HttpGet;
\r
30 import org.apache.http.impl.client.CloseableHttpClient;
\r
31 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
\r
32 import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
\r
33 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
\r
34 import org.openecomp.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent;
\r
35 import org.slf4j.Logger;
\r
36 import org.slf4j.LoggerFactory;
\r
38 import java.io.IOException;
\r
39 import java.net.URI;
\r
40 import java.util.Date;
\r
41 import java.util.LinkedList;
\r
42 import java.util.List;
\r
44 import static org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode;
\r
45 import static java.lang.String.format;
\r
48 * Concrete Implementation of {@link DMaaPMRSubscriber} which uses {@link HttpClient}
\r
50 * @author Rajiv Singla . Creation Date: 10/13/2016.
\r
52 public class DMaaPMRSubscriberImpl extends BaseDMaaPMRComponent implements DMaaPMRSubscriber {
\r
54 private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSubscriberImpl.class);
\r
56 private final DMaaPMRSubscriberConfig subscriberConfig;
\r
57 private final CloseableHttpClient closeableHttpClient;
\r
58 private final URI subscriberUri;
\r
59 private final Date subscriberCreationTime;
\r
62 public DMaaPMRSubscriberImpl(@Assisted DMaaPMRSubscriberConfig subscriberConfig,
\r
63 CloseableHttpClient closeableHttpClient) {
\r
64 this.subscriberConfig = subscriberConfig;
\r
65 this.closeableHttpClient = closeableHttpClient;
\r
66 this.subscriberUri = createSubscriberURI(subscriberConfig);
\r
67 this.subscriberCreationTime = new Date();
\r
71 public DMaaPMRSubscriberResponse fetchMessages() {
\r
73 final String userName = subscriberConfig.getUserName();
\r
74 final String userPassword = subscriberConfig.getUserPassword();
\r
76 final HttpGet getRequest = new HttpGet(subscriberUri);
\r
78 // add Authorization Header if username and password are present
\r
79 final Optional<String> authHeader = getAuthHeader(userName, userPassword);
\r
80 if (authHeader.isPresent()) {
\r
81 getRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
\r
83 LOG.debug("DMaaP MR Subscriber Authentication is disabled as username or password is not present.");
\r
88 final Pair<Integer, String> responsePair = closeableHttpClient.execute(getRequest, responseHandler());
\r
89 final Integer responseCode = responsePair.getLeft();
\r
90 final String responseBody = responsePair.getRight();
\r
92 List<String> fetchedMessages = new LinkedList<>();
\r
93 String responseMessage = responseBody;
\r
95 // if messages were published successfully, return successful response
\r
96 if (isSuccessfulResponseCode(responseCode)) {
\r
97 if (responseBody != null) {
\r
98 fetchedMessages = convertJsonToStringMessages(responseBody);
\r
99 responseMessage = "Messages Fetched Successfully";
\r
101 responseMessage = "DMaaP Response Body had no messages";
\r
104 LOG.error("Unable to fetch messages to DMaaP MR Topic. DMaaP MR unsuccessful Response Code: {}, " +
\r
105 "DMaaP Response Body: {}", responseCode, responseBody);
\r
108 return createSubscriberResponse(responseCode, responseMessage, fetchedMessages);
\r
110 } catch (IOException e) {
\r
112 final String errorMessage =
\r
113 format("IO Exception while fetching messages from DMaaP Topic. Exception %s", e);
\r
114 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
\r
121 public Date getSubscriberCreationTime() {
\r
122 return new Date(subscriberCreationTime.getTime());
\r
126 public void close() throws Exception {
\r
127 closeableHttpClient.close();
\r