Switch to cambria client 1.2.1-oss 23/42823/3
authorJim Hahn <jrh3@att.com>
Fri, 13 Apr 2018 20:34:35 +0000 (16:34 -0400)
committerJim Hahn <jrh3@att.com>
Mon, 16 Apr 2018 12:50:18 +0000 (08:50 -0400)
Switched to new version of cambria client.
Added code to set cambria socket timeout.
Modified code to use cambria client instead of dmaap client.
Removed extra, unnecessary items that had been added to
the top-level pom.xml

Change-Id: If71d36f50da5423ec0cf21b30e66aff5b1c9222a
Issue-ID: POLICY-742
Signed-off-by: Jim Hahn <jrh3@att.com>
policy-endpoints/pom.xml
policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusPublisher.java
policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
policy-management/pom.xml
pom.xml

index 9f8cabc..8664646 100644 (file)
@@ -2,7 +2,7 @@
   ============LICENSE_START=======================================================
   ONAP Policy Engine - Drools PDP
   ================================================================================
-  Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+  Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
   ================================================================================
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
                                        <groupId>org.slf4j</groupId>
                                        <artifactId>slf4j-log4j12</artifactId>
                                </exclusion>    
-                               <exclusion>
-                                       <groupId>com.att.nsa</groupId>
-                                       <artifactId>saClientLibrary</artifactId>
-                               </exclusion>                                            
                        </exclusions>
                </dependency>
 
index db240b3..70c37d5 100644 (file)
@@ -122,28 +122,40 @@ public interface BusConsumer {
     public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey,
         String apiSecret, String consumerGroup, String consumerInstance, int fetchTimeout,
         int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) {
+      this(servers, topic, apiKey, apiSecret, null, null,
+           consumerGroup, consumerInstance, fetchTimeout, fetchLimit,
+           useHttps, useSelfSignedCerts);
+    }
+
+    public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey,
+        String apiSecret, String username, String password,
+        String consumerGroup, String consumerInstance, int fetchTimeout,
+        int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) {
 
       this.fetchTimeout = fetchTimeout;
 
       this.builder = new CambriaClientBuilders.ConsumerBuilder();
 
+      builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
+          .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
+      
+      // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
+      builder.withSocketTimeout(fetchTimeout + 30000);
+
       if (useHttps) {
+          builder.usingHttps();
 
         if (useSelfSignedCerts) {
-          builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
-              .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps()
-              .allowSelfSignedCertificates();
-        } else {
-          builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
-              .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps();
+          builder.allowSelfSignedCertificates();
         }
-      } else {
-        builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
-            .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
       }
 
       if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
-        builder.authenticatedBy(apiKey, apiSecret);
+                 builder.authenticatedBy(apiKey, apiSecret);
+      }
+
+      if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
+                 builder.authenticatedByHttp(username, password);
       }
 
       try {
index 852c9c1..8e18bba 100644 (file)
@@ -74,18 +74,23 @@ public interface BusPublisher {
                public CambriaPublisherWrapper(List<String> servers, String topic,
                                                               String apiKey,
                                                               String apiSecret, boolean useHttps) {
+                       this(servers, topic, apiKey, apiSecret, null, null, useHttps);
+               }
+
+               public CambriaPublisherWrapper(List<String> servers, String topic,
+                                                              String apiKey, String apiSecret,
+                                                              String username, String password,
+                                                              boolean useHttps) {
+
                        PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
-               
 
-                       if (useHttps){
-                               
-                               builder.usingHosts(servers)
-                              .onTopic(topic)
-                              .usingHttps();
-                       }
-                       else{
-                               builder.usingHosts(servers)
-                              .onTopic(topic);
+            builder.usingHosts(servers).onTopic(topic);
+            
+                       // Set read timeout to 30 seconds (TBD: this should be configurable)
+                       builder.withSocketTimeout(30000);
+
+                       if (useHttps){                          
+                               builder.usingHttps();
                        }
                
                        
@@ -94,6 +99,11 @@ public interface BusPublisher {
                                builder.authenticatedBy(apiKey, apiSecret);
                        }
                        
+                       if (username != null && !username.isEmpty() &&
+                               password != null && !password.isEmpty()) {
+                               builder.authenticatedByHttp(username, password);
+                       }
+                       
                        try {
                                this.publisher = builder.build();
                        } catch (MalformedURLException | GeneralSecurityException e) {
index 912607f..718bb21 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -114,10 +114,11 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
                   (this.longitude == null              || this.longitude.isEmpty()) &&
                   (this.partner == null                || this.partner.isEmpty())) {
                        this.publisher = 
-                               new BusPublisher.DmaapAafPublisherWrapper(this.servers, 
+                               new BusPublisher.CambriaPublisherWrapper(this.servers, 
                                                                               this.topic, 
-                                                                              this.userName, 
-                                                                              this.password, this.useHttps);
+                                                                              this.apiKey, this.apiSecret,
+                                                                              this.userName, this.password,
+                                                                              this.useHttps);
                } else {
                        this.publisher = 
                                new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic, 
@@ -148,4 +149,4 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop
                return builder.toString();
        }
 
-}
\ No newline at end of file
+}
index 6c1bc8a..88d67fd 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -162,11 +162,12 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
                                   (this.longitude == null         || this.longitude.isEmpty()) &&
                                   (this.partner == null           || this.partner.isEmpty())) {
                        this.consumer =
-                                       new BusConsumer.DmaapAafConsumerWrapper(this.servers, this.topic, 
+                                       new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, 
                                                                                    this.apiKey, this.apiSecret,
                                                                                    this.userName, this.password,
                                                                                    this.consumerGroup, this.consumerInstance,
-                                                                                   this.fetchTimeout, this.fetchLimit, this.useHttps);
+                                                                                   this.fetchTimeout, this.fetchLimit,
+                                                                                   this.useHttps, this.allowSelfSignedCerts);
                } else {
                        this.consumer =
                                        new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, 
index 2c0f968..5386e15 100644 (file)
@@ -3,7 +3,7 @@
   ============LICENSE_START=======================================================
   ONAP Policy Engine - Drools PDP
   ================================================================================
-  Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+  Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
   ================================================================================
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
                <version>4.1</version>
        </dependency>
        
+       <!-- if we don't explicitly specify the version here, we seem to end up
+                with version 1.4 (as a dependency to drools-core). This version is
+                not compatible with 'saClientLibrary' version 1.2.1-oss -->
+       <dependency>
+               <groupId>commons-codec</groupId>
+               <artifactId>commons-codec</artifactId>
+               <version>1.9</version>
+       </dependency>
+       
        <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
diff --git a/pom.xml b/pom.xml
index a8d8fab..f32faba 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -58,7 +58,7 @@
 
                <!-- Project common dependency versions -->
                <dmaap.version>1.1.3</dmaap.version>
-               <cambria.version>0.0.1</cambria.version>
+               <cambria.version>1.2.1-oss</cambria.version>
                <jersey.version>2.25.1</jersey.version>
                <jersey.swagger.version>1.5.18</jersey.swagger.version>
                <jackson.version>2.9.5</jackson.version>