Sonar Critical Fix
[dcaegen2/analytics/tca.git] / dcae-analytics-dmaap / src / main / java / org / openecomp / dcae / apod / analytics / dmaap / service / subscriber / DMaaPMRSubscriberImpl.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.dmaap.service.subscriber;\r
22 \r
23 import com.google.common.base.Optional;\r
24 import com.google.inject.Inject;\r
25 import com.google.inject.assistedinject.Assisted;\r
26 import org.apache.commons.lang3.tuple.Pair;\r
27 import org.apache.http.HttpHeaders;\r
28 import org.apache.http.client.HttpClient;\r
29 import org.apache.http.client.methods.HttpGet;\r
30 import org.apache.http.impl.client.CloseableHttpClient;\r
31 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
32 import org.openecomp.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;\r
33 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;\r
34 import org.openecomp.dcae.apod.analytics.dmaap.service.BaseDMaaPMRComponent;\r
35 import org.slf4j.Logger;\r
36 import org.slf4j.LoggerFactory;\r
37 \r
38 import java.io.IOException;\r
39 import java.net.URI;\r
40 import java.util.Date;\r
41 import java.util.LinkedList;\r
42 import java.util.List;\r
43 \r
44 import static org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils.isSuccessfulResponseCode;\r
45 import static java.lang.String.format;\r
46 \r
47 /**\r
48  * Concrete Implementation of {@link DMaaPMRSubscriber} which uses {@link HttpClient}\r
49  *\r
50  * @author Rajiv Singla . Creation Date: 10/13/2016.\r
51  */\r
52 public class DMaaPMRSubscriberImpl extends BaseDMaaPMRComponent implements DMaaPMRSubscriber {\r
53 \r
54     private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRSubscriberImpl.class);\r
55 \r
56     private final DMaaPMRSubscriberConfig subscriberConfig;\r
57     private final CloseableHttpClient closeableHttpClient;\r
58     private final URI subscriberUri;\r
59     private final Date subscriberCreationTime;\r
60 \r
61     @Inject\r
62     public DMaaPMRSubscriberImpl(@Assisted DMaaPMRSubscriberConfig subscriberConfig,\r
63                                  CloseableHttpClient closeableHttpClient) {\r
64         this.subscriberConfig = subscriberConfig;\r
65         this.closeableHttpClient = closeableHttpClient;\r
66         this.subscriberUri = createSubscriberURI(subscriberConfig);\r
67         this.subscriberCreationTime = new Date();\r
68     }\r
69 \r
70     @Override\r
71     public DMaaPMRSubscriberResponse fetchMessages() {\r
72 \r
73         final String userName = subscriberConfig.getUserName();\r
74         final String userPassword = subscriberConfig.getUserPassword();\r
75 \r
76         final HttpGet getRequest = new HttpGet(subscriberUri);\r
77 \r
78         // add Authorization Header if username and password are present\r
79         final Optional<String> authHeader = getAuthHeader(userName, userPassword);\r
80         if (authHeader.isPresent()) {\r
81             getRequest.addHeader(HttpHeaders.AUTHORIZATION, authHeader.get());\r
82         } else {\r
83             LOG.debug("DMaaP MR Subscriber Authentication is disabled as username or password is not present.");\r
84         }\r
85 \r
86         try {\r
87 \r
88             final Pair<Integer, String> responsePair = closeableHttpClient.execute(getRequest, responseHandler());\r
89             final Integer responseCode = responsePair.getLeft();\r
90             final String responseBody = responsePair.getRight();\r
91 \r
92             List<String> fetchedMessages = new LinkedList<>();\r
93             String responseMessage = responseBody;\r
94 \r
95             // if messages were published successfully, return successful response\r
96             if (isSuccessfulResponseCode(responseCode)) {\r
97                 if (responseBody != null) {\r
98                     fetchedMessages = convertJsonToStringMessages(responseBody);\r
99                     responseMessage = "Messages Fetched Successfully";\r
100                 } else {\r
101                     responseMessage = "DMaaP Response Body had no messages";\r
102                 }\r
103             } else {\r
104                 LOG.error("Unable to fetch messages to DMaaP MR Topic. DMaaP MR unsuccessful Response Code: {}, " +\r
105                         "DMaaP Response Body: {}", responseCode, responseBody);\r
106             }\r
107 \r
108             return createSubscriberResponse(responseCode, responseMessage, fetchedMessages);\r
109 \r
110         } catch (IOException e) {\r
111 \r
112             final String errorMessage =\r
113                     format("IO Exception while fetching messages from DMaaP Topic. Exception %s", e);\r
114             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
115         }\r
116 \r
117 \r
118     }\r
119 \r
120     @Override\r
121     public Date getSubscriberCreationTime() {\r
122         return new Date(subscriberCreationTime.getTime());\r
123     }\r
124 \r
125     @Override\r
126     public void close() throws Exception {\r
127         closeableHttpClient.close();\r
128     }\r
129 }\r