2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright (C) 2017 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
20 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23 package org.openecomp.appc.adapter.messaging.dmaap.impl;
25 import java.io.IOException;
28 import com.att.eelf.configuration.EELFLogger;
29 import com.att.eelf.configuration.EELFManager;
30 //import com.att.nsa.cambria.client.CambriaClientBuilders;
31 //import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
32 //import com.att.nsa.cambria.client.CambriaConsumer;
34 import com.att.nsa.mr.client.MRClientFactory;
35 import com.att.nsa.mr.client.MRConsumer;
36 import org.apache.commons.lang3.StringUtils;
37 import org.openecomp.appc.adapter.message.Consumer;
38 import org.openecomp.appc.configuration.Configuration;
39 import org.openecomp.appc.configuration.ConfigurationFactory;
40 import org.openecomp.appc.metricservice.MetricRegistry;
41 import org.openecomp.appc.metricservice.MetricService;
42 import org.openecomp.appc.metricservice.impl.MetricServiceImpl;
43 import org.openecomp.appc.metricservice.metric.Metric;
44 import org.openecomp.appc.metricservice.metric.MetricType;
45 import org.openecomp.appc.metricservice.metric.DmaapRequestCounterMetric;
46 import org.openecomp.appc.metricservice.policy.PublishingPolicy;
47 import org.openecomp.appc.metricservice.publisher.LogPublisher;
48 import org.osgi.framework.BundleContext;
49 import org.osgi.framework.FrameworkUtil;
50 import org.osgi.framework.ServiceReference;
52 public class DmaapConsumerImpl implements Consumer {
54 private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class);
55 private static final Configuration configuration = ConfigurationFactory.getConfiguration();
57 private static final int DEFAULT_TIMEOUT_MS = 60000;
58 private static final int DEFAULT_LIMIT = 1000;
59 private static MetricRegistry metricRegistry;
61 private DmaapRequestCounterMetric dmaapKpiMetric;
62 private boolean isMetricEnabled=false;
63 private boolean useHttps = false;
64 private MRConsumer client = null;
65 private Properties props = null;
68 public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password) {
69 this(urls, topicName, consumerGroupName, consumerId,user, password,null);
73 public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,String user, String password,String filter) {
74 this.topic = topicName;
75 this.props = new Properties();
76 String urlsStr = StringUtils.join(urls, ',');
77 props.setProperty("host",urlsStr);
78 props.setProperty("group",consumerGroupName);
79 props.setProperty("id",consumerId);
80 props.setProperty("username",user);
81 props.setProperty("password",password);
83 props.setProperty("filter", filter);
88 private void initMetric() {
89 LOG.debug("Metric getting initialized");
90 MetricService metricService = getMetricservice();
91 metricRegistry = metricService.createRegistry("APPC");
92 dmaapKpiMetric = metricRegistry.metricBuilderFactory().
93 dmaapRequestCounterBuilder().
94 withName("DMAAP_KPI").withType(MetricType.COUNTER).
95 withRecievedMessage(0)
96 .withPublishedMessage(0)
98 if (metricRegistry.register(dmaapKpiMetric)) {
99 Metric[] metrics = new Metric[]{dmaapKpiMetric};
100 LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
101 LogPublisher[] logPublishers = new LogPublisher[1];
102 logPublishers[0] = logPublisher;
103 PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory().
104 scheduledPolicyBuilder().withPublishers(logPublishers).
105 withMetrics(metrics).
107 LOG.debug("Policy getting initialized");
108 manuallyScheduledPublishingPolicy.init();
109 LOG.debug("Metric initialized");
112 private MRConsumer getClient() {
113 return getClient(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
117 * @return An instance of MRConsumer created from our class variables
119 private synchronized MRConsumer getClient(int waitMs, int limit) {
121 props.setProperty("timeout",String.valueOf(waitMs));
122 props.setProperty("limit",String.valueOf(limit));
123 String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic,props);
124 return MRClientFactory.createConsumer ( topicProducerPropFileName);
125 } catch (IOException e1) {
126 LOG.error("failed to createConsumer",e1);
132 public synchronized void updateCredentials(String key, String secret) {
133 LOG.info(String.format("Setting auth to %s for %s", key, this.toString()));
135 String password = secret;
136 props.setProperty("user",String.valueOf(user));
137 props.setProperty("password",String.valueOf(password));
142 public List<String> fetch(int waitMs, int limit) {
143 Properties properties=configuration.getProperties();
144 if(properties!=null && properties.getProperty("metric.enabled")!=null ){
145 isMetricEnabled=Boolean.valueOf(properties.getProperty("metric.enabled"));
150 LOG.debug(String.format("Fetching up to %d records with %dms wait on %s", limit, waitMs, this.toString()));
151 List<String> out = new ArrayList<String>();
153 // Create client once and reuse it on subsequent fetches. This is
154 // to support failover to other servers in the DMaaP cluster.
155 if (client == null) {
156 LOG.info("Getting DMaaP Client ...");
157 client = getClient(waitMs, limit);
160 for (String s : client.fetch(waitMs, limit)) {
163 ((DmaapRequestCounterMetric)metricRegistry.metric("DMAAP_KPI")).incrementRecievedMessage();
166 LOG.debug(String.format("Got %d records from %s", out.size(), this.toString()));
167 } catch (Exception e) {
168 // Connection exception
169 LOG.error(String.format("Dmaap Connection Issue Detected. %s", e.getMessage()));
172 LOG.warn(String.format("Sleeping for %dms to compensate for connection failure", waitMs));
173 Thread.sleep(waitMs);
174 } catch (InterruptedException e2) {
175 LOG.warn(String.format("Failed to wait for %dms after bad fetch", waitMs));
184 * Close consumer Dmaap client
187 public void close() {
188 LOG.debug("Closing Dmaap consumer client....");
189 if (client != null) {
195 public List<String> fetch() {
196 return fetch(DEFAULT_TIMEOUT_MS, DEFAULT_LIMIT);
200 public String toString() {
201 String hostStr = (props == null || props.getProperty("host") == null? "N/A" : props.getProperty("host"));
202 String group = (props == null || props.getProperty("group") == null? "N/A" : props.getProperty("group"));
203 String id = (props == null || props.getProperty("id") == null? "N/A" : props.getProperty("id"));
204 return String.format("Consumer %s/%s listening to %s on [%s]", group, id, topic, hostStr);
208 public void useHttps(boolean yes) {
213 private MetricService getMetricservice() {
214 BundleContext bctx = FrameworkUtil.getBundle(MetricService.class).getBundleContext();
215 // Get AAIadapter reference
216 ServiceReference sref = bctx.getServiceReference(MetricService.class.getName());
218 LOG.info("Metric Service from bundlecontext");
219 return (MetricServiceImpl) bctx.getService(sref);
222 LOG.info("Metric Service error from bundlecontext");
223 LOG.warn("Cannot find service reference for org.openecomp.appc.metricservice.MetricService");
229 public Metric getMetric(String name){
230 return metricRegistry.metric(name);