Switched to new version of cambria client.
Added code to set cambria socket timeout.
Modified code to use cambria client instead of dmaap client.
Removed extra, unnecessary items that had been added to
the top-level pom.xml
Change-Id: If71d36f50da5423ec0cf21b30e66aff5b1c9222a
Issue-ID: POLICY-742
Signed-off-by: Jim Hahn <jrh3@att.com>
============LICENSE_START=======================================================
ONAP Policy Engine - Drools PDP
================================================================================
- Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
================================================================================
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
- <exclusion>
- <groupId>com.att.nsa</groupId>
- <artifactId>saClientLibrary</artifactId>
- </exclusion>
</exclusions>
</dependency>
public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey,
String apiSecret, String consumerGroup, String consumerInstance, int fetchTimeout,
int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) {
+ this(servers, topic, apiKey, apiSecret, null, null,
+ consumerGroup, consumerInstance, fetchTimeout, fetchLimit,
+ useHttps, useSelfSignedCerts);
+ }
+
+ public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey,
+ String apiSecret, String username, String password,
+ String consumerGroup, String consumerInstance, int fetchTimeout,
+ int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) {
this.fetchTimeout = fetchTimeout;
this.builder = new CambriaClientBuilders.ConsumerBuilder();
+ builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
+ .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
+
+ // Set read timeout to fetch timeout + 30 seconds (TBD: this should be configurable)
+ builder.withSocketTimeout(fetchTimeout + 30000);
+
if (useHttps) {
+ builder.usingHttps();
if (useSelfSignedCerts) {
- builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
- .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps()
- .allowSelfSignedCertificates();
- } else {
- builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
- .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps();
+ builder.allowSelfSignedCertificates();
}
- } else {
- builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
- .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
}
if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
- builder.authenticatedBy(apiKey, apiSecret);
+ builder.authenticatedBy(apiKey, apiSecret);
+ }
+
+ if (username != null && !username.isEmpty() && password != null && !password.isEmpty()) {
+ builder.authenticatedByHttp(username, password);
}
try {
public CambriaPublisherWrapper(List<String> servers, String topic,
String apiKey,
String apiSecret, boolean useHttps) {
+ this(servers, topic, apiKey, apiSecret, null, null, useHttps);
+ }
+
+ public CambriaPublisherWrapper(List<String> servers, String topic,
+ String apiKey, String apiSecret,
+ String username, String password,
+ boolean useHttps) {
+
PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
-
- if (useHttps){
-
- builder.usingHosts(servers)
- .onTopic(topic)
- .usingHttps();
- }
- else{
- builder.usingHosts(servers)
- .onTopic(topic);
+ builder.usingHosts(servers).onTopic(topic);
+
+ // Set read timeout to 30 seconds (TBD: this should be configurable)
+ builder.withSocketTimeout(30000);
+
+ if (useHttps){
+ builder.usingHttps();
}
builder.authenticatedBy(apiKey, apiSecret);
}
+ if (username != null && !username.isEmpty() &&
+ password != null && !password.isEmpty()) {
+ builder.authenticatedByHttp(username, password);
+ }
+
try {
this.publisher = builder.build();
} catch (MalformedURLException | GeneralSecurityException e) {
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
(this.longitude == null || this.longitude.isEmpty()) &&
(this.partner == null || this.partner.isEmpty())) {
this.publisher =
- new BusPublisher.DmaapAafPublisherWrapper(this.servers,
+ new BusPublisher.CambriaPublisherWrapper(this.servers,
this.topic,
- this.userName,
- this.password, this.useHttps);
+ this.apiKey, this.apiSecret,
+ this.userName, this.password,
+ this.useHttps);
} else {
this.publisher =
new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic,
return builder.toString();
}
-}
\ No newline at end of file
+}
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
(this.longitude == null || this.longitude.isEmpty()) &&
(this.partner == null || this.partner.isEmpty())) {
this.consumer =
- new BusConsumer.DmaapAafConsumerWrapper(this.servers, this.topic,
+ new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic,
this.apiKey, this.apiSecret,
this.userName, this.password,
this.consumerGroup, this.consumerInstance,
- this.fetchTimeout, this.fetchLimit, this.useHttps);
+ this.fetchTimeout, this.fetchLimit,
+ this.useHttps, this.allowSelfSignedCerts);
} else {
this.consumer =
new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic,
============LICENSE_START=======================================================
ONAP Policy Engine - Drools PDP
================================================================================
- Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
================================================================================
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
<version>4.1</version>
</dependency>
+ <!-- if we don't explicitly specify the version here, we seem to end up
+ with version 1.4 (as a dependency to drools-core). This version is
+ not compatible with 'saClientLibrary' version 1.2.1-oss -->
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.9</version>
+ </dependency>
+
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<!-- Project common dependency versions -->
<dmaap.version>1.1.3</dmaap.version>
- <cambria.version>0.0.1</cambria.version>
+ <cambria.version>1.2.1-oss</cambria.version>
<jersey.version>2.25.1</jersey.version>
<jersey.swagger.version>1.5.18</jersey.swagger.version>
<jackson.version>2.9.5</jackson.version>