Enhance DMaaP Adapter Configuration
[appc.git] / appc-adapters / appc-dmaap-adapter / appc-dmaap-adapter-bundle / src / main / java / org / onap / appc / adapter / messaging / dmaap / impl / DmaapConsumerImpl.java
index 6f907ae..40ee1c7 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * ONAP : APPC
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Copyright (C) 2017 Amdocs
  * =============================================================================
@@ -18,7 +18,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  * 
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
  * ============LICENSE_END=========================================================
  */
 
@@ -50,27 +49,26 @@ import org.osgi.framework.ServiceReference;
 
 public class DmaapConsumerImpl implements Consumer {
 
-    private static final EELFLogger LOG = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class);
-    private final Configuration configuration = ConfigurationFactory.getConfiguration();
+    private static final EELFLogger LOG                = EELFManager.getInstance().getLogger(DmaapConsumerImpl.class);
+    private final Configuration     configuration      = ConfigurationFactory.getConfiguration();
     // Default values
-    private static final int DEFAULT_TIMEOUT_MS = 60000;
-    private static final int DEFAULT_LIMIT = 1000;
-    private String topic;
-    private boolean isMetricEnabled = false;
-    private boolean useHttps = false;
-    private MetricRegistry metricRegistry;
-    private MRConsumer client = null;
-    private Properties props = null;
-
+    private static final int        DEFAULT_TIMEOUT_MS = 60000;
+    private static final int        DEFAULT_LIMIT      = 1000;
+    private String                  topic;
+    private boolean                 isMetricEnabled    = false;
+    private boolean                 useHttps           = false;
+    private MetricRegistry          metricRegistry;
+    private MRConsumer              client             = null;
+    private Properties              props              = null;
 
     public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,
-        String user, String password) {
+            String user, String password) {
 
-        this(urls, topicName, consumerGroupName, consumerId,user, password,null);
+        this(urls, topicName, consumerGroupName, consumerId, user, password, null);
     }
 
     public DmaapConsumerImpl(Collection<String> urls, String topicName, String consumerGroupName, String consumerId,
-        String user, String password, String filter) {
+            String user, String password, String filter) {
 
         this.topic = topicName;
         this.props = new Properties();
@@ -78,8 +76,13 @@ public class DmaapConsumerImpl implements Consumer {
         props.setProperty("host", urlsStr);
         props.setProperty("group", consumerGroupName);
         props.setProperty("id", consumerId);
-        props.setProperty("username", user);
-        props.setProperty("password", password);
+        if (user != null && password != null) {
+            props.setProperty("username", user);
+            props.setProperty("password", password);
+        } else {
+            props.setProperty("TransportType", "HTTPNOAUTH");
+        }
+
         if (filter != null) {
             props.setProperty("filter", filter);
         }
@@ -92,22 +95,17 @@ public class DmaapConsumerImpl implements Consumer {
             metricRegistry = metricService.createRegistry("APPC");
 
             DmaapRequestCounterMetric dmaapKpiMetric = metricRegistry.metricBuilderFactory()
-                .dmaapRequestCounterBuilder()
-                .withName("DMAAP_KPI").withType(MetricType.COUNTER)
-                .withRecievedMessage(0)
-                .withPublishedMessage(0)
-                .build();
+                    .dmaapRequestCounterBuilder().withName("DMAAP_KPI").withType(MetricType.COUNTER)
+                    .withRecievedMessage(0).withPublishedMessage(0).build();
 
             if (metricRegistry.register(dmaapKpiMetric)) {
-                Metric[] metrics = new Metric[]{dmaapKpiMetric};
+                Metric[] metrics = new Metric[] { dmaapKpiMetric };
                 LogPublisher logPublisher = new LogPublisher(metricRegistry, metrics);
                 LogPublisher[] logPublishers = new LogPublisher[1];
                 logPublishers[0] = logPublisher;
 
                 PublishingPolicy manuallyScheduledPublishingPolicy = metricRegistry.policyBuilderFactory()
-                    .scheduledPolicyBuilder().withPublishers(logPublishers)
-                    .withMetrics(metrics)
-                    .build();
+                        .scheduledPolicyBuilder().withPublishers(logPublishers).withMetrics(metrics).build();
 
                 LOG.debug("Policy getting initialized");
                 manuallyScheduledPublishingPolicy.init();
@@ -121,12 +119,12 @@ public class DmaapConsumerImpl implements Consumer {
      */
     private synchronized MRConsumer getClient(int waitMs, int limit) {
         try {
-            props.setProperty("timeout",String.valueOf(waitMs));
-            props.setProperty("limit",String.valueOf(limit));
-            String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic,props);
+            props.setProperty("timeout", String.valueOf(waitMs));
+            props.setProperty("limit", String.valueOf(limit));
+            String topicProducerPropFileName = DmaapUtil.createConsumerPropFile(topic, props);
             return MRClientFactory.createConsumer(topicProducerPropFileName);
         } catch (IOException e1) {
-            LOG.error("failed to createConsumer",e1);
+            LOG.error("failed to createConsumer", e1);
             return null;
         }
     }
@@ -134,8 +132,8 @@ public class DmaapConsumerImpl implements Consumer {
     @Override
     public synchronized void updateCredentials(String key, String secret) {
         LOG.info(String.format("Setting auth to %s for %s", key, this.toString()));
-        props.setProperty("user",String.valueOf(key));
-        props.setProperty("password",String.valueOf(secret));
+        props.setProperty("username", String.valueOf(key));
+        props.setProperty("password", String.valueOf(secret));
         client = null;
     }
 
@@ -227,4 +225,11 @@ public class DmaapConsumerImpl implements Consumer {
         }
     }
 
+    public Properties getProperties() {
+        return props;
+    }
+
+    public boolean isHttps() {
+        return useHttps;
+    }
 }