686bb071f18735a490934b2dcf24d461d2f0e6ad
[appc.git] / appc-adapters / appc-dmaap-adapter / appc-dmaap-adapter-bundle / src / main / java / org / onap / appc / adapter / messaging / dmaap / impl / DmaapConsumerImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * ================================================================================
9  * Modifications Copyright (C) 2018 IBM
10  * ================================================================================
11  * Licensed under the Apache License, Version 2.0 (the "License");
12  * you may not use this file except in compliance with the License.
13  * You may obtain a copy of the License at
14  *
15  *      http://www.apache.org/licenses/LICENSE-2.0
16  *
17  * Unless required by applicable law or agreed to in writing, software
18  * distributed under the License is distributed on an "AS IS" BASIS,
19  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20  * See the License for the specific language governing permissions and
21  * limitations under the License.
22  *
23  * ============LICENSE_END=========================================================
24  */
25
26 package org.onap.appc.adapter.messaging.dmaap.impl;
27
28 import com.att.eelf.configuration.EELFLogger;
29 import com.att.eelf.configuration.EELFManager;
30 import com.att.nsa.mr.client.MRClientFactory;
31 import com.att.nsa.mr.client.MRConsumer;
32 import java.io.IOException;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.List;
36 import java.util.Properties;
37 import org.apache.commons.lang3.StringUtils;
38 import org.onap.appc.adapter.message.Consumer;
39 import org.onap.appc.adapter.messaging.dmaap.utils.DmaapUtil;
40 import org.onap.appc.configuration.Configuration;
41 import org.onap.appc.configuration.ConfigurationFactory;
42 import org.onap.appc.metricservice.MetricRegistry;
43 import org.onap.appc.metricservice.MetricService;
44 import org.onap.appc.metricservice.metric.DmaapRequestCounterMetric;
45 import org.onap.appc.metricservice.metric.Metric;
46 import org.onap.appc.metricservice.metric.MetricType;
47 import org.onap.appc.metricservice.policy.PublishingPolicy;
48 import org.onap.appc.metricservice.publisher.LogPublisher;
49 import org.osgi.framework.BundleContext;
50 import org.osgi.framework.FrameworkUtil;
51 import org.osgi.framework.ServiceReference;
52
53 public class DmaapConsumerImpl implements Consumer {
54
55     private static final EELFLogger LOG                = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class);
56     private final Configuration     configuration      = ConfigurationFactory.getConfiguration();
57     // Default values
58     private static final int        DEFAULT_TIMEOUT_MS = 60000;
59     private static final int        DEFAULT_LIMIT      = 1000;
60     private String                  topic;
61     private boolean                 isMetricEnabled    = false;
62     private boolean                 useHttps           = false;
63     private MetricRegistry          metricRegistry;
64     private MRConsumer              client             = null;
65     private Properties              props              = null;
66
67     public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,
68             String user, String password) {
69
70         this(urls, topicName, consumerGroupName, consumerId, user, password, null);
71     }
72
73     public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,
74             String user, String password, String filter) {
75
76         this.topic = topicName;
77         this.props = new Properties();
78         String urlsStr = StringUtils.join(urls, ',');
79         props.setProperty("host", urlsStr);
80         props.setProperty("group", consumerGroupName);
81         props.setProperty("id", consumerId);
82         if (user != null && password != null) {
83             props.setProperty("username", user);
84             props.setProperty("password", password);
85         } else {
86             props.setProperty("TransportType", "HTTPNOAUTH");
87         }
88
89         if (filter != null) {
90             props.setProperty("filter", filter);
91         }
92     }
93
94     private void initMetric() {
95         LOG.debug("Metric getting initialized");
96         MetricService metricService = getMetricservice();
97         if (metricService != null) {
98             metricRegistry = metricService.createRegistry("APPC");
99
100             DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory()
101                     .dmaapRequestCounterBuilder().withName("DMAAP_KPI").withType(MetricType.COUNTER)
102                     .withRecievedMessage(0).withPublishedMessage(0).build();
103
104             if (metricRegistry.register(dmaapKpiMetric)) {
105                 Metric[] metrics = new Metric[] { dmaapKpiMetric };
106                 LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
107                 LogPublisher[] logPublishers = new LogPublisher[1];
108                 logPublishers[0] = logPublisher;
109
110                 PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory()
111                         .scheduledPolicyBuilder().withPublishers(logPublishers).withMetrics(metrics).build();
112
113                 LOG.debug("Policy getting initialized");
114                 manuallyScheduledPublishingPolicy.init();
115                 LOG.debug("Metric initialized");
116             }
117         }
118     }
119
120     /**
121      * @return An instance of MRConsumer created from our class variables.
122      */
123     synchronized MRConsumer getClient(int waitMs, int limit) {
124         try {
125             props.setProperty("timeout", String.valueOf(waitMs));
126             props.setProperty("limit", String.valueOf(limit));
127             String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic, props);
128             return MRClientFactory.createConsumer(topicProducerPropFileName);
129         } catch (IOException e1) {
130             LOG.error("failed to createConsumer", e1);
131             return null;
132         }
133     }
134
135     @Override
136     public synchronized void updateCredentials(String key, String secret) {
137         LOG.info(String.format("Setting auth to %s for %s", key, this.toString()));
138         props.setProperty("username", String.valueOf(key));
139         props.setProperty("password", String.valueOf(secret));
140         client = null;
141     }
142
143     @Override
144     public List<String> fetch() {
145         return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
146     }
147
148     @Override
149     public List<String> fetch(int waitMs, int limit) {
150         Properties properties = configuration.getProperties();
151         if (properties != null && properties.getProperty("metric.enabled") != null) {
152             isMetricEnabled = Boolean.valueOf(properties.getProperty("metric.enabled"));
153         }
154         if (isMetricEnabled) {
155             initMetric();
156         }
157         LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
158         List<String> out = new ArrayList<>();
159
160         // Create client once and reuse it on subsequent fetches. This is
161         // to support failover to other servers in the DMaaP cluster.
162         if (client == null) {
163             LOG.info("Getting DMaaP Client ...");
164             client = getClient(waitMs, limit);
165         }
166         if (client != null) {
167             try {
168                 for (String s : client.fetch(waitMs, limit)) {
169                     out.add(s);
170                     incrementReceivedMessage();
171                 }
172                 LOG.debug(String.format("Got %d records from %s", out.size(), this.toString()));
173             } catch (Exception e) {
174                 // Connection exception
175                 LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage()), e);
176                 try {
177                     LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs));
178                     Thread.sleep(waitMs);
179                 } catch (InterruptedException e2) {
180                     LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs));
181                     Thread.currentThread().interrupt();
182                 }
183             }
184         }
185         return out;
186     }
187
188     private void incrementReceivedMessage() {
189         if (isMetricEnabled && metricRegistry != null) {
190             ((DmaapRequestCounterMetric) metricRegistry.metric("DMAAP_KPI")).incrementRecievedMessage();
191         }
192     }
193
194     /**
195      * Close consumer Dmaap client.
196      */
197     @Override
198     public void close() {
199         LOG.debug("Closing Dmaap consumer client....");
200         if (client != null) {
201             client.close();
202         }
203     }
204
205     @Override
206     public String toString() {
207         String hostStr = (props == null || props.getProperty("host") == null ? "N/A" : props.getProperty("host"));
208         String group = (props == null || props.getProperty("group") == null ? "N/A" : props.getProperty("group"));
209         String id = (props == null || props.getProperty("id") == null ? "N/A" : props.getProperty("id"));
210         return String.format("Consumer %s/%s listening to %s on [%s]", group, id, topic, hostStr);
211     }
212
213     @Override
214     public void useHttps(boolean yes) {
215         useHttps = yes;
216     }
217
218     private MetricService getMetricservice() {
219         BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext();
220         ServiceReference sref = bctx.getServiceReference(MetricService.class.getName());
221         if (sref != null) {
222             LOG.info("Metric Service from bundlecontext");
223             return (MetricService) bctx.getService(sref);
224         } else {
225             LOG.info("Metric Service error from bundlecontext");
226             LOG.warn("Cannot find service reference for org.onap.appc.metricservice.MetricService");
227             return null;
228         }
229     }
230
231     public Properties getProperties() {
232         return props;
233     }
234
235     public boolean isHttps() {
236         return useHttps;
237     }
238
239 }