KPI-MS Switch from Cambria library to dmaap-sdk
[dcaegen2/services.git] / components / kpi-computation-ms / src / main / java / org / onap / dcaegen2 / kpi / utils / DmaapUtils.java
index bf25252..38c1ca8 100644 (file)
@@ -1,6 +1,7 @@
 /*-
  * ============LICENSE_START=======================================================
  *  Copyright (C) 2021 China Mobile.
+ *  Copyright (c) 2021-2022 Wipro Limited.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.dcaegen2.kpi.utils;
 
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
-
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-
 import org.onap.dcaegen2.kpi.models.Configuration;
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
 
 /**
  * Utility class to perform actions related to Dmaap.
@@ -39,65 +45,35 @@ import org.onap.dcaegen2.kpi.models.Configuration;
  */
 public class DmaapUtils {
 
-    /**
-     * Build publisher.
-     */
-    public CambriaBatchingPublisher buildPublisher(Configuration config, String topic) {
-        try {
-            return builder(config, topic).build();
-        } catch (MalformedURLException | GeneralSecurityException e) {
-            return null;
-
-        }
-    }
-
-    /**
-     * Build consumer.
-     */
-    public CambriaConsumer buildConsumer(Configuration config, String topic) {
-
-        try {
-            return builderConsumer(config, topic).build();
-        } catch (MalformedURLException | GeneralSecurityException e) {
-            return null;
-        }
-
+    public MessageRouterPublisher buildPublisher() {
+        final MessageRouterPublisher publisher = DmaapClientFactory
+             .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+        return publisher;
     }
 
-    private static PublisherBuilder builder(Configuration config, String topic) {
-        if (config.isSecured()) {
-            return authenticatedBuilder(config, topic);
-        } else {
-            return unAuthenticatedBuilder(config, topic);
-        }
+    public MessageRouterPublishRequest buildPublisherRequest(Configuration config, String topicUrl) {
+        MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder().topicUrl(topicUrl)
+             .aafCredentials(ImmutableAafCredentials.builder().username(config.getAafUsername())
+             .password(config.getAafPassword()).build())
+             .build();
+        MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder()
+             .sinkDefinition(sinkDefinition).build();
+        return request;
     }
 
-    private static PublisherBuilder authenticatedBuilder(Configuration config, String topic) {
-        return unAuthenticatedBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
-                config.getAafPassword());
+    public MessageRouterSubscriber buildSubscriber() {
+        MessageRouterSubscriber subscriber = DmaapClientFactory
+            .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
+        return subscriber;
     }
 
-    private static PublisherBuilder unAuthenticatedBuilder(Configuration config, String topic) {
-        return new CambriaClientBuilders.PublisherBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
-                .logSendFailuresAfter(5);
+    public MessageRouterSubscribeRequest buildSubscriberRequest(Configuration config, String topicUrl) {
+        MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder().topicUrl(topicUrl)
+             .aafCredentials(ImmutableAafCredentials.builder().username(config.getAafUsername())
+             .password(config.getAafPassword()).build())
+             .build();
+        MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder()
+             .consumerGroup(config.getCg()).consumerId(config.getCid()).sourceDefinition(sourceDefinition).build();
+        return request;
     }
-
-    private static ConsumerBuilder builderConsumer(Configuration config, String topic) {
-        if (config.isSecured()) {
-            return authenticatedConsumerBuilder(config, topic);
-        } else {
-            return unAuthenticatedConsumerBuilder(config, topic);
-        }
-    }
-
-    private static ConsumerBuilder unAuthenticatedConsumerBuilder(Configuration config, String topic) {
-        return new CambriaClientBuilders.ConsumerBuilder().usingHosts(config.getDmaapServers()).onTopic(topic)
-                .knownAs(config.getCg(), config.getCid()).withSocketTimeout(config.getPollingTimeout() * 1000);
-    }
-
-    private static ConsumerBuilder authenticatedConsumerBuilder(Configuration config, String topic) {
-        return unAuthenticatedConsumerBuilder(config, topic).usingHttps().authenticatedByHttp(config.getAafUsername(),
-                config.getAafPassword());
-    }
-
 }