tag_version: nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.components.bbs-event-processor:1.0.0-SNAPSHOT
-pnf_reregistration_url: http:message-router:3904/events/unauthenticated.PNF_UPDATE
-cpe_authentication_url: http:message-router:3904/events/unauthenticated.CPE_AUTHENTICATION
-close_loop_url: http:message-router:3904/events/unauthenticated.DCAE_CL_OUTPUT
+pnf_reregistration_url: https:message-router:3905/events/unauthenticated.PNF_UPDATE
+cpe_authentication_url: https:message-router:3905/events/unauthenticated.CPE_AUTHENTICATION
+close_loop_url: https:message-router:3905/events/unauthenticated.DCAE_CL_OUTPUT
application_rereg_policy_scope: policyScopeReReg
application_rereg_cl_control_name: clControlNameReReg
application_cpeAuth_policy_scope: policyScopeCpeAuth
inputs:
aai_enrichment_host:
type: string
- default: "aai"
+ default: "aai.onap"
aai_enrichment_port:
type: integer
default: 8443
aai_enrichment_protocol:
type: string
default: "https"
+ aai_secure_enable_cert:
+ type: boolean
+ description: enable certificates-based connection with AAI
+ default: true
tag_version:
type: string
replicas:
application_logging_level:
type: string
default: "INFO"
+ dmaap_username:
+ type: string
+ default: admin
+ dmaap_password:
+ type: string
+ default: admin
dmaap_consumer_id:
type: string
dmaap_consumer_group:
type: string
+ dmaap_secure_enable_cert:
+ type: boolean
+ description: enable certificates-based connection with DMaaP
+ default: true
node_templates:
bbs-event-processor:
type: dcae.nodes.ContainerizedPlatformComponent
streams_subscribes:
pnf_reregistration:
type: message_router
+ aaf_username: { get_input: dmaap_username }
+ aaf_password: { get_input: dmaap_password }
dmaap_info:
topic_url: { get_input: pnf_reregistration_url }
cpe_authentication:
type: message_router
+ aaf_username: { get_input: dmaap_username }
+ aaf_password: { get_input: dmaap_password }
dmaap_info:
topic_url: { get_input: cpe_authentication_url }
streams_publishes:
close_loop:
type: message_router
+ aaf_username: { get_input: dmaap_username }
+ aaf_password: { get_input: dmaap_password }
dmaap_info:
topic_url: { get_input: close_loop_url }
- dmaap.protocol: "http"
+ dmaap.protocol: "https"
dmaap.contentType: "application/json"
dmaap.consumer.consumerId: { get_input: dmaap_consumer_id }
dmaap.consumer.consumerGroup: { get_input: dmaap_consumer_group }
application.cpeAuth.configKey: "cpe_authentication"
application.closeLoop.configKey: "close_loop"
application.loggingLevel: { get_input: application_logging_level }
+ application.ssl.keyStorePath: "/opt/app/bbs-event-processor/etc/cert/cert.jks"
+ application.ssl.keyStorePasswordPath: "/opt/app/bbs-event-processor/etc/cert/jks.pass"
+ application.ssl.trustStorePath: "/opt/app/bbs-event-processor/etc/cert/trust.jks"
+ application.ssl.trustStorePasswordPath: "/opt/app/bbs-event-processor/etc/cert/trust.pass"
+ application.ssl.enableAaiCertAuth: { get_input: aai_secure_enable_cert }
+ application.ssl.enableDmaapCertAuth: { get_input: dmaap_secure_enable_cert }
host_port:
{ get_input: host_port }
container_port:
log_info:
log_directory: "/opt/app/bbs-event-processor/logs"
tls_info:
- cert_directory: '/opt/app/bbs-event-processor/etc/cert/'
- use_tls: false
\ No newline at end of file
+ cert_directory: '/opt/app/bbs-event-processor/etc/cert'
+ use_tls: true
\ No newline at end of file
},
"artifacts": [
{
- "uri": "nexus3.onap.org:10003/onap/org.onap.dcaegen2.services.components.bbs-event-processor:1.0.0-SNAPSHOT",
+ "uri": "nexus3.onap.org:10001/onap/org.onap.dcaegen2.services.components.bbs-event-processor:1.0.0-SNAPSHOT",
"type": "docker image"
}
]
<slf4j.version>1.7.25</slf4j.version>
<junit-platform.version>1.1.0</junit-platform.version>
<jacoco.version>0.8.2</jacoco.version>
- <sdk.version>1.1.2-SNAPSHOT</sdk.version>
- <common.sdk.version>1.1.3</common.sdk.version>
- <cbs.version>1.1.3</cbs.version>
+ <dcae.sdk.version>1.1.4</dcae.sdk.version>
<wiremock.version>2.21.0</wiremock.version>
<springfox-swagger.version>2.8.0</springfox-swagger.version>
<maven.build.timestamp.format>yyyyMMdd'T'HHmmss</maven.build.timestamp.format>
<dependency>
<groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
<artifactId>cbs-client</artifactId>
- <version>${cbs.version}</version>
- </dependency>
- <dependency>
- <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
- <artifactId>aai-client</artifactId>
- <version>${sdk.version}</version>
+ <version>${dcae.sdk.version}</version>
</dependency>
<dependency>
<groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
<artifactId>dmaap-client</artifactId>
- <version>${sdk.version}</version>
+ <version>${dcae.sdk.version}</version>
</dependency>
<dependency>
<groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
<artifactId>common-dependency</artifactId>
- <version>${common.sdk.version}</version>
+ <version>${dcae.sdk.version}</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
<artifactId>cbs-client</artifactId>
</dependency>
- <dependency>
- <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
- <artifactId>aai-client</artifactId>
- </dependency>
<dependency>
<groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
<artifactId>dmaap-client</artifactId>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.tomcat.embed</groupId>
- <artifactId>tomcat-embed-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.tomcat.embed</groupId>
- <artifactId>tomcat-embed-el</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.tomcat.embed</groupId>
- <artifactId>tomcat-embed-websocket</artifactId>
- </dependency>
<dependency>
<groupId>org.immutables</groupId>
<artifactId>value</artifactId>
synchronized (this) {
cbsPollingInterval = newConfiguration.cbsPollingIntervalSec();
+ securityProperties.setEnableAaiCertAuth(newConfiguration.enableAaiCertAuth());
+ securityProperties.setEnableDmaapCertAuth(newConfiguration.enableDmaapCertAuth());
+ securityProperties.setKeyStorePath(newConfiguration.keyStorePath());
+ securityProperties.setKeyStorePasswordPath(newConfiguration.keyStorePasswordPath());
+ securityProperties.setTrustStorePath(newConfiguration.trustStorePath());
+ securityProperties.setTrustStorePasswordPath(newConfiguration.trustStorePasswordPath());
+
GeneratedAppConfigObject.StreamsObject reRegObject =
getStreamsObject(newConfiguration.streamSubscribesMap(), newConfiguration.reRegConfigKey(),
"PNF Re-Registration");
dmaapReRegistrationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost());
dmaapReRegistrationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
dmaapReRegistrationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
+ dmaapReRegistrationConsumerProperties.setDmaapUserName(reRegObject.aafUsername());
+ dmaapReRegistrationConsumerProperties.setDmaapUserPassword(reRegObject.aafPassword());
dmaapReRegistrationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
dmaapReRegistrationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
dmaapReRegistrationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId());
dmaapCpeAuthenticationConsumerProperties.setDmaapHostName(topicUrlInfo.getHost());
dmaapCpeAuthenticationConsumerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
dmaapCpeAuthenticationConsumerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
+ dmaapCpeAuthenticationConsumerProperties.setDmaapUserName(cpeAuthObject.aafUsername());
+ dmaapCpeAuthenticationConsumerProperties.setDmaapUserPassword(cpeAuthObject.aafPassword());
dmaapCpeAuthenticationConsumerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
dmaapCpeAuthenticationConsumerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
dmaapCpeAuthenticationConsumerProperties.setConsumerId(newConfiguration.dmaapConsumerConsumerId());
dmaapProducerProperties.setDmaapHostName(topicUrlInfo.getHost());
dmaapProducerProperties.setDmaapPortNumber(topicUrlInfo.getPort());
dmaapProducerProperties.setDmaapProtocol(newConfiguration.dmaapProtocol());
+ dmaapProducerProperties.setDmaapUserName(closeLoopObject.aafUsername());
+ dmaapProducerProperties.setDmaapUserPassword(closeLoopObject.aafPassword());
dmaapProducerProperties.setDmaapContentType(newConfiguration.dmaapContentType());
dmaapProducerProperties.setDmaapTopicName(topicUrlInfo.getTopicName());
constructDmaapProducerConfiguration();
throw new ConfigurationParsingException("Wrong topic name structure");
}
topicUrlInfo.setPort(Integer.valueOf(tokensAfterHost[0]));
- topicUrlInfo.setTopicName("/events/" + tokensAfterHost[1]);
+ topicUrlInfo.setTopicName("events/" + tokensAfterHost[1]);
return topicUrlInfo;
}
import org.onap.bbs.event.processor.model.ImmutableGeneratedAppConfigObject;
import org.onap.bbs.event.processor.model.ImmutableStreamsObject;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import org.slf4j.Logger;
// Necessary properties from the environment (Consul host:port, service-name (hostname), CBS name)
EnvProperties env = EnvProperties.fromEnvironment();
-
+ CbsRequest cbsRequest = CbsRequests.getConfiguration(diagnosticContext);
// Create the client and use it to get the configuration
cbsFetchPipeline = CbsClientFactory.createCbsClient(env)
.doOnError(e -> LOGGER.warn("CBS Configuration fetch failed with error: {}", e))
.retry(e -> true)
- .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, initialDelay, period))
+ .flatMapMany(cbsClient -> cbsClient.updates(cbsRequest, initialDelay, period))
.subscribe(this::parseConsulRetrievedConfiguration, this::handleErrors);
}
final String loggingLevel = configObject.get("application.loggingLevel").getAsString();
+ final String keyStorePath = configObject.get("application.ssl.keyStorePath").getAsString();
+ final String keyStorePasswordPath = configObject.get("application.ssl.keyStorePasswordPath").getAsString();
+ final String trustStorePath = configObject.get("application.ssl.trustStorePath").getAsString();
+ final String trustStorePasswordPath = configObject.get("application.ssl.trustStorePasswordPath").getAsString();
+ final boolean aaiEnableCertAuth = configObject.get("application.ssl.enableAaiCertAuth").getAsBoolean();
+ final boolean dmaapEnableCertAuth = configObject.get("application.ssl.enableDmaapCertAuth").getAsBoolean();
+
final JsonObject streamsPublishes = configObject.getAsJsonObject("streams_publishes");
final JsonObject streamsSubscribes = configObject.getAsJsonObject("streams_subscribes");
.cpeAuthConfigKey(cpeAuthConfigKey)
.closeLoopConfigKey(closeLoopConfigKey)
.loggingLevel(loggingLevel)
+ .keyStorePath(keyStorePath)
+ .keyStorePasswordPath(keyStorePasswordPath)
+ .trustStorePath(trustStorePath)
+ .trustStorePasswordPath(trustStorePasswordPath)
+ .enableAaiCertAuth(aaiEnableCertAuth)
+ .enableDmaapCertAuth(dmaapEnableCertAuth)
.streamSubscribesMap(parseStreamsObjects(streamsSubscribes))
.streamPublishesMap(parseStreamsObjects(streamsPublishes))
.build();
@SerializedName(value = "application.loggingLevel", alternate = "application.loggingLevel")
String loggingLevel();
+ @SerializedName(value = "application.ssl.trustStorePath", alternate = "application.ssl.trustStorePath")
+ String trustStorePath();
+
+ @SerializedName(value = "application.ssl.trustStorePasswordPath",
+ alternate = "application.ssl.trustStorePasswordPath")
+ String trustStorePasswordPath();
+
+ @SerializedName(value = "application.ssl.keyStorePath", alternate = "application.ssl.keyStorePath")
+ String keyStorePath();
+
+ @SerializedName(value = "application.ssl.keyStorePasswordPath", alternate = "application.ssl.keyStorePasswordPath")
+ String keyStorePasswordPath();
+
+ @SerializedName(value = "application.ssl.enableAaiCertAuth", alternate = "application.ssl.enableAaiCertAuth")
+ boolean enableAaiCertAuth();
+
+ @SerializedName(value = "application.ssl.enableDmaapCertAuth", alternate = "application.ssl.enableDmaapCertAuth")
+ boolean enableDmaapCertAuth();
+
@SerializedName(value = "streams_subscribes", alternate = "streams_subscribes")
Map<String, StreamsObject> streamSubscribesMap();
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
LOGGER.trace("Reactive CPE Authentication pipeline subscribed - Execution started");
}
- Flux<ResponseEntity<String>> executePipeline() {
+ Flux<HttpResponse> executePipeline() {
return
// Consume CPE Authentication from DMaaP
consumeCpeAuthenticationFromDmaap()
.flatMap(this::triggerPolicy);
}
- private void onSuccess(ResponseEntity<String> responseCode) {
- MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString());
+ private void onSuccess(HttpResponse responseCode) {
+ MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode()));
LOGGER.info("CPE Authentication event successfully handled. "
+ "Publishing to DMaaP for Policy returned a status code of ({} {})",
- responseCode.getStatusCode().value(), responseCode.getStatusCode().getReasonPhrase());
+ responseCode.statusCode(), responseCode.statusReason());
MDC.remove(RESPONSE_CODE);
}
.doOnError(TimeoutException.class,
e -> LOGGER.warn("Timed out waiting for A&AI response")
)
- .doOnError(e -> LOGGER.error("Error while retrieving PNF: {}",
- e.getMessage())
+ .doOnError(e -> {
+ LOGGER.error("Error while retrieving PNF: {}", e.getMessage());
+ LOGGER.debug("Error\n", e);
+ }
)
.onErrorResume(
e -> e instanceof Exception,
.doOnError(TimeoutException.class,
e -> LOGGER.warn("Timed out waiting for A&AI response")
)
- .doOnError(e -> LOGGER.error("Error while retrieving HSI CFS Service instance: {}",
- e.getMessage())
+ .doOnError(e -> {
+ LOGGER.error("Error while retrieving HSI CFS Service instance: {}", e.getMessage());
+ LOGGER.debug("Error\n", e);
+ }
)
.onErrorResume(
e -> e instanceof Exception,
});
}
- private Mono<ResponseEntity<String>> triggerPolicy(PipelineState state) {
+ private Mono<HttpResponse> triggerPolicy(PipelineState state) {
if (state == null || state.getHsiCfsServiceInstance() == null) {
return Mono.empty();
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
LOGGER.trace("Reactive PNF Re-registration pipeline subscribed - Execution started");
}
- Flux<ResponseEntity<String>> executePipeline() {
+ Flux<HttpResponse> executePipeline() {
return
// Consume Re-Registration from DMaaP
consumeReRegistrationsFromDmaap()
.flatMap(this::triggerPolicy);
}
- private void onSuccess(ResponseEntity<String> responseCode) {
- MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString());
+ private void onSuccess(HttpResponse responseCode) {
+ MDC.put(RESPONSE_CODE, String.valueOf(responseCode.statusCode()));
LOGGER.info("PNF Re-Registration event successfully handled. "
+ "Publishing to DMaaP for Policy returned a status code of ({} {})",
- responseCode.getStatusCode().value(), responseCode.getStatusCode().getReasonPhrase());
+ responseCode.statusCode(), responseCode.statusReason());
MDC.remove(RESPONSE_CODE);
}
.doOnError(TimeoutException.class,
e -> LOGGER.warn("Timed out waiting for A&AI response")
)
- .doOnError(e -> LOGGER.error("Error while retrieving PNF: {}",
- e.getMessage())
+ .doOnError(e -> {
+ LOGGER.error("Error while retrieving PNF: {}", e.getMessage());
+ LOGGER.debug("Error\n", e);
+ }
)
.onErrorResume(
e -> e instanceof Exception,
.doOnError(TimeoutException.class,
e -> LOGGER.warn("Timed out waiting for A&AI response")
)
- .doOnError(e -> LOGGER.error("Error while retrieving HSI CFS Service instance: {}",
- e.getMessage())
+ .doOnError(e -> {
+ LOGGER.error("Error while retrieving HSI CFS Service instance: {}", e.getMessage());
+ LOGGER.debug("Error\n", e);
+ }
)
.onErrorResume(
e -> e instanceof Exception,
return isNotRelocation;
}
- private Mono<ResponseEntity<String>> triggerPolicy(PipelineState state) {
+ private Mono<HttpResponse> triggerPolicy(PipelineState state) {
if (state == null || state.getHsiCfsServiceInstance() == null) {
return Mono.empty();
package org.onap.bbs.event.processor.tasks;
+import com.google.gson.JsonElement;
+
+import java.util.Optional;
+
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.net.ssl.SSLException;
public synchronized void updateConfiguration() {
try {
LOGGER.info("DMaaP CPE authentication consumer update due to new application configuration");
+ LOGGER.info("Creating secure context with:\n {}",
+ this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
httpClient = httpClientFactory.create(this.configuration.getDmaapCpeAuthenticationConsumerConfiguration());
} catch (SSLException e) {
LOGGER.error("SSL error while updating HTTP Client after a config update");
public Flux<CpeAuthenticationConsumerDmaapModel> execute(String taskName) {
LOGGER.debug("Executing task for CPE-Authentication with name \"{}\"", taskName);
DMaaPConsumerReactiveHttpClient httpClient = getHttpClient();
- Mono<String> response = httpClient.getDMaaPConsumerResponse();
+ Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty());
return cpeAuthenticationDmaapConsumerJsonParser.extractModelFromDmaap(response)
.switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION))
.doOnError(e -> {
package org.onap.bbs.event.processor.tasks;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
-import org.springframework.http.ResponseEntity;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import reactor.core.publisher.Mono;
public interface DmaapPublisherTask {
- Mono<ResponseEntity<String>> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel);
+ Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel);
}
package org.onap.bbs.event.processor.tasks;
+import java.util.Optional;
+
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
+import javax.net.ssl.SSLException;
import org.onap.bbs.event.processor.config.ApplicationConfiguration;
import org.onap.bbs.event.processor.config.ConfigurationChangeObserver;
import org.onap.bbs.event.processor.exceptions.DmaapException;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.utilities.ControlLoopJsonBodyBuilder;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
this.configuration = configuration;
this.httpClientFactory = httpClientFactory;
- httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
+ try {
+ httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
+ } catch (SSLException e) {
+ LOGGER.error("SSL error while creating HTTP Client: {}", e.getMessage());
+ LOGGER.debug("SSL exception\n", e);
+ }
}
@PostConstruct
@Override
public synchronized void updateConfiguration() {
LOGGER.info("DMaaP Publisher update due to new application configuration");
- httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
+ try {
+ LOGGER.info("Creating secure context with:\n {}", this.configuration.getDmaapPublisherConfiguration());
+ httpClient = httpClientFactory.create(this.configuration.getDmaapPublisherConfiguration());
+ } catch (SSLException e) {
+ LOGGER.error("SSL error while updating HTTP Client after a config update: {}", e.getMessage());
+ LOGGER.debug("SSL exception\n", e);
+ }
}
@Override
- public Mono<ResponseEntity<String>> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) {
+ public Mono<HttpResponse> execute(ControlLoopPublisherDmaapModel controlLoopPublisherDmaapModel) {
if (controlLoopPublisherDmaapModel == null) {
throw new DmaapException("Cannot invoke a DMaaP Publish task with a null message");
}
- LOGGER.info("Executing task for publishing control loop message \n{}", controlLoopPublisherDmaapModel);
+ LOGGER.info("Executing task for publishing control loop message");
+ LOGGER.debug("CL message \n{}", controlLoopPublisherDmaapModel);
DMaaPPublisherReactiveHttpClient httpClient = getHttpClient();
- return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel);
+ return httpClient.getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
}
private synchronized DMaaPPublisherReactiveHttpClient getHttpClient() {
package org.onap.bbs.event.processor.tasks;
+import com.google.gson.JsonElement;
+
+import java.util.Optional;
+
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.net.ssl.SSLException;
public synchronized void updateConfiguration() {
try {
LOGGER.info("DMaaP PNF reregistration consumer update due to new application configuration");
+ LOGGER.info("Creating secure context with:\n {}",
+ this.configuration.getDmaapReRegistrationConsumerConfiguration());
httpClient = httpClientFactory.create(this.configuration.getDmaapReRegistrationConsumerConfiguration());
} catch (SSLException e) {
LOGGER.error("SSL error while updating HTTP Client after a config update");
public Flux<ReRegistrationConsumerDmaapModel> execute(String taskName) {
LOGGER.debug("Executing task for Re-Registration with name \"{}\"", taskName);
DMaaPConsumerReactiveHttpClient httpClient = getHttpClient();
- Mono<String> response = httpClient.getDMaaPConsumerResponse();
+ Mono<JsonElement> response = httpClient.getDMaaPConsumerResponse(Optional.empty());
return reRegistrationDmaapConsumerJsonParser.extractModelFromDmaap(response)
.switchIfEmpty(Flux.error(EMPTY_DMAAP_EXCEPTION))
.doOnError(e -> {
private SslContext createSslContext() throws SSLException {
if (aaiClientConfiguration.enableAaiCertAuth()) {
+ LOGGER.info("Creating secure context with:\n {}", aaiClientConfiguration);
return sslFactory.createSecureContext(
aaiClientConfiguration.keyStorePath(),
aaiClientConfiguration.keyStorePasswordPath(),
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
import java.util.Optional;
import java.util.stream.StreamSupport;
* @param dmaapResponse Response from DMaaP
* @return CPE Authentication Consumer DMaaP reactive model
*/
- public Flux<CpeAuthenticationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) {
+ public Flux<CpeAuthenticationConsumerDmaapModel> extractModelFromDmaap(Mono<JsonElement> dmaapResponse) {
return dmaapResponse
- .flatMapMany(this::parseToMono)
- .flatMap(this::createTargetFlux);
- }
-
- private Mono<JsonElement> parseToMono(String message) {
- if (StringUtils.isEmpty(message)) {
- LOGGER.warn("DMaaP response is empty");
- return Mono.empty();
- }
- return Mono.fromCallable(() -> new JsonParser().parse(message))
- .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring"))
- .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty());
+ .flatMapMany(this::createTargetFlux);
}
private Flux<CpeAuthenticationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) {
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
import java.util.Optional;
import java.util.stream.StreamSupport;
* @param dmaapResponse Response from DMaaP
* @return Re-Registration Consumer DMaaP reactive model
*/
- public Flux<ReRegistrationConsumerDmaapModel> extractModelFromDmaap(Mono<String> dmaapResponse) {
+ public Flux<ReRegistrationConsumerDmaapModel> extractModelFromDmaap(Mono<JsonElement> dmaapResponse) {
return dmaapResponse
- .flatMapMany(this::parseToMono)
- .flatMap(this::createTargetFlux);
- }
-
- private Mono<JsonElement> parseToMono(String message) {
- if (StringUtils.isEmpty(message)) {
- LOGGER.warn("DMaaP response is empty");
- return Mono.empty();
- }
- return Mono.fromCallable(() -> new JsonParser().parse(message))
- .doOnError(e -> e instanceof JsonSyntaxException, e -> LOGGER.error("Invalid JSON. Ignoring"))
- .onErrorResume(e -> e instanceof JsonSyntaxException, e -> Mono.empty());
+ .flatMapMany(this::createTargetFlux);
}
private Flux<ReRegistrationConsumerDmaapModel> createTargetFlux(JsonElement jsonElement) {
"configs.security.trustStorePasswordPath=test trust store password path",
"configs.security.keyStorePath=test key store path",
"configs.security.keyStorePasswordPath=test key store password path",
- "configs.security.enableDmaapCertAuth=true",
- "configs.security.enableAaiCertAuth=true",
+ "configs.security.enableDmaapCertAuth=false",
+ "configs.security.enableAaiCertAuth=false",
"configs.application.pipelinesPollingIntervalSec=30",
"configs.application.pipelinesTimeoutSec=15",
"configs.application.policyVersion=1.0.0",
() -> assertEquals("reRegControlName", configuration.getReRegistrationCloseLoopControlName()),
() -> assertEquals("cpeAuthControlName", configuration.getCpeAuthenticationCloseLoopControlName())
);
+
+ assertAll("Security Application Properties",
+ () -> assertFalse(aaiClientConfiguration.enableAaiCertAuth()),
+ () -> assertFalse(dmaapConsumerReRegistrationConfig.enableDmaapCertAuth()),
+ () -> assertEquals("test key store path", aaiClientConfiguration.keyStorePath()),
+ () -> assertEquals("test key store password path",
+ aaiClientConfiguration.keyStorePasswordPath()),
+ () -> assertEquals("test trust store path", aaiClientConfiguration.trustStorePath()),
+ () -> assertEquals("test trust store password path",
+ aaiClientConfiguration.trustStorePasswordPath())
+ );
}
@Test
.cpeAuthConfigKey("config_key_2")
.closeLoopConfigKey("config_key_3")
.loggingLevel("TRACE")
+ .keyStorePath("test key store path - update")
+ .keyStorePasswordPath("test key store password path - update")
+ .trustStorePath("test trust store path - update")
+ .trustStorePasswordPath("test trust store password path - update")
+ .enableAaiCertAuth(true)
+ .enableDmaapCertAuth(true)
.streamSubscribesMap(subscribes)
.streamPublishesMap(Collections.singletonMap("config_key_3", streamsObject3))
.build();
assertAll("DMaaP Consumer Re-Registration Configuration Properties",
() -> assertEquals("we-are-message-router1.us", dmaapConsumerReRegistrationConfig.dmaapHostName()),
() -> assertEquals(Integer.valueOf(3901), dmaapConsumerReRegistrationConfig.dmaapPortNumber()),
- () -> assertEquals("/events/unauthenticated.PNF_UPDATE",
+ () -> assertEquals("events/unauthenticated.PNF_UPDATE",
dmaapConsumerReRegistrationConfig.dmaapTopicName()),
() -> assertEquals("https", dmaapConsumerReRegistrationConfig.dmaapProtocol()),
- () -> assertEquals("", dmaapConsumerReRegistrationConfig.dmaapUserName()),
- () -> assertEquals("", dmaapConsumerReRegistrationConfig.dmaapUserPassword()),
+ () -> assertEquals("some-user", dmaapConsumerReRegistrationConfig.dmaapUserName()),
+ () -> assertEquals("some-password", dmaapConsumerReRegistrationConfig.dmaapUserPassword()),
() -> assertEquals("application/json", dmaapConsumerReRegistrationConfig.dmaapContentType()),
() -> assertEquals("c13", dmaapConsumerReRegistrationConfig.consumerId()),
() -> assertEquals("OpenDcae-c13", dmaapConsumerReRegistrationConfig.consumerGroup()),
assertAll("DMaaP Consumer CPE Authentication Configuration Properties",
() -> assertEquals("we-are-message-router2.us", dmaapConsumerCpeAuthenticationConfig.dmaapHostName()),
() -> assertEquals(Integer.valueOf(3902), dmaapConsumerCpeAuthenticationConfig.dmaapPortNumber()),
- () -> assertEquals("/events/unauthenticated.CPE_AUTHENTICATION",
+ () -> assertEquals("events/unauthenticated.CPE_AUTHENTICATION",
dmaapConsumerCpeAuthenticationConfig.dmaapTopicName()),
() -> assertEquals("https", dmaapConsumerCpeAuthenticationConfig.dmaapProtocol()),
- () -> assertEquals("", dmaapConsumerCpeAuthenticationConfig.dmaapUserName()),
- () -> assertEquals("", dmaapConsumerCpeAuthenticationConfig.dmaapUserPassword()),
+ () -> assertEquals("some-user", dmaapConsumerCpeAuthenticationConfig.dmaapUserName()),
+ () -> assertEquals("some-password", dmaapConsumerCpeAuthenticationConfig.dmaapUserPassword()),
() -> assertEquals("application/json", dmaapConsumerCpeAuthenticationConfig.dmaapContentType()),
() -> assertEquals("c13", dmaapConsumerCpeAuthenticationConfig.consumerId()),
() -> assertEquals("OpenDcae-c13", dmaapConsumerCpeAuthenticationConfig.consumerGroup()),
assertAll("DMaaP Publisher Configuration Properties",
() -> assertEquals("we-are-message-router3.us", dmaapPublisherConfiguration.dmaapHostName()),
() -> assertEquals(Integer.valueOf(3903), dmaapPublisherConfiguration.dmaapPortNumber()),
- () -> assertEquals("/events/unauthenticated.DCAE_CL_OUTPUT",
+ () -> assertEquals("events/unauthenticated.DCAE_CL_OUTPUT",
dmaapPublisherConfiguration.dmaapTopicName()),
() -> assertEquals("https", dmaapPublisherConfiguration.dmaapProtocol()),
- () -> assertEquals("", dmaapPublisherConfiguration.dmaapUserName()),
- () -> assertEquals("", dmaapPublisherConfiguration.dmaapUserPassword()),
+ () -> assertEquals("some-user", dmaapPublisherConfiguration.dmaapUserName()),
+ () -> assertEquals("some-password", dmaapPublisherConfiguration.dmaapUserPassword()),
() -> assertEquals("application/json", dmaapPublisherConfiguration.dmaapContentType())
);
() -> assertEquals("controlName-update", configuration.getReRegistrationCloseLoopControlName()),
() -> assertEquals("controlName-update", configuration.getCpeAuthenticationCloseLoopControlName())
);
+
+ assertAll("Security Application Properties",
+ () -> assertTrue(aaiClientConfiguration.enableAaiCertAuth()),
+ () -> assertTrue(dmaapConsumerReRegistrationConfig.enableDmaapCertAuth()),
+ () -> assertEquals("test key store path - update", aaiClientConfiguration.keyStorePath()),
+ () -> assertEquals("test key store password path - update",
+ aaiClientConfiguration.keyStorePasswordPath()),
+ () -> assertEquals("test trust store path - update", aaiClientConfiguration.trustStorePath()),
+ () -> assertEquals("test trust store password path - update",
+ aaiClientConfiguration.trustStorePasswordPath())
+ );
}
}
\ No newline at end of file
+ "\"application.cpeAuth.configKey\": \"config_key_1\","
+ "\"application.closeLoop.configKey\": \"config_key_3\","
+ "\"application.loggingLevel\": \"TRACE\","
+ + "\"application.ssl.keyStorePath\": \"/opt/app/bbs-event-processor/etc/cert/key.p12\","
+ + "\"application.ssl.keyStorePasswordPath\": \"/opt/app/bbs-event-processor/etc/cert/key.pass\","
+ + "\"application.ssl.trustStorePath\": \"/opt/app/bbs-event-processor/etc/cert/trust.jks\","
+ + "\"application.ssl.trustStorePasswordPath\": \"/opt/app/bbs-event-processor/etc/cert/trust.pass\","
+ + "\"application.ssl.enableAaiCertAuth\": true,"
+ + "\"application.ssl.enableDmaapCertAuth\": true,"
+ "\"streams_subscribes\": {"
+ "\"config_key_1\": {"
+ "\"type\": \"message_router\","
.cpeAuthConfigKey("config_key_1")
.closeLoopConfigKey("config_key_3")
.loggingLevel("TRACE")
+ .keyStorePath("/opt/app/bbs-event-processor/etc/cert/key.p12")
+ .keyStorePasswordPath("/opt/app/bbs-event-processor/etc/cert/key.pass")
+ .trustStorePath("/opt/app/bbs-event-processor/etc/cert/trust.jks")
+ .trustStorePasswordPath("/opt/app/bbs-event-processor/etc/cert/trust.pass")
+ .enableAaiCertAuth(true)
+ .enableDmaapCertAuth(true)
.streamSubscribesMap(subscribes)
.streamPublishesMap(Collections.singletonMap("config_key_3", streamsObject3))
.build();
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapCpeAuthenticationConsumerTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-// We can safely suppress unchecked assignment warnings for the ResponseEntity mock
-@SuppressWarnings("unchecked")
@DisplayName("CPE Authentication Pipeline Unit-Tests")
class CpeAuthenticationPipelineTest {
private DmaapPublisherTask publisherTask;
private AaiClientTask aaiClientTask;
- private ResponseEntity<String> responseEntity;
+ private HttpResponse httpResponse;
@BeforeEach
void setup() {
- responseEntity = Mockito.mock(ResponseEntity.class);
+ httpResponse = Mockito.mock(HttpResponse.class);
configuration = Mockito.mock(ApplicationConfiguration.class);
consumerTask = Mockito.mock(DmaapCpeAuthenticationConsumerTask.class);
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
.thenReturn(Mono.just(hsiCfsServiceInstance2));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
.thenReturn(Mono.just(hsiCfsServiceInstance2));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(aaiClientTask, times(2))
import org.onap.bbs.event.processor.tasks.AaiClientTask;
import org.onap.bbs.event.processor.tasks.DmaapPublisherTask;
import org.onap.bbs.event.processor.tasks.DmaapReRegistrationConsumerTask;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
-// We can safely suppress unchecked assignment warnings for the ResponseEntity mock
-@SuppressWarnings("unchecked")
@DisplayName("PNF Re-registration Pipeline Unit-Tests")
class ReRegistrationPipelineTest {
private DmaapPublisherTask publisherTask;
private AaiClientTask aaiClientTask;
- private ResponseEntity<String> responseEntity;
+ private HttpResponse httpResponse;
@BeforeEach
void setup() {
- responseEntity = Mockito.mock(ResponseEntity.class);
+ httpResponse = Mockito.mock(HttpResponse.class);
configuration = Mockito.mock(ApplicationConfiguration.class);
consumerTask = Mockito.mock(DmaapReRegistrationConsumerTask.class);
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
.thenReturn(Mono.just(hsiCfsServiceInstance2));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(publisherTask, times(2)).execute(any(ControlLoopPublisherDmaapModel.class));
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl2))
.thenReturn(Mono.just(hsiCfsServiceInstance2));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(publisherTask).execute(any(ControlLoopPublisherDmaapModel.class));
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(aaiClientTask, times(2)).executePnfRetrieval(anyString(), anyString());
.executeServiceInstanceRetrieval(RETRIEVE_HSI_CFS_SERVICE_INSTANCE_TASK_NAME, cfsUrl))
.thenReturn(Mono.just(hsiCfsServiceInstance));
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(HttpStatus.OK.value()));
- when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(responseEntity));
+ when(httpResponse.statusCode()).thenReturn(HttpStatus.OK.value());
+ when(publisherTask.execute(any(ControlLoopPublisherDmaapModel.class))).thenReturn(Mono.just(httpResponse));
// Execute the pipeline
StepVerifier.create(pipeline.executePipeline())
.expectSubscription()
- .assertNext(r -> assertEquals(HttpStatus.OK, r.getStatusCode()))
+ .assertNext(r -> assertEquals(HttpStatus.OK.value(), r.statusCode()))
.verifyComplete();
verify(aaiClientTask, times(2))
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+
+import java.util.Optional;
+
import javax.net.ssl.SSLException;
import org.junit.Assert;
private static CpeAuthenticationConsumerDmaapModel cpeAuthenticationConsumerDmaapModel;
private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient;
private static String eventsArray;
+ private static Gson gson = new Gson();
@BeforeAll
static void setUp() throws SSLException {
@Test
void passingEmptyMessage_NothingHappens() throws Exception {
- when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(""));
+ JsonElement empty = gson.toJsonTree("");
+ when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())).thenReturn(Mono.just(empty));
StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
.expectSubscription()
.expectError(EmptyDmaapResponseException.class);
- verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse();
+ verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
}
@Test
void passingNormalMessage_ResponseSucceeds() throws Exception {
- when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(eventsArray));
+ JsonElement normalEventsArray = gson.toJsonTree(eventsArray);
+ when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty()))
+ .thenReturn(Mono.just(normalEventsArray));
StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
.expectSubscription()
.consumeNextWith(e -> Assert.assertEquals(e, cpeAuthenticationConsumerDmaapModel));
- verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse();
+ verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
}
private static DmaapConsumerConfiguration testVersionOfDmaapConsumerConfiguration() {
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Optional;
+
+import javax.net.ssl.SSLException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.onap.bbs.event.processor.exceptions.DmaapException;
import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel;
import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
}
@Test
- void passingNormalMessage_ReactiveClientProcessesIt() throws DmaapException {
- ResponseEntity<String> responseEntity = setupMocks(HttpStatus.OK.value());
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.OK);
+ void passingNormalMessage_ReactiveClientProcessesIt() throws DmaapException, SSLException {
+ HttpResponse response = setupMocks(HttpStatus.OK.value());
+
StepVerifier.create(task.execute(controlLoopPublisherDmaapModel)).expectSubscription()
- .expectNext(responseEntity).verifyComplete();
+ .expectNext(response).verifyComplete();
verify(reactiveHttpClient, times(1))
- .getDMaaPProducerResponse(controlLoopPublisherDmaapModel);
+ .getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
verifyNoMoreInteractions(reactiveHttpClient);
}
@Test
- void passingNormalMessage_IncorrectResponseIsHandled() throws DmaapException {
- ResponseEntity<String> responseEntity = setupMocks(HttpStatus.UNAUTHORIZED.value());
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.UNAUTHORIZED);
+ void passingNormalMessage_IncorrectResponseIsHandled() throws DmaapException, SSLException {
+ HttpResponse response = setupMocks(HttpStatus.UNAUTHORIZED.value());
+
StepVerifier.create(task.execute(controlLoopPublisherDmaapModel)).expectSubscription()
- .expectNext(responseEntity).verifyComplete();
+ .expectNext(response).verifyComplete();
verify(reactiveHttpClient, times(1))
- .getDMaaPProducerResponse(controlLoopPublisherDmaapModel);
+ .getDMaaPProducerResponse(controlLoopPublisherDmaapModel, Optional.empty());
verifyNoMoreInteractions(reactiveHttpClient);
}
// We can safely suppress unchecked assignment warning here since it is a mock class
@SuppressWarnings("unchecked")
- private ResponseEntity<String> setupMocks(Integer httpResponseCode) {
+ private HttpResponse setupMocks(Integer httpResponseCode) throws SSLException {
- ResponseEntity<String> responseEntity = mock(ResponseEntity.class);
- when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(httpResponseCode));
+ HttpResponse response = mock(HttpResponse.class);
+ when(response.statusCode()).thenReturn(httpResponseCode);
reactiveHttpClient = mock(DMaaPPublisherReactiveHttpClient.class);
- when(reactiveHttpClient.getDMaaPProducerResponse(any()))
- .thenReturn(Mono.just(responseEntity));
+ when(reactiveHttpClient.getDMaaPProducerResponse(any(), any(Optional.class)))
+ .thenReturn(Mono.just(response));
PublisherReactiveHttpClientFactory httpClientFactory = mock(PublisherReactiveHttpClientFactory.class);
doReturn(reactiveHttpClient).when(httpClientFactory).create(dmaapPublisherConfiguration);
task = new DmaapPublisherTaskImpl(configuration, httpClientFactory);
- return responseEntity;
+ return response;
}
private static DmaapPublisherConfiguration testVersionOfDmaapPublisherConfiguration() {
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+
+import java.util.Optional;
+
import javax.net.ssl.SSLException;
import org.junit.Assert;
private static DmaapReRegistrationConsumerTaskImpl dmaapConsumerTask;
private static ReRegistrationConsumerDmaapModel reRegistrationConsumerDmaapModel;
private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient;
- private static String message;
+ private static String eventsArray;
+ private static Gson gson = new Gson();
@BeforeAll
static void setUp() throws SSLException {
.sVlan(svlan)
.build();
- message = String.format("[" + RE_REGISTRATION_EVENT_TEMPLATE + "]",
- sourceName,
- attachmentPoint,
- remoteId,
- cvlan,
- svlan);
+ String event = String.format(RE_REGISTRATION_EVENT_TEMPLATE, sourceName, attachmentPoint, remoteId,
+ cvlan, svlan);
+
+ eventsArray = "[" + event + "]";
}
@AfterEach
}
@Test
- void passingEmptyMessage_NothingHappens() throws Exception {
- when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(""));
+ void passingEmptyMessage_NothingHappens() {
+ JsonElement empty = gson.toJsonTree("");
+ when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty())).thenReturn(Mono.just(empty));
StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
.expectSubscription()
.expectError(EmptyDmaapResponseException.class);
- verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse();
+ verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
}
@Test
- void passingNormalMessage_ResponseSucceeds() throws Exception {
- when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(message));
+ void passingNormalMessage_ResponseSucceeds() {
+ JsonElement normalEventsArray = gson.toJsonTree(eventsArray);
+ when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse(Optional.empty()))
+ .thenReturn(Mono.just(normalEventsArray));
StepVerifier.create(dmaapConsumerTask.execute("Sample input"))
.expectSubscription()
.consumeNextWith(e -> Assert.assertEquals(e, reRegistrationConsumerDmaapModel));
- verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse();
+ verify(dMaaPConsumerReactiveHttpClient).getDMaaPConsumerResponse(Optional.empty());
}
private static DmaapConsumerConfiguration testVersionOfDmaapConsumerConfiguration() {
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+import com.google.gson.stream.JsonReader;
+import java.io.StringReader;
import java.util.Optional;
import org.junit.jupiter.api.BeforeAll;
}
@Test
- void passingNonJson_EmptyFluxIsReturned() {
+ void passingNonJson_getIllegalStateException() {
CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
new CpeAuthenticationDmaapConsumerJsonParser();
+ JsonReader jsonReader = new JsonReader(new StringReader("not JSON"));
+ jsonReader.setLenient(true);
+ JsonElement notJson = jsonParser.parse(jsonReader).getAsJsonPrimitive();
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("not JSON")))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(notJson)))
.expectSubscription()
- .verifyComplete();
+ .verifyError(IllegalStateException.class);
}
@Test
CpeAuthenticationDmaapConsumerJsonParser consumerJsonParser =
new CpeAuthenticationDmaapConsumerJsonParser();
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("[]")))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse("[]"))))
.expectSubscription()
.verifyComplete();
}
.swVersion(swVersion)
.build();
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.expectNext(expectedEventObject);
}
Mockito.doReturn(Optional.of(jsonElement2.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement2);
- String eventsArray = "[" + firstEvent + secondEvent + "]";
+ String eventsArray = "[" + firstEvent + "," + secondEvent + "]";
CpeAuthenticationConsumerDmaapModel expectedFirstEventObject =
ImmutableCpeAuthenticationConsumerDmaapModel.builder()
.swVersion(swVersion)
.build();
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.expectNext(expectedFirstEventObject)
.expectNext(expectedSecondEventObject);
String eventsArray = "[" + event + "]";
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.verifyComplete();
}
String eventsArray = "[" + event + "]";
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.verifyComplete();
}
String eventsArray = "[" + event + "]";
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.verifyComplete();
}
String eventsArray = "[" + event + "]";
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.verifyComplete();
}
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+import com.google.gson.stream.JsonReader;
+import java.io.StringReader;
import java.util.Optional;
import org.junit.jupiter.api.BeforeAll;
}
@Test
- void passingNonJson_EmptyFluxIsReturned() {
+ void passingNonJson_getIllegalStateException() {
ReRegistrationDmaapConsumerJsonParser consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser();
-
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("not JSON")))
+ JsonReader jsonReader = new JsonReader(new StringReader("not JSON"));
+ jsonReader.setLenient(true);
+ JsonElement notJson = jsonParser.parse(jsonReader).getAsJsonPrimitive();
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(notJson)))
.expectSubscription()
- .verifyComplete();
+ .verifyError(IllegalStateException.class);
}
@Test
void passingNoEvents_EmptyFluxIsReturned() {
ReRegistrationDmaapConsumerJsonParser consumerJsonParser = new ReRegistrationDmaapConsumerJsonParser();
-
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just("[]")))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse("[]"))))
.expectSubscription()
.verifyComplete();
}
.sVlan(svlan)
.build();
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.expectNext(expectedEventObject);
}
Mockito.doReturn(Optional.of(jsonElement2.getAsJsonObject()))
.when(consumerJsonParser).getJsonObjectFromAnArray(jsonElement2);
- String eventsArray = "[" + firstEvent + secondEvent + "]";
+ String eventsArray = "[" + firstEvent + "," + secondEvent + "]";
ReRegistrationConsumerDmaapModel expectedFirstEventObject = ImmutableReRegistrationConsumerDmaapModel.builder()
.correlationId(correlationId1)
.sVlan(svlan)
.build();
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.expectNext(expectedFirstEventObject)
.expectNext(expectedSecondEventObject);
String eventsArray = "[" + event + "]";
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.verifyComplete();
}
String eventsArray = "[" + event + "]";
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.verifyComplete();
}
String eventsArray = "[" + event + "]";
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.verifyComplete();
}
String eventsArray = "[" + event + "]";
- StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(eventsArray)))
+ StepVerifier.create(consumerJsonParser.extractModelFromDmaap(Mono.just(jsonParser.parse(eventsArray))))
.expectSubscription()
.verifyComplete();
}