/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2021 China Mobile.
+ * Copyright (c) 2021-2022 Wipro Limited.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.dcaegen2.kpi.utils;
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
-
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-
import org.onap.dcaegen2.kpi.models.Configuration;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
/**
* Utility class to perform actions related to Dmaap.
*/
public class DmaapUtils {
- /**
- * Build publisher.
- */
- public CambriaBatchingPublisher buildPublisher(Configuration config, String topic) {
- try {
- return builder(config, topic).build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- return null;
-
- }
- }
-
- /**
- * Build consumer.
- */
- public CambriaConsumer buildConsumer(Configuration config, String topic) {
-
- try {
- return builderConsumer(config, topic).build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- return null;
- }
-
+ public MessageRouterPublisher buildPublisher() {
+ final MessageRouterPublisher publisher = DmaapClientFactory
+ .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+ return publisher;
}
- private static PublisherBuilder builder(Configuration config, String topic) {
- if (config.isSecured()) {
- return authenticatedBuilder(config, topic);
- } else {
- return unAuthenticatedBuilder(config, topic);
- }
+ public MessageRouterPublishRequest buildPublisherRequest(Configuration config, String topicUrl) {
+ MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder().topicUrl(topicUrl)
+ .aafCredentials(ImmutableAafCredentials.builder().username(config.getAafUsername())
+ .password(config.getAafPassword()).build())
+ .build();
+ MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(sinkDefinition).build();
+ return request;
}
- private static PublisherBuilder authenticatedBuilder(Configuration config, String topic) {
- return unAuthenticatedBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
- config.getAafPassword());
+ public MessageRouterSubscriber buildSubscriber() {
+ MessageRouterSubscriber subscriber = DmaapClientFactory
+ .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+ return subscriber;
}
- private static PublisherBuilder unAuthenticatedBuilder(Configuration config, String topic) {
- return new CambriaClientBuilders.PublisherBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
- .logSendFailuresAfter(5);
+ public MessageRouterSubscribeRequest buildSubscriberRequest(Configuration config, String topicUrl) {
+ MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder().topicUrl(topicUrl)
+ .aafCredentials(ImmutableAafCredentials.builder().username(config.getAafUsername())
+ .password(config.getAafPassword()).build())
+ .build();
+ MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder()
+ .consumerGroup(config.getCg()).consumerId(config.getCid()).sourceDefinition(sourceDefinition).build();
+ return request;
}
-
- private static ConsumerBuilder builderConsumer(Configuration config, String topic) {
- if (config.isSecured()) {
- return authenticatedConsumerBuilder(config, topic);
- } else {
- return unAuthenticatedConsumerBuilder(config, topic);
- }
- }
-
- private static ConsumerBuilder unAuthenticatedConsumerBuilder(Configuration config, String topic) {
- return new CambriaClientBuilders.ConsumerBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
- .knownAs(config.getCg(), config.getCid()).withSocketTimeout(config.getPollingTimeout() * 1000);
- }
-
- private static ConsumerBuilder authenticatedConsumerBuilder(Configuration config, String topic) {
- return unAuthenticatedConsumerBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
- config.getAafPassword());
- }
-
}