2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============================LICENSE_END===========================================
21 package org.openecomp.dcae.apod.analytics.dmaap.service.subscriber;
import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.HttpHeaders;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
import org.openecomp.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;

import static org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode;
import static java.lang.String.format;
48 * Concrete Implementation of {@link DMaaPMRSubscriber} which uses {@link HttpClient}
50 * @author Rajiv Singla . Creation Date: 10/13/2016.
52 public class DMaaPMRSubscriberImpl extends BaseDMaaPMRComponent implements DMaaPMRSubscriber {
54 private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSubscriberImpl.class);
56 private final DMaaPMRSubscriberConfig subscriberConfig;
57 private final CloseableHttpClient closeableHttpClient;
58 private final URI subscriberUri;
59 private final Date subscriberCreationTime;
62 public DMaaPMRSubscriberImpl(@Assisted DMaaPMRSubscriberConfig subscriberConfig,
63 CloseableHttpClient closeableHttpClient) {
64 this.subscriberConfig = subscriberConfig;
65 this.closeableHttpClient = closeableHttpClient;
66 this.subscriberUri = createSubscriberURI(subscriberConfig);
67 this.subscriberCreationTime = new Date();
71 public DMaaPMRSubscriberResponse fetchMessages() {
73 final String userName = subscriberConfig.getUserName();
74 final String userPassword = subscriberConfig.getUserPassword();
76 final HttpGet getRequest = new HttpGet(subscriberUri);
78 // add Authorization Header if username and password are present
79 final Optional<String> authHeader = getAuthHeader(userName, userPassword);
80 if (authHeader.isPresent()) {
81 getRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
83 LOG.debug("DMaaP MR Subscriber Authentication is disabled as username or password is not present.");
88 final Pair<Integer, String> responsePair = closeableHttpClient.execute(getRequest, responseHandler());
89 final Integer responseCode = responsePair.getLeft();
90 final String responseBody = responsePair.getRight();
92 List<String> fetchedMessages = new LinkedList<>();
93 String responseMessage = responseBody;
95 // if messages were published successfully, return successful response
96 if (isSuccessfulResponseCode(responseCode)) {
97 if (responseBody != null) {
98 fetchedMessages = convertJsonToStringMessages(responseBody);
99 responseMessage = "Messages Fetched Successfully";
101 responseMessage = "DMaaP Response Body had no messages";
104 LOG.error("Unable to fetch messages to DMaaP MR Topic. DMaaP MR unsuccessful Response Code: {}, " +
105 "DMaaP Response Body: {}", responseCode, responseBody);
108 return createSubscriberResponse(responseCode, responseMessage, fetchedMessages);
110 } catch (IOException e) {
112 final String errorMessage =
113 format("IO Exception while fetching messages from DMaaP Topic. Exception %s", e);
114 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
121 public Date getSubscriberCreationTime() {
122 return new Date(subscriberCreationTime.getTime());
126 public void close() throws Exception {
127 closeableHttpClient.close();