Apply builder pattern for topic cnsmer prdcr 31/59031/4
authorkrishnajinka <krishna.jinka@gmail.com>
Sun, 5 Aug 2018 07:52:10 +0000 (16:52 +0900)
committerKrishnakumar Jinka <kris.jinka@samsung.com>
Mon, 6 Aug 2018 13:33:03 +0000 (13:33 +0000)
Modify endpoints event bus related classes to use builder pattern
in particular apply bus topic params object instead of
using parameters as it is. Rework based on commnts

Issue-ID: POLICY-1017
Change-Id: I572a72fa525cf4f664eb70d0415be73116499bd2
Signed-off-by: krisjinka <kris.jinka@samsung.com>
15 files changed:
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/TopicEndpoint.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSinkFactory.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/DmaapTopicSourceFactory.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSinkFactory.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/UebTopicSourceFactory.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusConsumer.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusPublisher.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicBase.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java [new file with mode: 0644]
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineDmaapTopicSink.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineUebTopicSink.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedBusTopicSource.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/SingleThreadedUebTopicSource.java

index e7a21ca..4e2f4ec 100644 (file)
@@ -75,7 +75,7 @@ public interface TopicEndpoint extends Startable, Lockable {
     /**
      * get the Topic Sources for the given topic name
      *
-     * @param topicName the topic name
+     * @param topicNames the topic name
      *
      * @return the Topic Source List
      * @throws IllegalStateException if the entity is in an invalid state
@@ -150,7 +150,6 @@ public interface TopicEndpoint extends Startable, Lockable {
      * infrastructure type
      *
      * @param topicName the topic name
-     * @param commType communication infrastructure type
      *
      * @return the Topic Sink List
      * @throws IllegalStateException if the entity is in an invalid state, for example multiple
index 26e8d41..08a1db8 100644 (file)
@@ -3,13 +3,14 @@
  * policy-endpoints
  * ================================================================================
  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineDmaapTopicSink;
 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
 import org.slf4j.Logger;
@@ -47,53 +49,50 @@ public interface DmaapTopicSinkFactory {
 
     /**
      * Instantiates a new DMAAP Topic Sink
-     * 
-     * @param servers list of servers
-     * @param topic topic name
-     * @param apiKey API Key
-     * @param apiSecret API Secret
-     * @param userName AAF user name
-     * @param password AAF password
-     * @param partitionKey Consumer Group
-     * @param environment DME2 environment
-     * @param aftEnvironment DME2 AFT environment
-     * @param partner DME2 Partner
-     * @param latitude DME2 latitude
-     * @param longitude DME2 longitude
+     *
+     * @param servers         list of servers
+     * @param topic           topic name
+     * @param apiKey          API Key
+     * @param apiSecret       API Secret
+     * @param userName        AAF user name
+     * @param password        AAF password
+     * @param partitionKey    Consumer Group
+     * @param environment     DME2 environment
+     * @param aftEnvironment  DME2 AFT environment
+     * @param partner         DME2 Partner
+     * @param latitude        DME2 latitude
+     * @param longitude       DME2 longitude
      * @param additionalProps additional properties to pass to DME2
-     * @param managed is this sink endpoint managed?
-     * 
+     * @param managed         is this sink endpoint managed?
      * @return an DMAAP Topic Sink
      * @throws IllegalArgumentException if invalid parameters are present
      */
     public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
-            String password, String partitionKey, String environment, String aftEnvironment, String partner,
-            String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps,
-            boolean allowSelfSignedCerts);
+                                String password, String partitionKey, String environment, String aftEnvironment, String partner,
+                                String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps,
+                                boolean allowSelfSignedCerts);
 
     /**
      * Instantiates a new DMAAP Topic Sink
-     * 
-     * @param servers list of servers
-     * @param topic topic name
-     * @param apiKey API Key
-     * @param apiSecret API Secret
-     * @param userName AAF user name
-     * @param password AAF password
+     *
+     * @param servers      list of servers
+     * @param topic        topic name
+     * @param apiKey       API Key
+     * @param apiSecret    API Secret
+     * @param userName     AAF user name
+     * @param password     AAF password
      * @param partitionKey Consumer Group
-     * @param managed is this sink endpoint managed?
-     * 
+     * @param managed      is this sink endpoint managed?
      * @return an DMAAP Topic Sink
      * @throws IllegalArgumentException if invalid parameters are present
      */
     public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
-            String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
+                                String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
 
     /**
      * Creates an DMAAP Topic Sink based on properties files
-     * 
+     *
      * @param properties Properties containing initialization values
-     * 
      * @return an DMAAP Topic Sink
      * @throws IllegalArgumentException if invalid parameters are present
      */
@@ -101,10 +100,9 @@ public interface DmaapTopicSinkFactory {
 
     /**
      * Instantiates a new DMAAP Topic Sink
-     * 
+     *
      * @param servers list of servers
-     * @param topic topic name
-     * 
+     * @param topic   topic name
      * @return an DMAAP Topic Sink
      * @throws IllegalArgumentException if invalid parameters are present
      */
@@ -112,7 +110,7 @@ public interface DmaapTopicSinkFactory {
 
     /**
      * Destroys an DMAAP Topic Sink based on a topic
-     * 
+     *
      * @param topic topic name
      * @throws IllegalArgumentException if invalid parameters are present
      */
@@ -120,18 +118,17 @@ public interface DmaapTopicSinkFactory {
 
     /**
      * gets an DMAAP Topic Sink based on topic name
-     * 
+     *
      * @param topic the topic name
-     * 
      * @return an DMAAP Topic Sink with topic name
      * @throws IllegalArgumentException if an invalid topic is provided
-     * @throws IllegalStateException if the DMAAP Topic Reader is an incorrect state
+     * @throws IllegalStateException    if the DMAAP Topic Reader is an incorrect state
      */
     public DmaapTopicSink get(String topic);
 
     /**
      * Provides a snapshot of the DMAAP Topic Sinks
-     * 
+     *
      * @return a list of the DMAAP Topic Sinks
      */
     public List<DmaapTopicSink> inventory();
@@ -163,9 +160,9 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
 
     @Override
     public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
-            String password, String partitionKey, String environment, String aftEnvironment, String partner,
-            String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps,
-            boolean allowSelfSignedCerts) {
+                                String password, String partitionKey, String environment, String aftEnvironment, String partner,
+                                String latitude, String longitude, Map<String, String> additionalProps, boolean managed, boolean useHttps,
+                                boolean allowSelfSignedCerts) {
 
         if (topic == null || topic.isEmpty()) {
             throw new IllegalArgumentException(MISSING_TOPIC);
@@ -176,9 +173,23 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
                 return dmaapTopicWriters.get(topic);
             }
 
-            DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName,
-                    password, partitionKey, environment, aftEnvironment, partner, latitude, longitude, additionalProps,
-                    useHttps, allowSelfSignedCerts);
+            DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder()
+                    .servers(servers)
+                    .topic(topic)
+                    .apiKey(apiKey)
+                    .apiSecret(apiSecret)
+                    .userName(userName)
+                    .password(password)
+                    .partitionId(partitionKey)
+                    .environment(environment)
+                    .aftEnvironment(aftEnvironment)
+                    .partner(partner)
+                    .latitude(latitude)
+                    .longitude(longitude)
+                    .additionalProps(additionalProps)
+                    .useHttps(useHttps)
+                    .allowSelfSignedCerts(allowSelfSignedCerts)
+                    .build());
 
             if (managed) {
                 dmaapTopicWriters.put(topic, dmaapTopicSink);
@@ -189,7 +200,8 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
 
     @Override
     public DmaapTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
-            String password, String partitionKey, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
+                                String password, String partitionKey, boolean managed, boolean useHttps,
+                                boolean allowSelfSignedCerts) {
 
         if (topic == null || topic.isEmpty()) {
             throw new IllegalArgumentException(MISSING_TOPIC);
@@ -200,8 +212,17 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
                 return dmaapTopicWriters.get(topic);
             }
 
-            DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName,
-                    password, partitionKey, useHttps, allowSelfSignedCerts);
+            DmaapTopicSink dmaapTopicSink = new InlineDmaapTopicSink(BusTopicParams.builder()
+                    .servers(servers)
+                    .topic(topic)
+                    .apiKey(apiKey)
+                    .apiSecret(apiSecret)
+                    .userName(userName)
+                    .password(password)
+                    .partitionId(partitionKey)
+                    .useHttps(useHttps)
+                    .allowSelfSignedCerts(allowSelfSignedCerts)
+                    .build());
 
             if (managed) {
                 dmaapTopicWriters.put(topic, dmaapTopicSink);
index 4285b3a..11dfd29 100644 (file)
@@ -3,6 +3,7 @@
  * policy-endpoints
  * ================================================================================
  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -27,7 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedBusTopicSource;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
 import org.slf4j.Logger;
@@ -204,7 +205,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
                 return dmaapTopicSources.get(topic);
             }
 
-            DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder()
+            DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(BusTopicParams.builder()
                     .servers(servers)
                     .topic(topic)
                     .apiKey(apiKey)
@@ -255,7 +256,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
             }
 
             DmaapTopicSource dmaapTopicSource =
-                    new SingleThreadedDmaapTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder()
+                    new SingleThreadedDmaapTopicSource(BusTopicParams.builder()
                             .servers(servers)
                             .topic(topic)
                             .apiKey(apiKey)
index a522e2c..9d1bd8a 100644 (file)
@@ -3,6 +3,7 @@
  * policy-endpoints
  * ================================================================================
  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -26,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
 
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink;
 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
 import org.slf4j.Logger;
@@ -141,8 +143,15 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
                 return uebTopicSinks.get(topic);
             }
 
-            UebTopicSink uebTopicWriter = new InlineUebTopicSink(servers, topic, apiKey, apiSecret, partitionKey,
-                    useHttps, allowSelfSignedCerts);
+            UebTopicSink uebTopicWriter = new InlineUebTopicSink(BusTopicParams.builder()
+                    .servers(servers)
+                    .topic(topic)
+                    .apiKey(apiKey)
+                    .apiSecret(apiSecret)
+                    .partitionId(partitionKey)
+                    .useHttps(useHttps)
+                    .allowSelfSignedCerts(allowSelfSignedCerts)
+                    .build());
 
             if (managed) {
                 uebTopicSinks.put(topic, uebTopicWriter);
index 4c3cbbf..8d3f28e 100644 (file)
@@ -3,6 +3,7 @@
  * policy-endpoints
  * ================================================================================
  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -26,7 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
 
-import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedBusTopicSource;
+import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource;
 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
 import org.slf4j.Logger;
@@ -161,7 +162,7 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
                 return uebTopicSources.get(topic);
             }
 
-            UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder()
+            UebTopicSource uebTopicSource = new SingleThreadedUebTopicSource(BusTopicParams.builder()
                     .servers(servers)
                     .topic(topic)
                     .apiKey(apiKey)
@@ -366,5 +367,4 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
         builder.append("IndexedUebTopicSourceFactory []");
         return builder.toString();
     }
-
 }
index 636dc6e..6d34d32 100644 (file)
@@ -3,6 +3,7 @@
  * policy-endpoints
  * ================================================================================
  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -32,7 +33,6 @@ import java.io.IOException;
 import java.net.MalformedURLException;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Wrapper around libraries to consume from message bus
- *
  */
 public interface BusConsumer {
 
@@ -67,7 +66,7 @@ public interface BusConsumer {
 
         /**
          * Sets the server-side filter.
-         * 
+         *
          * @param filter new filter value, or {@code null}
          * @throws IllegalArgumentException if the consumer cannot be built with the new filter
          */
@@ -116,53 +115,47 @@ public interface BusConsumer {
 
         /**
          * Cambria Consumer Wrapper
+         * BusTopicParam object contains the following parameters
+         * servers messaging bus hosts
+         * topic topic
+         * apiKey API Key
+         * apiSecret API Secret
+         * consumerGroup Consumer Group
+         * consumerInstance Consumer Instance
+         * fetchTimeout Fetch Timeout
+         * fetchLimit Fetch Limit
          *
-         * @param servers messaging bus hosts
-         * @param topic topic
-         * @param apiKey API Key
-         * @param apiSecret API Secret
-         * @param consumerGroup Consumer Group
-         * @param consumerInstance Consumer Instance
-         * @param fetchTimeout Fetch Timeout
-         * @param fetchLimit Fetch Limit
+         * @param busTopicParams
          * @throws GeneralSecurityException
          * @throws MalformedURLException
          */
-        public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
-                String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps,
-                boolean useSelfSignedCerts) {
-            this(servers, topic, apiKey, apiSecret, null, null, consumerGroup, consumerInstance, fetchTimeout,
-                    fetchLimit, useHttps, useSelfSignedCerts);
-        }
+        public CambriaConsumerWrapper(BusTopicParams busTopicParams) {
 
-        public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
-                String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
-                int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) {
-
-            this.fetchTimeout = fetchTimeout;
+            this.fetchTimeout = busTopicParams.getFetchTimeout();
 
             this.builder = new CambriaClientBuilders.ConsumerBuilder();
 
-            builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
-                    .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
+            builder.knownAs(busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance())
+                    .usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic())
+                    .waitAtServer(fetchTimeout).receivingAtMost(busTopicParams.getFetchLimit());
 
             // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
             builder.withSocketTimeout(fetchTimeout + 30000);
 
-            if (useHttps) {
+            if (busTopicParams.isUseHttps()) {
                 builder.usingHttps();
 
-                if (useSelfSignedCerts) {
+                if (busTopicParams.isAllowSelfSignedCerts()) {
                     builder.allowSelfSignedCertificates();
                 }
             }
 
-            if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
-                builder.authenticatedBy(apiKey, apiSecret);
+            if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
+                builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
             }
 
-            if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
-                builder.authenticatedByHttp(username, password);
+            if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
+                builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
             }
 
             try {
@@ -282,34 +275,36 @@ public interface BusConsumer {
 
         /**
          * MR Consumer Wrapper
+         * <p>
+         * servers          messaging bus hosts
+         * topic            topic
+         * apiKey           API Key
+         * apiSecret        API Secret
+         * username         AAF Login
+         * password         AAF Password
+         * consumerGroup    Consumer Group
+         * consumerInstance Consumer Instance
+         * fetchTimeout     Fetch Timeout
+         * fetchLimit       Fetch Limit
          *
-         * @param servers messaging bus hosts
-         * @param topic topic
-         * @param apiKey API Key
-         * @param apiSecret API Secret
-         * @param username AAF Login
-         * @param password AAF Password
-         * @param consumerGroup Consumer Group
-         * @param consumerInstance Consumer Instance
-         * @param fetchTimeout Fetch Timeout
-         * @param fetchLimit Fetch Limit
+         * @param busTopicParams contains above listed attributes
          * @throws MalformedURLException
          */
-        public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
-                String username, String password, String consumerGroup, String consumerInstance, int fetchTimeout,
-                int fetchLimit) throws MalformedURLException {
+        public DmaapConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
 
-            this.fetchTimeout = fetchTimeout;
+            this.fetchTimeout = busTopicParams.getFetchTimeout();
 
-            if (topic == null || topic.isEmpty()) {
+            if (busTopicParams.isTopicNullOrEmpty()) {
                 throw new IllegalArgumentException("No topic for DMaaP");
             }
 
-            this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout,
-                    fetchLimit, null, apiKey, apiSecret);
+            this.consumer = new MRConsumerImpl(busTopicParams.getServers(), busTopicParams.getTopic(),
+                    busTopicParams.getConsumerGroup(), busTopicParams.getConsumerInstance(),
+                    busTopicParams.getFetchTimeout(), busTopicParams.getFetchLimit(), null,
+                    busTopicParams.getApiKey(), busTopicParams.getApiSecret());
 
-            this.consumer.setUsername(username);
-            this.consumer.setPassword(password);
+            this.consumer.setUsername(busTopicParams.getUserName());
+            this.consumer.setPassword(busTopicParams.getPassword());
         }
 
         @Override
@@ -374,29 +369,29 @@ public interface BusConsumer {
         private final Properties props;
 
         /**
+         * BusTopicParams contain the following parameters
          * MR Consumer Wrapper
+         * <p>
+         * servers messaging bus hosts
+         * topic topic
+         * apiKey API Key
+         * apiSecret API Secret
+         * aafLogin AAF Login
+         * aafPassword AAF Password
+         * consumerGroup Consumer Group
+         * consumerInstance Consumer Instance
+         * fetchTimeout Fetch Timeout
+         * fetchLimit Fetch Limit
          *
-         * @param servers messaging bus hosts
-         * @param topic topic
-         * @param apiKey API Key
-         * @param apiSecret API Secret
-         * @param aafLogin AAF Login
-         * @param aafPassword AAF Password
-         * @param consumerGroup Consumer Group
-         * @param consumerInstance Consumer Instance
-         * @param fetchTimeout Fetch Timeout
-         * @param fetchLimit Fetch Limit
+         * @param busTopicParams contains above listed params
          * @throws MalformedURLException
          */
-        public DmaapAafConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
-                String aafLogin, String aafPassword, String consumerGroup, String consumerInstance, int fetchTimeout,
-                int fetchLimit, boolean useHttps) throws MalformedURLException {
+        public DmaapAafConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
 
-            super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup, consumerInstance,
-                    fetchTimeout, fetchLimit);
+            super(busTopicParams);
 
             // super constructor sets servers = {""} if empty to avoid errors when using DME2
-            if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null) || (servers.isEmpty())) {
+            if (busTopicParams.isServersNullOrEmpty()) {
                 throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
             }
 
@@ -404,13 +399,13 @@ public interface BusConsumer {
 
             props = new Properties();
 
-            if (useHttps) {
+            if (busTopicParams.isUseHttps()) {
                 props.setProperty(PROTOCOL_PROP, "https");
-                this.consumer.setHost(servers.get(0) + ":3905");
+                this.consumer.setHost(busTopicParams.getServers().get(0) + ":3905");
 
             } else {
                 props.setProperty(PROTOCOL_PROP, "http");
-                this.consumer.setHost(servers.get(0) + ":3904");
+                this.consumer.setHost(busTopicParams.getServers().get(0) + ":3904");
             }
 
             this.consumer.setProps(props);
@@ -434,70 +429,72 @@ public interface BusConsumer {
 
         private final Properties props;
 
-        public DmaapDmeConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
-                String dme2Login, String dme2Password, String consumerGroup, String consumerInstance, int fetchTimeout,
-                int fetchLimit, String environment, String aftEnvironment, String dme2Partner, String latitude,
-                String longitude, Map<String, String> additionalProps, boolean useHttps) throws MalformedURLException {
-
+        public DmaapDmeConsumerWrapper(BusTopicParams busTopicParams) throws MalformedURLException {
 
 
-            super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup, consumerInstance,
-                    fetchTimeout, fetchLimit);
+            super(busTopicParams);
 
 
-            final String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+            final String dme2RouteOffer = busTopicParams.getAdditionalProps()
+                    .get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
 
-            if (environment == null || environment.isEmpty()) {
-                throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
+            if (busTopicParams.isEnvironmentNullOrEmpty()) {
+                throw parmException(busTopicParams.getTopic(),
+                        PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
             }
-            if (aftEnvironment == null || aftEnvironment.isEmpty()) {
-                throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
+            if (busTopicParams.isAftEnvironmentNullOrEmpty()) {
+                throw parmException(busTopicParams.getTopic(),
+                        PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
             }
-            if (latitude == null || latitude.isEmpty()) {
-                throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
+            if (busTopicParams.isLatitudeNullOrEmpty()) {
+                throw parmException(busTopicParams.getTopic(),
+                        PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
             }
-            if (longitude == null || longitude.isEmpty()) {
-                throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
+            if (busTopicParams.isLongitudeNullOrEmpty()) {
+                throw parmException(busTopicParams.getTopic(),
+                        PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
             }
 
-            if ((dme2Partner == null || dme2Partner.isEmpty())
+            if ((busTopicParams.isPartnerNullOrEmpty())
                     && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
                 throw new IllegalArgumentException(
-                        "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+                        "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
+                                + "." + busTopicParams.getTopic()
                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
-                                + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+                                + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+                                + busTopicParams.getTopic()
                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
             }
 
-            final String serviceName = servers.get(0);
+            final String serviceName = busTopicParams.getServers().get(0);
 
             this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
 
-            this.consumer.setUsername(dme2Login);
-            this.consumer.setPassword(dme2Password);
+            this.consumer.setUsername(busTopicParams.getUserName());
+            this.consumer.setPassword(busTopicParams.getPassword());
 
             props = new Properties();
 
             props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
 
-            props.setProperty("username", dme2Login);
-            props.setProperty("password", dme2Password);
+            props.setProperty("username", busTopicParams.getUserName());
+            props.setProperty("password", busTopicParams.getPassword());
 
             /* These are required, no defaults */
-            props.setProperty("topic", topic);
+            props.setProperty("topic", busTopicParams.getTopic());
 
-            props.setProperty("Environment", environment);
-            props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
+            props.setProperty("Environment", busTopicParams.getEnvironment());
+            props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
 
-            if (dme2Partner != null) {
-                props.setProperty("Partner", dme2Partner);
+            if (busTopicParams.getPartner() != null) {
+                props.setProperty("Partner", busTopicParams.getPartner());
             }
             if (dme2RouteOffer != null) {
                 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
             }
 
-            props.setProperty("Latitude", latitude);
-            props.setProperty("Longitude", longitude);
+            props.setProperty("Latitude", busTopicParams.getLatitude());
+            props.setProperty("Longitude", busTopicParams.getLongitude());
 
             /* These are optional, will default to these values if not set in additionalProps */
             props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
@@ -511,7 +508,7 @@ public interface BusConsumer {
             props.setProperty("TransportType", "DME2");
             props.setProperty("MethodType", "GET");
 
-            if (useHttps) {
+            if (busTopicParams.isUseHttps()) {
                 props.setProperty(PROTOCOL_PROP, "https");
 
             } else {
@@ -520,8 +517,8 @@ public interface BusConsumer {
 
             props.setProperty("contenttype", "application/json");
 
-            if (additionalProps != null) {
-                for (Map.Entry<String, String> entry : additionalProps.entrySet()) {
+            if (busTopicParams.isAdditionalPropsValid()) {
+                for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
                     props.put(entry.getKey(), entry.getValue());
                 }
             }
index 9db9131..348100a 100644 (file)
@@ -3,6 +3,7 @@
  * policy-endpoints
  * ================================================================================
  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -47,7 +48,7 @@ public interface BusPublisher {
     /**
      * sends a message
      * 
-     * @param partition id
+     * @param partitionId id
      * @param message the message
      * @return true if success, false otherwise
      * @throws IllegalArgumentException if no message provided
@@ -72,23 +73,17 @@ public interface BusPublisher {
         @JsonIgnore
         protected volatile CambriaBatchingPublisher publisher;
 
-        public CambriaPublisherWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
-                boolean useHttps) {
-            this(servers, topic, apiKey, apiSecret, null, null, useHttps, false);
-        }
-
-        public CambriaPublisherWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
-                String username, String password, boolean useHttps, boolean selfSignedCerts) {
+        public CambriaPublisherWrapper(BusTopicParams busTopicParams) {
 
             PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
 
-            builder.usingHosts(servers).onTopic(topic);
+            builder.usingHosts(busTopicParams.getServers()).onTopic(busTopicParams.getTopic());
 
             // Set read timeout to 30 seconds (TBD: this should be configurable)
             builder.withSocketTimeout(30000);
 
-            if (useHttps) {
-                if (selfSignedCerts) {
+            if (busTopicParams.isUseHttps()) {
+                if (busTopicParams.isAllowSelfSignedCerts()) {
                     builder.withConnectionType(ConnectionType.HTTPS_NO_VALIDATION);
                 } else {
                     builder.withConnectionType(ConnectionType.HTTPS);
@@ -96,12 +91,12 @@ public interface BusPublisher {
             }
 
 
-            if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
-                builder.authenticatedBy(apiKey, apiSecret);
+            if (busTopicParams.isApiKeyValid() && busTopicParams.isApiSecretValid()) {
+                builder.authenticatedBy(busTopicParams.getApiKey(), busTopicParams.getApiSecret());
             }
 
-            if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
-                builder.authenticatedByHttp(username, password);
+            if (busTopicParams.isUserNameValid() && busTopicParams.isPasswordValid()) {
+                builder.authenticatedByHttp(busTopicParams.getUserName(), busTopicParams.getPassword());
             }
 
             try {
@@ -297,55 +292,60 @@ public interface BusPublisher {
     }
 
     public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
-        public DmaapDmePublisherWrapper(List<String> servers, String topic, String username, String password,
-                String environment, String aftEnvironment, String dme2Partner, String latitude, String longitude,
-                Map<String, String> additionalProps, boolean useHttps) {
-
-            super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps);
-
-
-
-            String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+        public DmaapDmePublisherWrapper(BusTopicParams busTopicParams) {
+
+            super(ProtocolTypeConstants.DME2, busTopicParams.getServers(),busTopicParams.getTopic(),
+                    busTopicParams.getUserName(),busTopicParams.getPassword(),busTopicParams.isUseHttps());
+            String dme2RouteOffer = null;
+            if (busTopicParams.isAdditionalPropsValid()) {
+                dme2RouteOffer = busTopicParams.getAdditionalProps().get(
+                        DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+            }
 
-            if (environment == null || environment.isEmpty()) {
-                throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
+            if (busTopicParams.isEnvironmentNullOrEmpty()) {
+                throw parmException(busTopicParams.getTopic(),
+                        PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
             }
-            if (aftEnvironment == null || aftEnvironment.isEmpty()) {
-                throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
+            if (busTopicParams.isAftEnvironmentNullOrEmpty()) {
+                throw parmException(busTopicParams.getTopic(),
+                        PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
             }
-            if (latitude == null || latitude.isEmpty()) {
-                throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
+            if (busTopicParams.isLatitudeNullOrEmpty()) {
+                throw parmException(busTopicParams.getTopic(),
+                        PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
             }
-            if (longitude == null || longitude.isEmpty()) {
-                throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
+            if (busTopicParams.isLongitudeNullOrEmpty()) {
+                throw parmException(busTopicParams.getTopic(),
+                        PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
             }
 
-            if ((dme2Partner == null || dme2Partner.isEmpty())
-                    && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
+            if ((busTopicParams.isPartnerNullOrEmpty())
+                    && (dme2RouteOffer == null || dme2RouteOffer.trim().isEmpty())) {
                 throw new IllegalArgumentException(
-                        "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+                        "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
+                                + busTopicParams.getTopic()
                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
-                                + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
+                                + PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + busTopicParams.getTopic()
                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
             }
 
-            String serviceName = servers.get(0);
+            String serviceName = busTopicParams.getServers().get(0);
 
             /* These are required, no defaults */
-            props.setProperty("Environment", environment);
-            props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
+            props.setProperty("Environment", busTopicParams.getEnvironment());
+            props.setProperty("AFT_ENVIRONMENT", busTopicParams.getAftEnvironment());
 
             props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
 
-            if (dme2Partner != null) {
-                props.setProperty("Partner", dme2Partner);
+            if (busTopicParams.getPartner() != null) {
+                props.setProperty("Partner", busTopicParams.getPartner());
             }
             if (dme2RouteOffer != null) {
                 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
             }
 
-            props.setProperty("Latitude", latitude);
-            props.setProperty("Longitude", longitude);
+            props.setProperty("Latitude", busTopicParams.getLatitude());
+            props.setProperty("Longitude", busTopicParams.getLongitude());
 
             // ServiceName also a default, found in additionalProps
 
@@ -361,7 +361,7 @@ public interface BusPublisher {
             props.setProperty("TransportType", "DME2");
             props.setProperty("MethodType", "POST");
 
-            for (Map.Entry<String, String> entry : additionalProps.entrySet()) {
+            for (Map.Entry<String, String> entry : busTopicParams.getAdditionalProps().entrySet()) {
                 String key = entry.getKey();
                 String value = entry.getValue();
 
index 7f4c0dd..0899312 100644 (file)
@@ -52,25 +52,22 @@ public abstract class BusTopicBase extends TopicBase implements ApiKeyEnabled {
     /**
      * Instantiates a new Bus Topic Base
      * 
-     * @param servers list of servers
-     * @param topic topic name
-     * @param apiKey API Key
-     * @param apiSecret API Secret
-     * @param useHttps does connection use HTTPS?
-     * @param allowSelfSignedCerts are self-signed certificates allow
-     * 
+     *  servers list of servers
+     *  topic topic name
+     *  apiKey API Key
+     *  apiSecret API Secret
+     *  useHttps does connection use HTTPS?
+     *  allowSelfSignedCerts are self-signed certificates allow
+     * @param busTopicParams
      * @return a Bus Topic Base
      * @throws IllegalArgumentException if invalid parameters are present
      */
-    public BusTopicBase(List<String> servers, String topic, String apiKey, String apiSecret, boolean useHttps,
-            boolean allowSelfSignedCerts) {
-
-        super(servers, topic);
-
-        this.apiKey = apiKey;
-        this.apiSecret = apiSecret;
-        this.useHttps = useHttps;
-        this.allowSelfSignedCerts = allowSelfSignedCerts;
+    public BusTopicBase(BusTopicParams busTopicParams) {
+        super(busTopicParams.getServers(), busTopicParams.getTopic());
+        this.apiKey = busTopicParams.getApiKey();
+        this.apiSecret = busTopicParams.getApiSecret();
+        this.useHttps = busTopicParams.isUseHttps();
+        this.allowSelfSignedCerts = busTopicParams.isAllowSelfSignedCerts();
     }
 
     @Override
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
new file mode 100644 (file)
index 0000000..ffefcbf
--- /dev/null
@@ -0,0 +1,316 @@
+/*
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.bus.internal;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Member variables of this Params class are as follows
+ * servers DMaaP servers
+ * topic DMaaP Topic to be monitored
+ * apiKey DMaaP API Key (optional)
+ * apiSecret DMaaP API Secret (optional)
+ * consumerGroup DMaaP Reader Consumer Group
+ * consumerInstance DMaaP Reader Instance
+ * fetchTimeout DMaaP fetch timeout
+ * fetchLimit DMaaP fetch limit
+ * environment DME2 Environment
+ * aftEnvironment DME2 AFT Environment
+ * partner DME2 Partner
+ * latitude DME2 Latitude
+ * longitude DME2 Longitude
+ * additionalProps Additional properties to pass to DME2
+ * useHttps does connection use HTTPS?
+ * allowSelfSignedCerts are self-signed certificates allow
+ */
+public class BusTopicParams {
+
+    public static TopicParamsBuilder builder() {
+        return new TopicParamsBuilder();
+    }
+
+    private List<String> servers;
+    private String topic;
+    private String apiKey;
+    private String apiSecret;
+    private String consumerGroup;
+    private String consumerInstance;
+    private int fetchTimeout;
+    private int fetchLimit;
+    private boolean useHttps;
+    private boolean allowSelfSignedCerts;
+
+    private String userName;
+    private String password;
+    private String environment;
+    private String aftEnvironment;
+    private String partner;
+    private String latitude;
+    private String longitude;
+    private Map<String, String> additionalProps;
+    private String partitionId;
+
+    String getPartitionId() {
+        return partitionId;
+    }
+
+    String getUserName() {
+        return userName;
+    }
+
+    String getPassword() {
+        return password;
+    }
+
+    String getEnvironment() {
+        return environment;
+    }
+
+    String getAftEnvironment() {
+        return aftEnvironment;
+    }
+
+    String getPartner() {
+        return partner;
+    }
+
+    String getLatitude() {
+        return latitude;
+    }
+
+    String getLongitude() {
+        return longitude;
+    }
+
+    Map<String, String> getAdditionalProps() {
+        return additionalProps;
+    }
+
+    List<String> getServers() {
+        return servers;
+    }
+
+    String getTopic() {
+        return topic;
+    }
+
+    String getApiKey() {
+        return apiKey;
+    }
+
+    String getApiSecret() {
+        return apiSecret;
+    }
+
+    String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    String getConsumerInstance() {
+        return consumerInstance;
+    }
+
+    int getFetchTimeout() {
+        return fetchTimeout;
+    }
+
+    int getFetchLimit() {
+        return fetchLimit;
+    }
+
+    boolean isUseHttps() {
+        return useHttps;
+    }
+
+    boolean isAllowSelfSignedCerts() {
+        return allowSelfSignedCerts;
+    }
+
+    boolean isEnvironmentNullOrEmpty() {
+        return (environment == null || environment.trim().isEmpty());
+    }
+
+    boolean isAftEnvironmentNullOrEmpty() {
+        return (aftEnvironment == null || aftEnvironment.trim().isEmpty());
+    }
+
+    boolean isLatitudeNullOrEmpty() {
+        return (latitude == null || latitude.trim().isEmpty());
+    }
+
+    boolean isLongitudeNullOrEmpty() {
+        return (longitude == null || longitude.trim().isEmpty());
+    }
+
+    boolean isConsumerInstanceNullOrEmpty() {
+        return (consumerInstance == null || consumerInstance.trim().isEmpty());
+    }
+
+    boolean isConsumerGroupNullOrEmpty() {
+        return (consumerGroup == null || consumerGroup.trim().isEmpty());
+    }
+
+    boolean isApiKeyValid() {
+        return !(apiKey == null || apiKey.trim().isEmpty());
+    }
+
+    boolean isApiSecretValid() {
+        return !(apiSecret == null || apiSecret.trim().isEmpty());
+    }
+
+    boolean isUserNameValid() {
+        return !(userName == null || userName.trim().isEmpty());
+    }
+
+    boolean isPasswordValid() {
+        return !(password == null || password.trim().isEmpty());
+    }
+
+    boolean isPartnerNullOrEmpty() {
+        return (partner == null || partner.trim().isEmpty());
+    }
+
+    boolean isServersNullOrEmpty() {
+        return (servers == null || servers.isEmpty()
+                || (servers.size() == 1 && ("".equals(servers.get(0)))));
+    }
+
+    boolean isAdditionalPropsValid() {
+        return additionalProps != null;
+    }
+
+    boolean isTopicNullOrEmpty() {
+        return (topic == null || topic.trim().isEmpty());
+    }
+
+    boolean isPartitionIdNullOrEmpty() {
+        return (partitionId == null || partitionId.trim().isEmpty());
+    }
+
+    public static class TopicParamsBuilder {
+        BusTopicParams m = new BusTopicParams();
+
+        private TopicParamsBuilder() {
+        }
+
+        public TopicParamsBuilder servers(List<String> servers) {
+            this.m.servers = servers;
+            return this;
+        }
+
+        public TopicParamsBuilder topic(String topic) {
+            this.m.topic = topic;
+            return this;
+        }
+
+        public TopicParamsBuilder apiKey(String apiKey) {
+            this.m.apiKey = apiKey;
+            return this;
+        }
+
+        public TopicParamsBuilder apiSecret(String apiSecret) {
+            this.m.apiSecret = apiSecret;
+            return this;
+        }
+
+        public TopicParamsBuilder consumerGroup(String consumerGroup) {
+            this.m.consumerGroup = consumerGroup;
+            return this;
+        }
+
+        public TopicParamsBuilder consumerInstance(String consumerInstance) {
+            this.m.consumerInstance = consumerInstance;
+            return this;
+        }
+
+        public TopicParamsBuilder fetchTimeout(int fetchTimeout) {
+            this.m.fetchTimeout = fetchTimeout;
+            return this;
+        }
+
+        public TopicParamsBuilder fetchLimit(int fetchLimit) {
+            this.m.fetchLimit = fetchLimit;
+            return this;
+        }
+
+        public TopicParamsBuilder useHttps(boolean useHttps) {
+            this.m.useHttps = useHttps;
+            return this;
+        }
+
+        public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) {
+            this.m.allowSelfSignedCerts = allowSelfSignedCerts;
+            return this;
+        }
+
+        public TopicParamsBuilder userName(String userName) {
+            this.m.userName = userName;
+            return this;
+        }
+
+        public TopicParamsBuilder password(String password) {
+            this.m.password = password;
+            return this;
+        }
+
+        public TopicParamsBuilder environment(String environment) {
+            this.m.environment = environment;
+            return this;
+        }
+
+        public TopicParamsBuilder aftEnvironment(String aftEnvironment) {
+            this.m.aftEnvironment = aftEnvironment;
+            return this;
+        }
+
+        public TopicParamsBuilder partner(String partner) {
+            this.m.partner = partner;
+            return this;
+        }
+
+        public TopicParamsBuilder latitude(String latitude) {
+            this.m.latitude = latitude;
+            return this;
+        }
+
+        public TopicParamsBuilder longitude(String longitude) {
+            this.m.longitude = longitude;
+            return this;
+        }
+
+        public TopicParamsBuilder additionalProps(Map<String, String> additionalProps) {
+            this.m.additionalProps = additionalProps;
+            return this;
+        }
+
+        public TopicParamsBuilder partitionId(String partitionId) {
+            this.m.partitionId = partitionId;
+            return this;
+        }
+
+        public BusTopicParams build() {
+            return m;
+        }
+
+    }
+}
+
index f3c736d..5493468 100644 (file)
@@ -3,6 +3,7 @@
  * policy-endpoints
  * ================================================================================
  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -52,23 +53,24 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
 
     /**
      * constructor for abstract sink
-     * 
-     * @param servers servers
-     * @param topic topic
-     * @param apiKey api secret
-     * @param apiSecret api secret
-     * @param partitionId partition id
-     * @param useHttps does connection use HTTPS?
-     * @param allowSelfSignedCerts are self-signed certificates allow
+     * @param busTopicParams contains below listed attributes
+     * servers servers
+     * topic topic
+     * apiKey api secret
+     * apiSecret api secret
+     * partitionId partition id
+     * useHttps does connection use HTTPS?
+     * allowSelfSignedCerts are self-signed certificates allow     *
      * @throws IllegalArgumentException in invalid parameters are passed in
      */
-    public InlineBusTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String partitionId,
-            boolean useHttps, boolean allowSelfSignedCerts) {
+    public InlineBusTopicSink(BusTopicParams busTopicParams) {
 
-        super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
+        super(busTopicParams);
 
-        if (partitionId == null || partitionId.isEmpty()) {
+        if (busTopicParams.isPartitionIdNullOrEmpty()) {
             this.partitionId = UUID.randomUUID().toString();
+        } else {
+            this.partitionId = busTopicParams.getPartitionId();
         }
     }
 
index 3ea7185..3dd4031 100644 (file)
@@ -3,13 +3,14 @@
  * policy-endpoints
  * ================================================================================
  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -48,65 +49,67 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
     protected Map<String, String> additionalProps = null;
 
     /**
-     * 
-     * @param servers DMaaP servers
-     * @param topic DMaaP Topic to be monitored
-     * @param apiKey DMaaP API Key (optional)
-     * @param apiSecret DMaaP API Secret (optional)
-     * @param consumerGroup DMaaP Reader Consumer Group
-     * @param consumerInstance DMaaP Reader Instance
-     * @param fetchTimeout DMaaP fetch timeout
-     * @param fetchLimit DMaaP fetch limit
-     * @param environment DME2 Environment
-     * @param aftEnvironment DME2 AFT Environment
-     * @param partner DME2 Partner
-     * @param latitude DME2 Latitude
-     * @param longitude DME2 Longitude
-     * @param additionalProps Additional properties to pass to DME2
-     * @param useHttps does connection use HTTPS?
-     * @param allowSelfSignedCerts are self-signed certificates allow
-     * 
+     * BusTopicParams contains the below mentioned attributes
+     * servers              DMaaP servers
+     * topic                DMaaP Topic to be monitored
+     * apiKey               DMaaP API Key (optional)
+     * apiSecret            DMaaP API Secret (optional)
+     * environment          DME2 Environment
+     * aftEnvironment       DME2 AFT Environment
+     * partner              DME2 Partner
+     * latitude             DME2 Latitude
+     * longitude            DME2 Longitude
+     * additionalProps      Additional properties to pass to DME2
+     * useHttps             does connection use HTTPS?
+     * allowSelfSignedCerts are self-signed certificates allow
+     * @param busTopicParams Contains the above mentioned parameters
      * @throws IllegalArgumentException An invalid parameter passed in
      */
-    public InlineDmaapTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
-            String password, String partitionKey, String environment, String aftEnvironment, String partner,
-            String latitude, String longitude, Map<String, String> additionalProps, boolean useHttps,
-            boolean allowSelfSignedCerts) {
+    public InlineDmaapTopicSink(BusTopicParams busTopicParams) {
 
-        super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
+        super(busTopicParams);
 
-        this.userName = userName;
-        this.password = password;
+        this.userName = busTopicParams.getUserName();
+        this.password = busTopicParams.getPassword();
 
-        this.environment = environment;
-        this.aftEnvironment = aftEnvironment;
-        this.partner = partner;
+        this.environment = busTopicParams.getEnvironment();
+        this.aftEnvironment = busTopicParams.getAftEnvironment();
+        this.partner = busTopicParams.getPartner();
 
-        this.latitude = latitude;
-        this.longitude = longitude;
+        this.latitude = busTopicParams.getLatitude();
+        this.longitude = busTopicParams.getLongitude();
 
-        this.additionalProps = additionalProps;
-    }
-
-    public InlineDmaapTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
-            String password, String partitionKey, boolean useHttps, boolean allowSelfSignedCerts) {
-
-        super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
-
-        this.userName = userName;
-        this.password = password;
+        this.additionalProps = busTopicParams.getAdditionalProps();
     }
 
 
     @Override
     public void init() {
         if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
-            this.publisher = new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, this.apiKey,
-                    this.apiSecret, this.userName, this.password, this.useHttps, this.allowSelfSignedCerts);
+            this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder()
+                    .servers(this.servers)
+                    .topic(this.topic)
+                    .apiKey(this.apiKey)
+                    .apiSecret(this.apiSecret)
+                    .userName(this.userName)
+                    .password(this.password)
+                    .useHttps(this.useHttps)
+                    .allowSelfSignedCerts(this.allowSelfSignedCerts)
+                    .build());
         } else {
-            this.publisher = new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic, this.userName,
-                    this.password, this.environment, this.aftEnvironment, this.partner, this.latitude, this.longitude,
-                    this.additionalProps, this.useHttps);
+            this.publisher = new BusPublisher.DmaapDmePublisherWrapper(BusTopicParams.builder()
+                    .servers(this.servers)
+                    .topic(this.topic)
+                    .userName(this.userName)
+                    .password(this.password)
+                    .environment(this.environment)
+                    .aftEnvironment(this.aftEnvironment)
+                    .partner(this.partner)
+                    .latitude(this.latitude)
+                    .longitude(this.longitude)
+                    .additionalProps(this.additionalProps)
+                    .useHttps(this.useHttps)
+                    .build());
         }
 
         logger.info("{}: DMAAP SINK created", this);
index fefe649..218e44b 100644 (file)
@@ -3,13 +3,14 @@
  * policy-endpoints
  * ================================================================================
  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -39,21 +40,21 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi
     private static Logger logger = LoggerFactory.getLogger(InlineUebTopicSink.class);
 
     /**
-     * Argument-based UEB Topic Writer instantiation
-     * 
-     * @param servers list of UEB servers available for publishing
-     * @param topic the topic to publish to
-     * @param apiKey the api key (optional)
-     * @param apiSecret the api secret (optional)
-     * @param partitionId the partition key (optional, autogenerated if not provided)
-     * @param useHttps does connection use HTTPS?
-     * @param allowSelfSignedCerts are self-signed certificates allow
-     * 
+     * Argument-based UEB Topic Writer instantiation. BusTopicParams contains below mentioned
+     * attributes
+     *
+     * servers              list of UEB servers available for publishing
+     * topic                the topic to publish to
+     * apiKey               the api key (optional)
+     * apiSecret            the api secret (optional)
+     * partitionId          the partition key (optional, autogenerated if not provided)
+     * useHttps             does connection use HTTPS?
+     * allowSelfSignedCerts are self-signed certificates allow
+     * @param busTopicParams contains attributes needed
      * @throws IllegalArgumentException if invalid arguments are detected
      */
-    public InlineUebTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String partitionId,
-            boolean useHttps, boolean allowSelfSignedCerts) {
-        super(servers, topic, apiKey, apiSecret, partitionId, useHttps, allowSelfSignedCerts);
+    public InlineUebTopicSink(BusTopicParams busTopicParams) {
+        super(busTopicParams);
     }
 
     /**
@@ -62,8 +63,14 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi
     @Override
     public void init() {
 
-        this.publisher = new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, this.apiSecret,
-                    null, null, this.useHttps, this.allowSelfSignedCerts);
+        this.publisher = new BusPublisher.CambriaPublisherWrapper(BusTopicParams.builder()
+                .servers(this.servers)
+                .topic(this.topic)
+                .apiKey(this.apiKey)
+                .apiSecret(this.apiSecret)
+                .useHttps(this.useHttps)
+                .allowSelfSignedCerts(this.allowSelfSignedCerts)
+                .build());
         logger.info("{}: UEB SINK created", this);
     }
 
index 74912ca..400cbfe 100644 (file)
@@ -3,6 +3,7 @@
  * policy-endpoints
  * ================================================================================
  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,8 +22,6 @@
 package org.onap.policy.common.endpoints.event.comm.bus.internal;
 
 import java.net.MalformedURLException;
-import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 
 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
@@ -85,15 +84,15 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
      */
     public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
 
-        super(busTopicParams.getServers(), busTopicParams.getTopic(), busTopicParams.getApiKey(), busTopicParams.getApiSecret(), busTopicParams.isUseHttps(), busTopicParams.isAllowSelfSignedCerts());
+        super(busTopicParams);
 
-        if (busTopicParams.getConsumerGroup() == null || busTopicParams.getConsumerGroup().isEmpty()) {
+        if (busTopicParams.isConsumerGroupNullOrEmpty()) {
             this.consumerGroup = UUID.randomUUID().toString();
         } else {
             this.consumerGroup = busTopicParams.getConsumerGroup();
         }
 
-        if (busTopicParams.getConsumerInstance() == null || busTopicParams.getConsumerInstance().isEmpty()) {
+        if (busTopicParams.isConsumerInstanceNullOrEmpty()) {
             this.consumerInstance = NetworkUtil.getHostname();
         } else {
             this.consumerInstance = busTopicParams.getConsumerInstance();
@@ -312,225 +311,4 @@ public abstract class SingleThreadedBusTopicSource extends BusTopicBase
         return fetchLimit;
     }
 
-    /**
-     * Member variables of this Params class are as follows
-     * servers DMaaP servers
-     * topic DMaaP Topic to be monitored
-     * apiKey DMaaP API Key (optional)
-     * apiSecret DMaaP API Secret (optional)
-     * consumerGroup DMaaP Reader Consumer Group
-     * consumerInstance DMaaP Reader Instance
-     * fetchTimeout DMaaP fetch timeout
-     * fetchLimit DMaaP fetch limit
-     * environment DME2 Environment
-     * aftEnvironment DME2 AFT Environment
-     * partner DME2 Partner
-     * latitude DME2 Latitude
-     * longitude DME2 Longitude
-     * additionalProps Additional properties to pass to DME2
-     * useHttps does connection use HTTPS?
-     * allowSelfSignedCerts are self-signed certificates allow
-     *
-     */
-    public static class BusTopicParams {
-
-        public static TopicParamsBuilder builder() {
-            return new TopicParamsBuilder();
-        }
-        private List<String> servers;
-        private String topic;
-        private String apiKey;
-        private String apiSecret;
-        private String consumerGroup;
-        private String consumerInstance;
-        private int fetchTimeout;
-        private int fetchLimit;
-        private boolean useHttps;
-        private boolean allowSelfSignedCerts;
-
-        private String userName;
-        private String password;
-        private String environment;
-        private String aftEnvironment;
-        private String partner;
-        private String latitude;
-        private String longitude;
-        private Map<String, String> additionalProps;
-
-        public String getUserName() {
-            return userName;
-        }
-
-        public String getPassword() {
-            return password;
-        }
-
-        public String getEnvironment() {
-            return environment;
-        }
-
-        public String getAftEnvironment() {
-            return aftEnvironment;
-        }
-
-        public String getPartner() {
-            return partner;
-        }
-
-        public String getLatitude() {
-            return latitude;
-        }
-
-        public String getLongitude() {
-            return longitude;
-        }
-
-        public Map<String, String> getAdditionalProps() {
-            return additionalProps;
-        }
-
-        public List<String> getServers() {
-            return servers;
-        }
-
-        public String getTopic() {
-            return topic;
-        }
-
-        public String getApiKey() {
-            return apiKey;
-        }
-
-        public String getApiSecret() {
-            return apiSecret;
-        }
-
-        public String getConsumerGroup() {
-            return consumerGroup;
-        }
-
-        public String getConsumerInstance() {
-            return consumerInstance;
-        }
-
-        public int getFetchTimeout() {
-            return fetchTimeout;
-        }
-
-        public int getFetchLimit() {
-            return fetchLimit;
-        }
-
-        public boolean isUseHttps() {
-            return useHttps;
-        }
-
-        public boolean isAllowSelfSignedCerts() {
-            return allowSelfSignedCerts;
-        }
-
-
-        public static class TopicParamsBuilder {
-            BusTopicParams m = new BusTopicParams();
-
-            private TopicParamsBuilder() {
-            }
-
-            public TopicParamsBuilder servers(List<String> servers) {
-                this.m.servers = servers;
-                return this;
-            }
-
-            public TopicParamsBuilder topic(String topic) {
-                this.m.topic = topic;
-                return this;
-            }
-
-            public TopicParamsBuilder apiKey(String apiKey) {
-                this.m.apiKey = apiKey;
-                return this;
-            }
-
-            public TopicParamsBuilder apiSecret(String apiSecret) {
-                this.m.apiSecret = apiSecret;
-                return this;
-            }
-
-            public TopicParamsBuilder consumerGroup(String consumerGroup) {
-                this.m.consumerGroup = consumerGroup;
-                return this;
-            }
-
-            public TopicParamsBuilder consumerInstance(String consumerInstance) {
-                this.m.consumerInstance = consumerInstance;
-                return this;
-            }
-
-            public TopicParamsBuilder fetchTimeout(int fetchTimeout) {
-                this.m.fetchTimeout = fetchTimeout;
-                return this;
-            }
-
-            public TopicParamsBuilder fetchLimit(int fetchLimit) {
-                this.m.fetchLimit = fetchLimit;
-                return this;
-            }
-
-            public TopicParamsBuilder useHttps(boolean useHttps) {
-                this.m.useHttps = useHttps;
-                return this;
-            }
-
-            public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) {
-                this.m.allowSelfSignedCerts = allowSelfSignedCerts;
-                return this;
-            }
-
-            public TopicParamsBuilder userName(String userName) {
-                this.m.userName = userName;
-                return this;
-            }
-
-            public TopicParamsBuilder password(String password) {
-                this.m.password = password;
-                return this;
-            }
-
-            public TopicParamsBuilder environment(String environment) {
-                this.m.environment = environment;
-                return this;
-            }
-
-            public TopicParamsBuilder aftEnvironment(String aftEnvironment) {
-                this.m.aftEnvironment = aftEnvironment;
-                return this;
-            }
-
-            public TopicParamsBuilder partner(String partner) {
-                this.m.partner = partner;
-                return this;
-            }
-
-            public TopicParamsBuilder latitude(String latitude) {
-                this.m.latitude = latitude;
-                return this;
-            }
-
-            public TopicParamsBuilder longitude(String longitude) {
-                this.m.longitude = longitude;
-                return this;
-            }
-
-            public TopicParamsBuilder additionalProps(Map<String, String> additionalProps) {
-                this.m.additionalProps = additionalProps;
-                return this;
-            }
-
-            public BusTopicParams build() {
-                return m;
-            }
-
-        }
-
-    }
 }
index c6bd556..65f75aa 100644 (file)
@@ -3,6 +3,7 @@
  * policy-endpoints
  * ================================================================================
  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -84,18 +85,52 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
     @Override
     public void init() throws MalformedURLException {
         if (anyNullOrEmpty(this.userName, this.password)) {
-            this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey,
-                    this.apiSecret, this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit,
-                    this.useHttps, this.allowSelfSignedCerts);
+            this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder()
+                    .servers(this.servers)
+                    .topic(this.topic)
+                    .apiKey(this.apiKey)
+                    .apiSecret(this.apiSecret)
+                    .consumerGroup(this.consumerGroup)
+                    .consumerInstance(this.consumerInstance)
+                    .fetchTimeout(this.fetchTimeout)
+                    .fetchLimit(this.fetchLimit)
+                    .useHttps(this.useHttps)
+                    .allowSelfSignedCerts(this.allowSelfSignedCerts)
+                    .build());
         } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
-            this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey,
-                    this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance,
-                    this.fetchTimeout, this.fetchLimit, this.useHttps, this.allowSelfSignedCerts);
+            this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder()
+                    .servers(this.servers)
+                    .topic(this.topic)
+                    .apiKey(this.apiKey)
+                    .apiSecret(this.apiSecret)
+                    .userName(this.userName)
+                    .password(this.password)
+                    .consumerGroup(this.consumerGroup)
+                    .consumerInstance(this.consumerInstance)
+                    .fetchTimeout(this.fetchTimeout)
+                    .fetchLimit(this.fetchLimit)
+                    .useHttps(this.useHttps)
+                    .allowSelfSignedCerts(this.allowSelfSignedCerts)
+                    .build());
         } else {
-            this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, this.apiKey,
-                    this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance,
-                    this.fetchTimeout, this.fetchLimit, this.environment, this.aftEnvironment, this.partner,
-                    this.latitude, this.longitude, this.additionalProps, this.useHttps);
+            this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(BusTopicParams.builder()
+                    .servers(this.servers)
+                    .topic(this.topic)
+                    .apiKey(this.apiKey)
+                    .apiSecret(this.apiSecret)
+                    .userName(this.userName)
+                    .password(this.password)
+                    .consumerGroup(this.consumerGroup)
+                    .consumerInstance(this.consumerInstance)
+                    .fetchTimeout(this.fetchTimeout)
+                    .fetchLimit(this.fetchLimit)
+                    .environment(this.environment)
+                    .aftEnvironment(this.aftEnvironment)
+                    .partner(this.partner)
+                    .latitude(this.latitude)
+                    .longitude(this.longitude)
+                    .additionalProps(this.additionalProps)
+                    .useHttps(this.useHttps).build());
         }
 
         logger.info("{}: INITTED", this);
index 03273a2..fb20ccc 100644 (file)
@@ -3,6 +3,7 @@
  * policy-endpoints
  * ================================================================================
  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -30,8 +31,7 @@ import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSource;
 public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource implements UebTopicSource {
 
     /**
-     * 
-     * @param busTopicParams Parameters object containing all the required inputs     *
+     * @param busTopicParams Parameters object containing all the required inputs
      * @throws IllegalArgumentException An invalid parameter passed in
      */
 
@@ -50,9 +50,17 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource i
      */
     @Override
     public void init() {
-        this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret,
-                this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit, this.useHttps,
-                this.allowSelfSignedCerts);
+        this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder()
+                .servers(this.servers)
+                .topic(this.topic)
+                .apiKey(this.apiKey)
+                .apiSecret(this.apiSecret)
+                .consumerGroup(this.consumerGroup)
+                .consumerInstance(this.consumerInstance)
+                .fetchTimeout(this.fetchTimeout)
+                .fetchLimit(this.fetchLimit)
+                .useHttps(this.useHttps)
+                .allowSelfSignedCerts(this.allowSelfSignedCerts).build());
     }
 
     /**