Initial TCA commit into DCAEGEN2
[dcaegen2/analytics/tca.git] / dcae-analytics-dmaap / src / main / java / org / openecomp / dcae / apod / analytics / dmaap / service / subscriber / DMaaPMRSubscriberImpl.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.openecomp.dcae.apod.analytics.dmaap.service.subscriber;
22
23 import com.google.common.base.Optional;
24 import com.google.inject.Inject;
25 import com.google.inject.assistedinject.Assisted;
26 import org.apache.commons.lang3.tuple.Pair;
27 import org.apache.http.HttpHeaders;
28 import org.apache.http.client.HttpClient;
29 import org.apache.http.client.methods.HttpGet;
30 import org.apache.http.impl.client.CloseableHttpClient;
31 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
32 import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
33 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
34 import org.openecomp.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 import java.io.IOException;
39 import java.net.URI;
40 import java.util.Date;
41 import java.util.LinkedList;
42 import java.util.List;
43
44 import static org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode;
45 import static java.lang.String.format;
46
47 /**
48  * Concrete Implementation of {@link DMaaPMRSubscriber} which uses {@link HttpClient}
49  *
50  * @author Rajiv Singla . Creation Date: 10/13/2016.
51  */
52 public class DMaaPMRSubscriberImpl extends BaseDMaaPMRComponent implements DMaaPMRSubscriber {
53
54     private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSubscriberImpl.class);
55
56     private final DMaaPMRSubscriberConfig subscriberConfig;
57     private final CloseableHttpClient closeableHttpClient;
58     private final URI subscriberUri;
59     private final Date subscriberCreationTime;
60
61     @Inject
62     public DMaaPMRSubscriberImpl(@Assisted DMaaPMRSubscriberConfig subscriberConfig,
63                                  CloseableHttpClient closeableHttpClient) {
64         this.subscriberConfig = subscriberConfig;
65         this.closeableHttpClient = closeableHttpClient;
66         this.subscriberUri = createSubscriberURI(subscriberConfig);
67         this.subscriberCreationTime = new Date();
68     }
69
70     @Override
71     public DMaaPMRSubscriberResponse fetchMessages() {
72
73         final String userName = subscriberConfig.getUserName();
74         final String userPassword = subscriberConfig.getUserPassword();
75
76         final HttpGet getRequest = new HttpGet(subscriberUri);
77
78         // add Authorization Header if username and password are present
79         final Optional<String> authHeader = getAuthHeader(userName, userPassword);
80         if (authHeader.isPresent()) {
81             getRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());
82         } else {
83             LOG.debug("DMaaP MR Subscriber Authentication is disabled as username or password is not present.");
84         }
85
86         try {
87
88             final Pair<Integer, String> responsePair = closeableHttpClient.execute(getRequest, responseHandler());
89             final Integer responseCode = responsePair.getLeft();
90             final String responseBody = responsePair.getRight();
91
92             List<String> fetchedMessages = new LinkedList<>();
93             String responseMessage = responseBody;
94
95             // if messages were published successfully, return successful response
96             if (isSuccessfulResponseCode(responseCode)) {
97                 if (responseBody != null) {
98                     fetchedMessages = convertJsonToStringMessages(responseBody);
99                     responseMessage = "Messages Fetched Successfully";
100                 } else {
101                     responseMessage = "DMaaP Response Body had no messages";
102                 }
103             } else {
104                 LOG.error("Unable to fetch messages to DMaaP MR Topic. DMaaP MR unsuccessful Response Code: {}, " +
105                         "DMaaP Response Body: {}", responseCode, responseBody);
106             }
107
108             return createSubscriberResponse(responseCode, responseMessage, fetchedMessages);
109
110         } catch (IOException e) {
111
112             final String errorMessage =
113                     format("IO Exception while fetching messages from DMaaP Topic. Exception %s", e);
114             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
115         }
116
117
118     }
119
120     @Override
121     public Date getSubscriberCreationTime() {
122         return new Date(subscriberCreationTime.getTime());
123     }
124
125     @Override
126     public void close() throws Exception {
127         closeableHttpClient.close();
128     }
129 }