Add a topic protocol configuration setting 19/78319/2
authormark.j.leonard <mark.j.leonard@gmail.com>
Tue, 12 Feb 2019 16:49:30 +0000 (16:49 +0000)
committermark.j.leonard <mark.j.leonard@gmail.com>
Wed, 13 Feb 2019 15:47:22 +0000 (15:47 +0000)
Use the latest version of the event-client for DMaaP.

Read an (optional) protocol value from each topic configuration
properties file, and pass this to the Consumer and/or Publisher.
Use the default protocol if the configuration is not supplied.

Change-Id: I3d6264e1f32c1fbba097eafbe7fe7fbd744f1373
Issue-ID: AAI-2150
Signed-off-by: mark.j.leonard <mark.j.leonard@gmail.com>
pom.xml
src/main/java/org/onap/aai/validation/config/TopicConfig.java
src/main/java/org/onap/aai/validation/factory/DMaaPEventConsumerFactory.java [new file with mode: 0644]
src/main/java/org/onap/aai/validation/factory/DMaaPEventPublisherFactory.java
src/main/java/org/onap/aai/validation/publisher/ValidationEventPublisher.java
src/main/java/org/onap/aai/validation/services/EventPollingService.java
src/test/java/org/onap/aai/validation/publisher/TestValidationEventPublisher.java

diff --git a/pom.xml b/pom.xml
index d3f0e94..18f0139 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -50,7 +50,7 @@ limitations under the License.
                <sslport>9501</sslport>
                <version.org.onap.aai.logging-service>1.2.2</version.org.onap.aai.logging-service>
                <version.aai.rest.client>1.4.0</version.aai.rest.client>
-               <version.org.onap.aai.event.client>1.2.2</version.org.onap.aai.event.client>
+               <version.org.onap.aai.event.client>1.4.0-SNAPSHOT</version.org.onap.aai.event.client>
                <version.com.ecomp.aai.gap.data.client>1.0</version.com.ecomp.aai.gap.data.client>
                <version.com.google.code.gson>2.7</version.com.google.code.gson>
                <version.com.jayway.jsonpath>2.2.0</version.com.jayway.jsonpath>
index 38d527e..f95a357 100644 (file)
@@ -91,6 +91,7 @@ public class TopicConfig {
                 topicConfig.setConsumerGroup(getTopicProperties().getProperty(topicName + ".consumer.group"));
                 topicConfig.setConsumerId(getTopicProperties().getProperty(topicName + ".consumer.id"));
                 topicConfig.setTransportType(getTopicProperties().getProperty(topicName + ".transport.type"));
+                topicConfig.setProtocol(getTopicProperties().getProperty(topicName + ".protocol"));
                 topics.add(topicConfig);
             }
         }
@@ -203,10 +204,18 @@ public class TopicConfig {
             this.transportType = transportType;
         }
 
+        public String getProtocol() {
+            return protocol;
+        }
+
+        public void setProtocol(String protocol) {
+            this.protocol = protocol;
+        }
+
         @Override
         public int hashCode() {
             return Objects.hash(this.consumerGroup, this.consumerId, this.host, this.username, this.name,
-                    this.partition, this.password, this.transportType);
+                    this.partition, this.password, this.transportType, this.protocol);
         }
 
         @Override
@@ -227,6 +236,7 @@ public class TopicConfig {
                          .append(partition, rhs.partition)
                          .append(password, rhs.password)
                          .append(transportType, rhs.transportType)
+                         .append(protocol, rhs.protocol)
                          .isEquals();
             // @formatter:on
         }
@@ -235,7 +245,7 @@ public class TopicConfig {
         public String toString() {
             return "Topic [name=" + name + ", host=" + host + ", username=" + username + ", password=" + password
                     + ", partition=" + partition + ", consumerGroup=" + consumerGroup + ", consumerId=" + consumerId
-                    + ", transportType =" + transportType + "]";
+                    + ", transportType=" + transportType + ", protocol=" + protocol + "]";
         }
     }
 }
diff --git a/src/main/java/org/onap/aai/validation/factory/DMaaPEventConsumerFactory.java b/src/main/java/org/onap/aai/validation/factory/DMaaPEventConsumerFactory.java
new file mode 100644 (file)
index 0000000..d51c966
--- /dev/null
@@ -0,0 +1,40 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (c) 2018-2019 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.aai.validation.factory;
+
+import java.net.MalformedURLException;
+import org.onap.aai.event.client.DMaaPEventConsumer;
+
+public class DMaaPEventConsumerFactory {
+
+    public DMaaPEventConsumer createEventConsumer(String topicHost, String topicName, String topicUsername,
+            String topicPassword, String consumerGroup, String consumerId, String transportType, String protocol)
+            throws MalformedURLException {
+        return new DMaaPEventConsumer(topicHost, topicName, topicUsername, topicPassword, consumerGroup, consumerId,
+                DMaaPEventConsumer.DEFAULT_MESSAGE_WAIT_TIMEOUT, //
+                DMaaPEventConsumer.DEFAULT_MESSAGE_LIMIT,
+                transportType == null ? DMaaPEventConsumer.DEFAULT_TRANSPORT_TYPE : transportType,
+                protocol == null ? DMaaPEventConsumer.DEFAULT_PROTOCOL : protocol, //
+                null /* no filter */);
+    }
+
+}
index fa4af74..2990e31 100644 (file)
@@ -1,34 +1,39 @@
-/*
- * ============LICENSE_START===================================================
- * Copyright (c) 2018 Amdocs
- * ============================================================================
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (c) 2018-2019 European Software Marketing Ltd.
+ * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *        http://www.apache.org/licenses/LICENSE-2.0
+ *       http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- * ============LICENSE_END=====================================================
+ * ============LICENSE_END=========================================================
  */
+
 package org.onap.aai.validation.factory;
 
 import org.onap.aai.event.client.DMaaPEventPublisher;
 
-public class DMaaPEventPublisherFactory {  
-
+public class DMaaPEventPublisherFactory {
 
     public DMaaPEventPublisher createEventPublisher(String topicHost, String topicName, String topicUsername,
-            String topicPassword, String topicTransportType) {
-        int defaultBatchSize = DMaaPEventPublisher.DEFAULT_BATCH_SIZE;
-        long defaultBatchAge = DMaaPEventPublisher.DEFAULT_BATCH_AGE;
-        int defaultBatchDelay = DMaaPEventPublisher.DEFAULT_BATCH_DELAY;
-        return new DMaaPEventPublisher(topicHost, topicName, topicUsername, topicPassword, defaultBatchSize,
-                defaultBatchAge, defaultBatchDelay, topicTransportType);
+            String topicPassword, String transportType, String protocol) {
+        return new DMaaPEventPublisher(topicHost, topicName, topicUsername, topicPassword,
+                DMaaPEventPublisher.DEFAULT_BATCH_SIZE, //
+                DMaaPEventPublisher.DEFAULT_BATCH_AGE, //
+                DMaaPEventPublisher.DEFAULT_BATCH_DELAY, //
+                transportType == null ? DMaaPEventPublisher.DEFAULT_TRANSPORT_TYPE : transportType,
+                protocol == null ? DMaaPEventPublisher.DEFAULT_PROTOCOL : protocol,
+                DMaaPEventPublisher.DEFAULT_CONTENT_TYPE);
     }
 
 }
index c52ff10..4b0b583 100644 (file)
@@ -2,8 +2,8 @@
  * ============LICENSE_START=======================================================
  * org.onap.aai
  * ================================================================================
- * Copyright © 2018-2019 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2018-2019 European Software Marketing Ltd.
+ * Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (c) 2018-2019 European Software Marketing Ltd.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -85,9 +85,7 @@ public class ValidationEventPublisher implements MessagePublisher {
      */
     @Override
     public void publishMessages(Collection<String> messages) throws ValidationServiceException {
-        if (!enablePublishing) {
-            return;
-        } else {
+        if (enablePublishing) {
             applicationLogger.debug("Publishing messages: " + messages);
             for (Topic topic : publisherTopics) {
                 retriesRemaining = retries;
@@ -99,7 +97,7 @@ public class ValidationEventPublisher implements MessagePublisher {
     private void publishMessages(Collection<String> messages, Topic topic) throws ValidationServiceException {
 
         DMaaPEventPublisher dMaapEventPublisher = dMaapFactory.createEventPublisher(topic.getHost(), topic.getName(),
-                topic.getUsername(), topic.getPassword(), topic.getTransportType());
+                topic.getUsername(), topic.getPassword(), topic.getTransportType(), topic.getProtocol());
 
         try {
             // Add our message to the publisher's queue/bus
index 4a85f57..bc0c260 100644 (file)
@@ -33,6 +33,7 @@ import org.onap.aai.validation.config.TopicConfig.Topic;
 import org.onap.aai.validation.controller.ValidationController;
 import org.onap.aai.validation.exception.ValidationServiceError;
 import org.onap.aai.validation.exception.ValidationServiceException;
+import org.onap.aai.validation.factory.DMaaPEventConsumerFactory;
 import org.onap.aai.validation.logging.ApplicationMsgs;
 import org.onap.aai.validation.logging.LogHelper;
 import org.onap.aai.validation.logging.LogHelper.MdcParameter;
@@ -60,12 +61,12 @@ public class EventPollingService implements Runnable {
     @Inject
     public EventPollingService(TopicConfig topicConfig) throws ValidationServiceException {
         consumers = new ArrayList<>();
+        DMaaPEventConsumerFactory factory = new DMaaPEventConsumerFactory();
         for (Topic topic : topicConfig.getConsumerTopics()) {
             try {
-                consumers.add(new DMaaPEventConsumer(topic.getHost(), topic.getName(), topic.getUsername(),
-                        topic.getPassword(), topic.getConsumerGroup(), topic.getConsumerId(),
-                        DMaaPEventConsumer.DEFAULT_MESSAGE_WAIT_TIMEOUT, DMaaPEventConsumer.DEFAULT_MESSAGE_LIMIT,
-                        topic.getTransportType()));
+                consumers.add(factory.createEventConsumer(topic.getHost(), topic.getName(), topic.getUsername(),
+                        topic.getPassword(), topic.getConsumerGroup(), topic.getConsumerId(), topic.getTransportType(),
+                        topic.getProtocol()));
             } catch (MalformedURLException e) {
                 throw new ValidationServiceException(ValidationServiceError.EVENT_CLIENT_CONSUMER_INIT_ERROR, e);
             }
index 604312e..86da99e 100644 (file)
@@ -1,19 +1,22 @@
-/*
- * ============LICENSE_START===================================================
- * Copyright (c) 2018 Amdocs
- * ============================================================================
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (c) 2018-2019 European Software Marketing Ltd.
+ * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *
- *        http://www.apache.org/licenses/LICENSE-2.0
+ *       http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- * ============LICENSE_END=====================================================
+ * ============LICENSE_END=========================================================
  */
 package org.onap.aai.validation.publisher;
 
@@ -22,12 +25,6 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import org.onap.aai.event.client.DMaaPEventPublisher;
-import org.onap.aai.validation.config.TopicAdminConfig;
-import org.onap.aai.validation.config.TopicConfig;
-import org.onap.aai.validation.config.TopicConfig.Topic;
-import org.onap.aai.validation.factory.DMaaPEventPublisherFactory;
-import org.onap.aai.validation.publisher.ValidationEventPublisher;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -37,14 +34,17 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
+import org.onap.aai.event.client.DMaaPEventPublisher;
+import org.onap.aai.validation.config.TopicAdminConfig;
+import org.onap.aai.validation.config.TopicConfig;
+import org.onap.aai.validation.config.TopicConfig.Topic;
+import org.onap.aai.validation.factory.DMaaPEventPublisherFactory;
 
 @RunWith(MockitoJUnitRunner.class)
 public class TestValidationEventPublisher {
 
     static {
         System.setProperty("APP_HOME", ".");
-        System.setProperty("consumer.topic.names", "poa-rule-validation");
-        System.setProperty("publisher.topic.names", "poa-audit-result");
     }
 
     private DMaaPEventPublisher mockEventPublisher;
@@ -73,7 +73,7 @@ public class TestValidationEventPublisher {
         when(mockEventPublisher.closeWithUnsent()).thenReturn(new ArrayList<>());
 
         DMaaPEventPublisherFactory mockEventPublisherFactory = Mockito.mock(DMaaPEventPublisherFactory.class);
-        when(mockEventPublisherFactory.createEventPublisher(any(), any(), any(), any(), any()))
+        when(mockEventPublisherFactory.createEventPublisher(any(), any(), any(), any(), any(), any()))
                 .thenReturn(mockEventPublisher);
 
         validationEventPublisher.setEventPublisherFactory(mockEventPublisherFactory);